1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
35 from osm_lcm
import ROclient
36 from osm_lcm
.data_utils
.nsr
import (
39 get_deployed_vca_list
,
42 from osm_lcm
.data_utils
.vca
import (
51 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
52 from osm_lcm
.lcm_utils
import (
59 check_juju_bundle_existence
,
60 get_charm_artifact_path
,
62 from osm_lcm
.data_utils
.nsd
import (
63 get_ns_configuration_relation_list
,
67 from osm_lcm
.data_utils
.vnfd
import (
73 get_ee_sorted_initial_config_primitive_list
,
74 get_ee_sorted_terminate_config_primitive_list
,
76 get_virtual_link_profiles
,
81 get_number_of_instances
,
83 get_kdu_resource_profile
,
84 find_software_version
,
86 from osm_lcm
.data_utils
.list_utils
import find_in_list
87 from osm_lcm
.data_utils
.vnfr
import (
91 get_volumes_from_instantiation_params
,
93 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
94 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
95 from n2vc
.definitions
import RelationEndpoint
96 from n2vc
.k8s_helm_conn
import K8sHelmConnector
97 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
98 from n2vc
.k8s_juju_conn
import K8sJujuConnector
100 from osm_common
.dbbase
import DbException
101 from osm_common
.fsbase
import FsException
103 from osm_lcm
.data_utils
.database
.database
import Database
104 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
105 from osm_lcm
.data_utils
.wim
import (
107 get_target_wim_attrs
,
108 select_feasible_wim_account
,
111 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
112 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
114 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
115 from osm_lcm
.osm_config
import OsmConfigBuilder
116 from osm_lcm
.prometheus
import parse_job
118 from copy
import copy
, deepcopy
119 from time
import time
120 from uuid
import uuid4
122 from random
import randint
124 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
127 class NsLcm(LcmBase
):
128 timeout_vca_on_error
= (
130 ) # Time for charm from first time at blocked,error status to mark as failed
131 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
132 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
133 timeout_ns_heal
= 1800 # default global timeout for un deployment a ns
134 timeout_charm_delete
= 10 * 60
135 timeout_primitive
= 30 * 60 # timeout for primitive execution
136 timeout_ns_update
= 30 * 60 # timeout for ns update
137 timeout_progress_primitive
= (
139 ) # timeout for some progress in a primitive execution
140 timeout_migrate
= 1800 # default global timeout for migrating vnfs
141 timeout_operate
= 1800 # default global timeout for migrating vnfs
142 timeout_verticalscale
= 1800 # default global timeout for Vertical Sclaing
143 SUBOPERATION_STATUS_NOT_FOUND
= -1
144 SUBOPERATION_STATUS_NEW
= -2
145 SUBOPERATION_STATUS_SKIP
= -3
146 task_name_deploy_vca
= "Deploying VCA"
148 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
150 Init, Connect to database, filesystem storage, and messaging
151 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
154 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
156 self
.db
= Database().instance
.db
157 self
.fs
= Filesystem().instance
.fs
159 self
.lcm_tasks
= lcm_tasks
160 self
.timeout
= config
["timeout"]
161 self
.ro_config
= config
["ro_config"]
162 self
.ng_ro
= config
["ro_config"].get("ng")
163 self
.vca_config
= config
["VCA"].copy()
165 # create N2VC connector
166 self
.n2vc
= N2VCJujuConnector(
169 on_update_db
=self
._on
_update
_n
2vc
_db
,
174 self
.conn_helm_ee
= LCMHelmConn(
177 vca_config
=self
.vca_config
,
178 on_update_db
=self
._on
_update
_n
2vc
_db
,
181 self
.k8sclusterhelm2
= K8sHelmConnector(
182 kubectl_command
=self
.vca_config
.get("kubectlpath"),
183 helm_command
=self
.vca_config
.get("helmpath"),
190 self
.k8sclusterhelm3
= K8sHelm3Connector(
191 kubectl_command
=self
.vca_config
.get("kubectlpath"),
192 helm_command
=self
.vca_config
.get("helm3path"),
199 self
.k8sclusterjuju
= K8sJujuConnector(
200 kubectl_command
=self
.vca_config
.get("kubectlpath"),
201 juju_command
=self
.vca_config
.get("jujupath"),
204 on_update_db
=self
._on
_update
_k
8s
_db
,
209 self
.k8scluster_map
= {
210 "helm-chart": self
.k8sclusterhelm2
,
211 "helm-chart-v3": self
.k8sclusterhelm3
,
212 "chart": self
.k8sclusterhelm3
,
213 "juju-bundle": self
.k8sclusterjuju
,
214 "juju": self
.k8sclusterjuju
,
218 "lxc_proxy_charm": self
.n2vc
,
219 "native_charm": self
.n2vc
,
220 "k8s_proxy_charm": self
.n2vc
,
221 "helm": self
.conn_helm_ee
,
222 "helm-v3": self
.conn_helm_ee
,
226 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
228 self
.op_status_map
= {
229 "instantiation": self
.RO
.status
,
230 "termination": self
.RO
.status
,
231 "migrate": self
.RO
.status
,
232 "healing": self
.RO
.recreate_status
,
233 "verticalscale": self
.RO
.status
,
234 "start_stop_rebuild": self
.RO
.status
,
238 def increment_ip_mac(ip_mac
, vm_index
=1):
239 if not isinstance(ip_mac
, str):
242 # try with ipv4 look for last dot
243 i
= ip_mac
.rfind(".")
246 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
247 # try with ipv6 or mac look for last colon. Operate in hex
248 i
= ip_mac
.rfind(":")
251 # format in hex, len can be 2 for mac or 4 for ipv6
252 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
253 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
259 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
260 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
263 # TODO filter RO descriptor fields...
267 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
268 db_dict
["deploymentStatus"] = ro_descriptor
269 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
271 except Exception as e
:
273 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
276 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
277 # remove last dot from path (if exists)
278 if path
.endswith("."):
281 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
282 # .format(table, filter, path, updated_data))
284 nsr_id
= filter.get("_id")
286 # read ns record from database
287 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
288 current_ns_status
= nsr
.get("nsState")
290 # get vca status for NS
291 status_dict
= await self
.n2vc
.get_status(
292 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
297 db_dict
["vcaStatus"] = status_dict
298 await self
.n2vc
.update_vca_status(db_dict
["vcaStatus"], vca_id
=vca_id
)
300 # update configurationStatus for this VCA
302 vca_index
= int(path
[path
.rfind(".") + 1 :])
305 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
307 vca_status
= vca_list
[vca_index
].get("status")
309 configuration_status_list
= nsr
.get("configurationStatus")
310 config_status
= configuration_status_list
[vca_index
].get("status")
312 if config_status
== "BROKEN" and vca_status
!= "failed":
313 db_dict
["configurationStatus"][vca_index
] = "READY"
314 elif config_status
!= "BROKEN" and vca_status
== "failed":
315 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
316 except Exception as e
:
317 # not update configurationStatus
318 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
320 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
321 # if nsState = 'DEGRADED' check if all is OK
323 if current_ns_status
in ("READY", "DEGRADED"):
324 error_description
= ""
326 if status_dict
.get("machines"):
327 for machine_id
in status_dict
.get("machines"):
328 machine
= status_dict
.get("machines").get(machine_id
)
329 # check machine agent-status
330 if machine
.get("agent-status"):
331 s
= machine
.get("agent-status").get("status")
334 error_description
+= (
335 "machine {} agent-status={} ; ".format(
339 # check machine instance status
340 if machine
.get("instance-status"):
341 s
= machine
.get("instance-status").get("status")
344 error_description
+= (
345 "machine {} instance-status={} ; ".format(
350 if status_dict
.get("applications"):
351 for app_id
in status_dict
.get("applications"):
352 app
= status_dict
.get("applications").get(app_id
)
353 # check application status
354 if app
.get("status"):
355 s
= app
.get("status").get("status")
358 error_description
+= (
359 "application {} status={} ; ".format(app_id
, s
)
362 if error_description
:
363 db_dict
["errorDescription"] = error_description
364 if current_ns_status
== "READY" and is_degraded
:
365 db_dict
["nsState"] = "DEGRADED"
366 if current_ns_status
== "DEGRADED" and not is_degraded
:
367 db_dict
["nsState"] = "READY"
370 self
.update_db_2("nsrs", nsr_id
, db_dict
)
372 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
374 except Exception as e
:
375 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
377 async def _on_update_k8s_db(
378 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
381 Updating vca status in NSR record
382 :param cluster_uuid: UUID of a k8s cluster
383 :param kdu_instance: The unique name of the KDU instance
384 :param filter: To get nsr_id
385 :cluster_type: The cluster type (juju, k8s)
389 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
390 # .format(cluster_uuid, kdu_instance, filter))
392 nsr_id
= filter.get("_id")
394 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
395 cluster_uuid
=cluster_uuid
,
396 kdu_instance
=kdu_instance
,
398 complete_status
=True,
404 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
406 if cluster_type
in ("juju-bundle", "juju"):
407 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
408 # status in a similar way between Juju Bundles and Helm Charts on this side
409 await self
.k8sclusterjuju
.update_vca_status(
410 db_dict
["vcaStatus"],
416 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
420 self
.update_db_2("nsrs", nsr_id
, db_dict
)
421 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
423 except Exception as e
:
424 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
427 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
429 env
= Environment(undefined
=StrictUndefined
, autoescape
=True)
430 template
= env
.from_string(cloud_init_text
)
431 return template
.render(additional_params
or {})
432 except UndefinedError
as e
:
434 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
435 "file, must be provided in the instantiation parameters inside the "
436 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
438 except (TemplateError
, TemplateNotFound
) as e
:
440 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
445 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
446 cloud_init_content
= cloud_init_file
= None
448 if vdu
.get("cloud-init-file"):
449 base_folder
= vnfd
["_admin"]["storage"]
450 if base_folder
["pkg-dir"]:
451 cloud_init_file
= "{}/{}/cloud_init/{}".format(
452 base_folder
["folder"],
453 base_folder
["pkg-dir"],
454 vdu
["cloud-init-file"],
457 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
458 base_folder
["folder"],
459 vdu
["cloud-init-file"],
461 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
462 cloud_init_content
= ci_file
.read()
463 elif vdu
.get("cloud-init"):
464 cloud_init_content
= vdu
["cloud-init"]
466 return cloud_init_content
467 except FsException
as e
:
469 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
470 vnfd
["id"], vdu
["id"], cloud_init_file
, e
474 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
476 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
478 additional_params
= vdur
.get("additionalParams")
479 return parse_yaml_strings(additional_params
)
481 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
483 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
484 :param vnfd: input vnfd
485 :param new_id: overrides vnf id if provided
486 :param additionalParams: Instantiation params for VNFs provided
487 :param nsrId: Id of the NSR
488 :return: copy of vnfd
490 vnfd_RO
= deepcopy(vnfd
)
491 # remove unused by RO configuration, monitoring, scaling and internal keys
492 vnfd_RO
.pop("_id", None)
493 vnfd_RO
.pop("_admin", None)
494 vnfd_RO
.pop("monitoring-param", None)
495 vnfd_RO
.pop("scaling-group-descriptor", None)
496 vnfd_RO
.pop("kdu", None)
497 vnfd_RO
.pop("k8s-cluster", None)
499 vnfd_RO
["id"] = new_id
501 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
502 for vdu
in get_iterable(vnfd_RO
, "vdu"):
503 vdu
.pop("cloud-init-file", None)
504 vdu
.pop("cloud-init", None)
508 def ip_profile_2_RO(ip_profile
):
509 RO_ip_profile
= deepcopy(ip_profile
)
510 if "dns-server" in RO_ip_profile
:
511 if isinstance(RO_ip_profile
["dns-server"], list):
512 RO_ip_profile
["dns-address"] = []
513 for ds
in RO_ip_profile
.pop("dns-server"):
514 RO_ip_profile
["dns-address"].append(ds
["address"])
516 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
517 if RO_ip_profile
.get("ip-version") == "ipv4":
518 RO_ip_profile
["ip-version"] = "IPv4"
519 if RO_ip_profile
.get("ip-version") == "ipv6":
520 RO_ip_profile
["ip-version"] = "IPv6"
521 if "dhcp-params" in RO_ip_profile
:
522 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
525 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
526 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
527 if db_vim
["_admin"]["operationalState"] != "ENABLED":
529 "VIM={} is not available. operationalState={}".format(
530 vim_account
, db_vim
["_admin"]["operationalState"]
533 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
536 def get_ro_wim_id_for_wim_account(self
, wim_account
):
537 if isinstance(wim_account
, str):
538 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
539 if db_wim
["_admin"]["operationalState"] != "ENABLED":
541 "WIM={} is not available. operationalState={}".format(
542 wim_account
, db_wim
["_admin"]["operationalState"]
545 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
550 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
551 db_vdu_push_list
= []
553 db_update
= {"_admin.modified": time()}
555 for vdu_id
, vdu_count
in vdu_create
.items():
559 for vdur
in reversed(db_vnfr
["vdur"])
560 if vdur
["vdu-id-ref"] == vdu_id
565 # Read the template saved in the db:
567 "No vdur in the database. Using the vdur-template to scale"
569 vdur_template
= db_vnfr
.get("vdur-template")
570 if not vdur_template
:
572 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
576 vdur
= vdur_template
[0]
577 # Delete a template from the database after using it
580 {"_id": db_vnfr
["_id"]},
582 pull
={"vdur-template": {"_id": vdur
["_id"]}},
584 for count
in range(vdu_count
):
585 vdur_copy
= deepcopy(vdur
)
586 vdur_copy
["status"] = "BUILD"
587 vdur_copy
["status-detailed"] = None
588 vdur_copy
["ip-address"] = None
589 vdur_copy
["_id"] = str(uuid4())
590 vdur_copy
["count-index"] += count
+ 1
591 vdur_copy
["id"] = "{}-{}".format(
592 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
594 vdur_copy
.pop("vim_info", None)
595 for iface
in vdur_copy
["interfaces"]:
596 if iface
.get("fixed-ip"):
597 iface
["ip-address"] = self
.increment_ip_mac(
598 iface
["ip-address"], count
+ 1
601 iface
.pop("ip-address", None)
602 if iface
.get("fixed-mac"):
603 iface
["mac-address"] = self
.increment_ip_mac(
604 iface
["mac-address"], count
+ 1
607 iface
.pop("mac-address", None)
611 ) # only first vdu can be managment of vnf
612 db_vdu_push_list
.append(vdur_copy
)
613 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
615 if len(db_vnfr
["vdur"]) == 1:
616 # The scale will move to 0 instances
618 "Scaling to 0 !, creating the template with the last vdur"
620 template_vdur
= [db_vnfr
["vdur"][0]]
621 for vdu_id
, vdu_count
in vdu_delete
.items():
623 indexes_to_delete
= [
625 for iv
in enumerate(db_vnfr
["vdur"])
626 if iv
[1]["vdu-id-ref"] == vdu_id
630 "vdur.{}.status".format(i
): "DELETING"
631 for i
in indexes_to_delete
[-vdu_count
:]
635 # it must be deleted one by one because common.db does not allow otherwise
638 for v
in reversed(db_vnfr
["vdur"])
639 if v
["vdu-id-ref"] == vdu_id
641 for vdu
in vdus_to_delete
[:vdu_count
]:
644 {"_id": db_vnfr
["_id"]},
646 pull
={"vdur": {"_id": vdu
["_id"]}},
650 db_push
["vdur"] = db_vdu_push_list
652 db_push
["vdur-template"] = template_vdur
655 db_vnfr
["vdur-template"] = template_vdur
656 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
657 # modify passed dictionary db_vnfr
658 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
659 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
661 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
663 Updates database nsr with the RO info for the created vld
664 :param ns_update_nsr: dictionary to be filled with the updated info
665 :param db_nsr: content of db_nsr. This is also modified
666 :param nsr_desc_RO: nsr descriptor from RO
667 :return: Nothing, LcmException is raised on errors
670 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
671 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
672 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
674 vld
["vim-id"] = net_RO
.get("vim_net_id")
675 vld
["name"] = net_RO
.get("vim_name")
676 vld
["status"] = net_RO
.get("status")
677 vld
["status-detailed"] = net_RO
.get("error_msg")
678 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
682 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
685 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
687 for db_vnfr
in db_vnfrs
.values():
688 vnfr_update
= {"status": "ERROR"}
689 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
690 if "status" not in vdur
:
691 vdur
["status"] = "ERROR"
692 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
694 vdur
["status-detailed"] = str(error_text
)
696 "vdur.{}.status-detailed".format(vdu_index
)
698 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
699 except DbException
as e
:
700 self
.logger
.error("Cannot update vnf. {}".format(e
))
702 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
704 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
705 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
706 :param nsr_desc_RO: nsr descriptor from RO
707 :return: Nothing, LcmException is raised on errors
709 for vnf_index
, db_vnfr
in db_vnfrs
.items():
710 for vnf_RO
in nsr_desc_RO
["vnfs"]:
711 if vnf_RO
["member_vnf_index"] != vnf_index
:
714 if vnf_RO
.get("ip_address"):
715 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
718 elif not db_vnfr
.get("ip-address"):
719 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
720 raise LcmExceptionNoMgmtIP(
721 "ns member_vnf_index '{}' has no IP address".format(
726 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
727 vdur_RO_count_index
= 0
728 if vdur
.get("pdu-type"):
730 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
731 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
733 if vdur
["count-index"] != vdur_RO_count_index
:
734 vdur_RO_count_index
+= 1
736 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
737 if vdur_RO
.get("ip_address"):
738 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
740 vdur
["ip-address"] = None
741 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
742 vdur
["name"] = vdur_RO
.get("vim_name")
743 vdur
["status"] = vdur_RO
.get("status")
744 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
745 for ifacer
in get_iterable(vdur
, "interfaces"):
746 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
747 if ifacer
["name"] == interface_RO
.get("internal_name"):
748 ifacer
["ip-address"] = interface_RO
.get(
751 ifacer
["mac-address"] = interface_RO
.get(
757 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
758 "from VIM info".format(
759 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
762 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
766 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
768 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
772 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
773 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
774 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
776 vld
["vim-id"] = net_RO
.get("vim_net_id")
777 vld
["name"] = net_RO
.get("vim_name")
778 vld
["status"] = net_RO
.get("status")
779 vld
["status-detailed"] = net_RO
.get("error_msg")
780 vnfr_update
["vld.{}".format(vld_index
)] = vld
784 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
789 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
794 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
799 def _get_ns_config_info(self
, nsr_id
):
801 Generates a mapping between vnf,vdu elements and the N2VC id
802 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
803 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
804 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
805 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
807 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
808 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
810 ns_config_info
= {"osm-config-mapping": mapping
}
811 for vca
in vca_deployed_list
:
812 if not vca
["member-vnf-index"]:
814 if not vca
["vdu_id"]:
815 mapping
[vca
["member-vnf-index"]] = vca
["application"]
819 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
821 ] = vca
["application"]
822 return ns_config_info
824 async def _instantiate_ng_ro(
840 def get_vim_account(vim_account_id
):
842 if vim_account_id
in db_vims
:
843 return db_vims
[vim_account_id
]
844 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
845 db_vims
[vim_account_id
] = db_vim
848 # modify target_vld info with instantiation parameters
849 def parse_vld_instantiation_params(
850 target_vim
, target_vld
, vld_params
, target_sdn
852 if vld_params
.get("ip-profile"):
853 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
856 if vld_params
.get("provider-network"):
857 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
860 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
861 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
865 # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
866 # if wim_account_id is specified in vld_params, validate if it is feasible.
867 wim_account_id
, db_wim
= select_feasible_wim_account(
868 db_nsr
, db_vnfrs
, target_vld
, vld_params
, self
.logger
872 # WIM is needed and a feasible one was found, populate WIM target and SDN ports
873 self
.logger
.info("WIM selected: {:s}".format(str(wim_account_id
)))
874 # update vld_params with correct WIM account Id
875 vld_params
["wimAccountId"] = wim_account_id
877 target_wim
= "wim:{}".format(wim_account_id
)
878 target_wim_attrs
= get_target_wim_attrs(nsr_id
, target_vld
, vld_params
)
879 sdn_ports
= get_sdn_ports(vld_params
, db_wim
)
880 if len(sdn_ports
) > 0:
881 target_vld
["vim_info"][target_wim
] = target_wim_attrs
882 target_vld
["vim_info"][target_wim
]["sdn-ports"] = sdn_ports
885 "Target VLD with WIM data: {:s}".format(str(target_vld
))
888 for param
in ("vim-network-name", "vim-network-id"):
889 if vld_params
.get(param
):
890 if isinstance(vld_params
[param
], dict):
891 for vim
, vim_net
in vld_params
[param
].items():
892 other_target_vim
= "vim:" + vim
894 target_vld
["vim_info"],
895 (other_target_vim
, param
.replace("-", "_")),
898 else: # isinstance str
899 target_vld
["vim_info"][target_vim
][
900 param
.replace("-", "_")
901 ] = vld_params
[param
]
902 if vld_params
.get("common_id"):
903 target_vld
["common_id"] = vld_params
.get("common_id")
905 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
906 def update_ns_vld_target(target
, ns_params
):
907 for vnf_params
in ns_params
.get("vnf", ()):
908 if vnf_params
.get("vimAccountId"):
912 for vnfr
in db_vnfrs
.values()
913 if vnf_params
["member-vnf-index"]
914 == vnfr
["member-vnf-index-ref"]
918 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
921 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
922 target_vld
= find_in_list(
923 get_iterable(vdur
, "interfaces"),
924 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
927 vld_params
= find_in_list(
928 get_iterable(ns_params
, "vld"),
929 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
932 if vnf_params
.get("vimAccountId") not in a_vld
.get(
935 target_vim_network_list
= [
936 v
for _
, v
in a_vld
.get("vim_info").items()
938 target_vim_network_name
= next(
940 item
.get("vim_network_name", "")
941 for item
in target_vim_network_list
946 target
["ns"]["vld"][a_index
].get("vim_info").update(
948 "vim:{}".format(vnf_params
["vimAccountId"]): {
949 "vim_network_name": target_vim_network_name
,
955 for param
in ("vim-network-name", "vim-network-id"):
956 if vld_params
.get(param
) and isinstance(
957 vld_params
[param
], dict
959 for vim
, vim_net
in vld_params
[
962 other_target_vim
= "vim:" + vim
964 target
["ns"]["vld"][a_index
].get(
969 param
.replace("-", "_"),
974 nslcmop_id
= db_nslcmop
["_id"]
976 "name": db_nsr
["name"],
979 "image": deepcopy(db_nsr
["image"]),
980 "flavor": deepcopy(db_nsr
["flavor"]),
981 "action_id": nslcmop_id
,
982 "cloud_init_content": {},
984 for image
in target
["image"]:
985 image
["vim_info"] = {}
986 for flavor
in target
["flavor"]:
987 flavor
["vim_info"] = {}
988 if db_nsr
.get("affinity-or-anti-affinity-group"):
989 target
["affinity-or-anti-affinity-group"] = deepcopy(
990 db_nsr
["affinity-or-anti-affinity-group"]
992 for affinity_or_anti_affinity_group
in target
[
993 "affinity-or-anti-affinity-group"
995 affinity_or_anti_affinity_group
["vim_info"] = {}
997 if db_nslcmop
.get("lcmOperationType") != "instantiate":
998 # get parameters of instantiation:
999 db_nslcmop_instantiate
= self
.db
.get_list(
1002 "nsInstanceId": db_nslcmop
["nsInstanceId"],
1003 "lcmOperationType": "instantiate",
1006 ns_params
= db_nslcmop_instantiate
.get("operationParams")
1008 ns_params
= db_nslcmop
.get("operationParams")
1009 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
1010 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
1013 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
1014 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
1017 "name": vld
["name"],
1018 "mgmt-network": vld
.get("mgmt-network", False),
1019 "type": vld
.get("type"),
1022 "vim_network_name": vld
.get("vim-network-name"),
1023 "vim_account_id": ns_params
["vimAccountId"],
1027 # check if this network needs SDN assist
1028 if vld
.get("pci-interfaces"):
1029 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1030 if vim_config
:= db_vim
.get("config"):
1031 if sdnc_id
:= vim_config
.get("sdn-controller"):
1032 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1033 target_sdn
= "sdn:{}".format(sdnc_id
)
1034 target_vld
["vim_info"][target_sdn
] = {
1036 "target_vim": target_vim
,
1038 "type": vld
.get("type"),
1041 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1042 for nsd_vnf_profile
in nsd_vnf_profiles
:
1043 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1044 if cp
["virtual-link-profile-id"] == vld
["id"]:
1046 "member_vnf:{}.{}".format(
1047 cp
["constituent-cpd-id"][0][
1048 "constituent-base-element-id"
1050 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1052 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1054 # check at nsd descriptor, if there is an ip-profile
1056 nsd_vlp
= find_in_list(
1057 get_virtual_link_profiles(nsd
),
1058 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1063 and nsd_vlp
.get("virtual-link-protocol-data")
1064 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1066 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1069 ip_profile_dest_data
= {}
1070 if "ip-version" in ip_profile_source_data
:
1071 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1074 if "cidr" in ip_profile_source_data
:
1075 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1078 if "gateway-ip" in ip_profile_source_data
:
1079 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1082 if "dhcp-enabled" in ip_profile_source_data
:
1083 ip_profile_dest_data
["dhcp-params"] = {
1084 "enabled": ip_profile_source_data
["dhcp-enabled"]
1086 vld_params
["ip-profile"] = ip_profile_dest_data
1088 # update vld_params with instantiation params
1089 vld_instantiation_params
= find_in_list(
1090 get_iterable(ns_params
, "vld"),
1091 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1093 if vld_instantiation_params
:
1094 vld_params
.update(vld_instantiation_params
)
1095 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1096 target
["ns"]["vld"].append(target_vld
)
1097 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1098 update_ns_vld_target(target
, ns_params
)
1100 for vnfr
in db_vnfrs
.values():
1101 vnfd
= find_in_list(
1102 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1104 vnf_params
= find_in_list(
1105 get_iterable(ns_params
, "vnf"),
1106 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1108 target_vnf
= deepcopy(vnfr
)
1109 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1110 for vld
in target_vnf
.get("vld", ()):
1111 # check if connected to a ns.vld, to fill target'
1112 vnf_cp
= find_in_list(
1113 vnfd
.get("int-virtual-link-desc", ()),
1114 lambda cpd
: cpd
.get("id") == vld
["id"],
1117 ns_cp
= "member_vnf:{}.{}".format(
1118 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1120 if cp2target
.get(ns_cp
):
1121 vld
["target"] = cp2target
[ns_cp
]
1124 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1126 # check if this network needs SDN assist
1128 if vld
.get("pci-interfaces"):
1129 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1130 sdnc_id
= db_vim
["config"].get("sdn-controller")
1132 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1133 target_sdn
= "sdn:{}".format(sdnc_id
)
1134 vld
["vim_info"][target_sdn
] = {
1136 "target_vim": target_vim
,
1138 "type": vld
.get("type"),
1141 # check at vnfd descriptor, if there is an ip-profile
1143 vnfd_vlp
= find_in_list(
1144 get_virtual_link_profiles(vnfd
),
1145 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1149 and vnfd_vlp
.get("virtual-link-protocol-data")
1150 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1152 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1155 ip_profile_dest_data
= {}
1156 if "ip-version" in ip_profile_source_data
:
1157 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1160 if "cidr" in ip_profile_source_data
:
1161 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1164 if "gateway-ip" in ip_profile_source_data
:
1165 ip_profile_dest_data
[
1167 ] = ip_profile_source_data
["gateway-ip"]
1168 if "dhcp-enabled" in ip_profile_source_data
:
1169 ip_profile_dest_data
["dhcp-params"] = {
1170 "enabled": ip_profile_source_data
["dhcp-enabled"]
1173 vld_params
["ip-profile"] = ip_profile_dest_data
1174 # update vld_params with instantiation params
1176 vld_instantiation_params
= find_in_list(
1177 get_iterable(vnf_params
, "internal-vld"),
1178 lambda i_vld
: i_vld
["name"] == vld
["id"],
1180 if vld_instantiation_params
:
1181 vld_params
.update(vld_instantiation_params
)
1182 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1185 for vdur
in target_vnf
.get("vdur", ()):
1186 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1187 continue # This vdu must not be created
1188 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1190 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1193 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1194 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1197 and vdu_configuration
.get("config-access")
1198 and vdu_configuration
.get("config-access").get("ssh-access")
1200 vdur
["ssh-keys"] = ssh_keys_all
1201 vdur
["ssh-access-required"] = vdu_configuration
[
1203 ]["ssh-access"]["required"]
1206 and vnf_configuration
.get("config-access")
1207 and vnf_configuration
.get("config-access").get("ssh-access")
1208 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1210 vdur
["ssh-keys"] = ssh_keys_all
1211 vdur
["ssh-access-required"] = vnf_configuration
[
1213 ]["ssh-access"]["required"]
1214 elif ssh_keys_instantiation
and find_in_list(
1215 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1217 vdur
["ssh-keys"] = ssh_keys_instantiation
1219 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1221 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1223 if vdud
.get("cloud-init-file"):
1224 vdur
["cloud-init"] = "{}:file:{}".format(
1225 vnfd
["_id"], vdud
.get("cloud-init-file")
1227 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1228 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1229 base_folder
= vnfd
["_admin"]["storage"]
1230 if base_folder
["pkg-dir"]:
1231 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1232 base_folder
["folder"],
1233 base_folder
["pkg-dir"],
1234 vdud
.get("cloud-init-file"),
1237 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1238 base_folder
["folder"],
1239 vdud
.get("cloud-init-file"),
1241 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1242 target
["cloud_init_content"][
1245 elif vdud
.get("cloud-init"):
1246 vdur
["cloud-init"] = "{}:vdu:{}".format(
1247 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1249 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1250 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1253 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1254 deploy_params_vdu
= self
._format
_additional
_params
(
1255 vdur
.get("additionalParams") or {}
1257 deploy_params_vdu
["OSM"] = get_osm_params(
1258 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1260 vdur
["additionalParams"] = deploy_params_vdu
1263 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1264 if target_vim
not in ns_flavor
["vim_info"]:
1265 ns_flavor
["vim_info"][target_vim
] = {}
1268 # in case alternative images are provided we must check if they should be applied
1269 # for the vim_type, modify the vim_type taking into account
1270 ns_image_id
= int(vdur
["ns-image-id"])
1271 if vdur
.get("alt-image-ids"):
1272 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1273 vim_type
= db_vim
["vim_type"]
1274 for alt_image_id
in vdur
.get("alt-image-ids"):
1275 ns_alt_image
= target
["image"][int(alt_image_id
)]
1276 if vim_type
== ns_alt_image
.get("vim-type"):
1277 # must use alternative image
1279 "use alternative image id: {}".format(alt_image_id
)
1281 ns_image_id
= alt_image_id
1282 vdur
["ns-image-id"] = ns_image_id
1284 ns_image
= target
["image"][int(ns_image_id
)]
1285 if target_vim
not in ns_image
["vim_info"]:
1286 ns_image
["vim_info"][target_vim
] = {}
1289 if vdur
.get("affinity-or-anti-affinity-group-id"):
1290 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1291 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1292 if target_vim
not in ns_ags
["vim_info"]:
1293 ns_ags
["vim_info"][target_vim
] = {}
1295 vdur
["vim_info"] = {target_vim
: {}}
1296 # instantiation parameters
1298 vdu_instantiation_params
= find_in_list(
1299 get_iterable(vnf_params
, "vdu"),
1300 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1302 if vdu_instantiation_params
:
1303 # Parse the vdu_volumes from the instantiation params
1304 vdu_volumes
= get_volumes_from_instantiation_params(
1305 vdu_instantiation_params
, vdud
1307 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1308 vdur_list
.append(vdur
)
1309 target_vnf
["vdur"] = vdur_list
1310 target
["vnf"].append(target_vnf
)
1312 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1313 desc
= await self
.RO
.deploy(nsr_id
, target
)
1314 self
.logger
.debug("RO return > {}".format(desc
))
1315 action_id
= desc
["action_id"]
1316 await self
._wait
_ng
_ro
(
1323 operation
="instantiation",
1328 "_admin.deployed.RO.operational-status": "running",
1329 "detailed-status": " ".join(stage
),
1331 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1332 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1333 self
._write
_op
_status
(nslcmop_id
, stage
)
1335 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1339 async def _wait_ng_ro(
1349 detailed_status_old
= None
1351 start_time
= start_time
or time()
1352 while time() <= start_time
+ timeout
:
1353 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1354 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1355 if desc_status
["status"] == "FAILED":
1356 raise NgRoException(desc_status
["details"])
1357 elif desc_status
["status"] == "BUILD":
1359 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1360 elif desc_status
["status"] == "DONE":
1362 stage
[2] = "Deployed at VIM"
1365 assert False, "ROclient.check_ns_status returns unknown {}".format(
1366 desc_status
["status"]
1368 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1369 detailed_status_old
= stage
[2]
1370 db_nsr_update
["detailed-status"] = " ".join(stage
)
1371 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1372 self
._write
_op
_status
(nslcmop_id
, stage
)
1373 await asyncio
.sleep(15, loop
=self
.loop
)
1374 else: # timeout_ns_deploy
1375 raise NgRoException("Timeout waiting ns to deploy")
1377 async def _terminate_ng_ro(
1378 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1383 start_deploy
= time()
1390 "action_id": nslcmop_id
,
1392 desc
= await self
.RO
.deploy(nsr_id
, target
)
1393 action_id
= desc
["action_id"]
1394 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1395 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1398 + "ns terminate action at RO. action_id={}".format(action_id
)
1402 delete_timeout
= 20 * 60 # 20 minutes
1403 await self
._wait
_ng
_ro
(
1410 operation
="termination",
1413 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1414 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1416 await self
.RO
.delete(nsr_id
)
1417 except Exception as e
:
1418 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1419 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1420 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1421 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1423 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1425 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1426 failed_detail
.append("delete conflict: {}".format(e
))
1429 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1432 failed_detail
.append("delete error: {}".format(e
))
1435 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1439 stage
[2] = "Error deleting from VIM"
1441 stage
[2] = "Deleted from VIM"
1442 db_nsr_update
["detailed-status"] = " ".join(stage
)
1443 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1444 self
._write
_op
_status
(nslcmop_id
, stage
)
1447 raise LcmException("; ".join(failed_detail
))
1450 async def instantiate_RO(
1464 :param logging_text: preffix text to use at logging
1465 :param nsr_id: nsr identity
1466 :param nsd: database content of ns descriptor
1467 :param db_nsr: database content of ns record
1468 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1470 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1471 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1472 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1473 :return: None or exception
1476 start_deploy
= time()
1477 ns_params
= db_nslcmop
.get("operationParams")
1478 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1479 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1481 timeout_ns_deploy
= self
.timeout
.get(
1482 "ns_deploy", self
.timeout_ns_deploy
1485 # Check for and optionally request placement optimization. Database will be updated if placement activated
1486 stage
[2] = "Waiting for Placement."
1487 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1488 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1489 for vnfr
in db_vnfrs
.values():
1490 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1493 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1495 return await self
._instantiate
_ng
_ro
(
1508 except Exception as e
:
1509 stage
[2] = "ERROR deploying at VIM"
1510 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1512 "Error deploying at VIM {}".format(e
),
1513 exc_info
=not isinstance(
1516 ROclient
.ROClientException
,
1525 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1527 Wait for kdu to be up, get ip address
1528 :param logging_text: prefix use for logging
1532 :return: IP address, K8s services
1535 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1538 while nb_tries
< 360:
1539 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1543 for x
in get_iterable(db_vnfr
, "kdur")
1544 if x
.get("kdu-name") == kdu_name
1550 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1552 if kdur
.get("status"):
1553 if kdur
["status"] in ("READY", "ENABLED"):
1554 return kdur
.get("ip-address"), kdur
.get("services")
1557 "target KDU={} is in error state".format(kdu_name
)
1560 await asyncio
.sleep(10, loop
=self
.loop
)
1562 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1564 async def wait_vm_up_insert_key_ro(
1565 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1568 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1569 :param logging_text: prefix use for logging
1574 :param pub_key: public ssh key to inject, None to skip
1575 :param user: user to apply the public ssh key
1579 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1583 target_vdu_id
= None
1588 if ro_retries
>= 360: # 1 hour
1590 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1593 await asyncio
.sleep(10, loop
=self
.loop
)
1596 if not target_vdu_id
:
1597 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1599 if not vdu_id
: # for the VNF case
1600 if db_vnfr
.get("status") == "ERROR":
1602 "Cannot inject ssh-key because target VNF is in error state"
1604 ip_address
= db_vnfr
.get("ip-address")
1610 for x
in get_iterable(db_vnfr
, "vdur")
1611 if x
.get("ip-address") == ip_address
1619 for x
in get_iterable(db_vnfr
, "vdur")
1620 if x
.get("vdu-id-ref") == vdu_id
1621 and x
.get("count-index") == vdu_index
1627 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1628 ): # If only one, this should be the target vdu
1629 vdur
= db_vnfr
["vdur"][0]
1632 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1633 vnfr_id
, vdu_id
, vdu_index
1636 # New generation RO stores information at "vim_info"
1639 if vdur
.get("vim_info"):
1641 t
for t
in vdur
["vim_info"]
1642 ) # there should be only one key
1643 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1645 vdur
.get("pdu-type")
1646 or vdur
.get("status") == "ACTIVE"
1647 or ng_ro_status
== "ACTIVE"
1649 ip_address
= vdur
.get("ip-address")
1652 target_vdu_id
= vdur
["vdu-id-ref"]
1653 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1655 "Cannot inject ssh-key because target VM is in error state"
1658 if not target_vdu_id
:
1661 # inject public key into machine
1662 if pub_key
and user
:
1663 self
.logger
.debug(logging_text
+ "Inserting RO key")
1664 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1665 if vdur
.get("pdu-type"):
1666 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1669 ro_vm_id
= "{}-{}".format(
1670 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1671 ) # TODO add vdu_index
1675 "action": "inject_ssh_key",
1679 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1681 desc
= await self
.RO
.deploy(nsr_id
, target
)
1682 action_id
= desc
["action_id"]
1683 await self
._wait
_ng
_ro
(
1684 nsr_id
, action_id
, timeout
=600, operation
="instantiation"
1688 # wait until NS is deployed at RO
1690 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1691 ro_nsr_id
= deep_get(
1692 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1696 result_dict
= await self
.RO
.create_action(
1698 item_id_name
=ro_nsr_id
,
1700 "add_public_key": pub_key
,
1705 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1706 if not result_dict
or not isinstance(result_dict
, dict):
1708 "Unknown response from RO when injecting key"
1710 for result
in result_dict
.values():
1711 if result
.get("vim_result") == 200:
1714 raise ROclient
.ROClientException(
1715 "error injecting key: {}".format(
1716 result
.get("description")
1720 except NgRoException
as e
:
1722 "Reaching max tries injecting key. Error: {}".format(e
)
1724 except ROclient
.ROClientException
as e
:
1728 + "error injecting key: {}. Retrying until {} seconds".format(
1735 "Reaching max tries injecting key. Error: {}".format(e
)
1742 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1744 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1746 my_vca
= vca_deployed_list
[vca_index
]
1747 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1748 # vdu or kdu: no dependencies
1752 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1753 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1754 configuration_status_list
= db_nsr
["configurationStatus"]
1755 for index
, vca_deployed
in enumerate(configuration_status_list
):
1756 if index
== vca_index
:
1759 if not my_vca
.get("member-vnf-index") or (
1760 vca_deployed
.get("member-vnf-index")
1761 == my_vca
.get("member-vnf-index")
1763 internal_status
= configuration_status_list
[index
].get("status")
1764 if internal_status
== "READY":
1766 elif internal_status
== "BROKEN":
1768 "Configuration aborted because dependent charm/s has failed"
1773 # no dependencies, return
1775 await asyncio
.sleep(10)
1778 raise LcmException("Configuration aborted because dependent charm/s timeout")
1780 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1783 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1785 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1786 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1789 async def instantiate_N2VC(
1806 ee_config_descriptor
,
1808 nsr_id
= db_nsr
["_id"]
1809 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1810 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1811 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1812 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1814 "collection": "nsrs",
1815 "filter": {"_id": nsr_id
},
1816 "path": db_update_entry
,
1821 element_under_configuration
= nsr_id
1825 vnfr_id
= db_vnfr
["_id"]
1826 osm_config
["osm"]["vnf_id"] = vnfr_id
1828 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1830 if vca_type
== "native_charm":
1833 index_number
= vdu_index
or 0
1836 element_type
= "VNF"
1837 element_under_configuration
= vnfr_id
1838 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1840 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1841 element_type
= "VDU"
1842 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1843 osm_config
["osm"]["vdu_id"] = vdu_id
1845 namespace
+= ".{}".format(kdu_name
)
1846 element_type
= "KDU"
1847 element_under_configuration
= kdu_name
1848 osm_config
["osm"]["kdu_name"] = kdu_name
1851 if base_folder
["pkg-dir"]:
1852 artifact_path
= "{}/{}/{}/{}".format(
1853 base_folder
["folder"],
1854 base_folder
["pkg-dir"],
1857 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1862 artifact_path
= "{}/Scripts/{}/{}/".format(
1863 base_folder
["folder"],
1866 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1871 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1873 # get initial_config_primitive_list that applies to this element
1874 initial_config_primitive_list
= config_descriptor
.get(
1875 "initial-config-primitive"
1879 "Initial config primitive list > {}".format(
1880 initial_config_primitive_list
1884 # add config if not present for NS charm
1885 ee_descriptor_id
= ee_config_descriptor
.get("id")
1886 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1887 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1888 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1892 "Initial config primitive list #2 > {}".format(
1893 initial_config_primitive_list
1896 # n2vc_redesign STEP 3.1
1897 # find old ee_id if exists
1898 ee_id
= vca_deployed
.get("ee_id")
1900 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1901 # create or register execution environment in VCA
1902 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1903 self
._write
_configuration
_status
(
1905 vca_index
=vca_index
,
1907 element_under_configuration
=element_under_configuration
,
1908 element_type
=element_type
,
1911 step
= "create execution environment"
1912 self
.logger
.debug(logging_text
+ step
)
1916 if vca_type
== "k8s_proxy_charm":
1917 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1918 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1919 namespace
=namespace
,
1920 artifact_path
=artifact_path
,
1924 elif vca_type
== "helm" or vca_type
== "helm-v3":
1925 ee_id
, credentials
= await self
.vca_map
[
1927 ].create_execution_environment(
1928 namespace
=namespace
,
1932 artifact_path
=artifact_path
,
1936 ee_id
, credentials
= await self
.vca_map
[
1938 ].create_execution_environment(
1939 namespace
=namespace
,
1945 elif vca_type
== "native_charm":
1946 step
= "Waiting to VM being up and getting IP address"
1947 self
.logger
.debug(logging_text
+ step
)
1948 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1957 credentials
= {"hostname": rw_mgmt_ip
}
1959 username
= deep_get(
1960 config_descriptor
, ("config-access", "ssh-access", "default-user")
1962 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1963 # merged. Meanwhile let's get username from initial-config-primitive
1964 if not username
and initial_config_primitive_list
:
1965 for config_primitive
in initial_config_primitive_list
:
1966 for param
in config_primitive
.get("parameter", ()):
1967 if param
["name"] == "ssh-username":
1968 username
= param
["value"]
1972 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1973 "'config-access.ssh-access.default-user'"
1975 credentials
["username"] = username
1976 # n2vc_redesign STEP 3.2
1978 self
._write
_configuration
_status
(
1980 vca_index
=vca_index
,
1981 status
="REGISTERING",
1982 element_under_configuration
=element_under_configuration
,
1983 element_type
=element_type
,
1986 step
= "register execution environment {}".format(credentials
)
1987 self
.logger
.debug(logging_text
+ step
)
1988 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1989 credentials
=credentials
,
1990 namespace
=namespace
,
1995 # for compatibility with MON/POL modules, the need model and application name at database
1996 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1997 ee_id_parts
= ee_id
.split(".")
1998 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1999 if len(ee_id_parts
) >= 2:
2000 model_name
= ee_id_parts
[0]
2001 application_name
= ee_id_parts
[1]
2002 db_nsr_update
[db_update_entry
+ "model"] = model_name
2003 db_nsr_update
[db_update_entry
+ "application"] = application_name
2005 # n2vc_redesign STEP 3.3
2006 step
= "Install configuration Software"
2008 self
._write
_configuration
_status
(
2010 vca_index
=vca_index
,
2011 status
="INSTALLING SW",
2012 element_under_configuration
=element_under_configuration
,
2013 element_type
=element_type
,
2014 other_update
=db_nsr_update
,
2017 # TODO check if already done
2018 self
.logger
.debug(logging_text
+ step
)
2020 if vca_type
== "native_charm":
2021 config_primitive
= next(
2022 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
2025 if config_primitive
:
2026 config
= self
._map
_primitive
_params
(
2027 config_primitive
, {}, deploy_params
2030 if vca_type
== "lxc_proxy_charm":
2031 if element_type
== "NS":
2032 num_units
= db_nsr
.get("config-units") or 1
2033 elif element_type
== "VNF":
2034 num_units
= db_vnfr
.get("config-units") or 1
2035 elif element_type
== "VDU":
2036 for v
in db_vnfr
["vdur"]:
2037 if vdu_id
== v
["vdu-id-ref"]:
2038 num_units
= v
.get("config-units") or 1
2040 if vca_type
!= "k8s_proxy_charm":
2041 await self
.vca_map
[vca_type
].install_configuration_sw(
2043 artifact_path
=artifact_path
,
2046 num_units
=num_units
,
2051 # write in db flag of configuration_sw already installed
2053 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
2056 # add relations for this VCA (wait for other peers related with this VCA)
2057 await self
._add
_vca
_relations
(
2058 logging_text
=logging_text
,
2061 vca_index
=vca_index
,
2064 # if SSH access is required, then get execution environment SSH public
2065 # if native charm we have waited already to VM be UP
2066 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
2069 # self.logger.debug("get ssh key block")
2071 config_descriptor
, ("config-access", "ssh-access", "required")
2073 # self.logger.debug("ssh key needed")
2074 # Needed to inject a ssh key
2077 ("config-access", "ssh-access", "default-user"),
2079 step
= "Install configuration Software, getting public ssh key"
2080 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
2081 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
2084 step
= "Insert public key into VM user={} ssh_key={}".format(
2088 # self.logger.debug("no need to get ssh key")
2089 step
= "Waiting to VM being up and getting IP address"
2090 self
.logger
.debug(logging_text
+ step
)
2092 # default rw_mgmt_ip to None, avoiding the non definition of the variable
2095 # n2vc_redesign STEP 5.1
2096 # wait for RO (ip-address) Insert pub_key into VM
2099 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
2100 logging_text
, nsr_id
, vnfr_id
, kdu_name
2102 vnfd
= self
.db
.get_one(
2104 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2106 kdu
= get_kdu(vnfd
, kdu_name
)
2108 service
["name"] for service
in get_kdu_services(kdu
)
2110 exposed_services
= []
2111 for service
in services
:
2112 if any(s
in service
["name"] for s
in kdu_services
):
2113 exposed_services
.append(service
)
2114 await self
.vca_map
[vca_type
].exec_primitive(
2116 primitive_name
="config",
2118 "osm-config": json
.dumps(
2120 k8s
={"services": exposed_services
}
2127 # This verification is needed in order to avoid trying to add a public key
2128 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2129 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2130 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2132 elif db_vnfr
.get("vdur"):
2133 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2143 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2145 # store rw_mgmt_ip in deploy params for later replacement
2146 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2148 # n2vc_redesign STEP 6 Execute initial config primitive
2149 step
= "execute initial config primitive"
2151 # wait for dependent primitives execution (NS -> VNF -> VDU)
2152 if initial_config_primitive_list
:
2153 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2155 # stage, in function of element type: vdu, kdu, vnf or ns
2156 my_vca
= vca_deployed_list
[vca_index
]
2157 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2159 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2160 elif my_vca
.get("member-vnf-index"):
2162 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2165 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2167 self
._write
_configuration
_status
(
2168 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2171 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2173 check_if_terminated_needed
= True
2174 for initial_config_primitive
in initial_config_primitive_list
:
2175 # adding information on the vca_deployed if it is a NS execution environment
2176 if not vca_deployed
["member-vnf-index"]:
2177 deploy_params
["ns_config_info"] = json
.dumps(
2178 self
._get
_ns
_config
_info
(nsr_id
)
2180 # TODO check if already done
2181 primitive_params_
= self
._map
_primitive
_params
(
2182 initial_config_primitive
, {}, deploy_params
2185 step
= "execute primitive '{}' params '{}'".format(
2186 initial_config_primitive
["name"], primitive_params_
2188 self
.logger
.debug(logging_text
+ step
)
2189 await self
.vca_map
[vca_type
].exec_primitive(
2191 primitive_name
=initial_config_primitive
["name"],
2192 params_dict
=primitive_params_
,
2197 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2198 if check_if_terminated_needed
:
2199 if config_descriptor
.get("terminate-config-primitive"):
2201 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2203 check_if_terminated_needed
= False
2205 # TODO register in database that primitive is done
2207 # STEP 7 Configure metrics
2208 if vca_type
== "helm" or vca_type
== "helm-v3":
2209 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2211 artifact_path
=artifact_path
,
2212 ee_config_descriptor
=ee_config_descriptor
,
2215 target_ip
=rw_mgmt_ip
,
2221 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2224 for job
in prometheus_jobs
:
2227 {"job_name": job
["job_name"]},
2230 fail_on_empty
=False,
2233 step
= "instantiated at VCA"
2234 self
.logger
.debug(logging_text
+ step
)
2236 self
._write
_configuration
_status
(
2237 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2240 except Exception as e
: # TODO not use Exception but N2VC exception
2241 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2243 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2246 "Exception while {} : {}".format(step
, e
), exc_info
=True
2248 self
._write
_configuration
_status
(
2249 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2251 raise LcmException("{} {}".format(step
, e
)) from e
2253 def _write_ns_status(
2257 current_operation
: str,
2258 current_operation_id
: str,
2259 error_description
: str = None,
2260 error_detail
: str = None,
2261 other_update
: dict = None,
2264 Update db_nsr fields.
2267 :param current_operation:
2268 :param current_operation_id:
2269 :param error_description:
2270 :param error_detail:
2271 :param other_update: Other required changes at database if provided, will be cleared
2275 db_dict
= other_update
or {}
2278 ] = current_operation_id
# for backward compatibility
2279 db_dict
["_admin.current-operation"] = current_operation_id
2280 db_dict
["_admin.operation-type"] = (
2281 current_operation
if current_operation
!= "IDLE" else None
2283 db_dict
["currentOperation"] = current_operation
2284 db_dict
["currentOperationID"] = current_operation_id
2285 db_dict
["errorDescription"] = error_description
2286 db_dict
["errorDetail"] = error_detail
2289 db_dict
["nsState"] = ns_state
2290 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2291 except DbException
as e
:
2292 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2294 def _write_op_status(
2298 error_message
: str = None,
2299 queuePosition
: int = 0,
2300 operation_state
: str = None,
2301 other_update
: dict = None,
2304 db_dict
= other_update
or {}
2305 db_dict
["queuePosition"] = queuePosition
2306 if isinstance(stage
, list):
2307 db_dict
["stage"] = stage
[0]
2308 db_dict
["detailed-status"] = " ".join(stage
)
2309 elif stage
is not None:
2310 db_dict
["stage"] = str(stage
)
2312 if error_message
is not None:
2313 db_dict
["errorMessage"] = error_message
2314 if operation_state
is not None:
2315 db_dict
["operationState"] = operation_state
2316 db_dict
["statusEnteredTime"] = time()
2317 self
.update_db_2("nslcmops", op_id
, db_dict
)
2318 except DbException
as e
:
2320 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2323 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2325 nsr_id
= db_nsr
["_id"]
2326 # configurationStatus
2327 config_status
= db_nsr
.get("configurationStatus")
2330 "configurationStatus.{}.status".format(index
): status
2331 for index
, v
in enumerate(config_status
)
2335 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2337 except DbException
as e
:
2339 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2342 def _write_configuration_status(
2347 element_under_configuration
: str = None,
2348 element_type
: str = None,
2349 other_update
: dict = None,
2351 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2352 # .format(vca_index, status))
2355 db_path
= "configurationStatus.{}.".format(vca_index
)
2356 db_dict
= other_update
or {}
2358 db_dict
[db_path
+ "status"] = status
2359 if element_under_configuration
:
2361 db_path
+ "elementUnderConfiguration"
2362 ] = element_under_configuration
2364 db_dict
[db_path
+ "elementType"] = element_type
2365 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2366 except DbException
as e
:
2368 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2369 status
, nsr_id
, vca_index
, e
2373 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2375 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2376 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2377 Database is used because the result can be obtained from a different LCM worker in case of HA.
2378 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2379 :param db_nslcmop: database content of nslcmop
2380 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2381 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2382 computed 'vim-account-id'
2385 nslcmop_id
= db_nslcmop
["_id"]
2386 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2387 if placement_engine
== "PLA":
2389 logging_text
+ "Invoke and wait for placement optimization"
2391 await self
.msg
.aiowrite(
2392 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2394 db_poll_interval
= 5
2395 wait
= db_poll_interval
* 10
2397 while not pla_result
and wait
>= 0:
2398 await asyncio
.sleep(db_poll_interval
)
2399 wait
-= db_poll_interval
2400 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2401 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2405 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2408 for pla_vnf
in pla_result
["vnf"]:
2409 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2410 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2415 {"_id": vnfr
["_id"]},
2416 {"vim-account-id": pla_vnf
["vimAccountId"]},
2419 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2422 def update_nsrs_with_pla_result(self
, params
):
2424 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2426 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2428 except Exception as e
:
2429 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2431 async def instantiate(self
, nsr_id
, nslcmop_id
):
2434 :param nsr_id: ns instance to deploy
2435 :param nslcmop_id: operation to run
2439 # Try to lock HA task here
2440 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2441 if not task_is_locked_by_me
:
2443 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2447 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2448 self
.logger
.debug(logging_text
+ "Enter")
2450 # get all needed from database
2452 # database nsrs record
2455 # database nslcmops record
2458 # update operation on nsrs
2460 # update operation on nslcmops
2461 db_nslcmop_update
= {}
2463 nslcmop_operation_state
= None
2464 db_vnfrs
= {} # vnf's info indexed by member-index
2466 tasks_dict_info
= {} # from task to info text
2470 "Stage 1/5: preparation of the environment.",
2471 "Waiting for previous operations to terminate.",
2474 # ^ stage, step, VIM progress
2476 # wait for any previous tasks in process
2477 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2479 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2480 stage
[1] = "Reading from database."
2481 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2482 db_nsr_update
["detailed-status"] = "creating"
2483 db_nsr_update
["operational-status"] = "init"
2484 self
._write
_ns
_status
(
2486 ns_state
="BUILDING",
2487 current_operation
="INSTANTIATING",
2488 current_operation_id
=nslcmop_id
,
2489 other_update
=db_nsr_update
,
2491 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2493 # read from db: operation
2494 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2495 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2496 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2497 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2498 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2500 ns_params
= db_nslcmop
.get("operationParams")
2501 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2502 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2504 timeout_ns_deploy
= self
.timeout
.get(
2505 "ns_deploy", self
.timeout_ns_deploy
2509 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2510 self
.logger
.debug(logging_text
+ stage
[1])
2511 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2512 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2513 self
.logger
.debug(logging_text
+ stage
[1])
2514 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2515 self
.fs
.sync(db_nsr
["nsd-id"])
2517 # nsr_name = db_nsr["name"] # TODO short-name??
2519 # read from db: vnf's of this ns
2520 stage
[1] = "Getting vnfrs from db."
2521 self
.logger
.debug(logging_text
+ stage
[1])
2522 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2524 # read from db: vnfd's for every vnf
2525 db_vnfds
= [] # every vnfd data
2527 # for each vnf in ns, read vnfd
2528 for vnfr
in db_vnfrs_list
:
2529 if vnfr
.get("kdur"):
2531 for kdur
in vnfr
["kdur"]:
2532 if kdur
.get("additionalParams"):
2533 kdur
["additionalParams"] = json
.loads(
2534 kdur
["additionalParams"]
2536 kdur_list
.append(kdur
)
2537 vnfr
["kdur"] = kdur_list
2539 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2540 vnfd_id
= vnfr
["vnfd-id"]
2541 vnfd_ref
= vnfr
["vnfd-ref"]
2542 self
.fs
.sync(vnfd_id
)
2544 # if we haven't this vnfd, read it from db
2545 if vnfd_id
not in db_vnfds
:
2547 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2550 self
.logger
.debug(logging_text
+ stage
[1])
2551 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2554 db_vnfds
.append(vnfd
)
2556 # Get or generates the _admin.deployed.VCA list
2557 vca_deployed_list
= None
2558 if db_nsr
["_admin"].get("deployed"):
2559 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2560 if vca_deployed_list
is None:
2561 vca_deployed_list
= []
2562 configuration_status_list
= []
2563 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2564 db_nsr_update
["configurationStatus"] = configuration_status_list
2565 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2566 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2567 elif isinstance(vca_deployed_list
, dict):
2568 # maintain backward compatibility. Change a dict to list at database
2569 vca_deployed_list
= list(vca_deployed_list
.values())
2570 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2571 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2574 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2576 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2577 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2579 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2580 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2581 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2583 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2586 # n2vc_redesign STEP 2 Deploy Network Scenario
2587 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2588 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2590 stage
[1] = "Deploying KDUs."
2591 # self.logger.debug(logging_text + "Before deploy_kdus")
2592 # Call to deploy_kdus in case exists the "vdu:kdu" param
2593 await self
.deploy_kdus(
2594 logging_text
=logging_text
,
2596 nslcmop_id
=nslcmop_id
,
2599 task_instantiation_info
=tasks_dict_info
,
2602 stage
[1] = "Getting VCA public key."
2603 # n2vc_redesign STEP 1 Get VCA public ssh-key
2604 # feature 1429. Add n2vc public key to needed VMs
2605 n2vc_key
= self
.n2vc
.get_public_key()
2606 n2vc_key_list
= [n2vc_key
]
2607 if self
.vca_config
.get("public_key"):
2608 n2vc_key_list
.append(self
.vca_config
["public_key"])
2610 stage
[1] = "Deploying NS at VIM."
2611 task_ro
= asyncio
.ensure_future(
2612 self
.instantiate_RO(
2613 logging_text
=logging_text
,
2617 db_nslcmop
=db_nslcmop
,
2620 n2vc_key_list
=n2vc_key_list
,
2624 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2625 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2627 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2628 stage
[1] = "Deploying Execution Environments."
2629 self
.logger
.debug(logging_text
+ stage
[1])
2631 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2632 for vnf_profile
in get_vnf_profiles(nsd
):
2633 vnfd_id
= vnf_profile
["vnfd-id"]
2634 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2635 member_vnf_index
= str(vnf_profile
["id"])
2636 db_vnfr
= db_vnfrs
[member_vnf_index
]
2637 base_folder
= vnfd
["_admin"]["storage"]
2643 # Get additional parameters
2644 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2645 if db_vnfr
.get("additionalParamsForVnf"):
2646 deploy_params
.update(
2647 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2650 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2651 if descriptor_config
:
2653 logging_text
=logging_text
2654 + "member_vnf_index={} ".format(member_vnf_index
),
2657 nslcmop_id
=nslcmop_id
,
2663 member_vnf_index
=member_vnf_index
,
2664 vdu_index
=vdu_index
,
2666 deploy_params
=deploy_params
,
2667 descriptor_config
=descriptor_config
,
2668 base_folder
=base_folder
,
2669 task_instantiation_info
=tasks_dict_info
,
2673 # Deploy charms for each VDU that supports one.
2674 for vdud
in get_vdu_list(vnfd
):
2676 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2677 vdur
= find_in_list(
2678 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2681 if vdur
.get("additionalParams"):
2682 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2684 deploy_params_vdu
= deploy_params
2685 deploy_params_vdu
["OSM"] = get_osm_params(
2686 db_vnfr
, vdu_id
, vdu_count_index
=0
2688 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2690 self
.logger
.debug("VDUD > {}".format(vdud
))
2692 "Descriptor config > {}".format(descriptor_config
)
2694 if descriptor_config
:
2697 for vdu_index
in range(vdud_count
):
2698 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2700 logging_text
=logging_text
2701 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2702 member_vnf_index
, vdu_id
, vdu_index
2706 nslcmop_id
=nslcmop_id
,
2712 member_vnf_index
=member_vnf_index
,
2713 vdu_index
=vdu_index
,
2715 deploy_params
=deploy_params_vdu
,
2716 descriptor_config
=descriptor_config
,
2717 base_folder
=base_folder
,
2718 task_instantiation_info
=tasks_dict_info
,
2721 for kdud
in get_kdu_list(vnfd
):
2722 kdu_name
= kdud
["name"]
2723 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2724 if descriptor_config
:
2729 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2731 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2732 if kdur
.get("additionalParams"):
2733 deploy_params_kdu
.update(
2734 parse_yaml_strings(kdur
["additionalParams"].copy())
2738 logging_text
=logging_text
,
2741 nslcmop_id
=nslcmop_id
,
2747 member_vnf_index
=member_vnf_index
,
2748 vdu_index
=vdu_index
,
2750 deploy_params
=deploy_params_kdu
,
2751 descriptor_config
=descriptor_config
,
2752 base_folder
=base_folder
,
2753 task_instantiation_info
=tasks_dict_info
,
2757 # Check if this NS has a charm configuration
2758 descriptor_config
= nsd
.get("ns-configuration")
2759 if descriptor_config
and descriptor_config
.get("juju"):
2762 member_vnf_index
= None
2768 # Get additional parameters
2769 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2770 if db_nsr
.get("additionalParamsForNs"):
2771 deploy_params
.update(
2772 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2774 base_folder
= nsd
["_admin"]["storage"]
2776 logging_text
=logging_text
,
2779 nslcmop_id
=nslcmop_id
,
2785 member_vnf_index
=member_vnf_index
,
2786 vdu_index
=vdu_index
,
2788 deploy_params
=deploy_params
,
2789 descriptor_config
=descriptor_config
,
2790 base_folder
=base_folder
,
2791 task_instantiation_info
=tasks_dict_info
,
2795 # rest of staff will be done at finally
2798 ROclient
.ROClientException
,
2804 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2807 except asyncio
.CancelledError
:
2809 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2811 exc
= "Operation was cancelled"
2812 except Exception as e
:
2813 exc
= traceback
.format_exc()
2814 self
.logger
.critical(
2815 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2820 error_list
.append(str(exc
))
2822 # wait for pending tasks
2824 stage
[1] = "Waiting for instantiate pending tasks."
2825 self
.logger
.debug(logging_text
+ stage
[1])
2826 error_list
+= await self
._wait
_for
_tasks
(
2834 stage
[1] = stage
[2] = ""
2835 except asyncio
.CancelledError
:
2836 error_list
.append("Cancelled")
2837 # TODO cancel all tasks
2838 except Exception as exc
:
2839 error_list
.append(str(exc
))
2841 # update operation-status
2842 db_nsr_update
["operational-status"] = "running"
2843 # let's begin with VCA 'configured' status (later we can change it)
2844 db_nsr_update
["config-status"] = "configured"
2845 for task
, task_name
in tasks_dict_info
.items():
2846 if not task
.done() or task
.cancelled() or task
.exception():
2847 if task_name
.startswith(self
.task_name_deploy_vca
):
2848 # A N2VC task is pending
2849 db_nsr_update
["config-status"] = "failed"
2851 # RO or KDU task is pending
2852 db_nsr_update
["operational-status"] = "failed"
2854 # update status at database
2856 error_detail
= ". ".join(error_list
)
2857 self
.logger
.error(logging_text
+ error_detail
)
2858 error_description_nslcmop
= "{} Detail: {}".format(
2859 stage
[0], error_detail
2861 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2862 nslcmop_id
, stage
[0]
2865 db_nsr_update
["detailed-status"] = (
2866 error_description_nsr
+ " Detail: " + error_detail
2868 db_nslcmop_update
["detailed-status"] = error_detail
2869 nslcmop_operation_state
= "FAILED"
2873 error_description_nsr
= error_description_nslcmop
= None
2875 db_nsr_update
["detailed-status"] = "Done"
2876 db_nslcmop_update
["detailed-status"] = "Done"
2877 nslcmop_operation_state
= "COMPLETED"
2880 self
._write
_ns
_status
(
2883 current_operation
="IDLE",
2884 current_operation_id
=None,
2885 error_description
=error_description_nsr
,
2886 error_detail
=error_detail
,
2887 other_update
=db_nsr_update
,
2889 self
._write
_op
_status
(
2892 error_message
=error_description_nslcmop
,
2893 operation_state
=nslcmop_operation_state
,
2894 other_update
=db_nslcmop_update
,
2897 if nslcmop_operation_state
:
2899 await self
.msg
.aiowrite(
2904 "nslcmop_id": nslcmop_id
,
2905 "operationState": nslcmop_operation_state
,
2909 except Exception as e
:
2911 logging_text
+ "kafka_write notification Exception {}".format(e
)
2914 self
.logger
.debug(logging_text
+ "Exit")
2915 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2917 def _get_vnfd(self
, vnfd_id
: str, projects_read
: str, cached_vnfds
: Dict
[str, Any
]):
2918 if vnfd_id
not in cached_vnfds
:
2919 cached_vnfds
[vnfd_id
] = self
.db
.get_one(
2920 "vnfds", {"id": vnfd_id
, "_admin.projects_read": projects_read
}
2922 return cached_vnfds
[vnfd_id
]
2924 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2925 if vnf_profile_id
not in cached_vnfrs
:
2926 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2929 "member-vnf-index-ref": vnf_profile_id
,
2930 "nsr-id-ref": nsr_id
,
2933 return cached_vnfrs
[vnf_profile_id
]
2935 def _is_deployed_vca_in_relation(
2936 self
, vca
: DeployedVCA
, relation
: Relation
2939 for endpoint
in (relation
.provider
, relation
.requirer
):
2940 if endpoint
["kdu-resource-profile-id"]:
2943 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2944 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2945 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2951 def _update_ee_relation_data_with_implicit_data(
2952 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2954 ee_relation_data
= safe_get_ee_relation(
2955 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2957 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2958 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2959 "execution-environment-ref"
2961 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2962 vnfd_id
= vnf_profile
["vnfd-id"]
2963 project
= nsd
["_admin"]["projects_read"][0]
2964 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
2967 if ee_relation_level
== EELevel
.VNF
2968 else ee_relation_data
["vdu-profile-id"]
2970 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2973 f
"not execution environments found for ee_relation {ee_relation_data}"
2975 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2976 return ee_relation_data
2978 def _get_ns_relations(
2981 nsd
: Dict
[str, Any
],
2983 cached_vnfds
: Dict
[str, Any
],
2984 ) -> List
[Relation
]:
2986 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2987 for r
in db_ns_relations
:
2988 provider_dict
= None
2989 requirer_dict
= None
2990 if all(key
in r
for key
in ("provider", "requirer")):
2991 provider_dict
= r
["provider"]
2992 requirer_dict
= r
["requirer"]
2993 elif "entities" in r
:
2994 provider_id
= r
["entities"][0]["id"]
2997 "endpoint": r
["entities"][0]["endpoint"],
2999 if provider_id
!= nsd
["id"]:
3000 provider_dict
["vnf-profile-id"] = provider_id
3001 requirer_id
= r
["entities"][1]["id"]
3004 "endpoint": r
["entities"][1]["endpoint"],
3006 if requirer_id
!= nsd
["id"]:
3007 requirer_dict
["vnf-profile-id"] = requirer_id
3010 "provider/requirer or entities must be included in the relation."
3012 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3013 nsr_id
, nsd
, provider_dict
, cached_vnfds
3015 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3016 nsr_id
, nsd
, requirer_dict
, cached_vnfds
3018 provider
= EERelation(relation_provider
)
3019 requirer
= EERelation(relation_requirer
)
3020 relation
= Relation(r
["name"], provider
, requirer
)
3021 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3023 relations
.append(relation
)
3026 def _get_vnf_relations(
3029 nsd
: Dict
[str, Any
],
3031 cached_vnfds
: Dict
[str, Any
],
3032 ) -> List
[Relation
]:
3034 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
3035 vnf_profile_id
= vnf_profile
["id"]
3036 vnfd_id
= vnf_profile
["vnfd-id"]
3037 project
= nsd
["_admin"]["projects_read"][0]
3038 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
3039 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
3040 for r
in db_vnf_relations
:
3041 provider_dict
= None
3042 requirer_dict
= None
3043 if all(key
in r
for key
in ("provider", "requirer")):
3044 provider_dict
= r
["provider"]
3045 requirer_dict
= r
["requirer"]
3046 elif "entities" in r
:
3047 provider_id
= r
["entities"][0]["id"]
3050 "vnf-profile-id": vnf_profile_id
,
3051 "endpoint": r
["entities"][0]["endpoint"],
3053 if provider_id
!= vnfd_id
:
3054 provider_dict
["vdu-profile-id"] = provider_id
3055 requirer_id
= r
["entities"][1]["id"]
3058 "vnf-profile-id": vnf_profile_id
,
3059 "endpoint": r
["entities"][1]["endpoint"],
3061 if requirer_id
!= vnfd_id
:
3062 requirer_dict
["vdu-profile-id"] = requirer_id
3065 "provider/requirer or entities must be included in the relation."
3067 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3068 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3070 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3071 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3073 provider
= EERelation(relation_provider
)
3074 requirer
= EERelation(relation_requirer
)
3075 relation
= Relation(r
["name"], provider
, requirer
)
3076 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3078 relations
.append(relation
)
3081 def _get_kdu_resource_data(
3083 ee_relation
: EERelation
,
3084 db_nsr
: Dict
[str, Any
],
3085 cached_vnfds
: Dict
[str, Any
],
3086 ) -> DeployedK8sResource
:
3087 nsd
= get_nsd(db_nsr
)
3088 vnf_profiles
= get_vnf_profiles(nsd
)
3089 vnfd_id
= find_in_list(
3091 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
3093 project
= nsd
["_admin"]["projects_read"][0]
3094 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
3095 kdu_resource_profile
= get_kdu_resource_profile(
3096 db_vnfd
, ee_relation
.kdu_resource_profile_id
3098 kdu_name
= kdu_resource_profile
["kdu-name"]
3099 deployed_kdu
, _
= get_deployed_kdu(
3100 db_nsr
.get("_admin", ()).get("deployed", ()),
3102 ee_relation
.vnf_profile_id
,
3104 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
3107 def _get_deployed_component(
3109 ee_relation
: EERelation
,
3110 db_nsr
: Dict
[str, Any
],
3111 cached_vnfds
: Dict
[str, Any
],
3112 ) -> DeployedComponent
:
3113 nsr_id
= db_nsr
["_id"]
3114 deployed_component
= None
3115 ee_level
= EELevel
.get_level(ee_relation
)
3116 if ee_level
== EELevel
.NS
:
3117 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
3119 deployed_component
= DeployedVCA(nsr_id
, vca
)
3120 elif ee_level
== EELevel
.VNF
:
3121 vca
= get_deployed_vca(
3125 "member-vnf-index": ee_relation
.vnf_profile_id
,
3126 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3130 deployed_component
= DeployedVCA(nsr_id
, vca
)
3131 elif ee_level
== EELevel
.VDU
:
3132 vca
= get_deployed_vca(
3135 "vdu_id": ee_relation
.vdu_profile_id
,
3136 "member-vnf-index": ee_relation
.vnf_profile_id
,
3137 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3141 deployed_component
= DeployedVCA(nsr_id
, vca
)
3142 elif ee_level
== EELevel
.KDU
:
3143 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
3144 ee_relation
, db_nsr
, cached_vnfds
3146 if kdu_resource_data
:
3147 deployed_component
= DeployedK8sResource(kdu_resource_data
)
3148 return deployed_component
3150 async def _add_relation(
3154 db_nsr
: Dict
[str, Any
],
3155 cached_vnfds
: Dict
[str, Any
],
3156 cached_vnfrs
: Dict
[str, Any
],
3158 deployed_provider
= self
._get
_deployed
_component
(
3159 relation
.provider
, db_nsr
, cached_vnfds
3161 deployed_requirer
= self
._get
_deployed
_component
(
3162 relation
.requirer
, db_nsr
, cached_vnfds
3166 and deployed_requirer
3167 and deployed_provider
.config_sw_installed
3168 and deployed_requirer
.config_sw_installed
3170 provider_db_vnfr
= (
3172 relation
.provider
.nsr_id
,
3173 relation
.provider
.vnf_profile_id
,
3176 if relation
.provider
.vnf_profile_id
3179 requirer_db_vnfr
= (
3181 relation
.requirer
.nsr_id
,
3182 relation
.requirer
.vnf_profile_id
,
3185 if relation
.requirer
.vnf_profile_id
3188 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3189 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3190 provider_relation_endpoint
= RelationEndpoint(
3191 deployed_provider
.ee_id
,
3193 relation
.provider
.endpoint
,
3195 requirer_relation_endpoint
= RelationEndpoint(
3196 deployed_requirer
.ee_id
,
3198 relation
.requirer
.endpoint
,
3200 await self
.vca_map
[vca_type
].add_relation(
3201 provider
=provider_relation_endpoint
,
3202 requirer
=requirer_relation_endpoint
,
3204 # remove entry from relations list
3208 async def _add_vca_relations(
3214 timeout
: int = 3600,
3217 # 1. find all relations for this VCA
3218 # 2. wait for other peers related
3222 # STEP 1: find all relations for this VCA
3225 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3226 nsd
= get_nsd(db_nsr
)
3229 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3230 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3235 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3236 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3238 # if no relations, terminate
3240 self
.logger
.debug(logging_text
+ " No relations")
3243 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3250 if now
- start
>= timeout
:
3251 self
.logger
.error(logging_text
+ " : timeout adding relations")
3254 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3255 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3257 # for each relation, find the VCA's related
3258 for relation
in relations
.copy():
3259 added
= await self
._add
_relation
(
3267 relations
.remove(relation
)
3270 self
.logger
.debug("Relations added")
3272 await asyncio
.sleep(5.0)
3276 except Exception as e
:
3277 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3280 async def _install_kdu(
3288 k8s_instance_info
: dict,
3289 k8params
: dict = None,
3294 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3297 "collection": "nsrs",
3298 "filter": {"_id": nsr_id
},
3299 "path": nsr_db_path
,
3302 if k8s_instance_info
.get("kdu-deployment-name"):
3303 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3305 kdu_instance
= self
.k8scluster_map
[
3307 ].generate_kdu_instance_name(
3308 db_dict
=db_dict_install
,
3309 kdu_model
=k8s_instance_info
["kdu-model"],
3310 kdu_name
=k8s_instance_info
["kdu-name"],
3313 # Update the nsrs table with the kdu-instance value
3317 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3320 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3321 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3322 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3323 # namespace, this first verification could be removed, and the next step would be done for any kind
3325 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3326 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3327 if k8sclustertype
in ("juju", "juju-bundle"):
3328 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3329 # that the user passed a namespace which he wants its KDU to be deployed in)
3335 "_admin.projects_write": k8s_instance_info
["namespace"],
3336 "_admin.projects_read": k8s_instance_info
["namespace"],
3342 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3347 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3349 k8s_instance_info
["namespace"] = kdu_instance
3351 await self
.k8scluster_map
[k8sclustertype
].install(
3352 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3353 kdu_model
=k8s_instance_info
["kdu-model"],
3356 db_dict
=db_dict_install
,
3358 kdu_name
=k8s_instance_info
["kdu-name"],
3359 namespace
=k8s_instance_info
["namespace"],
3360 kdu_instance
=kdu_instance
,
3364 # Obtain services to obtain management service ip
3365 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3366 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3367 kdu_instance
=kdu_instance
,
3368 namespace
=k8s_instance_info
["namespace"],
3371 # Obtain management service info (if exists)
3372 vnfr_update_dict
= {}
3373 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3375 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3380 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3383 for service
in kdud
.get("service", [])
3384 if service
.get("mgmt-service")
3386 for mgmt_service
in mgmt_services
:
3387 for service
in services
:
3388 if service
["name"].startswith(mgmt_service
["name"]):
3389 # Mgmt service found, Obtain service ip
3390 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3391 if isinstance(ip
, list) and len(ip
) == 1:
3395 "kdur.{}.ip-address".format(kdu_index
)
3398 # Check if must update also mgmt ip at the vnf
3399 service_external_cp
= mgmt_service
.get(
3400 "external-connection-point-ref"
3402 if service_external_cp
:
3404 deep_get(vnfd
, ("mgmt-interface", "cp"))
3405 == service_external_cp
3407 vnfr_update_dict
["ip-address"] = ip
3412 "external-connection-point-ref", ""
3414 == service_external_cp
,
3417 "kdur.{}.ip-address".format(kdu_index
)
3422 "Mgmt service name: {} not found".format(
3423 mgmt_service
["name"]
3427 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3428 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3430 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3433 and kdu_config
.get("initial-config-primitive")
3434 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3436 initial_config_primitive_list
= kdu_config
.get(
3437 "initial-config-primitive"
3439 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3441 for initial_config_primitive
in initial_config_primitive_list
:
3442 primitive_params_
= self
._map
_primitive
_params
(
3443 initial_config_primitive
, {}, {}
3446 await asyncio
.wait_for(
3447 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3448 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3449 kdu_instance
=kdu_instance
,
3450 primitive_name
=initial_config_primitive
["name"],
3451 params
=primitive_params_
,
3452 db_dict
=db_dict_install
,
3458 except Exception as e
:
3459 # Prepare update db with error and raise exception
3462 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3466 vnfr_data
.get("_id"),
3467 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3470 # ignore to keep original exception
3472 # reraise original error
3477 async def deploy_kdus(
3484 task_instantiation_info
,
3486 # Launch kdus if present in the descriptor
3488 k8scluster_id_2_uuic
= {
3489 "helm-chart-v3": {},
3494 async def _get_cluster_id(cluster_id
, cluster_type
):
3495 nonlocal k8scluster_id_2_uuic
3496 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3497 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3499 # check if K8scluster is creating and wait look if previous tasks in process
3500 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3501 "k8scluster", cluster_id
3504 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3505 task_name
, cluster_id
3507 self
.logger
.debug(logging_text
+ text
)
3508 await asyncio
.wait(task_dependency
, timeout
=3600)
3510 db_k8scluster
= self
.db
.get_one(
3511 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3513 if not db_k8scluster
:
3514 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3516 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3518 if cluster_type
== "helm-chart-v3":
3520 # backward compatibility for existing clusters that have not been initialized for helm v3
3521 k8s_credentials
= yaml
.safe_dump(
3522 db_k8scluster
.get("credentials")
3524 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3525 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3527 db_k8scluster_update
= {}
3528 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3529 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3530 db_k8scluster_update
[
3531 "_admin.helm-chart-v3.created"
3533 db_k8scluster_update
[
3534 "_admin.helm-chart-v3.operationalState"
3537 "k8sclusters", cluster_id
, db_k8scluster_update
3539 except Exception as e
:
3542 + "error initializing helm-v3 cluster: {}".format(str(e
))
3545 "K8s cluster '{}' has not been initialized for '{}'".format(
3546 cluster_id
, cluster_type
3551 "K8s cluster '{}' has not been initialized for '{}'".format(
3552 cluster_id
, cluster_type
3555 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3558 logging_text
+= "Deploy kdus: "
3561 db_nsr_update
= {"_admin.deployed.K8s": []}
3562 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3565 updated_cluster_list
= []
3566 updated_v3_cluster_list
= []
3568 for vnfr_data
in db_vnfrs
.values():
3569 vca_id
= self
.get_vca_id(vnfr_data
, {})
3570 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3571 # Step 0: Prepare and set parameters
3572 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3573 vnfd_id
= vnfr_data
.get("vnfd-id")
3574 vnfd_with_id
= find_in_list(
3575 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3579 for kdud
in vnfd_with_id
["kdu"]
3580 if kdud
["name"] == kdur
["kdu-name"]
3582 namespace
= kdur
.get("k8s-namespace")
3583 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3584 if kdur
.get("helm-chart"):
3585 kdumodel
= kdur
["helm-chart"]
3586 # Default version: helm3, if helm-version is v2 assign v2
3587 k8sclustertype
= "helm-chart-v3"
3588 self
.logger
.debug("kdur: {}".format(kdur
))
3590 kdur
.get("helm-version")
3591 and kdur
.get("helm-version") == "v2"
3593 k8sclustertype
= "helm-chart"
3594 elif kdur
.get("juju-bundle"):
3595 kdumodel
= kdur
["juju-bundle"]
3596 k8sclustertype
= "juju-bundle"
3599 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3600 "juju-bundle. Maybe an old NBI version is running".format(
3601 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3604 # check if kdumodel is a file and exists
3606 vnfd_with_id
= find_in_list(
3607 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3609 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3610 if storage
: # may be not present if vnfd has not artifacts
3611 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3612 if storage
["pkg-dir"]:
3613 filename
= "{}/{}/{}s/{}".format(
3620 filename
= "{}/Scripts/{}s/{}".format(
3625 if self
.fs
.file_exists(
3626 filename
, mode
="file"
3627 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3628 kdumodel
= self
.fs
.path
+ filename
3629 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3631 except Exception: # it is not a file
3634 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3635 step
= "Synchronize repos for k8s cluster '{}'".format(
3638 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3642 k8sclustertype
== "helm-chart"
3643 and cluster_uuid
not in updated_cluster_list
3645 k8sclustertype
== "helm-chart-v3"
3646 and cluster_uuid
not in updated_v3_cluster_list
3648 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3649 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3650 cluster_uuid
=cluster_uuid
3653 if del_repo_list
or added_repo_dict
:
3654 if k8sclustertype
== "helm-chart":
3656 "_admin.helm_charts_added." + item
: None
3657 for item
in del_repo_list
3660 "_admin.helm_charts_added." + item
: name
3661 for item
, name
in added_repo_dict
.items()
3663 updated_cluster_list
.append(cluster_uuid
)
3664 elif k8sclustertype
== "helm-chart-v3":
3666 "_admin.helm_charts_v3_added." + item
: None
3667 for item
in del_repo_list
3670 "_admin.helm_charts_v3_added." + item
: name
3671 for item
, name
in added_repo_dict
.items()
3673 updated_v3_cluster_list
.append(cluster_uuid
)
3675 logging_text
+ "repos synchronized on k8s cluster "
3676 "'{}' to_delete: {}, to_add: {}".format(
3677 k8s_cluster_id
, del_repo_list
, added_repo_dict
3682 {"_id": k8s_cluster_id
},
3688 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3689 vnfr_data
["member-vnf-index-ref"],
3693 k8s_instance_info
= {
3694 "kdu-instance": None,
3695 "k8scluster-uuid": cluster_uuid
,
3696 "k8scluster-type": k8sclustertype
,
3697 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3698 "kdu-name": kdur
["kdu-name"],
3699 "kdu-model": kdumodel
,
3700 "namespace": namespace
,
3701 "kdu-deployment-name": kdu_deployment_name
,
3703 db_path
= "_admin.deployed.K8s.{}".format(index
)
3704 db_nsr_update
[db_path
] = k8s_instance_info
3705 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3706 vnfd_with_id
= find_in_list(
3707 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3709 task
= asyncio
.ensure_future(
3718 k8params
=desc_params
,
3723 self
.lcm_tasks
.register(
3727 "instantiate_KDU-{}".format(index
),
3730 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3736 except (LcmException
, asyncio
.CancelledError
):
3738 except Exception as e
:
3739 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3740 if isinstance(e
, (N2VCException
, DbException
)):
3741 self
.logger
.error(logging_text
+ msg
)
3743 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3744 raise LcmException(msg
)
3747 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3766 task_instantiation_info
,
3769 # launch instantiate_N2VC in a asyncio task and register task object
3770 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3771 # if not found, create one entry and update database
3772 # fill db_nsr._admin.deployed.VCA.<index>
3775 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3777 if "execution-environment-list" in descriptor_config
:
3778 ee_list
= descriptor_config
.get("execution-environment-list", [])
3779 elif "juju" in descriptor_config
:
3780 ee_list
= [descriptor_config
] # ns charms
3781 else: # other types as script are not supported
3784 for ee_item
in ee_list
:
3787 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3788 ee_item
.get("juju"), ee_item
.get("helm-chart")
3791 ee_descriptor_id
= ee_item
.get("id")
3792 if ee_item
.get("juju"):
3793 vca_name
= ee_item
["juju"].get("charm")
3796 if ee_item
["juju"].get("charm") is not None
3799 if ee_item
["juju"].get("cloud") == "k8s":
3800 vca_type
= "k8s_proxy_charm"
3801 elif ee_item
["juju"].get("proxy") is False:
3802 vca_type
= "native_charm"
3803 elif ee_item
.get("helm-chart"):
3804 vca_name
= ee_item
["helm-chart"]
3805 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3808 vca_type
= "helm-v3"
3811 logging_text
+ "skipping non juju neither charm configuration"
3816 for vca_index
, vca_deployed
in enumerate(
3817 db_nsr
["_admin"]["deployed"]["VCA"]
3819 if not vca_deployed
:
3822 vca_deployed
.get("member-vnf-index") == member_vnf_index
3823 and vca_deployed
.get("vdu_id") == vdu_id
3824 and vca_deployed
.get("kdu_name") == kdu_name
3825 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3826 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3830 # not found, create one.
3832 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3835 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3837 target
+= "/kdu/{}".format(kdu_name
)
3839 "target_element": target
,
3840 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3841 "member-vnf-index": member_vnf_index
,
3843 "kdu_name": kdu_name
,
3844 "vdu_count_index": vdu_index
,
3845 "operational-status": "init", # TODO revise
3846 "detailed-status": "", # TODO revise
3847 "step": "initial-deploy", # TODO revise
3849 "vdu_name": vdu_name
,
3851 "ee_descriptor_id": ee_descriptor_id
,
3855 # create VCA and configurationStatus in db
3857 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3858 "configurationStatus.{}".format(vca_index
): dict(),
3860 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3862 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3864 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3865 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3866 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3869 task_n2vc
= asyncio
.ensure_future(
3870 self
.instantiate_N2VC(
3871 logging_text
=logging_text
,
3872 vca_index
=vca_index
,
3878 vdu_index
=vdu_index
,
3879 deploy_params
=deploy_params
,
3880 config_descriptor
=descriptor_config
,
3881 base_folder
=base_folder
,
3882 nslcmop_id
=nslcmop_id
,
3886 ee_config_descriptor
=ee_item
,
3889 self
.lcm_tasks
.register(
3893 "instantiate_N2VC-{}".format(vca_index
),
3896 task_instantiation_info
[
3898 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3899 member_vnf_index
or "", vdu_id
or ""
3903 def _create_nslcmop(nsr_id
, operation
, params
):
3905 Creates a ns-lcm-opp content to be stored at database.
3906 :param nsr_id: internal id of the instance
3907 :param operation: instantiate, terminate, scale, action, ...
3908 :param params: user parameters for the operation
3909 :return: dictionary following SOL005 format
3911 # Raise exception if invalid arguments
3912 if not (nsr_id
and operation
and params
):
3914 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3921 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3922 "operationState": "PROCESSING",
3923 "statusEnteredTime": now
,
3924 "nsInstanceId": nsr_id
,
3925 "lcmOperationType": operation
,
3927 "isAutomaticInvocation": False,
3928 "operationParams": params
,
3929 "isCancelPending": False,
3931 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3932 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3937 def _format_additional_params(self
, params
):
3938 params
= params
or {}
3939 for key
, value
in params
.items():
3940 if str(value
).startswith("!!yaml "):
3941 params
[key
] = yaml
.safe_load(value
[7:])
3944 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3945 primitive
= seq
.get("name")
3946 primitive_params
= {}
3948 "member_vnf_index": vnf_index
,
3949 "primitive": primitive
,
3950 "primitive_params": primitive_params
,
3953 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
3957 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
3958 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
3959 if op
.get("operationState") == "COMPLETED":
3960 # b. Skip sub-operation
3961 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3962 return self
.SUBOPERATION_STATUS_SKIP
3964 # c. retry executing sub-operation
3965 # The sub-operation exists, and operationState != 'COMPLETED'
3966 # Update operationState = 'PROCESSING' to indicate a retry.
3967 operationState
= "PROCESSING"
3968 detailed_status
= "In progress"
3969 self
._update
_suboperation
_status
(
3970 db_nslcmop
, op_index
, operationState
, detailed_status
3972 # Return the sub-operation index
3973 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3974 # with arguments extracted from the sub-operation
3977 # Find a sub-operation where all keys in a matching dictionary must match
3978 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3979 def _find_suboperation(self
, db_nslcmop
, match
):
3980 if db_nslcmop
and match
:
3981 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3982 for i
, op
in enumerate(op_list
):
3983 if all(op
.get(k
) == match
[k
] for k
in match
):
3985 return self
.SUBOPERATION_STATUS_NOT_FOUND
3987 # Update status for a sub-operation given its index
3988 def _update_suboperation_status(
3989 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3991 # Update DB for HA tasks
3992 q_filter
= {"_id": db_nslcmop
["_id"]}
3994 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3995 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3998 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
4001 # Add sub-operation, return the index of the added sub-operation
4002 # Optionally, set operationState, detailed-status, and operationType
4003 # Status and type are currently set for 'scale' sub-operations:
4004 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
4005 # 'detailed-status' : status message
4006 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
4007 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
4008 def _add_suboperation(
4016 mapped_primitive_params
,
4017 operationState
=None,
4018 detailed_status
=None,
4021 RO_scaling_info
=None,
4024 return self
.SUBOPERATION_STATUS_NOT_FOUND
4025 # Get the "_admin.operations" list, if it exists
4026 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
4027 op_list
= db_nslcmop_admin
.get("operations")
4028 # Create or append to the "_admin.operations" list
4030 "member_vnf_index": vnf_index
,
4032 "vdu_count_index": vdu_count_index
,
4033 "primitive": primitive
,
4034 "primitive_params": mapped_primitive_params
,
4037 new_op
["operationState"] = operationState
4039 new_op
["detailed-status"] = detailed_status
4041 new_op
["lcmOperationType"] = operationType
4043 new_op
["RO_nsr_id"] = RO_nsr_id
4045 new_op
["RO_scaling_info"] = RO_scaling_info
4047 # No existing operations, create key 'operations' with current operation as first list element
4048 db_nslcmop_admin
.update({"operations": [new_op
]})
4049 op_list
= db_nslcmop_admin
.get("operations")
4051 # Existing operations, append operation to list
4052 op_list
.append(new_op
)
4054 db_nslcmop_update
= {"_admin.operations": op_list
}
4055 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4056 op_index
= len(op_list
) - 1
4059 # Helper methods for scale() sub-operations
4061 # pre-scale/post-scale:
4062 # Check for 3 different cases:
4063 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4064 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4065 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4066 def _check_or_add_scale_suboperation(
4070 vnf_config_primitive
,
4074 RO_scaling_info
=None,
4076 # Find this sub-operation
4077 if RO_nsr_id
and RO_scaling_info
:
4078 operationType
= "SCALE-RO"
4080 "member_vnf_index": vnf_index
,
4081 "RO_nsr_id": RO_nsr_id
,
4082 "RO_scaling_info": RO_scaling_info
,
4086 "member_vnf_index": vnf_index
,
4087 "primitive": vnf_config_primitive
,
4088 "primitive_params": primitive_params
,
4089 "lcmOperationType": operationType
,
4091 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4092 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4093 # a. New sub-operation
4094 # The sub-operation does not exist, add it.
4095 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4096 # The following parameters are set to None for all kind of scaling:
4098 vdu_count_index
= None
4100 if RO_nsr_id
and RO_scaling_info
:
4101 vnf_config_primitive
= None
4102 primitive_params
= None
4105 RO_scaling_info
= None
4106 # Initial status for sub-operation
4107 operationState
= "PROCESSING"
4108 detailed_status
= "In progress"
4109 # Add sub-operation for pre/post-scaling (zero or more operations)
4110 self
._add
_suboperation
(
4116 vnf_config_primitive
,
4124 return self
.SUBOPERATION_STATUS_NEW
4126 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4127 # or op_index (operationState != 'COMPLETED')
4128 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4130 # Function to return execution_environment id
4132 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4133 # TODO vdu_index_count
4134 for vca
in vca_deployed_list
:
4135 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
4138 async def destroy_N2VC(
4146 exec_primitives
=True,
4151 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4152 :param logging_text:
4154 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4155 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4156 :param vca_index: index in the database _admin.deployed.VCA
4157 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4158 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4159 not executed properly
4160 :param scaling_in: True destroys the application, False destroys the model
4161 :return: None or exception
4166 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4167 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4171 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4173 # execute terminate_primitives
4175 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4176 config_descriptor
.get("terminate-config-primitive"),
4177 vca_deployed
.get("ee_descriptor_id"),
4179 vdu_id
= vca_deployed
.get("vdu_id")
4180 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4181 vdu_name
= vca_deployed
.get("vdu_name")
4182 vnf_index
= vca_deployed
.get("member-vnf-index")
4183 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4184 for seq
in terminate_primitives
:
4185 # For each sequence in list, get primitive and call _ns_execute_primitive()
4186 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4187 vnf_index
, seq
.get("name")
4189 self
.logger
.debug(logging_text
+ step
)
4190 # Create the primitive for each sequence, i.e. "primitive": "touch"
4191 primitive
= seq
.get("name")
4192 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4197 self
._add
_suboperation
(
4204 mapped_primitive_params
,
4206 # Sub-operations: Call _ns_execute_primitive() instead of action()
4208 result
, result_detail
= await self
._ns
_execute
_primitive
(
4209 vca_deployed
["ee_id"],
4211 mapped_primitive_params
,
4215 except LcmException
:
4216 # this happens when VCA is not deployed. In this case it is not needed to terminate
4218 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4219 if result
not in result_ok
:
4221 "terminate_primitive {} for vnf_member_index={} fails with "
4222 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4224 # set that this VCA do not need terminated
4225 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4229 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4232 # Delete Prometheus Jobs if any
4233 # This uses NSR_ID, so it will destroy any jobs under this index
4234 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4237 await self
.vca_map
[vca_type
].delete_execution_environment(
4238 vca_deployed
["ee_id"],
4239 scaling_in
=scaling_in
,
4244 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4245 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4246 namespace
= "." + db_nsr
["_id"]
4248 await self
.n2vc
.delete_namespace(
4249 namespace
=namespace
,
4250 total_timeout
=self
.timeout_charm_delete
,
4253 except N2VCNotFound
: # already deleted. Skip
4255 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4257 async def _terminate_RO(
4258 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4261 Terminates a deployment from RO
4262 :param logging_text:
4263 :param nsr_deployed: db_nsr._admin.deployed
4266 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4267 this method will update only the index 2, but it will write on database the concatenated content of the list
4272 ro_nsr_id
= ro_delete_action
= None
4273 if nsr_deployed
and nsr_deployed
.get("RO"):
4274 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4275 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4278 stage
[2] = "Deleting ns from VIM."
4279 db_nsr_update
["detailed-status"] = " ".join(stage
)
4280 self
._write
_op
_status
(nslcmop_id
, stage
)
4281 self
.logger
.debug(logging_text
+ stage
[2])
4282 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4283 self
._write
_op
_status
(nslcmop_id
, stage
)
4284 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4285 ro_delete_action
= desc
["action_id"]
4287 "_admin.deployed.RO.nsr_delete_action_id"
4288 ] = ro_delete_action
4289 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4290 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4291 if ro_delete_action
:
4292 # wait until NS is deleted from VIM
4293 stage
[2] = "Waiting ns deleted from VIM."
4294 detailed_status_old
= None
4298 + " RO_id={} ro_delete_action={}".format(
4299 ro_nsr_id
, ro_delete_action
4302 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4303 self
._write
_op
_status
(nslcmop_id
, stage
)
4305 delete_timeout
= 20 * 60 # 20 minutes
4306 while delete_timeout
> 0:
4307 desc
= await self
.RO
.show(
4309 item_id_name
=ro_nsr_id
,
4310 extra_item
="action",
4311 extra_item_id
=ro_delete_action
,
4315 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4317 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4318 if ns_status
== "ERROR":
4319 raise ROclient
.ROClientException(ns_status_info
)
4320 elif ns_status
== "BUILD":
4321 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4322 elif ns_status
== "ACTIVE":
4323 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4324 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4329 ), "ROclient.check_action_status returns unknown {}".format(
4332 if stage
[2] != detailed_status_old
:
4333 detailed_status_old
= stage
[2]
4334 db_nsr_update
["detailed-status"] = " ".join(stage
)
4335 self
._write
_op
_status
(nslcmop_id
, stage
)
4336 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4337 await asyncio
.sleep(5, loop
=self
.loop
)
4339 else: # delete_timeout <= 0:
4340 raise ROclient
.ROClientException(
4341 "Timeout waiting ns deleted from VIM"
4344 except Exception as e
:
4345 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4347 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4349 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4350 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4351 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4353 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4356 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4358 failed_detail
.append("delete conflict: {}".format(e
))
4361 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4364 failed_detail
.append("delete error: {}".format(e
))
4366 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4370 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4371 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4373 stage
[2] = "Deleting nsd from RO."
4374 db_nsr_update
["detailed-status"] = " ".join(stage
)
4375 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4376 self
._write
_op
_status
(nslcmop_id
, stage
)
4377 await self
.RO
.delete("nsd", ro_nsd_id
)
4379 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4381 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4382 except Exception as e
:
4384 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4386 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4388 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4391 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4393 failed_detail
.append(
4394 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4396 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4398 failed_detail
.append(
4399 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4401 self
.logger
.error(logging_text
+ failed_detail
[-1])
4403 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4404 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4405 if not vnf_deployed
or not vnf_deployed
["id"]:
4408 ro_vnfd_id
= vnf_deployed
["id"]
4411 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4412 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4414 db_nsr_update
["detailed-status"] = " ".join(stage
)
4415 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4416 self
._write
_op
_status
(nslcmop_id
, stage
)
4417 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4419 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4421 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4422 except Exception as e
:
4424 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4427 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4431 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4434 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4436 failed_detail
.append(
4437 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4439 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4441 failed_detail
.append(
4442 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4444 self
.logger
.error(logging_text
+ failed_detail
[-1])
4447 stage
[2] = "Error deleting from VIM"
4449 stage
[2] = "Deleted from VIM"
4450 db_nsr_update
["detailed-status"] = " ".join(stage
)
4451 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4452 self
._write
_op
_status
(nslcmop_id
, stage
)
4455 raise LcmException("; ".join(failed_detail
))
4457 async def terminate(self
, nsr_id
, nslcmop_id
):
4458 # Try to lock HA task here
4459 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4460 if not task_is_locked_by_me
:
4463 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4464 self
.logger
.debug(logging_text
+ "Enter")
4465 timeout_ns_terminate
= self
.timeout_ns_terminate
4468 operation_params
= None
4470 error_list
= [] # annotates all failed error messages
4471 db_nslcmop_update
= {}
4472 autoremove
= False # autoremove after terminated
4473 tasks_dict_info
= {}
4476 "Stage 1/3: Preparing task.",
4477 "Waiting for previous operations to terminate.",
4480 # ^ contains [stage, step, VIM-status]
4482 # wait for any previous tasks in process
4483 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4485 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4486 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4487 operation_params
= db_nslcmop
.get("operationParams") or {}
4488 if operation_params
.get("timeout_ns_terminate"):
4489 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4490 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4491 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4493 db_nsr_update
["operational-status"] = "terminating"
4494 db_nsr_update
["config-status"] = "terminating"
4495 self
._write
_ns
_status
(
4497 ns_state
="TERMINATING",
4498 current_operation
="TERMINATING",
4499 current_operation_id
=nslcmop_id
,
4500 other_update
=db_nsr_update
,
4502 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4503 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4504 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4507 stage
[1] = "Getting vnf descriptors from db."
4508 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4510 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4512 db_vnfds_from_id
= {}
4513 db_vnfds_from_member_index
= {}
4515 for vnfr
in db_vnfrs_list
:
4516 vnfd_id
= vnfr
["vnfd-id"]
4517 if vnfd_id
not in db_vnfds_from_id
:
4518 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4519 db_vnfds_from_id
[vnfd_id
] = vnfd
4520 db_vnfds_from_member_index
[
4521 vnfr
["member-vnf-index-ref"]
4522 ] = db_vnfds_from_id
[vnfd_id
]
4524 # Destroy individual execution environments when there are terminating primitives.
4525 # Rest of EE will be deleted at once
4526 # TODO - check before calling _destroy_N2VC
4527 # if not operation_params.get("skip_terminate_primitives"):#
4528 # or not vca.get("needed_terminate"):
4529 stage
[0] = "Stage 2/3 execute terminating primitives."
4530 self
.logger
.debug(logging_text
+ stage
[0])
4531 stage
[1] = "Looking execution environment that needs terminate."
4532 self
.logger
.debug(logging_text
+ stage
[1])
4534 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4535 config_descriptor
= None
4536 vca_member_vnf_index
= vca
.get("member-vnf-index")
4537 vca_id
= self
.get_vca_id(
4538 db_vnfrs_dict
.get(vca_member_vnf_index
)
4539 if vca_member_vnf_index
4543 if not vca
or not vca
.get("ee_id"):
4545 if not vca
.get("member-vnf-index"):
4547 config_descriptor
= db_nsr
.get("ns-configuration")
4548 elif vca
.get("vdu_id"):
4549 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4550 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4551 elif vca
.get("kdu_name"):
4552 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4553 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4555 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4556 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4557 vca_type
= vca
.get("type")
4558 exec_terminate_primitives
= not operation_params
.get(
4559 "skip_terminate_primitives"
4560 ) and vca
.get("needed_terminate")
4561 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4562 # pending native charms
4564 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4566 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4567 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4568 task
= asyncio
.ensure_future(
4576 exec_terminate_primitives
,
4580 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4582 # wait for pending tasks of terminate primitives
4586 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4588 error_list
= await self
._wait
_for
_tasks
(
4591 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4595 tasks_dict_info
.clear()
4597 return # raise LcmException("; ".join(error_list))
4599 # remove All execution environments at once
4600 stage
[0] = "Stage 3/3 delete all."
4602 if nsr_deployed
.get("VCA"):
4603 stage
[1] = "Deleting all execution environments."
4604 self
.logger
.debug(logging_text
+ stage
[1])
4605 vca_id
= self
.get_vca_id({}, db_nsr
)
4606 task_delete_ee
= asyncio
.ensure_future(
4608 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4609 timeout
=self
.timeout_charm_delete
,
4612 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4613 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4615 # Delete from k8scluster
4616 stage
[1] = "Deleting KDUs."
4617 self
.logger
.debug(logging_text
+ stage
[1])
4618 # print(nsr_deployed)
4619 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4620 if not kdu
or not kdu
.get("kdu-instance"):
4622 kdu_instance
= kdu
.get("kdu-instance")
4623 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4624 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4625 vca_id
= self
.get_vca_id({}, db_nsr
)
4626 task_delete_kdu_instance
= asyncio
.ensure_future(
4627 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4628 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4629 kdu_instance
=kdu_instance
,
4631 namespace
=kdu
.get("namespace"),
4637 + "Unknown k8s deployment type {}".format(
4638 kdu
.get("k8scluster-type")
4643 task_delete_kdu_instance
4644 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4647 stage
[1] = "Deleting ns from VIM."
4649 task_delete_ro
= asyncio
.ensure_future(
4650 self
._terminate
_ng
_ro
(
4651 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4655 task_delete_ro
= asyncio
.ensure_future(
4657 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4660 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4662 # rest of staff will be done at finally
4665 ROclient
.ROClientException
,
4670 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4672 except asyncio
.CancelledError
:
4674 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4676 exc
= "Operation was cancelled"
4677 except Exception as e
:
4678 exc
= traceback
.format_exc()
4679 self
.logger
.critical(
4680 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4685 error_list
.append(str(exc
))
4687 # wait for pending tasks
4689 stage
[1] = "Waiting for terminate pending tasks."
4690 self
.logger
.debug(logging_text
+ stage
[1])
4691 error_list
+= await self
._wait
_for
_tasks
(
4694 timeout_ns_terminate
,
4698 stage
[1] = stage
[2] = ""
4699 except asyncio
.CancelledError
:
4700 error_list
.append("Cancelled")
4701 # TODO cancell all tasks
4702 except Exception as exc
:
4703 error_list
.append(str(exc
))
4704 # update status at database
4706 error_detail
= "; ".join(error_list
)
4707 # self.logger.error(logging_text + error_detail)
4708 error_description_nslcmop
= "{} Detail: {}".format(
4709 stage
[0], error_detail
4711 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4712 nslcmop_id
, stage
[0]
4715 db_nsr_update
["operational-status"] = "failed"
4716 db_nsr_update
["detailed-status"] = (
4717 error_description_nsr
+ " Detail: " + error_detail
4719 db_nslcmop_update
["detailed-status"] = error_detail
4720 nslcmop_operation_state
= "FAILED"
4724 error_description_nsr
= error_description_nslcmop
= None
4725 ns_state
= "NOT_INSTANTIATED"
4726 db_nsr_update
["operational-status"] = "terminated"
4727 db_nsr_update
["detailed-status"] = "Done"
4728 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4729 db_nslcmop_update
["detailed-status"] = "Done"
4730 nslcmop_operation_state
= "COMPLETED"
4733 self
._write
_ns
_status
(
4736 current_operation
="IDLE",
4737 current_operation_id
=None,
4738 error_description
=error_description_nsr
,
4739 error_detail
=error_detail
,
4740 other_update
=db_nsr_update
,
4742 self
._write
_op
_status
(
4745 error_message
=error_description_nslcmop
,
4746 operation_state
=nslcmop_operation_state
,
4747 other_update
=db_nslcmop_update
,
4749 if ns_state
== "NOT_INSTANTIATED":
4753 {"nsr-id-ref": nsr_id
},
4754 {"_admin.nsState": "NOT_INSTANTIATED"},
4756 except DbException
as e
:
4759 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4763 if operation_params
:
4764 autoremove
= operation_params
.get("autoremove", False)
4765 if nslcmop_operation_state
:
4767 await self
.msg
.aiowrite(
4772 "nslcmop_id": nslcmop_id
,
4773 "operationState": nslcmop_operation_state
,
4774 "autoremove": autoremove
,
4778 except Exception as e
:
4780 logging_text
+ "kafka_write notification Exception {}".format(e
)
4783 self
.logger
.debug(logging_text
+ "Exit")
4784 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4786 async def _wait_for_tasks(
4787 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4790 error_detail_list
= []
4792 pending_tasks
= list(created_tasks_info
.keys())
4793 num_tasks
= len(pending_tasks
)
4795 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4796 self
._write
_op
_status
(nslcmop_id
, stage
)
4797 while pending_tasks
:
4799 _timeout
= timeout
+ time_start
- time()
4800 done
, pending_tasks
= await asyncio
.wait(
4801 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4803 num_done
+= len(done
)
4804 if not done
: # Timeout
4805 for task
in pending_tasks
:
4806 new_error
= created_tasks_info
[task
] + ": Timeout"
4807 error_detail_list
.append(new_error
)
4808 error_list
.append(new_error
)
4811 if task
.cancelled():
4814 exc
= task
.exception()
4816 if isinstance(exc
, asyncio
.TimeoutError
):
4818 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4819 error_list
.append(created_tasks_info
[task
])
4820 error_detail_list
.append(new_error
)
4827 ROclient
.ROClientException
,
4833 self
.logger
.error(logging_text
+ new_error
)
4835 exc_traceback
= "".join(
4836 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4840 + created_tasks_info
[task
]
4846 logging_text
+ created_tasks_info
[task
] + ": Done"
4848 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4850 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4851 if nsr_id
: # update also nsr
4856 "errorDescription": "Error at: " + ", ".join(error_list
),
4857 "errorDetail": ". ".join(error_detail_list
),
4860 self
._write
_op
_status
(nslcmop_id
, stage
)
4861 return error_detail_list
4864 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4866 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4867 The default-value is used. If it is between < > it look for a value at instantiation_params
4868 :param primitive_desc: portion of VNFD/NSD that describes primitive
4869 :param params: Params provided by user
4870 :param instantiation_params: Instantiation params provided by user
4871 :return: a dictionary with the calculated params
4873 calculated_params
= {}
4874 for parameter
in primitive_desc
.get("parameter", ()):
4875 param_name
= parameter
["name"]
4876 if param_name
in params
:
4877 calculated_params
[param_name
] = params
[param_name
]
4878 elif "default-value" in parameter
or "value" in parameter
:
4879 if "value" in parameter
:
4880 calculated_params
[param_name
] = parameter
["value"]
4882 calculated_params
[param_name
] = parameter
["default-value"]
4884 isinstance(calculated_params
[param_name
], str)
4885 and calculated_params
[param_name
].startswith("<")
4886 and calculated_params
[param_name
].endswith(">")
4888 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4889 calculated_params
[param_name
] = instantiation_params
[
4890 calculated_params
[param_name
][1:-1]
4894 "Parameter {} needed to execute primitive {} not provided".format(
4895 calculated_params
[param_name
], primitive_desc
["name"]
4900 "Parameter {} needed to execute primitive {} not provided".format(
4901 param_name
, primitive_desc
["name"]
4905 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4906 calculated_params
[param_name
] = yaml
.safe_dump(
4907 calculated_params
[param_name
], default_flow_style
=True, width
=256
4909 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4911 ].startswith("!!yaml "):
4912 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4913 if parameter
.get("data-type") == "INTEGER":
4915 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4916 except ValueError: # error converting string to int
4918 "Parameter {} of primitive {} must be integer".format(
4919 param_name
, primitive_desc
["name"]
4922 elif parameter
.get("data-type") == "BOOLEAN":
4923 calculated_params
[param_name
] = not (
4924 (str(calculated_params
[param_name
])).lower() == "false"
4927 # add always ns_config_info if primitive name is config
4928 if primitive_desc
["name"] == "config":
4929 if "ns_config_info" in instantiation_params
:
4930 calculated_params
["ns_config_info"] = instantiation_params
[
4933 return calculated_params
4935 def _look_for_deployed_vca(
4942 ee_descriptor_id
=None,
4944 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4945 for vca
in deployed_vca
:
4948 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4951 vdu_count_index
is not None
4952 and vdu_count_index
!= vca
["vdu_count_index"]
4955 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4957 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4961 # vca_deployed not found
4963 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4964 " is not deployed".format(
4973 ee_id
= vca
.get("ee_id")
4975 "type", "lxc_proxy_charm"
4976 ) # default value for backward compatibility - proxy charm
4979 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4980 "execution environment".format(
4981 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4984 return ee_id
, vca_type
4986 async def _ns_execute_primitive(
4992 retries_interval
=30,
4999 if primitive
== "config":
5000 primitive_params
= {"params": primitive_params
}
5002 vca_type
= vca_type
or "lxc_proxy_charm"
5006 output
= await asyncio
.wait_for(
5007 self
.vca_map
[vca_type
].exec_primitive(
5009 primitive_name
=primitive
,
5010 params_dict
=primitive_params
,
5011 progress_timeout
=self
.timeout_progress_primitive
,
5012 total_timeout
=self
.timeout_primitive
,
5017 timeout
=timeout
or self
.timeout_primitive
,
5021 except asyncio
.CancelledError
:
5023 except Exception as e
: # asyncio.TimeoutError
5024 if isinstance(e
, asyncio
.TimeoutError
):
5029 "Error executing action {} on {} -> {}".format(
5034 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
5036 return "FAILED", str(e
)
5038 return "COMPLETED", output
5040 except (LcmException
, asyncio
.CancelledError
):
5042 except Exception as e
:
5043 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
5045 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
5047 Updating the vca_status with latest juju information in nsrs record
5048 :param: nsr_id: Id of the nsr
5049 :param: nslcmop_id: Id of the nslcmop
5053 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
5054 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5055 vca_id
= self
.get_vca_id({}, db_nsr
)
5056 if db_nsr
["_admin"]["deployed"]["K8s"]:
5057 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
5058 cluster_uuid
, kdu_instance
, cluster_type
= (
5059 k8s
["k8scluster-uuid"],
5060 k8s
["kdu-instance"],
5061 k8s
["k8scluster-type"],
5063 await self
._on
_update
_k
8s
_db
(
5064 cluster_uuid
=cluster_uuid
,
5065 kdu_instance
=kdu_instance
,
5066 filter={"_id": nsr_id
},
5068 cluster_type
=cluster_type
,
5071 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
5072 table
, filter = "nsrs", {"_id": nsr_id
}
5073 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
5074 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
5076 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
5077 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
5079 async def action(self
, nsr_id
, nslcmop_id
):
5080 # Try to lock HA task here
5081 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5082 if not task_is_locked_by_me
:
5085 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
5086 self
.logger
.debug(logging_text
+ "Enter")
5087 # get all needed from database
5091 db_nslcmop_update
= {}
5092 nslcmop_operation_state
= None
5093 error_description_nslcmop
= None
5096 # wait for any previous tasks in process
5097 step
= "Waiting for previous operations to terminate"
5098 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5100 self
._write
_ns
_status
(
5103 current_operation
="RUNNING ACTION",
5104 current_operation_id
=nslcmop_id
,
5107 step
= "Getting information from database"
5108 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5109 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5110 if db_nslcmop
["operationParams"].get("primitive_params"):
5111 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
5112 db_nslcmop
["operationParams"]["primitive_params"]
5115 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5116 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
5117 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
5118 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
5119 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
5120 primitive
= db_nslcmop
["operationParams"]["primitive"]
5121 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
5122 timeout_ns_action
= db_nslcmop
["operationParams"].get(
5123 "timeout_ns_action", self
.timeout_primitive
5127 step
= "Getting vnfr from database"
5128 db_vnfr
= self
.db
.get_one(
5129 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5131 if db_vnfr
.get("kdur"):
5133 for kdur
in db_vnfr
["kdur"]:
5134 if kdur
.get("additionalParams"):
5135 kdur
["additionalParams"] = json
.loads(
5136 kdur
["additionalParams"]
5138 kdur_list
.append(kdur
)
5139 db_vnfr
["kdur"] = kdur_list
5140 step
= "Getting vnfd from database"
5141 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5143 # Sync filesystem before running a primitive
5144 self
.fs
.sync(db_vnfr
["vnfd-id"])
5146 step
= "Getting nsd from database"
5147 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5149 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5150 # for backward compatibility
5151 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5152 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5153 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5154 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5156 # look for primitive
5157 config_primitive_desc
= descriptor_configuration
= None
5159 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5161 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5163 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5165 descriptor_configuration
= db_nsd
.get("ns-configuration")
5167 if descriptor_configuration
and descriptor_configuration
.get(
5170 for config_primitive
in descriptor_configuration
["config-primitive"]:
5171 if config_primitive
["name"] == primitive
:
5172 config_primitive_desc
= config_primitive
5175 if not config_primitive_desc
:
5176 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5178 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5182 primitive_name
= primitive
5183 ee_descriptor_id
= None
5185 primitive_name
= config_primitive_desc
.get(
5186 "execution-environment-primitive", primitive
5188 ee_descriptor_id
= config_primitive_desc
.get(
5189 "execution-environment-ref"
5195 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5197 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5200 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5202 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5204 desc_params
= parse_yaml_strings(
5205 db_vnfr
.get("additionalParamsForVnf")
5208 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5209 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5210 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5212 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5213 actions
.add(primitive
["name"])
5214 for primitive
in kdu_configuration
.get("config-primitive", []):
5215 actions
.add(primitive
["name"])
5217 nsr_deployed
["K8s"],
5218 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5219 and kdu
["member-vnf-index"] == vnf_index
,
5223 if primitive_name
in actions
5224 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5228 # TODO check if ns is in a proper status
5230 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5232 # kdur and desc_params already set from before
5233 if primitive_params
:
5234 desc_params
.update(primitive_params
)
5235 # TODO Check if we will need something at vnf level
5236 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5238 kdu_name
== kdu
["kdu-name"]
5239 and kdu
["member-vnf-index"] == vnf_index
5244 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5247 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5248 msg
= "unknown k8scluster-type '{}'".format(
5249 kdu
.get("k8scluster-type")
5251 raise LcmException(msg
)
5254 "collection": "nsrs",
5255 "filter": {"_id": nsr_id
},
5256 "path": "_admin.deployed.K8s.{}".format(index
),
5260 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5262 step
= "Executing kdu {}".format(primitive_name
)
5263 if primitive_name
== "upgrade":
5264 if desc_params
.get("kdu_model"):
5265 kdu_model
= desc_params
.get("kdu_model")
5266 del desc_params
["kdu_model"]
5268 kdu_model
= kdu
.get("kdu-model")
5269 if kdu_model
.count("/") < 2:
5270 parts
= kdu_model
.split(sep
=":")
5272 kdu_model
= parts
[0]
5273 if desc_params
.get("kdu_atomic_upgrade"):
5274 atomic_upgrade
= desc_params
.get(
5275 "kdu_atomic_upgrade"
5276 ).lower() in ("yes", "true", "1")
5277 del desc_params
["kdu_atomic_upgrade"]
5279 atomic_upgrade
= True
5281 detailed_status
= await asyncio
.wait_for(
5282 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5283 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5284 kdu_instance
=kdu
.get("kdu-instance"),
5285 atomic
=atomic_upgrade
,
5286 kdu_model
=kdu_model
,
5289 timeout
=timeout_ns_action
,
5291 timeout
=timeout_ns_action
+ 10,
5294 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5296 elif primitive_name
== "rollback":
5297 detailed_status
= await asyncio
.wait_for(
5298 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5299 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5300 kdu_instance
=kdu
.get("kdu-instance"),
5303 timeout
=timeout_ns_action
,
5305 elif primitive_name
== "status":
5306 detailed_status
= await asyncio
.wait_for(
5307 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5308 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5309 kdu_instance
=kdu
.get("kdu-instance"),
5312 timeout
=timeout_ns_action
,
5315 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5316 kdu
["kdu-name"], nsr_id
5318 params
= self
._map
_primitive
_params
(
5319 config_primitive_desc
, primitive_params
, desc_params
5322 detailed_status
= await asyncio
.wait_for(
5323 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5324 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5325 kdu_instance
=kdu_instance
,
5326 primitive_name
=primitive_name
,
5329 timeout
=timeout_ns_action
,
5332 timeout
=timeout_ns_action
,
5336 nslcmop_operation_state
= "COMPLETED"
5338 detailed_status
= ""
5339 nslcmop_operation_state
= "FAILED"
5341 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5342 nsr_deployed
["VCA"],
5343 member_vnf_index
=vnf_index
,
5345 vdu_count_index
=vdu_count_index
,
5346 ee_descriptor_id
=ee_descriptor_id
,
5348 for vca_index
, vca_deployed
in enumerate(
5349 db_nsr
["_admin"]["deployed"]["VCA"]
5351 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5353 "collection": "nsrs",
5354 "filter": {"_id": nsr_id
},
5355 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5359 nslcmop_operation_state
,
5361 ) = await self
._ns
_execute
_primitive
(
5363 primitive
=primitive_name
,
5364 primitive_params
=self
._map
_primitive
_params
(
5365 config_primitive_desc
, primitive_params
, desc_params
5367 timeout
=timeout_ns_action
,
5373 db_nslcmop_update
["detailed-status"] = detailed_status
5374 error_description_nslcmop
= (
5375 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5379 + " task Done with result {} {}".format(
5380 nslcmop_operation_state
, detailed_status
5383 return # database update is called inside finally
5385 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5386 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5388 except asyncio
.CancelledError
:
5390 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5392 exc
= "Operation was cancelled"
5393 except asyncio
.TimeoutError
:
5394 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5396 except Exception as e
:
5397 exc
= traceback
.format_exc()
5398 self
.logger
.critical(
5399 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5408 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5409 nslcmop_operation_state
= "FAILED"
5411 self
._write
_ns
_status
(
5415 ], # TODO check if degraded. For the moment use previous status
5416 current_operation
="IDLE",
5417 current_operation_id
=None,
5418 # error_description=error_description_nsr,
5419 # error_detail=error_detail,
5420 other_update
=db_nsr_update
,
5423 self
._write
_op
_status
(
5426 error_message
=error_description_nslcmop
,
5427 operation_state
=nslcmop_operation_state
,
5428 other_update
=db_nslcmop_update
,
5431 if nslcmop_operation_state
:
5433 await self
.msg
.aiowrite(
5438 "nslcmop_id": nslcmop_id
,
5439 "operationState": nslcmop_operation_state
,
5443 except Exception as e
:
5445 logging_text
+ "kafka_write notification Exception {}".format(e
)
5447 self
.logger
.debug(logging_text
+ "Exit")
5448 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5449 return nslcmop_operation_state
, detailed_status
5451 async def terminate_vdus(
5452 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5454 """This method terminates VDUs
5457 db_vnfr: VNF instance record
5458 member_vnf_index: VNF index to identify the VDUs to be removed
5459 db_nsr: NS instance record
5460 update_db_nslcmops: Nslcmop update record
5462 vca_scaling_info
= []
5463 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5464 scaling_info
["scaling_direction"] = "IN"
5465 scaling_info
["vdu-delete"] = {}
5466 scaling_info
["kdu-delete"] = {}
5467 db_vdur
= db_vnfr
.get("vdur")
5468 vdur_list
= copy(db_vdur
)
5470 for index
, vdu
in enumerate(vdur_list
):
5471 vca_scaling_info
.append(
5473 "osm_vdu_id": vdu
["vdu-id-ref"],
5474 "member-vnf-index": member_vnf_index
,
5476 "vdu_index": count_index
,
5479 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5480 scaling_info
["vdu"].append(
5482 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5483 "vdu_id": vdu
["vdu-id-ref"],
5487 for interface
in vdu
["interfaces"]:
5488 scaling_info
["vdu"][index
]["interface"].append(
5490 "name": interface
["name"],
5491 "ip_address": interface
["ip-address"],
5492 "mac_address": interface
.get("mac-address"),
5495 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5496 stage
[2] = "Terminating VDUs"
5497 if scaling_info
.get("vdu-delete"):
5498 # scale_process = "RO"
5499 if self
.ro_config
.get("ng"):
5500 await self
._scale
_ng
_ro
(
5509 async def remove_vnf(self
, nsr_id
, nslcmop_id
, vnf_instance_id
):
5510 """This method is to Remove VNF instances from NS.
5513 nsr_id: NS instance id
5514 nslcmop_id: nslcmop id of update
5515 vnf_instance_id: id of the VNF instance to be removed
5518 result: (str, str) COMPLETED/FAILED, details
5522 logging_text
= "Task ns={} update ".format(nsr_id
)
5523 check_vnfr_count
= len(self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}))
5524 self
.logger
.info("check_vnfr_count {}".format(check_vnfr_count
))
5525 if check_vnfr_count
> 1:
5526 stage
= ["", "", ""]
5527 step
= "Getting nslcmop from database"
5529 step
+ " after having waited for previous tasks to be completed"
5531 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5532 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5533 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5534 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5535 """ db_vnfr = self.db.get_one(
5536 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5538 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5539 await self
.terminate_vdus(
5548 constituent_vnfr
= db_nsr
.get("constituent-vnfr-ref")
5549 constituent_vnfr
.remove(db_vnfr
.get("_id"))
5550 db_nsr_update
["constituent-vnfr-ref"] = db_nsr
.get(
5551 "constituent-vnfr-ref"
5553 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5554 self
.db
.del_one("vnfrs", {"_id": db_vnfr
.get("_id")})
5555 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5556 return "COMPLETED", "Done"
5558 step
= "Terminate VNF Failed with"
5560 "{} Cannot terminate the last VNF in this NS.".format(
5564 except (LcmException
, asyncio
.CancelledError
):
5566 except Exception as e
:
5567 self
.logger
.debug("Error removing VNF {}".format(e
))
5568 return "FAILED", "Error removing VNF {}".format(e
)
5570 async def _ns_redeploy_vnf(
5578 """This method updates and redeploys VNF instances
5581 nsr_id: NS instance id
5582 nslcmop_id: nslcmop id
5583 db_vnfd: VNF descriptor
5584 db_vnfr: VNF instance record
5585 db_nsr: NS instance record
5588 result: (str, str) COMPLETED/FAILED, details
5592 stage
= ["", "", ""]
5593 logging_text
= "Task ns={} update ".format(nsr_id
)
5594 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5595 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5597 # Terminate old VNF resources
5598 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5599 await self
.terminate_vdus(
5608 # old_vnfd_id = db_vnfr["vnfd-id"]
5609 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5610 new_db_vnfd
= db_vnfd
5611 # new_vnfd_ref = new_db_vnfd["id"]
5612 # new_vnfd_id = vnfd_id
5616 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5618 "name": cp
.get("id"),
5619 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5620 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5623 new_vnfr_cp
.append(vnf_cp
)
5624 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5625 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5626 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5628 "revision": latest_vnfd_revision
,
5629 "connection-point": new_vnfr_cp
,
5633 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5634 updated_db_vnfr
= self
.db
.get_one(
5636 {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
},
5639 # Instantiate new VNF resources
5640 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5641 vca_scaling_info
= []
5642 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5643 scaling_info
["scaling_direction"] = "OUT"
5644 scaling_info
["vdu-create"] = {}
5645 scaling_info
["kdu-create"] = {}
5646 vdud_instantiate_list
= db_vnfd
["vdu"]
5647 for index
, vdud
in enumerate(vdud_instantiate_list
):
5648 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
5650 additional_params
= (
5651 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5654 cloud_init_list
= []
5656 # TODO Information of its own ip is not available because db_vnfr is not updated.
5657 additional_params
["OSM"] = get_osm_params(
5658 updated_db_vnfr
, vdud
["id"], 1
5660 cloud_init_list
.append(
5661 self
._parse
_cloud
_init
(
5668 vca_scaling_info
.append(
5670 "osm_vdu_id": vdud
["id"],
5671 "member-vnf-index": member_vnf_index
,
5673 "vdu_index": count_index
,
5676 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5677 if self
.ro_config
.get("ng"):
5679 "New Resources to be deployed: {}".format(scaling_info
)
5681 await self
._scale
_ng
_ro
(
5689 return "COMPLETED", "Done"
5690 except (LcmException
, asyncio
.CancelledError
):
5692 except Exception as e
:
5693 self
.logger
.debug("Error updating VNF {}".format(e
))
5694 return "FAILED", "Error updating VNF {}".format(e
)
5696 async def _ns_charm_upgrade(
5702 timeout
: float = None,
5704 """This method upgrade charms in VNF instances
5707 ee_id: Execution environment id
5708 path: Local path to the charm
5710 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5711 timeout: (Float) Timeout for the ns update operation
5714 result: (str, str) COMPLETED/FAILED, details
5717 charm_type
= charm_type
or "lxc_proxy_charm"
5718 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5722 charm_type
=charm_type
,
5723 timeout
=timeout
or self
.timeout_ns_update
,
5727 return "COMPLETED", output
5729 except (LcmException
, asyncio
.CancelledError
):
5732 except Exception as e
:
5733 self
.logger
.debug("Error upgrading charm {}".format(path
))
5735 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5737 async def update(self
, nsr_id
, nslcmop_id
):
5738 """Update NS according to different update types
5740 This method performs upgrade of VNF instances then updates the revision
5741 number in VNF record
5744 nsr_id: Network service will be updated
5745 nslcmop_id: ns lcm operation id
5748 It may raise DbException, LcmException, N2VCException, K8sException
5751 # Try to lock HA task here
5752 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5753 if not task_is_locked_by_me
:
5756 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5757 self
.logger
.debug(logging_text
+ "Enter")
5759 # Set the required variables to be filled up later
5761 db_nslcmop_update
= {}
5763 nslcmop_operation_state
= None
5765 error_description_nslcmop
= ""
5767 change_type
= "updated"
5768 detailed_status
= ""
5771 # wait for any previous tasks in process
5772 step
= "Waiting for previous operations to terminate"
5773 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5774 self
._write
_ns
_status
(
5777 current_operation
="UPDATING",
5778 current_operation_id
=nslcmop_id
,
5781 step
= "Getting nslcmop from database"
5782 db_nslcmop
= self
.db
.get_one(
5783 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5785 update_type
= db_nslcmop
["operationParams"]["updateType"]
5787 step
= "Getting nsr from database"
5788 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5789 old_operational_status
= db_nsr
["operational-status"]
5790 db_nsr_update
["operational-status"] = "updating"
5791 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5792 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5794 if update_type
== "CHANGE_VNFPKG":
5795 # Get the input parameters given through update request
5796 vnf_instance_id
= db_nslcmop
["operationParams"][
5797 "changeVnfPackageData"
5798 ].get("vnfInstanceId")
5800 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5803 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5805 step
= "Getting vnfr from database"
5806 db_vnfr
= self
.db
.get_one(
5807 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5810 step
= "Getting vnfds from database"
5812 latest_vnfd
= self
.db
.get_one(
5813 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5815 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5818 current_vnf_revision
= db_vnfr
.get("revision", 1)
5819 current_vnfd
= self
.db
.get_one(
5821 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5822 fail_on_empty
=False,
5824 # Charm artifact paths will be filled up later
5826 current_charm_artifact_path
,
5827 target_charm_artifact_path
,
5828 charm_artifact_paths
,
5831 step
= "Checking if revision has changed in VNFD"
5832 if current_vnf_revision
!= latest_vnfd_revision
:
5833 change_type
= "policy_updated"
5835 # There is new revision of VNFD, update operation is required
5836 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5837 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5839 step
= "Removing the VNFD packages if they exist in the local path"
5840 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5841 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5843 step
= "Get the VNFD packages from FSMongo"
5844 self
.fs
.sync(from_path
=latest_vnfd_path
)
5845 self
.fs
.sync(from_path
=current_vnfd_path
)
5848 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5850 base_folder
= latest_vnfd
["_admin"]["storage"]
5852 for charm_index
, charm_deployed
in enumerate(
5853 get_iterable(nsr_deployed
, "VCA")
5855 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5857 # Getting charm-id and charm-type
5858 if charm_deployed
.get("member-vnf-index") == vnf_index
:
5859 charm_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5860 charm_type
= charm_deployed
.get("type")
5863 ee_id
= charm_deployed
.get("ee_id")
5865 step
= "Getting descriptor config"
5866 descriptor_config
= get_configuration(
5867 current_vnfd
, current_vnfd
["id"]
5870 if "execution-environment-list" in descriptor_config
:
5871 ee_list
= descriptor_config
.get(
5872 "execution-environment-list", []
5877 # There could be several charm used in the same VNF
5878 for ee_item
in ee_list
:
5879 if ee_item
.get("juju"):
5880 step
= "Getting charm name"
5881 charm_name
= ee_item
["juju"].get("charm")
5883 step
= "Setting Charm artifact paths"
5884 current_charm_artifact_path
.append(
5885 get_charm_artifact_path(
5889 current_vnf_revision
,
5892 target_charm_artifact_path
.append(
5893 get_charm_artifact_path(
5897 latest_vnfd_revision
,
5901 charm_artifact_paths
= zip(
5902 current_charm_artifact_path
, target_charm_artifact_path
5905 step
= "Checking if software version has changed in VNFD"
5906 if find_software_version(current_vnfd
) != find_software_version(
5909 step
= "Checking if existing VNF has charm"
5910 for current_charm_path
, target_charm_path
in list(
5911 charm_artifact_paths
5913 if current_charm_path
:
5915 "Software version change is not supported as VNF instance {} has charm.".format(
5920 # There is no change in the charm package, then redeploy the VNF
5921 # based on new descriptor
5922 step
= "Redeploying VNF"
5923 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5924 (result
, detailed_status
) = await self
._ns
_redeploy
_vnf
(
5925 nsr_id
, nslcmop_id
, latest_vnfd
, db_vnfr
, db_nsr
5927 if result
== "FAILED":
5928 nslcmop_operation_state
= result
5929 error_description_nslcmop
= detailed_status
5930 db_nslcmop_update
["detailed-status"] = detailed_status
5933 + " step {} Done with result {} {}".format(
5934 step
, nslcmop_operation_state
, detailed_status
5939 step
= "Checking if any charm package has changed or not"
5940 for current_charm_path
, target_charm_path
in list(
5941 charm_artifact_paths
5945 and target_charm_path
5946 and self
.check_charm_hash_changed(
5947 current_charm_path
, target_charm_path
5950 step
= "Checking whether VNF uses juju bundle"
5951 if check_juju_bundle_existence(current_vnfd
):
5953 "Charm upgrade is not supported for the instance which"
5954 " uses juju-bundle: {}".format(
5955 check_juju_bundle_existence(current_vnfd
)
5959 step
= "Upgrading Charm"
5963 ) = await self
._ns
_charm
_upgrade
(
5966 charm_type
=charm_type
,
5967 path
=self
.fs
.path
+ target_charm_path
,
5968 timeout
=timeout_seconds
,
5971 if result
== "FAILED":
5972 nslcmop_operation_state
= result
5973 error_description_nslcmop
= detailed_status
5975 db_nslcmop_update
["detailed-status"] = detailed_status
5978 + " step {} Done with result {} {}".format(
5979 step
, nslcmop_operation_state
, detailed_status
5983 step
= "Updating policies"
5984 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5985 result
= "COMPLETED"
5986 detailed_status
= "Done"
5987 db_nslcmop_update
["detailed-status"] = "Done"
5989 # If nslcmop_operation_state is None, so any operation is not failed.
5990 if not nslcmop_operation_state
:
5991 nslcmop_operation_state
= "COMPLETED"
5993 # If update CHANGE_VNFPKG nslcmop_operation is successful
5994 # vnf revision need to be updated
5995 vnfr_update
["revision"] = latest_vnfd_revision
5996 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
6000 + " task Done with result {} {}".format(
6001 nslcmop_operation_state
, detailed_status
6004 elif update_type
== "REMOVE_VNF":
6005 # This part is included in https://osm.etsi.org/gerrit/11876
6006 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
6007 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
6008 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6009 step
= "Removing VNF"
6010 (result
, detailed_status
) = await self
.remove_vnf(
6011 nsr_id
, nslcmop_id
, vnf_instance_id
6013 if result
== "FAILED":
6014 nslcmop_operation_state
= result
6015 error_description_nslcmop
= detailed_status
6016 db_nslcmop_update
["detailed-status"] = detailed_status
6017 change_type
= "vnf_terminated"
6018 if not nslcmop_operation_state
:
6019 nslcmop_operation_state
= "COMPLETED"
6022 + " task Done with result {} {}".format(
6023 nslcmop_operation_state
, detailed_status
6027 elif update_type
== "OPERATE_VNF":
6028 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"][
6031 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"][
6034 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"][
6037 (result
, detailed_status
) = await self
.rebuild_start_stop(
6038 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
6040 if result
== "FAILED":
6041 nslcmop_operation_state
= result
6042 error_description_nslcmop
= detailed_status
6043 db_nslcmop_update
["detailed-status"] = detailed_status
6044 if not nslcmop_operation_state
:
6045 nslcmop_operation_state
= "COMPLETED"
6048 + " task Done with result {} {}".format(
6049 nslcmop_operation_state
, detailed_status
6053 # If nslcmop_operation_state is None, so any operation is not failed.
6054 # All operations are executed in overall.
6055 if not nslcmop_operation_state
:
6056 nslcmop_operation_state
= "COMPLETED"
6057 db_nsr_update
["operational-status"] = old_operational_status
6059 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
6060 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6062 except asyncio
.CancelledError
:
6064 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6066 exc
= "Operation was cancelled"
6067 except asyncio
.TimeoutError
:
6068 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
6070 except Exception as e
:
6071 exc
= traceback
.format_exc()
6072 self
.logger
.critical(
6073 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6082 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6083 nslcmop_operation_state
= "FAILED"
6084 db_nsr_update
["operational-status"] = old_operational_status
6086 self
._write
_ns
_status
(
6088 ns_state
=db_nsr
["nsState"],
6089 current_operation
="IDLE",
6090 current_operation_id
=None,
6091 other_update
=db_nsr_update
,
6094 self
._write
_op
_status
(
6097 error_message
=error_description_nslcmop
,
6098 operation_state
=nslcmop_operation_state
,
6099 other_update
=db_nslcmop_update
,
6102 if nslcmop_operation_state
:
6106 "nslcmop_id": nslcmop_id
,
6107 "operationState": nslcmop_operation_state
,
6109 if change_type
in ("vnf_terminated", "policy_updated"):
6110 msg
.update({"vnf_member_index": member_vnf_index
})
6111 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6112 except Exception as e
:
6114 logging_text
+ "kafka_write notification Exception {}".format(e
)
6116 self
.logger
.debug(logging_text
+ "Exit")
6117 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6118 return nslcmop_operation_state
, detailed_status
6120 async def scale(self
, nsr_id
, nslcmop_id
):
6121 # Try to lock HA task here
6122 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6123 if not task_is_locked_by_me
:
6126 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6127 stage
= ["", "", ""]
6128 tasks_dict_info
= {}
6129 # ^ stage, step, VIM progress
6130 self
.logger
.debug(logging_text
+ "Enter")
6131 # get all needed from database
6133 db_nslcmop_update
= {}
6136 # in case of error, indicates what part of scale was failed to put nsr at error status
6137 scale_process
= None
6138 old_operational_status
= ""
6139 old_config_status
= ""
6142 # wait for any previous tasks in process
6143 step
= "Waiting for previous operations to terminate"
6144 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6145 self
._write
_ns
_status
(
6148 current_operation
="SCALING",
6149 current_operation_id
=nslcmop_id
,
6152 step
= "Getting nslcmop from database"
6154 step
+ " after having waited for previous tasks to be completed"
6156 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6158 step
= "Getting nsr from database"
6159 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6160 old_operational_status
= db_nsr
["operational-status"]
6161 old_config_status
= db_nsr
["config-status"]
6163 step
= "Parsing scaling parameters"
6164 db_nsr_update
["operational-status"] = "scaling"
6165 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6166 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6168 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6170 ]["member-vnf-index"]
6171 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6173 ]["scaling-group-descriptor"]
6174 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6175 # for backward compatibility
6176 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6177 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6178 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6179 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6181 step
= "Getting vnfr from database"
6182 db_vnfr
= self
.db
.get_one(
6183 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6186 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6188 step
= "Getting vnfd from database"
6189 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6191 base_folder
= db_vnfd
["_admin"]["storage"]
6193 step
= "Getting scaling-group-descriptor"
6194 scaling_descriptor
= find_in_list(
6195 get_scaling_aspect(db_vnfd
),
6196 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6198 if not scaling_descriptor
:
6200 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6201 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6204 step
= "Sending scale order to VIM"
6205 # TODO check if ns is in a proper status
6207 if not db_nsr
["_admin"].get("scaling-group"):
6212 "_admin.scaling-group": [
6213 {"name": scaling_group
, "nb-scale-op": 0}
6217 admin_scale_index
= 0
6219 for admin_scale_index
, admin_scale_info
in enumerate(
6220 db_nsr
["_admin"]["scaling-group"]
6222 if admin_scale_info
["name"] == scaling_group
:
6223 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6225 else: # not found, set index one plus last element and add new entry with the name
6226 admin_scale_index
+= 1
6228 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6231 vca_scaling_info
= []
6232 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6233 if scaling_type
== "SCALE_OUT":
6234 if "aspect-delta-details" not in scaling_descriptor
:
6236 "Aspect delta details not fount in scaling descriptor {}".format(
6237 scaling_descriptor
["name"]
6240 # count if max-instance-count is reached
6241 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6243 scaling_info
["scaling_direction"] = "OUT"
6244 scaling_info
["vdu-create"] = {}
6245 scaling_info
["kdu-create"] = {}
6246 for delta
in deltas
:
6247 for vdu_delta
in delta
.get("vdu-delta", {}):
6248 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6249 # vdu_index also provides the number of instance of the targeted vdu
6250 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6251 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6255 additional_params
= (
6256 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6259 cloud_init_list
= []
6261 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6262 max_instance_count
= 10
6263 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6264 max_instance_count
= vdu_profile
.get(
6265 "max-number-of-instances", 10
6268 default_instance_num
= get_number_of_instances(
6271 instances_number
= vdu_delta
.get("number-of-instances", 1)
6272 nb_scale_op
+= instances_number
6274 new_instance_count
= nb_scale_op
+ default_instance_num
6275 # Control if new count is over max and vdu count is less than max.
6276 # Then assign new instance count
6277 if new_instance_count
> max_instance_count
> vdu_count
:
6278 instances_number
= new_instance_count
- max_instance_count
6280 instances_number
= instances_number
6282 if new_instance_count
> max_instance_count
:
6284 "reached the limit of {} (max-instance-count) "
6285 "scaling-out operations for the "
6286 "scaling-group-descriptor '{}'".format(
6287 nb_scale_op
, scaling_group
6290 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6292 # TODO Information of its own ip is not available because db_vnfr is not updated.
6293 additional_params
["OSM"] = get_osm_params(
6294 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6296 cloud_init_list
.append(
6297 self
._parse
_cloud
_init
(
6304 vca_scaling_info
.append(
6306 "osm_vdu_id": vdu_delta
["id"],
6307 "member-vnf-index": vnf_index
,
6309 "vdu_index": vdu_index
+ x
,
6312 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6313 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6314 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6315 kdu_name
= kdu_profile
["kdu-name"]
6316 resource_name
= kdu_profile
.get("resource-name", "")
6318 # Might have different kdus in the same delta
6319 # Should have list for each kdu
6320 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6321 scaling_info
["kdu-create"][kdu_name
] = []
6323 kdur
= get_kdur(db_vnfr
, kdu_name
)
6324 if kdur
.get("helm-chart"):
6325 k8s_cluster_type
= "helm-chart-v3"
6326 self
.logger
.debug("kdur: {}".format(kdur
))
6328 kdur
.get("helm-version")
6329 and kdur
.get("helm-version") == "v2"
6331 k8s_cluster_type
= "helm-chart"
6332 elif kdur
.get("juju-bundle"):
6333 k8s_cluster_type
= "juju-bundle"
6336 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6337 "juju-bundle. Maybe an old NBI version is running".format(
6338 db_vnfr
["member-vnf-index-ref"], kdu_name
6342 max_instance_count
= 10
6343 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6344 max_instance_count
= kdu_profile
.get(
6345 "max-number-of-instances", 10
6348 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6349 deployed_kdu
, _
= get_deployed_kdu(
6350 nsr_deployed
, kdu_name
, vnf_index
6352 if deployed_kdu
is None:
6354 "KDU '{}' for vnf '{}' not deployed".format(
6358 kdu_instance
= deployed_kdu
.get("kdu-instance")
6359 instance_num
= await self
.k8scluster_map
[
6365 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6366 kdu_model
=deployed_kdu
.get("kdu-model"),
6368 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6369 "number-of-instances", 1
6372 # Control if new count is over max and instance_num is less than max.
6373 # Then assign max instance number to kdu replica count
6374 if kdu_replica_count
> max_instance_count
> instance_num
:
6375 kdu_replica_count
= max_instance_count
6376 if kdu_replica_count
> max_instance_count
:
6378 "reached the limit of {} (max-instance-count) "
6379 "scaling-out operations for the "
6380 "scaling-group-descriptor '{}'".format(
6381 instance_num
, scaling_group
6385 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6386 vca_scaling_info
.append(
6388 "osm_kdu_id": kdu_name
,
6389 "member-vnf-index": vnf_index
,
6391 "kdu_index": instance_num
+ x
- 1,
6394 scaling_info
["kdu-create"][kdu_name
].append(
6396 "member-vnf-index": vnf_index
,
6398 "k8s-cluster-type": k8s_cluster_type
,
6399 "resource-name": resource_name
,
6400 "scale": kdu_replica_count
,
6403 elif scaling_type
== "SCALE_IN":
6404 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6406 scaling_info
["scaling_direction"] = "IN"
6407 scaling_info
["vdu-delete"] = {}
6408 scaling_info
["kdu-delete"] = {}
6410 for delta
in deltas
:
6411 for vdu_delta
in delta
.get("vdu-delta", {}):
6412 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6413 min_instance_count
= 0
6414 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6415 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6416 min_instance_count
= vdu_profile
["min-number-of-instances"]
6418 default_instance_num
= get_number_of_instances(
6419 db_vnfd
, vdu_delta
["id"]
6421 instance_num
= vdu_delta
.get("number-of-instances", 1)
6422 nb_scale_op
-= instance_num
6424 new_instance_count
= nb_scale_op
+ default_instance_num
6426 if new_instance_count
< min_instance_count
< vdu_count
:
6427 instances_number
= min_instance_count
- new_instance_count
6429 instances_number
= instance_num
6431 if new_instance_count
< min_instance_count
:
6433 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6434 "scaling-group-descriptor '{}'".format(
6435 nb_scale_op
, scaling_group
6438 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6439 vca_scaling_info
.append(
6441 "osm_vdu_id": vdu_delta
["id"],
6442 "member-vnf-index": vnf_index
,
6444 "vdu_index": vdu_index
- 1 - x
,
6447 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6448 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6449 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6450 kdu_name
= kdu_profile
["kdu-name"]
6451 resource_name
= kdu_profile
.get("resource-name", "")
6453 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6454 scaling_info
["kdu-delete"][kdu_name
] = []
6456 kdur
= get_kdur(db_vnfr
, kdu_name
)
6457 if kdur
.get("helm-chart"):
6458 k8s_cluster_type
= "helm-chart-v3"
6459 self
.logger
.debug("kdur: {}".format(kdur
))
6461 kdur
.get("helm-version")
6462 and kdur
.get("helm-version") == "v2"
6464 k8s_cluster_type
= "helm-chart"
6465 elif kdur
.get("juju-bundle"):
6466 k8s_cluster_type
= "juju-bundle"
6469 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6470 "juju-bundle. Maybe an old NBI version is running".format(
6471 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6475 min_instance_count
= 0
6476 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6477 min_instance_count
= kdu_profile
["min-number-of-instances"]
6479 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6480 deployed_kdu
, _
= get_deployed_kdu(
6481 nsr_deployed
, kdu_name
, vnf_index
6483 if deployed_kdu
is None:
6485 "KDU '{}' for vnf '{}' not deployed".format(
6489 kdu_instance
= deployed_kdu
.get("kdu-instance")
6490 instance_num
= await self
.k8scluster_map
[
6496 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6497 kdu_model
=deployed_kdu
.get("kdu-model"),
6499 kdu_replica_count
= instance_num
- kdu_delta
.get(
6500 "number-of-instances", 1
6503 if kdu_replica_count
< min_instance_count
< instance_num
:
6504 kdu_replica_count
= min_instance_count
6505 if kdu_replica_count
< min_instance_count
:
6507 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6508 "scaling-group-descriptor '{}'".format(
6509 instance_num
, scaling_group
6513 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6514 vca_scaling_info
.append(
6516 "osm_kdu_id": kdu_name
,
6517 "member-vnf-index": vnf_index
,
6519 "kdu_index": instance_num
- x
- 1,
6522 scaling_info
["kdu-delete"][kdu_name
].append(
6524 "member-vnf-index": vnf_index
,
6526 "k8s-cluster-type": k8s_cluster_type
,
6527 "resource-name": resource_name
,
6528 "scale": kdu_replica_count
,
6532 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6533 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6534 if scaling_info
["scaling_direction"] == "IN":
6535 for vdur
in reversed(db_vnfr
["vdur"]):
6536 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6537 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6538 scaling_info
["vdu"].append(
6540 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6541 "vdu_id": vdur
["vdu-id-ref"],
6545 for interface
in vdur
["interfaces"]:
6546 scaling_info
["vdu"][-1]["interface"].append(
6548 "name": interface
["name"],
6549 "ip_address": interface
["ip-address"],
6550 "mac_address": interface
.get("mac-address"),
6553 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6556 step
= "Executing pre-scale vnf-config-primitive"
6557 if scaling_descriptor
.get("scaling-config-action"):
6558 for scaling_config_action
in scaling_descriptor
[
6559 "scaling-config-action"
6562 scaling_config_action
.get("trigger") == "pre-scale-in"
6563 and scaling_type
== "SCALE_IN"
6565 scaling_config_action
.get("trigger") == "pre-scale-out"
6566 and scaling_type
== "SCALE_OUT"
6568 vnf_config_primitive
= scaling_config_action
[
6569 "vnf-config-primitive-name-ref"
6571 step
= db_nslcmop_update
[
6573 ] = "executing pre-scale scaling-config-action '{}'".format(
6574 vnf_config_primitive
6577 # look for primitive
6578 for config_primitive
in (
6579 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6580 ).get("config-primitive", ()):
6581 if config_primitive
["name"] == vnf_config_primitive
:
6585 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6586 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6587 "primitive".format(scaling_group
, vnf_config_primitive
)
6590 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6591 if db_vnfr
.get("additionalParamsForVnf"):
6592 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6594 scale_process
= "VCA"
6595 db_nsr_update
["config-status"] = "configuring pre-scaling"
6596 primitive_params
= self
._map
_primitive
_params
(
6597 config_primitive
, {}, vnfr_params
6600 # Pre-scale retry check: Check if this sub-operation has been executed before
6601 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6604 vnf_config_primitive
,
6608 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6609 # Skip sub-operation
6610 result
= "COMPLETED"
6611 result_detail
= "Done"
6614 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6615 vnf_config_primitive
, result
, result_detail
6619 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6620 # New sub-operation: Get index of this sub-operation
6622 len(db_nslcmop
.get("_admin", {}).get("operations"))
6627 + "vnf_config_primitive={} New sub-operation".format(
6628 vnf_config_primitive
6632 # retry: Get registered params for this existing sub-operation
6633 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6636 vnf_index
= op
.get("member_vnf_index")
6637 vnf_config_primitive
= op
.get("primitive")
6638 primitive_params
= op
.get("primitive_params")
6641 + "vnf_config_primitive={} Sub-operation retry".format(
6642 vnf_config_primitive
6645 # Execute the primitive, either with new (first-time) or registered (reintent) args
6646 ee_descriptor_id
= config_primitive
.get(
6647 "execution-environment-ref"
6649 primitive_name
= config_primitive
.get(
6650 "execution-environment-primitive", vnf_config_primitive
6652 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6653 nsr_deployed
["VCA"],
6654 member_vnf_index
=vnf_index
,
6656 vdu_count_index
=None,
6657 ee_descriptor_id
=ee_descriptor_id
,
6659 result
, result_detail
= await self
._ns
_execute
_primitive
(
6668 + "vnf_config_primitive={} Done with result {} {}".format(
6669 vnf_config_primitive
, result
, result_detail
6672 # Update operationState = COMPLETED | FAILED
6673 self
._update
_suboperation
_status
(
6674 db_nslcmop
, op_index
, result
, result_detail
6677 if result
== "FAILED":
6678 raise LcmException(result_detail
)
6679 db_nsr_update
["config-status"] = old_config_status
6680 scale_process
= None
6684 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6687 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6690 # SCALE-IN VCA - BEGIN
6691 if vca_scaling_info
:
6692 step
= db_nslcmop_update
[
6694 ] = "Deleting the execution environments"
6695 scale_process
= "VCA"
6696 for vca_info
in vca_scaling_info
:
6697 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6698 member_vnf_index
= str(vca_info
["member-vnf-index"])
6700 logging_text
+ "vdu info: {}".format(vca_info
)
6702 if vca_info
.get("osm_vdu_id"):
6703 vdu_id
= vca_info
["osm_vdu_id"]
6704 vdu_index
= int(vca_info
["vdu_index"])
6707 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6708 member_vnf_index
, vdu_id
, vdu_index
6710 stage
[2] = step
= "Scaling in VCA"
6711 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6712 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6713 config_update
= db_nsr
["configurationStatus"]
6714 for vca_index
, vca
in enumerate(vca_update
):
6716 (vca
or vca
.get("ee_id"))
6717 and vca
["member-vnf-index"] == member_vnf_index
6718 and vca
["vdu_count_index"] == vdu_index
6720 if vca
.get("vdu_id"):
6721 config_descriptor
= get_configuration(
6722 db_vnfd
, vca
.get("vdu_id")
6724 elif vca
.get("kdu_name"):
6725 config_descriptor
= get_configuration(
6726 db_vnfd
, vca
.get("kdu_name")
6729 config_descriptor
= get_configuration(
6730 db_vnfd
, db_vnfd
["id"]
6732 operation_params
= (
6733 db_nslcmop
.get("operationParams") or {}
6735 exec_terminate_primitives
= not operation_params
.get(
6736 "skip_terminate_primitives"
6737 ) and vca
.get("needed_terminate")
6738 task
= asyncio
.ensure_future(
6747 exec_primitives
=exec_terminate_primitives
,
6751 timeout
=self
.timeout_charm_delete
,
6754 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6757 del vca_update
[vca_index
]
6758 del config_update
[vca_index
]
6759 # wait for pending tasks of terminate primitives
6763 + "Waiting for tasks {}".format(
6764 list(tasks_dict_info
.keys())
6767 error_list
= await self
._wait
_for
_tasks
(
6771 self
.timeout_charm_delete
, self
.timeout_ns_terminate
6776 tasks_dict_info
.clear()
6778 raise LcmException("; ".join(error_list
))
6780 db_vca_and_config_update
= {
6781 "_admin.deployed.VCA": vca_update
,
6782 "configurationStatus": config_update
,
6785 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6787 scale_process
= None
6788 # SCALE-IN VCA - END
6791 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6792 scale_process
= "RO"
6793 if self
.ro_config
.get("ng"):
6794 await self
._scale
_ng
_ro
(
6795 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6797 scaling_info
.pop("vdu-create", None)
6798 scaling_info
.pop("vdu-delete", None)
6800 scale_process
= None
6804 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6805 scale_process
= "KDU"
6806 await self
._scale
_kdu
(
6807 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6809 scaling_info
.pop("kdu-create", None)
6810 scaling_info
.pop("kdu-delete", None)
6812 scale_process
= None
6816 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6818 # SCALE-UP VCA - BEGIN
6819 if vca_scaling_info
:
6820 step
= db_nslcmop_update
[
6822 ] = "Creating new execution environments"
6823 scale_process
= "VCA"
6824 for vca_info
in vca_scaling_info
:
6825 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6826 member_vnf_index
= str(vca_info
["member-vnf-index"])
6828 logging_text
+ "vdu info: {}".format(vca_info
)
6830 vnfd_id
= db_vnfr
["vnfd-ref"]
6831 if vca_info
.get("osm_vdu_id"):
6832 vdu_index
= int(vca_info
["vdu_index"])
6833 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6834 if db_vnfr
.get("additionalParamsForVnf"):
6835 deploy_params
.update(
6837 db_vnfr
["additionalParamsForVnf"].copy()
6840 descriptor_config
= get_configuration(
6841 db_vnfd
, db_vnfd
["id"]
6843 if descriptor_config
:
6848 logging_text
=logging_text
6849 + "member_vnf_index={} ".format(member_vnf_index
),
6852 nslcmop_id
=nslcmop_id
,
6858 member_vnf_index
=member_vnf_index
,
6859 vdu_index
=vdu_index
,
6861 deploy_params
=deploy_params
,
6862 descriptor_config
=descriptor_config
,
6863 base_folder
=base_folder
,
6864 task_instantiation_info
=tasks_dict_info
,
6867 vdu_id
= vca_info
["osm_vdu_id"]
6868 vdur
= find_in_list(
6869 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6871 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6872 if vdur
.get("additionalParams"):
6873 deploy_params_vdu
= parse_yaml_strings(
6874 vdur
["additionalParams"]
6877 deploy_params_vdu
= deploy_params
6878 deploy_params_vdu
["OSM"] = get_osm_params(
6879 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6881 if descriptor_config
:
6886 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6887 member_vnf_index
, vdu_id
, vdu_index
6889 stage
[2] = step
= "Scaling out VCA"
6890 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6892 logging_text
=logging_text
6893 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6894 member_vnf_index
, vdu_id
, vdu_index
6898 nslcmop_id
=nslcmop_id
,
6904 member_vnf_index
=member_vnf_index
,
6905 vdu_index
=vdu_index
,
6907 deploy_params
=deploy_params_vdu
,
6908 descriptor_config
=descriptor_config
,
6909 base_folder
=base_folder
,
6910 task_instantiation_info
=tasks_dict_info
,
6913 # SCALE-UP VCA - END
6914 scale_process
= None
6917 # execute primitive service POST-SCALING
6918 step
= "Executing post-scale vnf-config-primitive"
6919 if scaling_descriptor
.get("scaling-config-action"):
6920 for scaling_config_action
in scaling_descriptor
[
6921 "scaling-config-action"
6924 scaling_config_action
.get("trigger") == "post-scale-in"
6925 and scaling_type
== "SCALE_IN"
6927 scaling_config_action
.get("trigger") == "post-scale-out"
6928 and scaling_type
== "SCALE_OUT"
6930 vnf_config_primitive
= scaling_config_action
[
6931 "vnf-config-primitive-name-ref"
6933 step
= db_nslcmop_update
[
6935 ] = "executing post-scale scaling-config-action '{}'".format(
6936 vnf_config_primitive
6939 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6940 if db_vnfr
.get("additionalParamsForVnf"):
6941 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6943 # look for primitive
6944 for config_primitive
in (
6945 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6946 ).get("config-primitive", ()):
6947 if config_primitive
["name"] == vnf_config_primitive
:
6951 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6952 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6953 "config-primitive".format(
6954 scaling_group
, vnf_config_primitive
6957 scale_process
= "VCA"
6958 db_nsr_update
["config-status"] = "configuring post-scaling"
6959 primitive_params
= self
._map
_primitive
_params
(
6960 config_primitive
, {}, vnfr_params
6963 # Post-scale retry check: Check if this sub-operation has been executed before
6964 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6967 vnf_config_primitive
,
6971 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6972 # Skip sub-operation
6973 result
= "COMPLETED"
6974 result_detail
= "Done"
6977 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6978 vnf_config_primitive
, result
, result_detail
6982 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6983 # New sub-operation: Get index of this sub-operation
6985 len(db_nslcmop
.get("_admin", {}).get("operations"))
6990 + "vnf_config_primitive={} New sub-operation".format(
6991 vnf_config_primitive
6995 # retry: Get registered params for this existing sub-operation
6996 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6999 vnf_index
= op
.get("member_vnf_index")
7000 vnf_config_primitive
= op
.get("primitive")
7001 primitive_params
= op
.get("primitive_params")
7004 + "vnf_config_primitive={} Sub-operation retry".format(
7005 vnf_config_primitive
7008 # Execute the primitive, either with new (first-time) or registered (reintent) args
7009 ee_descriptor_id
= config_primitive
.get(
7010 "execution-environment-ref"
7012 primitive_name
= config_primitive
.get(
7013 "execution-environment-primitive", vnf_config_primitive
7015 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
7016 nsr_deployed
["VCA"],
7017 member_vnf_index
=vnf_index
,
7019 vdu_count_index
=None,
7020 ee_descriptor_id
=ee_descriptor_id
,
7022 result
, result_detail
= await self
._ns
_execute
_primitive
(
7031 + "vnf_config_primitive={} Done with result {} {}".format(
7032 vnf_config_primitive
, result
, result_detail
7035 # Update operationState = COMPLETED | FAILED
7036 self
._update
_suboperation
_status
(
7037 db_nslcmop
, op_index
, result
, result_detail
7040 if result
== "FAILED":
7041 raise LcmException(result_detail
)
7042 db_nsr_update
["config-status"] = old_config_status
7043 scale_process
= None
7048 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
7049 db_nsr_update
["operational-status"] = (
7051 if old_operational_status
== "failed"
7052 else old_operational_status
7054 db_nsr_update
["config-status"] = old_config_status
7057 ROclient
.ROClientException
,
7062 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7064 except asyncio
.CancelledError
:
7066 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7068 exc
= "Operation was cancelled"
7069 except Exception as e
:
7070 exc
= traceback
.format_exc()
7071 self
.logger
.critical(
7072 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7076 self
._write
_ns
_status
(
7079 current_operation
="IDLE",
7080 current_operation_id
=None,
7083 stage
[1] = "Waiting for instantiate pending tasks."
7084 self
.logger
.debug(logging_text
+ stage
[1])
7085 exc
= await self
._wait
_for
_tasks
(
7088 self
.timeout_ns_deploy
,
7096 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7097 nslcmop_operation_state
= "FAILED"
7099 db_nsr_update
["operational-status"] = old_operational_status
7100 db_nsr_update
["config-status"] = old_config_status
7101 db_nsr_update
["detailed-status"] = ""
7103 if "VCA" in scale_process
:
7104 db_nsr_update
["config-status"] = "failed"
7105 if "RO" in scale_process
:
7106 db_nsr_update
["operational-status"] = "failed"
7109 ] = "FAILED scaling nslcmop={} {}: {}".format(
7110 nslcmop_id
, step
, exc
7113 error_description_nslcmop
= None
7114 nslcmop_operation_state
= "COMPLETED"
7115 db_nslcmop_update
["detailed-status"] = "Done"
7117 self
._write
_op
_status
(
7120 error_message
=error_description_nslcmop
,
7121 operation_state
=nslcmop_operation_state
,
7122 other_update
=db_nslcmop_update
,
7125 self
._write
_ns
_status
(
7128 current_operation
="IDLE",
7129 current_operation_id
=None,
7130 other_update
=db_nsr_update
,
7133 if nslcmop_operation_state
:
7137 "nslcmop_id": nslcmop_id
,
7138 "operationState": nslcmop_operation_state
,
7140 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7141 except Exception as e
:
7143 logging_text
+ "kafka_write notification Exception {}".format(e
)
7145 self
.logger
.debug(logging_text
+ "Exit")
7146 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7148 async def _scale_kdu(
7149 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7151 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7152 for kdu_name
in _scaling_info
:
7153 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7154 deployed_kdu
, index
= get_deployed_kdu(
7155 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7157 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7158 kdu_instance
= deployed_kdu
["kdu-instance"]
7159 kdu_model
= deployed_kdu
.get("kdu-model")
7160 scale
= int(kdu_scaling_info
["scale"])
7161 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7164 "collection": "nsrs",
7165 "filter": {"_id": nsr_id
},
7166 "path": "_admin.deployed.K8s.{}".format(index
),
7169 step
= "scaling application {}".format(
7170 kdu_scaling_info
["resource-name"]
7172 self
.logger
.debug(logging_text
+ step
)
7174 if kdu_scaling_info
["type"] == "delete":
7175 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7178 and kdu_config
.get("terminate-config-primitive")
7179 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7181 terminate_config_primitive_list
= kdu_config
.get(
7182 "terminate-config-primitive"
7184 terminate_config_primitive_list
.sort(
7185 key
=lambda val
: int(val
["seq"])
7189 terminate_config_primitive
7190 ) in terminate_config_primitive_list
:
7191 primitive_params_
= self
._map
_primitive
_params
(
7192 terminate_config_primitive
, {}, {}
7194 step
= "execute terminate config primitive"
7195 self
.logger
.debug(logging_text
+ step
)
7196 await asyncio
.wait_for(
7197 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7198 cluster_uuid
=cluster_uuid
,
7199 kdu_instance
=kdu_instance
,
7200 primitive_name
=terminate_config_primitive
["name"],
7201 params
=primitive_params_
,
7208 await asyncio
.wait_for(
7209 self
.k8scluster_map
[k8s_cluster_type
].scale(
7212 kdu_scaling_info
["resource-name"],
7214 cluster_uuid
=cluster_uuid
,
7215 kdu_model
=kdu_model
,
7219 timeout
=self
.timeout_vca_on_error
,
7222 if kdu_scaling_info
["type"] == "create":
7223 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7226 and kdu_config
.get("initial-config-primitive")
7227 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7229 initial_config_primitive_list
= kdu_config
.get(
7230 "initial-config-primitive"
7232 initial_config_primitive_list
.sort(
7233 key
=lambda val
: int(val
["seq"])
7236 for initial_config_primitive
in initial_config_primitive_list
:
7237 primitive_params_
= self
._map
_primitive
_params
(
7238 initial_config_primitive
, {}, {}
7240 step
= "execute initial config primitive"
7241 self
.logger
.debug(logging_text
+ step
)
7242 await asyncio
.wait_for(
7243 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7244 cluster_uuid
=cluster_uuid
,
7245 kdu_instance
=kdu_instance
,
7246 primitive_name
=initial_config_primitive
["name"],
7247 params
=primitive_params_
,
7254 async def _scale_ng_ro(
7255 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7257 nsr_id
= db_nslcmop
["nsInstanceId"]
7258 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7261 # read from db: vnfd's for every vnf
7264 # for each vnf in ns, read vnfd
7265 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7266 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7267 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7268 # if we haven't this vnfd, read it from db
7269 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7271 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7272 db_vnfds
.append(vnfd
)
7273 n2vc_key
= self
.n2vc
.get_public_key()
7274 n2vc_key_list
= [n2vc_key
]
7277 vdu_scaling_info
.get("vdu-create"),
7278 vdu_scaling_info
.get("vdu-delete"),
7281 # db_vnfr has been updated, update db_vnfrs to use it
7282 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7283 await self
._instantiate
_ng
_ro
(
7293 start_deploy
=time(),
7294 timeout_ns_deploy
=self
.timeout_ns_deploy
,
7296 if vdu_scaling_info
.get("vdu-delete"):
7298 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7301 async def extract_prometheus_scrape_jobs(
7302 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
7304 # look if exist a file called 'prometheus*.j2' and
7305 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7309 for f
in artifact_content
7310 if f
.startswith("prometheus") and f
.endswith(".j2")
7316 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7320 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7321 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7323 vnfr_id
= vnfr_id
.replace("-", "")
7325 "JOB_NAME": vnfr_id
,
7326 "TARGET_IP": target_ip
,
7327 "EXPORTER_POD_IP": host_name
,
7328 "EXPORTER_POD_PORT": host_port
,
7330 job_list
= parse_job(job_data
, variables
)
7331 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7332 for job
in job_list
:
7334 not isinstance(job
.get("job_name"), str)
7335 or vnfr_id
not in job
["job_name"]
7337 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
7338 job
["nsr_id"] = nsr_id
7339 job
["vnfr_id"] = vnfr_id
7342 async def rebuild_start_stop(
7343 self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
7345 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7346 self
.logger
.info(logging_text
+ "Enter")
7347 stage
= ["Preparing the environment", ""]
7348 # database nsrs record
7352 # in case of error, indicates what part of scale was failed to put nsr at error status
7353 start_deploy
= time()
7355 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7356 vim_account_id
= db_vnfr
.get("vim-account-id")
7357 vim_info_key
= "vim:" + vim_account_id
7358 vdu_id
= additional_param
["vdu_id"]
7359 vdurs
= [item
for item
in db_vnfr
["vdur"] if item
["vdu-id-ref"] == vdu_id
]
7360 vdur
= find_in_list(
7361 vdurs
, lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7364 vdu_vim_name
= vdur
["name"]
7365 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7366 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7368 raise LcmException("Target vdu is not found")
7369 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7370 # wait for any previous tasks in process
7371 stage
[1] = "Waiting for previous operations to terminate"
7372 self
.logger
.info(stage
[1])
7373 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7375 stage
[1] = "Reading from database."
7376 self
.logger
.info(stage
[1])
7377 self
._write
_ns
_status
(
7380 current_operation
=operation_type
.upper(),
7381 current_operation_id
=nslcmop_id
,
7383 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7386 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7387 db_nsr_update
["operational-status"] = operation_type
7388 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7392 "vim_vm_id": vim_vm_id
,
7394 "vdu_index": additional_param
["count-index"],
7395 "vdu_id": vdur
["id"],
7396 "target_vim": target_vim
,
7397 "vim_account_id": vim_account_id
,
7400 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7401 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7402 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7403 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7404 self
.logger
.info("response from RO: {}".format(result_dict
))
7405 action_id
= result_dict
["action_id"]
7406 await self
._wait
_ng
_ro
(
7411 self
.timeout_operate
,
7413 "start_stop_rebuild",
7415 return "COMPLETED", "Done"
7416 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7417 self
.logger
.error("Exit Exception {}".format(e
))
7419 except asyncio
.CancelledError
:
7420 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7421 exc
= "Operation was cancelled"
7422 except Exception as e
:
7423 exc
= traceback
.format_exc()
7424 self
.logger
.critical(
7425 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7427 return "FAILED", "Error in operate VNF {}".format(exc
)
7429 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7431 Get VCA Cloud and VCA Cloud Credentials for the VIM account
7433 :param: vim_account_id: VIM Account ID
7435 :return: (cloud_name, cloud_credential)
7437 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7438 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
7440 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7442 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
7444 :param: vim_account_id: VIM Account ID
7446 :return: (cloud_name, cloud_credential)
7448 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7449 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")
7451 async def migrate(self
, nsr_id
, nslcmop_id
):
7453 Migrate VNFs and VDUs instances in a NS
7455 :param: nsr_id: NS Instance ID
7456 :param: nslcmop_id: nslcmop ID of migrate
7459 # Try to lock HA task here
7460 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7461 if not task_is_locked_by_me
:
7463 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7464 self
.logger
.debug(logging_text
+ "Enter")
7465 # get all needed from database
7467 db_nslcmop_update
= {}
7468 nslcmop_operation_state
= None
7472 # in case of error, indicates what part of scale was failed to put nsr at error status
7473 start_deploy
= time()
7476 # wait for any previous tasks in process
7477 step
= "Waiting for previous operations to terminate"
7478 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7480 self
._write
_ns
_status
(
7483 current_operation
="MIGRATING",
7484 current_operation_id
=nslcmop_id
,
7486 step
= "Getting nslcmop from database"
7488 step
+ " after having waited for previous tasks to be completed"
7490 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7491 migrate_params
= db_nslcmop
.get("operationParams")
7494 target
.update(migrate_params
)
7495 desc
= await self
.RO
.migrate(nsr_id
, target
)
7496 self
.logger
.debug("RO return > {}".format(desc
))
7497 action_id
= desc
["action_id"]
7498 await self
._wait
_ng
_ro
(
7503 self
.timeout_migrate
,
7504 operation
="migrate",
7506 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7507 self
.logger
.error("Exit Exception {}".format(e
))
7509 except asyncio
.CancelledError
:
7510 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7511 exc
= "Operation was cancelled"
7512 except Exception as e
:
7513 exc
= traceback
.format_exc()
7514 self
.logger
.critical(
7515 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7518 self
._write
_ns
_status
(
7521 current_operation
="IDLE",
7522 current_operation_id
=None,
7525 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7526 nslcmop_operation_state
= "FAILED"
7528 nslcmop_operation_state
= "COMPLETED"
7529 db_nslcmop_update
["detailed-status"] = "Done"
7530 db_nsr_update
["detailed-status"] = "Done"
7532 self
._write
_op
_status
(
7536 operation_state
=nslcmop_operation_state
,
7537 other_update
=db_nslcmop_update
,
7539 if nslcmop_operation_state
:
7543 "nslcmop_id": nslcmop_id
,
7544 "operationState": nslcmop_operation_state
,
7546 await self
.msg
.aiowrite("ns", "migrated", msg
, loop
=self
.loop
)
7547 except Exception as e
:
7549 logging_text
+ "kafka_write notification Exception {}".format(e
)
7551 self
.logger
.debug(logging_text
+ "Exit")
7552 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
7554 async def heal(self
, nsr_id
, nslcmop_id
):
7558 :param nsr_id: ns instance to heal
7559 :param nslcmop_id: operation to run
7563 # Try to lock HA task here
7564 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7565 if not task_is_locked_by_me
:
7568 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7569 stage
= ["", "", ""]
7570 tasks_dict_info
= {}
7571 # ^ stage, step, VIM progress
7572 self
.logger
.debug(logging_text
+ "Enter")
7573 # get all needed from database
7575 db_nslcmop_update
= {}
7577 db_vnfrs
= {} # vnf's info indexed by _id
7579 old_operational_status
= ""
7580 old_config_status
= ""
7583 # wait for any previous tasks in process
7584 step
= "Waiting for previous operations to terminate"
7585 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7586 self
._write
_ns
_status
(
7589 current_operation
="HEALING",
7590 current_operation_id
=nslcmop_id
,
7593 step
= "Getting nslcmop from database"
7595 step
+ " after having waited for previous tasks to be completed"
7597 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7599 step
= "Getting nsr from database"
7600 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7601 old_operational_status
= db_nsr
["operational-status"]
7602 old_config_status
= db_nsr
["config-status"]
7605 "_admin.deployed.RO.operational-status": "healing",
7607 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7609 step
= "Sending heal order to VIM"
7610 # task_ro = asyncio.ensure_future(
7612 # logging_text=logging_text,
7614 # db_nslcmop=db_nslcmop,
7618 # self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "heal_RO", task_ro)
7619 # tasks_dict_info[task_ro] = "Healing at VIM"
7621 logging_text
=logging_text
,
7623 db_nslcmop
=db_nslcmop
,
7628 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7629 self
.logger
.debug(logging_text
+ stage
[1])
7630 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7631 self
.fs
.sync(db_nsr
["nsd-id"])
7633 # read from db: vnfr's of this ns
7634 step
= "Getting vnfrs from db"
7635 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7636 for vnfr
in db_vnfrs_list
:
7637 db_vnfrs
[vnfr
["_id"]] = vnfr
7638 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7640 # Check for each target VNF
7641 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7642 for target_vnf
in target_list
:
7643 # Find this VNF in the list from DB
7644 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7646 db_vnfr
= db_vnfrs
[vnfr_id
]
7647 vnfd_id
= db_vnfr
.get("vnfd-id")
7648 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7649 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7650 base_folder
= vnfd
["_admin"]["storage"]
7655 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7656 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7658 # Check each target VDU and deploy N2VC
7659 target_vdu_list
= target_vnf
.get("additionalParams", {}).get(
7662 if not target_vdu_list
:
7663 # Codigo nuevo para crear diccionario
7664 target_vdu_list
= []
7665 for existing_vdu
in db_vnfr
.get("vdur"):
7666 vdu_name
= existing_vdu
.get("vdu-name", None)
7667 vdu_index
= existing_vdu
.get("count-index", 0)
7668 vdu_run_day1
= target_vnf
.get("additionalParams", {}).get(
7671 vdu_to_be_healed
= {
7673 "count-index": vdu_index
,
7674 "run-day1": vdu_run_day1
,
7676 target_vdu_list
.append(vdu_to_be_healed
)
7677 for target_vdu
in target_vdu_list
:
7678 deploy_params_vdu
= target_vdu
7679 # Set run-day1 vnf level value if not vdu level value exists
7680 if not deploy_params_vdu
.get("run-day1") and target_vnf
[
7683 deploy_params_vdu
["run-day1"] = target_vnf
[
7686 vdu_name
= target_vdu
.get("vdu-id", None)
7687 # TODO: Get vdu_id from vdud.
7689 # For multi instance VDU count-index is mandatory
7690 # For single session VDU count-indes is 0
7691 vdu_index
= target_vdu
.get("count-index", 0)
7693 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7694 stage
[1] = "Deploying Execution Environments."
7695 self
.logger
.debug(logging_text
+ stage
[1])
7697 # VNF Level charm. Normal case when proxy charms.
7698 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7699 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7700 if descriptor_config
:
7701 # Continue if healed machine is management machine
7702 vnf_ip_address
= db_vnfr
.get("ip-address")
7703 target_instance
= None
7704 for instance
in db_vnfr
.get("vdur", None):
7706 instance
["vdu-name"] == vdu_name
7707 and instance
["count-index"] == vdu_index
7709 target_instance
= instance
7711 if vnf_ip_address
== target_instance
.get("ip-address"):
7713 logging_text
=logging_text
7714 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7715 member_vnf_index
, vdu_name
, vdu_index
7719 nslcmop_id
=nslcmop_id
,
7725 member_vnf_index
=member_vnf_index
,
7728 deploy_params
=deploy_params_vdu
,
7729 descriptor_config
=descriptor_config
,
7730 base_folder
=base_folder
,
7731 task_instantiation_info
=tasks_dict_info
,
7735 # VDU Level charm. Normal case with native charms.
7736 descriptor_config
= get_configuration(vnfd
, vdu_name
)
7737 if descriptor_config
:
7739 logging_text
=logging_text
7740 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7741 member_vnf_index
, vdu_name
, vdu_index
7745 nslcmop_id
=nslcmop_id
,
7751 member_vnf_index
=member_vnf_index
,
7752 vdu_index
=vdu_index
,
7754 deploy_params
=deploy_params_vdu
,
7755 descriptor_config
=descriptor_config
,
7756 base_folder
=base_folder
,
7757 task_instantiation_info
=tasks_dict_info
,
7762 ROclient
.ROClientException
,
7767 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7769 except asyncio
.CancelledError
:
7771 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7773 exc
= "Operation was cancelled"
7774 except Exception as e
:
7775 exc
= traceback
.format_exc()
7776 self
.logger
.critical(
7777 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7782 stage
[1] = "Waiting for healing pending tasks."
7783 self
.logger
.debug(logging_text
+ stage
[1])
7784 exc
= await self
._wait
_for
_tasks
(
7787 self
.timeout_ns_deploy
,
7795 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7796 nslcmop_operation_state
= "FAILED"
7798 db_nsr_update
["operational-status"] = old_operational_status
7799 db_nsr_update
["config-status"] = old_config_status
7802 ] = "FAILED healing nslcmop={} {}: {}".format(nslcmop_id
, step
, exc
)
7803 for task
, task_name
in tasks_dict_info
.items():
7804 if not task
.done() or task
.cancelled() or task
.exception():
7805 if task_name
.startswith(self
.task_name_deploy_vca
):
7806 # A N2VC task is pending
7807 db_nsr_update
["config-status"] = "failed"
7809 # RO task is pending
7810 db_nsr_update
["operational-status"] = "failed"
7812 error_description_nslcmop
= None
7813 nslcmop_operation_state
= "COMPLETED"
7814 db_nslcmop_update
["detailed-status"] = "Done"
7815 db_nsr_update
["detailed-status"] = "Done"
7816 db_nsr_update
["operational-status"] = "running"
7817 db_nsr_update
["config-status"] = "configured"
7819 self
._write
_op
_status
(
7822 error_message
=error_description_nslcmop
,
7823 operation_state
=nslcmop_operation_state
,
7824 other_update
=db_nslcmop_update
,
7827 self
._write
_ns
_status
(
7830 current_operation
="IDLE",
7831 current_operation_id
=None,
7832 other_update
=db_nsr_update
,
7835 if nslcmop_operation_state
:
7839 "nslcmop_id": nslcmop_id
,
7840 "operationState": nslcmop_operation_state
,
7842 await self
.msg
.aiowrite("ns", "healed", msg
, loop
=self
.loop
)
7843 except Exception as e
:
7845 logging_text
+ "kafka_write notification Exception {}".format(e
)
7847 self
.logger
.debug(logging_text
+ "Exit")
7848 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
7859 :param logging_text: preffix text to use at logging
7860 :param nsr_id: nsr identity
7861 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
7862 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
7863 :return: None or exception
7866 def get_vim_account(vim_account_id
):
7868 if vim_account_id
in db_vims
:
7869 return db_vims
[vim_account_id
]
7870 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
7871 db_vims
[vim_account_id
] = db_vim
7876 ns_params
= db_nslcmop
.get("operationParams")
7877 if ns_params
and ns_params
.get("timeout_ns_heal"):
7878 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
7880 timeout_ns_heal
= self
.timeout
.get("ns_heal", self
.timeout_ns_heal
)
7884 nslcmop_id
= db_nslcmop
["_id"]
7886 "action_id": nslcmop_id
,
7888 self
.logger
.warning(
7889 "db_nslcmop={} and timeout_ns_heal={}".format(
7890 db_nslcmop
, timeout_ns_heal
7893 target
.update(db_nslcmop
.get("operationParams", {}))
7895 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
7896 desc
= await self
.RO
.recreate(nsr_id
, target
)
7897 self
.logger
.debug("RO return > {}".format(desc
))
7898 action_id
= desc
["action_id"]
7899 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
7900 await self
._wait
_ng
_ro
(
7907 operation
="healing",
7912 "_admin.deployed.RO.operational-status": "running",
7913 "detailed-status": " ".join(stage
),
7915 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7916 self
._write
_op
_status
(nslcmop_id
, stage
)
7918 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
7921 except Exception as e
:
7922 stage
[2] = "ERROR healing at VIM"
7923 # self.set_vnfr_at_error(db_vnfrs, str(e))
7925 "Error healing at VIM {}".format(e
),
7926 exc_info
=not isinstance(
7929 ROclient
.ROClientException
,
7955 task_instantiation_info
,
7958 # launch instantiate_N2VC in a asyncio task and register task object
7959 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
7960 # if not found, create one entry and update database
7961 # fill db_nsr._admin.deployed.VCA.<index>
7964 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
7966 if "execution-environment-list" in descriptor_config
:
7967 ee_list
= descriptor_config
.get("execution-environment-list", [])
7968 elif "juju" in descriptor_config
:
7969 ee_list
= [descriptor_config
] # ns charms
7970 else: # other types as script are not supported
7973 for ee_item
in ee_list
:
7976 + "_deploy_n2vc ee_item juju={}, helm={}".format(
7977 ee_item
.get("juju"), ee_item
.get("helm-chart")
7980 ee_descriptor_id
= ee_item
.get("id")
7981 if ee_item
.get("juju"):
7982 vca_name
= ee_item
["juju"].get("charm")
7985 if ee_item
["juju"].get("charm") is not None
7988 if ee_item
["juju"].get("cloud") == "k8s":
7989 vca_type
= "k8s_proxy_charm"
7990 elif ee_item
["juju"].get("proxy") is False:
7991 vca_type
= "native_charm"
7992 elif ee_item
.get("helm-chart"):
7993 vca_name
= ee_item
["helm-chart"]
7994 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
7997 vca_type
= "helm-v3"
8000 logging_text
+ "skipping non juju neither charm configuration"
8005 for vca_index
, vca_deployed
in enumerate(
8006 db_nsr
["_admin"]["deployed"]["VCA"]
8008 if not vca_deployed
:
8011 vca_deployed
.get("member-vnf-index") == member_vnf_index
8012 and vca_deployed
.get("vdu_id") == vdu_id
8013 and vca_deployed
.get("kdu_name") == kdu_name
8014 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
8015 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
8019 # not found, create one.
8021 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
8024 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
8026 target
+= "/kdu/{}".format(kdu_name
)
8028 "target_element": target
,
8029 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
8030 "member-vnf-index": member_vnf_index
,
8032 "kdu_name": kdu_name
,
8033 "vdu_count_index": vdu_index
,
8034 "operational-status": "init", # TODO revise
8035 "detailed-status": "", # TODO revise
8036 "step": "initial-deploy", # TODO revise
8038 "vdu_name": vdu_name
,
8040 "ee_descriptor_id": ee_descriptor_id
,
8044 # create VCA and configurationStatus in db
8046 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
8047 "configurationStatus.{}".format(vca_index
): dict(),
8049 self
.update_db_2("nsrs", nsr_id
, db_dict
)
8051 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
8053 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
8054 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
8055 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
8058 task_n2vc
= asyncio
.ensure_future(
8060 logging_text
=logging_text
,
8061 vca_index
=vca_index
,
8067 vdu_index
=vdu_index
,
8068 deploy_params
=deploy_params
,
8069 config_descriptor
=descriptor_config
,
8070 base_folder
=base_folder
,
8071 nslcmop_id
=nslcmop_id
,
8075 ee_config_descriptor
=ee_item
,
8078 self
.lcm_tasks
.register(
8082 "instantiate_N2VC-{}".format(vca_index
),
8085 task_instantiation_info
[
8087 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
8088 member_vnf_index
or "", vdu_id
or ""
8091 async def heal_N2VC(
8108 ee_config_descriptor
,
8110 nsr_id
= db_nsr
["_id"]
8111 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
8112 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
8113 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
8114 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
8116 "collection": "nsrs",
8117 "filter": {"_id": nsr_id
},
8118 "path": db_update_entry
,
8123 element_under_configuration
= nsr_id
8127 vnfr_id
= db_vnfr
["_id"]
8128 osm_config
["osm"]["vnf_id"] = vnfr_id
8130 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8132 if vca_type
== "native_charm":
8135 index_number
= vdu_index
or 0
8138 element_type
= "VNF"
8139 element_under_configuration
= vnfr_id
8140 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8142 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8143 element_type
= "VDU"
8144 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8145 osm_config
["osm"]["vdu_id"] = vdu_id
8147 namespace
+= ".{}".format(kdu_name
)
8148 element_type
= "KDU"
8149 element_under_configuration
= kdu_name
8150 osm_config
["osm"]["kdu_name"] = kdu_name
8153 if base_folder
["pkg-dir"]:
8154 artifact_path
= "{}/{}/{}/{}".format(
8155 base_folder
["folder"],
8156 base_folder
["pkg-dir"],
8159 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8164 artifact_path
= "{}/Scripts/{}/{}/".format(
8165 base_folder
["folder"],
8168 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8173 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8175 # get initial_config_primitive_list that applies to this element
8176 initial_config_primitive_list
= config_descriptor
.get(
8177 "initial-config-primitive"
8181 "Initial config primitive list > {}".format(
8182 initial_config_primitive_list
8186 # add config if not present for NS charm
8187 ee_descriptor_id
= ee_config_descriptor
.get("id")
8188 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8189 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8190 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8194 "Initial config primitive list #2 > {}".format(
8195 initial_config_primitive_list
8198 # n2vc_redesign STEP 3.1
8199 # find old ee_id if exists
8200 ee_id
= vca_deployed
.get("ee_id")
8202 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8203 # create or register execution environment in VCA. Only for native charms when healing
8204 if vca_type
== "native_charm":
8205 step
= "Waiting to VM being up and getting IP address"
8206 self
.logger
.debug(logging_text
+ step
)
8207 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8216 credentials
= {"hostname": rw_mgmt_ip
}
8218 username
= deep_get(
8219 config_descriptor
, ("config-access", "ssh-access", "default-user")
8221 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8222 # merged. Meanwhile let's get username from initial-config-primitive
8223 if not username
and initial_config_primitive_list
:
8224 for config_primitive
in initial_config_primitive_list
:
8225 for param
in config_primitive
.get("parameter", ()):
8226 if param
["name"] == "ssh-username":
8227 username
= param
["value"]
8231 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8232 "'config-access.ssh-access.default-user'"
8234 credentials
["username"] = username
8236 # n2vc_redesign STEP 3.2
8237 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8238 self
._write
_configuration
_status
(
8240 vca_index
=vca_index
,
8241 status
="REGISTERING",
8242 element_under_configuration
=element_under_configuration
,
8243 element_type
=element_type
,
8246 step
= "register execution environment {}".format(credentials
)
8247 self
.logger
.debug(logging_text
+ step
)
8248 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8249 credentials
=credentials
,
8250 namespace
=namespace
,
8255 # update ee_id en db
8257 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8259 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8261 # for compatibility with MON/POL modules, the need model and application name at database
8262 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8263 # Not sure if this need to be done when healing
8265 ee_id_parts = ee_id.split(".")
8266 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8267 if len(ee_id_parts) >= 2:
8268 model_name = ee_id_parts[0]
8269 application_name = ee_id_parts[1]
8270 db_nsr_update[db_update_entry + "model"] = model_name
8271 db_nsr_update[db_update_entry + "application"] = application_name
8274 # n2vc_redesign STEP 3.3
8275 # Install configuration software. Only for native charms.
8276 step
= "Install configuration Software"
8278 self
._write
_configuration
_status
(
8280 vca_index
=vca_index
,
8281 status
="INSTALLING SW",
8282 element_under_configuration
=element_under_configuration
,
8283 element_type
=element_type
,
8284 # other_update=db_nsr_update,
8288 # TODO check if already done
8289 self
.logger
.debug(logging_text
+ step
)
8291 if vca_type
== "native_charm":
8292 config_primitive
= next(
8293 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8296 if config_primitive
:
8297 config
= self
._map
_primitive
_params
(
8298 config_primitive
, {}, deploy_params
8300 await self
.vca_map
[vca_type
].install_configuration_sw(
8302 artifact_path
=artifact_path
,
8310 # write in db flag of configuration_sw already installed
8312 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8315 # Not sure if this need to be done when healing
8317 # add relations for this VCA (wait for other peers related with this VCA)
8318 await self._add_vca_relations(
8319 logging_text=logging_text,
8322 vca_index=vca_index,
8326 # if SSH access is required, then get execution environment SSH public
8327 # if native charm we have waited already to VM be UP
8328 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8331 # self.logger.debug("get ssh key block")
8333 config_descriptor
, ("config-access", "ssh-access", "required")
8335 # self.logger.debug("ssh key needed")
8336 # Needed to inject a ssh key
8339 ("config-access", "ssh-access", "default-user"),
8341 step
= "Install configuration Software, getting public ssh key"
8342 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8343 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8346 step
= "Insert public key into VM user={} ssh_key={}".format(
8350 # self.logger.debug("no need to get ssh key")
8351 step
= "Waiting to VM being up and getting IP address"
8352 self
.logger
.debug(logging_text
+ step
)
8354 # n2vc_redesign STEP 5.1
8355 # wait for RO (ip-address) Insert pub_key into VM
8356 # IMPORTANT: We need do wait for RO to complete healing operation.
8357 await self
._wait
_heal
_ro
(nsr_id
, self
.timeout_ns_heal
)
8360 rw_mgmt_ip
= await self
.wait_kdu_up(
8361 logging_text
, nsr_id
, vnfr_id
, kdu_name
8364 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8374 rw_mgmt_ip
= None # This is for a NS configuration
8376 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8378 # store rw_mgmt_ip in deploy params for later replacement
8379 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8382 # get run-day1 operation parameter
8383 runDay1
= deploy_params
.get("run-day1", False)
8385 " Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
, vdu_id
, runDay1
)
8388 # n2vc_redesign STEP 6 Execute initial config primitive
8389 step
= "execute initial config primitive"
8391 # wait for dependent primitives execution (NS -> VNF -> VDU)
8392 if initial_config_primitive_list
:
8393 await self
._wait
_dependent
_n
2vc
(
8394 nsr_id
, vca_deployed_list
, vca_index
8397 # stage, in function of element type: vdu, kdu, vnf or ns
8398 my_vca
= vca_deployed_list
[vca_index
]
8399 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8401 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8402 elif my_vca
.get("member-vnf-index"):
8404 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8407 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8409 self
._write
_configuration
_status
(
8410 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8413 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8415 check_if_terminated_needed
= True
8416 for initial_config_primitive
in initial_config_primitive_list
:
8417 # adding information on the vca_deployed if it is a NS execution environment
8418 if not vca_deployed
["member-vnf-index"]:
8419 deploy_params
["ns_config_info"] = json
.dumps(
8420 self
._get
_ns
_config
_info
(nsr_id
)
8422 # TODO check if already done
8423 primitive_params_
= self
._map
_primitive
_params
(
8424 initial_config_primitive
, {}, deploy_params
8427 step
= "execute primitive '{}' params '{}'".format(
8428 initial_config_primitive
["name"], primitive_params_
8430 self
.logger
.debug(logging_text
+ step
)
8431 await self
.vca_map
[vca_type
].exec_primitive(
8433 primitive_name
=initial_config_primitive
["name"],
8434 params_dict
=primitive_params_
,
8439 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8440 if check_if_terminated_needed
:
8441 if config_descriptor
.get("terminate-config-primitive"):
8445 {db_update_entry
+ "needed_terminate": True},
8447 check_if_terminated_needed
= False
8449 # TODO register in database that primitive is done
8451 # STEP 7 Configure metrics
8452 # Not sure if this need to be done when healing
8454 if vca_type == "helm" or vca_type == "helm-v3":
8455 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8457 artifact_path=artifact_path,
8458 ee_config_descriptor=ee_config_descriptor,
8461 target_ip=rw_mgmt_ip,
8467 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8470 for job in prometheus_jobs:
8473 {"job_name": job["job_name"]},
8476 fail_on_empty=False,
8480 step
= "instantiated at VCA"
8481 self
.logger
.debug(logging_text
+ step
)
8483 self
._write
_configuration
_status
(
8484 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8487 except Exception as e
: # TODO not use Exception but N2VC exception
8488 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8490 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8493 "Exception while {} : {}".format(step
, e
), exc_info
=True
8495 self
._write
_configuration
_status
(
8496 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8498 raise LcmException("{} {}".format(step
, e
)) from e
8500 async def _wait_heal_ro(
8506 while time() <= start_time
+ timeout
:
8507 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
8508 operational_status_ro
= db_nsr
["_admin"]["deployed"]["RO"][
8509 "operational-status"
8511 self
.logger
.debug("Wait Heal RO > {}".format(operational_status_ro
))
8512 if operational_status_ro
!= "healing":
8514 await asyncio
.sleep(15, loop
=self
.loop
)
8515 else: # timeout_ns_deploy
8516 raise NgRoException("Timeout waiting ns to deploy")
8518 async def vertical_scale(self
, nsr_id
, nslcmop_id
):
8520 Vertical Scale the VDUs in a NS
8522 :param: nsr_id: NS Instance ID
8523 :param: nslcmop_id: nslcmop ID of migrate
8526 # Try to lock HA task here
8527 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
8528 if not task_is_locked_by_me
:
8530 logging_text
= "Task ns={} vertical scale ".format(nsr_id
)
8531 self
.logger
.debug(logging_text
+ "Enter")
8532 # get all needed from database
8534 db_nslcmop_update
= {}
8535 nslcmop_operation_state
= None
8539 # in case of error, indicates what part of scale was failed to put nsr at error status
8540 start_deploy
= time()
8543 # wait for any previous tasks in process
8544 step
= "Waiting for previous operations to terminate"
8545 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
8547 self
._write
_ns
_status
(
8550 current_operation
="VerticalScale",
8551 current_operation_id
=nslcmop_id
,
8553 step
= "Getting nslcmop from database"
8555 step
+ " after having waited for previous tasks to be completed"
8557 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
8558 operationParams
= db_nslcmop
.get("operationParams")
8560 target
.update(operationParams
)
8561 desc
= await self
.RO
.vertical_scale(nsr_id
, target
)
8562 self
.logger
.debug("RO return > {}".format(desc
))
8563 action_id
= desc
["action_id"]
8564 await self
._wait
_ng
_ro
(
8569 self
.timeout_verticalscale
,
8570 operation
="verticalscale",
8572 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
8573 self
.logger
.error("Exit Exception {}".format(e
))
8575 except asyncio
.CancelledError
:
8576 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
8577 exc
= "Operation was cancelled"
8578 except Exception as e
:
8579 exc
= traceback
.format_exc()
8580 self
.logger
.critical(
8581 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
8584 self
._write
_ns
_status
(
8587 current_operation
="IDLE",
8588 current_operation_id
=None,
8591 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
8592 nslcmop_operation_state
= "FAILED"
8594 nslcmop_operation_state
= "COMPLETED"
8595 db_nslcmop_update
["detailed-status"] = "Done"
8596 db_nsr_update
["detailed-status"] = "Done"
8598 self
._write
_op
_status
(
8602 operation_state
=nslcmop_operation_state
,
8603 other_update
=db_nslcmop_update
,
8605 if nslcmop_operation_state
:
8609 "nslcmop_id": nslcmop_id
,
8610 "operationState": nslcmop_operation_state
,
8612 await self
.msg
.aiowrite("ns", "verticalscaled", msg
, loop
=self
.loop
)
8613 except Exception as e
:
8615 logging_text
+ "kafka_write notification Exception {}".format(e
)
8617 self
.logger
.debug(logging_text
+ "Exit")
8618 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_verticalscale")