1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
35 from osm_lcm
import ROclient
36 from osm_lcm
.data_utils
.nsr
import (
39 get_deployed_vca_list
,
42 from osm_lcm
.data_utils
.vca
import (
51 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
52 from osm_lcm
.lcm_utils
import (
59 check_juju_bundle_existence
,
60 get_charm_artifact_path
,
62 from osm_lcm
.data_utils
.nsd
import (
63 get_ns_configuration_relation_list
,
67 from osm_lcm
.data_utils
.vnfd
import (
73 get_ee_sorted_initial_config_primitive_list
,
74 get_ee_sorted_terminate_config_primitive_list
,
76 get_virtual_link_profiles
,
81 get_number_of_instances
,
83 get_kdu_resource_profile
,
84 find_software_version
,
86 from osm_lcm
.data_utils
.list_utils
import find_in_list
87 from osm_lcm
.data_utils
.vnfr
import (
91 get_volumes_from_instantiation_params
,
93 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
94 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
95 from n2vc
.definitions
import RelationEndpoint
96 from n2vc
.k8s_helm_conn
import K8sHelmConnector
97 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
98 from n2vc
.k8s_juju_conn
import K8sJujuConnector
100 from osm_common
.dbbase
import DbException
101 from osm_common
.fsbase
import FsException
103 from osm_lcm
.data_utils
.database
.database
import Database
104 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
106 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
107 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
109 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
110 from osm_lcm
.osm_config
import OsmConfigBuilder
111 from osm_lcm
.prometheus
import parse_job
113 from copy
import copy
, deepcopy
114 from time
import time
115 from uuid
import uuid4
117 from random
import randint
119 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
122 class NsLcm(LcmBase
):
123 timeout_vca_on_error
= (
125 ) # Time for charm from first time at blocked,error status to mark as failed
126 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
127 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
128 timeout_ns_heal
= 1800 # default global timeout for un deployment a ns
129 timeout_charm_delete
= 10 * 60
130 timeout_primitive
= 30 * 60 # timeout for primitive execution
131 timeout_ns_update
= 30 * 60 # timeout for ns update
132 timeout_progress_primitive
= (
134 ) # timeout for some progress in a primitive execution
135 timeout_migrate
= 1800 # default global timeout for migrating vnfs
136 timeout_operate
= 1800 # default global timeout for migrating vnfs
137 timeout_verticalscale
= 1800 # default global timeout for Vertical Sclaing
138 SUBOPERATION_STATUS_NOT_FOUND
= -1
139 SUBOPERATION_STATUS_NEW
= -2
140 SUBOPERATION_STATUS_SKIP
= -3
141 task_name_deploy_vca
= "Deploying VCA"
143 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
145 Init, Connect to database, filesystem storage, and messaging
146 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
149 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
151 self
.db
= Database().instance
.db
152 self
.fs
= Filesystem().instance
.fs
154 self
.lcm_tasks
= lcm_tasks
155 self
.timeout
= config
["timeout"]
156 self
.ro_config
= config
["ro_config"]
157 self
.ng_ro
= config
["ro_config"].get("ng")
158 self
.vca_config
= config
["VCA"].copy()
160 # create N2VC connector
161 self
.n2vc
= N2VCJujuConnector(
164 on_update_db
=self
._on
_update
_n
2vc
_db
,
169 self
.conn_helm_ee
= LCMHelmConn(
172 vca_config
=self
.vca_config
,
173 on_update_db
=self
._on
_update
_n
2vc
_db
,
176 self
.k8sclusterhelm2
= K8sHelmConnector(
177 kubectl_command
=self
.vca_config
.get("kubectlpath"),
178 helm_command
=self
.vca_config
.get("helmpath"),
185 self
.k8sclusterhelm3
= K8sHelm3Connector(
186 kubectl_command
=self
.vca_config
.get("kubectlpath"),
187 helm_command
=self
.vca_config
.get("helm3path"),
194 self
.k8sclusterjuju
= K8sJujuConnector(
195 kubectl_command
=self
.vca_config
.get("kubectlpath"),
196 juju_command
=self
.vca_config
.get("jujupath"),
199 on_update_db
=self
._on
_update
_k
8s
_db
,
204 self
.k8scluster_map
= {
205 "helm-chart": self
.k8sclusterhelm2
,
206 "helm-chart-v3": self
.k8sclusterhelm3
,
207 "chart": self
.k8sclusterhelm3
,
208 "juju-bundle": self
.k8sclusterjuju
,
209 "juju": self
.k8sclusterjuju
,
213 "lxc_proxy_charm": self
.n2vc
,
214 "native_charm": self
.n2vc
,
215 "k8s_proxy_charm": self
.n2vc
,
216 "helm": self
.conn_helm_ee
,
217 "helm-v3": self
.conn_helm_ee
,
221 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
223 self
.op_status_map
= {
224 "instantiation": self
.RO
.status
,
225 "termination": self
.RO
.status
,
226 "migrate": self
.RO
.status
,
227 "healing": self
.RO
.recreate_status
,
228 "verticalscale": self
.RO
.status
,
232 def increment_ip_mac(ip_mac
, vm_index
=1):
233 if not isinstance(ip_mac
, str):
236 # try with ipv4 look for last dot
237 i
= ip_mac
.rfind(".")
240 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
241 # try with ipv6 or mac look for last colon. Operate in hex
242 i
= ip_mac
.rfind(":")
245 # format in hex, len can be 2 for mac or 4 for ipv6
246 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
247 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
253 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
255 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
258 # TODO filter RO descriptor fields...
262 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
263 db_dict
["deploymentStatus"] = ro_descriptor
264 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
266 except Exception as e
:
268 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
271 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
273 # remove last dot from path (if exists)
274 if path
.endswith("."):
277 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
278 # .format(table, filter, path, updated_data))
281 nsr_id
= filter.get("_id")
283 # read ns record from database
284 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
285 current_ns_status
= nsr
.get("nsState")
287 # get vca status for NS
288 status_dict
= await self
.n2vc
.get_status(
289 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
294 db_dict
["vcaStatus"] = status_dict
295 await self
.n2vc
.update_vca_status(db_dict
["vcaStatus"], vca_id
=vca_id
)
297 # update configurationStatus for this VCA
299 vca_index
= int(path
[path
.rfind(".") + 1 :])
302 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
304 vca_status
= vca_list
[vca_index
].get("status")
306 configuration_status_list
= nsr
.get("configurationStatus")
307 config_status
= configuration_status_list
[vca_index
].get("status")
309 if config_status
== "BROKEN" and vca_status
!= "failed":
310 db_dict
["configurationStatus"][vca_index
] = "READY"
311 elif config_status
!= "BROKEN" and vca_status
== "failed":
312 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
313 except Exception as e
:
314 # not update configurationStatus
315 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
317 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
318 # if nsState = 'DEGRADED' check if all is OK
320 if current_ns_status
in ("READY", "DEGRADED"):
321 error_description
= ""
323 if status_dict
.get("machines"):
324 for machine_id
in status_dict
.get("machines"):
325 machine
= status_dict
.get("machines").get(machine_id
)
326 # check machine agent-status
327 if machine
.get("agent-status"):
328 s
= machine
.get("agent-status").get("status")
331 error_description
+= (
332 "machine {} agent-status={} ; ".format(
336 # check machine instance status
337 if machine
.get("instance-status"):
338 s
= machine
.get("instance-status").get("status")
341 error_description
+= (
342 "machine {} instance-status={} ; ".format(
347 if status_dict
.get("applications"):
348 for app_id
in status_dict
.get("applications"):
349 app
= status_dict
.get("applications").get(app_id
)
350 # check application status
351 if app
.get("status"):
352 s
= app
.get("status").get("status")
355 error_description
+= (
356 "application {} status={} ; ".format(app_id
, s
)
359 if error_description
:
360 db_dict
["errorDescription"] = error_description
361 if current_ns_status
== "READY" and is_degraded
:
362 db_dict
["nsState"] = "DEGRADED"
363 if current_ns_status
== "DEGRADED" and not is_degraded
:
364 db_dict
["nsState"] = "READY"
367 self
.update_db_2("nsrs", nsr_id
, db_dict
)
369 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
371 except Exception as e
:
372 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
374 async def _on_update_k8s_db(
375 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
378 Updating vca status in NSR record
379 :param cluster_uuid: UUID of a k8s cluster
380 :param kdu_instance: The unique name of the KDU instance
381 :param filter: To get nsr_id
382 :cluster_type: The cluster type (juju, k8s)
386 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
387 # .format(cluster_uuid, kdu_instance, filter))
389 nsr_id
= filter.get("_id")
391 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
392 cluster_uuid
=cluster_uuid
,
393 kdu_instance
=kdu_instance
,
395 complete_status
=True,
401 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
403 if cluster_type
in ("juju-bundle", "juju"):
404 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
405 # status in a similar way between Juju Bundles and Helm Charts on this side
406 await self
.k8sclusterjuju
.update_vca_status(
407 db_dict
["vcaStatus"],
413 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
417 self
.update_db_2("nsrs", nsr_id
, db_dict
)
418 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
420 except Exception as e
:
421 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
424 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
426 env
= Environment(undefined
=StrictUndefined
, autoescape
=True)
427 template
= env
.from_string(cloud_init_text
)
428 return template
.render(additional_params
or {})
429 except UndefinedError
as e
:
431 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
432 "file, must be provided in the instantiation parameters inside the "
433 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
435 except (TemplateError
, TemplateNotFound
) as e
:
437 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
442 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
443 cloud_init_content
= cloud_init_file
= None
445 if vdu
.get("cloud-init-file"):
446 base_folder
= vnfd
["_admin"]["storage"]
447 if base_folder
["pkg-dir"]:
448 cloud_init_file
= "{}/{}/cloud_init/{}".format(
449 base_folder
["folder"],
450 base_folder
["pkg-dir"],
451 vdu
["cloud-init-file"],
454 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
455 base_folder
["folder"],
456 vdu
["cloud-init-file"],
458 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
459 cloud_init_content
= ci_file
.read()
460 elif vdu
.get("cloud-init"):
461 cloud_init_content
= vdu
["cloud-init"]
463 return cloud_init_content
464 except FsException
as e
:
466 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
467 vnfd
["id"], vdu
["id"], cloud_init_file
, e
471 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
473 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
475 additional_params
= vdur
.get("additionalParams")
476 return parse_yaml_strings(additional_params
)
478 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
480 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
481 :param vnfd: input vnfd
482 :param new_id: overrides vnf id if provided
483 :param additionalParams: Instantiation params for VNFs provided
484 :param nsrId: Id of the NSR
485 :return: copy of vnfd
487 vnfd_RO
= deepcopy(vnfd
)
488 # remove unused by RO configuration, monitoring, scaling and internal keys
489 vnfd_RO
.pop("_id", None)
490 vnfd_RO
.pop("_admin", None)
491 vnfd_RO
.pop("monitoring-param", None)
492 vnfd_RO
.pop("scaling-group-descriptor", None)
493 vnfd_RO
.pop("kdu", None)
494 vnfd_RO
.pop("k8s-cluster", None)
496 vnfd_RO
["id"] = new_id
498 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
499 for vdu
in get_iterable(vnfd_RO
, "vdu"):
500 vdu
.pop("cloud-init-file", None)
501 vdu
.pop("cloud-init", None)
505 def ip_profile_2_RO(ip_profile
):
506 RO_ip_profile
= deepcopy(ip_profile
)
507 if "dns-server" in RO_ip_profile
:
508 if isinstance(RO_ip_profile
["dns-server"], list):
509 RO_ip_profile
["dns-address"] = []
510 for ds
in RO_ip_profile
.pop("dns-server"):
511 RO_ip_profile
["dns-address"].append(ds
["address"])
513 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
514 if RO_ip_profile
.get("ip-version") == "ipv4":
515 RO_ip_profile
["ip-version"] = "IPv4"
516 if RO_ip_profile
.get("ip-version") == "ipv6":
517 RO_ip_profile
["ip-version"] = "IPv6"
518 if "dhcp-params" in RO_ip_profile
:
519 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
522 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
523 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
524 if db_vim
["_admin"]["operationalState"] != "ENABLED":
526 "VIM={} is not available. operationalState={}".format(
527 vim_account
, db_vim
["_admin"]["operationalState"]
530 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
533 def get_ro_wim_id_for_wim_account(self
, wim_account
):
534 if isinstance(wim_account
, str):
535 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
536 if db_wim
["_admin"]["operationalState"] != "ENABLED":
538 "WIM={} is not available. operationalState={}".format(
539 wim_account
, db_wim
["_admin"]["operationalState"]
542 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
547 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
549 db_vdu_push_list
= []
551 db_update
= {"_admin.modified": time()}
553 for vdu_id
, vdu_count
in vdu_create
.items():
557 for vdur
in reversed(db_vnfr
["vdur"])
558 if vdur
["vdu-id-ref"] == vdu_id
563 # Read the template saved in the db:
565 "No vdur in the database. Using the vdur-template to scale"
567 vdur_template
= db_vnfr
.get("vdur-template")
568 if not vdur_template
:
570 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
574 vdur
= vdur_template
[0]
575 # Delete a template from the database after using it
578 {"_id": db_vnfr
["_id"]},
580 pull
={"vdur-template": {"_id": vdur
["_id"]}},
582 for count
in range(vdu_count
):
583 vdur_copy
= deepcopy(vdur
)
584 vdur_copy
["status"] = "BUILD"
585 vdur_copy
["status-detailed"] = None
586 vdur_copy
["ip-address"] = None
587 vdur_copy
["_id"] = str(uuid4())
588 vdur_copy
["count-index"] += count
+ 1
589 vdur_copy
["id"] = "{}-{}".format(
590 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
592 vdur_copy
.pop("vim_info", None)
593 for iface
in vdur_copy
["interfaces"]:
594 if iface
.get("fixed-ip"):
595 iface
["ip-address"] = self
.increment_ip_mac(
596 iface
["ip-address"], count
+ 1
599 iface
.pop("ip-address", None)
600 if iface
.get("fixed-mac"):
601 iface
["mac-address"] = self
.increment_ip_mac(
602 iface
["mac-address"], count
+ 1
605 iface
.pop("mac-address", None)
609 ) # only first vdu can be managment of vnf
610 db_vdu_push_list
.append(vdur_copy
)
611 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
613 if len(db_vnfr
["vdur"]) == 1:
614 # The scale will move to 0 instances
616 "Scaling to 0 !, creating the template with the last vdur"
618 template_vdur
= [db_vnfr
["vdur"][0]]
619 for vdu_id
, vdu_count
in vdu_delete
.items():
621 indexes_to_delete
= [
623 for iv
in enumerate(db_vnfr
["vdur"])
624 if iv
[1]["vdu-id-ref"] == vdu_id
628 "vdur.{}.status".format(i
): "DELETING"
629 for i
in indexes_to_delete
[-vdu_count
:]
633 # it must be deleted one by one because common.db does not allow otherwise
636 for v
in reversed(db_vnfr
["vdur"])
637 if v
["vdu-id-ref"] == vdu_id
639 for vdu
in vdus_to_delete
[:vdu_count
]:
642 {"_id": db_vnfr
["_id"]},
644 pull
={"vdur": {"_id": vdu
["_id"]}},
648 db_push
["vdur"] = db_vdu_push_list
650 db_push
["vdur-template"] = template_vdur
653 db_vnfr
["vdur-template"] = template_vdur
654 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
655 # modify passed dictionary db_vnfr
656 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
657 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
659 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
661 Updates database nsr with the RO info for the created vld
662 :param ns_update_nsr: dictionary to be filled with the updated info
663 :param db_nsr: content of db_nsr. This is also modified
664 :param nsr_desc_RO: nsr descriptor from RO
665 :return: Nothing, LcmException is raised on errors
668 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
669 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
670 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
672 vld
["vim-id"] = net_RO
.get("vim_net_id")
673 vld
["name"] = net_RO
.get("vim_name")
674 vld
["status"] = net_RO
.get("status")
675 vld
["status-detailed"] = net_RO
.get("error_msg")
676 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
680 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
683 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
685 for db_vnfr
in db_vnfrs
.values():
686 vnfr_update
= {"status": "ERROR"}
687 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
688 if "status" not in vdur
:
689 vdur
["status"] = "ERROR"
690 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
692 vdur
["status-detailed"] = str(error_text
)
694 "vdur.{}.status-detailed".format(vdu_index
)
696 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
697 except DbException
as e
:
698 self
.logger
.error("Cannot update vnf. {}".format(e
))
700 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
702 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
703 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
704 :param nsr_desc_RO: nsr descriptor from RO
705 :return: Nothing, LcmException is raised on errors
707 for vnf_index
, db_vnfr
in db_vnfrs
.items():
708 for vnf_RO
in nsr_desc_RO
["vnfs"]:
709 if vnf_RO
["member_vnf_index"] != vnf_index
:
712 if vnf_RO
.get("ip_address"):
713 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
716 elif not db_vnfr
.get("ip-address"):
717 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
718 raise LcmExceptionNoMgmtIP(
719 "ns member_vnf_index '{}' has no IP address".format(
724 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
725 vdur_RO_count_index
= 0
726 if vdur
.get("pdu-type"):
728 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
729 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
731 if vdur
["count-index"] != vdur_RO_count_index
:
732 vdur_RO_count_index
+= 1
734 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
735 if vdur_RO
.get("ip_address"):
736 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
738 vdur
["ip-address"] = None
739 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
740 vdur
["name"] = vdur_RO
.get("vim_name")
741 vdur
["status"] = vdur_RO
.get("status")
742 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
743 for ifacer
in get_iterable(vdur
, "interfaces"):
744 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
745 if ifacer
["name"] == interface_RO
.get("internal_name"):
746 ifacer
["ip-address"] = interface_RO
.get(
749 ifacer
["mac-address"] = interface_RO
.get(
755 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
756 "from VIM info".format(
757 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
760 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
764 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
766 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
770 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
771 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
772 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
774 vld
["vim-id"] = net_RO
.get("vim_net_id")
775 vld
["name"] = net_RO
.get("vim_name")
776 vld
["status"] = net_RO
.get("status")
777 vld
["status-detailed"] = net_RO
.get("error_msg")
778 vnfr_update
["vld.{}".format(vld_index
)] = vld
782 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
787 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
792 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
797 def _get_ns_config_info(self
, nsr_id
):
799 Generates a mapping between vnf,vdu elements and the N2VC id
800 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
801 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
802 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
803 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
805 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
806 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
808 ns_config_info
= {"osm-config-mapping": mapping
}
809 for vca
in vca_deployed_list
:
810 if not vca
["member-vnf-index"]:
812 if not vca
["vdu_id"]:
813 mapping
[vca
["member-vnf-index"]] = vca
["application"]
817 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
819 ] = vca
["application"]
820 return ns_config_info
822 async def _instantiate_ng_ro(
839 def get_vim_account(vim_account_id
):
841 if vim_account_id
in db_vims
:
842 return db_vims
[vim_account_id
]
843 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
844 db_vims
[vim_account_id
] = db_vim
847 # modify target_vld info with instantiation parameters
848 def parse_vld_instantiation_params(
849 target_vim
, target_vld
, vld_params
, target_sdn
851 if vld_params
.get("ip-profile"):
852 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
855 if vld_params
.get("provider-network"):
856 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
859 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
860 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
863 if vld_params
.get("wimAccountId"):
864 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
865 target_vld
["vim_info"][target_wim
] = {}
866 for param
in ("vim-network-name", "vim-network-id"):
867 if vld_params
.get(param
):
868 if isinstance(vld_params
[param
], dict):
869 for vim
, vim_net
in vld_params
[param
].items():
870 other_target_vim
= "vim:" + vim
872 target_vld
["vim_info"],
873 (other_target_vim
, param
.replace("-", "_")),
876 else: # isinstance str
877 target_vld
["vim_info"][target_vim
][
878 param
.replace("-", "_")
879 ] = vld_params
[param
]
880 if vld_params
.get("common_id"):
881 target_vld
["common_id"] = vld_params
.get("common_id")
883 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
884 def update_ns_vld_target(target
, ns_params
):
885 for vnf_params
in ns_params
.get("vnf", ()):
886 if vnf_params
.get("vimAccountId"):
890 for vnfr
in db_vnfrs
.values()
891 if vnf_params
["member-vnf-index"]
892 == vnfr
["member-vnf-index-ref"]
896 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
897 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
898 target_vld
= find_in_list(
899 get_iterable(vdur
, "interfaces"),
900 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
903 vld_params
= find_in_list(
904 get_iterable(ns_params
, "vld"),
905 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
909 if vnf_params
.get("vimAccountId") not in a_vld
.get(
912 target_vim_network_list
= [
913 v
for _
, v
in a_vld
.get("vim_info").items()
915 target_vim_network_name
= next(
917 item
.get("vim_network_name", "")
918 for item
in target_vim_network_list
923 target
["ns"]["vld"][a_index
].get("vim_info").update(
925 "vim:{}".format(vnf_params
["vimAccountId"]): {
926 "vim_network_name": target_vim_network_name
,
932 for param
in ("vim-network-name", "vim-network-id"):
933 if vld_params
.get(param
) and isinstance(
934 vld_params
[param
], dict
936 for vim
, vim_net
in vld_params
[
939 other_target_vim
= "vim:" + vim
941 target
["ns"]["vld"][a_index
].get(
946 param
.replace("-", "_"),
951 nslcmop_id
= db_nslcmop
["_id"]
953 "name": db_nsr
["name"],
956 "image": deepcopy(db_nsr
["image"]),
957 "flavor": deepcopy(db_nsr
["flavor"]),
958 "action_id": nslcmop_id
,
959 "cloud_init_content": {},
961 for image
in target
["image"]:
962 image
["vim_info"] = {}
963 for flavor
in target
["flavor"]:
964 flavor
["vim_info"] = {}
965 if db_nsr
.get("affinity-or-anti-affinity-group"):
966 target
["affinity-or-anti-affinity-group"] = deepcopy(
967 db_nsr
["affinity-or-anti-affinity-group"]
969 for affinity_or_anti_affinity_group
in target
[
970 "affinity-or-anti-affinity-group"
972 affinity_or_anti_affinity_group
["vim_info"] = {}
974 if db_nslcmop
.get("lcmOperationType") != "instantiate":
975 # get parameters of instantiation:
976 db_nslcmop_instantiate
= self
.db
.get_list(
979 "nsInstanceId": db_nslcmop
["nsInstanceId"],
980 "lcmOperationType": "instantiate",
983 ns_params
= db_nslcmop_instantiate
.get("operationParams")
985 ns_params
= db_nslcmop
.get("operationParams")
986 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
987 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
990 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
991 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
995 "mgmt-network": vld
.get("mgmt-network", False),
996 "type": vld
.get("type"),
999 "vim_network_name": vld
.get("vim-network-name"),
1000 "vim_account_id": ns_params
["vimAccountId"],
1004 # check if this network needs SDN assist
1005 if vld
.get("pci-interfaces"):
1006 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1007 sdnc_id
= db_vim
["config"].get("sdn-controller")
1009 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1010 target_sdn
= "sdn:{}".format(sdnc_id
)
1011 target_vld
["vim_info"][target_sdn
] = {
1013 "target_vim": target_vim
,
1015 "type": vld
.get("type"),
1018 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1019 for nsd_vnf_profile
in nsd_vnf_profiles
:
1020 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1021 if cp
["virtual-link-profile-id"] == vld
["id"]:
1023 "member_vnf:{}.{}".format(
1024 cp
["constituent-cpd-id"][0][
1025 "constituent-base-element-id"
1027 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1029 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1031 # check at nsd descriptor, if there is an ip-profile
1033 nsd_vlp
= find_in_list(
1034 get_virtual_link_profiles(nsd
),
1035 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1040 and nsd_vlp
.get("virtual-link-protocol-data")
1041 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1043 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1046 ip_profile_dest_data
= {}
1047 if "ip-version" in ip_profile_source_data
:
1048 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1051 if "cidr" in ip_profile_source_data
:
1052 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1055 if "gateway-ip" in ip_profile_source_data
:
1056 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1059 if "dhcp-enabled" in ip_profile_source_data
:
1060 ip_profile_dest_data
["dhcp-params"] = {
1061 "enabled": ip_profile_source_data
["dhcp-enabled"]
1063 vld_params
["ip-profile"] = ip_profile_dest_data
1065 # update vld_params with instantiation params
1066 vld_instantiation_params
= find_in_list(
1067 get_iterable(ns_params
, "vld"),
1068 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1070 if vld_instantiation_params
:
1071 vld_params
.update(vld_instantiation_params
)
1072 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1073 target
["ns"]["vld"].append(target_vld
)
1074 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1075 update_ns_vld_target(target
, ns_params
)
1077 for vnfr
in db_vnfrs
.values():
1078 vnfd
= find_in_list(
1079 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1081 vnf_params
= find_in_list(
1082 get_iterable(ns_params
, "vnf"),
1083 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1085 target_vnf
= deepcopy(vnfr
)
1086 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1087 for vld
in target_vnf
.get("vld", ()):
1088 # check if connected to a ns.vld, to fill target'
1089 vnf_cp
= find_in_list(
1090 vnfd
.get("int-virtual-link-desc", ()),
1091 lambda cpd
: cpd
.get("id") == vld
["id"],
1094 ns_cp
= "member_vnf:{}.{}".format(
1095 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1097 if cp2target
.get(ns_cp
):
1098 vld
["target"] = cp2target
[ns_cp
]
1101 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1103 # check if this network needs SDN assist
1105 if vld
.get("pci-interfaces"):
1106 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1107 sdnc_id
= db_vim
["config"].get("sdn-controller")
1109 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1110 target_sdn
= "sdn:{}".format(sdnc_id
)
1111 vld
["vim_info"][target_sdn
] = {
1113 "target_vim": target_vim
,
1115 "type": vld
.get("type"),
1118 # check at vnfd descriptor, if there is an ip-profile
1120 vnfd_vlp
= find_in_list(
1121 get_virtual_link_profiles(vnfd
),
1122 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1126 and vnfd_vlp
.get("virtual-link-protocol-data")
1127 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1129 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1132 ip_profile_dest_data
= {}
1133 if "ip-version" in ip_profile_source_data
:
1134 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1137 if "cidr" in ip_profile_source_data
:
1138 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1141 if "gateway-ip" in ip_profile_source_data
:
1142 ip_profile_dest_data
[
1144 ] = ip_profile_source_data
["gateway-ip"]
1145 if "dhcp-enabled" in ip_profile_source_data
:
1146 ip_profile_dest_data
["dhcp-params"] = {
1147 "enabled": ip_profile_source_data
["dhcp-enabled"]
1150 vld_params
["ip-profile"] = ip_profile_dest_data
1151 # update vld_params with instantiation params
1153 vld_instantiation_params
= find_in_list(
1154 get_iterable(vnf_params
, "internal-vld"),
1155 lambda i_vld
: i_vld
["name"] == vld
["id"],
1157 if vld_instantiation_params
:
1158 vld_params
.update(vld_instantiation_params
)
1159 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1162 for vdur
in target_vnf
.get("vdur", ()):
1163 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1164 continue # This vdu must not be created
1165 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1167 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1170 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1171 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1174 and vdu_configuration
.get("config-access")
1175 and vdu_configuration
.get("config-access").get("ssh-access")
1177 vdur
["ssh-keys"] = ssh_keys_all
1178 vdur
["ssh-access-required"] = vdu_configuration
[
1180 ]["ssh-access"]["required"]
1183 and vnf_configuration
.get("config-access")
1184 and vnf_configuration
.get("config-access").get("ssh-access")
1185 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1187 vdur
["ssh-keys"] = ssh_keys_all
1188 vdur
["ssh-access-required"] = vnf_configuration
[
1190 ]["ssh-access"]["required"]
1191 elif ssh_keys_instantiation
and find_in_list(
1192 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1194 vdur
["ssh-keys"] = ssh_keys_instantiation
1196 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1198 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1200 if vdud
.get("cloud-init-file"):
1201 vdur
["cloud-init"] = "{}:file:{}".format(
1202 vnfd
["_id"], vdud
.get("cloud-init-file")
1204 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1205 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1206 base_folder
= vnfd
["_admin"]["storage"]
1207 if base_folder
["pkg-dir"]:
1208 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1209 base_folder
["folder"],
1210 base_folder
["pkg-dir"],
1211 vdud
.get("cloud-init-file"),
1214 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1215 base_folder
["folder"],
1216 vdud
.get("cloud-init-file"),
1218 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1219 target
["cloud_init_content"][
1222 elif vdud
.get("cloud-init"):
1223 vdur
["cloud-init"] = "{}:vdu:{}".format(
1224 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1226 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1227 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1230 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1231 deploy_params_vdu
= self
._format
_additional
_params
(
1232 vdur
.get("additionalParams") or {}
1234 deploy_params_vdu
["OSM"] = get_osm_params(
1235 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1237 vdur
["additionalParams"] = deploy_params_vdu
1240 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1241 if target_vim
not in ns_flavor
["vim_info"]:
1242 ns_flavor
["vim_info"][target_vim
] = {}
1245 # in case alternative images are provided we must check if they should be applied
1246 # for the vim_type, modify the vim_type taking into account
1247 ns_image_id
= int(vdur
["ns-image-id"])
1248 if vdur
.get("alt-image-ids"):
1249 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1250 vim_type
= db_vim
["vim_type"]
1251 for alt_image_id
in vdur
.get("alt-image-ids"):
1252 ns_alt_image
= target
["image"][int(alt_image_id
)]
1253 if vim_type
== ns_alt_image
.get("vim-type"):
1254 # must use alternative image
1256 "use alternative image id: {}".format(alt_image_id
)
1258 ns_image_id
= alt_image_id
1259 vdur
["ns-image-id"] = ns_image_id
1261 ns_image
= target
["image"][int(ns_image_id
)]
1262 if target_vim
not in ns_image
["vim_info"]:
1263 ns_image
["vim_info"][target_vim
] = {}
1266 if vdur
.get("affinity-or-anti-affinity-group-id"):
1267 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1268 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1269 if target_vim
not in ns_ags
["vim_info"]:
1270 ns_ags
["vim_info"][target_vim
] = {}
1272 vdur
["vim_info"] = {target_vim
: {}}
1273 # instantiation parameters
1275 vdu_instantiation_params
= find_in_list(
1276 get_iterable(vnf_params
, "vdu"),
1277 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1279 if vdu_instantiation_params
:
1280 # Parse the vdu_volumes from the instantiation params
1281 vdu_volumes
= get_volumes_from_instantiation_params(
1282 vdu_instantiation_params
, vdud
1284 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1285 vdur_list
.append(vdur
)
1286 target_vnf
["vdur"] = vdur_list
1287 target
["vnf"].append(target_vnf
)
1289 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1290 desc
= await self
.RO
.deploy(nsr_id
, target
)
1291 self
.logger
.debug("RO return > {}".format(desc
))
1292 action_id
= desc
["action_id"]
1293 await self
._wait
_ng
_ro
(
1294 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
,
1295 operation
="instantiation"
1300 "_admin.deployed.RO.operational-status": "running",
1301 "detailed-status": " ".join(stage
),
1303 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1304 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1305 self
._write
_op
_status
(nslcmop_id
, stage
)
1307 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1311 async def _wait_ng_ro(
1321 detailed_status_old
= None
1323 start_time
= start_time
or time()
1324 while time() <= start_time
+ timeout
:
1325 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1326 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1327 if desc_status
["status"] == "FAILED":
1328 raise NgRoException(desc_status
["details"])
1329 elif desc_status
["status"] == "BUILD":
1331 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1332 elif desc_status
["status"] == "DONE":
1334 stage
[2] = "Deployed at VIM"
1337 assert False, "ROclient.check_ns_status returns unknown {}".format(
1338 desc_status
["status"]
1340 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1341 detailed_status_old
= stage
[2]
1342 db_nsr_update
["detailed-status"] = " ".join(stage
)
1343 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1344 self
._write
_op
_status
(nslcmop_id
, stage
)
1345 await asyncio
.sleep(15, loop
=self
.loop
)
1346 else: # timeout_ns_deploy
1347 raise NgRoException("Timeout waiting ns to deploy")
1349 async def _terminate_ng_ro(
1350 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1355 start_deploy
= time()
1362 "action_id": nslcmop_id
,
1364 desc
= await self
.RO
.deploy(nsr_id
, target
)
1365 action_id
= desc
["action_id"]
1366 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1367 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1370 + "ns terminate action at RO. action_id={}".format(action_id
)
1374 delete_timeout
= 20 * 60 # 20 minutes
1375 await self
._wait
_ng
_ro
(
1376 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
,
1377 operation
="termination"
1380 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1381 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1383 await self
.RO
.delete(nsr_id
)
1384 except Exception as e
:
1385 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1386 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1387 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1388 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1390 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1392 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1393 failed_detail
.append("delete conflict: {}".format(e
))
1396 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1399 failed_detail
.append("delete error: {}".format(e
))
1402 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1406 stage
[2] = "Error deleting from VIM"
1408 stage
[2] = "Deleted from VIM"
1409 db_nsr_update
["detailed-status"] = " ".join(stage
)
1410 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1411 self
._write
_op
_status
(nslcmop_id
, stage
)
1414 raise LcmException("; ".join(failed_detail
))
1417 async def instantiate_RO(
1431 :param logging_text: preffix text to use at logging
1432 :param nsr_id: nsr identity
1433 :param nsd: database content of ns descriptor
1434 :param db_nsr: database content of ns record
1435 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1437 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1438 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1439 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1440 :return: None or exception
1443 start_deploy
= time()
1444 ns_params
= db_nslcmop
.get("operationParams")
1445 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1446 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1448 timeout_ns_deploy
= self
.timeout
.get(
1449 "ns_deploy", self
.timeout_ns_deploy
1452 # Check for and optionally request placement optimization. Database will be updated if placement activated
1453 stage
[2] = "Waiting for Placement."
1454 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1455 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1456 for vnfr
in db_vnfrs
.values():
1457 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1460 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1462 return await self
._instantiate
_ng
_ro
(
1475 except Exception as e
:
1476 stage
[2] = "ERROR deploying at VIM"
1477 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1479 "Error deploying at VIM {}".format(e
),
1480 exc_info
=not isinstance(
1483 ROclient
.ROClientException
,
1492 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1494 Wait for kdu to be up, get ip address
1495 :param logging_text: prefix use for logging
1499 :return: IP address, K8s services
1502 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1505 while nb_tries
< 360:
1506 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1510 for x
in get_iterable(db_vnfr
, "kdur")
1511 if x
.get("kdu-name") == kdu_name
1517 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1519 if kdur
.get("status"):
1520 if kdur
["status"] in ("READY", "ENABLED"):
1521 return kdur
.get("ip-address"), kdur
.get("services")
1524 "target KDU={} is in error state".format(kdu_name
)
1527 await asyncio
.sleep(10, loop
=self
.loop
)
1529 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1531 async def wait_vm_up_insert_key_ro(
1532 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1535 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1536 :param logging_text: prefix use for logging
1541 :param pub_key: public ssh key to inject, None to skip
1542 :param user: user to apply the public ssh key
1546 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1550 target_vdu_id
= None
1556 if ro_retries
>= 360: # 1 hour
1558 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1561 await asyncio
.sleep(10, loop
=self
.loop
)
1564 if not target_vdu_id
:
1565 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1567 if not vdu_id
: # for the VNF case
1568 if db_vnfr
.get("status") == "ERROR":
1570 "Cannot inject ssh-key because target VNF is in error state"
1572 ip_address
= db_vnfr
.get("ip-address")
1578 for x
in get_iterable(db_vnfr
, "vdur")
1579 if x
.get("ip-address") == ip_address
1587 for x
in get_iterable(db_vnfr
, "vdur")
1588 if x
.get("vdu-id-ref") == vdu_id
1589 and x
.get("count-index") == vdu_index
1595 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1596 ): # If only one, this should be the target vdu
1597 vdur
= db_vnfr
["vdur"][0]
1600 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1601 vnfr_id
, vdu_id
, vdu_index
1604 # New generation RO stores information at "vim_info"
1607 if vdur
.get("vim_info"):
1609 t
for t
in vdur
["vim_info"]
1610 ) # there should be only one key
1611 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1613 vdur
.get("pdu-type")
1614 or vdur
.get("status") == "ACTIVE"
1615 or ng_ro_status
== "ACTIVE"
1617 ip_address
= vdur
.get("ip-address")
1620 target_vdu_id
= vdur
["vdu-id-ref"]
1621 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1623 "Cannot inject ssh-key because target VM is in error state"
1626 if not target_vdu_id
:
1629 # inject public key into machine
1630 if pub_key
and user
:
1631 self
.logger
.debug(logging_text
+ "Inserting RO key")
1632 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1633 if vdur
.get("pdu-type"):
1634 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1637 ro_vm_id
= "{}-{}".format(
1638 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1639 ) # TODO add vdu_index
1643 "action": "inject_ssh_key",
1647 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1649 desc
= await self
.RO
.deploy(nsr_id
, target
)
1650 action_id
= desc
["action_id"]
1651 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600, operation
="instantiation")
1654 # wait until NS is deployed at RO
1656 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1657 ro_nsr_id
= deep_get(
1658 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1662 result_dict
= await self
.RO
.create_action(
1664 item_id_name
=ro_nsr_id
,
1666 "add_public_key": pub_key
,
1671 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1672 if not result_dict
or not isinstance(result_dict
, dict):
1674 "Unknown response from RO when injecting key"
1676 for result
in result_dict
.values():
1677 if result
.get("vim_result") == 200:
1680 raise ROclient
.ROClientException(
1681 "error injecting key: {}".format(
1682 result
.get("description")
1686 except NgRoException
as e
:
1688 "Reaching max tries injecting key. Error: {}".format(e
)
1690 except ROclient
.ROClientException
as e
:
1694 + "error injecting key: {}. Retrying until {} seconds".format(
1701 "Reaching max tries injecting key. Error: {}".format(e
)
1708 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1710 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1712 my_vca
= vca_deployed_list
[vca_index
]
1713 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1714 # vdu or kdu: no dependencies
1718 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1719 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1720 configuration_status_list
= db_nsr
["configurationStatus"]
1721 for index
, vca_deployed
in enumerate(configuration_status_list
):
1722 if index
== vca_index
:
1725 if not my_vca
.get("member-vnf-index") or (
1726 vca_deployed
.get("member-vnf-index")
1727 == my_vca
.get("member-vnf-index")
1729 internal_status
= configuration_status_list
[index
].get("status")
1730 if internal_status
== "READY":
1732 elif internal_status
== "BROKEN":
1734 "Configuration aborted because dependent charm/s has failed"
1739 # no dependencies, return
1741 await asyncio
.sleep(10)
1744 raise LcmException("Configuration aborted because dependent charm/s timeout")
1746 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1749 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1751 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1752 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1755 async def instantiate_N2VC(
1772 ee_config_descriptor
,
1774 nsr_id
= db_nsr
["_id"]
1775 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1776 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1777 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1778 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1780 "collection": "nsrs",
1781 "filter": {"_id": nsr_id
},
1782 "path": db_update_entry
,
1788 element_under_configuration
= nsr_id
1792 vnfr_id
= db_vnfr
["_id"]
1793 osm_config
["osm"]["vnf_id"] = vnfr_id
1795 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1797 if vca_type
== "native_charm":
1800 index_number
= vdu_index
or 0
1803 element_type
= "VNF"
1804 element_under_configuration
= vnfr_id
1805 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1807 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1808 element_type
= "VDU"
1809 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1810 osm_config
["osm"]["vdu_id"] = vdu_id
1812 namespace
+= ".{}".format(kdu_name
)
1813 element_type
= "KDU"
1814 element_under_configuration
= kdu_name
1815 osm_config
["osm"]["kdu_name"] = kdu_name
1818 if base_folder
["pkg-dir"]:
1819 artifact_path
= "{}/{}/{}/{}".format(
1820 base_folder
["folder"],
1821 base_folder
["pkg-dir"],
1824 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1829 artifact_path
= "{}/Scripts/{}/{}/".format(
1830 base_folder
["folder"],
1833 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1838 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1840 # get initial_config_primitive_list that applies to this element
1841 initial_config_primitive_list
= config_descriptor
.get(
1842 "initial-config-primitive"
1846 "Initial config primitive list > {}".format(
1847 initial_config_primitive_list
1851 # add config if not present for NS charm
1852 ee_descriptor_id
= ee_config_descriptor
.get("id")
1853 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1854 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1855 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1859 "Initial config primitive list #2 > {}".format(
1860 initial_config_primitive_list
1863 # n2vc_redesign STEP 3.1
1864 # find old ee_id if exists
1865 ee_id
= vca_deployed
.get("ee_id")
1867 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1868 # create or register execution environment in VCA
1869 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1871 self
._write
_configuration
_status
(
1873 vca_index
=vca_index
,
1875 element_under_configuration
=element_under_configuration
,
1876 element_type
=element_type
,
1879 step
= "create execution environment"
1880 self
.logger
.debug(logging_text
+ step
)
1884 if vca_type
== "k8s_proxy_charm":
1885 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1886 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1887 namespace
=namespace
,
1888 artifact_path
=artifact_path
,
1892 elif vca_type
== "helm" or vca_type
== "helm-v3":
1893 ee_id
, credentials
= await self
.vca_map
[
1895 ].create_execution_environment(
1896 namespace
=namespace
,
1900 artifact_path
=artifact_path
,
1904 ee_id
, credentials
= await self
.vca_map
[
1906 ].create_execution_environment(
1907 namespace
=namespace
,
1913 elif vca_type
== "native_charm":
1914 step
= "Waiting to VM being up and getting IP address"
1915 self
.logger
.debug(logging_text
+ step
)
1916 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1925 credentials
= {"hostname": rw_mgmt_ip
}
1927 username
= deep_get(
1928 config_descriptor
, ("config-access", "ssh-access", "default-user")
1930 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1931 # merged. Meanwhile let's get username from initial-config-primitive
1932 if not username
and initial_config_primitive_list
:
1933 for config_primitive
in initial_config_primitive_list
:
1934 for param
in config_primitive
.get("parameter", ()):
1935 if param
["name"] == "ssh-username":
1936 username
= param
["value"]
1940 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1941 "'config-access.ssh-access.default-user'"
1943 credentials
["username"] = username
1944 # n2vc_redesign STEP 3.2
1946 self
._write
_configuration
_status
(
1948 vca_index
=vca_index
,
1949 status
="REGISTERING",
1950 element_under_configuration
=element_under_configuration
,
1951 element_type
=element_type
,
1954 step
= "register execution environment {}".format(credentials
)
1955 self
.logger
.debug(logging_text
+ step
)
1956 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1957 credentials
=credentials
,
1958 namespace
=namespace
,
1963 # for compatibility with MON/POL modules, the need model and application name at database
1964 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1965 ee_id_parts
= ee_id
.split(".")
1966 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1967 if len(ee_id_parts
) >= 2:
1968 model_name
= ee_id_parts
[0]
1969 application_name
= ee_id_parts
[1]
1970 db_nsr_update
[db_update_entry
+ "model"] = model_name
1971 db_nsr_update
[db_update_entry
+ "application"] = application_name
1973 # n2vc_redesign STEP 3.3
1974 step
= "Install configuration Software"
1976 self
._write
_configuration
_status
(
1978 vca_index
=vca_index
,
1979 status
="INSTALLING SW",
1980 element_under_configuration
=element_under_configuration
,
1981 element_type
=element_type
,
1982 other_update
=db_nsr_update
,
1985 # TODO check if already done
1986 self
.logger
.debug(logging_text
+ step
)
1988 if vca_type
== "native_charm":
1989 config_primitive
= next(
1990 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1993 if config_primitive
:
1994 config
= self
._map
_primitive
_params
(
1995 config_primitive
, {}, deploy_params
1998 if vca_type
== "lxc_proxy_charm":
1999 if element_type
== "NS":
2000 num_units
= db_nsr
.get("config-units") or 1
2001 elif element_type
== "VNF":
2002 num_units
= db_vnfr
.get("config-units") or 1
2003 elif element_type
== "VDU":
2004 for v
in db_vnfr
["vdur"]:
2005 if vdu_id
== v
["vdu-id-ref"]:
2006 num_units
= v
.get("config-units") or 1
2008 if vca_type
!= "k8s_proxy_charm":
2009 await self
.vca_map
[vca_type
].install_configuration_sw(
2011 artifact_path
=artifact_path
,
2014 num_units
=num_units
,
2019 # write in db flag of configuration_sw already installed
2021 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
2024 # add relations for this VCA (wait for other peers related with this VCA)
2025 await self
._add
_vca
_relations
(
2026 logging_text
=logging_text
,
2029 vca_index
=vca_index
,
2032 # if SSH access is required, then get execution environment SSH public
2033 # if native charm we have waited already to VM be UP
2034 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
2037 # self.logger.debug("get ssh key block")
2039 config_descriptor
, ("config-access", "ssh-access", "required")
2041 # self.logger.debug("ssh key needed")
2042 # Needed to inject a ssh key
2045 ("config-access", "ssh-access", "default-user"),
2047 step
= "Install configuration Software, getting public ssh key"
2048 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
2049 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
2052 step
= "Insert public key into VM user={} ssh_key={}".format(
2056 # self.logger.debug("no need to get ssh key")
2057 step
= "Waiting to VM being up and getting IP address"
2058 self
.logger
.debug(logging_text
+ step
)
2060 # default rw_mgmt_ip to None, avoiding the non definition of the variable
2063 # n2vc_redesign STEP 5.1
2064 # wait for RO (ip-address) Insert pub_key into VM
2067 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
2068 logging_text
, nsr_id
, vnfr_id
, kdu_name
2070 vnfd
= self
.db
.get_one(
2072 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2074 kdu
= get_kdu(vnfd
, kdu_name
)
2076 service
["name"] for service
in get_kdu_services(kdu
)
2078 exposed_services
= []
2079 for service
in services
:
2080 if any(s
in service
["name"] for s
in kdu_services
):
2081 exposed_services
.append(service
)
2082 await self
.vca_map
[vca_type
].exec_primitive(
2084 primitive_name
="config",
2086 "osm-config": json
.dumps(
2088 k8s
={"services": exposed_services
}
2095 # This verification is needed in order to avoid trying to add a public key
2096 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2097 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2098 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2100 elif db_vnfr
.get('vdur'):
2101 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2111 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2113 # store rw_mgmt_ip in deploy params for later replacement
2114 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2116 # n2vc_redesign STEP 6 Execute initial config primitive
2117 step
= "execute initial config primitive"
2119 # wait for dependent primitives execution (NS -> VNF -> VDU)
2120 if initial_config_primitive_list
:
2121 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2123 # stage, in function of element type: vdu, kdu, vnf or ns
2124 my_vca
= vca_deployed_list
[vca_index
]
2125 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2127 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2128 elif my_vca
.get("member-vnf-index"):
2130 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2133 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2135 self
._write
_configuration
_status
(
2136 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2139 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2141 check_if_terminated_needed
= True
2142 for initial_config_primitive
in initial_config_primitive_list
:
2143 # adding information on the vca_deployed if it is a NS execution environment
2144 if not vca_deployed
["member-vnf-index"]:
2145 deploy_params
["ns_config_info"] = json
.dumps(
2146 self
._get
_ns
_config
_info
(nsr_id
)
2148 # TODO check if already done
2149 primitive_params_
= self
._map
_primitive
_params
(
2150 initial_config_primitive
, {}, deploy_params
2153 step
= "execute primitive '{}' params '{}'".format(
2154 initial_config_primitive
["name"], primitive_params_
2156 self
.logger
.debug(logging_text
+ step
)
2157 await self
.vca_map
[vca_type
].exec_primitive(
2159 primitive_name
=initial_config_primitive
["name"],
2160 params_dict
=primitive_params_
,
2165 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2166 if check_if_terminated_needed
:
2167 if config_descriptor
.get("terminate-config-primitive"):
2169 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2171 check_if_terminated_needed
= False
2173 # TODO register in database that primitive is done
2175 # STEP 7 Configure metrics
2176 if vca_type
== "helm" or vca_type
== "helm-v3":
2177 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2179 artifact_path
=artifact_path
,
2180 ee_config_descriptor
=ee_config_descriptor
,
2183 target_ip
=rw_mgmt_ip
,
2189 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2192 for job
in prometheus_jobs
:
2195 {"job_name": job
["job_name"]},
2198 fail_on_empty
=False,
2201 step
= "instantiated at VCA"
2202 self
.logger
.debug(logging_text
+ step
)
2204 self
._write
_configuration
_status
(
2205 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2208 except Exception as e
: # TODO not use Exception but N2VC exception
2209 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2211 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2214 "Exception while {} : {}".format(step
, e
), exc_info
=True
2216 self
._write
_configuration
_status
(
2217 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2219 raise LcmException("{} {}".format(step
, e
)) from e
2221 def _write_ns_status(
2225 current_operation
: str,
2226 current_operation_id
: str,
2227 error_description
: str = None,
2228 error_detail
: str = None,
2229 other_update
: dict = None,
2232 Update db_nsr fields.
2235 :param current_operation:
2236 :param current_operation_id:
2237 :param error_description:
2238 :param error_detail:
2239 :param other_update: Other required changes at database if provided, will be cleared
2243 db_dict
= other_update
or {}
2246 ] = current_operation_id
# for backward compatibility
2247 db_dict
["_admin.current-operation"] = current_operation_id
2248 db_dict
["_admin.operation-type"] = (
2249 current_operation
if current_operation
!= "IDLE" else None
2251 db_dict
["currentOperation"] = current_operation
2252 db_dict
["currentOperationID"] = current_operation_id
2253 db_dict
["errorDescription"] = error_description
2254 db_dict
["errorDetail"] = error_detail
2257 db_dict
["nsState"] = ns_state
2258 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2259 except DbException
as e
:
2260 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2262 def _write_op_status(
2266 error_message
: str = None,
2267 queuePosition
: int = 0,
2268 operation_state
: str = None,
2269 other_update
: dict = None,
2272 db_dict
= other_update
or {}
2273 db_dict
["queuePosition"] = queuePosition
2274 if isinstance(stage
, list):
2275 db_dict
["stage"] = stage
[0]
2276 db_dict
["detailed-status"] = " ".join(stage
)
2277 elif stage
is not None:
2278 db_dict
["stage"] = str(stage
)
2280 if error_message
is not None:
2281 db_dict
["errorMessage"] = error_message
2282 if operation_state
is not None:
2283 db_dict
["operationState"] = operation_state
2284 db_dict
["statusEnteredTime"] = time()
2285 self
.update_db_2("nslcmops", op_id
, db_dict
)
2286 except DbException
as e
:
2288 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2291 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2293 nsr_id
= db_nsr
["_id"]
2294 # configurationStatus
2295 config_status
= db_nsr
.get("configurationStatus")
2298 "configurationStatus.{}.status".format(index
): status
2299 for index
, v
in enumerate(config_status
)
2303 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2305 except DbException
as e
:
2307 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2310 def _write_configuration_status(
2315 element_under_configuration
: str = None,
2316 element_type
: str = None,
2317 other_update
: dict = None,
2320 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2321 # .format(vca_index, status))
2324 db_path
= "configurationStatus.{}.".format(vca_index
)
2325 db_dict
= other_update
or {}
2327 db_dict
[db_path
+ "status"] = status
2328 if element_under_configuration
:
2330 db_path
+ "elementUnderConfiguration"
2331 ] = element_under_configuration
2333 db_dict
[db_path
+ "elementType"] = element_type
2334 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2335 except DbException
as e
:
2337 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2338 status
, nsr_id
, vca_index
, e
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Check and compute the placement (the vim account where to deploy).

    When the operation requests the "PLA" placement engine, a "get_placement" request
    is sent through the message bus and the "nslcmops" record is polled until the
    result appears at '_admin.pla'. The database is used because the result can be
    obtained by a different LCM worker in case of HA.
    :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
    :param db_nslcmop: database content of nslcmop
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
    :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs
        with the computed 'vim-account-id'
    :raises LcmException: if no placement result is found once polling is exhausted
    """
    op_id = db_nslcmop["_id"]
    engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
    if engine != "PLA":
        # nothing to compute: no vnfr is touched
        return False

    self.logger.debug(logging_text + "Invoke and wait for placement optimization")
    await self.msg.aiowrite(
        "pla", "get_placement", {"nslcmopId": op_id}, loop=self.loop
    )

    # poll the operation record until the placement result is stored
    poll_interval = 5
    remaining = poll_interval * 10
    placement = None
    while remaining >= 0 and not placement:
        await asyncio.sleep(poll_interval)
        remaining -= poll_interval
        refreshed_op = self.db.get_one("nslcmops", {"_id": op_id})
        placement = deep_get(refreshed_op, ("_admin", "pla"))

    if not placement:
        raise LcmException("Placement timeout for nslcmopId={}".format(op_id))

    changed = False
    for placed_vnf in placement["vnf"]:
        target_vnfr = db_vnfrs.get(placed_vnf["member-vnf-index"])
        if not placed_vnf.get("vimAccountId") or not target_vnfr:
            continue
        changed = True
        vim_account = {"vim-account-id": placed_vnf["vimAccountId"]}
        self.db.set_one("vnfrs", {"_id": target_vnfr["_id"]}, vim_account)
        # keep the in-memory vnfr aligned with what was just stored
        target_vnfr["vim-account-id"] = placed_vnf["vimAccountId"]
    return changed
def update_nsrs_with_pla_result(self, params):
    """
    Store a received placement result in the operation record.

    Despite the method name, the value of params["placement"] is written at
    '_admin.pla' of the "nslcmops" document whose id is params["placement"]["nslcmopId"].
    Any failure is logged as a warning and not propagated.
    :param params: content of the placement message; read with deep_get() and .get()
    :return: None
    """
    # bound before the try so the handler can always report it, even if the
    # failure happens while extracting the identifier
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
        self.update_db_2(
            "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
        )
    except Exception as e:
        # Logger.warn is a deprecated alias; Logger.warning is the supported name
        self.logger.warning(
            "Update failed for nslcmop_id={}:{}".format(nslcmop_id, e)
        )
2400 async def instantiate(self
, nsr_id
, nslcmop_id
):
2403 :param nsr_id: ns instance to deploy
2404 :param nslcmop_id: operation to run
2408 # Try to lock HA task here
2409 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2410 if not task_is_locked_by_me
:
2412 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2416 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2417 self
.logger
.debug(logging_text
+ "Enter")
2419 # get all needed from database
2421 # database nsrs record
2424 # database nslcmops record
2427 # update operation on nsrs
2429 # update operation on nslcmops
2430 db_nslcmop_update
= {}
2432 nslcmop_operation_state
= None
2433 db_vnfrs
= {} # vnf's info indexed by member-index
2435 tasks_dict_info
= {} # from task to info text
2439 "Stage 1/5: preparation of the environment.",
2440 "Waiting for previous operations to terminate.",
2443 # ^ stage, step, VIM progress
2445 # wait for any previous tasks in process
2446 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2448 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2449 stage
[1] = "Reading from database."
2450 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2451 db_nsr_update
["detailed-status"] = "creating"
2452 db_nsr_update
["operational-status"] = "init"
2453 self
._write
_ns
_status
(
2455 ns_state
="BUILDING",
2456 current_operation
="INSTANTIATING",
2457 current_operation_id
=nslcmop_id
,
2458 other_update
=db_nsr_update
,
2460 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2462 # read from db: operation
2463 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2464 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2465 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2466 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2467 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2469 ns_params
= db_nslcmop
.get("operationParams")
2470 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2471 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2473 timeout_ns_deploy
= self
.timeout
.get(
2474 "ns_deploy", self
.timeout_ns_deploy
2478 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2479 self
.logger
.debug(logging_text
+ stage
[1])
2480 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2481 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2482 self
.logger
.debug(logging_text
+ stage
[1])
2483 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2484 self
.fs
.sync(db_nsr
["nsd-id"])
2486 # nsr_name = db_nsr["name"] # TODO short-name??
2488 # read from db: vnf's of this ns
2489 stage
[1] = "Getting vnfrs from db."
2490 self
.logger
.debug(logging_text
+ stage
[1])
2491 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2493 # read from db: vnfd's for every vnf
2494 db_vnfds
= [] # every vnfd data
2496 # for each vnf in ns, read vnfd
2497 for vnfr
in db_vnfrs_list
:
2498 if vnfr
.get("kdur"):
2500 for kdur
in vnfr
["kdur"]:
2501 if kdur
.get("additionalParams"):
2502 kdur
["additionalParams"] = json
.loads(
2503 kdur
["additionalParams"]
2505 kdur_list
.append(kdur
)
2506 vnfr
["kdur"] = kdur_list
2508 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2509 vnfd_id
= vnfr
["vnfd-id"]
2510 vnfd_ref
= vnfr
["vnfd-ref"]
2511 self
.fs
.sync(vnfd_id
)
2513 # if we haven't this vnfd, read it from db
2514 if vnfd_id
not in db_vnfds
:
2516 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2519 self
.logger
.debug(logging_text
+ stage
[1])
2520 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2523 db_vnfds
.append(vnfd
)
# Get or generate the _admin.deployed.VCA list
2526 vca_deployed_list
= None
2527 if db_nsr
["_admin"].get("deployed"):
2528 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2529 if vca_deployed_list
is None:
2530 vca_deployed_list
= []
2531 configuration_status_list
= []
2532 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2533 db_nsr_update
["configurationStatus"] = configuration_status_list
2534 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2535 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2536 elif isinstance(vca_deployed_list
, dict):
2537 # maintain backward compatibility. Change a dict to list at database
2538 vca_deployed_list
= list(vca_deployed_list
.values())
2539 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2540 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2543 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2545 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2546 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2548 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2549 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2550 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2552 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2555 # n2vc_redesign STEP 2 Deploy Network Scenario
2556 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2557 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2559 stage
[1] = "Deploying KDUs."
2560 # self.logger.debug(logging_text + "Before deploy_kdus")
2561 # Call to deploy_kdus in case exists the "vdu:kdu" param
2562 await self
.deploy_kdus(
2563 logging_text
=logging_text
,
2565 nslcmop_id
=nslcmop_id
,
2568 task_instantiation_info
=tasks_dict_info
,
2571 stage
[1] = "Getting VCA public key."
2572 # n2vc_redesign STEP 1 Get VCA public ssh-key
2573 # feature 1429. Add n2vc public key to needed VMs
2574 n2vc_key
= self
.n2vc
.get_public_key()
2575 n2vc_key_list
= [n2vc_key
]
2576 if self
.vca_config
.get("public_key"):
2577 n2vc_key_list
.append(self
.vca_config
["public_key"])
2579 stage
[1] = "Deploying NS at VIM."
2580 task_ro
= asyncio
.ensure_future(
2581 self
.instantiate_RO(
2582 logging_text
=logging_text
,
2586 db_nslcmop
=db_nslcmop
,
2589 n2vc_key_list
=n2vc_key_list
,
2593 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2594 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2596 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2597 stage
[1] = "Deploying Execution Environments."
2598 self
.logger
.debug(logging_text
+ stage
[1])
2600 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2601 for vnf_profile
in get_vnf_profiles(nsd
):
2602 vnfd_id
= vnf_profile
["vnfd-id"]
2603 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2604 member_vnf_index
= str(vnf_profile
["id"])
2605 db_vnfr
= db_vnfrs
[member_vnf_index
]
2606 base_folder
= vnfd
["_admin"]["storage"]
2612 # Get additional parameters
2613 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2614 if db_vnfr
.get("additionalParamsForVnf"):
2615 deploy_params
.update(
2616 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2619 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2620 if descriptor_config
:
2622 logging_text
=logging_text
2623 + "member_vnf_index={} ".format(member_vnf_index
),
2626 nslcmop_id
=nslcmop_id
,
2632 member_vnf_index
=member_vnf_index
,
2633 vdu_index
=vdu_index
,
2635 deploy_params
=deploy_params
,
2636 descriptor_config
=descriptor_config
,
2637 base_folder
=base_folder
,
2638 task_instantiation_info
=tasks_dict_info
,
2642 # Deploy charms for each VDU that supports one.
2643 for vdud
in get_vdu_list(vnfd
):
2645 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2646 vdur
= find_in_list(
2647 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2650 if vdur
.get("additionalParams"):
2651 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2653 deploy_params_vdu
= deploy_params
2654 deploy_params_vdu
["OSM"] = get_osm_params(
2655 db_vnfr
, vdu_id
, vdu_count_index
=0
2657 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2659 self
.logger
.debug("VDUD > {}".format(vdud
))
2661 "Descriptor config > {}".format(descriptor_config
)
2663 if descriptor_config
:
2666 for vdu_index
in range(vdud_count
):
2667 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2669 logging_text
=logging_text
2670 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2671 member_vnf_index
, vdu_id
, vdu_index
2675 nslcmop_id
=nslcmop_id
,
2681 member_vnf_index
=member_vnf_index
,
2682 vdu_index
=vdu_index
,
2684 deploy_params
=deploy_params_vdu
,
2685 descriptor_config
=descriptor_config
,
2686 base_folder
=base_folder
,
2687 task_instantiation_info
=tasks_dict_info
,
2690 for kdud
in get_kdu_list(vnfd
):
2691 kdu_name
= kdud
["name"]
2692 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2693 if descriptor_config
:
2698 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2700 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2701 if kdur
.get("additionalParams"):
2702 deploy_params_kdu
.update(
2703 parse_yaml_strings(kdur
["additionalParams"].copy())
2707 logging_text
=logging_text
,
2710 nslcmop_id
=nslcmop_id
,
2716 member_vnf_index
=member_vnf_index
,
2717 vdu_index
=vdu_index
,
2719 deploy_params
=deploy_params_kdu
,
2720 descriptor_config
=descriptor_config
,
2721 base_folder
=base_folder
,
2722 task_instantiation_info
=tasks_dict_info
,
2726 # Check if this NS has a charm configuration
2727 descriptor_config
= nsd
.get("ns-configuration")
2728 if descriptor_config
and descriptor_config
.get("juju"):
2731 member_vnf_index
= None
2737 # Get additional parameters
2738 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2739 if db_nsr
.get("additionalParamsForNs"):
2740 deploy_params
.update(
2741 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2743 base_folder
= nsd
["_admin"]["storage"]
2745 logging_text
=logging_text
,
2748 nslcmop_id
=nslcmop_id
,
2754 member_vnf_index
=member_vnf_index
,
2755 vdu_index
=vdu_index
,
2757 deploy_params
=deploy_params
,
2758 descriptor_config
=descriptor_config
,
2759 base_folder
=base_folder
,
2760 task_instantiation_info
=tasks_dict_info
,
# the rest of the work is done in the finally block
2767 ROclient
.ROClientException
,
2773 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2776 except asyncio
.CancelledError
:
2778 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2780 exc
= "Operation was cancelled"
2781 except Exception as e
:
2782 exc
= traceback
.format_exc()
2783 self
.logger
.critical(
2784 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2789 error_list
.append(str(exc
))
2791 # wait for pending tasks
2793 stage
[1] = "Waiting for instantiate pending tasks."
2794 self
.logger
.debug(logging_text
+ stage
[1])
2795 error_list
+= await self
._wait
_for
_tasks
(
2803 stage
[1] = stage
[2] = ""
2804 except asyncio
.CancelledError
:
2805 error_list
.append("Cancelled")
2806 # TODO cancel all tasks
2807 except Exception as exc
:
2808 error_list
.append(str(exc
))
2810 # update operation-status
2811 db_nsr_update
["operational-status"] = "running"
2812 # let's begin with VCA 'configured' status (later we can change it)
2813 db_nsr_update
["config-status"] = "configured"
2814 for task
, task_name
in tasks_dict_info
.items():
2815 if not task
.done() or task
.cancelled() or task
.exception():
2816 if task_name
.startswith(self
.task_name_deploy_vca
):
2817 # A N2VC task is pending
2818 db_nsr_update
["config-status"] = "failed"
2820 # RO or KDU task is pending
2821 db_nsr_update
["operational-status"] = "failed"
2823 # update status at database
2825 error_detail
= ". ".join(error_list
)
2826 self
.logger
.error(logging_text
+ error_detail
)
2827 error_description_nslcmop
= "{} Detail: {}".format(
2828 stage
[0], error_detail
2830 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2831 nslcmop_id
, stage
[0]
2834 db_nsr_update
["detailed-status"] = (
2835 error_description_nsr
+ " Detail: " + error_detail
2837 db_nslcmop_update
["detailed-status"] = error_detail
2838 nslcmop_operation_state
= "FAILED"
2842 error_description_nsr
= error_description_nslcmop
= None
2844 db_nsr_update
["detailed-status"] = "Done"
2845 db_nslcmop_update
["detailed-status"] = "Done"
2846 nslcmop_operation_state
= "COMPLETED"
2849 self
._write
_ns
_status
(
2852 current_operation
="IDLE",
2853 current_operation_id
=None,
2854 error_description
=error_description_nsr
,
2855 error_detail
=error_detail
,
2856 other_update
=db_nsr_update
,
2858 self
._write
_op
_status
(
2861 error_message
=error_description_nslcmop
,
2862 operation_state
=nslcmop_operation_state
,
2863 other_update
=db_nslcmop_update
,
2866 if nslcmop_operation_state
:
2868 await self
.msg
.aiowrite(
2873 "nslcmop_id": nslcmop_id
,
2874 "operationState": nslcmop_operation_state
,
2878 except Exception as e
:
2880 logging_text
+ "kafka_write notification Exception {}".format(e
)
2883 self
.logger
.debug(logging_text
+ "Exit")
2884 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2886 def _get_vnfd(self
, vnfd_id
: str, cached_vnfds
: Dict
[str, Any
]):
2887 if vnfd_id
not in cached_vnfds
:
2888 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
2889 return cached_vnfds
[vnfd_id
]
2891 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2892 if vnf_profile_id
not in cached_vnfrs
:
2893 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2896 "member-vnf-index-ref": vnf_profile_id
,
2897 "nsr-id-ref": nsr_id
,
2900 return cached_vnfrs
[vnf_profile_id
]
def _is_deployed_vca_in_relation(
    self, vca: DeployedVCA, relation: Relation
) -> bool:
    """
    Tell whether a deployed VCA is one of the two endpoints of a relation.

    Endpoints that carry a "kdu-resource-profile-id" are ignored. For the others,
    the VCA matches when its vnf profile id, vdu profile id and execution
    environment reference are all equal to those of the endpoint.
    :param vca: deployed VCA to look for
    :param relation: relation whose provider and requirer are inspected
    :return: True if the VCA matches the provider or the requirer
    """
    return any(
        vca.vnf_profile_id == endpoint.vnf_profile_id
        and vca.vdu_profile_id == endpoint.vdu_profile_id
        and vca.execution_environment_ref == endpoint.execution_environment_ref
        for endpoint in (relation.provider, relation.requirer)
        if not endpoint["kdu-resource-profile-id"]
    )
def _update_ee_relation_data_with_implicit_data(
    self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
):
    """
    Complete the data of a relation endpoint with the values that are implicit.

    The endpoint is first normalized with safe_get_ee_relation(). For VNF and VDU
    level endpoints without "execution-environment-ref", the reference is taken
    from the juju execution environment that get_juju_ee_ref() returns for the
    vnfd (VNF level) or for the vdu profile (VDU level).
    :param nsr_id: ns record id, forwarded to safe_get_ee_relation()
    :param nsd: ns descriptor, used to find the vnf profile of the endpoint
    :param ee_relation_data: endpoint data as found in the descriptor
    :param cached_vnfds: vnfd cache, see _get_vnfd()
    :param vnf_profile_id: forwarded to safe_get_ee_relation()
    :return: the completed endpoint data
    :raises Exception: if no execution environment is found for the endpoint
    """
    endpoint_data = safe_get_ee_relation(
        nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
    )
    level = EELevel.get_level(endpoint_data)
    if level not in (EELevel.VNF, EELevel.VDU):
        return endpoint_data
    if endpoint_data["execution-environment-ref"]:
        # explicitly provided: nothing to infer
        return endpoint_data

    vnfd_id = get_vnf_profile(nsd, endpoint_data["vnf-profile-id"])["vnfd-id"]
    db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
    if level == EELevel.VNF:
        entity_id = vnfd_id
    else:
        entity_id = endpoint_data["vdu-profile-id"]

    juju_ee = get_juju_ee_ref(db_vnfd, entity_id)
    if not juju_ee:
        raise Exception(
            f"not execution environments found for ee_relation {endpoint_data}"
        )
    endpoint_data["execution-environment-ref"] = juju_ee["id"]
    return endpoint_data
def _get_ns_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """
    Collect the NS level relations in which the given VCA takes part.

    Each relation of the ns configuration is accepted in two formats: with
    "provider"/"requirer" entries, or with a two item "entities" list (first item
    is the provider, second the requirer). In the "entities" format an entity id
    different from the nsd id is taken as a vnf profile id.
    :param nsr_id: ns record id
    :param nsd: ns descriptor
    :param vca: deployed VCA used to filter the relations
    :param cached_vnfds: vnfd cache, see _get_vnfd()
    :return: list of Relation objects that have the VCA as provider or requirer
    :raises Exception: if a relation has neither provider/requirer nor entities
    """
    found_relations = []
    for rel in get_ns_configuration_relation_list(nsd):
        if "provider" in rel and "requirer" in rel:
            provider_dict = rel["provider"]
            requirer_dict = rel["requirer"]
        elif "entities" in rel:
            endpoints = []
            # position 0 is the provider, position 1 the requirer
            for position in (0, 1):
                entity_id = rel["entities"][position]["id"]
                endpoint_data = {
                    "nsr-id": nsr_id,
                    "endpoint": rel["entities"][position]["endpoint"],
                }
                if entity_id != nsd["id"]:
                    endpoint_data["vnf-profile-id"] = entity_id
                endpoints.append(endpoint_data)
            provider_dict, requirer_dict = endpoints
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )

        provider = EERelation(
            self._update_ee_relation_data_with_implicit_data(
                nsr_id, nsd, provider_dict, cached_vnfds
            )
        ) if False else None
        relation_provider = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, provider_dict, cached_vnfds
        )
        relation_requirer = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, requirer_dict, cached_vnfds
        )
        provider = EERelation(relation_provider)
        requirer = EERelation(relation_requirer)
        candidate = Relation(rel["name"], provider, requirer)
        if self._is_deployed_vca_in_relation(vca, candidate):
            found_relations.append(candidate)
    return found_relations
def _get_vnf_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """
    Collect the VNF level relations in which the given VCA takes part.

    The relations are read from the vnfd of the vnf profile the VCA belongs to.
    Each one is accepted in two formats: with "provider"/"requirer" entries, or
    with a two item "entities" list (first item is the provider, second the
    requirer). In the "entities" format an entity id different from the vnfd id
    is taken as a vdu profile id.
    :param nsr_id: ns record id
    :param nsd: ns descriptor
    :param vca: deployed VCA; its vnf_profile_id selects the vnf profile
    :param cached_vnfds: vnfd cache, see _get_vnfd()
    :return: list of Relation objects that have the VCA as provider or requirer
    :raises Exception: if a relation has neither provider/requirer nor entities
    """
    found_relations = []
    profile = get_vnf_profile(nsd, vca.vnf_profile_id)
    vnf_profile_id = profile["id"]
    vnfd_id = profile["vnfd-id"]
    db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
    for rel in get_relation_list(db_vnfd, vnfd_id):
        if "provider" in rel and "requirer" in rel:
            provider_dict = rel["provider"]
            requirer_dict = rel["requirer"]
        elif "entities" in rel:
            endpoints = []
            # position 0 is the provider, position 1 the requirer
            for position in (0, 1):
                entity_id = rel["entities"][position]["id"]
                endpoint_data = {
                    "nsr-id": nsr_id,
                    "vnf-profile-id": vnf_profile_id,
                    "endpoint": rel["entities"][position]["endpoint"],
                }
                if entity_id != vnfd_id:
                    endpoint_data["vdu-profile-id"] = entity_id
                endpoints.append(endpoint_data)
            provider_dict, requirer_dict = endpoints
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )

        relation_provider = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        relation_requirer = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        provider = EERelation(relation_provider)
        requirer = EERelation(relation_requirer)
        candidate = Relation(rel["name"], provider, requirer)
        if self._is_deployed_vca_in_relation(vca, candidate):
            found_relations.append(candidate)
    return found_relations
def _get_kdu_resource_data(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedK8sResource:
    """
    Return the deployed KDU data behind a KDU level relation endpoint.

    The kdu resource profile referenced by the endpoint is read from the vnfd of
    the endpoint's vnf profile; the deployed KDU is then looked up with
    get_deployed_kdu() under '_admin.deployed' of the ns record and completed
    with the "resource-name" of the profile.
    :param ee_relation: relation endpoint (uses vnf_profile_id and kdu_resource_profile_id)
    :param db_nsr: ns record
    :param cached_vnfds: vnfd cache, see _get_vnfd()
    :return: the deployed KDU dict (updated in place with "resource-name"), or None
        when get_deployed_kdu() finds no deployed KDU
    """
    nsd = get_nsd(db_nsr)
    vnf_profiles = get_vnf_profiles(nsd)
    vnfd_id = find_in_list(
        vnf_profiles,
        lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
    )["vnfd-id"]
    db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
    kdu_resource_profile = get_kdu_resource_profile(
        db_vnfd, ee_relation.kdu_resource_profile_id
    )
    kdu_name = kdu_resource_profile["kdu-name"]
    # dict defaults: a tuple default has no .get(), so a record without
    # "_admin" raised AttributeError instead of meaning "nothing deployed"
    nsr_deployed = db_nsr.get("_admin", {}).get("deployed", {})
    deployed_kdu, _ = get_deployed_kdu(
        nsr_deployed,
        kdu_name,
        ee_relation.vnf_profile_id,
    )
    if deployed_kdu is None:
        # NOTE(review): assumes get_deployed_kdu() reports "not found" as None - confirm.
        # The caller (_get_deployed_component) already treats a falsy result as
        # "not deployed yet".
        return None
    deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
    return deployed_kdu
def _get_deployed_component(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedComponent:
    """
    Build the deployed component (VCA or K8s resource) behind a relation endpoint.

    NS, VNF and VDU level endpoints are resolved with get_deployed_vca() using a
    filter that depends on the level; KDU level endpoints are resolved with
    _get_kdu_resource_data().
    :param ee_relation: relation endpoint
    :param db_nsr: ns record
    :param cached_vnfds: vnfd cache, see _get_vnfd()
    :return: a DeployedVCA or DeployedK8sResource, or None if nothing is found
    """
    nsr_id = db_nsr["_id"]
    level = EELevel.get_level(ee_relation)

    if level == EELevel.KDU:
        kdu_data = self._get_kdu_resource_data(ee_relation, db_nsr, cached_vnfds)
        return DeployedK8sResource(kdu_data) if kdu_data else None

    # filter used to locate the VCA, according to the level of the endpoint
    if level == EELevel.NS:
        vca_filter = {"vdu_id": None, "member-vnf-index": None}
    elif level == EELevel.VNF:
        vca_filter = {
            "vdu_id": None,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif level == EELevel.VDU:
        vca_filter = {
            "vdu_id": ee_relation.vdu_profile_id,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    else:
        return None

    vca = get_deployed_vca(db_nsr, vca_filter)
    return DeployedVCA(nsr_id, vca) if vca else None
async def _add_relation(
    self,
    relation: Relation,
    vca_type: str,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
    cached_vnfrs: Dict[str, Any],
) -> bool:
    """
    Add a relation if both of its endpoints are deployed and ready.

    Nothing is done unless both deployed components exist and both report
    config_sw_installed.
    :param relation: relation to add
    :param vca_type: key of self.vca_map selecting the connector that adds the relation
    :param db_nsr: ns record
    :param cached_vnfds: vnfd cache, see _get_vnfd()
    :param cached_vnfrs: vnfr cache, see _get_vnfr()
    :return: True if the relation has been added, False if the endpoints are not ready yet
    """
    provider_component = self._get_deployed_component(
        relation.provider, db_nsr, cached_vnfds
    )
    requirer_component = self._get_deployed_component(
        relation.requirer, db_nsr, cached_vnfds
    )
    both_ready = (
        provider_component
        and requirer_component
        and provider_component.config_sw_installed
        and requirer_component.config_sw_installed
    )
    if not both_ready:
        return False

    # the vnfr is only available for endpoints that belong to a vnf profile
    provider_db_vnfr = None
    if relation.provider.vnf_profile_id:
        provider_db_vnfr = self._get_vnfr(
            relation.provider.nsr_id,
            relation.provider.vnf_profile_id,
            cached_vnfrs,
        )
    requirer_db_vnfr = None
    if relation.requirer.vnf_profile_id:
        requirer_db_vnfr = self._get_vnfr(
            relation.requirer.nsr_id,
            relation.requirer.vnf_profile_id,
            cached_vnfrs,
        )

    provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
    requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
    provider_endpoint = RelationEndpoint(
        provider_component.ee_id,
        provider_vca_id,
        relation.provider.endpoint,
    )
    requirer_endpoint = RelationEndpoint(
        requirer_component.ee_id,
        requirer_vca_id,
        relation.requirer.endpoint,
    )
    await self.vca_map[vca_type].add_relation(
        provider=provider_endpoint,
        requirer=requirer_endpoint,
    )
    # True tells the caller to remove the entry from its pending relations list
    return True
async def _add_vca_relations(
    self,
    logging_text,
    nsr_id,
    vca_type: str,
    vca_index: int,
    timeout: int = 3600,
) -> bool:
    """
    Add all the relations in which a deployed VCA takes part.

    Steps:
    1. find all the relations (NS and VNF level) for this VCA
    2. wait for the related peers to be ready
    3. add the relations
    The ns record is reloaded and the pending relations retried every 5 seconds
    until all of them are added or the timeout expires.
    :param logging_text: prefix for logging
    :param nsr_id: ns record id
    :param vca_type: key of self.vca_map selecting the connector, see _add_relation()
    :param vca_index: position of this VCA in the deployed VCA list of the ns record
    :param timeout: maximum time, in seconds, to keep retrying
    :return: True if there were no relations or all of them were added; False on
        timeout or on any error (which is logged, not raised)
    """
    try:
        # STEP 1: find all relations for this VCA
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = get_nsd(db_nsr)

        # this VCA data
        deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
        my_vca = DeployedVCA(nsr_id, deployed_vca_dict)

        cached_vnfds = {}
        cached_vnfrs = {}
        relations = []
        relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
        relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))

        # if no relations, terminate
        if not relations:
            self.logger.debug(logging_text + " No relations")
            return True

        self.logger.debug(logging_text + " adding relations {}".format(relations))

        # STEP 2 and 3: retry the pending relations until done or timeout
        start = time()
        while True:
            now = time()
            if now - start >= timeout:
                self.logger.error(logging_text + " : timeout adding relations")
                return False

            # reload nsr from database (we need to update record: _admin.deployed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            # for each relation, find the VCA's related; iterate over a copy
            # because added relations are removed from the pending list
            for relation in relations.copy():
                added = await self._add_relation(
                    relation,
                    vca_type,
                    db_nsr,
                    cached_vnfds,
                    cached_vnfrs,
                )
                if added:
                    relations.remove(relation)

            if not relations:
                self.logger.debug("Relations added")
                break
            await asyncio.sleep(5.0)

        return True

    except Exception as e:
        # Logger.warn is a deprecated alias; Logger.warning is the supported name
        self.logger.warning(logging_text + " ERROR adding relations: {}".format(e))
        return False
3245 async def _install_kdu(
3253 k8s_instance_info
: dict,
3254 k8params
: dict = None,
3260 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3263 "collection": "nsrs",
3264 "filter": {"_id": nsr_id
},
3265 "path": nsr_db_path
,
3268 if k8s_instance_info
.get("kdu-deployment-name"):
3269 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3271 kdu_instance
= self
.k8scluster_map
[
3273 ].generate_kdu_instance_name(
3274 db_dict
=db_dict_install
,
3275 kdu_model
=k8s_instance_info
["kdu-model"],
3276 kdu_name
=k8s_instance_info
["kdu-name"],
3279 # Update the nsrs table with the kdu-instance value
3283 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3286 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3287 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3288 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3289 # namespace, this first verification could be removed, and the next step would be done for any kind
3291 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3292 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3293 if k8sclustertype
in ("juju", "juju-bundle"):
# First, verify whether the current namespace is present in `_admin.projects_read` (if it is not, it means
# that the user passed an explicit namespace in which the KDU must be deployed)
3301 "_admin.projects_write": k8s_instance_info
["namespace"],
3302 "_admin.projects_read": k8s_instance_info
["namespace"],
3308 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3313 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3315 k8s_instance_info
["namespace"] = kdu_instance
3317 await self
.k8scluster_map
[k8sclustertype
].install(
3318 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3319 kdu_model
=k8s_instance_info
["kdu-model"],
3322 db_dict
=db_dict_install
,
3324 kdu_name
=k8s_instance_info
["kdu-name"],
3325 namespace
=k8s_instance_info
["namespace"],
3326 kdu_instance
=kdu_instance
,
3330 # Obtain services to obtain management service ip
3331 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3332 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3333 kdu_instance
=kdu_instance
,
3334 namespace
=k8s_instance_info
["namespace"],
3337 # Obtain management service info (if exists)
3338 vnfr_update_dict
= {}
3339 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3341 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3346 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3349 for service
in kdud
.get("service", [])
3350 if service
.get("mgmt-service")
3352 for mgmt_service
in mgmt_services
:
3353 for service
in services
:
3354 if service
["name"].startswith(mgmt_service
["name"]):
3355 # Mgmt service found, Obtain service ip
3356 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3357 if isinstance(ip
, list) and len(ip
) == 1:
3361 "kdur.{}.ip-address".format(kdu_index
)
3364 # Check if must update also mgmt ip at the vnf
3365 service_external_cp
= mgmt_service
.get(
3366 "external-connection-point-ref"
3368 if service_external_cp
:
3370 deep_get(vnfd
, ("mgmt-interface", "cp"))
3371 == service_external_cp
3373 vnfr_update_dict
["ip-address"] = ip
3378 "external-connection-point-ref", ""
3380 == service_external_cp
,
3383 "kdur.{}.ip-address".format(kdu_index
)
3388 "Mgmt service name: {} not found".format(
3389 mgmt_service
["name"]
3393 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3394 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3396 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3399 and kdu_config
.get("initial-config-primitive")
3400 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3402 initial_config_primitive_list
= kdu_config
.get(
3403 "initial-config-primitive"
3405 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3407 for initial_config_primitive
in initial_config_primitive_list
:
3408 primitive_params_
= self
._map
_primitive
_params
(
3409 initial_config_primitive
, {}, {}
3412 await asyncio
.wait_for(
3413 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3414 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3415 kdu_instance
=kdu_instance
,
3416 primitive_name
=initial_config_primitive
["name"],
3417 params
=primitive_params_
,
3418 db_dict
=db_dict_install
,
3424 except Exception as e
:
3425 # Prepare update db with error and raise exception
3428 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3432 vnfr_data
.get("_id"),
3433 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3436 # ignore to keep original exception
3438 # reraise original error
3443 async def deploy_kdus(
3450 task_instantiation_info
,
3452 # Launch kdus if present in the descriptor
3454 k8scluster_id_2_uuic
= {
3455 "helm-chart-v3": {},
3460 async def _get_cluster_id(cluster_id
, cluster_type
):
3461 nonlocal k8scluster_id_2_uuic
3462 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3463 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
# check whether the K8s cluster is still being created: look for previous related tasks in progress and wait for them
3466 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3467 "k8scluster", cluster_id
3470 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3471 task_name
, cluster_id
3473 self
.logger
.debug(logging_text
+ text
)
3474 await asyncio
.wait(task_dependency
, timeout
=3600)
3476 db_k8scluster
= self
.db
.get_one(
3477 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3479 if not db_k8scluster
:
3480 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3482 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3484 if cluster_type
== "helm-chart-v3":
3486 # backward compatibility for existing clusters that have not been initialized for helm v3
3487 k8s_credentials
= yaml
.safe_dump(
3488 db_k8scluster
.get("credentials")
3490 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3491 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3493 db_k8scluster_update
= {}
3494 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3495 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3496 db_k8scluster_update
[
3497 "_admin.helm-chart-v3.created"
3499 db_k8scluster_update
[
3500 "_admin.helm-chart-v3.operationalState"
3503 "k8sclusters", cluster_id
, db_k8scluster_update
3505 except Exception as e
:
3508 + "error initializing helm-v3 cluster: {}".format(str(e
))
3511 "K8s cluster '{}' has not been initialized for '{}'".format(
3512 cluster_id
, cluster_type
3517 "K8s cluster '{}' has not been initialized for '{}'".format(
3518 cluster_id
, cluster_type
3521 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3524 logging_text
+= "Deploy kdus: "
3527 db_nsr_update
= {"_admin.deployed.K8s": []}
3528 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3531 updated_cluster_list
= []
3532 updated_v3_cluster_list
= []
3534 for vnfr_data
in db_vnfrs
.values():
3535 vca_id
= self
.get_vca_id(vnfr_data
, {})
3536 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3537 # Step 0: Prepare and set parameters
3538 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3539 vnfd_id
= vnfr_data
.get("vnfd-id")
3540 vnfd_with_id
= find_in_list(
3541 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3545 for kdud
in vnfd_with_id
["kdu"]
3546 if kdud
["name"] == kdur
["kdu-name"]
3548 namespace
= kdur
.get("k8s-namespace")
3549 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3550 if kdur
.get("helm-chart"):
3551 kdumodel
= kdur
["helm-chart"]
3552 # Default version: helm3, if helm-version is v2 assign v2
3553 k8sclustertype
= "helm-chart-v3"
3554 self
.logger
.debug("kdur: {}".format(kdur
))
3556 kdur
.get("helm-version")
3557 and kdur
.get("helm-version") == "v2"
3559 k8sclustertype
= "helm-chart"
3560 elif kdur
.get("juju-bundle"):
3561 kdumodel
= kdur
["juju-bundle"]
3562 k8sclustertype
= "juju-bundle"
3565 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3566 "juju-bundle. Maybe an old NBI version is running".format(
3567 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3570 # check if kdumodel is a file and exists
3572 vnfd_with_id
= find_in_list(
3573 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3575 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3576 if storage
: # may be not present if vnfd has not artifacts
3577 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3578 if storage
["pkg-dir"]:
3579 filename
= "{}/{}/{}s/{}".format(
3586 filename
= "{}/Scripts/{}s/{}".format(
3591 if self
.fs
.file_exists(
3592 filename
, mode
="file"
3593 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3594 kdumodel
= self
.fs
.path
+ filename
3595 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3597 except Exception: # it is not a file
3600 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3601 step
= "Synchronize repos for k8s cluster '{}'".format(
3604 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3608 k8sclustertype
== "helm-chart"
3609 and cluster_uuid
not in updated_cluster_list
3611 k8sclustertype
== "helm-chart-v3"
3612 and cluster_uuid
not in updated_v3_cluster_list
3614 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3615 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3616 cluster_uuid
=cluster_uuid
3619 if del_repo_list
or added_repo_dict
:
3620 if k8sclustertype
== "helm-chart":
3622 "_admin.helm_charts_added." + item
: None
3623 for item
in del_repo_list
3626 "_admin.helm_charts_added." + item
: name
3627 for item
, name
in added_repo_dict
.items()
3629 updated_cluster_list
.append(cluster_uuid
)
3630 elif k8sclustertype
== "helm-chart-v3":
3632 "_admin.helm_charts_v3_added." + item
: None
3633 for item
in del_repo_list
3636 "_admin.helm_charts_v3_added." + item
: name
3637 for item
, name
in added_repo_dict
.items()
3639 updated_v3_cluster_list
.append(cluster_uuid
)
3641 logging_text
+ "repos synchronized on k8s cluster "
3642 "'{}' to_delete: {}, to_add: {}".format(
3643 k8s_cluster_id
, del_repo_list
, added_repo_dict
3648 {"_id": k8s_cluster_id
},
3654 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3655 vnfr_data
["member-vnf-index-ref"],
3659 k8s_instance_info
= {
3660 "kdu-instance": None,
3661 "k8scluster-uuid": cluster_uuid
,
3662 "k8scluster-type": k8sclustertype
,
3663 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3664 "kdu-name": kdur
["kdu-name"],
3665 "kdu-model": kdumodel
,
3666 "namespace": namespace
,
3667 "kdu-deployment-name": kdu_deployment_name
,
3669 db_path
= "_admin.deployed.K8s.{}".format(index
)
3670 db_nsr_update
[db_path
] = k8s_instance_info
3671 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3672 vnfd_with_id
= find_in_list(
3673 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3675 task
= asyncio
.ensure_future(
3684 k8params
=desc_params
,
3689 self
.lcm_tasks
.register(
3693 "instantiate_KDU-{}".format(index
),
3696 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3702 except (LcmException
, asyncio
.CancelledError
):
3704 except Exception as e
:
3705 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3706 if isinstance(e
, (N2VCException
, DbException
)):
3707 self
.logger
.error(logging_text
+ msg
)
3709 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3710 raise LcmException(msg
)
3713 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3732 task_instantiation_info
,
3735 # launch instantiate_N2VC in a asyncio task and register task object
3736 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3737 # if not found, create one entry and update database
3738 # fill db_nsr._admin.deployed.VCA.<index>
3741 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3743 if "execution-environment-list" in descriptor_config
:
3744 ee_list
= descriptor_config
.get("execution-environment-list", [])
3745 elif "juju" in descriptor_config
:
3746 ee_list
= [descriptor_config
] # ns charms
3747 else: # other types as script are not supported
3750 for ee_item
in ee_list
:
3753 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3754 ee_item
.get("juju"), ee_item
.get("helm-chart")
3757 ee_descriptor_id
= ee_item
.get("id")
3758 if ee_item
.get("juju"):
3759 vca_name
= ee_item
["juju"].get("charm")
3762 if ee_item
["juju"].get("charm") is not None
3765 if ee_item
["juju"].get("cloud") == "k8s":
3766 vca_type
= "k8s_proxy_charm"
3767 elif ee_item
["juju"].get("proxy") is False:
3768 vca_type
= "native_charm"
3769 elif ee_item
.get("helm-chart"):
3770 vca_name
= ee_item
["helm-chart"]
3771 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3774 vca_type
= "helm-v3"
3777 logging_text
+ "skipping non juju neither charm configuration"
3782 for vca_index
, vca_deployed
in enumerate(
3783 db_nsr
["_admin"]["deployed"]["VCA"]
3785 if not vca_deployed
:
3788 vca_deployed
.get("member-vnf-index") == member_vnf_index
3789 and vca_deployed
.get("vdu_id") == vdu_id
3790 and vca_deployed
.get("kdu_name") == kdu_name
3791 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3792 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3796 # not found, create one.
3798 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3801 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3803 target
+= "/kdu/{}".format(kdu_name
)
3805 "target_element": target
,
3806 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3807 "member-vnf-index": member_vnf_index
,
3809 "kdu_name": kdu_name
,
3810 "vdu_count_index": vdu_index
,
3811 "operational-status": "init", # TODO revise
3812 "detailed-status": "", # TODO revise
3813 "step": "initial-deploy", # TODO revise
3815 "vdu_name": vdu_name
,
3817 "ee_descriptor_id": ee_descriptor_id
,
3821 # create VCA and configurationStatus in db
3823 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3824 "configurationStatus.{}".format(vca_index
): dict(),
3826 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3828 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3830 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3831 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3832 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3835 task_n2vc
= asyncio
.ensure_future(
3836 self
.instantiate_N2VC(
3837 logging_text
=logging_text
,
3838 vca_index
=vca_index
,
3844 vdu_index
=vdu_index
,
3845 deploy_params
=deploy_params
,
3846 config_descriptor
=descriptor_config
,
3847 base_folder
=base_folder
,
3848 nslcmop_id
=nslcmop_id
,
3852 ee_config_descriptor
=ee_item
,
3855 self
.lcm_tasks
.register(
3859 "instantiate_N2VC-{}".format(vca_index
),
3862 task_instantiation_info
[
3864 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3865 member_vnf_index
or "", vdu_id
or ""
3869 def _create_nslcmop(nsr_id
, operation
, params
):
3871 Creates a ns-lcm-opp content to be stored at database.
3872 :param nsr_id: internal id of the instance
3873 :param operation: instantiate, terminate, scale, action, ...
3874 :param params: user parameters for the operation
3875 :return: dictionary following SOL005 format
3877 # Raise exception if invalid arguments
3878 if not (nsr_id
and operation
and params
):
3880 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3887 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3888 "operationState": "PROCESSING",
3889 "statusEnteredTime": now
,
3890 "nsInstanceId": nsr_id
,
3891 "lcmOperationType": operation
,
3893 "isAutomaticInvocation": False,
3894 "operationParams": params
,
3895 "isCancelPending": False,
3897 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3898 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3903 def _format_additional_params(self
, params
):
3904 params
= params
or {}
3905 for key
, value
in params
.items():
3906 if str(value
).startswith("!!yaml "):
3907 params
[key
] = yaml
.safe_load(value
[7:])
3910 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3911 primitive
= seq
.get("name")
3912 primitive_params
= {}
3914 "member_vnf_index": vnf_index
,
3915 "primitive": primitive
,
3916 "primitive_params": primitive_params
,
3919 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide if an already existing sub-operation must be skipped or executed again.
    :param db_nslcmop: nslcmop content, with the sub-operations at _admin.operations
    :param op_index: index of the sub-operation at that list
    :return: self.SUBOPERATION_STATUS_SKIP if the sub-operation is already COMPLETED; otherwise
        op_index, after marking the sub-operation as PROCESSING
    """
    suboperation = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
    already_done = suboperation.get("operationState") == "COMPLETED"
    if already_done:
        # b. Skip sub-operation
        # _ns_execute_primitive() or RO.create_action() will NOT be executed
        return self.SUBOPERATION_STATUS_SKIP

    # c. retry executing sub-operation
    # The sub-operation exists, and operationState != 'COMPLETED'
    # Update operationState = 'PROCESSING' to indicate a retry.
    self._update_suboperation_status(
        db_nslcmop, op_index, "PROCESSING", "In progress"
    )
    # Return the sub-operation index
    # _ns_execute_primitive() or RO.create_action() will be called from scale()
    # with arguments extracted from the sub-operation
    return op_index
3943 # Find a sub-operation where all keys in a matching dictionary must match
3944 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3945 def _find_suboperation(self
, db_nslcmop
, match
):
3946 if db_nslcmop
and match
:
3947 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3948 for i
, op
in enumerate(op_list
):
3949 if all(op
.get(k
) == match
[k
] for k
in match
):
3951 return self
.SUBOPERATION_STATUS_NOT_FOUND
3953 # Update status for a sub-operation given its index
3954 def _update_suboperation_status(
3955 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3957 # Update DB for HA tasks
3958 q_filter
= {"_id": db_nslcmop
["_id"]}
3960 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3961 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3964 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3967 # Add sub-operation, return the index of the added sub-operation
3968 # Optionally, set operationState, detailed-status, and operationType
3969 # Status and type are currently set for 'scale' sub-operations:
3970 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3971 # 'detailed-status' : status message
3972 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3973 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3974 def _add_suboperation(
3982 mapped_primitive_params
,
3983 operationState
=None,
3984 detailed_status
=None,
3987 RO_scaling_info
=None,
3990 return self
.SUBOPERATION_STATUS_NOT_FOUND
3991 # Get the "_admin.operations" list, if it exists
3992 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3993 op_list
= db_nslcmop_admin
.get("operations")
3994 # Create or append to the "_admin.operations" list
3996 "member_vnf_index": vnf_index
,
3998 "vdu_count_index": vdu_count_index
,
3999 "primitive": primitive
,
4000 "primitive_params": mapped_primitive_params
,
4003 new_op
["operationState"] = operationState
4005 new_op
["detailed-status"] = detailed_status
4007 new_op
["lcmOperationType"] = operationType
4009 new_op
["RO_nsr_id"] = RO_nsr_id
4011 new_op
["RO_scaling_info"] = RO_scaling_info
4013 # No existing operations, create key 'operations' with current operation as first list element
4014 db_nslcmop_admin
.update({"operations": [new_op
]})
4015 op_list
= db_nslcmop_admin
.get("operations")
4017 # Existing operations, append operation to list
4018 op_list
.append(new_op
)
4020 db_nslcmop_update
= {"_admin.operations": op_list
}
4021 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4022 op_index
= len(op_list
) - 1
4025 # Helper methods for scale() sub-operations
4027 # pre-scale/post-scale:
4028 # Check for 3 different cases:
4029 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4030 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4031 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4032 def _check_or_add_scale_suboperation(
4036 vnf_config_primitive
,
4040 RO_scaling_info
=None,
4042 # Find this sub-operation
4043 if RO_nsr_id
and RO_scaling_info
:
4044 operationType
= "SCALE-RO"
4046 "member_vnf_index": vnf_index
,
4047 "RO_nsr_id": RO_nsr_id
,
4048 "RO_scaling_info": RO_scaling_info
,
4052 "member_vnf_index": vnf_index
,
4053 "primitive": vnf_config_primitive
,
4054 "primitive_params": primitive_params
,
4055 "lcmOperationType": operationType
,
4057 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4058 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4059 # a. New sub-operation
4060 # The sub-operation does not exist, add it.
4061 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4062 # The following parameters are set to None for all kind of scaling:
4064 vdu_count_index
= None
4066 if RO_nsr_id
and RO_scaling_info
:
4067 vnf_config_primitive
= None
4068 primitive_params
= None
4071 RO_scaling_info
= None
4072 # Initial status for sub-operation
4073 operationState
= "PROCESSING"
4074 detailed_status
= "In progress"
4075 # Add sub-operation for pre/post-scaling (zero or more operations)
4076 self
._add
_suboperation
(
4082 vnf_config_primitive
,
4090 return self
.SUBOPERATION_STATUS_NEW
4092 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4093 # or op_index (operationState != 'COMPLETED')
4094 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4096 # Function to return execution_environment id
4098 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4099 # TODO vdu_index_count
4100 for vca
in vca_deployed_list
:
4101 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
async def destroy_N2VC(
    self,
    logging_text,
    db_nslcmop,
    vca_deployed,
    config_descriptor,
    vca_index,
    destroy_ee=True,
    exec_primitives=True,
    scaling_in=False,
    vca_id: str = None,
):
    """
    Run the terminate primitives of a VCA and, if requested, delete its execution environment.
    The prometheus jobs of the ns instance are always deleted from database.
    :param logging_text: prefix used at every log message
    :param db_nslcmop: nslcmop content. Sub-operations are added to it; "nsInstanceId" is read
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to not destroy the execution environment here, because all of them
        will be destroyed at once
    :param exec_primitives: False to not execute terminate primitives, because the config is not
        completed or has not executed properly
    :param scaling_in: True destroys the application, False destroys the model
    :param vca_id: passed as it is to the primitive execution and to the environment deletion
    :return: None or exception
    """
    self.logger.debug(
        logging_text
        + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
            vca_index, vca_deployed, config_descriptor, destroy_ee
        )
    )
    vca_type = vca_deployed.get("type", "lxc_proxy_charm")

    # execute terminate_primitives
    if exec_primitives:
        terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
            config_descriptor.get("terminate-config-primitive"),
            vca_deployed.get("ee_descriptor_id"),
        )
        if terminate_primitives and vca_deployed.get("needed_terminate"):
            vnf_index = vca_deployed.get("member-vnf-index")
            vdu_id = vca_deployed.get("vdu_id")
            vdu_count_index = vca_deployed.get("vdu_count_index")
            vdu_name = vca_deployed.get("vdu_name")
            accepted_results = ("COMPLETED", "PARTIALLY_COMPLETED")

            for seq in terminate_primitives:
                # For each sequence in list, get primitive and call _ns_execute_primitive()
                primitive = seq.get("name")
                step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                    vnf_index, primitive
                )
                self.logger.debug(logging_text + step)
                mapped_primitive_params = self._get_terminate_primitive_params(
                    seq, vnf_index
                )

                # Add sub-operation
                self._add_suboperation(
                    db_nslcmop,
                    vnf_index,
                    vdu_id,
                    vdu_count_index,
                    vdu_name,
                    primitive,
                    mapped_primitive_params,
                )
                # Sub-operations: Call _ns_execute_primitive() instead of action()
                try:
                    result, result_detail = await self._ns_execute_primitive(
                        vca_deployed["ee_id"],
                        primitive,
                        mapped_primitive_params,
                        vca_type=vca_type,
                        vca_id=vca_id,
                    )
                except LcmException:
                    # this happens when VCA is not deployed. In this case it is not needed to terminate
                    continue
                if result not in accepted_results:
                    raise LcmException(
                        "terminate_primitive {} for vnf_member_index={} fails with "
                        "error {}".format(seq.get("name"), vnf_index, result_detail)
                    )

            # set that this VCA do not need terminated
            self.update_db_2(
                "nsrs",
                db_nslcmop["nsInstanceId"],
                {"_admin.deployed.VCA.{}.needed_terminate".format(vca_index): False},
            )

    # Delete Prometheus Jobs if any
    # This uses NSR_ID, so it will destroy any jobs under this index
    self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})

    if not destroy_ee:
        return
    await self.vca_map[vca_type].delete_execution_environment(
        vca_deployed["ee_id"],
        scaling_in=scaling_in,
        vca_type=vca_type,
        vca_id=vca_id,
    )
4210 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4211 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4212 namespace
= "." + db_nsr
["_id"]
4214 await self
.n2vc
.delete_namespace(
4215 namespace
=namespace
,
4216 total_timeout
=self
.timeout_charm_delete
,
4219 except N2VCNotFound
: # already deleted. Skip
4221 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
async def _terminate_RO(
    self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
):
    """
    Terminates a deployment from RO: deletes the ns, waits until the delete action finishes, and
    then deletes the nsd and the vnfds. The nsd and vnfds are only deleted if nothing has failed
    before.
    :param logging_text: prefix used at every log message
    :param nsr_deployed: db_nsr._admin.deployed
    :param nsr_id: id of the nsr to update at database
    :param nslcmop_id: id of the operation whose status is updated
    :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
    :return: None
    :raises LcmException: with the concatenation of all the failures, if any deletion has failed
    """
    db_nsr_update = {}
    failed_detail = []
    ro_nsr_id = ro_delete_action = None
    if nsr_deployed and nsr_deployed.get("RO"):
        ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
        ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
    try:
        if ro_nsr_id:
            stage[2] = "Deleting ns from VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self._write_op_status(nslcmop_id, stage)
            self.logger.debug(logging_text + stage[2])
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            desc = await self.RO.delete("ns", ro_nsr_id)
            ro_delete_action = desc["action_id"]
            db_nsr_update[
                "_admin.deployed.RO.nsr_delete_action_id"
            ] = ro_delete_action
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        if ro_delete_action:
            # wait until NS is deleted from VIM
            stage[2] = "Waiting ns deleted from VIM."
            detailed_status_old = None
            self.logger.debug(
                logging_text
                + stage[2]
                + " RO_id={} ro_delete_action={}".format(
                    ro_nsr_id, ro_delete_action
                )
            )
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            delete_timeout = 20 * 60  # 20 minutes
            while delete_timeout > 0:
                desc = await self.RO.show(
                    "ns",
                    item_id_name=ro_nsr_id,
                    extra_item="action",
                    extra_item_id=ro_delete_action,
                )

                # deploymentStatus
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                ns_status, ns_status_info = self.RO.check_action_status(desc)
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "Deleting from VIM {}".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                    db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                    break
                else:
                    # An explicit exception is used instead of an assert, as asserts are removed
                    # when python runs optimized. It is annotated as a delete error below
                    raise LcmException(
                        "ROclient.check_action_status returns unknown {}".format(
                            ns_status
                        )
                    )
                if stage[2] != detailed_status_old:
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self._write_op_status(nslcmop_id, stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # 'loop' argument is not provided, as asyncio.sleep does not accept it since
                # python 3.10; the running loop is used
                await asyncio.sleep(5)
                delete_timeout -= 5
            else:  # delete_timeout <= 0:
                raise ROclient.ROClientException(
                    "Timeout waiting ns deleted from VIM"
                )

    except Exception as e:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if (
            isinstance(e, ROclient.ROClientException) and e.http_code == 404
        ):  # not found
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(
                logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
            )
        elif (
            isinstance(e, ROclient.ROClientException) and e.http_code == 409
        ):  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(
                logging_text
                + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
            )
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(
                logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
            )

    # Delete nsd
    if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
        ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
        try:
            stage[2] = "Deleting nsd from RO."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            await self.RO.delete("nsd", ro_nsd_id)
            self.logger.debug(
                logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
            )
            db_nsr_update["_admin.deployed.RO.nsd_id"] = None
        except Exception as e:
            if (
                isinstance(e, ROclient.ROClientException) and e.http_code == 404
            ):  # not found
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                self.logger.debug(
                    logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
                )
            elif (
                isinstance(e, ROclient.ROClientException) and e.http_code == 409
            ):  # conflict
                failed_detail.append(
                    "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
                )
                self.logger.debug(logging_text + failed_detail[-1])
            else:
                failed_detail.append(
                    "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
                )
                self.logger.error(logging_text + failed_detail[-1])

    # Delete vnfds
    if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
        for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
            if not vnf_deployed or not vnf_deployed["id"]:
                continue
            try:
                ro_vnfd_id = vnf_deployed["id"]
                stage[
                    2
                ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                    vnf_deployed["member-vnf-index"], ro_vnfd_id
                )
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("vnfd", ro_vnfd_id)
                self.logger.debug(
                    logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
                )
                db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
            except Exception as e:
                if (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 404
                ):  # not found
                    db_nsr_update[
                        "_admin.deployed.RO.vnfd.{}.id".format(index)
                    ] = None
                    self.logger.debug(
                        logging_text
                        + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
                    )
                elif (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 409
                ):  # conflict
                    failed_detail.append(
                        "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
                    )
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append(
                        "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
                    )
                    self.logger.error(logging_text + failed_detail[-1])

    # Write the final VIM status, and report the failures if any
    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a ns instance. It is done in three stages: prepare the task, execute the
    terminating primitives, and delete everything (execution environments, KDUs and the
    deployment at VIM). The final status is always written at database and notified by message
    "ns"/"terminated", whatever the result is.
    :param nsr_id: id of the ns instance
    :param nslcmop_id: id of the terminate operation
    :return: None. Failures are not raised but annotated at the nsr and nslcmop status
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    timeout_ns_terminate = self.timeout_ns_terminate
    db_nsr = None
    db_nslcmop = None
    operation_params = None
    exc = None
    error_list = []  # annotates all failed error messages
    db_nslcmop_update = {}
    autoremove = False  # autoremove after terminated
    tasks_dict_info = {}
    db_nsr_update = {}
    stage = [
        "Stage 1/3: Preparing task.",
        "Waiting for previous operations to terminate.",
        "",
    ]
    # ^ contains [stage, step, VIM-status]
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operation_params = db_nslcmop.get("operationParams") or {}
        if operation_params.get("timeout_ns_terminate"):
            timeout_ns_terminate = operation_params["timeout_ns_terminate"]
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update,
        )
        self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            # nothing to delete; the final status is written at the finally clause
            return

        stage[1] = "Getting vnf descriptors from db."
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        db_vnfrs_dict = {
            db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
        }
        db_vnfds_from_id = {}
        db_vnfds_from_member_index = {}
        # Loop over VNFRs
        for vnfr in db_vnfrs_list:
            vnfd_id = vnfr["vnfd-id"]
            if vnfd_id not in db_vnfds_from_id:
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                db_vnfds_from_id[vnfd_id] = vnfd
            db_vnfds_from_member_index[
                vnfr["member-vnf-index-ref"]
            ] = db_vnfds_from_id[vnfd_id]

        # Destroy individual execution environments when there are terminating primitives.
        # Rest of EE will be deleted at once
        # TODO - check before calling _destroy_N2VC
        # if not operation_params.get("skip_terminate_primitives"):#
        # or not vca.get("needed_terminate"):
        stage[0] = "Stage 2/3 execute terminating primitives."
        self.logger.debug(logging_text + stage[0])
        stage[1] = "Looking execution environment that needs terminate."
        self.logger.debug(logging_text + stage[1])

        for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
            # Empty entries must be discarded before reading anything from them
            if not vca or not vca.get("ee_id"):
                continue
            config_descriptor = None
            vca_member_vnf_index = vca.get("member-vnf-index")
            vca_id = self.get_vca_id(
                db_vnfrs_dict.get(vca_member_vnf_index)
                if vca_member_vnf_index
                else None,
                db_nsr,
            )
            if not vca.get("member-vnf-index"):
                # ns
                config_descriptor = db_nsr.get("ns-configuration")
            elif vca.get("vdu_id"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
            elif vca.get("kdu_name"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
            else:
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
            vca_type = vca.get("type")
            exec_terminate_primitives = not operation_params.get(
                "skip_terminate_primitives"
            ) and vca.get("needed_terminate")
            # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
            # pending native charms
            destroy_ee = vca_type in ("helm", "helm-v3", "native_charm")
            # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
            #     vca_index, vca.get("ee_id"), vca_type, destroy_ee))
            task = asyncio.ensure_future(
                self.destroy_N2VC(
                    logging_text,
                    db_nslcmop,
                    vca,
                    config_descriptor,
                    vca_index,
                    destroy_ee,
                    exec_terminate_primitives,
                    vca_id=vca_id,
                )
            )
            tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

        # wait for pending tasks of terminate primitives
        if tasks_dict_info:
            self.logger.debug(
                logging_text
                + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
            )
            error_list = await self._wait_for_tasks(
                logging_text,
                tasks_dict_info,
                min(self.timeout_charm_delete, timeout_ns_terminate),
                stage,
                nslcmop_id,
            )
            tasks_dict_info.clear()
            if error_list:
                return  # raise LcmException("; ".join(error_list))

        # remove All execution environments at once
        stage[0] = "Stage 3/3 delete all."

        if nsr_deployed.get("VCA"):
            stage[1] = "Deleting all execution environments."
            self.logger.debug(logging_text + stage[1])
            vca_id = self.get_vca_id({}, db_nsr)
            task_delete_ee = asyncio.ensure_future(
                asyncio.wait_for(
                    self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
                    timeout=self.timeout_charm_delete,
                )
            )
            # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
            tasks_dict_info[task_delete_ee] = "Terminating all VCA"

        # Delete from k8scluster
        stage[1] = "Deleting KDUs."
        self.logger.debug(logging_text + stage[1])
        # print(nsr_deployed)
        for kdu in get_iterable(nsr_deployed, "K8s"):
            if not kdu or not kdu.get("kdu-instance"):
                continue
            kdu_instance = kdu.get("kdu-instance")
            if kdu.get("k8scluster-type") in self.k8scluster_map:
                # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
                vca_id = self.get_vca_id({}, db_nsr)
                task_delete_kdu_instance = asyncio.ensure_future(
                    self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        vca_id=vca_id,
                        namespace=kdu.get("namespace"),
                    )
                )
            else:
                self.logger.error(
                    logging_text
                    + "Unknown k8s deployment type {}".format(
                        kdu.get("k8scluster-type")
                    )
                )
                continue
            tasks_dict_info[
                task_delete_kdu_instance
            ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

        # remove from RO
        stage[1] = "Deleting ns from VIM."
        if self.ng_ro:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_ng_ro(
                    logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                )
            )
        else:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_RO(
                    logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                )
            )
        tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

        # rest of staff will be done at finally

    except (
        ROclient.ROClientException,
        DbException,
        LcmException,
        N2VCException,
    ) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(stage[1])
        )
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
            exc_info=True,
        )
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for terminate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(
                    logging_text,
                    tasks_dict_info,
                    timeout_ns_terminate,
                    stage,
                    nslcmop_id,
                )
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancell all tasks
        except Exception as wait_error:
            error_list.append(str(wait_error))

        # update status at database
        if error_list:
            error_detail = "; ".join(error_list)
            # self.logger.error(logging_text + error_detail)
            error_description_nslcmop = "{} Detail: {}".format(
                stage[0], error_detail
            )
            error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
                nslcmop_id, stage[0]
            )

            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = (
                error_description_nsr + " Detail: " + error_detail
            )
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "NOT_INSTANTIATED"
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update,
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if ns_state == "NOT_INSTANTIATED":
            try:
                self.db.set_list(
                    "vnfrs",
                    {"nsr-id-ref": nsr_id},
                    {"_admin.nsState": "NOT_INSTANTIATED"},
                )
            except DbException as e:
                self.logger.warning(
                    logging_text
                    + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
                        nsr_id, e
                    )
                )
        if operation_params:
            autoremove = operation_params.get("autoremove", False)
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite(
                    "ns",
                    "terminated",
                    {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                        "autoremove": autoremove,
                    },
                    loop=self.loop,
                )
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4752 async def _wait_for_tasks(
4753 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4756 error_detail_list
= []
4758 pending_tasks
= list(created_tasks_info
.keys())
4759 num_tasks
= len(pending_tasks
)
4761 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4762 self
._write
_op
_status
(nslcmop_id
, stage
)
4763 while pending_tasks
:
4765 _timeout
= timeout
+ time_start
- time()
4766 done
, pending_tasks
= await asyncio
.wait(
4767 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4769 num_done
+= len(done
)
4770 if not done
: # Timeout
4771 for task
in pending_tasks
:
4772 new_error
= created_tasks_info
[task
] + ": Timeout"
4773 error_detail_list
.append(new_error
)
4774 error_list
.append(new_error
)
4777 if task
.cancelled():
4780 exc
= task
.exception()
4782 if isinstance(exc
, asyncio
.TimeoutError
):
4784 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4785 error_list
.append(created_tasks_info
[task
])
4786 error_detail_list
.append(new_error
)
4793 ROclient
.ROClientException
,
4799 self
.logger
.error(logging_text
+ new_error
)
4801 exc_traceback
= "".join(
4802 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4806 + created_tasks_info
[task
]
4812 logging_text
+ created_tasks_info
[task
] + ": Done"
4814 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4816 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4817 if nsr_id
: # update also nsr
4822 "errorDescription": "Error at: " + ", ".join(error_list
),
4823 "errorDetail": ". ".join(error_detail_list
),
4826 self
._write
_op
_status
(nslcmop_id
, stage
)
4827 return error_detail_list
4830 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4832 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4833 The default-value is used. If it is between < > it look for a value at instantiation_params
4834 :param primitive_desc: portion of VNFD/NSD that describes primitive
4835 :param params: Params provided by user
4836 :param instantiation_params: Instantiation params provided by user
4837 :return: a dictionary with the calculated params
4839 calculated_params
= {}
4840 for parameter
in primitive_desc
.get("parameter", ()):
4841 param_name
= parameter
["name"]
4842 if param_name
in params
:
4843 calculated_params
[param_name
] = params
[param_name
]
4844 elif "default-value" in parameter
or "value" in parameter
:
4845 if "value" in parameter
:
4846 calculated_params
[param_name
] = parameter
["value"]
4848 calculated_params
[param_name
] = parameter
["default-value"]
4850 isinstance(calculated_params
[param_name
], str)
4851 and calculated_params
[param_name
].startswith("<")
4852 and calculated_params
[param_name
].endswith(">")
4854 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4855 calculated_params
[param_name
] = instantiation_params
[
4856 calculated_params
[param_name
][1:-1]
4860 "Parameter {} needed to execute primitive {} not provided".format(
4861 calculated_params
[param_name
], primitive_desc
["name"]
4866 "Parameter {} needed to execute primitive {} not provided".format(
4867 param_name
, primitive_desc
["name"]
4871 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4872 calculated_params
[param_name
] = yaml
.safe_dump(
4873 calculated_params
[param_name
], default_flow_style
=True, width
=256
4875 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4877 ].startswith("!!yaml "):
4878 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4879 if parameter
.get("data-type") == "INTEGER":
4881 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4882 except ValueError: # error converting string to int
4884 "Parameter {} of primitive {} must be integer".format(
4885 param_name
, primitive_desc
["name"]
4888 elif parameter
.get("data-type") == "BOOLEAN":
4889 calculated_params
[param_name
] = not (
4890 (str(calculated_params
[param_name
])).lower() == "false"
4893 # add always ns_config_info if primitive name is config
4894 if primitive_desc
["name"] == "config":
4895 if "ns_config_info" in instantiation_params
:
4896 calculated_params
["ns_config_info"] = instantiation_params
[
4899 return calculated_params
4901 def _look_for_deployed_vca(
4908 ee_descriptor_id
=None,
4910 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4911 for vca
in deployed_vca
:
4914 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4917 vdu_count_index
is not None
4918 and vdu_count_index
!= vca
["vdu_count_index"]
4921 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4923 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4927 # vca_deployed not found
4929 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4930 " is not deployed".format(
4939 ee_id
= vca
.get("ee_id")
4941 "type", "lxc_proxy_charm"
4942 ) # default value for backward compatibility - proxy charm
4945 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4946 "execution environment".format(
4947 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4950 return ee_id
, vca_type
4952 async def _ns_execute_primitive(
4958 retries_interval
=30,
4965 if primitive
== "config":
4966 primitive_params
= {"params": primitive_params
}
4968 vca_type
= vca_type
or "lxc_proxy_charm"
4972 output
= await asyncio
.wait_for(
4973 self
.vca_map
[vca_type
].exec_primitive(
4975 primitive_name
=primitive
,
4976 params_dict
=primitive_params
,
4977 progress_timeout
=self
.timeout_progress_primitive
,
4978 total_timeout
=self
.timeout_primitive
,
4983 timeout
=timeout
or self
.timeout_primitive
,
4987 except asyncio
.CancelledError
:
4989 except Exception as e
: # asyncio.TimeoutError
4990 if isinstance(e
, asyncio
.TimeoutError
):
4995 "Error executing action {} on {} -> {}".format(
5000 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
5002 return "FAILED", str(e
)
5004 return "COMPLETED", output
5006 except (LcmException
, asyncio
.CancelledError
):
5008 except Exception as e
:
5009 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
async def vca_status_refresh(self, nsr_id, nslcmop_id):
    """
    Updating the vca_status with latest juju information in nsrs record.
    When the record has deployed K8s entries, each of them is refreshed with _on_update_k8s_db;
    otherwise each deployed VCA entry is refreshed with _on_update_n2vc_db.
    :param: nsr_id: Id of the nsr
    :param: nslcmop_id: Id of the nslcmop
    :return: None
    """
    self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_id = self.get_vca_id({}, db_nsr)
    deployed = db_nsr["_admin"]["deployed"]
    k8s_entries = deployed["K8s"]
    if k8s_entries:
        for k8s in k8s_entries:
            # read the three identifiers before calling, as a single step
            cluster_uuid = k8s["k8scluster-uuid"]
            kdu_instance = k8s["kdu-instance"]
            cluster_type = k8s["k8scluster-type"]
            await self._on_update_k8s_db(
                cluster_uuid=cluster_uuid,
                kdu_instance=kdu_instance,
                filter={"_id": nsr_id},
                vca_id=vca_id,
                cluster_type=cluster_type,
            )
    else:
        for vca_index, _ in enumerate(deployed["VCA"]):
            # only the position of the VCA is needed to build the record path
            vca_path = "_admin.deployed.VCA.{}.".format(vca_index)
            await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, vca_path, {})

    self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
    self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
5045 async def action(self
, nsr_id
, nslcmop_id
):
5046 # Try to lock HA task here
5047 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5048 if not task_is_locked_by_me
:
5051 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
5052 self
.logger
.debug(logging_text
+ "Enter")
5053 # get all needed from database
5057 db_nslcmop_update
= {}
5058 nslcmop_operation_state
= None
5059 error_description_nslcmop
= None
5062 # wait for any previous tasks in process
5063 step
= "Waiting for previous operations to terminate"
5064 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5066 self
._write
_ns
_status
(
5069 current_operation
="RUNNING ACTION",
5070 current_operation_id
=nslcmop_id
,
5073 step
= "Getting information from database"
5074 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5075 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5076 if db_nslcmop
["operationParams"].get("primitive_params"):
5077 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
5078 db_nslcmop
["operationParams"]["primitive_params"]
5081 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5082 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
5083 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
5084 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
5085 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
5086 primitive
= db_nslcmop
["operationParams"]["primitive"]
5087 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
5088 timeout_ns_action
= db_nslcmop
["operationParams"].get(
5089 "timeout_ns_action", self
.timeout_primitive
5093 step
= "Getting vnfr from database"
5094 db_vnfr
= self
.db
.get_one(
5095 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5097 if db_vnfr
.get("kdur"):
5099 for kdur
in db_vnfr
["kdur"]:
5100 if kdur
.get("additionalParams"):
5101 kdur
["additionalParams"] = json
.loads(
5102 kdur
["additionalParams"]
5104 kdur_list
.append(kdur
)
5105 db_vnfr
["kdur"] = kdur_list
5106 step
= "Getting vnfd from database"
5107 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5109 # Sync filesystem before running a primitive
5110 self
.fs
.sync(db_vnfr
["vnfd-id"])
5112 step
= "Getting nsd from database"
5113 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5115 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5116 # for backward compatibility
5117 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5118 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5119 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5120 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5122 # look for primitive
5123 config_primitive_desc
= descriptor_configuration
= None
5125 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5127 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5129 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5131 descriptor_configuration
= db_nsd
.get("ns-configuration")
5133 if descriptor_configuration
and descriptor_configuration
.get(
5136 for config_primitive
in descriptor_configuration
["config-primitive"]:
5137 if config_primitive
["name"] == primitive
:
5138 config_primitive_desc
= config_primitive
5141 if not config_primitive_desc
:
5142 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5144 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5148 primitive_name
= primitive
5149 ee_descriptor_id
= None
5151 primitive_name
= config_primitive_desc
.get(
5152 "execution-environment-primitive", primitive
5154 ee_descriptor_id
= config_primitive_desc
.get(
5155 "execution-environment-ref"
5161 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5163 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5166 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5168 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5170 desc_params
= parse_yaml_strings(
5171 db_vnfr
.get("additionalParamsForVnf")
5174 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5175 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5176 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5178 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5179 actions
.add(primitive
["name"])
5180 for primitive
in kdu_configuration
.get("config-primitive", []):
5181 actions
.add(primitive
["name"])
5183 nsr_deployed
["K8s"],
5184 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5185 and kdu
["member-vnf-index"] == vnf_index
,
5189 if primitive_name
in actions
5190 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5194 # TODO check if ns is in a proper status
5196 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5198 # kdur and desc_params already set from before
5199 if primitive_params
:
5200 desc_params
.update(primitive_params
)
5201 # TODO Check if we will need something at vnf level
5202 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5204 kdu_name
== kdu
["kdu-name"]
5205 and kdu
["member-vnf-index"] == vnf_index
5210 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5213 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5214 msg
= "unknown k8scluster-type '{}'".format(
5215 kdu
.get("k8scluster-type")
5217 raise LcmException(msg
)
5220 "collection": "nsrs",
5221 "filter": {"_id": nsr_id
},
5222 "path": "_admin.deployed.K8s.{}".format(index
),
5226 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5228 step
= "Executing kdu {}".format(primitive_name
)
5229 if primitive_name
== "upgrade":
5230 if desc_params
.get("kdu_model"):
5231 kdu_model
= desc_params
.get("kdu_model")
5232 del desc_params
["kdu_model"]
5234 kdu_model
= kdu
.get("kdu-model")
5235 parts
= kdu_model
.split(sep
=":")
5237 kdu_model
= parts
[0]
5239 detailed_status
= await asyncio
.wait_for(
5240 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5241 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5242 kdu_instance
=kdu
.get("kdu-instance"),
5244 kdu_model
=kdu_model
,
5247 timeout
=timeout_ns_action
,
5249 timeout
=timeout_ns_action
+ 10,
5252 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5254 elif primitive_name
== "rollback":
5255 detailed_status
= await asyncio
.wait_for(
5256 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5257 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5258 kdu_instance
=kdu
.get("kdu-instance"),
5261 timeout
=timeout_ns_action
,
5263 elif primitive_name
== "status":
5264 detailed_status
= await asyncio
.wait_for(
5265 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5266 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5267 kdu_instance
=kdu
.get("kdu-instance"),
5270 timeout
=timeout_ns_action
,
5273 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5274 kdu
["kdu-name"], nsr_id
5276 params
= self
._map
_primitive
_params
(
5277 config_primitive_desc
, primitive_params
, desc_params
5280 detailed_status
= await asyncio
.wait_for(
5281 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5282 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5283 kdu_instance
=kdu_instance
,
5284 primitive_name
=primitive_name
,
5287 timeout
=timeout_ns_action
,
5290 timeout
=timeout_ns_action
,
5294 nslcmop_operation_state
= "COMPLETED"
5296 detailed_status
= ""
5297 nslcmop_operation_state
= "FAILED"
5299 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5300 nsr_deployed
["VCA"],
5301 member_vnf_index
=vnf_index
,
5303 vdu_count_index
=vdu_count_index
,
5304 ee_descriptor_id
=ee_descriptor_id
,
5306 for vca_index
, vca_deployed
in enumerate(
5307 db_nsr
["_admin"]["deployed"]["VCA"]
5309 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5311 "collection": "nsrs",
5312 "filter": {"_id": nsr_id
},
5313 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5317 nslcmop_operation_state
,
5319 ) = await self
._ns
_execute
_primitive
(
5321 primitive
=primitive_name
,
5322 primitive_params
=self
._map
_primitive
_params
(
5323 config_primitive_desc
, primitive_params
, desc_params
5325 timeout
=timeout_ns_action
,
5331 db_nslcmop_update
["detailed-status"] = detailed_status
5332 error_description_nslcmop
= (
5333 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5337 + " task Done with result {} {}".format(
5338 nslcmop_operation_state
, detailed_status
5341 return # database update is called inside finally
5343 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5344 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5346 except asyncio
.CancelledError
:
5348 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5350 exc
= "Operation was cancelled"
5351 except asyncio
.TimeoutError
:
5352 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5354 except Exception as e
:
5355 exc
= traceback
.format_exc()
5356 self
.logger
.critical(
5357 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5366 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5367 nslcmop_operation_state
= "FAILED"
5369 self
._write
_ns
_status
(
5373 ], # TODO check if degraded. For the moment use previous status
5374 current_operation
="IDLE",
5375 current_operation_id
=None,
5376 # error_description=error_description_nsr,
5377 # error_detail=error_detail,
5378 other_update
=db_nsr_update
,
5381 self
._write
_op
_status
(
5384 error_message
=error_description_nslcmop
,
5385 operation_state
=nslcmop_operation_state
,
5386 other_update
=db_nslcmop_update
,
5389 if nslcmop_operation_state
:
5391 await self
.msg
.aiowrite(
5396 "nslcmop_id": nslcmop_id
,
5397 "operationState": nslcmop_operation_state
,
5401 except Exception as e
:
5403 logging_text
+ "kafka_write notification Exception {}".format(e
)
5405 self
.logger
.debug(logging_text
+ "Exit")
5406 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5407 return nslcmop_operation_state
, detailed_status
5409 async def terminate_vdus(
5410 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5412 """This method terminates VDUs
5415 db_vnfr: VNF instance record
5416 member_vnf_index: VNF index to identify the VDUs to be removed
5417 db_nsr: NS instance record
5418 update_db_nslcmops: Nslcmop update record
5420 vca_scaling_info
= []
5421 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5422 scaling_info
["scaling_direction"] = "IN"
5423 scaling_info
["vdu-delete"] = {}
5424 scaling_info
["kdu-delete"] = {}
5425 db_vdur
= db_vnfr
.get("vdur")
5426 vdur_list
= copy(db_vdur
)
5428 for index
, vdu
in enumerate(vdur_list
):
5429 vca_scaling_info
.append(
5431 "osm_vdu_id": vdu
["vdu-id-ref"],
5432 "member-vnf-index": member_vnf_index
,
5434 "vdu_index": count_index
,
5436 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5437 scaling_info
["vdu"].append(
5439 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5440 "vdu_id": vdu
["vdu-id-ref"],
5443 for interface
in vdu
["interfaces"]:
5444 scaling_info
["vdu"][index
]["interface"].append(
5446 "name": interface
["name"],
5447 "ip_address": interface
["ip-address"],
5448 "mac_address": interface
.get("mac-address"),
5450 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5451 stage
[2] = "Terminating VDUs"
5452 if scaling_info
.get("vdu-delete"):
5453 # scale_process = "RO"
5454 if self
.ro_config
.get("ng"):
5455 await self
._scale
_ng
_ro
(
5456 logging_text
, db_nsr
, update_db_nslcmops
, db_vnfr
, scaling_info
, stage
5459 async def remove_vnf(
5460 self
, nsr_id
, nslcmop_id
, vnf_instance_id
5462 """This method is to Remove VNF instances from NS.
5465 nsr_id: NS instance id
5466 nslcmop_id: nslcmop id of update
5467 vnf_instance_id: id of the VNF instance to be removed
5470 result: (str, str) COMPLETED/FAILED, details
5474 logging_text
= "Task ns={} update ".format(nsr_id
)
5475 check_vnfr_count
= len(self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}))
5476 self
.logger
.info("check_vnfr_count {}".format(check_vnfr_count
))
5477 if check_vnfr_count
> 1:
5478 stage
= ["", "", ""]
5479 step
= "Getting nslcmop from database"
5480 self
.logger
.debug(step
+ " after having waited for previous tasks to be completed")
5481 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5482 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5483 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5484 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5485 """ db_vnfr = self.db.get_one(
5486 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5488 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5489 await self
.terminate_vdus(db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
)
5491 constituent_vnfr
= db_nsr
.get("constituent-vnfr-ref")
5492 constituent_vnfr
.remove(db_vnfr
.get("_id"))
5493 db_nsr_update
["constituent-vnfr-ref"] = db_nsr
.get("constituent-vnfr-ref")
5494 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5495 self
.db
.del_one("vnfrs", {"_id": db_vnfr
.get("_id")})
5496 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5497 return "COMPLETED", "Done"
5499 step
= "Terminate VNF Failed with"
5500 raise LcmException("{} Cannot terminate the last VNF in this NS.".format(
5502 except (LcmException
, asyncio
.CancelledError
):
5504 except Exception as e
:
5505 self
.logger
.debug("Error removing VNF {}".format(e
))
5506 return "FAILED", "Error removing VNF {}".format(e
)
5508 async def _ns_redeploy_vnf(
5509 self
, nsr_id
, nslcmop_id
, db_vnfd
, db_vnfr
, db_nsr
,
5511 """This method updates and redeploys VNF instances
5514 nsr_id: NS instance id
5515 nslcmop_id: nslcmop id
5516 db_vnfd: VNF descriptor
5517 db_vnfr: VNF instance record
5518 db_nsr: NS instance record
5521 result: (str, str) COMPLETED/FAILED, details
5525 stage
= ["", "", ""]
5526 logging_text
= "Task ns={} update ".format(nsr_id
)
5527 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5528 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5530 # Terminate old VNF resources
5531 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5532 await self
.terminate_vdus(db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
)
5534 # old_vnfd_id = db_vnfr["vnfd-id"]
5535 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5536 new_db_vnfd
= db_vnfd
5537 # new_vnfd_ref = new_db_vnfd["id"]
5538 # new_vnfd_id = vnfd_id
5542 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5544 "name": cp
.get("id"),
5545 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5546 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5549 new_vnfr_cp
.append(vnf_cp
)
5550 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5551 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5552 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5553 new_vnfr_update
= {"revision": latest_vnfd_revision
, "connection-point": new_vnfr_cp
, "vdur": new_vdur
, "ip-address": ""}
5554 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5555 updated_db_vnfr
= self
.db
.get_one(
5556 "vnfrs", {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
}
5559 # Instantiate new VNF resources
5560 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5561 vca_scaling_info
= []
5562 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5563 scaling_info
["scaling_direction"] = "OUT"
5564 scaling_info
["vdu-create"] = {}
5565 scaling_info
["kdu-create"] = {}
5566 vdud_instantiate_list
= db_vnfd
["vdu"]
5567 for index
, vdud
in enumerate(vdud_instantiate_list
):
5568 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5572 additional_params
= (
5573 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5576 cloud_init_list
= []
5578 # TODO Information of its own ip is not available because db_vnfr is not updated.
5579 additional_params
["OSM"] = get_osm_params(
5580 updated_db_vnfr
, vdud
["id"], 1
5582 cloud_init_list
.append(
5583 self
._parse
_cloud
_init
(
5590 vca_scaling_info
.append(
5592 "osm_vdu_id": vdud
["id"],
5593 "member-vnf-index": member_vnf_index
,
5595 "vdu_index": count_index
,
5598 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5599 if self
.ro_config
.get("ng"):
5601 "New Resources to be deployed: {}".format(scaling_info
))
5602 await self
._scale
_ng
_ro
(
5603 logging_text
, db_nsr
, update_db_nslcmops
, updated_db_vnfr
, scaling_info
, stage
5605 return "COMPLETED", "Done"
5606 except (LcmException
, asyncio
.CancelledError
):
5608 except Exception as e
:
5609 self
.logger
.debug("Error updating VNF {}".format(e
))
5610 return "FAILED", "Error updating VNF {}".format(e
)
5612 async def _ns_charm_upgrade(
5618 timeout
: float = None,
5620 """This method upgrade charms in VNF instances
5623 ee_id: Execution environment id
5624 path: Local path to the charm
5626 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5627 timeout: (Float) Timeout for the ns update operation
5630 result: (str, str) COMPLETED/FAILED, details
5633 charm_type
= charm_type
or "lxc_proxy_charm"
5634 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5638 charm_type
=charm_type
,
5639 timeout
=timeout
or self
.timeout_ns_update
,
5643 return "COMPLETED", output
5645 except (LcmException
, asyncio
.CancelledError
):
5648 except Exception as e
:
5650 self
.logger
.debug("Error upgrading charm {}".format(path
))
5652 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5654 async def update(self
, nsr_id
, nslcmop_id
):
5655 """Update NS according to different update types
5657 This method performs upgrade of VNF instances then updates the revision
5658 number in VNF record
5661 nsr_id: Network service will be updated
5662 nslcmop_id: ns lcm operation id
5665 It may raise DbException, LcmException, N2VCException, K8sException
5668 # Try to lock HA task here
5669 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5670 if not task_is_locked_by_me
:
5673 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5674 self
.logger
.debug(logging_text
+ "Enter")
5676 # Set the required variables to be filled up later
5678 db_nslcmop_update
= {}
5680 nslcmop_operation_state
= None
5682 error_description_nslcmop
= ""
5684 change_type
= "updated"
5685 detailed_status
= ""
5688 # wait for any previous tasks in process
5689 step
= "Waiting for previous operations to terminate"
5690 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5691 self
._write
_ns
_status
(
5694 current_operation
="UPDATING",
5695 current_operation_id
=nslcmop_id
,
5698 step
= "Getting nslcmop from database"
5699 db_nslcmop
= self
.db
.get_one(
5700 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5702 update_type
= db_nslcmop
["operationParams"]["updateType"]
5704 step
= "Getting nsr from database"
5705 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5706 old_operational_status
= db_nsr
["operational-status"]
5707 db_nsr_update
["operational-status"] = "updating"
5708 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5709 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5711 if update_type
== "CHANGE_VNFPKG":
5713 # Get the input parameters given through update request
5714 vnf_instance_id
= db_nslcmop
["operationParams"][
5715 "changeVnfPackageData"
5716 ].get("vnfInstanceId")
5718 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5721 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5723 step
= "Getting vnfr from database"
5724 db_vnfr
= self
.db
.get_one(
5725 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5728 step
= "Getting vnfds from database"
5730 latest_vnfd
= self
.db
.get_one(
5731 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5733 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5736 current_vnf_revision
= db_vnfr
.get("revision", 1)
5737 current_vnfd
= self
.db
.get_one(
5739 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5740 fail_on_empty
=False,
5742 # Charm artifact paths will be filled up later
5744 current_charm_artifact_path
,
5745 target_charm_artifact_path
,
5746 charm_artifact_paths
,
5749 step
= "Checking if revision has changed in VNFD"
5750 if current_vnf_revision
!= latest_vnfd_revision
:
5752 change_type
= "policy_updated"
5754 # There is new revision of VNFD, update operation is required
5755 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5756 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5758 step
= "Removing the VNFD packages if they exist in the local path"
5759 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5760 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5762 step
= "Get the VNFD packages from FSMongo"
5763 self
.fs
.sync(from_path
=latest_vnfd_path
)
5764 self
.fs
.sync(from_path
=current_vnfd_path
)
5767 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5769 base_folder
= latest_vnfd
["_admin"]["storage"]
5771 for charm_index
, charm_deployed
in enumerate(
5772 get_iterable(nsr_deployed
, "VCA")
5774 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5776 # Getting charm-id and charm-type
5777 if charm_deployed
.get("member-vnf-index") == vnf_index
:
5778 charm_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5779 charm_type
= charm_deployed
.get("type")
5782 ee_id
= charm_deployed
.get("ee_id")
5784 step
= "Getting descriptor config"
5785 descriptor_config
= get_configuration(
5786 current_vnfd
, current_vnfd
["id"]
5789 if "execution-environment-list" in descriptor_config
:
5790 ee_list
= descriptor_config
.get(
5791 "execution-environment-list", []
5796 # There could be several charm used in the same VNF
5797 for ee_item
in ee_list
:
5798 if ee_item
.get("juju"):
5800 step
= "Getting charm name"
5801 charm_name
= ee_item
["juju"].get("charm")
5803 step
= "Setting Charm artifact paths"
5804 current_charm_artifact_path
.append(
5805 get_charm_artifact_path(
5809 current_vnf_revision
,
5812 target_charm_artifact_path
.append(
5813 get_charm_artifact_path(
5817 latest_vnfd_revision
,
5821 charm_artifact_paths
= zip(
5822 current_charm_artifact_path
, target_charm_artifact_path
5825 step
= "Checking if software version has changed in VNFD"
5826 if find_software_version(current_vnfd
) != find_software_version(
5830 step
= "Checking if existing VNF has charm"
5831 for current_charm_path
, target_charm_path
in list(
5832 charm_artifact_paths
5834 if current_charm_path
:
5836 "Software version change is not supported as VNF instance {} has charm.".format(
5841 # There is no change in the charm package, then redeploy the VNF
5842 # based on new descriptor
5843 step
= "Redeploying VNF"
5844 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5848 ) = await self
._ns
_redeploy
_vnf
(
5855 if result
== "FAILED":
5856 nslcmop_operation_state
= result
5857 error_description_nslcmop
= detailed_status
5858 db_nslcmop_update
["detailed-status"] = detailed_status
5861 + " step {} Done with result {} {}".format(
5862 step
, nslcmop_operation_state
, detailed_status
5867 step
= "Checking if any charm package has changed or not"
5868 for current_charm_path
, target_charm_path
in list(
5869 charm_artifact_paths
5873 and target_charm_path
5874 and self
.check_charm_hash_changed(
5875 current_charm_path
, target_charm_path
5879 step
= "Checking whether VNF uses juju bundle"
5880 if check_juju_bundle_existence(current_vnfd
):
5883 "Charm upgrade is not supported for the instance which"
5884 " uses juju-bundle: {}".format(
5885 check_juju_bundle_existence(current_vnfd
)
5889 step
= "Upgrading Charm"
5893 ) = await self
._ns
_charm
_upgrade
(
5896 charm_type
=charm_type
,
5897 path
=self
.fs
.path
+ target_charm_path
,
5898 timeout
=timeout_seconds
,
5901 if result
== "FAILED":
5902 nslcmop_operation_state
= result
5903 error_description_nslcmop
= detailed_status
5905 db_nslcmop_update
["detailed-status"] = detailed_status
5908 + " step {} Done with result {} {}".format(
5909 step
, nslcmop_operation_state
, detailed_status
5913 step
= "Updating policies"
5914 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5915 result
= "COMPLETED"
5916 detailed_status
= "Done"
5917 db_nslcmop_update
["detailed-status"] = "Done"
5919 # If nslcmop_operation_state is None, so any operation is not failed.
5920 if not nslcmop_operation_state
:
5921 nslcmop_operation_state
= "COMPLETED"
5923 # If update CHANGE_VNFPKG nslcmop_operation is successful
5924 # vnf revision need to be updated
5925 vnfr_update
["revision"] = latest_vnfd_revision
5926 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
5930 + " task Done with result {} {}".format(
5931 nslcmop_operation_state
, detailed_status
5934 elif update_type
== "REMOVE_VNF":
5935 # This part is included in https://osm.etsi.org/gerrit/11876
5936 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
5937 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5938 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5939 step
= "Removing VNF"
5940 (result
, detailed_status
) = await self
.remove_vnf(nsr_id
, nslcmop_id
, vnf_instance_id
)
5941 if result
== "FAILED":
5942 nslcmop_operation_state
= result
5943 error_description_nslcmop
= detailed_status
5944 db_nslcmop_update
["detailed-status"] = detailed_status
5945 change_type
= "vnf_terminated"
5946 if not nslcmop_operation_state
:
5947 nslcmop_operation_state
= "COMPLETED"
5950 + " task Done with result {} {}".format(
5951 nslcmop_operation_state
, detailed_status
5955 elif update_type
== "OPERATE_VNF":
5956 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"]["vnfInstanceId"]
5957 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"]["changeStateTo"]
5958 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"]["additionalParam"]
5959 (result
, detailed_status
) = await self
.rebuild_start_stop(
5960 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
5962 if result
== "FAILED":
5963 nslcmop_operation_state
= result
5964 error_description_nslcmop
= detailed_status
5965 db_nslcmop_update
["detailed-status"] = detailed_status
5966 if not nslcmop_operation_state
:
5967 nslcmop_operation_state
= "COMPLETED"
5970 + " task Done with result {} {}".format(
5971 nslcmop_operation_state
, detailed_status
5975 # If nslcmop_operation_state is None, so any operation is not failed.
5976 # All operations are executed in overall.
5977 if not nslcmop_operation_state
:
5978 nslcmop_operation_state
= "COMPLETED"
5979 db_nsr_update
["operational-status"] = old_operational_status
5981 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5982 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5984 except asyncio
.CancelledError
:
5986 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5988 exc
= "Operation was cancelled"
5989 except asyncio
.TimeoutError
:
5990 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5992 except Exception as e
:
5993 exc
= traceback
.format_exc()
5994 self
.logger
.critical(
5995 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6004 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6005 nslcmop_operation_state
= "FAILED"
6006 db_nsr_update
["operational-status"] = old_operational_status
6008 self
._write
_ns
_status
(
6010 ns_state
=db_nsr
["nsState"],
6011 current_operation
="IDLE",
6012 current_operation_id
=None,
6013 other_update
=db_nsr_update
,
6016 self
._write
_op
_status
(
6019 error_message
=error_description_nslcmop
,
6020 operation_state
=nslcmop_operation_state
,
6021 other_update
=db_nslcmop_update
,
6024 if nslcmop_operation_state
:
6028 "nslcmop_id": nslcmop_id
,
6029 "operationState": nslcmop_operation_state
,
6031 if change_type
in ("vnf_terminated", "policy_updated"):
6032 msg
.update({"vnf_member_index": member_vnf_index
})
6033 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6034 except Exception as e
:
6036 logging_text
+ "kafka_write notification Exception {}".format(e
)
6038 self
.logger
.debug(logging_text
+ "Exit")
6039 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6040 return nslcmop_operation_state
, detailed_status
6042 async def scale(self
, nsr_id
, nslcmop_id
):
# Scale one VNF of a network service in or out, driven by the
# "scaleVnfData" section of the nslcmop record's "operationParams".
#
# Parameters:
#   nsr_id:     "_id" used to read and update the record in "nsrs".
#   nslcmop_id: "_id" used to read the record in "nslcmops"; also used for
#               the HA lock and as the current operation id.
#
# Visible flow:
#   1. Take the HA lock, wait for related operations and write the NS
#      status with current_operation="SCALING".
#   2. Load nslcmop, nsr, vnfr and vnfd; look up the scaling aspect whose
#      name equals the requested "scaling-group-descriptor".
#   3. For "SCALE_OUT" / "SCALE_IN", walk the aspect deltas and fill
#      scaling_info (vdu/kdu create or delete) and vca_scaling_info.
#   4. Run the matching pre-scale "scaling-config-action" primitives.
#   5. Scale-in VCA step, then RO (self._scale_ng_ro when ro_config has
#      "ng"), then KDU (self._scale_kdu), then scale-out VCA step.
#   6. Run the matching post-scale "scaling-config-action" primitives.
#   7. On exit, write the operation and NS status, call
#      self.msg.aiowrite("ns", "scaled", ...) when an operation state is
#      set, and remove the "ns_scale" task from self.lcm_tasks.
6043 # Try to lock HA task here
6044 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6045 if not task_is_locked_by_me
:
6048 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6049 stage
= ["", "", ""]
6050 tasks_dict_info
= {}
6051 # ^ stage, step, VIM progress
6052 self
.logger
.debug(logging_text
+ "Enter")
6053 # get all needed from database
6055 db_nslcmop_update
= {}
6058 # in case of error, indicates what part of scale was failed to put nsr at error status
6059 scale_process
= None
6060 old_operational_status
= ""
6061 old_config_status
= ""
6064 # wait for any previous tasks in process
6065 step
= "Waiting for previous operations to terminate"
6066 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6067 self
._write
_ns
_status
(
6070 current_operation
="SCALING",
6071 current_operation_id
=nslcmop_id
,
6074 step
= "Getting nslcmop from database"
6076 step
+ " after having waited for previous tasks to be completed"
6078 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6080 step
= "Getting nsr from database"
6081 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6082 old_operational_status
= db_nsr
["operational-status"]
6083 old_config_status
= db_nsr
["config-status"]
6085 step
= "Parsing scaling parameters"
6086 db_nsr_update
["operational-status"] = "scaling"
6087 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6088 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6090 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6092 ]["member-vnf-index"]
6093 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6095 ]["scaling-group-descriptor"]
6096 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6097 # for backward compatibility
6098 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6099 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6100 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6101 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6103 step
= "Getting vnfr from database"
6104 db_vnfr
= self
.db
.get_one(
6105 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6108 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6110 step
= "Getting vnfd from database"
6111 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6113 base_folder
= db_vnfd
["_admin"]["storage"]
6115 step
= "Getting scaling-group-descriptor"
6116 scaling_descriptor
= find_in_list(
6117 get_scaling_aspect(db_vnfd
),
6118 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6120 if not scaling_descriptor
:
6122 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6123 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6126 step
= "Sending scale order to VIM"
6127 # TODO check if ns is in a proper status
6129 if not db_nsr
["_admin"].get("scaling-group"):
6134 "_admin.scaling-group": [
6135 {"name": scaling_group
, "nb-scale-op": 0}
6139 admin_scale_index
= 0
6141 for admin_scale_index
, admin_scale_info
in enumerate(
6142 db_nsr
["_admin"]["scaling-group"]
6144 if admin_scale_info
["name"] == scaling_group
:
6145 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6147 else: # not found, set index one plus last element and add new entry with the name
6148 admin_scale_index
+= 1
6150 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6153 vca_scaling_info
= []
6154 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6155 if scaling_type
== "SCALE_OUT":
# Scale-out: each vdu-delta / kdu-resource-delta adds instances, bounded by
# the profile's "max-number-of-instances" (10 when the profile lacks it).
# NOTE(review): the error message below reads "fount" (typo for "found");
# it is runtime text, so it is left unchanged here.
6156 if "aspect-delta-details" not in scaling_descriptor
:
6158 "Aspect delta details not fount in scaling descriptor {}".format(
6159 scaling_descriptor
["name"]
6162 # count if max-instance-count is reached
6163 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6165 scaling_info
["scaling_direction"] = "OUT"
6166 scaling_info
["vdu-create"] = {}
6167 scaling_info
["kdu-create"] = {}
6168 for delta
in deltas
:
6169 for vdu_delta
in delta
.get("vdu-delta", {}):
6170 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6171 # vdu_index also provides the number of instance of the targeted vdu
6172 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6173 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6177 additional_params
= (
6178 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6181 cloud_init_list
= []
6183 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6184 max_instance_count
= 10
6185 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6186 max_instance_count
= vdu_profile
.get(
6187 "max-number-of-instances", 10
6190 default_instance_num
= get_number_of_instances(
6193 instances_number
= vdu_delta
.get("number-of-instances", 1)
6194 nb_scale_op
+= instances_number
6196 new_instance_count
= nb_scale_op
+ default_instance_num
6197 # Control if new count is over max and vdu count is less than max.
6198 # Then assign new instance count
6199 if new_instance_count
> max_instance_count
> vdu_count
:
6200 instances_number
= new_instance_count
- max_instance_count
# NOTE(review): the statement below assigns instances_number to itself and
# has no effect.
6202 instances_number
= instances_number
6204 if new_instance_count
> max_instance_count
:
6206 "reached the limit of {} (max-instance-count) "
6207 "scaling-out operations for the "
6208 "scaling-group-descriptor '{}'".format(
6209 nb_scale_op
, scaling_group
6212 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6214 # TODO Information of its own ip is not available because db_vnfr is not updated.
6215 additional_params
["OSM"] = get_osm_params(
6216 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6218 cloud_init_list
.append(
6219 self
._parse
_cloud
_init
(
6226 vca_scaling_info
.append(
6228 "osm_vdu_id": vdu_delta
["id"],
6229 "member-vnf-index": vnf_index
,
6231 "vdu_index": vdu_index
+ x
,
6234 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6235 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6236 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6237 kdu_name
= kdu_profile
["kdu-name"]
6238 resource_name
= kdu_profile
.get("resource-name", "")
6240 # Might have different kdus in the same delta
6241 # Should have list for each kdu
6242 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6243 scaling_info
["kdu-create"][kdu_name
] = []
6245 kdur
= get_kdur(db_vnfr
, kdu_name
)
6246 if kdur
.get("helm-chart"):
6247 k8s_cluster_type
= "helm-chart-v3"
6248 self
.logger
.debug("kdur: {}".format(kdur
))
6250 kdur
.get("helm-version")
6251 and kdur
.get("helm-version") == "v2"
6253 k8s_cluster_type
= "helm-chart"
6254 elif kdur
.get("juju-bundle"):
6255 k8s_cluster_type
= "juju-bundle"
6258 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6259 "juju-bundle. Maybe an old NBI version is running".format(
6260 db_vnfr
["member-vnf-index-ref"], kdu_name
6264 max_instance_count
= 10
6265 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6266 max_instance_count
= kdu_profile
.get(
6267 "max-number-of-instances", 10
6270 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6271 deployed_kdu
, _
= get_deployed_kdu(
6272 nsr_deployed
, kdu_name
, vnf_index
6274 if deployed_kdu
is None:
6276 "KDU '{}' for vnf '{}' not deployed".format(
6280 kdu_instance
= deployed_kdu
.get("kdu-instance")
6281 instance_num
= await self
.k8scluster_map
[
6287 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6288 kdu_model
=deployed_kdu
.get("kdu-model"),
6290 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6291 "number-of-instances", 1
6294 # Control if new count is over max and instance_num is less than max.
6295 # Then assign max instance number to kdu replica count
6296 if kdu_replica_count
> max_instance_count
> instance_num
:
6297 kdu_replica_count
= max_instance_count
6298 if kdu_replica_count
> max_instance_count
:
6300 "reached the limit of {} (max-instance-count) "
6301 "scaling-out operations for the "
6302 "scaling-group-descriptor '{}'".format(
6303 instance_num
, scaling_group
6307 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6308 vca_scaling_info
.append(
6310 "osm_kdu_id": kdu_name
,
6311 "member-vnf-index": vnf_index
,
6313 "kdu_index": instance_num
+ x
- 1,
6316 scaling_info
["kdu-create"][kdu_name
].append(
6318 "member-vnf-index": vnf_index
,
6320 "k8s-cluster-type": k8s_cluster_type
,
6321 "resource-name": resource_name
,
6322 "scale": kdu_replica_count
,
6325 elif scaling_type
== "SCALE_IN":
# Scale-in: mirrors the scale-out branch, bounded below by the profile's
# "min-number-of-instances" (0 when the profile lacks it).
# NOTE(review): unlike SCALE_OUT, no presence check for
# "aspect-delta-details" is visible here; .get() would return None and the
# subscript would raise TypeError - confirm the descriptor is validated.
6326 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6328 scaling_info
["scaling_direction"] = "IN"
6329 scaling_info
["vdu-delete"] = {}
6330 scaling_info
["kdu-delete"] = {}
6332 for delta
in deltas
:
6333 for vdu_delta
in delta
.get("vdu-delta", {}):
6334 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6335 min_instance_count
= 0
6336 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6337 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6338 min_instance_count
= vdu_profile
["min-number-of-instances"]
6340 default_instance_num
= get_number_of_instances(
6341 db_vnfd
, vdu_delta
["id"]
6343 instance_num
= vdu_delta
.get("number-of-instances", 1)
6344 nb_scale_op
-= instance_num
6346 new_instance_count
= nb_scale_op
+ default_instance_num
6348 if new_instance_count
< min_instance_count
< vdu_count
:
6349 instances_number
= min_instance_count
- new_instance_count
6351 instances_number
= instance_num
6353 if new_instance_count
< min_instance_count
:
6355 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6356 "scaling-group-descriptor '{}'".format(
6357 nb_scale_op
, scaling_group
6360 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6361 vca_scaling_info
.append(
6363 "osm_vdu_id": vdu_delta
["id"],
6364 "member-vnf-index": vnf_index
,
6366 "vdu_index": vdu_index
- 1 - x
,
6369 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6370 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6371 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6372 kdu_name
= kdu_profile
["kdu-name"]
6373 resource_name
= kdu_profile
.get("resource-name", "")
6375 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6376 scaling_info
["kdu-delete"][kdu_name
] = []
6378 kdur
= get_kdur(db_vnfr
, kdu_name
)
6379 if kdur
.get("helm-chart"):
6380 k8s_cluster_type
= "helm-chart-v3"
6381 self
.logger
.debug("kdur: {}".format(kdur
))
6383 kdur
.get("helm-version")
6384 and kdur
.get("helm-version") == "v2"
6386 k8s_cluster_type
= "helm-chart"
6387 elif kdur
.get("juju-bundle"):
6388 k8s_cluster_type
= "juju-bundle"
6391 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6392 "juju-bundle. Maybe an old NBI version is running".format(
6393 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6397 min_instance_count
= 0
6398 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6399 min_instance_count
= kdu_profile
["min-number-of-instances"]
6401 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6402 deployed_kdu
, _
= get_deployed_kdu(
6403 nsr_deployed
, kdu_name
, vnf_index
6405 if deployed_kdu
is None:
6407 "KDU '{}' for vnf '{}' not deployed".format(
6411 kdu_instance
= deployed_kdu
.get("kdu-instance")
6412 instance_num
= await self
.k8scluster_map
[
6418 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6419 kdu_model
=deployed_kdu
.get("kdu-model"),
6421 kdu_replica_count
= instance_num
- kdu_delta
.get(
6422 "number-of-instances", 1
6425 if kdu_replica_count
< min_instance_count
< instance_num
:
6426 kdu_replica_count
= min_instance_count
6427 if kdu_replica_count
< min_instance_count
:
6429 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6430 "scaling-group-descriptor '{}'".format(
6431 instance_num
, scaling_group
6435 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6436 vca_scaling_info
.append(
6438 "osm_kdu_id": kdu_name
,
6439 "member-vnf-index": vnf_index
,
6441 "kdu_index": instance_num
- x
- 1,
6444 scaling_info
["kdu-delete"][kdu_name
].append(
6446 "member-vnf-index": vnf_index
,
6448 "k8s-cluster-type": k8s_cluster_type
,
6449 "resource-name": resource_name
,
6450 "scale": kdu_replica_count
,
6454 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
# NOTE(review): "scaling_direction" is only assigned in the SCALE_OUT /
# SCALE_IN branches above; for any other scaleVnfType the lookup below would
# presumably raise KeyError - confirm the value is validated before this task.
6455 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6456 if scaling_info
["scaling_direction"] == "IN":
6457 for vdur
in reversed(db_vnfr
["vdur"]):
6458 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6459 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6460 scaling_info
["vdu"].append(
6462 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6463 "vdu_id": vdur
["vdu-id-ref"],
6467 for interface
in vdur
["interfaces"]:
6468 scaling_info
["vdu"][-1]["interface"].append(
6470 "name": interface
["name"],
6471 "ip_address": interface
["ip-address"],
6472 "mac_address": interface
.get("mac-address"),
6475 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6478 step
= "Executing pre-scale vnf-config-primitive"
6479 if scaling_descriptor
.get("scaling-config-action"):
6480 for scaling_config_action
in scaling_descriptor
[
6481 "scaling-config-action"
6484 scaling_config_action
.get("trigger") == "pre-scale-in"
6485 and scaling_type
== "SCALE_IN"
6487 scaling_config_action
.get("trigger") == "pre-scale-out"
6488 and scaling_type
== "SCALE_OUT"
6490 vnf_config_primitive
= scaling_config_action
[
6491 "vnf-config-primitive-name-ref"
6493 step
= db_nslcmop_update
[
6495 ] = "executing pre-scale scaling-config-action '{}'".format(
6496 vnf_config_primitive
6499 # look for primitive
6500 for config_primitive
in (
6501 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6502 ).get("config-primitive", ()):
6503 if config_primitive
["name"] == vnf_config_primitive
:
6507 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6508 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6509 "primitive".format(scaling_group
, vnf_config_primitive
)
6512 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6513 if db_vnfr
.get("additionalParamsForVnf"):
6514 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6516 scale_process
= "VCA"
6517 db_nsr_update
["config-status"] = "configuring pre-scaling"
6518 primitive_params
= self
._map
_primitive
_params
(
6519 config_primitive
, {}, vnfr_params
6522 # Pre-scale retry check: Check if this sub-operation has been executed before
6523 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6526 vnf_config_primitive
,
6530 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6531 # Skip sub-operation
6532 result
= "COMPLETED"
6533 result_detail
= "Done"
6536 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6537 vnf_config_primitive
, result
, result_detail
6541 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6542 # New sub-operation: Get index of this sub-operation
6544 len(db_nslcmop
.get("_admin", {}).get("operations"))
6549 + "vnf_config_primitive={} New sub-operation".format(
6550 vnf_config_primitive
6554 # retry: Get registered params for this existing sub-operation
6555 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6558 vnf_index
= op
.get("member_vnf_index")
6559 vnf_config_primitive
= op
.get("primitive")
6560 primitive_params
= op
.get("primitive_params")
6563 + "vnf_config_primitive={} Sub-operation retry".format(
6564 vnf_config_primitive
6567 # Execute the primitive, either with new (first-time) or registered (reintent) args
6568 ee_descriptor_id
= config_primitive
.get(
6569 "execution-environment-ref"
6571 primitive_name
= config_primitive
.get(
6572 "execution-environment-primitive", vnf_config_primitive
6574 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6575 nsr_deployed
["VCA"],
6576 member_vnf_index
=vnf_index
,
6578 vdu_count_index
=None,
6579 ee_descriptor_id
=ee_descriptor_id
,
6581 result
, result_detail
= await self
._ns
_execute
_primitive
(
6590 + "vnf_config_primitive={} Done with result {} {}".format(
6591 vnf_config_primitive
, result
, result_detail
6594 # Update operationState = COMPLETED | FAILED
6595 self
._update
_suboperation
_status
(
6596 db_nslcmop
, op_index
, result
, result_detail
6599 if result
== "FAILED":
6600 raise LcmException(result_detail
)
6601 db_nsr_update
["config-status"] = old_config_status
6602 scale_process
= None
6606 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6609 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6612 # SCALE-IN VCA - BEGIN
6613 if vca_scaling_info
:
6614 step
= db_nslcmop_update
[
6616 ] = "Deleting the execution environments"
6617 scale_process
= "VCA"
6618 for vca_info
in vca_scaling_info
:
6619 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6620 member_vnf_index
= str(vca_info
["member-vnf-index"])
6622 logging_text
+ "vdu info: {}".format(vca_info
)
6624 if vca_info
.get("osm_vdu_id"):
6625 vdu_id
= vca_info
["osm_vdu_id"]
6626 vdu_index
= int(vca_info
["vdu_index"])
6629 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6630 member_vnf_index
, vdu_id
, vdu_index
6632 stage
[2] = step
= "Scaling in VCA"
6633 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6634 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6635 config_update
= db_nsr
["configurationStatus"]
# NOTE(review): two things to confirm in the loop below:
#  - '(vca or vca.get("ee_id"))' is true for any non-empty vca, so the
#    ee_id part never filters anything (and a None entry would fail on
#    .get); 'and' may have been intended.
#  - entries appear to be deleted from vca_update / config_update by index
#    while vca_update is being enumerated, which shifts the elements that
#    follow and can make the loop skip one.
6636 for vca_index
, vca
in enumerate(vca_update
):
6638 (vca
or vca
.get("ee_id"))
6639 and vca
["member-vnf-index"] == member_vnf_index
6640 and vca
["vdu_count_index"] == vdu_index
6642 if vca
.get("vdu_id"):
6643 config_descriptor
= get_configuration(
6644 db_vnfd
, vca
.get("vdu_id")
6646 elif vca
.get("kdu_name"):
6647 config_descriptor
= get_configuration(
6648 db_vnfd
, vca
.get("kdu_name")
6651 config_descriptor
= get_configuration(
6652 db_vnfd
, db_vnfd
["id"]
6654 operation_params
= (
6655 db_nslcmop
.get("operationParams") or {}
6657 exec_terminate_primitives
= not operation_params
.get(
6658 "skip_terminate_primitives"
6659 ) and vca
.get("needed_terminate")
6660 task
= asyncio
.ensure_future(
6669 exec_primitives
=exec_terminate_primitives
,
6673 timeout
=self
.timeout_charm_delete
,
6676 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6679 del vca_update
[vca_index
]
6680 del config_update
[vca_index
]
6681 # wait for pending tasks of terminate primitives
6685 + "Waiting for tasks {}".format(
6686 list(tasks_dict_info
.keys())
6689 error_list
= await self
._wait
_for
_tasks
(
6693 self
.timeout_charm_delete
, self
.timeout_ns_terminate
6698 tasks_dict_info
.clear()
6700 raise LcmException("; ".join(error_list
))
6702 db_vca_and_config_update
= {
6703 "_admin.deployed.VCA": vca_update
,
6704 "configurationStatus": config_update
,
6707 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6709 scale_process
= None
6710 # SCALE-IN VCA - END
6713 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6714 scale_process
= "RO"
6715 if self
.ro_config
.get("ng"):
6716 await self
._scale
_ng
_ro
(
6717 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6719 scaling_info
.pop("vdu-create", None)
6720 scaling_info
.pop("vdu-delete", None)
6722 scale_process
= None
6726 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6727 scale_process
= "KDU"
6728 await self
._scale
_kdu
(
6729 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6731 scaling_info
.pop("kdu-create", None)
6732 scaling_info
.pop("kdu-delete", None)
6734 scale_process
= None
6738 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6740 # SCALE-UP VCA - BEGIN
6741 if vca_scaling_info
:
6742 step
= db_nslcmop_update
[
6744 ] = "Creating new execution environments"
6745 scale_process
= "VCA"
6746 for vca_info
in vca_scaling_info
:
6747 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6748 member_vnf_index
= str(vca_info
["member-vnf-index"])
6750 logging_text
+ "vdu info: {}".format(vca_info
)
6752 vnfd_id
= db_vnfr
["vnfd-ref"]
6753 if vca_info
.get("osm_vdu_id"):
6754 vdu_index
= int(vca_info
["vdu_index"])
6755 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6756 if db_vnfr
.get("additionalParamsForVnf"):
6757 deploy_params
.update(
6759 db_vnfr
["additionalParamsForVnf"].copy()
6762 descriptor_config
= get_configuration(
6763 db_vnfd
, db_vnfd
["id"]
6765 if descriptor_config
:
6770 logging_text
=logging_text
6771 + "member_vnf_index={} ".format(member_vnf_index
),
6774 nslcmop_id
=nslcmop_id
,
6780 member_vnf_index
=member_vnf_index
,
6781 vdu_index
=vdu_index
,
6783 deploy_params
=deploy_params
,
6784 descriptor_config
=descriptor_config
,
6785 base_folder
=base_folder
,
6786 task_instantiation_info
=tasks_dict_info
,
6789 vdu_id
= vca_info
["osm_vdu_id"]
6790 vdur
= find_in_list(
6791 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6793 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6794 if vdur
.get("additionalParams"):
6795 deploy_params_vdu
= parse_yaml_strings(
6796 vdur
["additionalParams"]
6799 deploy_params_vdu
= deploy_params
6800 deploy_params_vdu
["OSM"] = get_osm_params(
6801 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6803 if descriptor_config
:
6808 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6809 member_vnf_index
, vdu_id
, vdu_index
6811 stage
[2] = step
= "Scaling out VCA"
6812 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6814 logging_text
=logging_text
6815 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6816 member_vnf_index
, vdu_id
, vdu_index
6820 nslcmop_id
=nslcmop_id
,
6826 member_vnf_index
=member_vnf_index
,
6827 vdu_index
=vdu_index
,
6829 deploy_params
=deploy_params_vdu
,
6830 descriptor_config
=descriptor_config
,
6831 base_folder
=base_folder
,
6832 task_instantiation_info
=tasks_dict_info
,
6835 # SCALE-UP VCA - END
6836 scale_process
= None
6839 # execute primitive service POST-SCALING
6840 step
= "Executing post-scale vnf-config-primitive"
6841 if scaling_descriptor
.get("scaling-config-action"):
6842 for scaling_config_action
in scaling_descriptor
[
6843 "scaling-config-action"
6846 scaling_config_action
.get("trigger") == "post-scale-in"
6847 and scaling_type
== "SCALE_IN"
6849 scaling_config_action
.get("trigger") == "post-scale-out"
6850 and scaling_type
== "SCALE_OUT"
6852 vnf_config_primitive
= scaling_config_action
[
6853 "vnf-config-primitive-name-ref"
6855 step
= db_nslcmop_update
[
6857 ] = "executing post-scale scaling-config-action '{}'".format(
6858 vnf_config_primitive
6861 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6862 if db_vnfr
.get("additionalParamsForVnf"):
6863 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6865 # look for primitive
6866 for config_primitive
in (
6867 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6868 ).get("config-primitive", ()):
6869 if config_primitive
["name"] == vnf_config_primitive
:
6873 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6874 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6875 "config-primitive".format(
6876 scaling_group
, vnf_config_primitive
6879 scale_process
= "VCA"
6880 db_nsr_update
["config-status"] = "configuring post-scaling"
6881 primitive_params
= self
._map
_primitive
_params
(
6882 config_primitive
, {}, vnfr_params
6885 # Post-scale retry check: Check if this sub-operation has been executed before
6886 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6889 vnf_config_primitive
,
6893 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6894 # Skip sub-operation
6895 result
= "COMPLETED"
6896 result_detail
= "Done"
6899 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6900 vnf_config_primitive
, result
, result_detail
6904 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6905 # New sub-operation: Get index of this sub-operation
6907 len(db_nslcmop
.get("_admin", {}).get("operations"))
6912 + "vnf_config_primitive={} New sub-operation".format(
6913 vnf_config_primitive
6917 # retry: Get registered params for this existing sub-operation
6918 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6921 vnf_index
= op
.get("member_vnf_index")
6922 vnf_config_primitive
= op
.get("primitive")
6923 primitive_params
= op
.get("primitive_params")
6926 + "vnf_config_primitive={} Sub-operation retry".format(
6927 vnf_config_primitive
6930 # Execute the primitive, either with new (first-time) or registered (reintent) args
6931 ee_descriptor_id
= config_primitive
.get(
6932 "execution-environment-ref"
6934 primitive_name
= config_primitive
.get(
6935 "execution-environment-primitive", vnf_config_primitive
6937 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6938 nsr_deployed
["VCA"],
6939 member_vnf_index
=vnf_index
,
6941 vdu_count_index
=None,
6942 ee_descriptor_id
=ee_descriptor_id
,
6944 result
, result_detail
= await self
._ns
_execute
_primitive
(
6953 + "vnf_config_primitive={} Done with result {} {}".format(
6954 vnf_config_primitive
, result
, result_detail
6957 # Update operationState = COMPLETED | FAILED
6958 self
._update
_suboperation
_status
(
6959 db_nslcmop
, op_index
, result
, result_detail
6962 if result
== "FAILED":
6963 raise LcmException(result_detail
)
6964 db_nsr_update
["config-status"] = old_config_status
6965 scale_process
= None
6970 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6971 db_nsr_update
["operational-status"] = (
6973 if old_operational_status
== "failed"
6974 else old_operational_status
6976 db_nsr_update
["config-status"] = old_config_status
6979 ROclient
.ROClientException
,
6984 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6986 except asyncio
.CancelledError
:
6988 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6990 exc
= "Operation was cancelled"
6991 except Exception as e
:
6992 exc
= traceback
.format_exc()
6993 self
.logger
.critical(
6994 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6998 self
._write
_ns
_status
(
7001 current_operation
="IDLE",
7002 current_operation_id
=None,
7005 stage
[1] = "Waiting for instantiate pending tasks."
7006 self
.logger
.debug(logging_text
+ stage
[1])
7007 exc
= await self
._wait
_for
_tasks
(
7010 self
.timeout_ns_deploy
,
7018 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7019 nslcmop_operation_state
= "FAILED"
7021 db_nsr_update
["operational-status"] = old_operational_status
7022 db_nsr_update
["config-status"] = old_config_status
7023 db_nsr_update
["detailed-status"] = ""
7025 if "VCA" in scale_process
:
7026 db_nsr_update
["config-status"] = "failed"
7027 if "RO" in scale_process
:
7028 db_nsr_update
["operational-status"] = "failed"
7031 ] = "FAILED scaling nslcmop={} {}: {}".format(
7032 nslcmop_id
, step
, exc
7035 error_description_nslcmop
= None
7036 nslcmop_operation_state
= "COMPLETED"
7037 db_nslcmop_update
["detailed-status"] = "Done"
7039 self
._write
_op
_status
(
7042 error_message
=error_description_nslcmop
,
7043 operation_state
=nslcmop_operation_state
,
7044 other_update
=db_nslcmop_update
,
7047 self
._write
_ns
_status
(
7050 current_operation
="IDLE",
7051 current_operation_id
=None,
7052 other_update
=db_nsr_update
,
7055 if nslcmop_operation_state
:
7059 "nslcmop_id": nslcmop_id
,
7060 "operationState": nslcmop_operation_state
,
7062 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7063 except Exception as e
:
7065 logging_text
+ "kafka_write notification Exception {}".format(e
)
7067 self
.logger
.debug(logging_text
+ "Exit")
7068 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7070 async def _scale_kdu(
7071 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7073 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7074 for kdu_name
in _scaling_info
:
7075 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7076 deployed_kdu
, index
= get_deployed_kdu(
7077 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7079 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7080 kdu_instance
= deployed_kdu
["kdu-instance"]
7081 kdu_model
= deployed_kdu
.get("kdu-model")
7082 scale
= int(kdu_scaling_info
["scale"])
7083 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7086 "collection": "nsrs",
7087 "filter": {"_id": nsr_id
},
7088 "path": "_admin.deployed.K8s.{}".format(index
),
7091 step
= "scaling application {}".format(
7092 kdu_scaling_info
["resource-name"]
7094 self
.logger
.debug(logging_text
+ step
)
7096 if kdu_scaling_info
["type"] == "delete":
7097 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7100 and kdu_config
.get("terminate-config-primitive")
7101 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7103 terminate_config_primitive_list
= kdu_config
.get(
7104 "terminate-config-primitive"
7106 terminate_config_primitive_list
.sort(
7107 key
=lambda val
: int(val
["seq"])
7111 terminate_config_primitive
7112 ) in terminate_config_primitive_list
:
7113 primitive_params_
= self
._map
_primitive
_params
(
7114 terminate_config_primitive
, {}, {}
7116 step
= "execute terminate config primitive"
7117 self
.logger
.debug(logging_text
+ step
)
7118 await asyncio
.wait_for(
7119 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7120 cluster_uuid
=cluster_uuid
,
7121 kdu_instance
=kdu_instance
,
7122 primitive_name
=terminate_config_primitive
["name"],
7123 params
=primitive_params_
,
7130 await asyncio
.wait_for(
7131 self
.k8scluster_map
[k8s_cluster_type
].scale(
7134 kdu_scaling_info
["resource-name"],
7136 cluster_uuid
=cluster_uuid
,
7137 kdu_model
=kdu_model
,
7141 timeout
=self
.timeout_vca_on_error
,
7144 if kdu_scaling_info
["type"] == "create":
7145 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7148 and kdu_config
.get("initial-config-primitive")
7149 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7151 initial_config_primitive_list
= kdu_config
.get(
7152 "initial-config-primitive"
7154 initial_config_primitive_list
.sort(
7155 key
=lambda val
: int(val
["seq"])
7158 for initial_config_primitive
in initial_config_primitive_list
:
7159 primitive_params_
= self
._map
_primitive
_params
(
7160 initial_config_primitive
, {}, {}
7162 step
= "execute initial config primitive"
7163 self
.logger
.debug(logging_text
+ step
)
7164 await asyncio
.wait_for(
7165 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7166 cluster_uuid
=cluster_uuid
,
7167 kdu_instance
=kdu_instance
,
7168 primitive_name
=initial_config_primitive
["name"],
7169 params
=primitive_params_
,
7176 async def _scale_ng_ro(
7177 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7179 nsr_id
= db_nslcmop
["nsInstanceId"]
7180 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7183 # read from db: vnfd's for every vnf
7186 # for each vnf in ns, read vnfd
7187 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7188 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7189 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7190 # if we haven't this vnfd, read it from db
7191 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7193 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7194 db_vnfds
.append(vnfd
)
7195 n2vc_key
= self
.n2vc
.get_public_key()
7196 n2vc_key_list
= [n2vc_key
]
7199 vdu_scaling_info
.get("vdu-create"),
7200 vdu_scaling_info
.get("vdu-delete"),
7203 # db_vnfr has been updated, update db_vnfrs to use it
7204 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7205 await self
._instantiate
_ng
_ro
(
7215 start_deploy
=time(),
7216 timeout_ns_deploy
=self
.timeout_ns_deploy
,
7218 if vdu_scaling_info
.get("vdu-delete"):
7220 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7223 async def extract_prometheus_scrape_jobs(
7224 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
7226 # look if exist a file called 'prometheus*.j2' and
7227 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7231 for f
in artifact_content
7232 if f
.startswith("prometheus") and f
.endswith(".j2")
7238 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7242 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7243 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7245 vnfr_id
= vnfr_id
.replace("-", "")
7247 "JOB_NAME": vnfr_id
,
7248 "TARGET_IP": target_ip
,
7249 "EXPORTER_POD_IP": host_name
,
7250 "EXPORTER_POD_PORT": host_port
,
7252 job_list
= parse_job(job_data
, variables
)
7253 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7254 for job
in job_list
:
7256 not isinstance(job
.get("job_name"), str)
7257 or vnfr_id
not in job
["job_name"]
7259 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
7260 job
["nsr_id"] = nsr_id
7261 job
["vnfr_id"] = vnfr_id
7264 async def rebuild_start_stop(self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
):
7265 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7266 self
.logger
.info(logging_text
+ "Enter")
7267 stage
= ["Preparing the environment", ""]
7268 # database nsrs record
7272 # in case of error, indicates what part of scale was failed to put nsr at error status
7273 start_deploy
= time()
7275 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7276 vim_account_id
= db_vnfr
.get("vim-account-id")
7277 vim_info_key
= "vim:" + vim_account_id
7278 vdur
= find_in_list(
7279 db_vnfr
["vdur"], lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7282 vdu_vim_name
= vdur
["name"]
7283 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7284 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7285 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7286 # wait for any previous tasks in process
7287 stage
[1] = "Waiting for previous operations to terminate"
7288 self
.logger
.info(stage
[1])
7289 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
7291 stage
[1] = "Reading from database."
7292 self
.logger
.info(stage
[1])
7293 self
._write
_ns
_status
(
7296 current_operation
=operation_type
.upper(),
7297 current_operation_id
=nslcmop_id
7299 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7302 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7303 db_nsr_update
["operational-status"] = operation_type
7304 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7308 "vim_vm_id": vim_vm_id
,
7310 "vdu_index": additional_param
["count-index"],
7311 "vdu_id": vdur
["id"],
7312 "target_vim": target_vim
,
7313 "vim_account_id": vim_account_id
7316 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7317 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7318 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7319 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7320 self
.logger
.info("response from RO: {}".format(result_dict
))
7321 action_id
= result_dict
["action_id"]
7322 await self
._wait
_ng
_ro
(
7323 nsr_id
, action_id
, nslcmop_id
, start_deploy
, self
.timeout_operate
7325 return "COMPLETED", "Done"
7326 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7327 self
.logger
.error("Exit Exception {}".format(e
))
7329 except asyncio
.CancelledError
:
7330 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7331 exc
= "Operation was cancelled"
7332 except Exception as e
:
7333 exc
= traceback
.format_exc()
7334 self
.logger
.critical("Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
7335 return "FAILED", "Error in operate VNF {}".format(exc
)
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Look up the VCA cloud and the VCA cloud credential of a VIM account

    Both values are read from the "config" section of the VIM account record
    returned by VimAccountDB.get_vim_account_with_id(). A record without a
    "config" key is handled as if it had an empty one.

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential). An item is None when its key
        ("vca_cloud" / "vca_cloud_credential") is not present in the config
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_cloud")
    cloud_credential = vim_config.get("vca_cloud_credential")
    return cloud_name, cloud_credential
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Look up the VCA K8s cloud and the VCA K8s cloud credential of a VIM account

    Both values are read from the "config" section of the VIM account record
    returned by VimAccountDB.get_vim_account_with_id(). A record without a
    "config" key is handled as if it had an empty one.

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential). An item is None when its key
        ("vca_k8s_cloud" / "vca_k8s_cloud_credential") is not present in the
        config
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    k8s_cloud_name = vim_config.get("vca_k8s_cloud")
    k8s_cloud_credential = vim_config.get("vca_k8s_cloud_credential")
    return k8s_cloud_name, k8s_cloud_credential
7359 async def migrate(self
, nsr_id
, nslcmop_id
):
7361 Migrate VNFs and VDUs instances in a NS
7363 :param: nsr_id: NS Instance ID
7364 :param: nslcmop_id: nslcmop ID of migrate
7367 # Try to lock HA task here
7368 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7369 if not task_is_locked_by_me
:
7371 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7372 self
.logger
.debug(logging_text
+ "Enter")
7373 # get all needed from database
7375 db_nslcmop_update
= {}
7376 nslcmop_operation_state
= None
7380 # in case of error, indicates what part of scale was failed to put nsr at error status
7381 start_deploy
= time()
7384 # wait for any previous tasks in process
7385 step
= "Waiting for previous operations to terminate"
7386 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7388 self
._write
_ns
_status
(
7391 current_operation
="MIGRATING",
7392 current_operation_id
=nslcmop_id
,
7394 step
= "Getting nslcmop from database"
7396 step
+ " after having waited for previous tasks to be completed"
7398 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7399 migrate_params
= db_nslcmop
.get("operationParams")
7402 target
.update(migrate_params
)
7403 desc
= await self
.RO
.migrate(nsr_id
, target
)
7404 self
.logger
.debug("RO return > {}".format(desc
))
7405 action_id
= desc
["action_id"]
7406 await self
._wait
_ng
_ro
(
7407 nsr_id
, action_id
, nslcmop_id
, start_deploy
, self
.timeout_migrate
,
7410 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7411 self
.logger
.error("Exit Exception {}".format(e
))
7413 except asyncio
.CancelledError
:
7414 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7415 exc
= "Operation was cancelled"
7416 except Exception as e
:
7417 exc
= traceback
.format_exc()
7418 self
.logger
.critical(
7419 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7422 self
._write
_ns
_status
(
7425 current_operation
="IDLE",
7426 current_operation_id
=None,
7429 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7430 nslcmop_operation_state
= "FAILED"
7432 nslcmop_operation_state
= "COMPLETED"
7433 db_nslcmop_update
["detailed-status"] = "Done"
7434 db_nsr_update
["detailed-status"] = "Done"
7436 self
._write
_op
_status
(
7440 operation_state
=nslcmop_operation_state
,
7441 other_update
=db_nslcmop_update
,
7443 if nslcmop_operation_state
:
7447 "nslcmop_id": nslcmop_id
,
7448 "operationState": nslcmop_operation_state
,
7450 await self
.msg
.aiowrite("ns", "migrated", msg
, loop
=self
.loop
)
7451 except Exception as e
:
7453 logging_text
+ "kafka_write notification Exception {}".format(e
)
7455 self
.logger
.debug(logging_text
+ "Exit")
7456 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
7459 async def heal(self
, nsr_id
, nslcmop_id
):
7463 :param nsr_id: ns instance to heal
7464 :param nslcmop_id: operation to run
7468 # Try to lock HA task here
7469 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7470 if not task_is_locked_by_me
:
7473 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7474 stage
= ["", "", ""]
7475 tasks_dict_info
= {}
7476 # ^ stage, step, VIM progress
7477 self
.logger
.debug(logging_text
+ "Enter")
7478 # get all needed from database
7480 db_nslcmop_update
= {}
7482 db_vnfrs
= {} # vnf's info indexed by _id
7484 old_operational_status
= ""
7485 old_config_status
= ""
7488 # wait for any previous tasks in process
7489 step
= "Waiting for previous operations to terminate"
7490 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7491 self
._write
_ns
_status
(
7494 current_operation
="HEALING",
7495 current_operation_id
=nslcmop_id
,
7498 step
= "Getting nslcmop from database"
7500 step
+ " after having waited for previous tasks to be completed"
7502 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7504 step
= "Getting nsr from database"
7505 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7506 old_operational_status
= db_nsr
["operational-status"]
7507 old_config_status
= db_nsr
["config-status"]
7510 "_admin.deployed.RO.operational-status": "healing",
7512 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7514 step
= "Sending heal order to VIM"
7515 task_ro
= asyncio
.ensure_future(
7517 logging_text
=logging_text
,
7519 db_nslcmop
=db_nslcmop
,
7523 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "heal_RO", task_ro
)
7524 tasks_dict_info
[task_ro
] = "Healing at VIM"
7528 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7529 self
.logger
.debug(logging_text
+ stage
[1])
7530 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7531 self
.fs
.sync(db_nsr
["nsd-id"])
7533 # read from db: vnfr's of this ns
7534 step
= "Getting vnfrs from db"
7535 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7536 for vnfr
in db_vnfrs_list
:
7537 db_vnfrs
[vnfr
["_id"]] = vnfr
7538 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7540 # Check for each target VNF
7541 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7542 for target_vnf
in target_list
:
7543 # Find this VNF in the list from DB
7544 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7546 db_vnfr
= db_vnfrs
[vnfr_id
]
7547 vnfd_id
= db_vnfr
.get("vnfd-id")
7548 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7549 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7550 base_folder
= vnfd
["_admin"]["storage"]
7555 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7556 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7558 # Check each target VDU and deploy N2VC
7559 for target_vdu
in target_vnf
["additionalParams"].get("vdu", None):
7560 deploy_params_vdu
= target_vdu
7561 # Set run-day1 vnf level value if not vdu level value exists
7562 if not deploy_params_vdu
.get("run-day1") and target_vnf
["additionalParams"].get("run-day1"):
7563 deploy_params_vdu
["run-day1"] = target_vnf
["additionalParams"].get("run-day1")
7564 vdu_name
= target_vdu
.get("vdu-id", None)
7565 # TODO: Get vdu_id from vdud.
7567 # For multi instance VDU count-index is mandatory
7568 # For single session VDU count-index is 0
7569 vdu_index
= target_vdu
.get("count-index",0)
7571 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7572 stage
[1] = "Deploying Execution Environments."
7573 self
.logger
.debug(logging_text
+ stage
[1])
7575 # VNF Level charm. Normal case when proxy charms.
7576 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7577 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7578 if descriptor_config
:
7579 # Continue if healed machine is management machine
7580 vnf_ip_address
= db_vnfr
.get("ip-address")
7581 target_instance
= None
7582 for instance
in db_vnfr
.get("vdur", None):
7583 if ( instance
["vdu-name"] == vdu_name
and instance
["count-index"] == vdu_index
):
7584 target_instance
= instance
7586 if vnf_ip_address
== target_instance
.get("ip-address"):
7588 logging_text
=logging_text
7589 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7590 member_vnf_index
, vdu_name
, vdu_index
7594 nslcmop_id
=nslcmop_id
,
7600 member_vnf_index
=member_vnf_index
,
7603 deploy_params
=deploy_params_vdu
,
7604 descriptor_config
=descriptor_config
,
7605 base_folder
=base_folder
,
7606 task_instantiation_info
=tasks_dict_info
,
7610 # VDU Level charm. Normal case with native charms.
7611 descriptor_config
= get_configuration(vnfd
, vdu_name
)
7612 if descriptor_config
:
7614 logging_text
=logging_text
7615 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7616 member_vnf_index
, vdu_name
, vdu_index
7620 nslcmop_id
=nslcmop_id
,
7626 member_vnf_index
=member_vnf_index
,
7627 vdu_index
=vdu_index
,
7629 deploy_params
=deploy_params_vdu
,
7630 descriptor_config
=descriptor_config
,
7631 base_folder
=base_folder
,
7632 task_instantiation_info
=tasks_dict_info
,
7637 ROclient
.ROClientException
,
7642 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7644 except asyncio
.CancelledError
:
7646 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7648 exc
= "Operation was cancelled"
7649 except Exception as e
:
7650 exc
= traceback
.format_exc()
7651 self
.logger
.critical(
7652 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7657 stage
[1] = "Waiting for healing pending tasks."
7658 self
.logger
.debug(logging_text
+ stage
[1])
7659 exc
= await self
._wait
_for
_tasks
(
7662 self
.timeout_ns_deploy
,
7670 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7671 nslcmop_operation_state
= "FAILED"
7673 db_nsr_update
["operational-status"] = old_operational_status
7674 db_nsr_update
["config-status"] = old_config_status
7677 ] = "FAILED healing nslcmop={} {}: {}".format(
7678 nslcmop_id
, step
, exc
7680 for task
, task_name
in tasks_dict_info
.items():
7681 if not task
.done() or task
.cancelled() or task
.exception():
7682 if task_name
.startswith(self
.task_name_deploy_vca
):
7683 # A N2VC task is pending
7684 db_nsr_update
["config-status"] = "failed"
7686 # RO task is pending
7687 db_nsr_update
["operational-status"] = "failed"
7689 error_description_nslcmop
= None
7690 nslcmop_operation_state
= "COMPLETED"
7691 db_nslcmop_update
["detailed-status"] = "Done"
7692 db_nsr_update
["detailed-status"] = "Done"
7693 db_nsr_update
["operational-status"] = "running"
7694 db_nsr_update
["config-status"] = "configured"
7696 self
._write
_op
_status
(
7699 error_message
=error_description_nslcmop
,
7700 operation_state
=nslcmop_operation_state
,
7701 other_update
=db_nslcmop_update
,
7704 self
._write
_ns
_status
(
7707 current_operation
="IDLE",
7708 current_operation_id
=None,
7709 other_update
=db_nsr_update
,
7712 if nslcmop_operation_state
:
7716 "nslcmop_id": nslcmop_id
,
7717 "operationState": nslcmop_operation_state
,
7719 await self
.msg
.aiowrite("ns", "healed", msg
, loop
=self
.loop
)
7720 except Exception as e
:
7722 logging_text
+ "kafka_write notification Exception {}".format(e
)
7724 self
.logger
.debug(logging_text
+ "Exit")
7725 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
7736 :param logging_text: prefix text to use when logging
7737 :param nsr_id: nsr identity
7738 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
7739 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
7740 :return: None or exception
7742 def get_vim_account(vim_account_id
):
7744 if vim_account_id
in db_vims
:
7745 return db_vims
[vim_account_id
]
7746 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
7747 db_vims
[vim_account_id
] = db_vim
7752 ns_params
= db_nslcmop
.get("operationParams")
7753 if ns_params
and ns_params
.get("timeout_ns_heal"):
7754 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
7756 timeout_ns_heal
= self
.timeout
.get(
7757 "ns_heal", self
.timeout_ns_heal
7762 nslcmop_id
= db_nslcmop
["_id"]
7764 "action_id": nslcmop_id
,
7766 self
.logger
.warning("db_nslcmop={} and timeout_ns_heal={}".format(db_nslcmop
,timeout_ns_heal
))
7767 target
.update(db_nslcmop
.get("operationParams", {}))
7769 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
7770 desc
= await self
.RO
.recreate(nsr_id
, target
)
7771 self
.logger
.debug("RO return > {}".format(desc
))
7772 action_id
= desc
["action_id"]
7773 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
7774 await self
._wait
_ng
_ro
(
7775 nsr_id
, action_id
, nslcmop_id
, start_heal
, timeout_ns_heal
, stage
,
7781 "_admin.deployed.RO.operational-status": "running",
7782 "detailed-status": " ".join(stage
),
7784 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7785 self
._write
_op
_status
(nslcmop_id
, stage
)
7787 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
7790 except Exception as e
:
7791 stage
[2] = "ERROR healing at VIM"
7792 #self.set_vnfr_at_error(db_vnfrs, str(e))
7794 "Error healing at VIM {}".format(e
),
7795 exc_info
=not isinstance(
7798 ROclient
.ROClientException
,
7824 task_instantiation_info
,
7827 # launch instantiate_N2VC in a asyncio task and register task object
7828 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
7829 # if not found, create one entry and update database
7830 # fill db_nsr._admin.deployed.VCA.<index>
7833 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
7835 if "execution-environment-list" in descriptor_config
:
7836 ee_list
= descriptor_config
.get("execution-environment-list", [])
7837 elif "juju" in descriptor_config
:
7838 ee_list
= [descriptor_config
] # ns charms
7839 else: # other types as script are not supported
7842 for ee_item
in ee_list
:
7845 + "_deploy_n2vc ee_item juju={}, helm={}".format(
7846 ee_item
.get("juju"), ee_item
.get("helm-chart")
7849 ee_descriptor_id
= ee_item
.get("id")
7850 if ee_item
.get("juju"):
7851 vca_name
= ee_item
["juju"].get("charm")
7854 if ee_item
["juju"].get("charm") is not None
7857 if ee_item
["juju"].get("cloud") == "k8s":
7858 vca_type
= "k8s_proxy_charm"
7859 elif ee_item
["juju"].get("proxy") is False:
7860 vca_type
= "native_charm"
7861 elif ee_item
.get("helm-chart"):
7862 vca_name
= ee_item
["helm-chart"]
7863 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
7866 vca_type
= "helm-v3"
7869 logging_text
+ "skipping non juju neither charm configuration"
7874 for vca_index
, vca_deployed
in enumerate(
7875 db_nsr
["_admin"]["deployed"]["VCA"]
7877 if not vca_deployed
:
7880 vca_deployed
.get("member-vnf-index") == member_vnf_index
7881 and vca_deployed
.get("vdu_id") == vdu_id
7882 and vca_deployed
.get("kdu_name") == kdu_name
7883 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
7884 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
7888 # not found, create one.
7890 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
7893 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
7895 target
+= "/kdu/{}".format(kdu_name
)
7897 "target_element": target
,
7898 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
7899 "member-vnf-index": member_vnf_index
,
7901 "kdu_name": kdu_name
,
7902 "vdu_count_index": vdu_index
,
7903 "operational-status": "init", # TODO revise
7904 "detailed-status": "", # TODO revise
7905 "step": "initial-deploy", # TODO revise
7907 "vdu_name": vdu_name
,
7909 "ee_descriptor_id": ee_descriptor_id
,
7913 # create VCA and configurationStatus in db
7915 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
7916 "configurationStatus.{}".format(vca_index
): dict(),
7918 self
.update_db_2("nsrs", nsr_id
, db_dict
)
7920 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
7922 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
7923 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
7924 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
7927 task_n2vc
= asyncio
.ensure_future(
7929 logging_text
=logging_text
,
7930 vca_index
=vca_index
,
7936 vdu_index
=vdu_index
,
7937 deploy_params
=deploy_params
,
7938 config_descriptor
=descriptor_config
,
7939 base_folder
=base_folder
,
7940 nslcmop_id
=nslcmop_id
,
7944 ee_config_descriptor
=ee_item
,
7947 self
.lcm_tasks
.register(
7951 "instantiate_N2VC-{}".format(vca_index
),
7954 task_instantiation_info
[
7956 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
7957 member_vnf_index
or "", vdu_id
or ""
7960 async def heal_N2VC(
7977 ee_config_descriptor
,
7979 nsr_id
= db_nsr
["_id"]
7980 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
7981 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
7982 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
7983 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
7985 "collection": "nsrs",
7986 "filter": {"_id": nsr_id
},
7987 "path": db_update_entry
,
7993 element_under_configuration
= nsr_id
7997 vnfr_id
= db_vnfr
["_id"]
7998 osm_config
["osm"]["vnf_id"] = vnfr_id
8000 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8002 if vca_type
== "native_charm":
8005 index_number
= vdu_index
or 0
8008 element_type
= "VNF"
8009 element_under_configuration
= vnfr_id
8010 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8012 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8013 element_type
= "VDU"
8014 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8015 osm_config
["osm"]["vdu_id"] = vdu_id
8017 namespace
+= ".{}".format(kdu_name
)
8018 element_type
= "KDU"
8019 element_under_configuration
= kdu_name
8020 osm_config
["osm"]["kdu_name"] = kdu_name
8023 if base_folder
["pkg-dir"]:
8024 artifact_path
= "{}/{}/{}/{}".format(
8025 base_folder
["folder"],
8026 base_folder
["pkg-dir"],
8029 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8034 artifact_path
= "{}/Scripts/{}/{}/".format(
8035 base_folder
["folder"],
8038 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8043 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8045 # get initial_config_primitive_list that applies to this element
8046 initial_config_primitive_list
= config_descriptor
.get(
8047 "initial-config-primitive"
8051 "Initial config primitive list > {}".format(
8052 initial_config_primitive_list
8056 # add config if not present for NS charm
8057 ee_descriptor_id
= ee_config_descriptor
.get("id")
8058 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8059 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8060 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8064 "Initial config primitive list #2 > {}".format(
8065 initial_config_primitive_list
8068 # n2vc_redesign STEP 3.1
8069 # find old ee_id if exists
8070 ee_id
= vca_deployed
.get("ee_id")
8072 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8073 # create or register execution environment in VCA. Only for native charms when healing
8074 if vca_type
== "native_charm":
8075 step
= "Waiting to VM being up and getting IP address"
8076 self
.logger
.debug(logging_text
+ step
)
8077 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8086 credentials
= {"hostname": rw_mgmt_ip
}
8088 username
= deep_get(
8089 config_descriptor
, ("config-access", "ssh-access", "default-user")
8091 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8092 # merged. Meanwhile let's get username from initial-config-primitive
8093 if not username
and initial_config_primitive_list
:
8094 for config_primitive
in initial_config_primitive_list
:
8095 for param
in config_primitive
.get("parameter", ()):
8096 if param
["name"] == "ssh-username":
8097 username
= param
["value"]
8101 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8102 "'config-access.ssh-access.default-user'"
8104 credentials
["username"] = username
8106 # n2vc_redesign STEP 3.2
8107 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8108 self
._write
_configuration
_status
(
8110 vca_index
=vca_index
,
8111 status
="REGISTERING",
8112 element_under_configuration
=element_under_configuration
,
8113 element_type
=element_type
,
8116 step
= "register execution environment {}".format(credentials
)
8117 self
.logger
.debug(logging_text
+ step
)
8118 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8119 credentials
=credentials
,
8120 namespace
=namespace
,
8125 # update ee_id in db
8127 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8129 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8131 # for compatibility with MON/POL modules, they need the model and application name in the database
8132 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8133 # Not sure if this need to be done when healing
8135 ee_id_parts = ee_id.split(".")
8136 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8137 if len(ee_id_parts) >= 2:
8138 model_name = ee_id_parts[0]
8139 application_name = ee_id_parts[1]
8140 db_nsr_update[db_update_entry + "model"] = model_name
8141 db_nsr_update[db_update_entry + "application"] = application_name
8144 # n2vc_redesign STEP 3.3
8145 # Install configuration software. Only for native charms.
8146 step
= "Install configuration Software"
8148 self
._write
_configuration
_status
(
8150 vca_index
=vca_index
,
8151 status
="INSTALLING SW",
8152 element_under_configuration
=element_under_configuration
,
8153 element_type
=element_type
,
8154 #other_update=db_nsr_update,
8158 # TODO check if already done
8159 self
.logger
.debug(logging_text
+ step
)
8161 if vca_type
== "native_charm":
8162 config_primitive
= next(
8163 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8166 if config_primitive
:
8167 config
= self
._map
_primitive
_params
(
8168 config_primitive
, {}, deploy_params
8170 await self
.vca_map
[vca_type
].install_configuration_sw(
8172 artifact_path
=artifact_path
,
8180 # write in db flag of configuration_sw already installed
8182 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8185 # Not sure if this need to be done when healing
8187 # add relations for this VCA (wait for other peers related with this VCA)
8188 await self._add_vca_relations(
8189 logging_text=logging_text,
8192 vca_index=vca_index,
8196 # if SSH access is required, then get execution environment SSH public
8197 # for a native charm we have already waited for the VM to be up
8198 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8201 # self.logger.debug("get ssh key block")
8203 config_descriptor
, ("config-access", "ssh-access", "required")
8205 # self.logger.debug("ssh key needed")
8206 # Needed to inject a ssh key
8209 ("config-access", "ssh-access", "default-user"),
8211 step
= "Install configuration Software, getting public ssh key"
8212 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8213 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8216 step
= "Insert public key into VM user={} ssh_key={}".format(
8220 # self.logger.debug("no need to get ssh key")
8221 step
= "Waiting to VM being up and getting IP address"
8222 self
.logger
.debug(logging_text
+ step
)
8224 # n2vc_redesign STEP 5.1
8225 # wait for RO (ip-address) Insert pub_key into VM
8226 # IMPORTANT: We need do wait for RO to complete healing operation.
8227 await self
._wait
_heal
_ro
(nsr_id
,self
.timeout_ns_heal
)
8230 rw_mgmt_ip
= await self
.wait_kdu_up(
8231 logging_text
, nsr_id
, vnfr_id
, kdu_name
8234 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8244 rw_mgmt_ip
= None # This is for a NS configuration
8246 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8248 # store rw_mgmt_ip in deploy params for later replacement
8249 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8252 # get run-day1 operation parameter
8253 runDay1
= deploy_params
.get("run-day1",False)
8254 self
.logger
.debug(" Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
,vdu_id
,runDay1
))
8256 # n2vc_redesign STEP 6 Execute initial config primitive
8257 step
= "execute initial config primitive"
8259 # wait for dependent primitives execution (NS -> VNF -> VDU)
8260 if initial_config_primitive_list
:
8261 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
8263 # stage, in function of element type: vdu, kdu, vnf or ns
8264 my_vca
= vca_deployed_list
[vca_index
]
8265 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8267 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8268 elif my_vca
.get("member-vnf-index"):
8270 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8273 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8275 self
._write
_configuration
_status
(
8276 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8279 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8281 check_if_terminated_needed
= True
8282 for initial_config_primitive
in initial_config_primitive_list
:
8283 # adding information on the vca_deployed if it is a NS execution environment
8284 if not vca_deployed
["member-vnf-index"]:
8285 deploy_params
["ns_config_info"] = json
.dumps(
8286 self
._get
_ns
_config
_info
(nsr_id
)
8288 # TODO check if already done
8289 primitive_params_
= self
._map
_primitive
_params
(
8290 initial_config_primitive
, {}, deploy_params
8293 step
= "execute primitive '{}' params '{}'".format(
8294 initial_config_primitive
["name"], primitive_params_
8296 self
.logger
.debug(logging_text
+ step
)
8297 await self
.vca_map
[vca_type
].exec_primitive(
8299 primitive_name
=initial_config_primitive
["name"],
8300 params_dict
=primitive_params_
,
8305 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8306 if check_if_terminated_needed
:
8307 if config_descriptor
.get("terminate-config-primitive"):
8309 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
8311 check_if_terminated_needed
= False
8313 # TODO register in database that primitive is done
8315 # STEP 7 Configure metrics
8316 # Not sure if this need to be done when healing
8318 if vca_type == "helm" or vca_type == "helm-v3":
8319 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8321 artifact_path=artifact_path,
8322 ee_config_descriptor=ee_config_descriptor,
8325 target_ip=rw_mgmt_ip,
8331 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8334 for job in prometheus_jobs:
8337 {"job_name": job["job_name"]},
8340 fail_on_empty=False,
8344 step
= "instantiated at VCA"
8345 self
.logger
.debug(logging_text
+ step
)
8347 self
._write
_configuration
_status
(
8348 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8351 except Exception as e
: # TODO not use Exception but N2VC exception
8352 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8354 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8357 "Exception while {} : {}".format(step
, e
), exc_info
=True
8359 self
._write
_configuration
_status
(
8360 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8362 raise LcmException("{} {}".format(step
, e
)) from e
async def _wait_heal_ro(
    self,
    nsr_id,
    timeout=600,
):
    """
    Poll the nsr record until RO is no longer in "healing" state

    Reads _admin.deployed.RO.operational-status of the nsr every 15 seconds and
    returns as soon as its value is different from "healing".

    :param: nsr_id: NS Instance ID
    :param: timeout: maximum time in seconds to keep polling
    :return: None
    :raises NgRoException: if the status is still "healing" when timeout expires
    """
    start_time = time()
    while time() <= start_time + timeout:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        operational_status_ro = db_nsr["_admin"]["deployed"]["RO"][
            "operational-status"
        ]
        self.logger.debug("Wait Heal RO > {}".format(operational_status_ro))
        if operational_status_ro != "healing":
            break
        # The 'loop' argument of asyncio.sleep was removed in Python 3.10; the
        # running loop is always the one used from inside a coroutine
        await asyncio.sleep(15)
    else:  # loop ended without break: timeout expired while still healing
        # NOTE(review): the message mentions "deploy" although this waits for a
        # heal; text kept unchanged in case something matches on it
        raise NgRoException("Timeout waiting ns to deploy")
async def vertical_scale(self, nsr_id, nslcmop_id):
    """
    Vertical Scale the VDUs in a NS

    Takes the HA lock of the operation, sends the operationParams stored in the
    nslcmop record to RO, waits for the returned RO action and finally writes
    the operation result and publishes a "verticalscaled" message.
    Failures are not raised to the caller: they are logged and stored in the
    nslcmop "detailed-status" with operation state "FAILED".

    :param: nsr_id: NS Instance ID
    :param: nslcmop_id: nslcmop ID of the vertical scale operation
    :return: None
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return
    logging_text = "Task ns={} vertical scale ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")
    db_nslcmop_update = {}
    exc = None
    # reference time passed to _wait_ng_ro together with the timeout
    start_deploy = time()

    try:
        # 'step' names the phase in progress; it is used to build the
        # "FAILED <step>: <error>" detailed-status in case of error
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="VerticalScale",
            current_operation_id=nslcmop_id,
        )
        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        # shallow copy, so that the database record itself is not handed to RO
        target = dict(db_nslcmop.get("operationParams"))
        desc = await self.RO.vertical_scale(nsr_id, target)
        self.logger.debug("RO return > {}".format(desc))
        action_id = desc["action_id"]
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            self.timeout_verticalscale,
            operation="verticalscale",
        )
    except (
        # NOTE(review): NgRoException is assumed to be what the RO client and
        # _wait_ng_ro raise on failure/timeout - confirm. Handling it here
        # reports it like the other expected errors instead of as a critical
        # error with a full traceback stored in detailed-status
        NgRoException,
        ROclient.ROClientException,
        DbException,
        LcmException,
    ) as e:
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        # cancellation is recorded as a failed operation and not re-raised
        self.logger.error("Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )
        if exc:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        else:
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message="",
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        # the notification is best effort: a failure here is only logged
        try:
            msg = {
                "nsr_id": nsr_id,
                "nslcmop_id": nslcmop_id,
                "operationState": nslcmop_operation_state,
            }
            await self.msg.aiowrite("ns", "verticalscaled", msg, loop=self.loop)
        except Exception as e:
            self.logger.error(
                logging_text + "kafka_write notification Exception {}".format(e)
            )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_verticalscale")