1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
20 from typing
import Any
, Dict
, List
23 import logging
.handlers
34 from osm_lcm
import ROclient
35 from osm_lcm
.data_utils
.nsr
import (
38 get_deployed_vca_list
,
41 from osm_lcm
.data_utils
.vca
import (
50 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
51 from osm_lcm
.lcm_utils
import (
59 from osm_lcm
.data_utils
.nsd
import (
60 get_ns_configuration_relation_list
,
64 from osm_lcm
.data_utils
.vnfd
import (
68 get_ee_sorted_initial_config_primitive_list
,
69 get_ee_sorted_terminate_config_primitive_list
,
71 get_virtual_link_profiles
,
76 get_number_of_instances
,
78 get_kdu_resource_profile
,
80 from osm_lcm
.data_utils
.list_utils
import find_in_list
81 from osm_lcm
.data_utils
.vnfr
import get_osm_params
, get_vdur_index
, get_kdur
82 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
83 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
84 from n2vc
.definitions
import RelationEndpoint
85 from n2vc
.k8s_helm_conn
import K8sHelmConnector
86 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
87 from n2vc
.k8s_juju_conn
import K8sJujuConnector
89 from osm_common
.dbbase
import DbException
90 from osm_common
.fsbase
import FsException
92 from osm_lcm
.data_utils
.database
.database
import Database
93 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
95 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
96 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
98 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
99 from osm_lcm
.prometheus
import parse_job
101 from copy
import copy
, deepcopy
102 from time
import time
103 from uuid
import uuid4
105 from random
import randint
107 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
110 class NsLcm(LcmBase
):
111 timeout_vca_on_error
= (
113 ) # Time for charm from first time at blocked,error status to mark as failed
114 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
115 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
116 timeout_charm_delete
= 10 * 60
117 timeout_primitive
= 30 * 60 # timeout for primitive execution
118 timeout_progress_primitive
= (
120 ) # timeout for some progress in a primitive execution
122 SUBOPERATION_STATUS_NOT_FOUND
= -1
123 SUBOPERATION_STATUS_NEW
= -2
124 SUBOPERATION_STATUS_SKIP
= -3
125 task_name_deploy_vca
= "Deploying VCA"
127 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
129 Init, Connect to database, filesystem storage, and messaging
130 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
133 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
135 self
.db
= Database().instance
.db
136 self
.fs
= Filesystem().instance
.fs
138 self
.lcm_tasks
= lcm_tasks
139 self
.timeout
= config
["timeout"]
140 self
.ro_config
= config
["ro_config"]
141 self
.ng_ro
= config
["ro_config"].get("ng")
142 self
.vca_config
= config
["VCA"].copy()
144 # create N2VC connector
145 self
.n2vc
= N2VCJujuConnector(
148 on_update_db
=self
._on
_update
_n
2vc
_db
,
153 self
.conn_helm_ee
= LCMHelmConn(
156 vca_config
=self
.vca_config
,
157 on_update_db
=self
._on
_update
_n
2vc
_db
,
160 self
.k8sclusterhelm2
= K8sHelmConnector(
161 kubectl_command
=self
.vca_config
.get("kubectlpath"),
162 helm_command
=self
.vca_config
.get("helmpath"),
169 self
.k8sclusterhelm3
= K8sHelm3Connector(
170 kubectl_command
=self
.vca_config
.get("kubectlpath"),
171 helm_command
=self
.vca_config
.get("helm3path"),
178 self
.k8sclusterjuju
= K8sJujuConnector(
179 kubectl_command
=self
.vca_config
.get("kubectlpath"),
180 juju_command
=self
.vca_config
.get("jujupath"),
183 on_update_db
=self
._on
_update
_k
8s
_db
,
188 self
.k8scluster_map
= {
189 "helm-chart": self
.k8sclusterhelm2
,
190 "helm-chart-v3": self
.k8sclusterhelm3
,
191 "chart": self
.k8sclusterhelm3
,
192 "juju-bundle": self
.k8sclusterjuju
,
193 "juju": self
.k8sclusterjuju
,
197 "lxc_proxy_charm": self
.n2vc
,
198 "native_charm": self
.n2vc
,
199 "k8s_proxy_charm": self
.n2vc
,
200 "helm": self
.conn_helm_ee
,
201 "helm-v3": self
.conn_helm_ee
,
205 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
def increment_ip_mac(ip_mac, vm_index=1):
    """Return *ip_mac* with its last group increased by *vm_index*.

    A dotted value (IPv4) gets its final decimal group incremented. Otherwise
    a colon-separated value (IPv6 or MAC) gets its final hexadecimal group
    incremented and zero-padded back to the width it had. Only the last group
    is touched: there is no carry into earlier groups and no range check.

    NOTE(review): rebuilt from a damaged copy of this file in which several
    statements of this function were absent. Two behaviours are assumptions
    to confirm against the upstream file: non-string input is handed back
    unchanged, and a value that cannot be parsed yields ``None``. Other code
    in this file calls ``self.increment_ip_mac(value, n)``, so the function is
    presumably declared ``@staticmethod`` on a line that is not visible here.

    :param ip_mac: address text, e.g. "10.0.0.5" or "fa:16:3e:00:00:0a"
    :param vm_index: amount added to the last group
    :return: the incremented address text
    """
    if not isinstance(ip_mac, str):
        return ip_mac
    # try with ipv4 first (last dot), then with ipv6 or mac (last colon, hex)
    for separator, base in ((".", 10), (":", 16)):
        cut = ip_mac.rfind(separator)
        if cut <= 0:
            continue
        cut += 1  # index of the first character of the last group
        head, tail = ip_mac[:cut], ip_mac[cut:]
        try:
            value = int(tail, base) + vm_index
        except (TypeError, ValueError):
            return None
        if base == 10:
            return "{}{}".format(head, value)
        # format in hex, keeping the original width: 2 for mac or 4 for ipv6
        return "{}{:0{}x}".format(head, value, len(tail))
    return None
def _on_update_ro_db(self, nsrs_id, ro_descriptor):
    """Store *ro_descriptor* as ``deploymentStatus`` of the "nsrs" record.

    The write goes through ``self.update_db_2``. Any exception raised while
    writing is caught and reported through ``self.logger`` instead of being
    propagated.

    NOTE(review): rebuilt from a damaged copy of this file. The ``try:``
    line, the creation of ``db_dict`` and the call that emits the error
    message were absent and are restored by inference; the log level follows
    the other ``_on_update_*`` handlers in this file -- confirm upstream.

    :param nsrs_id: ``_id`` of the ns record to update
    :param ro_descriptor: descriptor to store, written as received
    """
    try:
        # TODO filter RO descriptor fields...
        db_dict = {"deploymentStatus": ro_descriptor}
        self.update_db_2("nsrs", nsrs_id, db_dict)
    except Exception as e:
        self.logger.warning(
            "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
        )
247 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
249 # remove last dot from path (if exists)
250 if path
.endswith("."):
253 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
254 # .format(table, filter, path, updated_data))
257 nsr_id
= filter.get("_id")
259 # read ns record from database
260 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
261 current_ns_status
= nsr
.get("nsState")
263 # get vca status for NS
264 status_dict
= await self
.n2vc
.get_status(
265 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
270 db_dict
["vcaStatus"] = status_dict
271 await self
.n2vc
.update_vca_status(db_dict
["vcaStatus"], vca_id
=vca_id
)
273 # update configurationStatus for this VCA
275 vca_index
= int(path
[path
.rfind(".") + 1 :])
278 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
280 vca_status
= vca_list
[vca_index
].get("status")
282 configuration_status_list
= nsr
.get("configurationStatus")
283 config_status
= configuration_status_list
[vca_index
].get("status")
285 if config_status
== "BROKEN" and vca_status
!= "failed":
286 db_dict
["configurationStatus"][vca_index
] = "READY"
287 elif config_status
!= "BROKEN" and vca_status
== "failed":
288 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
289 except Exception as e
:
290 # not update configurationStatus
291 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
293 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
294 # if nsState = 'DEGRADED' check if all is OK
296 if current_ns_status
in ("READY", "DEGRADED"):
297 error_description
= ""
299 if status_dict
.get("machines"):
300 for machine_id
in status_dict
.get("machines"):
301 machine
= status_dict
.get("machines").get(machine_id
)
302 # check machine agent-status
303 if machine
.get("agent-status"):
304 s
= machine
.get("agent-status").get("status")
307 error_description
+= (
308 "machine {} agent-status={} ; ".format(
312 # check machine instance status
313 if machine
.get("instance-status"):
314 s
= machine
.get("instance-status").get("status")
317 error_description
+= (
318 "machine {} instance-status={} ; ".format(
323 if status_dict
.get("applications"):
324 for app_id
in status_dict
.get("applications"):
325 app
= status_dict
.get("applications").get(app_id
)
326 # check application status
327 if app
.get("status"):
328 s
= app
.get("status").get("status")
331 error_description
+= (
332 "application {} status={} ; ".format(app_id
, s
)
335 if error_description
:
336 db_dict
["errorDescription"] = error_description
337 if current_ns_status
== "READY" and is_degraded
:
338 db_dict
["nsState"] = "DEGRADED"
339 if current_ns_status
== "DEGRADED" and not is_degraded
:
340 db_dict
["nsState"] = "READY"
343 self
.update_db_2("nsrs", nsr_id
, db_dict
)
345 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
347 except Exception as e
:
348 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
350 async def _on_update_k8s_db(
351 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
354 Updating vca status in NSR record
355 :param cluster_uuid: UUID of a k8s cluster
356 :param kdu_instance: The unique name of the KDU instance
357 :param filter: To get nsr_id
358 :cluster_type: The cluster type (juju, k8s)
362 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
363 # .format(cluster_uuid, kdu_instance, filter))
365 nsr_id
= filter.get("_id")
367 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
368 cluster_uuid
=cluster_uuid
,
369 kdu_instance
=kdu_instance
,
371 complete_status
=True,
377 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
379 if cluster_type
in ("juju-bundle", "juju"):
380 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
381 # status in a similar way between Juju Bundles and Helm Charts on this side
382 await self
.k8sclusterjuju
.update_vca_status(
383 db_dict
["vcaStatus"],
389 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
393 self
.update_db_2("nsrs", nsr_id
, db_dict
)
394 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
396 except Exception as e
:
397 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
    """Render *cloud_init_text* as a Jinja2 template and return the result.

    The template is rendered with ``StrictUndefined``, so a variable that is
    not present in *additional_params* is reported as an error instead of
    being rendered as empty text.

    NOTE(review): rebuilt from a damaged copy of this file. The ``try:``
    line and both ``raise`` statements were absent; ``LcmException`` is
    assumed because docstrings elsewhere in this file name it as the error
    type -- confirm upstream. The function takes no ``self``, so it is
    presumably declared ``@staticmethod`` on a line that is not visible here.

    :param cloud_init_text: template text
    :param additional_params: mapping of template variables; may be None
    :param vnfd_id: vnfd id, used only in error messages
    :param vdu_id: vdu id, used only in error messages
    :return: rendered text
    :raises LcmException: on an undefined variable or a template error
    """
    try:
        env = Environment(undefined=StrictUndefined)
        template = env.from_string(cloud_init_text)
        return template.render(additional_params or {})
    except UndefinedError as e:
        # Must stay ahead of the generic handler below so that a missing
        # variable gets the message that points at the instantiation params.
        raise LcmException(
            "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
            "file, must be provided in the instantiation parameters inside the "
            "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
        ) from e
    except (TemplateError, TemplateNotFound) as e:
        raise LcmException(
            "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
                vnfd_id, vdu_id, e
            )
        ) from e
def _get_vdu_cloud_init_content(self, vdu, vnfd):
    """Return the cloud-init text of *vdu*, or None when it declares none.

    When the vdu has a "cloud-init-file", the file is read through
    ``self.fs`` from ``<folder>/<pkg-dir>/cloud_init/<file>``, or from
    ``<folder>/Scripts/cloud_init/<file>`` when the stored "pkg-dir" is empty.
    Otherwise an inline "cloud-init" value is returned as is.

    NOTE(review): rebuilt from a damaged copy of this file. The ``try:`` and
    ``else:`` lines and the ``raise`` statement were absent; ``LcmException``
    is assumed because docstrings elsewhere in this file name it as the error
    type -- confirm upstream.

    :param vdu: vdu descriptor (dict)
    :param vnfd: vnfd that owns the vdu; ``_admin.storage`` locates the package
    :return: cloud-init text or None
    :raises LcmException: when reading the file raises FsException
    """
    cloud_init_content = cloud_init_file = None
    try:
        if vdu.get("cloud-init-file"):
            base_folder = vnfd["_admin"]["storage"]
            if base_folder["pkg-dir"]:
                cloud_init_file = "{}/{}/cloud_init/{}".format(
                    base_folder["folder"],
                    base_folder["pkg-dir"],
                    vdu["cloud-init-file"],
                )
            else:
                cloud_init_file = "{}/Scripts/cloud_init/{}".format(
                    base_folder["folder"],
                    vdu["cloud-init-file"],
                )
            with self.fs.file_open(cloud_init_file, "r") as ci_file:
                cloud_init_content = ci_file.read()
        elif vdu.get("cloud-init"):
            cloud_init_content = vdu["cloud-init"]
        return cloud_init_content
    except FsException as e:
        raise LcmException(
            "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
                vnfd["id"], vdu["id"], cloud_init_file, e
            )
        ) from e
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
    """Return the "additionalParams" of the vdur that instantiates *vdu_id*.

    The first entry of ``db_vnfr["vdur"]`` whose "vdu-id-ref" equals *vdu_id*
    is used, and its "additionalParams" value is passed through
    ``parse_yaml_strings`` before being returned.

    NOTE(review): rebuilt from a damaged copy of this file. The ``next(``
    call and its default argument were absent; an empty dict is assumed as
    the default because the following line calls ``.get`` on the result --
    confirm upstream.

    :param db_vnfr: vnfr record (dict)
    :param vdu_id: vdu identifier matched against "vdu-id-ref"
    :return: whatever ``parse_yaml_strings`` returns for the stored value
    """
    matching = (
        vdur
        # "or ()" keeps a vnfr without any vdur from failing the iteration
        for vdur in db_vnfr.get("vdur") or ()
        if vdu_id == vdur["vdu-id-ref"]
    )
    vdur = next(matching, {})
    additional_params = vdur.get("additionalParams")
    return parse_yaml_strings(additional_params)
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """Create a vnfd descriptor for RO from an OSM IM vnfd.

    Works on a deep copy, so the input descriptor is left untouched.

    NOTE(review): rebuilt from a damaged copy of this file. The guard on
    *new_id* and the final ``return`` were absent and are restored from the
    original docstring ("overrides vnf id if provided", "copy of vnfd").
    *additionalParams* and *nsrId* are not used by any statement visible in
    this block.

    :param vnfd: input vnfd
    :param new_id: overrides the vnf id if provided
    :param additionalParams: instantiation params for VNFs provided
    :param nsrId: id of the NSR
    :return: modified copy of vnfd
    """
    vnfd_RO = deepcopy(vnfd)
    # remove what RO does not use: configuration, monitoring, scaling and
    # internal keys
    for unused_key in (
        "_id",
        "_admin",
        "monitoring-param",
        "scaling-group-descriptor",
        "kdu",
        "k8s-cluster",
    ):
        vnfd_RO.pop(unused_key, None)
    if new_id:
        vnfd_RO["id"] = new_id

    # cloud-init and cloud-init-file are dropped from every vdu of the copy
    for vdu in get_iterable(vnfd_RO, "vdu"):
        vdu.pop("cloud-init-file", None)
        vdu.pop("cloud-init", None)
    return vnfd_RO
def ip_profile_2_RO(ip_profile):
    """Translate an OSM ip-profile into the key names used by RO.

    Works on a deep copy; *ip_profile* itself is not modified.

    * "dns-server" becomes "dns-address": a list of ``{"address": ...}``
      entries is flattened to a list of addresses, any other value is moved
      over unchanged.
    * "ip-version" values "ipv4" / "ipv6" become "IPv4" / "IPv6".
    * "dhcp-params" is renamed to "dhcp".

    NOTE(review): rebuilt from a damaged copy of this file. The ``else:``
    line and the final ``return`` were absent and are restored by inference.
    The function takes no ``self``, so it is presumably declared
    ``@staticmethod`` on a line that is not visible here.

    :param ip_profile: ip-profile dict in OSM format
    :return: new dict in RO format
    """
    RO_ip_profile = deepcopy(ip_profile)
    if "dns-server" in RO_ip_profile:
        dns_server = RO_ip_profile.pop("dns-server")
        if isinstance(dns_server, list):
            RO_ip_profile["dns-address"] = [ds["address"] for ds in dns_server]
        else:
            RO_ip_profile["dns-address"] = dns_server
    ip_version = RO_ip_profile.get("ip-version")
    if ip_version == "ipv4":
        RO_ip_profile["ip-version"] = "IPv4"
    elif ip_version == "ipv6":
        RO_ip_profile["ip-version"] = "IPv6"
    if "dhcp-params" in RO_ip_profile:
        RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
    return RO_ip_profile
def _get_ro_vim_id_for_vim_account(self, vim_account):
    """Return the RO identifier stored for an enabled VIM account.

    NOTE(review): rebuilt from a damaged copy of this file. The ``raise``
    statement and the final ``return`` were absent; ``LcmException`` is
    assumed because docstrings elsewhere in this file name it as the error
    type -- confirm upstream.

    :param vim_account: ``_id`` of the record read from "vim_accounts"
    :return: value stored at ``_admin.deployed.RO`` of that record
    :raises LcmException: when ``_admin.operationalState`` is not "ENABLED"
    """
    db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
    operational_state = db_vim["_admin"]["operationalState"]
    if operational_state != "ENABLED":
        raise LcmException(
            "VIM={} is not available. operationalState={}".format(
                vim_account, operational_state
            )
        )
    return db_vim["_admin"]["deployed"]["RO"]
def get_ro_wim_id_for_wim_account(self, wim_account):
    """Return the RO identifier stored for an enabled WIM account.

    NOTE(review): rebuilt from a damaged copy of this file. The ``raise``
    statement, the branch taken for non-string input and the final
    ``return`` were absent. Handing a non-string *wim_account* back
    unchanged is an assumption, as is ``LcmException`` (docstrings elsewhere
    in this file name it as the error type) -- confirm both upstream.

    :param wim_account: ``_id`` of the record read from "wim_accounts"
    :return: value stored at ``_admin.deployed.RO-account`` of that record
    :raises LcmException: when ``_admin.operationalState`` is not "ENABLED"
    """
    if not isinstance(wim_account, str):
        return wim_account
    db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
    operational_state = db_wim["_admin"]["operationalState"]
    if operational_state != "ENABLED":
        raise LcmException(
            "WIM={} is not available. operationalState={}".format(
                wim_account, operational_state
            )
        )
    return db_wim["_admin"]["deployed"]["RO-account"]
524 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
526 db_vdu_push_list
= []
528 db_update
= {"_admin.modified": time()}
530 for vdu_id
, vdu_count
in vdu_create
.items():
534 for vdur
in reversed(db_vnfr
["vdur"])
535 if vdur
["vdu-id-ref"] == vdu_id
540 # Read the template saved in the db:
541 self
.logger
.debug(f
"No vdur in the database. Using the vdur-template to scale")
542 vdur_template
= db_vnfr
.get("vdur-template")
543 if not vdur_template
:
545 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
549 vdur
= vdur_template
[0]
550 # Delete the template from the database after using it
551 self
.db
.set_one("vnfrs",
552 {"_id": db_vnfr
["_id"]},
554 pull
={"vdur-template": {"_id": vdur
['_id']}}
556 for count
in range(vdu_count
):
557 vdur_copy
= deepcopy(vdur
)
558 vdur_copy
["status"] = "BUILD"
559 vdur_copy
["status-detailed"] = None
560 vdur_copy
["ip-address"] = None
561 vdur_copy
["_id"] = str(uuid4())
562 vdur_copy
["count-index"] += count
+ 1
563 vdur_copy
["id"] = "{}-{}".format(
564 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
566 vdur_copy
.pop("vim_info", None)
567 for iface
in vdur_copy
["interfaces"]:
568 if iface
.get("fixed-ip"):
569 iface
["ip-address"] = self
.increment_ip_mac(
570 iface
["ip-address"], count
+ 1
573 iface
.pop("ip-address", None)
574 if iface
.get("fixed-mac"):
575 iface
["mac-address"] = self
.increment_ip_mac(
576 iface
["mac-address"], count
+ 1
579 iface
.pop("mac-address", None)
583 ) # only first vdu can be managment of vnf
584 db_vdu_push_list
.append(vdur_copy
)
585 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
587 if len(db_vnfr
["vdur"]) == 1:
588 # The scale will move to 0 instances
589 self
.logger
.debug(f
"Scaling to 0 !, creating the template with the last vdur")
590 template_vdur
= [db_vnfr
["vdur"][0]]
591 for vdu_id
, vdu_count
in vdu_delete
.items():
593 indexes_to_delete
= [
595 for iv
in enumerate(db_vnfr
["vdur"])
596 if iv
[1]["vdu-id-ref"] == vdu_id
600 "vdur.{}.status".format(i
): "DELETING"
601 for i
in indexes_to_delete
[-vdu_count
:]
605 # it must be deleted one by one because common.db does not allow otherwise
608 for v
in reversed(db_vnfr
["vdur"])
609 if v
["vdu-id-ref"] == vdu_id
611 for vdu
in vdus_to_delete
[:vdu_count
]:
614 {"_id": db_vnfr
["_id"]},
616 pull
={"vdur": {"_id": vdu
["_id"]}},
620 db_push
["vdur"] = db_vdu_push_list
622 db_push
["vdur-template"] = template_vdur
625 db_vnfr
["vdur-template"] = template_vdur
626 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
627 # modify passed dictionary db_vnfr
628 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
629 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """Update the nsr vld entries with the RO info of the created networks.

    Each vld of *db_nsr* is matched by its "id" against the "ns_net_osm_id"
    of the RO nets. On a match the vld receives the RO values for "vim-id",
    "name", "status" and "status-detailed", and is recorded in
    *ns_update_nsr* under the key ``vld.<index>``.

    NOTE(review): rebuilt from a damaged copy of this file. The
    ``continue`` / ``break`` / ``else:`` lines and the ``raise`` statement
    were absent; they are restored from the visible error message and from
    the original docstring ("LcmException is raised on errors") -- confirm
    upstream.

    :param ns_update_nsr: dictionary to be filled with the updated info
    :param db_nsr: content of db_nsr. This is also modified
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing
    :raises LcmException: when a vld has no matching net in the RO info
    """
    for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
        for net_RO in get_iterable(nsr_desc_RO, "nets"):
            if vld["id"] != net_RO.get("ns_net_osm_id"):
                continue
            vld["vim-id"] = net_RO.get("vim_net_id")
            vld["name"] = net_RO.get("vim_name")
            vld["status"] = net_RO.get("status")
            vld["status-detailed"] = net_RO.get("error_msg")
            ns_update_nsr["vld.{}".format(vld_index)] = vld
            break
        else:
            # reached only when no RO net matched this vld
            raise LcmException(
                "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
            )
655 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
657 for db_vnfr
in db_vnfrs
.values():
658 vnfr_update
= {"status": "ERROR"}
659 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
660 if "status" not in vdur
:
661 vdur
["status"] = "ERROR"
662 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
664 vdur
["status-detailed"] = str(error_text
)
666 "vdur.{}.status-detailed".format(vdu_index
)
668 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
669 except DbException
as e
:
670 self
.logger
.error("Cannot update vnf. {}".format(e
))
672 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
674 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
675 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
676 :param nsr_desc_RO: nsr descriptor from RO
677 :return: Nothing, LcmException is raised on errors
679 for vnf_index
, db_vnfr
in db_vnfrs
.items():
680 for vnf_RO
in nsr_desc_RO
["vnfs"]:
681 if vnf_RO
["member_vnf_index"] != vnf_index
:
684 if vnf_RO
.get("ip_address"):
685 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
688 elif not db_vnfr
.get("ip-address"):
689 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
690 raise LcmExceptionNoMgmtIP(
691 "ns member_vnf_index '{}' has no IP address".format(
696 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
697 vdur_RO_count_index
= 0
698 if vdur
.get("pdu-type"):
700 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
701 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
703 if vdur
["count-index"] != vdur_RO_count_index
:
704 vdur_RO_count_index
+= 1
706 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
707 if vdur_RO
.get("ip_address"):
708 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
710 vdur
["ip-address"] = None
711 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
712 vdur
["name"] = vdur_RO
.get("vim_name")
713 vdur
["status"] = vdur_RO
.get("status")
714 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
715 for ifacer
in get_iterable(vdur
, "interfaces"):
716 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
717 if ifacer
["name"] == interface_RO
.get("internal_name"):
718 ifacer
["ip-address"] = interface_RO
.get(
721 ifacer
["mac-address"] = interface_RO
.get(
727 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
728 "from VIM info".format(
729 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
732 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
736 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
738 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
742 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
743 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
744 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
746 vld
["vim-id"] = net_RO
.get("vim_net_id")
747 vld
["name"] = net_RO
.get("vim_name")
748 vld
["status"] = net_RO
.get("status")
749 vld
["status-detailed"] = net_RO
.get("error_msg")
750 vnfr_update
["vld.{}".format(vld_index
)] = vld
754 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
759 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
764 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
def _get_ns_config_info(self, nsr_id):
    """Generate a mapping between vnf/vdu elements and their N2VC id.

    The list stored at ``_admin.deployed.VCA`` of the ns record is read from
    the database, and the "application" of each entry is recorded under:

    * ``"<member-vnf-index>"`` for a vnf configuration, or
    * ``"<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>"`` for a vdu
      configuration.

    NOTE(review): rebuilt from a damaged copy of this file. The creation of
    ``mapping``, the statement under the first guard, the ``else:`` line and
    the key format string were absent. The key format is taken from the
    original docstring; skipping entries without "member-vnf-index" is an
    assumption -- confirm upstream.

    :param nsr_id: id of the nsr whose _admin.deployed.VCA list is read
    :return: dictionary ``{"osm-config-mapping": {<key>: <N2VC-id>, ...}}``
    """
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    mapping = {}
    ns_config_info = {"osm-config-mapping": mapping}
    for vca in vca_deployed_list:
        if not vca["member-vnf-index"]:
            continue
        if not vca["vdu_id"]:
            key = vca["member-vnf-index"]
        else:
            key = "{}.{}.{}".format(
                vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
            )
        mapping[key] = vca["application"]
    return ns_config_info
794 async def _instantiate_ng_ro(
811 def get_vim_account(vim_account_id
):
813 if vim_account_id
in db_vims
:
814 return db_vims
[vim_account_id
]
815 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
816 db_vims
[vim_account_id
] = db_vim
819 # modify target_vld info with instantiation parameters
820 def parse_vld_instantiation_params(
821 target_vim
, target_vld
, vld_params
, target_sdn
823 if vld_params
.get("ip-profile"):
824 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
827 if vld_params
.get("provider-network"):
828 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
831 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
832 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
835 if vld_params
.get("wimAccountId"):
836 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
837 target_vld
["vim_info"][target_wim
] = {}
838 for param
in ("vim-network-name", "vim-network-id"):
839 if vld_params
.get(param
):
840 if isinstance(vld_params
[param
], dict):
841 for vim
, vim_net
in vld_params
[param
].items():
842 other_target_vim
= "vim:" + vim
844 target_vld
["vim_info"],
845 (other_target_vim
, param
.replace("-", "_")),
848 else: # isinstance str
849 target_vld
["vim_info"][target_vim
][
850 param
.replace("-", "_")
851 ] = vld_params
[param
]
852 if vld_params
.get("common_id"):
853 target_vld
["common_id"] = vld_params
.get("common_id")
855 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
856 def update_ns_vld_target(target
, ns_params
):
857 for vnf_params
in ns_params
.get("vnf", ()):
858 if vnf_params
.get("vimAccountId"):
862 for vnfr
in db_vnfrs
.values()
863 if vnf_params
["member-vnf-index"]
864 == vnfr
["member-vnf-index-ref"]
868 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
869 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
870 target_vld
= find_in_list(
871 get_iterable(vdur
, "interfaces"),
872 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
875 if vnf_params
.get("vimAccountId") not in a_vld
.get(
878 target
["ns"]["vld"][a_index
].get("vim_info").update(
880 "vim:{}".format(vnf_params
["vimAccountId"]): {
881 "vim_network_name": ""
886 nslcmop_id
= db_nslcmop
["_id"]
888 "name": db_nsr
["name"],
891 "image": deepcopy(db_nsr
["image"]),
892 "flavor": deepcopy(db_nsr
["flavor"]),
893 "action_id": nslcmop_id
,
894 "cloud_init_content": {},
896 for image
in target
["image"]:
897 image
["vim_info"] = {}
898 for flavor
in target
["flavor"]:
899 flavor
["vim_info"] = {}
900 if db_nsr
.get("affinity-or-anti-affinity-group"):
901 target
["affinity-or-anti-affinity-group"] = deepcopy(db_nsr
["affinity-or-anti-affinity-group"])
902 for affinity_or_anti_affinity_group
in target
["affinity-or-anti-affinity-group"]:
903 affinity_or_anti_affinity_group
["vim_info"] = {}
905 if db_nslcmop
.get("lcmOperationType") != "instantiate":
906 # get parameters of instantiation:
907 db_nslcmop_instantiate
= self
.db
.get_list(
910 "nsInstanceId": db_nslcmop
["nsInstanceId"],
911 "lcmOperationType": "instantiate",
914 ns_params
= db_nslcmop_instantiate
.get("operationParams")
916 ns_params
= db_nslcmop
.get("operationParams")
917 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
918 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
921 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
922 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
926 "mgmt-network": vld
.get("mgmt-network", False),
927 "type": vld
.get("type"),
930 "vim_network_name": vld
.get("vim-network-name"),
931 "vim_account_id": ns_params
["vimAccountId"],
935 # check if this network needs SDN assist
936 if vld
.get("pci-interfaces"):
937 db_vim
= get_vim_account(ns_params
["vimAccountId"])
938 sdnc_id
= db_vim
["config"].get("sdn-controller")
940 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
941 target_sdn
= "sdn:{}".format(sdnc_id
)
942 target_vld
["vim_info"][target_sdn
] = {
944 "target_vim": target_vim
,
946 "type": vld
.get("type"),
949 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
950 for nsd_vnf_profile
in nsd_vnf_profiles
:
951 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
952 if cp
["virtual-link-profile-id"] == vld
["id"]:
954 "member_vnf:{}.{}".format(
955 cp
["constituent-cpd-id"][0][
956 "constituent-base-element-id"
958 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
960 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
962 # check at nsd descriptor, if there is an ip-profile
964 nsd_vlp
= find_in_list(
965 get_virtual_link_profiles(nsd
),
966 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
971 and nsd_vlp
.get("virtual-link-protocol-data")
972 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
974 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
977 ip_profile_dest_data
= {}
978 if "ip-version" in ip_profile_source_data
:
979 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
982 if "cidr" in ip_profile_source_data
:
983 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
986 if "gateway-ip" in ip_profile_source_data
:
987 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
990 if "dhcp-enabled" in ip_profile_source_data
:
991 ip_profile_dest_data
["dhcp-params"] = {
992 "enabled": ip_profile_source_data
["dhcp-enabled"]
994 vld_params
["ip-profile"] = ip_profile_dest_data
996 # update vld_params with instantiation params
997 vld_instantiation_params
= find_in_list(
998 get_iterable(ns_params
, "vld"),
999 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1001 if vld_instantiation_params
:
1002 vld_params
.update(vld_instantiation_params
)
1003 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1004 target
["ns"]["vld"].append(target_vld
)
1005 # Update the target ns_vld if vnf vim_account is overridden by instantiation params
1006 update_ns_vld_target(target
, ns_params
)
1008 for vnfr
in db_vnfrs
.values():
1009 vnfd
= find_in_list(
1010 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1012 vnf_params
= find_in_list(
1013 get_iterable(ns_params
, "vnf"),
1014 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1016 target_vnf
= deepcopy(vnfr
)
1017 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1018 for vld
in target_vnf
.get("vld", ()):
1019 # check if connected to a ns.vld, to fill target
1020 vnf_cp
= find_in_list(
1021 vnfd
.get("int-virtual-link-desc", ()),
1022 lambda cpd
: cpd
.get("id") == vld
["id"],
1025 ns_cp
= "member_vnf:{}.{}".format(
1026 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1028 if cp2target
.get(ns_cp
):
1029 vld
["target"] = cp2target
[ns_cp
]
1032 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1034 # check if this network needs SDN assist
1036 if vld
.get("pci-interfaces"):
1037 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1038 sdnc_id
= db_vim
["config"].get("sdn-controller")
1040 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1041 target_sdn
= "sdn:{}".format(sdnc_id
)
1042 vld
["vim_info"][target_sdn
] = {
1044 "target_vim": target_vim
,
1046 "type": vld
.get("type"),
1049 # check at vnfd descriptor, if there is an ip-profile
1051 vnfd_vlp
= find_in_list(
1052 get_virtual_link_profiles(vnfd
),
1053 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1057 and vnfd_vlp
.get("virtual-link-protocol-data")
1058 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1060 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1063 ip_profile_dest_data
= {}
1064 if "ip-version" in ip_profile_source_data
:
1065 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1068 if "cidr" in ip_profile_source_data
:
1069 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1072 if "gateway-ip" in ip_profile_source_data
:
1073 ip_profile_dest_data
[
1075 ] = ip_profile_source_data
["gateway-ip"]
1076 if "dhcp-enabled" in ip_profile_source_data
:
1077 ip_profile_dest_data
["dhcp-params"] = {
1078 "enabled": ip_profile_source_data
["dhcp-enabled"]
1081 vld_params
["ip-profile"] = ip_profile_dest_data
1082 # update vld_params with instantiation params
1084 vld_instantiation_params
= find_in_list(
1085 get_iterable(vnf_params
, "internal-vld"),
1086 lambda i_vld
: i_vld
["name"] == vld
["id"],
1088 if vld_instantiation_params
:
1089 vld_params
.update(vld_instantiation_params
)
1090 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1093 for vdur
in target_vnf
.get("vdur", ()):
1094 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1095 continue # This vdu must not be created
1096 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1098 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1101 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1102 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1105 and vdu_configuration
.get("config-access")
1106 and vdu_configuration
.get("config-access").get("ssh-access")
1108 vdur
["ssh-keys"] = ssh_keys_all
1109 vdur
["ssh-access-required"] = vdu_configuration
[
1111 ]["ssh-access"]["required"]
1114 and vnf_configuration
.get("config-access")
1115 and vnf_configuration
.get("config-access").get("ssh-access")
1116 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1118 vdur
["ssh-keys"] = ssh_keys_all
1119 vdur
["ssh-access-required"] = vnf_configuration
[
1121 ]["ssh-access"]["required"]
1122 elif ssh_keys_instantiation
and find_in_list(
1123 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1125 vdur
["ssh-keys"] = ssh_keys_instantiation
1127 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1129 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1131 if vdud
.get("cloud-init-file"):
1132 vdur
["cloud-init"] = "{}:file:{}".format(
1133 vnfd
["_id"], vdud
.get("cloud-init-file")
1135 # read the file and store its content in target["cloud_init_content"], so ng_ro does not have to use the shared package system
1136 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1137 base_folder
= vnfd
["_admin"]["storage"]
1138 if base_folder
["pkg-dir"]:
1139 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1140 base_folder
["folder"],
1141 base_folder
["pkg-dir"],
1142 vdud
.get("cloud-init-file"),
1145 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1146 base_folder
["folder"],
1147 vdud
.get("cloud-init-file"),
1149 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1150 target
["cloud_init_content"][
1153 elif vdud
.get("cloud-init"):
1154 vdur
["cloud-init"] = "{}:vdu:{}".format(
1155 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1157 # put content at target.cloud_init_content. Avoid ng_ro read vnfd descriptor
1158 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1161 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1162 deploy_params_vdu
= self
._format
_additional
_params
(
1163 vdur
.get("additionalParams") or {}
1165 deploy_params_vdu
["OSM"] = get_osm_params(
1166 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1168 vdur
["additionalParams"] = deploy_params_vdu
1171 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1172 if target_vim
not in ns_flavor
["vim_info"]:
1173 ns_flavor
["vim_info"][target_vim
] = {}
1176 # in case alternative images are provided we must check if they should be applied
1177 # for the vim_type, modify the vim_type taking into account
1178 ns_image_id
= int(vdur
["ns-image-id"])
1179 if vdur
.get("alt-image-ids"):
1180 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1181 vim_type
= db_vim
["vim_type"]
1182 for alt_image_id
in vdur
.get("alt-image-ids"):
1183 ns_alt_image
= target
["image"][int(alt_image_id
)]
1184 if vim_type
== ns_alt_image
.get("vim-type"):
1185 # must use alternative image
1187 "use alternative image id: {}".format(alt_image_id
)
1189 ns_image_id
= alt_image_id
1190 vdur
["ns-image-id"] = ns_image_id
1192 ns_image
= target
["image"][int(ns_image_id
)]
1193 if target_vim
not in ns_image
["vim_info"]:
1194 ns_image
["vim_info"][target_vim
] = {}
1197 if vdur
.get("affinity-or-anti-affinity-group-id"):
1198 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1199 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1200 if target_vim
not in ns_ags
["vim_info"]:
1201 ns_ags
["vim_info"][target_vim
] = {}
1203 vdur
["vim_info"] = {target_vim
: {}}
1204 # instantiation parameters
1206 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1207 # vdud["id"]), None)
1208 vdur_list
.append(vdur
)
1209 target_vnf
["vdur"] = vdur_list
1210 target
["vnf"].append(target_vnf
)
1212 desc
= await self
.RO
.deploy(nsr_id
, target
)
1213 self
.logger
.debug("RO return > {}".format(desc
))
1214 action_id
= desc
["action_id"]
1215 await self
._wait
_ng
_ro
(
1216 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
1221 "_admin.deployed.RO.operational-status": "running",
1222 "detailed-status": " ".join(stage
),
1224 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1225 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1226 self
._write
_op
_status
(nslcmop_id
, stage
)
1228 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
async def _wait_ng_ro(
    self,
    nsr_id,
    action_id,
    nslcmop_id=None,
    start_time=None,
    timeout=600,
    stage=None,
):
    """
    Poll NG-RO until the given action is DONE, FAILED or the timeout expires.

    :param nsr_id: ns record identity the action belongs to
    :param action_id: identity of the RO action to poll
    :param nslcmop_id: operation identity. Progress is only written at database when both this
        and stage are provided
    :param start_time: epoch seconds the timeout is counted from. Defaults to the current time
    :param timeout: maximum number of seconds to wait, counted from start_time
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. Only vim_specific
        (index 2) is overwritten here
    :return: None
    :raises NgRoException: RO reports FAILED, RO reports an unknown status, or the timeout expires
    """
    detailed_status_old = None
    db_nsr_update = {}
    start_time = start_time or time()
    while time() <= start_time + timeout:
        desc_status = await self.RO.status(nsr_id, action_id)
        self.logger.debug("Wait NG RO > {}".format(desc_status))
        status = desc_status["status"]
        if status == "FAILED":
            raise NgRoException(desc_status["details"])
        elif status == "BUILD":
            if stage:
                stage[2] = "VIM: ({})".format(desc_status["details"])
        elif status == "DONE":
            if stage:
                stage[2] = "Deployed at VIM"
            break
        else:
            # An explicit exception instead of "assert False": asserts are removed when
            # python runs with -O, which would leave this loop polling an unknown status
            raise NgRoException(
                "ROclient.check_ns_status returns unknown {}".format(status)
            )
        # write progress at database only when the vim_specific text has changed
        if stage and nslcmop_id and stage[2] != detailed_status_old:
            detailed_status_old = stage[2]
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
        # asyncio.sleep() no longer accepts the "loop" argument (removed in python 3.10)
        await asyncio.sleep(15)
    else:  # timeout_ns_deploy
        raise NgRoException("Timeout waiting ns to deploy")
# Terminate the NS at NG-RO.
# Visible flow: a request carrying "action_id": nslcmop_id is sent with self.RO.deploy(nsr_id, target),
# the returned action is awaited through self._wait_ng_ro() with a 20 minutes timeout counted from
# start_deploy, and finally self.RO.delete(nsr_id) is called.
# Any exception is classified using NgRoException.http_code: 404 is treated as "already deleted",
# 409 and every other error are appended to failed_detail.
# stage[2] and "detailed-status" are always written at nsrs and at the operation status.
# :param logging_text: prefix text to use at logging
# :param nsr_deployed: not referenced in the visible statements
# :param nsr_id: ns record identity
# :param nslcmop_id: operation identity, also used as the RO action_id of the request
# :param stage: list of texts; index 2 is overwritten here
# :raises LcmException: with the joined failed_detail texts (presumably only when failed_detail
#     is not empty - TODO confirm)
1269 async def _terminate_ng_ro(
1270 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1275 start_deploy
= time()
1282 "action_id": nslcmop_id
,
1284 desc
= await self
.RO
.deploy(nsr_id
, target
)
1285 action_id
= desc
["action_id"]
# remember the delete action at database fields so that its progress is traceable
1286 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1287 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1290 + "ns terminate action at RO. action_id={}".format(action_id
)
1294 delete_timeout
= 20 * 60 # 20 minutes
1295 await self
._wait
_ng
_ro
(
1296 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
1299 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1300 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1302 await self
.RO
.delete(nsr_id
)
# best effort: errors are recorded instead of aborting, so database is always updated below
1303 except Exception as e
:
1304 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1305 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1306 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1307 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1309 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1311 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1312 failed_detail
.append("delete conflict: {}".format(e
))
1315 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1318 failed_detail
.append("delete error: {}".format(e
))
1321 + "RO_action_id={} delete error: {}".format(action_id
, e
)
# final vim_specific text; presumably the first one applies when failed_detail is not empty
1325 stage
[2] = "Error deleting from VIM"
1327 stage
[2] = "Deleted from VIM"
1328 db_nsr_update
["detailed-status"] = " ".join(stage
)
1329 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1330 self
._write
_op
_status
(nslcmop_id
, stage
)
1333 raise LcmException("; ".join(failed_detail
))
async def instantiate_RO(
    self,
    logging_text,
    nsr_id,
    nsd,
    db_nsr,
    db_nslcmop,
    db_vnfrs,
    db_vnfds,
    n2vc_key_list,
    stage,
):
    """
    Instantiate at RO
    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: database content of ns descriptor
    :param db_nsr: database content of ns record
    :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index
    :param db_vnfds: database content of the vnfds used by this ns
    :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: whatever _instantiate_ng_ro returns. Any exception is logged, the vnfrs are set at
        error and the exception is raised again
    """
    try:
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get(
                "ns_deploy", self.timeout_ns_deploy
            )

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
            # in case of placement, change ns_params["vimAccountId"] if not present at any vnfrs
            vnfr = None  # stays None when there are no vnfrs, so nothing is overwritten
            for vnfr in db_vnfrs.values():
                if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
                    break
            else:
                if vnfr is not None:
                    # this was a comparison ("==") whose result was discarded; an
                    # assignment is what the comment above describes
                    ns_params["vimAccountId"] = vnfr["vim-account-id"]

        return await self._instantiate_ng_ro(
            logging_text,
            nsr_id,
            nsd,
            db_nsr,
            db_nslcmop,
            db_vnfrs,
            db_vnfds,
            n2vc_key_list,
            stage,
            start_deploy,
            timeout_ns_deploy,
        )
    except Exception as e:
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        # the traceback is only logged for exceptions that are not expected here
        self.logger.error(
            "Error deploying at VIM {}".format(e),
            exc_info=not isinstance(
                e,
                (
                    ROclient.ROClientException,
                    LcmException,
                    DbException,
                    NgRoException,
                ),
            ),
        )
        raise
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address
    :param logging_text: prefix use for logging
    :param nsr_id: ns record identity (not needed for the polling itself)
    :param vnfr_id: _id of the vnfrs record that contains the kdur
    :param kdu_name: value of "kdu-name" of the kdur to wait for
    :return: "ip-address" of the kdur as soon as its status is READY or ENABLED
    :raises LcmException: the kdur is not found, it has any other non-empty status, or it does
        not get a status after 360 polls
    """
    # self.logger.debug(logging_text + "Starting wait_kdu_up")
    nb_tries = 0

    while nb_tries < 360:
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next(
            (
                x
                for x in get_iterable(db_vnfr, "kdur")
                if x.get("kdu-name") == kdu_name
            ),
            None,
        )
        if not kdur:
            raise LcmException(
                "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
            )
        if kdur.get("status"):
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address")
            else:
                raise LcmException(
                    "target KDU={} is in error state".format(kdu_name)
                )

        # asyncio.sleep() no longer accepts the "loop" argument (removed in python 3.10)
        await asyncio.sleep(10)
        nb_tries += 1
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
# Wait until the target VM has an ip address and, when both pub_key and user are given, inject the
# public ssh key into it.
# Visible flow:
#  - the vnfrs record is read and the target vdur is selected: by the vnf "ip-address" when vdu_id is
#    empty (VNF case), otherwise by "vdu-id-ref" and "count-index"; when there is exactly one vdur
#    it is used as fallback
#  - the vdur is accepted when it is a PDU ("pdu-type"), or its "status" is ACTIVE, or the
#    "vim_status" found under "vim_info" is ACTIVE; an ERROR status aborts
#  - key injection is done either with self.RO.deploy() using the action "inject_ssh_key" followed by
#    self._wait_ng_ro(timeout=600), or with self.RO.create_action() using "add_public_key"
#    (presumably depending on whether NG-RO is in use - TODO confirm)
#  - waits between retries are asyncio.sleep(10); the "# 1 hour" note matches 360 retries of 10 seconds
# NOTE(review): asyncio.sleep(..., loop=...) is not accepted since python 3.10.
# NOTE(review): the public key is written at debug log ("SSH > PubKey > ...").
1450 async def wait_vm_up_insert_key_ro(
1451 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1454 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1455 :param logging_text: prefix use for logging
1460 :param pub_key: public ssh key to inject, None to skip
1461 :param user: user to apply the public ssh key
1465 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
# target_vdu_id stays None until an usable vdur is found
1469 target_vdu_id
= None
1475 if ro_retries
>= 360: # 1 hour
1477 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1480 await asyncio
.sleep(10, loop
=self
.loop
)
1483 if not target_vdu_id
:
1484 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1486 if not vdu_id
: # for the VNF case
1487 if db_vnfr
.get("status") == "ERROR":
1489 "Cannot inject ssh-key because target VNF is in error state"
1491 ip_address
= db_vnfr
.get("ip-address")
1497 for x
in get_iterable(db_vnfr
, "vdur")
1498 if x
.get("ip-address") == ip_address
1506 for x
in get_iterable(db_vnfr
, "vdur")
1507 if x
.get("vdu-id-ref") == vdu_id
1508 and x
.get("count-index") == vdu_index
1514 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1515 ): # If only one, this should be the target vdu
1516 vdur
= db_vnfr
["vdur"][0]
1519 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1520 vnfr_id
, vdu_id
, vdu_index
1523 # New generation RO stores information at "vim_info"
1526 if vdur
.get("vim_info"):
1528 t
for t
in vdur
["vim_info"]
1529 ) # there should be only one key
1530 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1532 vdur
.get("pdu-type")
1533 or vdur
.get("status") == "ACTIVE"
1534 or ng_ro_status
== "ACTIVE"
1536 ip_address
= vdur
.get("ip-address")
1539 target_vdu_id
= vdur
["vdu-id-ref"]
1540 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1542 "Cannot inject ssh-key because target VM is in error state"
1545 if not target_vdu_id
:
1548 # inject public key into machine
1549 if pub_key
and user
:
1550 self
.logger
.debug(logging_text
+ "Inserting RO key")
1551 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
# a PDU is not managed through the VIM, so no key can be injected into it
1552 if vdur
.get("pdu-type"):
1553 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1556 ro_vm_id
= "{}-{}".format(
1557 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1558 ) # TODO add vdu_index
1562 "action": "inject_ssh_key",
1566 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1568 desc
= await self
.RO
.deploy(nsr_id
, target
)
1569 action_id
= desc
["action_id"]
1570 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600)
1573 # wait until NS is deployed at RO
1575 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1576 ro_nsr_id
= deep_get(
1577 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1581 result_dict
= await self
.RO
.create_action(
1583 item_id_name
=ro_nsr_id
,
1585 "add_public_key": pub_key
,
1590 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1591 if not result_dict
or not isinstance(result_dict
, dict):
1593 "Unknown response from RO when injecting key"
1595 for result
in result_dict
.values():
1596 if result
.get("vim_result") == 200:
1599 raise ROclient
.ROClientException(
1600 "error injecting key: {}".format(
1601 result
.get("description")
1605 except NgRoException
as e
:
1607 "Reaching max tries injecting key. Error: {}".format(e
)
1609 except ROclient
.ROClientException
as e
:
1613 + "error injecting key: {}. Retrying until {} seconds".format(
1620 "Reaching max tries injecting key. Error: {}".format(e
)
# Block until the configurations this VCA depends on have finished.
# A VCA of a vdu or kdu has no dependencies. For the rest, the nsrs record is read again on each
# iteration and every other entry of "configurationStatus" is checked: all entries when this VCA
# has no "member-vnf-index" (NS level), or only those with the same "member-vnf-index" (VNF level).
# A BROKEN entry aborts with LcmException; iterations are separated by asyncio.sleep(10).
# :param nsr_id: ns record identity used to read "configurationStatus"
# :param vca_deployed_list: list of deployed VCA; only the entry at vca_index is read from it
# :param vca_index: position of this VCA at both lists
# :raises LcmException: a dependent configuration is BROKEN, or the wait times out
# NOTE(review): vca_deployed_list is reassigned from database below but "vca_deployed" iterates
#     over configuration_status_list - confirm that both lists share the "member-vnf-index" key.
1627 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1629 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1631 my_vca
= vca_deployed_list
[vca_index
]
1632 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1633 # vdu or kdu: no dependencies
1637 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1638 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1639 configuration_status_list
= db_nsr
["configurationStatus"]
1640 for index
, vca_deployed
in enumerate(configuration_status_list
):
# the own entry is not a dependency
1641 if index
== vca_index
:
1644 if not my_vca
.get("member-vnf-index") or (
1645 vca_deployed
.get("member-vnf-index")
1646 == my_vca
.get("member-vnf-index")
1648 internal_status
= configuration_status_list
[index
].get("status")
1649 if internal_status
== "READY":
1651 elif internal_status
== "BROKEN":
1653 "Configuration aborted because dependent charm/s has failed"
1658 # no dependencies, return
1660 await asyncio
.sleep(10)
1663 raise LcmException("Configuration aborted because dependent charm/s timeout")
# Obtain the identity of the VCA to use.
# Two sources are visible: the "vca-id" field of the vnf record, and the "vca" field of the VIM
# account referenced by "instantiate_params.vimAccountId" of the ns record.
# :param db_vnfr: database content of the vnf record
# :param db_nsr: database content of the ns record
# NOTE(review): presumably the ns record is only consulted when no vnf record is provided - confirm.
1665 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1668 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1670 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1671 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
# Deploy and configure one execution environment (entry vca_index of _admin.deployed.VCA) of the ns.
# Visible steps, in order:
#  - build the namespace "{nsi}.{ns}" and extend it with the VNF, VDU or KDU element
#  - compute the artifact path inside the package folder
#  - get the initial config primitives sorted for this execution environment
#  - STEP 3.1: reuse the stored "ee_id" if any; otherwise create the execution environment
#    (lxc_proxy_charm, k8s_proxy_charm, helm, helm-v3) or
#  - STEP 3.2: register it (native_charm) using the VM ip address and an ssh user name
#  - store ee_id and, when ee_id has at least two dot separated parts, model and application
#  - STEP 3.3: install the configuration software (not for k8s_proxy_charm) and add relations
#  - optionally get the ssh public key of the execution environment to be inserted into the VM
#  - STEP 5.1: wait for the KDU or VM to be up and store the address at deploy_params["rw_mgmt_ip"]
#  - STEP 6: wait for dependent VCA and execute every initial config primitive
#  - STEP 7: for helm/helm-v3 extract the prometheus scrape jobs and store them at database
#  - write configuration status READY; on any exception write BROKEN and raise LcmException
# "step" always holds the text of the action in progress, it is used at the error message.
1674 async def instantiate_N2VC(
1691 ee_config_descriptor
,
1693 nsr_id
= db_nsr
["_id"]
# prefix of the database path of this VCA, e.g. "_admin.deployed.VCA.<vca_index>."
1694 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1695 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1696 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1697 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1699 "collection": "nsrs",
1700 "filter": {"_id": nsr_id
},
1701 "path": db_update_entry
,
1707 element_under_configuration
= nsr_id
1711 vnfr_id
= db_vnfr
["_id"]
1712 osm_config
["osm"]["vnf_id"] = vnfr_id
1714 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1716 if vca_type
== "native_charm":
1719 index_number
= vdu_index
or 0
1722 element_type
= "VNF"
1723 element_under_configuration
= vnfr_id
1724 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1726 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1727 element_type
= "VDU"
1728 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1729 osm_config
["osm"]["vdu_id"] = vdu_id
1731 namespace
+= ".{}".format(kdu_name
)
1732 element_type
= "KDU"
1733 element_under_configuration
= kdu_name
1734 osm_config
["osm"]["kdu_name"] = kdu_name
# packages with "pkg-dir" keep the artifacts under that folder, otherwise under "Scripts"
1737 if base_folder
["pkg-dir"]:
1738 artifact_path
= "{}/{}/{}/{}".format(
1739 base_folder
["folder"],
1740 base_folder
["pkg-dir"],
1742 if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1747 artifact_path
= "{}/Scripts/{}/{}/".format(
1748 base_folder
["folder"],
1750 if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1755 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1757 # get initial_config_primitive_list that applies to this element
1758 initial_config_primitive_list
= config_descriptor
.get(
1759 "initial-config-primitive"
1763 "Initial config primitive list > {}".format(
1764 initial_config_primitive_list
1768 # add config if not present for NS charm
1769 ee_descriptor_id
= ee_config_descriptor
.get("id")
1770 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1771 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1772 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1776 "Initial config primitive list #2 > {}".format(
1777 initial_config_primitive_list
1780 # n2vc_redesign STEP 3.1
1781 # find old ee_id if exists
1782 ee_id
= vca_deployed
.get("ee_id")
1784 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1785 # create or register execution environment in VCA
1786 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1788 self
._write
_configuration
_status
(
1790 vca_index
=vca_index
,
1792 element_under_configuration
=element_under_configuration
,
1793 element_type
=element_type
,
1796 step
= "create execution environment"
1797 self
.logger
.debug(logging_text
+ step
)
1801 if vca_type
== "k8s_proxy_charm":
# the charm name is the last component of the artifact path
1802 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1803 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1804 namespace
=namespace
,
1805 artifact_path
=artifact_path
,
1809 elif vca_type
== "helm" or vca_type
== "helm-v3":
1810 ee_id
, credentials
= await self
.vca_map
[
1812 ].create_execution_environment(
1813 namespace
=namespace
,
1817 artifact_path
=artifact_path
,
1821 ee_id
, credentials
= await self
.vca_map
[
1823 ].create_execution_environment(
1824 namespace
=namespace
,
1830 elif vca_type
== "native_charm":
1831 step
= "Waiting to VM being up and getting IP address"
1832 self
.logger
.debug(logging_text
+ step
)
1833 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1842 credentials
= {"hostname": rw_mgmt_ip
}
1844 username
= deep_get(
1845 config_descriptor
, ("config-access", "ssh-access", "default-user")
1847 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1848 # merged. Meanwhile let's get username from initial-config-primitive
1849 if not username
and initial_config_primitive_list
:
1850 for config_primitive
in initial_config_primitive_list
:
1851 for param
in config_primitive
.get("parameter", ()):
1852 if param
["name"] == "ssh-username":
1853 username
= param
["value"]
1857 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1858 "'config-access.ssh-access.default-user'"
1860 credentials
["username"] = username
1861 # n2vc_redesign STEP 3.2
1863 self
._write
_configuration
_status
(
1865 vca_index
=vca_index
,
1866 status
="REGISTERING",
1867 element_under_configuration
=element_under_configuration
,
1868 element_type
=element_type
,
1871 step
= "register execution environment {}".format(credentials
)
1872 self
.logger
.debug(logging_text
+ step
)
1873 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1874 credentials
=credentials
,
1875 namespace
=namespace
,
1880 # for compatibility with MON/POL modules, the need model and application name at database
1881 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1882 ee_id_parts
= ee_id
.split(".")
1883 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1884 if len(ee_id_parts
) >= 2:
1885 model_name
= ee_id_parts
[0]
1886 application_name
= ee_id_parts
[1]
1887 db_nsr_update
[db_update_entry
+ "model"] = model_name
1888 db_nsr_update
[db_update_entry
+ "application"] = application_name
1890 # n2vc_redesign STEP 3.3
1891 step
= "Install configuration Software"
1893 self
._write
_configuration
_status
(
1895 vca_index
=vca_index
,
1896 status
="INSTALLING SW",
1897 element_under_configuration
=element_under_configuration
,
1898 element_type
=element_type
,
1899 other_update
=db_nsr_update
,
1902 # TODO check if already done
1903 self
.logger
.debug(logging_text
+ step
)
# for native charms the primitive named "config" provides the initial configuration
1905 if vca_type
== "native_charm":
1906 config_primitive
= next(
1907 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1910 if config_primitive
:
1911 config
= self
._map
_primitive
_params
(
1912 config_primitive
, {}, deploy_params
# number of units is read from "config-units" of the ns, vnf or vdu record; 1 when not set
1915 if vca_type
== "lxc_proxy_charm":
1916 if element_type
== "NS":
1917 num_units
= db_nsr
.get("config-units") or 1
1918 elif element_type
== "VNF":
1919 num_units
= db_vnfr
.get("config-units") or 1
1920 elif element_type
== "VDU":
1921 for v
in db_vnfr
["vdur"]:
1922 if vdu_id
== v
["vdu-id-ref"]:
1923 num_units
= v
.get("config-units") or 1
1925 if vca_type
!= "k8s_proxy_charm":
1926 await self
.vca_map
[vca_type
].install_configuration_sw(
1928 artifact_path
=artifact_path
,
1931 num_units
=num_units
,
1936 # write in db flag of configuration_sw already installed
1938 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1941 # add relations for this VCA (wait for other peers related with this VCA)
1942 await self
._add
_vca
_relations
(
1943 logging_text
=logging_text
,
1946 vca_index
=vca_index
,
1949 # if SSH access is required, then get execution environment SSH public
1950 # if native charm we have waited already to VM be UP
1951 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1954 # self.logger.debug("get ssh key block")
1956 config_descriptor
, ("config-access", "ssh-access", "required")
1958 # self.logger.debug("ssh key needed")
1959 # Needed to inject a ssh key
1962 ("config-access", "ssh-access", "default-user"),
1964 step
= "Install configuration Software, getting public ssh key"
# NOTE(review): the double underscore at "get_ee_ssh_public__key" is the name called here - verify against the connector
1965 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
1966 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
1969 step
= "Insert public key into VM user={} ssh_key={}".format(
1973 # self.logger.debug("no need to get ssh key")
1974 step
= "Waiting to VM being up and getting IP address"
1975 self
.logger
.debug(logging_text
+ step
)
1977 # n2vc_redesign STEP 5.1
1978 # wait for RO (ip-address) Insert pub_key into VM
1981 rw_mgmt_ip
= await self
.wait_kdu_up(
1982 logging_text
, nsr_id
, vnfr_id
, kdu_name
1985 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1995 rw_mgmt_ip
= None # This is for a NS configuration
1997 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
1999 # store rw_mgmt_ip in deploy params for later replacement
2000 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2002 # n2vc_redesign STEP 6 Execute initial config primitive
2003 step
= "execute initial config primitive"
2005 # wait for dependent primitives execution (NS -> VNF -> VDU)
2006 if initial_config_primitive_list
:
2007 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2009 # stage, in function of element type: vdu, kdu, vnf or ns
2010 my_vca
= vca_deployed_list
[vca_index
]
2011 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2013 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2014 elif my_vca
.get("member-vnf-index"):
2016 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2019 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2021 self
._write
_configuration
_status
(
2022 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2025 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
# the "needed_terminate" flag is written at most once, after the first executed primitive
2027 check_if_terminated_needed
= True
2028 for initial_config_primitive
in initial_config_primitive_list
:
2029 # adding information on the vca_deployed if it is a NS execution environment
# NOTE(review): direct indexing raises KeyError when "member-vnf-index" is absent; other places use .get()
2030 if not vca_deployed
["member-vnf-index"]:
2031 deploy_params
["ns_config_info"] = json
.dumps(
2032 self
._get
_ns
_config
_info
(nsr_id
)
2034 # TODO check if already done
2035 primitive_params_
= self
._map
_primitive
_params
(
2036 initial_config_primitive
, {}, deploy_params
2039 step
= "execute primitive '{}' params '{}'".format(
2040 initial_config_primitive
["name"], primitive_params_
2042 self
.logger
.debug(logging_text
+ step
)
2043 await self
.vca_map
[vca_type
].exec_primitive(
2045 primitive_name
=initial_config_primitive
["name"],
2046 params_dict
=primitive_params_
,
2051 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2052 if check_if_terminated_needed
:
2053 if config_descriptor
.get("terminate-config-primitive"):
2055 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2057 check_if_terminated_needed
= False
2059 # TODO register in database that primitive is done
2061 # STEP 7 Configure metrics
2062 if vca_type
== "helm" or vca_type
== "helm-v3":
2063 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2065 artifact_path
=artifact_path
,
2066 ee_config_descriptor
=ee_config_descriptor
,
2069 target_ip
=rw_mgmt_ip
,
2075 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2078 for job
in prometheus_jobs
:
2082 "job_name": job
["job_name"]
2086 fail_on_empty
=False,
2089 step
= "instantiated at VCA"
2090 self
.logger
.debug(logging_text
+ step
)
2092 self
._write
_configuration
_status
(
2093 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2096 except Exception as e
: # TODO not use Exception but N2VC exception
2097 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
# exceptions of the listed types are expected ones; the others are logged with traceback
2099 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2102 "Exception while {} : {}".format(step
, e
), exc_info
=True
2104 self
._write
_configuration
_status
(
2105 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2107 raise LcmException("{} {}".format(step
, e
)) from e
# Write the operation related fields of the ns record (collection "nsrs") in a single update.
# The content of other_update, if provided, is written in the same update.
# A DbException is only logged, it is not raised to the caller.
# NOTE(review): other_update is used as the dictionary that is filled and written, so the caller's
#     dictionary is modified in place.
# NOTE(review): Logger.warn is a deprecated alias of Logger.warning.
2109 def _write_ns_status(
2113 current_operation
: str,
2114 current_operation_id
: str,
2115 error_description
: str = None,
2116 error_detail
: str = None,
2117 other_update
: dict = None,
2120 Update db_nsr fields.
2123 :param current_operation:
2124 :param current_operation_id:
2125 :param error_description:
2126 :param error_detail:
2127 :param other_update: Other required changes at database if provided, will be cleared
2131 db_dict
= other_update
or {}
2134 ] = current_operation_id
# for backward compatibility
2135 db_dict
["_admin.current-operation"] = current_operation_id
# the operation type is stored as None while the ns is IDLE
2136 db_dict
["_admin.operation-type"] = (
2137 current_operation
if current_operation
!= "IDLE" else None
2139 db_dict
["currentOperation"] = current_operation
2140 db_dict
["currentOperationID"] = current_operation_id
2141 db_dict
["errorDescription"] = error_description
2142 db_dict
["errorDetail"] = error_detail
2145 db_dict
["nsState"] = ns_state
2146 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2147 except DbException
as e
:
2148 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
# Write the status of an operation at the collection "nslcmops".
# stage may be a list, in which case its first item is stored at "stage" and all items joined by a
# space at "detailed-status", or any other not None value, stored at "stage" as text.
# "errorMessage" and "operationState" are only written when provided; "statusEnteredTime" is
# written together with "operationState".
# The content of other_update, if provided, is written in the same update (the caller's
# dictionary is modified in place). A DbException is reported but not raised.
2150 def _write_op_status(
2154 error_message
: str = None,
2155 queuePosition
: int = 0,
2156 operation_state
: str = None,
2157 other_update
: dict = None,
2160 db_dict
= other_update
or {}
2161 db_dict
["queuePosition"] = queuePosition
2162 if isinstance(stage
, list):
2163 db_dict
["stage"] = stage
[0]
2164 db_dict
["detailed-status"] = " ".join(stage
)
2165 elif stage
is not None:
2166 db_dict
["stage"] = str(stage
)
2168 if error_message
is not None:
2169 db_dict
["errorMessage"] = error_message
2170 if operation_state
is not None:
2171 db_dict
["operationState"] = operation_state
2172 db_dict
["statusEnteredTime"] = time()
2173 self
.update_db_2("nslcmops", op_id
, db_dict
)
2174 except DbException
as e
:
2176 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
# Set the same status at every entry of "configurationStatus" of the ns record.
# One "configurationStatus.<index>.status" key is generated per entry and all of them are written
# in a single update of "nsrs". A DbException is reported but not raised.
# :param db_nsr: database content of the ns record; "_id" and "configurationStatus" are read
# :param status: text to store at every entry
2179 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2181 nsr_id
= db_nsr
["_id"]
2182 # configurationStatus
2183 config_status
= db_nsr
.get("configurationStatus")
2186 "configurationStatus.{}.status".format(index
): status
2187 for index
, v
in enumerate(config_status
)
2191 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2193 except DbException
as e
:
2195 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
# Write the configuration status of one VCA at "configurationStatus.<vca_index>." of the ns record.
# "status" is written, "elementUnderConfiguration" is written when provided and "elementType" is
# written as well (presumably only when provided - TODO confirm).
# The content of other_update, if provided, is written in the same update (the caller's
# dictionary is modified in place). A DbException is reported but not raised.
2198 def _write_configuration_status(
2203 element_under_configuration
: str = None,
2204 element_type
: str = None,
2205 other_update
: dict = None,
2208 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2209 # .format(vca_index, status))
2212 db_path
= "configurationStatus.{}.".format(vca_index
)
2213 db_dict
= other_update
or {}
2215 db_dict
[db_path
+ "status"] = status
2216 if element_under_configuration
:
2218 db_path
+ "elementUnderConfiguration"
2219 ] = element_under_configuration
2221 db_dict
[db_path
+ "elementType"] = element_type
2222 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2223 except DbException
as e
:
2225 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2226 status
, nsr_id
, vca_index
, e
# Placement is only requested when operationParams "placement-engine" is "PLA".
# The request is sent with self.msg.aiowrite("pla", "get_placement", ...) and the answer is polled
# at nslcmops "_admin.pla" every db_poll_interval (5) seconds, with a budget of 10 intervals.
# For every vnf of the result with a "vimAccountId" and a known vnfr, "vim-account-id" is stored at
# database and at the vnfr in memory.
2230 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2232 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2233 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2234 Database is used because the result can be obtained from a different LCM worker in case of HA.
2235 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2236 :param db_nslcmop: database content of nslcmop
2237 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2238 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2239 computed 'vim-account-id'
2242 nslcmop_id
= db_nslcmop
["_id"]
2243 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2244 if placement_engine
== "PLA":
2246 logging_text
+ "Invoke and wait for placement optimization"
2248 await self
.msg
.aiowrite(
2249 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2251 db_poll_interval
= 5
2252 wait
= db_poll_interval
* 10
# poll database until the result is written or the wait budget is consumed
2254 while not pla_result
and wait
>= 0:
2255 await asyncio
.sleep(db_poll_interval
)
2256 wait
-= db_poll_interval
2257 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2258 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2262 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2265 for pla_vnf
in pla_result
["vnf"]:
2266 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
# entries without a vim account, or for an unknown vnf, are not applied
2267 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2272 {"_id": vnfr
["_id"]},
2273 {"vim-account-id": pla_vnf
["vimAccountId"]},
2276 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
def update_nsrs_with_pla_result(self, params):
    """
    Store the placement result at "_admin.pla" of the related operation (collection "nslcmops").

    :param params: content with the key "placement", which includes the "nslcmopId" of the operation
    :return: None. Any error is logged as a warning and not raised
    """
    # defined before the try block, so that it can always be used at the error message
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
        self.update_db_2(
            "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
        )
    except Exception as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning(
            "Update failed for nslcmop_id={}:{}".format(nslcmop_id, e)
        )
2288 async def instantiate(self
, nsr_id
, nslcmop_id
):
2291 :param nsr_id: ns instance to deploy
2292 :param nslcmop_id: operation to run
2296 # Try to lock HA task here
2297 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2298 if not task_is_locked_by_me
:
2300 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2304 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2305 self
.logger
.debug(logging_text
+ "Enter")
2307 # get all needed from database
2309 # database nsrs record
2312 # database nslcmops record
2315 # update operation on nsrs
2317 # update operation on nslcmops
2318 db_nslcmop_update
= {}
2320 nslcmop_operation_state
= None
2321 db_vnfrs
= {} # vnf's info indexed by member-index
2323 tasks_dict_info
= {} # from task to info text
2327 "Stage 1/5: preparation of the environment.",
2328 "Waiting for previous operations to terminate.",
2331 # ^ stage, step, VIM progress
2333 # wait for any previous tasks in process
2334 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2336 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2337 stage
[1] = "Reading from database."
2338 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2339 db_nsr_update
["detailed-status"] = "creating"
2340 db_nsr_update
["operational-status"] = "init"
2341 self
._write
_ns
_status
(
2343 ns_state
="BUILDING",
2344 current_operation
="INSTANTIATING",
2345 current_operation_id
=nslcmop_id
,
2346 other_update
=db_nsr_update
,
2348 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2350 # read from db: operation
2351 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2352 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2353 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2354 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2355 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2357 ns_params
= db_nslcmop
.get("operationParams")
2358 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2359 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2361 timeout_ns_deploy
= self
.timeout
.get(
2362 "ns_deploy", self
.timeout_ns_deploy
2366 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2367 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2368 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2369 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2370 self
.fs
.sync(db_nsr
["nsd-id"])
2372 # nsr_name = db_nsr["name"] # TODO short-name??
2374 # read from db: vnf's of this ns
2375 stage
[1] = "Getting vnfrs from db."
2376 self
.logger
.debug(logging_text
+ stage
[1])
2377 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2379 # read from db: vnfd's for every vnf
2380 db_vnfds
= [] # every vnfd data
2382 # for each vnf in ns, read vnfd
2383 for vnfr
in db_vnfrs_list
:
2384 if vnfr
.get("kdur"):
2386 for kdur
in vnfr
["kdur"]:
2387 if kdur
.get("additionalParams"):
2388 kdur
["additionalParams"] = json
.loads(
2389 kdur
["additionalParams"]
2391 kdur_list
.append(kdur
)
2392 vnfr
["kdur"] = kdur_list
2394 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2395 vnfd_id
= vnfr
["vnfd-id"]
2396 vnfd_ref
= vnfr
["vnfd-ref"]
2397 self
.fs
.sync(vnfd_id
)
2399 # if we haven't this vnfd, read it from db
2400 if vnfd_id
not in db_vnfds
:
2402 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2405 self
.logger
.debug(logging_text
+ stage
[1])
2406 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2409 db_vnfds
.append(vnfd
)
2411 # Get or generate the _admin.deployed.VCA list
2412 vca_deployed_list
= None
2413 if db_nsr
["_admin"].get("deployed"):
2414 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2415 if vca_deployed_list
is None:
2416 vca_deployed_list
= []
2417 configuration_status_list
= []
2418 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2419 db_nsr_update
["configurationStatus"] = configuration_status_list
2420 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2421 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2422 elif isinstance(vca_deployed_list
, dict):
2423 # maintain backward compatibility. Change a dict to list at database
2424 vca_deployed_list
= list(vca_deployed_list
.values())
2425 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2426 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2429 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2431 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2432 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2434 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2435 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2436 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2438 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2441 # n2vc_redesign STEP 2 Deploy Network Scenario
2442 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2443 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2445 stage
[1] = "Deploying KDUs."
2446 # self.logger.debug(logging_text + "Before deploy_kdus")
2447 # Call to deploy_kdus in case exists the "vdu:kdu" param
2448 await self
.deploy_kdus(
2449 logging_text
=logging_text
,
2451 nslcmop_id
=nslcmop_id
,
2454 task_instantiation_info
=tasks_dict_info
,
2457 stage
[1] = "Getting VCA public key."
2458 # n2vc_redesign STEP 1 Get VCA public ssh-key
2459 # feature 1429. Add n2vc public key to needed VMs
2460 n2vc_key
= self
.n2vc
.get_public_key()
2461 n2vc_key_list
= [n2vc_key
]
2462 if self
.vca_config
.get("public_key"):
2463 n2vc_key_list
.append(self
.vca_config
["public_key"])
2465 stage
[1] = "Deploying NS at VIM."
2466 task_ro
= asyncio
.ensure_future(
2467 self
.instantiate_RO(
2468 logging_text
=logging_text
,
2472 db_nslcmop
=db_nslcmop
,
2475 n2vc_key_list
=n2vc_key_list
,
2479 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2480 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2482 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2483 stage
[1] = "Deploying Execution Environments."
2484 self
.logger
.debug(logging_text
+ stage
[1])
2486 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2487 for vnf_profile
in get_vnf_profiles(nsd
):
2488 vnfd_id
= vnf_profile
["vnfd-id"]
2489 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2490 member_vnf_index
= str(vnf_profile
["id"])
2491 db_vnfr
= db_vnfrs
[member_vnf_index
]
2492 base_folder
= vnfd
["_admin"]["storage"]
2498 # Get additional parameters
2499 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2500 if db_vnfr
.get("additionalParamsForVnf"):
2501 deploy_params
.update(
2502 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2505 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2506 if descriptor_config
:
2508 logging_text
=logging_text
2509 + "member_vnf_index={} ".format(member_vnf_index
),
2512 nslcmop_id
=nslcmop_id
,
2518 member_vnf_index
=member_vnf_index
,
2519 vdu_index
=vdu_index
,
2521 deploy_params
=deploy_params
,
2522 descriptor_config
=descriptor_config
,
2523 base_folder
=base_folder
,
2524 task_instantiation_info
=tasks_dict_info
,
2528 # Deploy charms for each VDU that supports one.
2529 for vdud
in get_vdu_list(vnfd
):
2531 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2532 vdur
= find_in_list(
2533 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2536 if vdur
.get("additionalParams"):
2537 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2539 deploy_params_vdu
= deploy_params
2540 deploy_params_vdu
["OSM"] = get_osm_params(
2541 db_vnfr
, vdu_id
, vdu_count_index
=0
2543 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2545 self
.logger
.debug("VDUD > {}".format(vdud
))
2547 "Descriptor config > {}".format(descriptor_config
)
2549 if descriptor_config
:
2552 for vdu_index
in range(vdud_count
):
2553 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2555 logging_text
=logging_text
2556 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2557 member_vnf_index
, vdu_id
, vdu_index
2561 nslcmop_id
=nslcmop_id
,
2567 member_vnf_index
=member_vnf_index
,
2568 vdu_index
=vdu_index
,
2570 deploy_params
=deploy_params_vdu
,
2571 descriptor_config
=descriptor_config
,
2572 base_folder
=base_folder
,
2573 task_instantiation_info
=tasks_dict_info
,
2576 for kdud
in get_kdu_list(vnfd
):
2577 kdu_name
= kdud
["name"]
2578 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2579 if descriptor_config
:
2584 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2586 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2587 if kdur
.get("additionalParams"):
2588 deploy_params_kdu
.update(
2589 parse_yaml_strings(kdur
["additionalParams"].copy())
2593 logging_text
=logging_text
,
2596 nslcmop_id
=nslcmop_id
,
2602 member_vnf_index
=member_vnf_index
,
2603 vdu_index
=vdu_index
,
2605 deploy_params
=deploy_params_kdu
,
2606 descriptor_config
=descriptor_config
,
2607 base_folder
=base_folder
,
2608 task_instantiation_info
=tasks_dict_info
,
2612 # Check if this NS has a charm configuration
2613 descriptor_config
= nsd
.get("ns-configuration")
2614 if descriptor_config
and descriptor_config
.get("juju"):
2617 member_vnf_index
= None
2623 # Get additional parameters
2624 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2625 if db_nsr
.get("additionalParamsForNs"):
2626 deploy_params
.update(
2627 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2629 base_folder
= nsd
["_admin"]["storage"]
2631 logging_text
=logging_text
,
2634 nslcmop_id
=nslcmop_id
,
2640 member_vnf_index
=member_vnf_index
,
2641 vdu_index
=vdu_index
,
2643 deploy_params
=deploy_params
,
2644 descriptor_config
=descriptor_config
,
2645 base_folder
=base_folder
,
2646 task_instantiation_info
=tasks_dict_info
,
2650 # the rest of the work will be done in the finally block
2653 ROclient
.ROClientException
,
2659 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2662 except asyncio
.CancelledError
:
2664 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2666 exc
= "Operation was cancelled"
2667 except Exception as e
:
2668 exc
= traceback
.format_exc()
2669 self
.logger
.critical(
2670 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2675 error_list
.append(str(exc
))
2677 # wait for pending tasks
2679 stage
[1] = "Waiting for instantiate pending tasks."
2680 self
.logger
.debug(logging_text
+ stage
[1])
2681 error_list
+= await self
._wait
_for
_tasks
(
2689 stage
[1] = stage
[2] = ""
2690 except asyncio
.CancelledError
:
2691 error_list
.append("Cancelled")
2692 # TODO cancel all tasks
2693 except Exception as exc
:
2694 error_list
.append(str(exc
))
2696 # update operation-status
2697 db_nsr_update
["operational-status"] = "running"
2698 # let's begin with VCA 'configured' status (later we can change it)
2699 db_nsr_update
["config-status"] = "configured"
2700 for task
, task_name
in tasks_dict_info
.items():
2701 if not task
.done() or task
.cancelled() or task
.exception():
2702 if task_name
.startswith(self
.task_name_deploy_vca
):
2703 # A N2VC task is pending
2704 db_nsr_update
["config-status"] = "failed"
2706 # RO or KDU task is pending
2707 db_nsr_update
["operational-status"] = "failed"
2709 # update status at database
2711 error_detail
= ". ".join(error_list
)
2712 self
.logger
.error(logging_text
+ error_detail
)
2713 error_description_nslcmop
= "{} Detail: {}".format(
2714 stage
[0], error_detail
2716 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2717 nslcmop_id
, stage
[0]
2720 db_nsr_update
["detailed-status"] = (
2721 error_description_nsr
+ " Detail: " + error_detail
2723 db_nslcmop_update
["detailed-status"] = error_detail
2724 nslcmop_operation_state
= "FAILED"
2728 error_description_nsr
= error_description_nslcmop
= None
2730 db_nsr_update
["detailed-status"] = "Done"
2731 db_nslcmop_update
["detailed-status"] = "Done"
2732 nslcmop_operation_state
= "COMPLETED"
2735 self
._write
_ns
_status
(
2738 current_operation
="IDLE",
2739 current_operation_id
=None,
2740 error_description
=error_description_nsr
,
2741 error_detail
=error_detail
,
2742 other_update
=db_nsr_update
,
2744 self
._write
_op
_status
(
2747 error_message
=error_description_nslcmop
,
2748 operation_state
=nslcmop_operation_state
,
2749 other_update
=db_nslcmop_update
,
2752 if nslcmop_operation_state
:
2754 await self
.msg
.aiowrite(
2759 "nslcmop_id": nslcmop_id
,
2760 "operationState": nslcmop_operation_state
,
2764 except Exception as e
:
2766 logging_text
+ "kafka_write notification Exception {}".format(e
)
2769 self
.logger
.debug(logging_text
+ "Exit")
2770 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
    """Return the VNF descriptor whose ``id`` is *vnfd_id*, memoized per caller.

    The descriptor is looked up in *cached_vnfds* first. On a miss it is read
    from the "vnfds" collection (filtering on the descriptor ``id`` field) and
    stored in *cached_vnfds* so that later calls sharing the same dict do not
    hit the database again.

    :param vnfd_id: descriptor id used both as cache key and as db filter
    :param cached_vnfds: dict of already-read descriptors; updated in place
    :return: whatever ``self.db.get_one`` returned for this descriptor
    """
    # Fast path: this descriptor was already fetched through the same cache.
    if vnfd_id in cached_vnfds:
        return cached_vnfds[vnfd_id]

    # NOTE(review): any error raised by get_one propagates unchanged and
    # leaves the cache untouched.
    descriptor = self.db.get_one("vnfds", {"id": vnfd_id})
    cached_vnfds[vnfd_id] = descriptor
    return descriptor
def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
    """Return the VNF record of *vnf_profile_id* inside NS *nsr_id*, memoized.

    The record is looked up in *cached_vnfrs* first. On a miss it is read from
    the database, filtering on ``member-vnf-index-ref`` and ``nsr-id-ref``,
    and stored in *cached_vnfrs* under *vnf_profile_id*.

    :param nsr_id: NS record id used in the db filter
    :param vnf_profile_id: vnf profile id; used as cache key and db filter
    :param cached_vnfrs: dict of already-read records; updated in place
    :return: whatever ``self.db.get_one`` returned for this record
    """
    # Fast path: the cache is keyed by vnf profile id only.
    if vnf_profile_id in cached_vnfrs:
        return cached_vnfrs[vnf_profile_id]

    vnfr_filter = {
        "member-vnf-index-ref": vnf_profile_id,
        "nsr-id-ref": nsr_id,
    }
    # NOTE(review): collection name "vnfrs" matches the other vnfr reads in
    # this module - confirm against the original call.
    vnfr = self.db.get_one("vnfrs", vnfr_filter)
    cached_vnfrs[vnf_profile_id] = vnfr
    return vnfr
2788 def _is_deployed_vca_in_relation(
2789 self
, vca
: DeployedVCA
, relation
: Relation
2792 for endpoint
in (relation
.provider
, relation
.requirer
):
2793 if endpoint
["kdu-resource-profile-id"]:
2796 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2797 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2798 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2804 def _update_ee_relation_data_with_implicit_data(
2805 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2807 ee_relation_data
= safe_get_ee_relation(
2808 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2810 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2811 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2812 "execution-environment-ref"
2814 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2815 vnfd_id
= vnf_profile
["vnfd-id"]
2816 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2819 if ee_relation_level
== EELevel
.VNF
2820 else ee_relation_data
["vdu-profile-id"]
2822 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2825 f
"not execution environments found for ee_relation {ee_relation_data}"
2827 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2828 return ee_relation_data
2830 def _get_ns_relations(
2833 nsd
: Dict
[str, Any
],
2835 cached_vnfds
: Dict
[str, Any
],
2836 ) -> List
[Relation
]:
2838 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2839 for r
in db_ns_relations
:
2840 provider_dict
= None
2841 requirer_dict
= None
2842 if all(key
in r
for key
in ("provider", "requirer")):
2843 provider_dict
= r
["provider"]
2844 requirer_dict
= r
["requirer"]
2845 elif "entities" in r
:
2846 provider_id
= r
["entities"][0]["id"]
2849 "endpoint": r
["entities"][0]["endpoint"],
2851 if provider_id
!= nsd
["id"]:
2852 provider_dict
["vnf-profile-id"] = provider_id
2853 requirer_id
= r
["entities"][1]["id"]
2856 "endpoint": r
["entities"][1]["endpoint"],
2858 if requirer_id
!= nsd
["id"]:
2859 requirer_dict
["vnf-profile-id"] = requirer_id
2861 raise Exception("provider/requirer or entities must be included in the relation.")
2862 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2863 nsr_id
, nsd
, provider_dict
, cached_vnfds
2865 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2866 nsr_id
, nsd
, requirer_dict
, cached_vnfds
2868 provider
= EERelation(relation_provider
)
2869 requirer
= EERelation(relation_requirer
)
2870 relation
= Relation(r
["name"], provider
, requirer
)
2871 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2873 relations
.append(relation
)
2876 def _get_vnf_relations(
2879 nsd
: Dict
[str, Any
],
2881 cached_vnfds
: Dict
[str, Any
],
2882 ) -> List
[Relation
]:
2884 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
2885 vnf_profile_id
= vnf_profile
["id"]
2886 vnfd_id
= vnf_profile
["vnfd-id"]
2887 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2888 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
2889 for r
in db_vnf_relations
:
2890 provider_dict
= None
2891 requirer_dict
= None
2892 if all(key
in r
for key
in ("provider", "requirer")):
2893 provider_dict
= r
["provider"]
2894 requirer_dict
= r
["requirer"]
2895 elif "entities" in r
:
2896 provider_id
= r
["entities"][0]["id"]
2899 "vnf-profile-id": vnf_profile_id
,
2900 "endpoint": r
["entities"][0]["endpoint"],
2902 if provider_id
!= vnfd_id
:
2903 provider_dict
["vdu-profile-id"] = provider_id
2904 requirer_id
= r
["entities"][1]["id"]
2907 "vnf-profile-id": vnf_profile_id
,
2908 "endpoint": r
["entities"][1]["endpoint"],
2910 if requirer_id
!= vnfd_id
:
2911 requirer_dict
["vdu-profile-id"] = requirer_id
2913 raise Exception("provider/requirer or entities must be included in the relation.")
2914 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2915 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
2917 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2918 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
2920 provider
= EERelation(relation_provider
)
2921 requirer
= EERelation(relation_requirer
)
2922 relation
= Relation(r
["name"], provider
, requirer
)
2923 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2925 relations
.append(relation
)
2928 def _get_kdu_resource_data(
2930 ee_relation
: EERelation
,
2931 db_nsr
: Dict
[str, Any
],
2932 cached_vnfds
: Dict
[str, Any
],
2933 ) -> DeployedK8sResource
:
2934 nsd
= get_nsd(db_nsr
)
2935 vnf_profiles
= get_vnf_profiles(nsd
)
2936 vnfd_id
= find_in_list(
2938 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
2940 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2941 kdu_resource_profile
= get_kdu_resource_profile(
2942 db_vnfd
, ee_relation
.kdu_resource_profile_id
2944 kdu_name
= kdu_resource_profile
["kdu-name"]
2945 deployed_kdu
, _
= get_deployed_kdu(
2946 db_nsr
.get("_admin", ()).get("deployed", ()),
2948 ee_relation
.vnf_profile_id
,
2950 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
2953 def _get_deployed_component(
2955 ee_relation
: EERelation
,
2956 db_nsr
: Dict
[str, Any
],
2957 cached_vnfds
: Dict
[str, Any
],
2958 ) -> DeployedComponent
:
2959 nsr_id
= db_nsr
["_id"]
2960 deployed_component
= None
2961 ee_level
= EELevel
.get_level(ee_relation
)
2962 if ee_level
== EELevel
.NS
:
2963 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
2965 deployed_component
= DeployedVCA(nsr_id
, vca
)
2966 elif ee_level
== EELevel
.VNF
:
2967 vca
= get_deployed_vca(
2971 "member-vnf-index": ee_relation
.vnf_profile_id
,
2972 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
2976 deployed_component
= DeployedVCA(nsr_id
, vca
)
2977 elif ee_level
== EELevel
.VDU
:
2978 vca
= get_deployed_vca(
2981 "vdu_id": ee_relation
.vdu_profile_id
,
2982 "member-vnf-index": ee_relation
.vnf_profile_id
,
2983 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
2987 deployed_component
= DeployedVCA(nsr_id
, vca
)
2988 elif ee_level
== EELevel
.KDU
:
2989 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
2990 ee_relation
, db_nsr
, cached_vnfds
2992 if kdu_resource_data
:
2993 deployed_component
= DeployedK8sResource(kdu_resource_data
)
2994 return deployed_component
2996 async def _add_relation(
3000 db_nsr
: Dict
[str, Any
],
3001 cached_vnfds
: Dict
[str, Any
],
3002 cached_vnfrs
: Dict
[str, Any
],
3004 deployed_provider
= self
._get
_deployed
_component
(
3005 relation
.provider
, db_nsr
, cached_vnfds
3007 deployed_requirer
= self
._get
_deployed
_component
(
3008 relation
.requirer
, db_nsr
, cached_vnfds
3012 and deployed_requirer
3013 and deployed_provider
.config_sw_installed
3014 and deployed_requirer
.config_sw_installed
3016 provider_db_vnfr
= (
3018 relation
.provider
.nsr_id
,
3019 relation
.provider
.vnf_profile_id
,
3022 if relation
.provider
.vnf_profile_id
3025 requirer_db_vnfr
= (
3027 relation
.requirer
.nsr_id
,
3028 relation
.requirer
.vnf_profile_id
,
3031 if relation
.requirer
.vnf_profile_id
3034 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3035 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3036 provider_relation_endpoint
= RelationEndpoint(
3037 deployed_provider
.ee_id
,
3039 relation
.provider
.endpoint
,
3041 requirer_relation_endpoint
= RelationEndpoint(
3042 deployed_requirer
.ee_id
,
3044 relation
.requirer
.endpoint
,
3046 await self
.vca_map
[vca_type
].add_relation(
3047 provider
=provider_relation_endpoint
,
3048 requirer
=requirer_relation_endpoint
,
3050 # remove entry from relations list
3054 async def _add_vca_relations(
3060 timeout
: int = 3600,
3064 # 1. find all relations for this VCA
3065 # 2. wait for other peers related
3069 # STEP 1: find all relations for this VCA
3072 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3073 nsd
= get_nsd(db_nsr
)
3076 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3077 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3082 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3083 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3085 # if no relations, terminate
3087 self
.logger
.debug(logging_text
+ " No relations")
3090 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3097 if now
- start
>= timeout
:
3098 self
.logger
.error(logging_text
+ " : timeout adding relations")
3101 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3102 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3104 # for each relation, find the VCA's related
3105 for relation
in relations
.copy():
3106 added
= await self
._add
_relation
(
3114 relations
.remove(relation
)
3117 self
.logger
.debug("Relations added")
3119 await asyncio
.sleep(5.0)
3123 except Exception as e
:
3124 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3127 async def _install_kdu(
3135 k8s_instance_info
: dict,
3136 k8params
: dict = None,
3142 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3145 "collection": "nsrs",
3146 "filter": {"_id": nsr_id
},
3147 "path": nsr_db_path
,
3150 if k8s_instance_info
.get("kdu-deployment-name"):
3151 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3153 kdu_instance
= self
.k8scluster_map
[
3155 ].generate_kdu_instance_name(
3156 db_dict
=db_dict_install
,
3157 kdu_model
=k8s_instance_info
["kdu-model"],
3158 kdu_name
=k8s_instance_info
["kdu-name"],
3161 # Update the nsrs table with the kdu-instance value
3165 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3168 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3169 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3170 # between the Helm Charts and Juju Bundles-based KNFs. If we find a way of having a homogeneous
3171 # namespace, this first verification could be removed, and the next step would be done for any kind
3173 # TODO -> find a way to have a homogeneous namespace between the Helm Charts and Juju Bundles-based
3174 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3175 if k8sclustertype
in ("juju", "juju-bundle"):
3176 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3177 # that the user passed a namespace in which they want the KDU to be deployed)
3183 "_admin.projects_write": k8s_instance_info
["namespace"],
3184 "_admin.projects_read": k8s_instance_info
["namespace"],
3190 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3195 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3197 k8s_instance_info
["namespace"] = kdu_instance
3199 await self
.k8scluster_map
[k8sclustertype
].install(
3200 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3201 kdu_model
=k8s_instance_info
["kdu-model"],
3204 db_dict
=db_dict_install
,
3206 kdu_name
=k8s_instance_info
["kdu-name"],
3207 namespace
=k8s_instance_info
["namespace"],
3208 kdu_instance
=kdu_instance
,
3212 # Obtain services to obtain management service ip
3213 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3214 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3215 kdu_instance
=kdu_instance
,
3216 namespace
=k8s_instance_info
["namespace"],
3219 # Obtain management service info (if exists)
3220 vnfr_update_dict
= {}
3221 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3223 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3228 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3231 for service
in kdud
.get("service", [])
3232 if service
.get("mgmt-service")
3234 for mgmt_service
in mgmt_services
:
3235 for service
in services
:
3236 if service
["name"].startswith(mgmt_service
["name"]):
3237 # Mgmt service found, Obtain service ip
3238 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3239 if isinstance(ip
, list) and len(ip
) == 1:
3243 "kdur.{}.ip-address".format(kdu_index
)
3246 # Check if must update also mgmt ip at the vnf
3247 service_external_cp
= mgmt_service
.get(
3248 "external-connection-point-ref"
3250 if service_external_cp
:
3252 deep_get(vnfd
, ("mgmt-interface", "cp"))
3253 == service_external_cp
3255 vnfr_update_dict
["ip-address"] = ip
3260 "external-connection-point-ref", ""
3262 == service_external_cp
,
3265 "kdur.{}.ip-address".format(kdu_index
)
3270 "Mgmt service name: {} not found".format(
3271 mgmt_service
["name"]
3275 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3276 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3278 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3281 and kdu_config
.get("initial-config-primitive")
3282 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3284 initial_config_primitive_list
= kdu_config
.get(
3285 "initial-config-primitive"
3287 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3289 for initial_config_primitive
in initial_config_primitive_list
:
3290 primitive_params_
= self
._map
_primitive
_params
(
3291 initial_config_primitive
, {}, {}
3294 await asyncio
.wait_for(
3295 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3296 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3297 kdu_instance
=kdu_instance
,
3298 primitive_name
=initial_config_primitive
["name"],
3299 params
=primitive_params_
,
3300 db_dict
=db_dict_install
,
3306 except Exception as e
:
3307 # Prepare update db with error and raise exception
3310 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3314 vnfr_data
.get("_id"),
3315 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3318 # ignore to keep original exception
3320 # reraise original error
3325 async def deploy_kdus(
3332 task_instantiation_info
,
3334 # Launch kdus if present in the descriptor
3336 k8scluster_id_2_uuic
= {
3337 "helm-chart-v3": {},
3342 async def _get_cluster_id(cluster_id
, cluster_type
):
3343 nonlocal k8scluster_id_2_uuic
3344 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3345 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3347 # check if the K8s cluster is still being created: look for related tasks in process and wait for them
3348 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3349 "k8scluster", cluster_id
3352 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3353 task_name
, cluster_id
3355 self
.logger
.debug(logging_text
+ text
)
3356 await asyncio
.wait(task_dependency
, timeout
=3600)
3358 db_k8scluster
= self
.db
.get_one(
3359 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3361 if not db_k8scluster
:
3362 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3364 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3366 if cluster_type
== "helm-chart-v3":
3368 # backward compatibility for existing clusters that have not been initialized for helm v3
3369 k8s_credentials
= yaml
.safe_dump(
3370 db_k8scluster
.get("credentials")
3372 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3373 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3375 db_k8scluster_update
= {}
3376 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3377 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3378 db_k8scluster_update
[
3379 "_admin.helm-chart-v3.created"
3381 db_k8scluster_update
[
3382 "_admin.helm-chart-v3.operationalState"
3385 "k8sclusters", cluster_id
, db_k8scluster_update
3387 except Exception as e
:
3390 + "error initializing helm-v3 cluster: {}".format(str(e
))
3393 "K8s cluster '{}' has not been initialized for '{}'".format(
3394 cluster_id
, cluster_type
3399 "K8s cluster '{}' has not been initialized for '{}'".format(
3400 cluster_id
, cluster_type
3403 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3406 logging_text
+= "Deploy kdus: "
3409 db_nsr_update
= {"_admin.deployed.K8s": []}
3410 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3413 updated_cluster_list
= []
3414 updated_v3_cluster_list
= []
3416 for vnfr_data
in db_vnfrs
.values():
3417 vca_id
= self
.get_vca_id(vnfr_data
, {})
3418 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3419 # Step 0: Prepare and set parameters
3420 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3421 vnfd_id
= vnfr_data
.get("vnfd-id")
3422 vnfd_with_id
= find_in_list(
3423 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3427 for kdud
in vnfd_with_id
["kdu"]
3428 if kdud
["name"] == kdur
["kdu-name"]
3430 namespace
= kdur
.get("k8s-namespace")
3431 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3432 if kdur
.get("helm-chart"):
3433 kdumodel
= kdur
["helm-chart"]
3434 # Default version: helm3, if helm-version is v2 assign v2
3435 k8sclustertype
= "helm-chart-v3"
3436 self
.logger
.debug("kdur: {}".format(kdur
))
3438 kdur
.get("helm-version")
3439 and kdur
.get("helm-version") == "v2"
3441 k8sclustertype
= "helm-chart"
3442 elif kdur
.get("juju-bundle"):
3443 kdumodel
= kdur
["juju-bundle"]
3444 k8sclustertype
= "juju-bundle"
3447 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3448 "juju-bundle. Maybe an old NBI version is running".format(
3449 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3452 # check if kdumodel is a file and exists
3454 vnfd_with_id
= find_in_list(
3455 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3457 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3458 if storage
: # may be not present if vnfd has not artifacts
3459 # path format: /vnfdid/pkgdir/helm-charts|juju-bundles/kdumodel
3460 if storage
["pkg-dir"]:
3461 filename
= "{}/{}/{}s/{}".format(
3468 filename
= "{}/Scripts/{}s/{}".format(
3473 if self
.fs
.file_exists(
3474 filename
, mode
="file"
3475 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3476 kdumodel
= self
.fs
.path
+ filename
3477 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3479 except Exception: # it is not a file
3482 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3483 step
= "Synchronize repos for k8s cluster '{}'".format(
3486 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3490 k8sclustertype
== "helm-chart"
3491 and cluster_uuid
not in updated_cluster_list
3493 k8sclustertype
== "helm-chart-v3"
3494 and cluster_uuid
not in updated_v3_cluster_list
3496 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3497 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3498 cluster_uuid
=cluster_uuid
3501 if del_repo_list
or added_repo_dict
:
3502 if k8sclustertype
== "helm-chart":
3504 "_admin.helm_charts_added." + item
: None
3505 for item
in del_repo_list
3508 "_admin.helm_charts_added." + item
: name
3509 for item
, name
in added_repo_dict
.items()
3511 updated_cluster_list
.append(cluster_uuid
)
3512 elif k8sclustertype
== "helm-chart-v3":
3514 "_admin.helm_charts_v3_added." + item
: None
3515 for item
in del_repo_list
3518 "_admin.helm_charts_v3_added." + item
: name
3519 for item
, name
in added_repo_dict
.items()
3521 updated_v3_cluster_list
.append(cluster_uuid
)
3523 logging_text
+ "repos synchronized on k8s cluster "
3524 "'{}' to_delete: {}, to_add: {}".format(
3525 k8s_cluster_id
, del_repo_list
, added_repo_dict
3530 {"_id": k8s_cluster_id
},
3536 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3537 vnfr_data
["member-vnf-index-ref"],
3541 k8s_instance_info
= {
3542 "kdu-instance": None,
3543 "k8scluster-uuid": cluster_uuid
,
3544 "k8scluster-type": k8sclustertype
,
3545 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3546 "kdu-name": kdur
["kdu-name"],
3547 "kdu-model": kdumodel
,
3548 "namespace": namespace
,
3549 "kdu-deployment-name": kdu_deployment_name
,
3551 db_path
= "_admin.deployed.K8s.{}".format(index
)
3552 db_nsr_update
[db_path
] = k8s_instance_info
3553 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3554 vnfd_with_id
= find_in_list(
3555 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3557 task
= asyncio
.ensure_future(
3566 k8params
=desc_params
,
3571 self
.lcm_tasks
.register(
3575 "instantiate_KDU-{}".format(index
),
3578 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3584 except (LcmException
, asyncio
.CancelledError
):
3586 except Exception as e
:
3587 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3588 if isinstance(e
, (N2VCException
, DbException
)):
3589 self
.logger
.error(logging_text
+ msg
)
3591 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3592 raise LcmException(msg
)
3595 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3614 task_instantiation_info
,
3617 # launch instantiate_N2VC in an asyncio task and register the task object
3618 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3619 # if not found, create one entry and update database
3620 # fill db_nsr._admin.deployed.VCA.<index>
3623 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3625 if "execution-environment-list" in descriptor_config
:
3626 ee_list
= descriptor_config
.get("execution-environment-list", [])
3627 elif "juju" in descriptor_config
:
3628 ee_list
= [descriptor_config
] # ns charms
3629 else: # other types as script are not supported
3632 for ee_item
in ee_list
:
3635 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3636 ee_item
.get("juju"), ee_item
.get("helm-chart")
3639 ee_descriptor_id
= ee_item
.get("id")
3640 if ee_item
.get("juju"):
3641 vca_name
= ee_item
["juju"].get("charm")
3644 if ee_item
["juju"].get("charm") is not None
3647 if ee_item
["juju"].get("cloud") == "k8s":
3648 vca_type
= "k8s_proxy_charm"
3649 elif ee_item
["juju"].get("proxy") is False:
3650 vca_type
= "native_charm"
3651 elif ee_item
.get("helm-chart"):
3652 vca_name
= ee_item
["helm-chart"]
3653 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3656 vca_type
= "helm-v3"
3659 logging_text
+ "skipping non juju neither charm configuration"
3664 for vca_index
, vca_deployed
in enumerate(
3665 db_nsr
["_admin"]["deployed"]["VCA"]
3667 if not vca_deployed
:
3670 vca_deployed
.get("member-vnf-index") == member_vnf_index
3671 and vca_deployed
.get("vdu_id") == vdu_id
3672 and vca_deployed
.get("kdu_name") == kdu_name
3673 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3674 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3678 # not found, create one.
3680 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3683 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3685 target
+= "/kdu/{}".format(kdu_name
)
3687 "target_element": target
,
3688 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3689 "member-vnf-index": member_vnf_index
,
3691 "kdu_name": kdu_name
,
3692 "vdu_count_index": vdu_index
,
3693 "operational-status": "init", # TODO revise
3694 "detailed-status": "", # TODO revise
3695 "step": "initial-deploy", # TODO revise
3697 "vdu_name": vdu_name
,
3699 "ee_descriptor_id": ee_descriptor_id
,
3703 # create VCA and configurationStatus in db
3705 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3706 "configurationStatus.{}".format(vca_index
): dict(),
3708 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3710 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3712 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3713 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3714 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3717 task_n2vc
= asyncio
.ensure_future(
3718 self
.instantiate_N2VC(
3719 logging_text
=logging_text
,
3720 vca_index
=vca_index
,
3726 vdu_index
=vdu_index
,
3727 deploy_params
=deploy_params
,
3728 config_descriptor
=descriptor_config
,
3729 base_folder
=base_folder
,
3730 nslcmop_id
=nslcmop_id
,
3734 ee_config_descriptor
=ee_item
,
3737 self
.lcm_tasks
.register(
3741 "instantiate_N2VC-{}".format(vca_index
),
3744 task_instantiation_info
[
3746 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3747 member_vnf_index
or "", vdu_id
or ""
3751 def _create_nslcmop(nsr_id
, operation
, params
):
3753 Creates a ns-lcm-opp content to be stored at database.
3754 :param nsr_id: internal id of the instance
3755 :param operation: instantiate, terminate, scale, action, ...
3756 :param params: user parameters for the operation
3757 :return: dictionary following SOL005 format
3759 # Raise exception if invalid arguments
3760 if not (nsr_id
and operation
and params
):
3762 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3769 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3770 "operationState": "PROCESSING",
3771 "statusEnteredTime": now
,
3772 "nsInstanceId": nsr_id
,
3773 "lcmOperationType": operation
,
3775 "isAutomaticInvocation": False,
3776 "operationParams": params
,
3777 "isCancelPending": False,
3779 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3780 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3785 def _format_additional_params(self
, params
):
3786 params
= params
or {}
3787 for key
, value
in params
.items():
3788 if str(value
).startswith("!!yaml "):
3789 params
[key
] = yaml
.safe_load(value
[7:])
3792 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3793 primitive
= seq
.get("name")
3794 primitive_params
= {}
3796 "member_vnf_index": vnf_index
,
3797 "primitive": primitive
,
3798 "primitive_params": primitive_params
,
3801 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
3805 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
3806 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
3807 if op
.get("operationState") == "COMPLETED":
3808 # b. Skip sub-operation
3809 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3810 return self
.SUBOPERATION_STATUS_SKIP
3812 # c. retry executing sub-operation
3813 # The sub-operation exists, and operationState != 'COMPLETED'
3814 # Update operationState = 'PROCESSING' to indicate a retry.
3815 operationState
= "PROCESSING"
3816 detailed_status
= "In progress"
3817 self
._update
_suboperation
_status
(
3818 db_nslcmop
, op_index
, operationState
, detailed_status
3820 # Return the sub-operation index
3821 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3822 # with arguments extracted from the sub-operation
3825 # Find a sub-operation where all keys in a matching dictionary must match
3826 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3827 def _find_suboperation(self
, db_nslcmop
, match
):
3828 if db_nslcmop
and match
:
3829 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3830 for i
, op
in enumerate(op_list
):
3831 if all(op
.get(k
) == match
[k
] for k
in match
):
3833 return self
.SUBOPERATION_STATUS_NOT_FOUND
3835 # Update status for a sub-operation given its index
3836 def _update_suboperation_status(
3837 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3839 # Update DB for HA tasks
3840 q_filter
= {"_id": db_nslcmop
["_id"]}
3842 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3843 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3846 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3849 # Add sub-operation, return the index of the added sub-operation
3850 # Optionally, set operationState, detailed-status, and operationType
3851 # Status and type are currently set for 'scale' sub-operations:
3852 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3853 # 'detailed-status' : status message
3854 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3855 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3856 def _add_suboperation(
3864 mapped_primitive_params
,
3865 operationState
=None,
3866 detailed_status
=None,
3869 RO_scaling_info
=None,
3872 return self
.SUBOPERATION_STATUS_NOT_FOUND
3873 # Get the "_admin.operations" list, if it exists
3874 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3875 op_list
= db_nslcmop_admin
.get("operations")
3876 # Create or append to the "_admin.operations" list
3878 "member_vnf_index": vnf_index
,
3880 "vdu_count_index": vdu_count_index
,
3881 "primitive": primitive
,
3882 "primitive_params": mapped_primitive_params
,
3885 new_op
["operationState"] = operationState
3887 new_op
["detailed-status"] = detailed_status
3889 new_op
["lcmOperationType"] = operationType
3891 new_op
["RO_nsr_id"] = RO_nsr_id
3893 new_op
["RO_scaling_info"] = RO_scaling_info
3895 # No existing operations, create key 'operations' with current operation as first list element
3896 db_nslcmop_admin
.update({"operations": [new_op
]})
3897 op_list
= db_nslcmop_admin
.get("operations")
3899 # Existing operations, append operation to list
3900 op_list
.append(new_op
)
3902 db_nslcmop_update
= {"_admin.operations": op_list
}
3903 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
3904 op_index
= len(op_list
) - 1
3907 # Helper methods for scale() sub-operations
3909 # pre-scale/post-scale:
3910 # Check for 3 different cases:
3911 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3912 # b. Skip: sub-operation already exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3913 # c. Retry: sub-operation already exists, operationState != 'COMPLETED', return op_index to re-execute
3914 def _check_or_add_scale_suboperation(
3918 vnf_config_primitive
,
3922 RO_scaling_info
=None,
3924 # Find this sub-operation
3925 if RO_nsr_id
and RO_scaling_info
:
3926 operationType
= "SCALE-RO"
3928 "member_vnf_index": vnf_index
,
3929 "RO_nsr_id": RO_nsr_id
,
3930 "RO_scaling_info": RO_scaling_info
,
3934 "member_vnf_index": vnf_index
,
3935 "primitive": vnf_config_primitive
,
3936 "primitive_params": primitive_params
,
3937 "lcmOperationType": operationType
,
3939 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
3940 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
3941 # a. New sub-operation
3942 # The sub-operation does not exist, add it.
3943 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3944 # The following parameters are set to None for all kind of scaling:
3946 vdu_count_index
= None
3948 if RO_nsr_id
and RO_scaling_info
:
3949 vnf_config_primitive
= None
3950 primitive_params
= None
3953 RO_scaling_info
= None
3954 # Initial status for sub-operation
3955 operationState
= "PROCESSING"
3956 detailed_status
= "In progress"
3957 # Add sub-operation for pre/post-scaling (zero or more operations)
3958 self
._add
_suboperation
(
3964 vnf_config_primitive
,
3972 return self
.SUBOPERATION_STATUS_NEW
3974 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3975 # or op_index (operationState != 'COMPLETED')
3976 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
3978 # Function to return execution_environment id
3980 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
3981 # TODO vdu_index_count
3982 for vca
in vca_deployed_list
:
3983 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
3986 async def destroy_N2VC(
3994 exec_primitives
=True,
3999 Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
4000 :param logging_text:
4002 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
4003 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4004 :param vca_index: index in the database _admin.deployed.VCA
4005 :param destroy_ee: False to not destroy it here, because all of them will be destroyed at once
4006 :param exec_primitives: False to not execute terminate primitives, because the config has not completed or has
4007 not executed properly
4008 :param scaling_in: True destroys the application, False destroys the model
4009 :return: None or exception
4014 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4015 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4019 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4021 # execute terminate_primitives
4023 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4024 config_descriptor
.get("terminate-config-primitive"),
4025 vca_deployed
.get("ee_descriptor_id"),
4027 vdu_id
= vca_deployed
.get("vdu_id")
4028 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4029 vdu_name
= vca_deployed
.get("vdu_name")
4030 vnf_index
= vca_deployed
.get("member-vnf-index")
4031 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4032 for seq
in terminate_primitives
:
4033 # For each sequence in list, get primitive and call _ns_execute_primitive()
4034 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4035 vnf_index
, seq
.get("name")
4037 self
.logger
.debug(logging_text
+ step
)
4038 # Create the primitive for each sequence, i.e. "primitive": "touch"
4039 primitive
= seq
.get("name")
4040 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4045 self
._add
_suboperation
(
4052 mapped_primitive_params
,
4054 # Sub-operations: Call _ns_execute_primitive() instead of action()
4056 result
, result_detail
= await self
._ns
_execute
_primitive
(
4057 vca_deployed
["ee_id"],
4059 mapped_primitive_params
,
4063 except LcmException
:
4064 # this happens when VCA is not deployed. In this case it is not needed to terminate
4066 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4067 if result
not in result_ok
:
4069 "terminate_primitive {} for vnf_member_index={} fails with "
4070 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4072 # mark that this VCA does not need to be terminated
4073 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4077 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4080 # Delete Prometheus Jobs if any
4081 # This uses NSR_ID, so it will destroy any jobs under this index
4082 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4085 await self
.vca_map
[vca_type
].delete_execution_environment(
4086 vca_deployed
["ee_id"],
4087 scaling_in
=scaling_in
,
4092 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4093 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4094 namespace
= "." + db_nsr
["_id"]
4096 await self
.n2vc
.delete_namespace(
4097 namespace
=namespace
,
4098 total_timeout
=self
.timeout_charm_delete
,
4101 except N2VCNotFound
: # already deleted. Skip
4103 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4105 async def _terminate_RO(
4106 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4109 Terminates a deployment from RO
4110 :param logging_text:
4111 :param nsr_deployed: db_nsr._admin.deployed
4114 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4115 this method will update only the index 2, but it will write on database the concatenated content of the list
4120 ro_nsr_id
= ro_delete_action
= None
4121 if nsr_deployed
and nsr_deployed
.get("RO"):
4122 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4123 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4126 stage
[2] = "Deleting ns from VIM."
4127 db_nsr_update
["detailed-status"] = " ".join(stage
)
4128 self
._write
_op
_status
(nslcmop_id
, stage
)
4129 self
.logger
.debug(logging_text
+ stage
[2])
4130 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4131 self
._write
_op
_status
(nslcmop_id
, stage
)
4132 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4133 ro_delete_action
= desc
["action_id"]
4135 "_admin.deployed.RO.nsr_delete_action_id"
4136 ] = ro_delete_action
4137 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4138 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4139 if ro_delete_action
:
4140 # wait until NS is deleted from VIM
4141 stage
[2] = "Waiting ns deleted from VIM."
4142 detailed_status_old
= None
4146 + " RO_id={} ro_delete_action={}".format(
4147 ro_nsr_id
, ro_delete_action
4150 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4151 self
._write
_op
_status
(nslcmop_id
, stage
)
4153 delete_timeout
= 20 * 60 # 20 minutes
4154 while delete_timeout
> 0:
4155 desc
= await self
.RO
.show(
4157 item_id_name
=ro_nsr_id
,
4158 extra_item
="action",
4159 extra_item_id
=ro_delete_action
,
4163 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4165 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4166 if ns_status
== "ERROR":
4167 raise ROclient
.ROClientException(ns_status_info
)
4168 elif ns_status
== "BUILD":
4169 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4170 elif ns_status
== "ACTIVE":
4171 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4172 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4177 ), "ROclient.check_action_status returns unknown {}".format(
4180 if stage
[2] != detailed_status_old
:
4181 detailed_status_old
= stage
[2]
4182 db_nsr_update
["detailed-status"] = " ".join(stage
)
4183 self
._write
_op
_status
(nslcmop_id
, stage
)
4184 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4185 await asyncio
.sleep(5, loop
=self
.loop
)
4187 else: # delete_timeout <= 0:
4188 raise ROclient
.ROClientException(
4189 "Timeout waiting ns deleted from VIM"
4192 except Exception as e
:
4193 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4195 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4197 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4198 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4199 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4201 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4204 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4206 failed_detail
.append("delete conflict: {}".format(e
))
4209 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4212 failed_detail
.append("delete error: {}".format(e
))
4214 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4218 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4219 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4221 stage
[2] = "Deleting nsd from RO."
4222 db_nsr_update
["detailed-status"] = " ".join(stage
)
4223 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4224 self
._write
_op
_status
(nslcmop_id
, stage
)
4225 await self
.RO
.delete("nsd", ro_nsd_id
)
4227 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4229 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4230 except Exception as e
:
4232 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4234 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4236 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4239 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4241 failed_detail
.append(
4242 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4244 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4246 failed_detail
.append(
4247 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4249 self
.logger
.error(logging_text
+ failed_detail
[-1])
4251 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4252 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4253 if not vnf_deployed
or not vnf_deployed
["id"]:
4256 ro_vnfd_id
= vnf_deployed
["id"]
4259 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4260 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4262 db_nsr_update
["detailed-status"] = " ".join(stage
)
4263 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4264 self
._write
_op
_status
(nslcmop_id
, stage
)
4265 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4267 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4269 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4270 except Exception as e
:
4272 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4275 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4279 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4282 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4284 failed_detail
.append(
4285 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4287 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4289 failed_detail
.append(
4290 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4292 self
.logger
.error(logging_text
+ failed_detail
[-1])
4295 stage
[2] = "Error deleting from VIM"
4297 stage
[2] = "Deleted from VIM"
4298 db_nsr_update
["detailed-status"] = " ".join(stage
)
4299 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4300 self
._write
_op
_status
(nslcmop_id
, stage
)
4303 raise LcmException("; ".join(failed_detail
))
4305 async def terminate(self
, nsr_id
, nslcmop_id
):
4306 # Try to lock HA task here
4307 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4308 if not task_is_locked_by_me
:
4311 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4312 self
.logger
.debug(logging_text
+ "Enter")
4313 timeout_ns_terminate
= self
.timeout_ns_terminate
4316 operation_params
= None
4318 error_list
= [] # annotates all failed error messages
4319 db_nslcmop_update
= {}
4320 autoremove
= False # autoremove after terminated
4321 tasks_dict_info
= {}
4324 "Stage 1/3: Preparing task.",
4325 "Waiting for previous operations to terminate.",
4328 # ^ contains [stage, step, VIM-status]
4330 # wait for any previous tasks in process
4331 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4333 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4334 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4335 operation_params
= db_nslcmop
.get("operationParams") or {}
4336 if operation_params
.get("timeout_ns_terminate"):
4337 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4338 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4339 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4341 db_nsr_update
["operational-status"] = "terminating"
4342 db_nsr_update
["config-status"] = "terminating"
4343 self
._write
_ns
_status
(
4345 ns_state
="TERMINATING",
4346 current_operation
="TERMINATING",
4347 current_operation_id
=nslcmop_id
,
4348 other_update
=db_nsr_update
,
4350 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4351 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4352 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4355 stage
[1] = "Getting vnf descriptors from db."
4356 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4358 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4360 db_vnfds_from_id
= {}
4361 db_vnfds_from_member_index
= {}
4363 for vnfr
in db_vnfrs_list
:
4364 vnfd_id
= vnfr
["vnfd-id"]
4365 if vnfd_id
not in db_vnfds_from_id
:
4366 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4367 db_vnfds_from_id
[vnfd_id
] = vnfd
4368 db_vnfds_from_member_index
[
4369 vnfr
["member-vnf-index-ref"]
4370 ] = db_vnfds_from_id
[vnfd_id
]
4372 # Destroy individual execution environments when there are terminating primitives.
4373 # Rest of EE will be deleted at once
4374 # TODO - check before calling _destroy_N2VC
4375 # if not operation_params.get("skip_terminate_primitives"):#
4376 # or not vca.get("needed_terminate"):
4377 stage
[0] = "Stage 2/3 execute terminating primitives."
4378 self
.logger
.debug(logging_text
+ stage
[0])
4379 stage
[1] = "Looking execution environment that needs terminate."
4380 self
.logger
.debug(logging_text
+ stage
[1])
4382 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4383 config_descriptor
= None
4384 vca_member_vnf_index
= vca
.get("member-vnf-index")
4385 vca_id
= self
.get_vca_id(
4386 db_vnfrs_dict
.get(vca_member_vnf_index
)
4387 if vca_member_vnf_index
4391 if not vca
or not vca
.get("ee_id"):
4393 if not vca
.get("member-vnf-index"):
4395 config_descriptor
= db_nsr
.get("ns-configuration")
4396 elif vca
.get("vdu_id"):
4397 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4398 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4399 elif vca
.get("kdu_name"):
4400 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4401 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4403 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4404 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4405 vca_type
= vca
.get("type")
4406 exec_terminate_primitives
= not operation_params
.get(
4407 "skip_terminate_primitives"
4408 ) and vca
.get("needed_terminate")
4409 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4410 # pending native charms
4412 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4414 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4415 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4416 task
= asyncio
.ensure_future(
4424 exec_terminate_primitives
,
4428 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4430 # wait for pending tasks of terminate primitives
4434 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4436 error_list
= await self
._wait
_for
_tasks
(
4439 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4443 tasks_dict_info
.clear()
4445 return # raise LcmException("; ".join(error_list))
4447 # remove All execution environments at once
4448 stage
[0] = "Stage 3/3 delete all."
4450 if nsr_deployed
.get("VCA"):
4451 stage
[1] = "Deleting all execution environments."
4452 self
.logger
.debug(logging_text
+ stage
[1])
4453 vca_id
= self
.get_vca_id({}, db_nsr
)
4454 task_delete_ee
= asyncio
.ensure_future(
4456 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4457 timeout
=self
.timeout_charm_delete
,
4460 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4461 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4463 # Delete from k8scluster
4464 stage
[1] = "Deleting KDUs."
4465 self
.logger
.debug(logging_text
+ stage
[1])
4466 # print(nsr_deployed)
4467 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4468 if not kdu
or not kdu
.get("kdu-instance"):
4470 kdu_instance
= kdu
.get("kdu-instance")
4471 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4472 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4473 vca_id
= self
.get_vca_id({}, db_nsr
)
4474 task_delete_kdu_instance
= asyncio
.ensure_future(
4475 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4476 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4477 kdu_instance
=kdu_instance
,
4484 + "Unknown k8s deployment type {}".format(
4485 kdu
.get("k8scluster-type")
4490 task_delete_kdu_instance
4491 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4494 stage
[1] = "Deleting ns from VIM."
4496 task_delete_ro
= asyncio
.ensure_future(
4497 self
._terminate
_ng
_ro
(
4498 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4502 task_delete_ro
= asyncio
.ensure_future(
4504 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4507 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4509 # rest of stuff will be done at finally
4512 ROclient
.ROClientException
,
4517 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4519 except asyncio
.CancelledError
:
4521 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4523 exc
= "Operation was cancelled"
4524 except Exception as e
:
4525 exc
= traceback
.format_exc()
4526 self
.logger
.critical(
4527 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4532 error_list
.append(str(exc
))
4534 # wait for pending tasks
4536 stage
[1] = "Waiting for terminate pending tasks."
4537 self
.logger
.debug(logging_text
+ stage
[1])
4538 error_list
+= await self
._wait
_for
_tasks
(
4541 timeout_ns_terminate
,
4545 stage
[1] = stage
[2] = ""
4546 except asyncio
.CancelledError
:
4547 error_list
.append("Cancelled")
4548 # TODO cancel all tasks
4549 except Exception as exc
:
4550 error_list
.append(str(exc
))
4551 # update status at database
4553 error_detail
= "; ".join(error_list
)
4554 # self.logger.error(logging_text + error_detail)
4555 error_description_nslcmop
= "{} Detail: {}".format(
4556 stage
[0], error_detail
4558 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4559 nslcmop_id
, stage
[0]
4562 db_nsr_update
["operational-status"] = "failed"
4563 db_nsr_update
["detailed-status"] = (
4564 error_description_nsr
+ " Detail: " + error_detail
4566 db_nslcmop_update
["detailed-status"] = error_detail
4567 nslcmop_operation_state
= "FAILED"
4571 error_description_nsr
= error_description_nslcmop
= None
4572 ns_state
= "NOT_INSTANTIATED"
4573 db_nsr_update
["operational-status"] = "terminated"
4574 db_nsr_update
["detailed-status"] = "Done"
4575 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4576 db_nslcmop_update
["detailed-status"] = "Done"
4577 nslcmop_operation_state
= "COMPLETED"
4580 self
._write
_ns
_status
(
4583 current_operation
="IDLE",
4584 current_operation_id
=None,
4585 error_description
=error_description_nsr
,
4586 error_detail
=error_detail
,
4587 other_update
=db_nsr_update
,
4589 self
._write
_op
_status
(
4592 error_message
=error_description_nslcmop
,
4593 operation_state
=nslcmop_operation_state
,
4594 other_update
=db_nslcmop_update
,
4596 if ns_state
== "NOT_INSTANTIATED":
4600 {"nsr-id-ref": nsr_id
},
4601 {"_admin.nsState": "NOT_INSTANTIATED"},
4603 except DbException
as e
:
4606 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4610 if operation_params
:
4611 autoremove
= operation_params
.get("autoremove", False)
4612 if nslcmop_operation_state
:
4614 await self
.msg
.aiowrite(
4619 "nslcmop_id": nslcmop_id
,
4620 "operationState": nslcmop_operation_state
,
4621 "autoremove": autoremove
,
4625 except Exception as e
:
4627 logging_text
+ "kafka_write notification Exception {}".format(e
)
4630 self
.logger
.debug(logging_text
+ "Exit")
4631 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4633 async def _wait_for_tasks(
4634 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4637 error_detail_list
= []
4639 pending_tasks
= list(created_tasks_info
.keys())
4640 num_tasks
= len(pending_tasks
)
4642 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4643 self
._write
_op
_status
(nslcmop_id
, stage
)
4644 while pending_tasks
:
4646 _timeout
= timeout
+ time_start
- time()
4647 done
, pending_tasks
= await asyncio
.wait(
4648 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4650 num_done
+= len(done
)
4651 if not done
: # Timeout
4652 for task
in pending_tasks
:
4653 new_error
= created_tasks_info
[task
] + ": Timeout"
4654 error_detail_list
.append(new_error
)
4655 error_list
.append(new_error
)
4658 if task
.cancelled():
4661 exc
= task
.exception()
4663 if isinstance(exc
, asyncio
.TimeoutError
):
4665 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4666 error_list
.append(created_tasks_info
[task
])
4667 error_detail_list
.append(new_error
)
4674 ROclient
.ROClientException
,
4680 self
.logger
.error(logging_text
+ new_error
)
4682 exc_traceback
= "".join(
4683 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4687 + created_tasks_info
[task
]
4693 logging_text
+ created_tasks_info
[task
] + ": Done"
4695 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4697 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4698 if nsr_id
: # update also nsr
4703 "errorDescription": "Error at: " + ", ".join(error_list
),
4704 "errorDetail": ". ".join(error_detail_list
),
4707 self
._write
_op
_status
(nslcmop_id
, stage
)
4708 return error_detail_list
4711 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4713 Generates the params to be provided to the charm before executing a primitive. If the user does not provide a parameter,
4714 the default-value is used. If it is enclosed in < >, it looks for the value in instantiation_params
4715 :param primitive_desc: portion of VNFD/NSD that describes primitive
4716 :param params: Params provided by user
4717 :param instantiation_params: Instantiation params provided by user
4718 :return: a dictionary with the calculated params
4720 calculated_params
= {}
4721 for parameter
in primitive_desc
.get("parameter", ()):
4722 param_name
= parameter
["name"]
4723 if param_name
in params
:
4724 calculated_params
[param_name
] = params
[param_name
]
4725 elif "default-value" in parameter
or "value" in parameter
:
4726 if "value" in parameter
:
4727 calculated_params
[param_name
] = parameter
["value"]
4729 calculated_params
[param_name
] = parameter
["default-value"]
4731 isinstance(calculated_params
[param_name
], str)
4732 and calculated_params
[param_name
].startswith("<")
4733 and calculated_params
[param_name
].endswith(">")
4735 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4736 calculated_params
[param_name
] = instantiation_params
[
4737 calculated_params
[param_name
][1:-1]
4741 "Parameter {} needed to execute primitive {} not provided".format(
4742 calculated_params
[param_name
], primitive_desc
["name"]
4747 "Parameter {} needed to execute primitive {} not provided".format(
4748 param_name
, primitive_desc
["name"]
4752 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4753 calculated_params
[param_name
] = yaml
.safe_dump(
4754 calculated_params
[param_name
], default_flow_style
=True, width
=256
4756 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4758 ].startswith("!!yaml "):
4759 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4760 if parameter
.get("data-type") == "INTEGER":
4762 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4763 except ValueError: # error converting string to int
4765 "Parameter {} of primitive {} must be integer".format(
4766 param_name
, primitive_desc
["name"]
4769 elif parameter
.get("data-type") == "BOOLEAN":
4770 calculated_params
[param_name
] = not (
4771 (str(calculated_params
[param_name
])).lower() == "false"
4774 # add always ns_config_info if primitive name is config
4775 if primitive_desc
["name"] == "config":
4776 if "ns_config_info" in instantiation_params
:
4777 calculated_params
["ns_config_info"] = instantiation_params
[
4780 return calculated_params
def _look_for_deployed_vca(
    self,
    deployed_vca,
    member_vnf_index,
    vdu_id,
    vdu_count_index,
    kdu_name=None,
    ee_descriptor_id=None,
):
    """
    Find the deployed VCA (execution environment) record that must run an action.

    :param deployed_vca: list of VCA records (content of nsr _admin.deployed.VCA). Empty entries
        are ignored
    :param member_vnf_index: must be equal to the record "member-vnf-index"
    :param vdu_id: must be equal to the record "vdu_id"
    :param vdu_count_index: if not None, must be equal to the record "vdu_count_index"
    :param kdu_name: if provided, must be equal to the record "kdu_name"
    :param ee_descriptor_id: if provided, must be equal to the record "ee_descriptor_id"
    :return: tuple (ee_id, vca_type) of the first matching record. vca_type is "lxc_proxy_charm"
        when the record does not contain "type"
    :raises LcmException: if no record matches or the matching one has not got an ee_id
    """
    # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
    for vca in deployed_vca or ():
        if not vca:
            continue
        if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
            continue
        if (
            vdu_count_index is not None
            and vdu_count_index != vca["vdu_count_index"]
        ):
            continue
        # records may not contain these optional keys: treat that as "does not match"
        if kdu_name and kdu_name != vca.get("kdu_name"):
            continue
        if ee_descriptor_id and ee_descriptor_id != vca.get("ee_descriptor_id"):
            continue
        break
    else:
        # vca_deployed not found
        raise LcmException(
            "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
            " is not deployed".format(
                member_vnf_index,
                vdu_id,
                vdu_count_index,
                kdu_name,
                ee_descriptor_id,
            )
        )
    # get ee_id
    ee_id = vca.get("ee_id")
    vca_type = vca.get(
        "type", "lxc_proxy_charm"
    )  # default value for backward compatibility - proxy charm
    if not ee_id:
        raise LcmException(
            "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
            "execution environment".format(
                member_vnf_index, vdu_id, kdu_name, vdu_count_index
            )
        )
    return ee_id, vca_type
async def _ns_execute_primitive(
    self,
    ee_id,
    primitive,
    primitive_params,
    retries=0,
    retries_interval=30,
    timeout=None,
    vca_type=None,
    db_dict=None,
    vca_id: str = None,
) -> (str, str):
    """
    Execute a primitive at an execution environment, retrying if it fails.

    :param ee_id: execution environment id
    :param primitive: name of the primitive. For "config" the params are sent inside {"params": ...}
    :param primitive_params: dictionary with the params of the primitive
    :param retries: number of additional attempts after a failed one
    :param retries_interval: seconds to wait between attempts
    :param timeout: seconds allowed for each attempt; self.timeout_primitive if not provided
    :param vca_type: key of self.vca_map selecting the connector; "lxc_proxy_charm" if not provided
    :param db_dict: passed as it is to the connector exec_primitive
    :param vca_id: passed as it is to the connector exec_primitive
    :return: ("COMPLETED", output of the primitive) or ("FAILED", error text)
    :raises LcmException, asyncio.CancelledError: they are never converted into a "FAILED" result
    """
    try:
        if primitive == "config":
            primitive_params = {"params": primitive_params}

        vca_type = vca_type or "lxc_proxy_charm"

        while retries >= 0:
            try:
                output = await asyncio.wait_for(
                    self.vca_map[vca_type].exec_primitive(
                        ee_id=ee_id,
                        primitive_name=primitive,
                        params_dict=primitive_params,
                        progress_timeout=self.timeout_progress_primitive,
                        total_timeout=self.timeout_primitive,
                        db_dict=db_dict,
                        vca_id=vca_id,
                        vca_type=vca_type,
                    ),
                    timeout=timeout or self.timeout_primitive,
                )
                # execution was OK
                break
            except asyncio.CancelledError:
                raise
            except Exception as e:  # asyncio.TimeoutError
                # the text of asyncio.TimeoutError is empty, so a readable one is used
                error = "Timeout" if isinstance(e, asyncio.TimeoutError) else e
                retries -= 1
                if retries < 0:
                    return "FAILED", str(error)
                self.logger.debug(
                    "Error executing action {} on {} -> {}".format(
                        primitive, ee_id, error
                    )
                )
                # wait and retry. No 'loop' argument: asyncio.sleep does not accept it
                # since python 3.10 and the running loop is used anyway
                await asyncio.sleep(retries_interval)

        return "COMPLETED", output

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        # same state text as the one above: callers compare the result against "FAILED"
        return "FAILED", "Error executing action {}: {}".format(primitive, e)
async def vca_status_refresh(self, nsr_id, nslcmop_id):
    """
    Updating the vca_status with latest juju information in nsrs record

    If the ns has deployed KDUs (_admin.deployed.K8s) every one of them is refreshed with
    self._on_update_k8s_db; otherwise every entry of _admin.deployed.VCA is refreshed with
    self._on_update_n2vc_db. The task is always removed from self.lcm_tasks, also on error.

    :param: nsr_id: Id of the nsr
    :param: nslcmop_id: Id of the nslcmop
    :return: None
    """
    self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
    try:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        vca_id = self.get_vca_id({}, db_nsr)
        deployed = db_nsr["_admin"]["deployed"]
        # .get: records without "K8s" or "VCA" are handled as having none deployed
        if deployed.get("K8s"):
            for k8s in deployed["K8s"]:
                await self._on_update_k8s_db(
                    cluster_uuid=k8s["k8scluster-uuid"],
                    kdu_instance=k8s["kdu-instance"],
                    filter={"_id": nsr_id},
                    vca_id=vca_id,
                    cluster_type=k8s["k8scluster-type"],
                )
        else:
            # only the position is needed: the path must end with "<index>."
            for vca_index, _ in enumerate(deployed.get("VCA") or ()):
                path = "_admin.deployed.VCA.{}.".format(vca_index)
                await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, path, {})

        self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
    finally:
        # do not leave the task registered if the database or a connector fails
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
# Run the primitive requested by operation `nslcmop_id` on the network service `nsr_id`.
# Visible flow: take the HA lock, wait for related operations, mark the ns as
# "RUNNING ACTION", read the nslcmop and nsr records (plus vnfr and vnfd, or nsd), look
# for the config-primitive in the descriptor configuration and execute it either through
# self.k8scluster_map (kdu "upgrade"/"rollback"/"status" or a primitive declared in the
# kdu configuration) or through self._look_for_deployed_vca + self._ns_execute_primitive.
# Before leaving it writes the ns status (current_operation "IDLE") and the operation
# status, publishes a notification with self.msg.aiowrite, removes the "ns_action" task
# from self.lcm_tasks and returns the tuple (nslcmop_operation_state, detailed_status).
# :param nsr_id: _id of the "nsrs" record
# :param nslcmop_id: _id of the "nslcmops" record that describes the action
# NOTE(review): several statements of this method (branch headers, try/finally lines,
# some call arguments) are not present in this copy; the comments only describe what the
# visible lines do.
4926 async def action(self
, nsr_id
, nslcmop_id
):
4927 # Try to lock HA task here
4928 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4929 if not task_is_locked_by_me
:
4932 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
4933 self
.logger
.debug(logging_text
+ "Enter")
4934 # get all needed from database
4938 db_nslcmop_update
= {}
4939 nslcmop_operation_state
= None
4940 error_description_nslcmop
= None
4943 # wait for any previous tasks in process
4944 step
= "Waiting for previous operations to terminate"
4945 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4947 self
._write
_ns
_status
(
4950 current_operation
="RUNNING ACTION",
4951 current_operation_id
=nslcmop_id
,
4954 step
= "Getting information from database"
4955 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4956 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
# primitive_params, when not empty, is decoded with json.loads and stored back in place
4957 if db_nslcmop
["operationParams"].get("primitive_params"):
4958 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
4959 db_nslcmop
["operationParams"]["primitive_params"]
4962 nsr_deployed
= db_nsr
["_admin"].get("deployed")
4963 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
4964 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
4965 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
4966 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
4967 primitive
= db_nslcmop
["operationParams"]["primitive"]
4968 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
4969 timeout_ns_action
= db_nslcmop
["operationParams"].get(
4970 "timeout_ns_action", self
.timeout_primitive
4974 step
= "Getting vnfr from database"
4975 db_vnfr
= self
.db
.get_one(
4976 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
4978 if db_vnfr
.get("kdur"):
# every kdur "additionalParams", when not empty, is decoded with json.loads
4980 for kdur
in db_vnfr
["kdur"]:
4981 if kdur
.get("additionalParams"):
4982 kdur
["additionalParams"] = json
.loads(
4983 kdur
["additionalParams"]
4985 kdur_list
.append(kdur
)
4986 db_vnfr
["kdur"] = kdur_list
4987 step
= "Getting vnfd from database"
4988 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
4990 step
= "Getting nsd from database"
4991 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
4993 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
4994 # for backward compatibility
4995 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
4996 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
4997 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
4998 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5000 # look for primitive
5001 config_primitive_desc
= descriptor_configuration
= None
5003 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5005 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5007 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5009 descriptor_configuration
= db_nsd
.get("ns-configuration")
5011 if descriptor_configuration
and descriptor_configuration
.get(
5014 for config_primitive
in descriptor_configuration
["config-primitive"]:
5015 if config_primitive
["name"] == primitive
:
5016 config_primitive_desc
= config_primitive
5019 if not config_primitive_desc
:
5020 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5022 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5026 primitive_name
= primitive
5027 ee_descriptor_id
= None
5029 primitive_name
= config_primitive_desc
.get(
5030 "execution-environment-primitive", primitive
5032 ee_descriptor_id
= config_primitive_desc
.get(
5033 "execution-environment-ref"
5039 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5041 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5044 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5046 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5048 desc_params
= parse_yaml_strings(
5049 db_vnfr
.get("additionalParamsForVnf")
5052 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5053 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5054 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
# NOTE(review): the two loops below reuse the name `primitive`, so after them it no
# longer holds the primitive name read from operationParams - confirm nothing later
# depends on the original value (primitive_name is what the following lines use)
5056 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5057 actions
.add(primitive
["name"])
5058 for primitive
in kdu_configuration
.get("config-primitive", []):
5059 actions
.add(primitive
["name"])
5060 kdu_action
= True if primitive_name
in actions
else False
5062 # TODO check if ns is in a proper status
5064 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5066 # kdur and desc_params already set from before
5067 if primitive_params
:
5068 desc_params
.update(primitive_params
)
5069 # TODO Check if we will need something at vnf level
5070 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5072 kdu_name
== kdu
["kdu-name"]
5073 and kdu
["member-vnf-index"] == vnf_index
5078 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5081 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5082 msg
= "unknown k8scluster-type '{}'".format(
5083 kdu
.get("k8scluster-type")
5085 raise LcmException(msg
)
5088 "collection": "nsrs",
5089 "filter": {"_id": nsr_id
},
5090 "path": "_admin.deployed.K8s.{}".format(index
),
5094 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5096 step
= "Executing kdu {}".format(primitive_name
)
5097 if primitive_name
== "upgrade":
# a "kdu_model" given in the params is used for the upgrade and removed from them
5098 if desc_params
.get("kdu_model"):
5099 kdu_model
= desc_params
.get("kdu_model")
5100 del desc_params
["kdu_model"]
5102 kdu_model
= kdu
.get("kdu-model")
# parts[0] is the text of kdu_model before the first ":" - presumably a ":<version>"
# suffix is being dropped; the condition guarding it is not visible here, TODO confirm
5103 parts
= kdu_model
.split(sep
=":")
5105 kdu_model
= parts
[0]
5107 detailed_status
= await asyncio
.wait_for(
5108 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5109 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5110 kdu_instance
=kdu
.get("kdu-instance"),
5112 kdu_model
=kdu_model
,
5115 timeout
=timeout_ns_action
,
5117 timeout
=timeout_ns_action
+ 10,
5120 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5122 elif primitive_name
== "rollback":
5123 detailed_status
= await asyncio
.wait_for(
5124 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5125 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5126 kdu_instance
=kdu
.get("kdu-instance"),
5129 timeout
=timeout_ns_action
,
5131 elif primitive_name
== "status":
5132 detailed_status
= await asyncio
.wait_for(
5133 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5134 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5135 kdu_instance
=kdu
.get("kdu-instance"),
5138 timeout
=timeout_ns_action
,
# when the record has no "kdu-instance" the name "<kdu-name>-<nsr_id>" is used instead
5141 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5142 kdu
["kdu-name"], nsr_id
5144 params
= self
._map
_primitive
_params
(
5145 config_primitive_desc
, primitive_params
, desc_params
5148 detailed_status
= await asyncio
.wait_for(
5149 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5150 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5151 kdu_instance
=kdu_instance
,
5152 primitive_name
=primitive_name
,
5155 timeout
=timeout_ns_action
,
5158 timeout
=timeout_ns_action
,
5162 nslcmop_operation_state
= "COMPLETED"
5164 detailed_status
= ""
5165 nslcmop_operation_state
= "FAILED"
5167 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5168 nsr_deployed
["VCA"],
5169 member_vnf_index
=vnf_index
,
5171 vdu_count_index
=vdu_count_index
,
5172 ee_descriptor_id
=ee_descriptor_id
,
5174 for vca_index
, vca_deployed
in enumerate(
5175 db_nsr
["_admin"]["deployed"]["VCA"]
5177 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5179 "collection": "nsrs",
5180 "filter": {"_id": nsr_id
},
5181 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5185 nslcmop_operation_state
,
5187 ) = await self
._ns
_execute
_primitive
(
5189 primitive
=primitive_name
,
5190 primitive_params
=self
._map
_primitive
_params
(
5191 config_primitive_desc
, primitive_params
, desc_params
5193 timeout
=timeout_ns_action
,
5199 db_nslcmop_update
["detailed-status"] = detailed_status
5200 error_description_nslcmop
= (
5201 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5205 + " task Done with result {} {}".format(
5206 nslcmop_operation_state
, detailed_status
5209 return # database update is called inside finally
5211 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5212 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5214 except asyncio
.CancelledError
:
5216 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5218 exc
= "Operation was cancelled"
5219 except asyncio
.TimeoutError
:
5220 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5222 except Exception as e
:
5223 exc
= traceback
.format_exc()
5224 self
.logger
.critical(
5225 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
# the error text kept in `exc` becomes the "FAILED <step>: <error>" operation message
5234 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5235 nslcmop_operation_state
= "FAILED"
5237 self
._write
_ns
_status
(
5241 ], # TODO check if degraded. For the moment use previous status
5242 current_operation
="IDLE",
5243 current_operation_id
=None,
5244 # error_description=error_description_nsr,
5245 # error_detail=error_detail,
5246 other_update
=db_nsr_update
,
5249 self
._write
_op
_status
(
5252 error_message
=error_description_nslcmop
,
5253 operation_state
=nslcmop_operation_state
,
5254 other_update
=db_nslcmop_update
,
# the notification is only sent when the operation reached a final state
5257 if nslcmop_operation_state
:
5259 await self
.msg
.aiowrite(
5264 "nslcmop_id": nslcmop_id
,
5265 "operationState": nslcmop_operation_state
,
5269 except Exception as e
:
5271 logging_text
+ "kafka_write notification Exception {}".format(e
)
5273 self
.logger
.debug(logging_text
+ "Exit")
5274 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5275 return nslcmop_operation_state
, detailed_status
5277 async def scale(self
, nsr_id
, nslcmop_id
):
5278 # Try to lock HA task here
5279 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5280 if not task_is_locked_by_me
:
5283 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
5284 stage
= ["", "", ""]
5285 tasks_dict_info
= {}
5286 # ^ stage, step, VIM progress
5287 self
.logger
.debug(logging_text
+ "Enter")
5288 # get all needed from database
5290 db_nslcmop_update
= {}
5293 # in case of error, indicates what part of scale was failed to put nsr at error status
5294 scale_process
= None
5295 old_operational_status
= ""
5296 old_config_status
= ""
5299 # wait for any previous tasks in process
5300 step
= "Waiting for previous operations to terminate"
5301 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5302 self
._write
_ns
_status
(
5305 current_operation
="SCALING",
5306 current_operation_id
=nslcmop_id
,
5309 step
= "Getting nslcmop from database"
5311 step
+ " after having waited for previous tasks to be completed"
5313 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5315 step
= "Getting nsr from database"
5316 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5317 old_operational_status
= db_nsr
["operational-status"]
5318 old_config_status
= db_nsr
["config-status"]
5320 step
= "Parsing scaling parameters"
5321 db_nsr_update
["operational-status"] = "scaling"
5322 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5323 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5325 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
5327 ]["member-vnf-index"]
5328 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
5330 ]["scaling-group-descriptor"]
5331 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
5332 # for backward compatibility
5333 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5334 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5335 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5336 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5338 step
= "Getting vnfr from database"
5339 db_vnfr
= self
.db
.get_one(
5340 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5343 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5345 step
= "Getting vnfd from database"
5346 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5348 base_folder
= db_vnfd
["_admin"]["storage"]
5350 step
= "Getting scaling-group-descriptor"
5351 scaling_descriptor
= find_in_list(
5352 get_scaling_aspect(db_vnfd
),
5353 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
5355 if not scaling_descriptor
:
5357 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5358 "at vnfd:scaling-group-descriptor".format(scaling_group
)
5361 step
= "Sending scale order to VIM"
5362 # TODO check if ns is in a proper status
5364 if not db_nsr
["_admin"].get("scaling-group"):
5369 "_admin.scaling-group": [
5370 {"name": scaling_group
, "nb-scale-op": 0}
5374 admin_scale_index
= 0
5376 for admin_scale_index
, admin_scale_info
in enumerate(
5377 db_nsr
["_admin"]["scaling-group"]
5379 if admin_scale_info
["name"] == scaling_group
:
5380 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
5382 else: # not found, set index one plus last element and add new entry with the name
5383 admin_scale_index
+= 1
5385 "_admin.scaling-group.{}.name".format(admin_scale_index
)
5388 vca_scaling_info
= []
5389 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
5390 if scaling_type
== "SCALE_OUT":
5391 if "aspect-delta-details" not in scaling_descriptor
:
5393 "Aspect delta details not fount in scaling descriptor {}".format(
5394 scaling_descriptor
["name"]
5397 # count if max-instance-count is reached
5398 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5400 scaling_info
["scaling_direction"] = "OUT"
5401 scaling_info
["vdu-create"] = {}
5402 scaling_info
["kdu-create"] = {}
5403 for delta
in deltas
:
5404 for vdu_delta
in delta
.get("vdu-delta", {}):
5405 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
5406 # vdu_index also provides the number of instance of the targeted vdu
5407 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5408 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5412 additional_params
= (
5413 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
5416 cloud_init_list
= []
5418 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5419 max_instance_count
= 10
5420 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
5421 max_instance_count
= vdu_profile
.get(
5422 "max-number-of-instances", 10
5425 default_instance_num
= get_number_of_instances(
5428 instances_number
= vdu_delta
.get("number-of-instances", 1)
5429 nb_scale_op
+= instances_number
5431 new_instance_count
= nb_scale_op
+ default_instance_num
5432 # Control if new count is over max and vdu count is less than max.
5433 # Then assign new instance count
5434 if new_instance_count
> max_instance_count
> vdu_count
:
5435 instances_number
= new_instance_count
- max_instance_count
5437 instances_number
= instances_number
5439 if new_instance_count
> max_instance_count
:
5441 "reached the limit of {} (max-instance-count) "
5442 "scaling-out operations for the "
5443 "scaling-group-descriptor '{}'".format(
5444 nb_scale_op
, scaling_group
5447 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5449 # TODO Information of its own ip is not available because db_vnfr is not updated.
5450 additional_params
["OSM"] = get_osm_params(
5451 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
5453 cloud_init_list
.append(
5454 self
._parse
_cloud
_init
(
5461 vca_scaling_info
.append(
5463 "osm_vdu_id": vdu_delta
["id"],
5464 "member-vnf-index": vnf_index
,
5466 "vdu_index": vdu_index
+ x
,
5469 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
5470 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5471 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
5472 kdu_name
= kdu_profile
["kdu-name"]
5473 resource_name
= kdu_profile
["resource-name"]
5475 # Might have different kdus in the same delta
5476 # Should have list for each kdu
5477 if not scaling_info
["kdu-create"].get(kdu_name
, None):
5478 scaling_info
["kdu-create"][kdu_name
] = []
5480 kdur
= get_kdur(db_vnfr
, kdu_name
)
5481 if kdur
.get("helm-chart"):
5482 k8s_cluster_type
= "helm-chart-v3"
5483 self
.logger
.debug("kdur: {}".format(kdur
))
5485 kdur
.get("helm-version")
5486 and kdur
.get("helm-version") == "v2"
5488 k8s_cluster_type
= "helm-chart"
5489 raise NotImplementedError
5490 elif kdur
.get("juju-bundle"):
5491 k8s_cluster_type
= "juju-bundle"
5494 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5495 "juju-bundle. Maybe an old NBI version is running".format(
5496 db_vnfr
["member-vnf-index-ref"], kdu_name
5500 max_instance_count
= 10
5501 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
5502 max_instance_count
= kdu_profile
.get(
5503 "max-number-of-instances", 10
5506 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
5507 deployed_kdu
, _
= get_deployed_kdu(
5508 nsr_deployed
, kdu_name
, vnf_index
5510 if deployed_kdu
is None:
5512 "KDU '{}' for vnf '{}' not deployed".format(
5516 kdu_instance
= deployed_kdu
.get("kdu-instance")
5517 instance_num
= await self
.k8scluster_map
[
5519 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5520 kdu_replica_count
= instance_num
+ kdu_delta
.get(
5521 "number-of-instances", 1
5524 # Control if new count is over max and instance_num is less than max.
5525 # Then assign max instance number to kdu replica count
5526 if kdu_replica_count
> max_instance_count
> instance_num
:
5527 kdu_replica_count
= max_instance_count
5528 if kdu_replica_count
> max_instance_count
:
5530 "reached the limit of {} (max-instance-count) "
5531 "scaling-out operations for the "
5532 "scaling-group-descriptor '{}'".format(
5533 instance_num
, scaling_group
5537 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5538 vca_scaling_info
.append(
5540 "osm_kdu_id": kdu_name
,
5541 "member-vnf-index": vnf_index
,
5543 "kdu_index": instance_num
+ x
- 1,
5546 scaling_info
["kdu-create"][kdu_name
].append(
5548 "member-vnf-index": vnf_index
,
5550 "k8s-cluster-type": k8s_cluster_type
,
5551 "resource-name": resource_name
,
5552 "scale": kdu_replica_count
,
5555 elif scaling_type
== "SCALE_IN":
5556 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5558 scaling_info
["scaling_direction"] = "IN"
5559 scaling_info
["vdu-delete"] = {}
5560 scaling_info
["kdu-delete"] = {}
5562 for delta
in deltas
:
5563 for vdu_delta
in delta
.get("vdu-delta", {}):
5564 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5565 min_instance_count
= 0
5566 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5567 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
5568 min_instance_count
= vdu_profile
["min-number-of-instances"]
5570 default_instance_num
= get_number_of_instances(
5571 db_vnfd
, vdu_delta
["id"]
5573 instance_num
= vdu_delta
.get("number-of-instances", 1)
5574 nb_scale_op
-= instance_num
5576 new_instance_count
= nb_scale_op
+ default_instance_num
5578 if new_instance_count
< min_instance_count
< vdu_count
:
5579 instances_number
= min_instance_count
- new_instance_count
5581 instances_number
= instance_num
5583 if new_instance_count
< min_instance_count
:
5585 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5586 "scaling-group-descriptor '{}'".format(
5587 nb_scale_op
, scaling_group
5590 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5591 vca_scaling_info
.append(
5593 "osm_vdu_id": vdu_delta
["id"],
5594 "member-vnf-index": vnf_index
,
5596 "vdu_index": vdu_index
- 1 - x
,
5599 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
5600 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5601 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
5602 kdu_name
= kdu_profile
["kdu-name"]
5603 resource_name
= kdu_profile
["resource-name"]
5605 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
5606 scaling_info
["kdu-delete"][kdu_name
] = []
5608 kdur
= get_kdur(db_vnfr
, kdu_name
)
5609 if kdur
.get("helm-chart"):
5610 k8s_cluster_type
= "helm-chart-v3"
5611 self
.logger
.debug("kdur: {}".format(kdur
))
5613 kdur
.get("helm-version")
5614 and kdur
.get("helm-version") == "v2"
5616 k8s_cluster_type
= "helm-chart"
5617 raise NotImplementedError
5618 elif kdur
.get("juju-bundle"):
5619 k8s_cluster_type
= "juju-bundle"
5622 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5623 "juju-bundle. Maybe an old NBI version is running".format(
5624 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
5628 min_instance_count
= 0
5629 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
5630 min_instance_count
= kdu_profile
["min-number-of-instances"]
5632 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
5633 deployed_kdu
, _
= get_deployed_kdu(
5634 nsr_deployed
, kdu_name
, vnf_index
5636 if deployed_kdu
is None:
5638 "KDU '{}' for vnf '{}' not deployed".format(
5642 kdu_instance
= deployed_kdu
.get("kdu-instance")
5643 instance_num
= await self
.k8scluster_map
[
5645 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5646 kdu_replica_count
= instance_num
- kdu_delta
.get(
5647 "number-of-instances", 1
5650 if kdu_replica_count
< min_instance_count
< instance_num
:
5651 kdu_replica_count
= min_instance_count
5652 if kdu_replica_count
< min_instance_count
:
5654 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5655 "scaling-group-descriptor '{}'".format(
5656 instance_num
, scaling_group
5660 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5661 vca_scaling_info
.append(
5663 "osm_kdu_id": kdu_name
,
5664 "member-vnf-index": vnf_index
,
5666 "kdu_index": instance_num
- x
- 1,
5669 scaling_info
["kdu-delete"][kdu_name
].append(
5671 "member-vnf-index": vnf_index
,
5673 "k8s-cluster-type": k8s_cluster_type
,
5674 "resource-name": resource_name
,
5675 "scale": kdu_replica_count
,
5679 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5680 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
5681 if scaling_info
["scaling_direction"] == "IN":
5682 for vdur
in reversed(db_vnfr
["vdur"]):
5683 if vdu_delete
.get(vdur
["vdu-id-ref"]):
5684 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
5685 scaling_info
["vdu"].append(
5687 "name": vdur
.get("name") or vdur
.get("vdu-name"),
5688 "vdu_id": vdur
["vdu-id-ref"],
5692 for interface
in vdur
["interfaces"]:
5693 scaling_info
["vdu"][-1]["interface"].append(
5695 "name": interface
["name"],
5696 "ip_address": interface
["ip-address"],
5697 "mac_address": interface
.get("mac-address"),
5700 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5703 step
= "Executing pre-scale vnf-config-primitive"
5704 if scaling_descriptor
.get("scaling-config-action"):
5705 for scaling_config_action
in scaling_descriptor
[
5706 "scaling-config-action"
5709 scaling_config_action
.get("trigger") == "pre-scale-in"
5710 and scaling_type
== "SCALE_IN"
5712 scaling_config_action
.get("trigger") == "pre-scale-out"
5713 and scaling_type
== "SCALE_OUT"
5715 vnf_config_primitive
= scaling_config_action
[
5716 "vnf-config-primitive-name-ref"
5718 step
= db_nslcmop_update
[
5720 ] = "executing pre-scale scaling-config-action '{}'".format(
5721 vnf_config_primitive
5724 # look for primitive
5725 for config_primitive
in (
5726 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
5727 ).get("config-primitive", ()):
5728 if config_primitive
["name"] == vnf_config_primitive
:
5732 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5733 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5734 "primitive".format(scaling_group
, vnf_config_primitive
)
5737 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
5738 if db_vnfr
.get("additionalParamsForVnf"):
5739 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
5741 scale_process
= "VCA"
5742 db_nsr_update
["config-status"] = "configuring pre-scaling"
5743 primitive_params
= self
._map
_primitive
_params
(
5744 config_primitive
, {}, vnfr_params
5747 # Pre-scale retry check: Check if this sub-operation has been executed before
5748 op_index
= self
._check
_or
_add
_scale
_suboperation
(
5751 vnf_config_primitive
,
5755 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
5756 # Skip sub-operation
5757 result
= "COMPLETED"
5758 result_detail
= "Done"
5761 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5762 vnf_config_primitive
, result
, result_detail
5766 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
5767 # New sub-operation: Get index of this sub-operation
5769 len(db_nslcmop
.get("_admin", {}).get("operations"))
5774 + "vnf_config_primitive={} New sub-operation".format(
5775 vnf_config_primitive
5779 # retry: Get registered params for this existing sub-operation
5780 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
5783 vnf_index
= op
.get("member_vnf_index")
5784 vnf_config_primitive
= op
.get("primitive")
5785 primitive_params
= op
.get("primitive_params")
5788 + "vnf_config_primitive={} Sub-operation retry".format(
5789 vnf_config_primitive
5792 # Execute the primitive, either with new (first-time) or registered (reintent) args
5793 ee_descriptor_id
= config_primitive
.get(
5794 "execution-environment-ref"
5796 primitive_name
= config_primitive
.get(
5797 "execution-environment-primitive", vnf_config_primitive
5799 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5800 nsr_deployed
["VCA"],
5801 member_vnf_index
=vnf_index
,
5803 vdu_count_index
=None,
5804 ee_descriptor_id
=ee_descriptor_id
,
5806 result
, result_detail
= await self
._ns
_execute
_primitive
(
5815 + "vnf_config_primitive={} Done with result {} {}".format(
5816 vnf_config_primitive
, result
, result_detail
5819 # Update operationState = COMPLETED | FAILED
5820 self
._update
_suboperation
_status
(
5821 db_nslcmop
, op_index
, result
, result_detail
5824 if result
== "FAILED":
5825 raise LcmException(result_detail
)
5826 db_nsr_update
["config-status"] = old_config_status
5827 scale_process
= None
5831 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
5834 "_admin.scaling-group.{}.time".format(admin_scale_index
)
5837 # SCALE-IN VCA - BEGIN
5838 if vca_scaling_info
:
5839 step
= db_nslcmop_update
[
5841 ] = "Deleting the execution environments"
5842 scale_process
= "VCA"
5843 for vca_info
in vca_scaling_info
:
5844 if vca_info
["type"] == "delete":
5845 member_vnf_index
= str(vca_info
["member-vnf-index"])
5847 logging_text
+ "vdu info: {}".format(vca_info
)
5849 if vca_info
.get("osm_vdu_id"):
5850 vdu_id
= vca_info
["osm_vdu_id"]
5851 vdu_index
= int(vca_info
["vdu_index"])
5854 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5855 member_vnf_index
, vdu_id
, vdu_index
5859 kdu_id
= vca_info
["osm_kdu_id"]
5862 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5863 member_vnf_index
, kdu_id
, vdu_index
5865 stage
[2] = step
= "Scaling in VCA"
5866 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
5867 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
5868 config_update
= db_nsr
["configurationStatus"]
5869 for vca_index
, vca
in enumerate(vca_update
):
5871 (vca
or vca
.get("ee_id"))
5872 and vca
["member-vnf-index"] == member_vnf_index
5873 and vca
["vdu_count_index"] == vdu_index
5875 if vca
.get("vdu_id"):
5876 config_descriptor
= get_configuration(
5877 db_vnfd
, vca
.get("vdu_id")
5879 elif vca
.get("kdu_name"):
5880 config_descriptor
= get_configuration(
5881 db_vnfd
, vca
.get("kdu_name")
5884 config_descriptor
= get_configuration(
5885 db_vnfd
, db_vnfd
["id"]
5887 operation_params
= (
5888 db_nslcmop
.get("operationParams") or {}
5890 exec_terminate_primitives
= not operation_params
.get(
5891 "skip_terminate_primitives"
5892 ) and vca
.get("needed_terminate")
5893 task
= asyncio
.ensure_future(
5902 exec_primitives
=exec_terminate_primitives
,
5906 timeout
=self
.timeout_charm_delete
,
5909 tasks_dict_info
[task
] = "Terminating VCA {}".format(
5912 del vca_update
[vca_index
]
5913 del config_update
[vca_index
]
5914 # wait for pending tasks of terminate primitives
5918 + "Waiting for tasks {}".format(
5919 list(tasks_dict_info
.keys())
5922 error_list
= await self
._wait
_for
_tasks
(
5926 self
.timeout_charm_delete
, self
.timeout_ns_terminate
5931 tasks_dict_info
.clear()
5933 raise LcmException("; ".join(error_list
))
5935 db_vca_and_config_update
= {
5936 "_admin.deployed.VCA": vca_update
,
5937 "configurationStatus": config_update
,
5940 "nsrs", db_nsr
["_id"], db_vca_and_config_update
5942 scale_process
= None
5943 # SCALE-IN VCA - END
5946 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
5947 scale_process
= "RO"
5948 if self
.ro_config
.get("ng"):
5949 await self
._scale
_ng
_ro
(
5950 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
5952 scaling_info
.pop("vdu-create", None)
5953 scaling_info
.pop("vdu-delete", None)
5955 scale_process
= None
5959 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
5960 scale_process
= "KDU"
5961 await self
._scale
_kdu
(
5962 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
5964 scaling_info
.pop("kdu-create", None)
5965 scaling_info
.pop("kdu-delete", None)
5967 scale_process
= None
5971 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5973 # SCALE-UP VCA - BEGIN
5974 if vca_scaling_info
:
5975 step
= db_nslcmop_update
[
5977 ] = "Creating new execution environments"
5978 scale_process
= "VCA"
5979 for vca_info
in vca_scaling_info
:
5980 if vca_info
["type"] == "create":
5981 member_vnf_index
= str(vca_info
["member-vnf-index"])
5983 logging_text
+ "vdu info: {}".format(vca_info
)
5985 vnfd_id
= db_vnfr
["vnfd-ref"]
5986 if vca_info
.get("osm_vdu_id"):
5987 vdu_index
= int(vca_info
["vdu_index"])
5988 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
5989 if db_vnfr
.get("additionalParamsForVnf"):
5990 deploy_params
.update(
5992 db_vnfr
["additionalParamsForVnf"].copy()
5995 descriptor_config
= get_configuration(
5996 db_vnfd
, db_vnfd
["id"]
5998 if descriptor_config
:
6003 logging_text
=logging_text
6004 + "member_vnf_index={} ".format(member_vnf_index
),
6007 nslcmop_id
=nslcmop_id
,
6013 member_vnf_index
=member_vnf_index
,
6014 vdu_index
=vdu_index
,
6016 deploy_params
=deploy_params
,
6017 descriptor_config
=descriptor_config
,
6018 base_folder
=base_folder
,
6019 task_instantiation_info
=tasks_dict_info
,
6022 vdu_id
= vca_info
["osm_vdu_id"]
6023 vdur
= find_in_list(
6024 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6026 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6027 if vdur
.get("additionalParams"):
6028 deploy_params_vdu
= parse_yaml_strings(
6029 vdur
["additionalParams"]
6032 deploy_params_vdu
= deploy_params
6033 deploy_params_vdu
["OSM"] = get_osm_params(
6034 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6036 if descriptor_config
:
6041 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6042 member_vnf_index
, vdu_id
, vdu_index
6044 stage
[2] = step
= "Scaling out VCA"
6045 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6047 logging_text
=logging_text
6048 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6049 member_vnf_index
, vdu_id
, vdu_index
6053 nslcmop_id
=nslcmop_id
,
6059 member_vnf_index
=member_vnf_index
,
6060 vdu_index
=vdu_index
,
6062 deploy_params
=deploy_params_vdu
,
6063 descriptor_config
=descriptor_config
,
6064 base_folder
=base_folder
,
6065 task_instantiation_info
=tasks_dict_info
,
6069 kdu_name
= vca_info
["osm_kdu_id"]
6070 descriptor_config
= get_configuration(db_vnfd
, kdu_name
)
6071 if descriptor_config
:
6073 kdu_index
= int(vca_info
["kdu_index"])
6077 for x
in db_vnfr
["kdur"]
6078 if x
["kdu-name"] == kdu_name
6080 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
6081 if kdur
.get("additionalParams"):
6082 deploy_params_kdu
= parse_yaml_strings(
6083 kdur
["additionalParams"]
6087 logging_text
=logging_text
,
6090 nslcmop_id
=nslcmop_id
,
6096 member_vnf_index
=member_vnf_index
,
6097 vdu_index
=kdu_index
,
6099 deploy_params
=deploy_params_kdu
,
6100 descriptor_config
=descriptor_config
,
6101 base_folder
=base_folder
,
6102 task_instantiation_info
=tasks_dict_info
,
6105 # SCALE-UP VCA - END
6106 scale_process
= None
6109 # execute primitive service POST-SCALING
6110 step
= "Executing post-scale vnf-config-primitive"
6111 if scaling_descriptor
.get("scaling-config-action"):
6112 for scaling_config_action
in scaling_descriptor
[
6113 "scaling-config-action"
6116 scaling_config_action
.get("trigger") == "post-scale-in"
6117 and scaling_type
== "SCALE_IN"
6119 scaling_config_action
.get("trigger") == "post-scale-out"
6120 and scaling_type
== "SCALE_OUT"
6122 vnf_config_primitive
= scaling_config_action
[
6123 "vnf-config-primitive-name-ref"
6125 step
= db_nslcmop_update
[
6127 ] = "executing post-scale scaling-config-action '{}'".format(
6128 vnf_config_primitive
6131 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6132 if db_vnfr
.get("additionalParamsForVnf"):
6133 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6135 # look for primitive
6136 for config_primitive
in (
6137 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6138 ).get("config-primitive", ()):
6139 if config_primitive
["name"] == vnf_config_primitive
:
6143 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6144 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6145 "config-primitive".format(
6146 scaling_group
, vnf_config_primitive
6149 scale_process
= "VCA"
6150 db_nsr_update
["config-status"] = "configuring post-scaling"
6151 primitive_params
= self
._map
_primitive
_params
(
6152 config_primitive
, {}, vnfr_params
6155 # Post-scale retry check: Check if this sub-operation has been executed before
6156 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6159 vnf_config_primitive
,
6163 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6164 # Skip sub-operation
6165 result
= "COMPLETED"
6166 result_detail
= "Done"
6169 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6170 vnf_config_primitive
, result
, result_detail
6174 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6175 # New sub-operation: Get index of this sub-operation
6177 len(db_nslcmop
.get("_admin", {}).get("operations"))
6182 + "vnf_config_primitive={} New sub-operation".format(
6183 vnf_config_primitive
6187 # retry: Get registered params for this existing sub-operation
6188 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6191 vnf_index
= op
.get("member_vnf_index")
6192 vnf_config_primitive
= op
.get("primitive")
6193 primitive_params
= op
.get("primitive_params")
6196 + "vnf_config_primitive={} Sub-operation retry".format(
6197 vnf_config_primitive
6200 # Execute the primitive, either with new (first-time) or registered (reintent) args
6201 ee_descriptor_id
= config_primitive
.get(
6202 "execution-environment-ref"
6204 primitive_name
= config_primitive
.get(
6205 "execution-environment-primitive", vnf_config_primitive
6207 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6208 nsr_deployed
["VCA"],
6209 member_vnf_index
=vnf_index
,
6211 vdu_count_index
=None,
6212 ee_descriptor_id
=ee_descriptor_id
,
6214 result
, result_detail
= await self
._ns
_execute
_primitive
(
6223 + "vnf_config_primitive={} Done with result {} {}".format(
6224 vnf_config_primitive
, result
, result_detail
6227 # Update operationState = COMPLETED | FAILED
6228 self
._update
_suboperation
_status
(
6229 db_nslcmop
, op_index
, result
, result_detail
6232 if result
== "FAILED":
6233 raise LcmException(result_detail
)
6234 db_nsr_update
["config-status"] = old_config_status
6235 scale_process
= None
6240 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6241 db_nsr_update
["operational-status"] = (
6243 if old_operational_status
== "failed"
6244 else old_operational_status
6246 db_nsr_update
["config-status"] = old_config_status
6249 ROclient
.ROClientException
,
6254 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6256 except asyncio
.CancelledError
:
6258 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6260 exc
= "Operation was cancelled"
6261 except Exception as e
:
6262 exc
= traceback
.format_exc()
6263 self
.logger
.critical(
6264 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6268 self
._write
_ns
_status
(
6271 current_operation
="IDLE",
6272 current_operation_id
=None,
6275 stage
[1] = "Waiting for instantiate pending tasks."
6276 self
.logger
.debug(logging_text
+ stage
[1])
6277 exc
= await self
._wait
_for
_tasks
(
6280 self
.timeout_ns_deploy
,
6288 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6289 nslcmop_operation_state
= "FAILED"
6291 db_nsr_update
["operational-status"] = old_operational_status
6292 db_nsr_update
["config-status"] = old_config_status
6293 db_nsr_update
["detailed-status"] = ""
6295 if "VCA" in scale_process
:
6296 db_nsr_update
["config-status"] = "failed"
6297 if "RO" in scale_process
:
6298 db_nsr_update
["operational-status"] = "failed"
6301 ] = "FAILED scaling nslcmop={} {}: {}".format(
6302 nslcmop_id
, step
, exc
6305 error_description_nslcmop
= None
6306 nslcmop_operation_state
= "COMPLETED"
6307 db_nslcmop_update
["detailed-status"] = "Done"
6309 self
._write
_op
_status
(
6312 error_message
=error_description_nslcmop
,
6313 operation_state
=nslcmop_operation_state
,
6314 other_update
=db_nslcmop_update
,
6317 self
._write
_ns
_status
(
6320 current_operation
="IDLE",
6321 current_operation_id
=None,
6322 other_update
=db_nsr_update
,
6325 if nslcmop_operation_state
:
6329 "nslcmop_id": nslcmop_id
,
6330 "operationState": nslcmop_operation_state
,
6332 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
6333 except Exception as e
:
6335 logging_text
+ "kafka_write notification Exception {}".format(e
)
6337 self
.logger
.debug(logging_text
+ "Exit")
6338 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
6340 async def _scale_kdu(
6341 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6343 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
6344 for kdu_name
in _scaling_info
:
6345 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
6346 deployed_kdu
, index
= get_deployed_kdu(
6347 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
6349 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
6350 kdu_instance
= deployed_kdu
["kdu-instance"]
6351 scale
= int(kdu_scaling_info
["scale"])
6352 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
6355 "collection": "nsrs",
6356 "filter": {"_id": nsr_id
},
6357 "path": "_admin.deployed.K8s.{}".format(index
),
6360 step
= "scaling application {}".format(
6361 kdu_scaling_info
["resource-name"]
6363 self
.logger
.debug(logging_text
+ step
)
6365 if kdu_scaling_info
["type"] == "delete":
6366 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
6369 and kdu_config
.get("terminate-config-primitive")
6370 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
6372 terminate_config_primitive_list
= kdu_config
.get(
6373 "terminate-config-primitive"
6375 terminate_config_primitive_list
.sort(
6376 key
=lambda val
: int(val
["seq"])
6380 terminate_config_primitive
6381 ) in terminate_config_primitive_list
:
6382 primitive_params_
= self
._map
_primitive
_params
(
6383 terminate_config_primitive
, {}, {}
6385 step
= "execute terminate config primitive"
6386 self
.logger
.debug(logging_text
+ step
)
6387 await asyncio
.wait_for(
6388 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
6389 cluster_uuid
=cluster_uuid
,
6390 kdu_instance
=kdu_instance
,
6391 primitive_name
=terminate_config_primitive
["name"],
6392 params
=primitive_params_
,
6399 await asyncio
.wait_for(
6400 self
.k8scluster_map
[k8s_cluster_type
].scale(
6403 kdu_scaling_info
["resource-name"],
6406 timeout
=self
.timeout_vca_on_error
,
6409 if kdu_scaling_info
["type"] == "create":
6410 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
6413 and kdu_config
.get("initial-config-primitive")
6414 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
6416 initial_config_primitive_list
= kdu_config
.get(
6417 "initial-config-primitive"
6419 initial_config_primitive_list
.sort(
6420 key
=lambda val
: int(val
["seq"])
6423 for initial_config_primitive
in initial_config_primitive_list
:
6424 primitive_params_
= self
._map
_primitive
_params
(
6425 initial_config_primitive
, {}, {}
6427 step
= "execute initial config primitive"
6428 self
.logger
.debug(logging_text
+ step
)
6429 await asyncio
.wait_for(
6430 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
6431 cluster_uuid
=cluster_uuid
,
6432 kdu_instance
=kdu_instance
,
6433 primitive_name
=initial_config_primitive
["name"],
6434 params
=primitive_params_
,
6441 async def _scale_ng_ro(
6442 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
6444 nsr_id
= db_nslcmop
["nsInstanceId"]
6445 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
6448 # read from db: vnfd's for every vnf
6451 # for each vnf in ns, read vnfd
6452 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
6453 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
6454 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
6455 # if we haven't this vnfd, read it from db
6456 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
6458 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
6459 db_vnfds
.append(vnfd
)
6460 n2vc_key
= self
.n2vc
.get_public_key()
6461 n2vc_key_list
= [n2vc_key
]
6464 vdu_scaling_info
.get("vdu-create"),
6465 vdu_scaling_info
.get("vdu-delete"),
6468 # db_vnfr has been updated, update db_vnfrs to use it
6469 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
6470 await self
._instantiate
_ng
_ro
(
6480 start_deploy
=time(),
6481 timeout_ns_deploy
=self
.timeout_ns_deploy
,
6483 if vdu_scaling_info
.get("vdu-delete"):
6485 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
6488 async def extract_prometheus_scrape_jobs(
6489 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
6491 # look if exist a file called 'prometheus*.j2' and
6492 artifact_content
= self
.fs
.dir_ls(artifact_path
)
6496 for f
in artifact_content
6497 if f
.startswith("prometheus") and f
.endswith(".j2")
6503 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
6507 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
6508 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
6510 vnfr_id
= vnfr_id
.replace("-", "")
6512 "JOB_NAME": vnfr_id
,
6513 "TARGET_IP": target_ip
,
6514 "EXPORTER_POD_IP": host_name
,
6515 "EXPORTER_POD_PORT": host_port
,
6517 job_list
= parse_job(job_data
, variables
)
6518 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
6519 for job
in job_list
:
6521 not isinstance(job
.get("job_name"), str)
6522 or vnfr_id
not in job
["job_name"]
6524 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
6525 job
["nsr_id"] = nsr_id
6526 job
["vnfr_id"] = vnfr_id
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA cloud settings kept in the "config" of a VIM account.

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential), taken from the "vca_cloud" and
        "vca_cloud_credential" entries of the account config
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    # An account without a "config" section is handled as an empty config
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_cloud")
    cloud_credential = vim_config.get("vca_cloud_credential")
    return cloud_name, cloud_credential
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA K8s cloud settings kept in the "config" of a VIM account.

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential), taken from the "vca_k8s_cloud" and
        "vca_k8s_cloud_credential" entries of the account config
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    # An account without a "config" section is handled as an empty config
    vim_config = vim_account.get("config", {})
    k8s_cloud_name = vim_config.get("vca_k8s_cloud")
    k8s_cloud_credential = vim_config.get("vca_k8s_cloud_credential")
    return k8s_cloud_name, k8s_cloud_credential