1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
20 from typing
import Any
, Dict
, List
23 import logging
.handlers
34 from osm_lcm
import ROclient
35 from osm_lcm
.data_utils
.nsr
import (
38 get_deployed_vca_list
,
41 from osm_lcm
.data_utils
.vca
import (
50 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
51 from osm_lcm
.lcm_utils
import (
59 from osm_lcm
.data_utils
.nsd
import (
60 get_ns_configuration_relation_list
,
64 from osm_lcm
.data_utils
.vnfd
import (
68 get_ee_sorted_initial_config_primitive_list
,
69 get_ee_sorted_terminate_config_primitive_list
,
71 get_virtual_link_profiles
,
76 get_number_of_instances
,
78 get_kdu_resource_profile
,
80 from osm_lcm
.data_utils
.list_utils
import find_in_list
81 from osm_lcm
.data_utils
.vnfr
import get_osm_params
, get_vdur_index
, get_kdur
82 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
83 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
84 from n2vc
.definitions
import RelationEndpoint
85 from n2vc
.k8s_helm_conn
import K8sHelmConnector
86 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
87 from n2vc
.k8s_juju_conn
import K8sJujuConnector
89 from osm_common
.dbbase
import DbException
90 from osm_common
.fsbase
import FsException
92 from osm_lcm
.data_utils
.database
.database
import Database
93 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
95 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
96 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
98 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
99 from osm_lcm
.prometheus
import parse_job
101 from copy
import copy
, deepcopy
102 from time
import time
103 from uuid
import uuid4
105 from random
import randint
107 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
110 class NsLcm(LcmBase
):
111 timeout_vca_on_error
= (
113 ) # Time for charm from first time at blocked,error status to mark as failed
114 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
115 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
116 timeout_charm_delete
= 10 * 60
117 timeout_primitive
= 30 * 60 # timeout for primitive execution
118 timeout_progress_primitive
= (
120 ) # timeout for some progress in a primitive execution
122 SUBOPERATION_STATUS_NOT_FOUND
= -1
123 SUBOPERATION_STATUS_NEW
= -2
124 SUBOPERATION_STATUS_SKIP
= -3
125 task_name_deploy_vca
= "Deploying VCA"
127 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
129 Init, Connect to database, filesystem storage, and messaging
130 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
133 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
135 self
.db
= Database().instance
.db
136 self
.fs
= Filesystem().instance
.fs
138 self
.lcm_tasks
= lcm_tasks
139 self
.timeout
= config
["timeout"]
140 self
.ro_config
= config
["ro_config"]
141 self
.ng_ro
= config
["ro_config"].get("ng")
142 self
.vca_config
= config
["VCA"].copy()
144 # create N2VC connector
145 self
.n2vc
= N2VCJujuConnector(
148 on_update_db
=self
._on
_update
_n
2vc
_db
,
153 self
.conn_helm_ee
= LCMHelmConn(
156 vca_config
=self
.vca_config
,
157 on_update_db
=self
._on
_update
_n
2vc
_db
,
160 self
.k8sclusterhelm2
= K8sHelmConnector(
161 kubectl_command
=self
.vca_config
.get("kubectlpath"),
162 helm_command
=self
.vca_config
.get("helmpath"),
169 self
.k8sclusterhelm3
= K8sHelm3Connector(
170 kubectl_command
=self
.vca_config
.get("kubectlpath"),
171 helm_command
=self
.vca_config
.get("helm3path"),
178 self
.k8sclusterjuju
= K8sJujuConnector(
179 kubectl_command
=self
.vca_config
.get("kubectlpath"),
180 juju_command
=self
.vca_config
.get("jujupath"),
183 on_update_db
=self
._on
_update
_k
8s
_db
,
188 self
.k8scluster_map
= {
189 "helm-chart": self
.k8sclusterhelm2
,
190 "helm-chart-v3": self
.k8sclusterhelm3
,
191 "chart": self
.k8sclusterhelm3
,
192 "juju-bundle": self
.k8sclusterjuju
,
193 "juju": self
.k8sclusterjuju
,
197 "lxc_proxy_charm": self
.n2vc
,
198 "native_charm": self
.n2vc
,
199 "k8s_proxy_charm": self
.n2vc
,
200 "helm": self
.conn_helm_ee
,
201 "helm-v3": self
.conn_helm_ee
,
205 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
208 def increment_ip_mac(ip_mac
, vm_index
=1):
209 if not isinstance(ip_mac
, str):
212 # try with ipv4 look for last dot
213 i
= ip_mac
.rfind(".")
216 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
217 # try with ipv6 or mac look for last colon. Operate in hex
218 i
= ip_mac
.rfind(":")
221 # format in hex, len can be 2 for mac or 4 for ipv6
222 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
223 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
229 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
231 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
234 # TODO filter RO descriptor fields...
238 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
239 db_dict
["deploymentStatus"] = ro_descriptor
240 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
242 except Exception as e
:
244 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
247 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
249 # remove last dot from path (if exists)
250 if path
.endswith("."):
253 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
254 # .format(table, filter, path, updated_data))
257 nsr_id
= filter.get("_id")
259 # read ns record from database
260 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
261 current_ns_status
= nsr
.get("nsState")
263 # get vca status for NS
264 status_dict
= await self
.n2vc
.get_status(
265 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
270 db_dict
["vcaStatus"] = status_dict
271 await self
.n2vc
.update_vca_status(db_dict
["vcaStatus"], vca_id
=vca_id
)
273 # update configurationStatus for this VCA
275 vca_index
= int(path
[path
.rfind(".") + 1 :])
278 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
280 vca_status
= vca_list
[vca_index
].get("status")
282 configuration_status_list
= nsr
.get("configurationStatus")
283 config_status
= configuration_status_list
[vca_index
].get("status")
285 if config_status
== "BROKEN" and vca_status
!= "failed":
286 db_dict
["configurationStatus"][vca_index
] = "READY"
287 elif config_status
!= "BROKEN" and vca_status
== "failed":
288 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
289 except Exception as e
:
290 # not update configurationStatus
291 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
293 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
294 # if nsState = 'DEGRADED' check if all is OK
296 if current_ns_status
in ("READY", "DEGRADED"):
297 error_description
= ""
299 if status_dict
.get("machines"):
300 for machine_id
in status_dict
.get("machines"):
301 machine
= status_dict
.get("machines").get(machine_id
)
302 # check machine agent-status
303 if machine
.get("agent-status"):
304 s
= machine
.get("agent-status").get("status")
307 error_description
+= (
308 "machine {} agent-status={} ; ".format(
312 # check machine instance status
313 if machine
.get("instance-status"):
314 s
= machine
.get("instance-status").get("status")
317 error_description
+= (
318 "machine {} instance-status={} ; ".format(
323 if status_dict
.get("applications"):
324 for app_id
in status_dict
.get("applications"):
325 app
= status_dict
.get("applications").get(app_id
)
326 # check application status
327 if app
.get("status"):
328 s
= app
.get("status").get("status")
331 error_description
+= (
332 "application {} status={} ; ".format(app_id
, s
)
335 if error_description
:
336 db_dict
["errorDescription"] = error_description
337 if current_ns_status
== "READY" and is_degraded
:
338 db_dict
["nsState"] = "DEGRADED"
339 if current_ns_status
== "DEGRADED" and not is_degraded
:
340 db_dict
["nsState"] = "READY"
343 self
.update_db_2("nsrs", nsr_id
, db_dict
)
345 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
347 except Exception as e
:
348 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
350 async def _on_update_k8s_db(
351 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
354 Updating vca status in NSR record
355 :param cluster_uuid: UUID of a k8s cluster
356 :param kdu_instance: The unique name of the KDU instance
357 :param filter: To get nsr_id
358 :cluster_type: The cluster type (juju, k8s)
362 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
363 # .format(cluster_uuid, kdu_instance, filter))
365 nsr_id
= filter.get("_id")
367 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
368 cluster_uuid
=cluster_uuid
,
369 kdu_instance
=kdu_instance
,
371 complete_status
=True,
377 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
379 if cluster_type
in ("juju-bundle", "juju"):
380 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
381 # status in a similar way between Juju Bundles and Helm Charts on this side
382 await self
.k8sclusterjuju
.update_vca_status(
383 db_dict
["vcaStatus"],
389 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
393 self
.update_db_2("nsrs", nsr_id
, db_dict
)
394 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
396 except Exception as e
:
397 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
400 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
402 env
= Environment(undefined
=StrictUndefined
)
403 template
= env
.from_string(cloud_init_text
)
404 return template
.render(additional_params
or {})
405 except UndefinedError
as e
:
407 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
408 "file, must be provided in the instantiation parameters inside the "
409 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
411 except (TemplateError
, TemplateNotFound
) as e
:
413 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
418 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
419 cloud_init_content
= cloud_init_file
= None
421 if vdu
.get("cloud-init-file"):
422 base_folder
= vnfd
["_admin"]["storage"]
423 if base_folder
["pkg-dir"]:
424 cloud_init_file
= "{}/{}/cloud_init/{}".format(
425 base_folder
["folder"],
426 base_folder
["pkg-dir"],
427 vdu
["cloud-init-file"],
430 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
431 base_folder
["folder"],
432 vdu
["cloud-init-file"],
434 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
435 cloud_init_content
= ci_file
.read()
436 elif vdu
.get("cloud-init"):
437 cloud_init_content
= vdu
["cloud-init"]
439 return cloud_init_content
440 except FsException
as e
:
442 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
443 vnfd
["id"], vdu
["id"], cloud_init_file
, e
447 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
449 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]),
452 additional_params
= vdur
.get("additionalParams")
453 return parse_yaml_strings(additional_params
)
455 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
457 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
458 :param vnfd: input vnfd
459 :param new_id: overrides vnf id if provided
460 :param additionalParams: Instantiation params for VNFs provided
461 :param nsrId: Id of the NSR
462 :return: copy of vnfd
464 vnfd_RO
= deepcopy(vnfd
)
465 # remove unused by RO configuration, monitoring, scaling and internal keys
466 vnfd_RO
.pop("_id", None)
467 vnfd_RO
.pop("_admin", None)
468 vnfd_RO
.pop("monitoring-param", None)
469 vnfd_RO
.pop("scaling-group-descriptor", None)
470 vnfd_RO
.pop("kdu", None)
471 vnfd_RO
.pop("k8s-cluster", None)
473 vnfd_RO
["id"] = new_id
475 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
476 for vdu
in get_iterable(vnfd_RO
, "vdu"):
477 vdu
.pop("cloud-init-file", None)
478 vdu
.pop("cloud-init", None)
482 def ip_profile_2_RO(ip_profile
):
483 RO_ip_profile
= deepcopy(ip_profile
)
484 if "dns-server" in RO_ip_profile
:
485 if isinstance(RO_ip_profile
["dns-server"], list):
486 RO_ip_profile
["dns-address"] = []
487 for ds
in RO_ip_profile
.pop("dns-server"):
488 RO_ip_profile
["dns-address"].append(ds
["address"])
490 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
491 if RO_ip_profile
.get("ip-version") == "ipv4":
492 RO_ip_profile
["ip-version"] = "IPv4"
493 if RO_ip_profile
.get("ip-version") == "ipv6":
494 RO_ip_profile
["ip-version"] = "IPv6"
495 if "dhcp-params" in RO_ip_profile
:
496 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
499 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
500 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
501 if db_vim
["_admin"]["operationalState"] != "ENABLED":
503 "VIM={} is not available. operationalState={}".format(
504 vim_account
, db_vim
["_admin"]["operationalState"]
507 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
510 def get_ro_wim_id_for_wim_account(self
, wim_account
):
511 if isinstance(wim_account
, str):
512 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
513 if db_wim
["_admin"]["operationalState"] != "ENABLED":
515 "WIM={} is not available. operationalState={}".format(
516 wim_account
, db_wim
["_admin"]["operationalState"]
519 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
524 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
526 db_vdu_push_list
= []
528 db_update
= {"_admin.modified": time()}
530 for vdu_id
, vdu_count
in vdu_create
.items():
534 for vdur
in reversed(db_vnfr
["vdur"])
535 if vdur
["vdu-id-ref"] == vdu_id
540 # Read the template saved in the db:
541 self
.logger
.debug(f
"No vdur in the database. Using the vdur-template to scale")
542 vdur_template
= db_vnfr
.get("vdur-template")
543 if not vdur_template
:
545 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
549 vdur
= vdur_template
[0]
550 #Delete a template from the database after using it
551 self
.db
.set_one("vnfrs",
552 {"_id": db_vnfr
["_id"]},
554 pull
={"vdur-template": {"_id": vdur
['_id']}}
556 for count
in range(vdu_count
):
557 vdur_copy
= deepcopy(vdur
)
558 vdur_copy
["status"] = "BUILD"
559 vdur_copy
["status-detailed"] = None
560 vdur_copy
["ip-address"] = None
561 vdur_copy
["_id"] = str(uuid4())
562 vdur_copy
["count-index"] += count
+ 1
563 vdur_copy
["id"] = "{}-{}".format(
564 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
566 vdur_copy
.pop("vim_info", None)
567 for iface
in vdur_copy
["interfaces"]:
568 if iface
.get("fixed-ip"):
569 iface
["ip-address"] = self
.increment_ip_mac(
570 iface
["ip-address"], count
+ 1
573 iface
.pop("ip-address", None)
574 if iface
.get("fixed-mac"):
575 iface
["mac-address"] = self
.increment_ip_mac(
576 iface
["mac-address"], count
+ 1
579 iface
.pop("mac-address", None)
583 ) # only first vdu can be managment of vnf
584 db_vdu_push_list
.append(vdur_copy
)
585 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
587 if len(db_vnfr
["vdur"]) == 1:
588 # The scale will move to 0 instances
589 self
.logger
.debug(f
"Scaling to 0 !, creating the template with the last vdur")
590 template_vdur
= [db_vnfr
["vdur"][0]]
591 for vdu_id
, vdu_count
in vdu_delete
.items():
593 indexes_to_delete
= [
595 for iv
in enumerate(db_vnfr
["vdur"])
596 if iv
[1]["vdu-id-ref"] == vdu_id
600 "vdur.{}.status".format(i
): "DELETING"
601 for i
in indexes_to_delete
[-vdu_count
:]
605 # it must be deleted one by one because common.db does not allow otherwise
608 for v
in reversed(db_vnfr
["vdur"])
609 if v
["vdu-id-ref"] == vdu_id
611 for vdu
in vdus_to_delete
[:vdu_count
]:
614 {"_id": db_vnfr
["_id"]},
616 pull
={"vdur": {"_id": vdu
["_id"]}},
620 db_push
["vdur"] = db_vdu_push_list
622 db_push
["vdur-template"] = template_vdur
625 db_vnfr
["vdur-template"] = template_vdur
626 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
627 # modify passed dictionary db_vnfr
628 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
629 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
631 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
633 Updates database nsr with the RO info for the created vld
634 :param ns_update_nsr: dictionary to be filled with the updated info
635 :param db_nsr: content of db_nsr. This is also modified
636 :param nsr_desc_RO: nsr descriptor from RO
637 :return: Nothing, LcmException is raised on errors
640 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
641 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
642 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
644 vld
["vim-id"] = net_RO
.get("vim_net_id")
645 vld
["name"] = net_RO
.get("vim_name")
646 vld
["status"] = net_RO
.get("status")
647 vld
["status-detailed"] = net_RO
.get("error_msg")
648 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
652 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
655 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
657 for db_vnfr
in db_vnfrs
.values():
658 vnfr_update
= {"status": "ERROR"}
659 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
660 if "status" not in vdur
:
661 vdur
["status"] = "ERROR"
662 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
664 vdur
["status-detailed"] = str(error_text
)
666 "vdur.{}.status-detailed".format(vdu_index
)
668 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
669 except DbException
as e
:
670 self
.logger
.error("Cannot update vnf. {}".format(e
))
672 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
674 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
675 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
676 :param nsr_desc_RO: nsr descriptor from RO
677 :return: Nothing, LcmException is raised on errors
679 for vnf_index
, db_vnfr
in db_vnfrs
.items():
680 for vnf_RO
in nsr_desc_RO
["vnfs"]:
681 if vnf_RO
["member_vnf_index"] != vnf_index
:
684 if vnf_RO
.get("ip_address"):
685 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
688 elif not db_vnfr
.get("ip-address"):
689 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
690 raise LcmExceptionNoMgmtIP(
691 "ns member_vnf_index '{}' has no IP address".format(
696 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
697 vdur_RO_count_index
= 0
698 if vdur
.get("pdu-type"):
700 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
701 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
703 if vdur
["count-index"] != vdur_RO_count_index
:
704 vdur_RO_count_index
+= 1
706 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
707 if vdur_RO
.get("ip_address"):
708 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
710 vdur
["ip-address"] = None
711 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
712 vdur
["name"] = vdur_RO
.get("vim_name")
713 vdur
["status"] = vdur_RO
.get("status")
714 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
715 for ifacer
in get_iterable(vdur
, "interfaces"):
716 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
717 if ifacer
["name"] == interface_RO
.get("internal_name"):
718 ifacer
["ip-address"] = interface_RO
.get(
721 ifacer
["mac-address"] = interface_RO
.get(
727 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
728 "from VIM info".format(
729 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
732 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
736 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
738 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
742 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
743 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
744 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
746 vld
["vim-id"] = net_RO
.get("vim_net_id")
747 vld
["name"] = net_RO
.get("vim_name")
748 vld
["status"] = net_RO
.get("status")
749 vld
["status-detailed"] = net_RO
.get("error_msg")
750 vnfr_update
["vld.{}".format(vld_index
)] = vld
754 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
759 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
764 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
769 def _get_ns_config_info(self
, nsr_id
):
771 Generates a mapping between vnf,vdu elements and the N2VC id
772 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
773 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
774 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
775 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
777 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
778 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
780 ns_config_info
= {"osm-config-mapping": mapping
}
781 for vca
in vca_deployed_list
:
782 if not vca
["member-vnf-index"]:
784 if not vca
["vdu_id"]:
785 mapping
[vca
["member-vnf-index"]] = vca
["application"]
789 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
791 ] = vca
["application"]
792 return ns_config_info
794 async def _instantiate_ng_ro(
811 def get_vim_account(vim_account_id
):
813 if vim_account_id
in db_vims
:
814 return db_vims
[vim_account_id
]
815 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
816 db_vims
[vim_account_id
] = db_vim
819 # modify target_vld info with instantiation parameters
820 def parse_vld_instantiation_params(
821 target_vim
, target_vld
, vld_params
, target_sdn
823 if vld_params
.get("ip-profile"):
824 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
827 if vld_params
.get("provider-network"):
828 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
831 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
832 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
835 if vld_params
.get("wimAccountId"):
836 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
837 target_vld
["vim_info"][target_wim
] = {}
838 for param
in ("vim-network-name", "vim-network-id"):
839 if vld_params
.get(param
):
840 if isinstance(vld_params
[param
], dict):
841 for vim
, vim_net
in vld_params
[param
].items():
842 other_target_vim
= "vim:" + vim
844 target_vld
["vim_info"],
845 (other_target_vim
, param
.replace("-", "_")),
848 else: # isinstance str
849 target_vld
["vim_info"][target_vim
][
850 param
.replace("-", "_")
851 ] = vld_params
[param
]
852 if vld_params
.get("common_id"):
853 target_vld
["common_id"] = vld_params
.get("common_id")
855 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
856 def update_ns_vld_target(target
, ns_params
):
857 for vnf_params
in ns_params
.get("vnf", ()):
858 if vnf_params
.get("vimAccountId"):
862 for vnfr
in db_vnfrs
.values()
863 if vnf_params
["member-vnf-index"]
864 == vnfr
["member-vnf-index-ref"]
868 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
869 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
870 target_vld
= find_in_list(
871 get_iterable(vdur
, "interfaces"),
872 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
875 if vnf_params
.get("vimAccountId") not in a_vld
.get(
878 target
["ns"]["vld"][a_index
].get("vim_info").update(
880 "vim:{}".format(vnf_params
["vimAccountId"]): {
881 "vim_network_name": ""
886 nslcmop_id
= db_nslcmop
["_id"]
888 "name": db_nsr
["name"],
891 "image": deepcopy(db_nsr
["image"]),
892 "flavor": deepcopy(db_nsr
["flavor"]),
893 "action_id": nslcmop_id
,
894 "cloud_init_content": {},
896 for image
in target
["image"]:
897 image
["vim_info"] = {}
898 for flavor
in target
["flavor"]:
899 flavor
["vim_info"] = {}
900 if db_nsr
.get("affinity-or-anti-affinity-group"):
901 target
["affinity-or-anti-affinity-group"] = deepcopy(db_nsr
["affinity-or-anti-affinity-group"])
902 for affinity_or_anti_affinity_group
in target
["affinity-or-anti-affinity-group"]:
903 affinity_or_anti_affinity_group
["vim_info"] = {}
905 if db_nslcmop
.get("lcmOperationType") != "instantiate":
906 # get parameters of instantiation:
907 db_nslcmop_instantiate
= self
.db
.get_list(
910 "nsInstanceId": db_nslcmop
["nsInstanceId"],
911 "lcmOperationType": "instantiate",
914 ns_params
= db_nslcmop_instantiate
.get("operationParams")
916 ns_params
= db_nslcmop
.get("operationParams")
917 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
918 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
921 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
922 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
926 "mgmt-network": vld
.get("mgmt-network", False),
927 "type": vld
.get("type"),
930 "vim_network_name": vld
.get("vim-network-name"),
931 "vim_account_id": ns_params
["vimAccountId"],
935 # check if this network needs SDN assist
936 if vld
.get("pci-interfaces"):
937 db_vim
= get_vim_account(ns_params
["vimAccountId"])
938 sdnc_id
= db_vim
["config"].get("sdn-controller")
940 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
941 target_sdn
= "sdn:{}".format(sdnc_id
)
942 target_vld
["vim_info"][target_sdn
] = {
944 "target_vim": target_vim
,
946 "type": vld
.get("type"),
949 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
950 for nsd_vnf_profile
in nsd_vnf_profiles
:
951 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
952 if cp
["virtual-link-profile-id"] == vld
["id"]:
954 "member_vnf:{}.{}".format(
955 cp
["constituent-cpd-id"][0][
956 "constituent-base-element-id"
958 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
960 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
962 # check at nsd descriptor, if there is an ip-profile
964 nsd_vlp
= find_in_list(
965 get_virtual_link_profiles(nsd
),
966 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
971 and nsd_vlp
.get("virtual-link-protocol-data")
972 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
974 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
977 ip_profile_dest_data
= {}
978 if "ip-version" in ip_profile_source_data
:
979 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
982 if "cidr" in ip_profile_source_data
:
983 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
986 if "gateway-ip" in ip_profile_source_data
:
987 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
990 if "dhcp-enabled" in ip_profile_source_data
:
991 ip_profile_dest_data
["dhcp-params"] = {
992 "enabled": ip_profile_source_data
["dhcp-enabled"]
994 vld_params
["ip-profile"] = ip_profile_dest_data
996 # update vld_params with instantiation params
997 vld_instantiation_params
= find_in_list(
998 get_iterable(ns_params
, "vld"),
999 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1001 if vld_instantiation_params
:
1002 vld_params
.update(vld_instantiation_params
)
1003 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1004 target
["ns"]["vld"].append(target_vld
)
1005 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1006 update_ns_vld_target(target
, ns_params
)
1008 for vnfr
in db_vnfrs
.values():
1009 vnfd
= find_in_list(
1010 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1012 vnf_params
= find_in_list(
1013 get_iterable(ns_params
, "vnf"),
1014 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1016 target_vnf
= deepcopy(vnfr
)
1017 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1018 for vld
in target_vnf
.get("vld", ()):
1019 # check if connected to a ns.vld, to fill target'
1020 vnf_cp
= find_in_list(
1021 vnfd
.get("int-virtual-link-desc", ()),
1022 lambda cpd
: cpd
.get("id") == vld
["id"],
1025 ns_cp
= "member_vnf:{}.{}".format(
1026 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1028 if cp2target
.get(ns_cp
):
1029 vld
["target"] = cp2target
[ns_cp
]
1032 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1034 # check if this network needs SDN assist
1036 if vld
.get("pci-interfaces"):
1037 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1038 sdnc_id
= db_vim
["config"].get("sdn-controller")
1040 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1041 target_sdn
= "sdn:{}".format(sdnc_id
)
1042 vld
["vim_info"][target_sdn
] = {
1044 "target_vim": target_vim
,
1046 "type": vld
.get("type"),
1049 # check at vnfd descriptor, if there is an ip-profile
1051 vnfd_vlp
= find_in_list(
1052 get_virtual_link_profiles(vnfd
),
1053 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1057 and vnfd_vlp
.get("virtual-link-protocol-data")
1058 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1060 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1063 ip_profile_dest_data
= {}
1064 if "ip-version" in ip_profile_source_data
:
1065 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1068 if "cidr" in ip_profile_source_data
:
1069 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1072 if "gateway-ip" in ip_profile_source_data
:
1073 ip_profile_dest_data
[
1075 ] = ip_profile_source_data
["gateway-ip"]
1076 if "dhcp-enabled" in ip_profile_source_data
:
1077 ip_profile_dest_data
["dhcp-params"] = {
1078 "enabled": ip_profile_source_data
["dhcp-enabled"]
1081 vld_params
["ip-profile"] = ip_profile_dest_data
1082 # update vld_params with instantiation params
1084 vld_instantiation_params
= find_in_list(
1085 get_iterable(vnf_params
, "internal-vld"),
1086 lambda i_vld
: i_vld
["name"] == vld
["id"],
1088 if vld_instantiation_params
:
1089 vld_params
.update(vld_instantiation_params
)
1090 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1093 for vdur
in target_vnf
.get("vdur", ()):
1094 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1095 continue # This vdu must not be created
1096 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1098 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1101 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1102 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1105 and vdu_configuration
.get("config-access")
1106 and vdu_configuration
.get("config-access").get("ssh-access")
1108 vdur
["ssh-keys"] = ssh_keys_all
1109 vdur
["ssh-access-required"] = vdu_configuration
[
1111 ]["ssh-access"]["required"]
1114 and vnf_configuration
.get("config-access")
1115 and vnf_configuration
.get("config-access").get("ssh-access")
1116 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1118 vdur
["ssh-keys"] = ssh_keys_all
1119 vdur
["ssh-access-required"] = vnf_configuration
[
1121 ]["ssh-access"]["required"]
1122 elif ssh_keys_instantiation
and find_in_list(
1123 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1125 vdur
["ssh-keys"] = ssh_keys_instantiation
1127 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1129 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1131 if vdud
.get("cloud-init-file"):
1132 vdur
["cloud-init"] = "{}:file:{}".format(
1133 vnfd
["_id"], vdud
.get("cloud-init-file")
1135 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1136 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1137 base_folder
= vnfd
["_admin"]["storage"]
1138 if base_folder
["pkg-dir"]:
1139 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1140 base_folder
["folder"],
1141 base_folder
["pkg-dir"],
1142 vdud
.get("cloud-init-file"),
1145 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1146 base_folder
["folder"],
1147 vdud
.get("cloud-init-file"),
1149 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1150 target
["cloud_init_content"][
1153 elif vdud
.get("cloud-init"):
1154 vdur
["cloud-init"] = "{}:vdu:{}".format(
1155 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1157 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1158 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1161 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1162 deploy_params_vdu
= self
._format
_additional
_params
(
1163 vdur
.get("additionalParams") or {}
1165 deploy_params_vdu
["OSM"] = get_osm_params(
1166 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1168 vdur
["additionalParams"] = deploy_params_vdu
1171 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1172 if target_vim
not in ns_flavor
["vim_info"]:
1173 ns_flavor
["vim_info"][target_vim
] = {}
1176 # in case alternative images are provided we must check if they should be applied
1177 # for the vim_type, modify the vim_type taking into account
1178 ns_image_id
= int(vdur
["ns-image-id"])
1179 if vdur
.get("alt-image-ids"):
1180 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1181 vim_type
= db_vim
["vim_type"]
1182 for alt_image_id
in vdur
.get("alt-image-ids"):
1183 ns_alt_image
= target
["image"][int(alt_image_id
)]
1184 if vim_type
== ns_alt_image
.get("vim-type"):
1185 # must use alternative image
1187 "use alternative image id: {}".format(alt_image_id
)
1189 ns_image_id
= alt_image_id
1190 vdur
["ns-image-id"] = ns_image_id
1192 ns_image
= target
["image"][int(ns_image_id
)]
1193 if target_vim
not in ns_image
["vim_info"]:
1194 ns_image
["vim_info"][target_vim
] = {}
1197 if vdur
.get("affinity-or-anti-affinity-group-id"):
1198 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1199 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1200 if target_vim
not in ns_ags
["vim_info"]:
1201 ns_ags
["vim_info"][target_vim
] = {}
1203 vdur
["vim_info"] = {target_vim
: {}}
1204 # instantiation parameters
1206 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1207 # vdud["id"]), None)
1208 vdur_list
.append(vdur
)
1209 target_vnf
["vdur"] = vdur_list
1210 target
["vnf"].append(target_vnf
)
1212 desc
= await self
.RO
.deploy(nsr_id
, target
)
1213 self
.logger
.debug("RO return > {}".format(desc
))
1214 action_id
= desc
["action_id"]
1215 await self
._wait
_ng
_ro
(
1216 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
1221 "_admin.deployed.RO.operational-status": "running",
1222 "detailed-status": " ".join(stage
),
1224 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1225 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1226 self
._write
_op
_status
(nslcmop_id
, stage
)
1228 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1232 async def _wait_ng_ro(
1241 detailed_status_old
= None
1243 start_time
= start_time
or time()
1244 while time() <= start_time
+ timeout
:
1245 desc_status
= await self
.RO
.status(nsr_id
, action_id
)
1246 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1247 if desc_status
["status"] == "FAILED":
1248 raise NgRoException(desc_status
["details"])
1249 elif desc_status
["status"] == "BUILD":
1251 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1252 elif desc_status
["status"] == "DONE":
1254 stage
[2] = "Deployed at VIM"
1257 assert False, "ROclient.check_ns_status returns unknown {}".format(
1258 desc_status
["status"]
1260 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1261 detailed_status_old
= stage
[2]
1262 db_nsr_update
["detailed-status"] = " ".join(stage
)
1263 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1264 self
._write
_op
_status
(nslcmop_id
, stage
)
1265 await asyncio
.sleep(15, loop
=self
.loop
)
1266 else: # timeout_ns_deploy
1267 raise NgRoException("Timeout waiting ns to deploy")
1269 async def _terminate_ng_ro(
1270 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1275 start_deploy
= time()
1282 "action_id": nslcmop_id
,
1284 desc
= await self
.RO
.deploy(nsr_id
, target
)
1285 action_id
= desc
["action_id"]
1286 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1287 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1290 + "ns terminate action at RO. action_id={}".format(action_id
)
1294 delete_timeout
= 20 * 60 # 20 minutes
1295 await self
._wait
_ng
_ro
(
1296 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
1299 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1300 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1302 await self
.RO
.delete(nsr_id
)
1303 except Exception as e
:
1304 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1305 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1306 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1307 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1309 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1311 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1312 failed_detail
.append("delete conflict: {}".format(e
))
1315 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1318 failed_detail
.append("delete error: {}".format(e
))
1321 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1325 stage
[2] = "Error deleting from VIM"
1327 stage
[2] = "Deleted from VIM"
1328 db_nsr_update
["detailed-status"] = " ".join(stage
)
1329 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1330 self
._write
_op
_status
(nslcmop_id
, stage
)
1333 raise LcmException("; ".join(failed_detail
))
1336 async def instantiate_RO(
1350 :param logging_text: preffix text to use at logging
1351 :param nsr_id: nsr identity
1352 :param nsd: database content of ns descriptor
1353 :param db_nsr: database content of ns record
1354 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1356 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1357 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1358 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1359 :return: None or exception
1362 start_deploy
= time()
1363 ns_params
= db_nslcmop
.get("operationParams")
1364 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1365 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1367 timeout_ns_deploy
= self
.timeout
.get(
1368 "ns_deploy", self
.timeout_ns_deploy
1371 # Check for and optionally request placement optimization. Database will be updated if placement activated
1372 stage
[2] = "Waiting for Placement."
1373 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1374 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1375 for vnfr
in db_vnfrs
.values():
1376 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1379 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1381 return await self
._instantiate
_ng
_ro
(
1394 except Exception as e
:
1395 stage
[2] = "ERROR deploying at VIM"
1396 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1398 "Error deploying at VIM {}".format(e
),
1399 exc_info
=not isinstance(
1402 ROclient
.ROClientException
,
1411 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1413 Wait for kdu to be up, get ip address
1414 :param logging_text: prefix use for logging
1421 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1424 while nb_tries
< 360:
1425 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1429 for x
in get_iterable(db_vnfr
, "kdur")
1430 if x
.get("kdu-name") == kdu_name
1436 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1438 if kdur
.get("status"):
1439 if kdur
["status"] in ("READY", "ENABLED"):
1440 return kdur
.get("ip-address")
1443 "target KDU={} is in error state".format(kdu_name
)
1446 await asyncio
.sleep(10, loop
=self
.loop
)
1448 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1450 async def wait_vm_up_insert_key_ro(
1451 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1454 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1455 :param logging_text: prefix use for logging
1460 :param pub_key: public ssh key to inject, None to skip
1461 :param user: user to apply the public ssh key
1465 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1469 target_vdu_id
= None
1475 if ro_retries
>= 360: # 1 hour
1477 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1480 await asyncio
.sleep(10, loop
=self
.loop
)
1483 if not target_vdu_id
:
1484 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1486 if not vdu_id
: # for the VNF case
1487 if db_vnfr
.get("status") == "ERROR":
1489 "Cannot inject ssh-key because target VNF is in error state"
1491 ip_address
= db_vnfr
.get("ip-address")
1497 for x
in get_iterable(db_vnfr
, "vdur")
1498 if x
.get("ip-address") == ip_address
1506 for x
in get_iterable(db_vnfr
, "vdur")
1507 if x
.get("vdu-id-ref") == vdu_id
1508 and x
.get("count-index") == vdu_index
1514 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1515 ): # If only one, this should be the target vdu
1516 vdur
= db_vnfr
["vdur"][0]
1519 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1520 vnfr_id
, vdu_id
, vdu_index
1523 # New generation RO stores information at "vim_info"
1526 if vdur
.get("vim_info"):
1528 t
for t
in vdur
["vim_info"]
1529 ) # there should be only one key
1530 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1532 vdur
.get("pdu-type")
1533 or vdur
.get("status") == "ACTIVE"
1534 or ng_ro_status
== "ACTIVE"
1536 ip_address
= vdur
.get("ip-address")
1539 target_vdu_id
= vdur
["vdu-id-ref"]
1540 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1542 "Cannot inject ssh-key because target VM is in error state"
1545 if not target_vdu_id
:
1548 # inject public key into machine
1549 if pub_key
and user
:
1550 self
.logger
.debug(logging_text
+ "Inserting RO key")
1551 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1552 if vdur
.get("pdu-type"):
1553 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1556 ro_vm_id
= "{}-{}".format(
1557 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1558 ) # TODO add vdu_index
1562 "action": "inject_ssh_key",
1566 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1568 desc
= await self
.RO
.deploy(nsr_id
, target
)
1569 action_id
= desc
["action_id"]
1570 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600)
1573 # wait until NS is deployed at RO
1575 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1576 ro_nsr_id
= deep_get(
1577 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1581 result_dict
= await self
.RO
.create_action(
1583 item_id_name
=ro_nsr_id
,
1585 "add_public_key": pub_key
,
1590 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1591 if not result_dict
or not isinstance(result_dict
, dict):
1593 "Unknown response from RO when injecting key"
1595 for result
in result_dict
.values():
1596 if result
.get("vim_result") == 200:
1599 raise ROclient
.ROClientException(
1600 "error injecting key: {}".format(
1601 result
.get("description")
1605 except NgRoException
as e
:
1607 "Reaching max tries injecting key. Error: {}".format(e
)
1609 except ROclient
.ROClientException
as e
:
1613 + "error injecting key: {}. Retrying until {} seconds".format(
1620 "Reaching max tries injecting key. Error: {}".format(e
)
1627 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1629 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1631 my_vca
= vca_deployed_list
[vca_index
]
1632 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1633 # vdu or kdu: no dependencies
1637 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1638 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1639 configuration_status_list
= db_nsr
["configurationStatus"]
1640 for index
, vca_deployed
in enumerate(configuration_status_list
):
1641 if index
== vca_index
:
1644 if not my_vca
.get("member-vnf-index") or (
1645 vca_deployed
.get("member-vnf-index")
1646 == my_vca
.get("member-vnf-index")
1648 internal_status
= configuration_status_list
[index
].get("status")
1649 if internal_status
== "READY":
1651 elif internal_status
== "BROKEN":
1653 "Configuration aborted because dependent charm/s has failed"
1658 # no dependencies, return
1660 await asyncio
.sleep(10)
1663 raise LcmException("Configuration aborted because dependent charm/s timeout")
1665 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1668 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1670 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1671 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1674 async def instantiate_N2VC(
1691 ee_config_descriptor
,
1693 nsr_id
= db_nsr
["_id"]
1694 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1695 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1696 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1697 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1699 "collection": "nsrs",
1700 "filter": {"_id": nsr_id
},
1701 "path": db_update_entry
,
1707 element_under_configuration
= nsr_id
1711 vnfr_id
= db_vnfr
["_id"]
1712 osm_config
["osm"]["vnf_id"] = vnfr_id
1714 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1716 if vca_type
== "native_charm":
1719 index_number
= vdu_index
or 0
1722 element_type
= "VNF"
1723 element_under_configuration
= vnfr_id
1724 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1726 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1727 element_type
= "VDU"
1728 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1729 osm_config
["osm"]["vdu_id"] = vdu_id
1731 namespace
+= ".{}".format(kdu_name
)
1732 element_type
= "KDU"
1733 element_under_configuration
= kdu_name
1734 osm_config
["osm"]["kdu_name"] = kdu_name
1737 if base_folder
["pkg-dir"]:
1738 artifact_path
= "{}/{}/{}/{}".format(
1739 base_folder
["folder"],
1740 base_folder
["pkg-dir"],
1742 if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1747 artifact_path
= "{}/Scripts/{}/{}/".format(
1748 base_folder
["folder"],
1750 if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1755 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1757 # get initial_config_primitive_list that applies to this element
1758 initial_config_primitive_list
= config_descriptor
.get(
1759 "initial-config-primitive"
1763 "Initial config primitive list > {}".format(
1764 initial_config_primitive_list
1768 # add config if not present for NS charm
1769 ee_descriptor_id
= ee_config_descriptor
.get("id")
1770 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1771 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1772 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1776 "Initial config primitive list #2 > {}".format(
1777 initial_config_primitive_list
1780 # n2vc_redesign STEP 3.1
1781 # find old ee_id if exists
1782 ee_id
= vca_deployed
.get("ee_id")
1784 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1785 # create or register execution environment in VCA
1786 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1788 self
._write
_configuration
_status
(
1790 vca_index
=vca_index
,
1792 element_under_configuration
=element_under_configuration
,
1793 element_type
=element_type
,
1796 step
= "create execution environment"
1797 self
.logger
.debug(logging_text
+ step
)
1801 if vca_type
== "k8s_proxy_charm":
1802 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1803 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1804 namespace
=namespace
,
1805 artifact_path
=artifact_path
,
1809 elif vca_type
== "helm" or vca_type
== "helm-v3":
1810 ee_id
, credentials
= await self
.vca_map
[
1812 ].create_execution_environment(
1813 namespace
=namespace
,
1817 artifact_path
=artifact_path
,
1821 ee_id
, credentials
= await self
.vca_map
[
1823 ].create_execution_environment(
1824 namespace
=namespace
,
1830 elif vca_type
== "native_charm":
1831 step
= "Waiting to VM being up and getting IP address"
1832 self
.logger
.debug(logging_text
+ step
)
1833 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1842 credentials
= {"hostname": rw_mgmt_ip
}
1844 username
= deep_get(
1845 config_descriptor
, ("config-access", "ssh-access", "default-user")
1847 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1848 # merged. Meanwhile let's get username from initial-config-primitive
1849 if not username
and initial_config_primitive_list
:
1850 for config_primitive
in initial_config_primitive_list
:
1851 for param
in config_primitive
.get("parameter", ()):
1852 if param
["name"] == "ssh-username":
1853 username
= param
["value"]
1857 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1858 "'config-access.ssh-access.default-user'"
1860 credentials
["username"] = username
1861 # n2vc_redesign STEP 3.2
1863 self
._write
_configuration
_status
(
1865 vca_index
=vca_index
,
1866 status
="REGISTERING",
1867 element_under_configuration
=element_under_configuration
,
1868 element_type
=element_type
,
1871 step
= "register execution environment {}".format(credentials
)
1872 self
.logger
.debug(logging_text
+ step
)
1873 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1874 credentials
=credentials
,
1875 namespace
=namespace
,
1880 # for compatibility with MON/POL modules, the need model and application name at database
1881 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1882 ee_id_parts
= ee_id
.split(".")
1883 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1884 if len(ee_id_parts
) >= 2:
1885 model_name
= ee_id_parts
[0]
1886 application_name
= ee_id_parts
[1]
1887 db_nsr_update
[db_update_entry
+ "model"] = model_name
1888 db_nsr_update
[db_update_entry
+ "application"] = application_name
1890 # n2vc_redesign STEP 3.3
1891 step
= "Install configuration Software"
1893 self
._write
_configuration
_status
(
1895 vca_index
=vca_index
,
1896 status
="INSTALLING SW",
1897 element_under_configuration
=element_under_configuration
,
1898 element_type
=element_type
,
1899 other_update
=db_nsr_update
,
1902 # TODO check if already done
1903 self
.logger
.debug(logging_text
+ step
)
1905 if vca_type
== "native_charm":
1906 config_primitive
= next(
1907 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1910 if config_primitive
:
1911 config
= self
._map
_primitive
_params
(
1912 config_primitive
, {}, deploy_params
1915 if vca_type
== "lxc_proxy_charm":
1916 if element_type
== "NS":
1917 num_units
= db_nsr
.get("config-units") or 1
1918 elif element_type
== "VNF":
1919 num_units
= db_vnfr
.get("config-units") or 1
1920 elif element_type
== "VDU":
1921 for v
in db_vnfr
["vdur"]:
1922 if vdu_id
== v
["vdu-id-ref"]:
1923 num_units
= v
.get("config-units") or 1
1925 if vca_type
!= "k8s_proxy_charm":
1926 await self
.vca_map
[vca_type
].install_configuration_sw(
1928 artifact_path
=artifact_path
,
1931 num_units
=num_units
,
1936 # write in db flag of configuration_sw already installed
1938 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1941 # add relations for this VCA (wait for other peers related with this VCA)
1942 await self
._add
_vca
_relations
(
1943 logging_text
=logging_text
,
1946 vca_index
=vca_index
,
1949 # if SSH access is required, then get execution environment SSH public
1950 # if native charm we have waited already to VM be UP
1951 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1954 # self.logger.debug("get ssh key block")
1956 config_descriptor
, ("config-access", "ssh-access", "required")
1958 # self.logger.debug("ssh key needed")
1959 # Needed to inject a ssh key
1962 ("config-access", "ssh-access", "default-user"),
1964 step
= "Install configuration Software, getting public ssh key"
1965 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
1966 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
1969 step
= "Insert public key into VM user={} ssh_key={}".format(
1973 # self.logger.debug("no need to get ssh key")
1974 step
= "Waiting to VM being up and getting IP address"
1975 self
.logger
.debug(logging_text
+ step
)
1977 # default rw_mgmt_ip to None, avoiding the non definition of the variable
1980 # n2vc_redesign STEP 5.1
1981 # wait for RO (ip-address) Insert pub_key into VM
1984 rw_mgmt_ip
= await self
.wait_kdu_up(
1985 logging_text
, nsr_id
, vnfr_id
, kdu_name
1988 # This verification is needed in order to avoid trying to add a public key
1989 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
1990 # for a KNF and not for its KDUs, the previous verification gives False, and the code
1991 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
1993 elif db_vnfr
.get('vdur'):
1994 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2004 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2006 # store rw_mgmt_ip in deploy params for later replacement
2007 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2009 # n2vc_redesign STEP 6 Execute initial config primitive
2010 step
= "execute initial config primitive"
2012 # wait for dependent primitives execution (NS -> VNF -> VDU)
2013 if initial_config_primitive_list
:
2014 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2016 # stage, in function of element type: vdu, kdu, vnf or ns
2017 my_vca
= vca_deployed_list
[vca_index
]
2018 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2020 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2021 elif my_vca
.get("member-vnf-index"):
2023 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2026 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2028 self
._write
_configuration
_status
(
2029 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2032 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2034 check_if_terminated_needed
= True
2035 for initial_config_primitive
in initial_config_primitive_list
:
2036 # adding information on the vca_deployed if it is a NS execution environment
2037 if not vca_deployed
["member-vnf-index"]:
2038 deploy_params
["ns_config_info"] = json
.dumps(
2039 self
._get
_ns
_config
_info
(nsr_id
)
2041 # TODO check if already done
2042 primitive_params_
= self
._map
_primitive
_params
(
2043 initial_config_primitive
, {}, deploy_params
2046 step
= "execute primitive '{}' params '{}'".format(
2047 initial_config_primitive
["name"], primitive_params_
2049 self
.logger
.debug(logging_text
+ step
)
2050 await self
.vca_map
[vca_type
].exec_primitive(
2052 primitive_name
=initial_config_primitive
["name"],
2053 params_dict
=primitive_params_
,
2058 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2059 if check_if_terminated_needed
:
2060 if config_descriptor
.get("terminate-config-primitive"):
2062 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2064 check_if_terminated_needed
= False
2066 # TODO register in database that primitive is done
2068 # STEP 7 Configure metrics
2069 if vca_type
== "helm" or vca_type
== "helm-v3":
2070 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2072 artifact_path
=artifact_path
,
2073 ee_config_descriptor
=ee_config_descriptor
,
2076 target_ip
=rw_mgmt_ip
,
2082 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2085 for job
in prometheus_jobs
:
2089 "job_name": job
["job_name"]
2093 fail_on_empty
=False,
2096 step
= "instantiated at VCA"
2097 self
.logger
.debug(logging_text
+ step
)
2099 self
._write
_configuration
_status
(
2100 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2103 except Exception as e
: # TODO not use Exception but N2VC exception
2104 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2106 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2109 "Exception while {} : {}".format(step
, e
), exc_info
=True
2111 self
._write
_configuration
_status
(
2112 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2114 raise LcmException("{} {}".format(step
, e
)) from e
2116 def _write_ns_status(
2120 current_operation
: str,
2121 current_operation_id
: str,
2122 error_description
: str = None,
2123 error_detail
: str = None,
2124 other_update
: dict = None,
2127 Update db_nsr fields.
2130 :param current_operation:
2131 :param current_operation_id:
2132 :param error_description:
2133 :param error_detail:
2134 :param other_update: Other required changes at database if provided, will be cleared
2138 db_dict
= other_update
or {}
2141 ] = current_operation_id
# for backward compatibility
2142 db_dict
["_admin.current-operation"] = current_operation_id
2143 db_dict
["_admin.operation-type"] = (
2144 current_operation
if current_operation
!= "IDLE" else None
2146 db_dict
["currentOperation"] = current_operation
2147 db_dict
["currentOperationID"] = current_operation_id
2148 db_dict
["errorDescription"] = error_description
2149 db_dict
["errorDetail"] = error_detail
2152 db_dict
["nsState"] = ns_state
2153 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2154 except DbException
as e
:
2155 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2157 def _write_op_status(
2161 error_message
: str = None,
2162 queuePosition
: int = 0,
2163 operation_state
: str = None,
2164 other_update
: dict = None,
2167 db_dict
= other_update
or {}
2168 db_dict
["queuePosition"] = queuePosition
2169 if isinstance(stage
, list):
2170 db_dict
["stage"] = stage
[0]
2171 db_dict
["detailed-status"] = " ".join(stage
)
2172 elif stage
is not None:
2173 db_dict
["stage"] = str(stage
)
2175 if error_message
is not None:
2176 db_dict
["errorMessage"] = error_message
2177 if operation_state
is not None:
2178 db_dict
["operationState"] = operation_state
2179 db_dict
["statusEnteredTime"] = time()
2180 self
.update_db_2("nslcmops", op_id
, db_dict
)
2181 except DbException
as e
:
2183 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2186 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2188 nsr_id
= db_nsr
["_id"]
2189 # configurationStatus
2190 config_status
= db_nsr
.get("configurationStatus")
2193 "configurationStatus.{}.status".format(index
): status
2194 for index
, v
in enumerate(config_status
)
2198 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2200 except DbException
as e
:
2202 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2205 def _write_configuration_status(
2210 element_under_configuration
: str = None,
2211 element_type
: str = None,
2212 other_update
: dict = None,
2215 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2216 # .format(vca_index, status))
2219 db_path
= "configurationStatus.{}.".format(vca_index
)
2220 db_dict
= other_update
or {}
2222 db_dict
[db_path
+ "status"] = status
2223 if element_under_configuration
:
2225 db_path
+ "elementUnderConfiguration"
2226 ] = element_under_configuration
2228 db_dict
[db_path
+ "elementType"] = element_type
2229 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2230 except DbException
as e
:
2232 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2233 status
, nsr_id
, vca_index
, e
2237 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2239 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2240 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2241 Database is used because the result can be obtained from a different LCM worker in case of HA.
2242 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2243 :param db_nslcmop: database content of nslcmop
2244 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2245 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2246 computed 'vim-account-id'
2249 nslcmop_id
= db_nslcmop
["_id"]
2250 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2251 if placement_engine
== "PLA":
2253 logging_text
+ "Invoke and wait for placement optimization"
2255 await self
.msg
.aiowrite(
2256 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2258 db_poll_interval
= 5
2259 wait
= db_poll_interval
* 10
2261 while not pla_result
and wait
>= 0:
2262 await asyncio
.sleep(db_poll_interval
)
2263 wait
-= db_poll_interval
2264 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2265 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2269 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2272 for pla_vnf
in pla_result
["vnf"]:
2273 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2274 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2279 {"_id": vnfr
["_id"]},
2280 {"vim-account-id": pla_vnf
["vimAccountId"]},
2283 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2286 def update_nsrs_with_pla_result(self
, params
):
2288 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2290 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2292 except Exception as e
:
2293 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2295 async def instantiate(self
, nsr_id
, nslcmop_id
):
2298 :param nsr_id: ns instance to deploy
2299 :param nslcmop_id: operation to run
2303 # Try to lock HA task here
2304 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2305 if not task_is_locked_by_me
:
2307 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2311 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2312 self
.logger
.debug(logging_text
+ "Enter")
2314 # get all needed from database
2316 # database nsrs record
2319 # database nslcmops record
2322 # update operation on nsrs
2324 # update operation on nslcmops
2325 db_nslcmop_update
= {}
2327 nslcmop_operation_state
= None
2328 db_vnfrs
= {} # vnf's info indexed by member-index
2330 tasks_dict_info
= {} # from task to info text
2334 "Stage 1/5: preparation of the environment.",
2335 "Waiting for previous operations to terminate.",
2338 # ^ stage, step, VIM progress
2340 # wait for any previous tasks in process
2341 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2343 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2344 stage
[1] = "Reading from database."
2345 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2346 db_nsr_update
["detailed-status"] = "creating"
2347 db_nsr_update
["operational-status"] = "init"
2348 self
._write
_ns
_status
(
2350 ns_state
="BUILDING",
2351 current_operation
="INSTANTIATING",
2352 current_operation_id
=nslcmop_id
,
2353 other_update
=db_nsr_update
,
2355 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2357 # read from db: operation
2358 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2359 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2360 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2361 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2362 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2364 ns_params
= db_nslcmop
.get("operationParams")
2365 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2366 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2368 timeout_ns_deploy
= self
.timeout
.get(
2369 "ns_deploy", self
.timeout_ns_deploy
2373 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2374 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2375 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2376 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2377 self
.fs
.sync(db_nsr
["nsd-id"])
2379 # nsr_name = db_nsr["name"] # TODO short-name??
2381 # read from db: vnf's of this ns
2382 stage
[1] = "Getting vnfrs from db."
2383 self
.logger
.debug(logging_text
+ stage
[1])
2384 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2386 # read from db: vnfd's for every vnf
2387 db_vnfds
= [] # every vnfd data
2389 # for each vnf in ns, read vnfd
2390 for vnfr
in db_vnfrs_list
:
2391 if vnfr
.get("kdur"):
2393 for kdur
in vnfr
["kdur"]:
2394 if kdur
.get("additionalParams"):
2395 kdur
["additionalParams"] = json
.loads(
2396 kdur
["additionalParams"]
2398 kdur_list
.append(kdur
)
2399 vnfr
["kdur"] = kdur_list
2401 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2402 vnfd_id
= vnfr
["vnfd-id"]
2403 vnfd_ref
= vnfr
["vnfd-ref"]
2404 self
.fs
.sync(vnfd_id
)
2406 # if we haven't this vnfd, read it from db
2407 if vnfd_id
not in db_vnfds
:
2409 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2412 self
.logger
.debug(logging_text
+ stage
[1])
2413 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2416 db_vnfds
.append(vnfd
)
2418 # Get or generates the _admin.deployed.VCA list
2419 vca_deployed_list
= None
2420 if db_nsr
["_admin"].get("deployed"):
2421 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2422 if vca_deployed_list
is None:
2423 vca_deployed_list
= []
2424 configuration_status_list
= []
2425 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2426 db_nsr_update
["configurationStatus"] = configuration_status_list
2427 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2428 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2429 elif isinstance(vca_deployed_list
, dict):
2430 # maintain backward compatibility. Change a dict to list at database
2431 vca_deployed_list
= list(vca_deployed_list
.values())
2432 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2433 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2436 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2438 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2439 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2441 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2442 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2443 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2445 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2448 # n2vc_redesign STEP 2 Deploy Network Scenario
2449 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2450 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2452 stage
[1] = "Deploying KDUs."
2453 # self.logger.debug(logging_text + "Before deploy_kdus")
2454 # Call to deploy_kdus in case exists the "vdu:kdu" param
2455 await self
.deploy_kdus(
2456 logging_text
=logging_text
,
2458 nslcmop_id
=nslcmop_id
,
2461 task_instantiation_info
=tasks_dict_info
,
2464 stage
[1] = "Getting VCA public key."
2465 # n2vc_redesign STEP 1 Get VCA public ssh-key
2466 # feature 1429. Add n2vc public key to needed VMs
2467 n2vc_key
= self
.n2vc
.get_public_key()
2468 n2vc_key_list
= [n2vc_key
]
2469 if self
.vca_config
.get("public_key"):
2470 n2vc_key_list
.append(self
.vca_config
["public_key"])
2472 stage
[1] = "Deploying NS at VIM."
2473 task_ro
= asyncio
.ensure_future(
2474 self
.instantiate_RO(
2475 logging_text
=logging_text
,
2479 db_nslcmop
=db_nslcmop
,
2482 n2vc_key_list
=n2vc_key_list
,
2486 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2487 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2489 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2490 stage
[1] = "Deploying Execution Environments."
2491 self
.logger
.debug(logging_text
+ stage
[1])
2493 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2494 for vnf_profile
in get_vnf_profiles(nsd
):
2495 vnfd_id
= vnf_profile
["vnfd-id"]
2496 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2497 member_vnf_index
= str(vnf_profile
["id"])
2498 db_vnfr
= db_vnfrs
[member_vnf_index
]
2499 base_folder
= vnfd
["_admin"]["storage"]
2505 # Get additional parameters
2506 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2507 if db_vnfr
.get("additionalParamsForVnf"):
2508 deploy_params
.update(
2509 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2512 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2513 if descriptor_config
:
2515 logging_text
=logging_text
2516 + "member_vnf_index={} ".format(member_vnf_index
),
2519 nslcmop_id
=nslcmop_id
,
2525 member_vnf_index
=member_vnf_index
,
2526 vdu_index
=vdu_index
,
2528 deploy_params
=deploy_params
,
2529 descriptor_config
=descriptor_config
,
2530 base_folder
=base_folder
,
2531 task_instantiation_info
=tasks_dict_info
,
2535 # Deploy charms for each VDU that supports one.
2536 for vdud
in get_vdu_list(vnfd
):
2538 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2539 vdur
= find_in_list(
2540 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2543 if vdur
.get("additionalParams"):
2544 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2546 deploy_params_vdu
= deploy_params
2547 deploy_params_vdu
["OSM"] = get_osm_params(
2548 db_vnfr
, vdu_id
, vdu_count_index
=0
2550 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2552 self
.logger
.debug("VDUD > {}".format(vdud
))
2554 "Descriptor config > {}".format(descriptor_config
)
2556 if descriptor_config
:
2559 for vdu_index
in range(vdud_count
):
2560 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2562 logging_text
=logging_text
2563 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2564 member_vnf_index
, vdu_id
, vdu_index
2568 nslcmop_id
=nslcmop_id
,
2574 member_vnf_index
=member_vnf_index
,
2575 vdu_index
=vdu_index
,
2577 deploy_params
=deploy_params_vdu
,
2578 descriptor_config
=descriptor_config
,
2579 base_folder
=base_folder
,
2580 task_instantiation_info
=tasks_dict_info
,
2583 for kdud
in get_kdu_list(vnfd
):
2584 kdu_name
= kdud
["name"]
2585 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2586 if descriptor_config
:
2591 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2593 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2594 if kdur
.get("additionalParams"):
2595 deploy_params_kdu
.update(
2596 parse_yaml_strings(kdur
["additionalParams"].copy())
2600 logging_text
=logging_text
,
2603 nslcmop_id
=nslcmop_id
,
2609 member_vnf_index
=member_vnf_index
,
2610 vdu_index
=vdu_index
,
2612 deploy_params
=deploy_params_kdu
,
2613 descriptor_config
=descriptor_config
,
2614 base_folder
=base_folder
,
2615 task_instantiation_info
=tasks_dict_info
,
2619 # Check if this NS has a charm configuration
2620 descriptor_config
= nsd
.get("ns-configuration")
2621 if descriptor_config
and descriptor_config
.get("juju"):
2624 member_vnf_index
= None
2630 # Get additional parameters
2631 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2632 if db_nsr
.get("additionalParamsForNs"):
2633 deploy_params
.update(
2634 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2636 base_folder
= nsd
["_admin"]["storage"]
2638 logging_text
=logging_text
,
2641 nslcmop_id
=nslcmop_id
,
2647 member_vnf_index
=member_vnf_index
,
2648 vdu_index
=vdu_index
,
2650 deploy_params
=deploy_params
,
2651 descriptor_config
=descriptor_config
,
2652 base_folder
=base_folder
,
2653 task_instantiation_info
=tasks_dict_info
,
2657 # rest of staff will be done at finally
2660 ROclient
.ROClientException
,
2666 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2669 except asyncio
.CancelledError
:
2671 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2673 exc
= "Operation was cancelled"
2674 except Exception as e
:
2675 exc
= traceback
.format_exc()
2676 self
.logger
.critical(
2677 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2682 error_list
.append(str(exc
))
2684 # wait for pending tasks
2686 stage
[1] = "Waiting for instantiate pending tasks."
2687 self
.logger
.debug(logging_text
+ stage
[1])
2688 error_list
+= await self
._wait
_for
_tasks
(
2696 stage
[1] = stage
[2] = ""
2697 except asyncio
.CancelledError
:
2698 error_list
.append("Cancelled")
2699 # TODO cancel all tasks
2700 except Exception as exc
:
2701 error_list
.append(str(exc
))
2703 # update operation-status
2704 db_nsr_update
["operational-status"] = "running"
2705 # let's begin with VCA 'configured' status (later we can change it)
2706 db_nsr_update
["config-status"] = "configured"
2707 for task
, task_name
in tasks_dict_info
.items():
2708 if not task
.done() or task
.cancelled() or task
.exception():
2709 if task_name
.startswith(self
.task_name_deploy_vca
):
2710 # A N2VC task is pending
2711 db_nsr_update
["config-status"] = "failed"
2713 # RO or KDU task is pending
2714 db_nsr_update
["operational-status"] = "failed"
2716 # update status at database
2718 error_detail
= ". ".join(error_list
)
2719 self
.logger
.error(logging_text
+ error_detail
)
2720 error_description_nslcmop
= "{} Detail: {}".format(
2721 stage
[0], error_detail
2723 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2724 nslcmop_id
, stage
[0]
2727 db_nsr_update
["detailed-status"] = (
2728 error_description_nsr
+ " Detail: " + error_detail
2730 db_nslcmop_update
["detailed-status"] = error_detail
2731 nslcmop_operation_state
= "FAILED"
2735 error_description_nsr
= error_description_nslcmop
= None
2737 db_nsr_update
["detailed-status"] = "Done"
2738 db_nslcmop_update
["detailed-status"] = "Done"
2739 nslcmop_operation_state
= "COMPLETED"
2742 self
._write
_ns
_status
(
2745 current_operation
="IDLE",
2746 current_operation_id
=None,
2747 error_description
=error_description_nsr
,
2748 error_detail
=error_detail
,
2749 other_update
=db_nsr_update
,
2751 self
._write
_op
_status
(
2754 error_message
=error_description_nslcmop
,
2755 operation_state
=nslcmop_operation_state
,
2756 other_update
=db_nslcmop_update
,
2759 if nslcmop_operation_state
:
2761 await self
.msg
.aiowrite(
2766 "nslcmop_id": nslcmop_id
,
2767 "operationState": nslcmop_operation_state
,
2771 except Exception as e
:
2773 logging_text
+ "kafka_write notification Exception {}".format(e
)
2776 self
.logger
.debug(logging_text
+ "Exit")
2777 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2779 def _get_vnfd(self
, vnfd_id
: str, cached_vnfds
: Dict
[str, Any
]):
2780 if vnfd_id
not in cached_vnfds
:
2781 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
2782 return cached_vnfds
[vnfd_id
]
2784 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2785 if vnf_profile_id
not in cached_vnfrs
:
2786 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2789 "member-vnf-index-ref": vnf_profile_id
,
2790 "nsr-id-ref": nsr_id
,
2793 return cached_vnfrs
[vnf_profile_id
]
2795 def _is_deployed_vca_in_relation(
2796 self
, vca
: DeployedVCA
, relation
: Relation
2799 for endpoint
in (relation
.provider
, relation
.requirer
):
2800 if endpoint
["kdu-resource-profile-id"]:
2803 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2804 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2805 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2811 def _update_ee_relation_data_with_implicit_data(
2812 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2814 ee_relation_data
= safe_get_ee_relation(
2815 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2817 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2818 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2819 "execution-environment-ref"
2821 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2822 vnfd_id
= vnf_profile
["vnfd-id"]
2823 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2826 if ee_relation_level
== EELevel
.VNF
2827 else ee_relation_data
["vdu-profile-id"]
2829 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2832 f
"not execution environments found for ee_relation {ee_relation_data}"
2834 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2835 return ee_relation_data
2837 def _get_ns_relations(
2840 nsd
: Dict
[str, Any
],
2842 cached_vnfds
: Dict
[str, Any
],
2843 ) -> List
[Relation
]:
2845 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2846 for r
in db_ns_relations
:
2847 provider_dict
= None
2848 requirer_dict
= None
2849 if all(key
in r
for key
in ("provider", "requirer")):
2850 provider_dict
= r
["provider"]
2851 requirer_dict
= r
["requirer"]
2852 elif "entities" in r
:
2853 provider_id
= r
["entities"][0]["id"]
2856 "endpoint": r
["entities"][0]["endpoint"],
2858 if provider_id
!= nsd
["id"]:
2859 provider_dict
["vnf-profile-id"] = provider_id
2860 requirer_id
= r
["entities"][1]["id"]
2863 "endpoint": r
["entities"][1]["endpoint"],
2865 if requirer_id
!= nsd
["id"]:
2866 requirer_dict
["vnf-profile-id"] = requirer_id
2868 raise Exception("provider/requirer or entities must be included in the relation.")
2869 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2870 nsr_id
, nsd
, provider_dict
, cached_vnfds
2872 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2873 nsr_id
, nsd
, requirer_dict
, cached_vnfds
2875 provider
= EERelation(relation_provider
)
2876 requirer
= EERelation(relation_requirer
)
2877 relation
= Relation(r
["name"], provider
, requirer
)
2878 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2880 relations
.append(relation
)
2883 def _get_vnf_relations(
2886 nsd
: Dict
[str, Any
],
2888 cached_vnfds
: Dict
[str, Any
],
2889 ) -> List
[Relation
]:
2891 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
2892 vnf_profile_id
= vnf_profile
["id"]
2893 vnfd_id
= vnf_profile
["vnfd-id"]
2894 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2895 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
2896 for r
in db_vnf_relations
:
2897 provider_dict
= None
2898 requirer_dict
= None
2899 if all(key
in r
for key
in ("provider", "requirer")):
2900 provider_dict
= r
["provider"]
2901 requirer_dict
= r
["requirer"]
2902 elif "entities" in r
:
2903 provider_id
= r
["entities"][0]["id"]
2906 "vnf-profile-id": vnf_profile_id
,
2907 "endpoint": r
["entities"][0]["endpoint"],
2909 if provider_id
!= vnfd_id
:
2910 provider_dict
["vdu-profile-id"] = provider_id
2911 requirer_id
= r
["entities"][1]["id"]
2914 "vnf-profile-id": vnf_profile_id
,
2915 "endpoint": r
["entities"][1]["endpoint"],
2917 if requirer_id
!= vnfd_id
:
2918 requirer_dict
["vdu-profile-id"] = requirer_id
2920 raise Exception("provider/requirer or entities must be included in the relation.")
2921 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2922 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
2924 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2925 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
2927 provider
= EERelation(relation_provider
)
2928 requirer
= EERelation(relation_requirer
)
2929 relation
= Relation(r
["name"], provider
, requirer
)
2930 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2932 relations
.append(relation
)
2935 def _get_kdu_resource_data(
2937 ee_relation
: EERelation
,
2938 db_nsr
: Dict
[str, Any
],
2939 cached_vnfds
: Dict
[str, Any
],
2940 ) -> DeployedK8sResource
:
2941 nsd
= get_nsd(db_nsr
)
2942 vnf_profiles
= get_vnf_profiles(nsd
)
2943 vnfd_id
= find_in_list(
2945 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
2947 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2948 kdu_resource_profile
= get_kdu_resource_profile(
2949 db_vnfd
, ee_relation
.kdu_resource_profile_id
2951 kdu_name
= kdu_resource_profile
["kdu-name"]
2952 deployed_kdu
, _
= get_deployed_kdu(
2953 db_nsr
.get("_admin", ()).get("deployed", ()),
2955 ee_relation
.vnf_profile_id
,
2957 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
2960 def _get_deployed_component(
2962 ee_relation
: EERelation
,
2963 db_nsr
: Dict
[str, Any
],
2964 cached_vnfds
: Dict
[str, Any
],
2965 ) -> DeployedComponent
:
2966 nsr_id
= db_nsr
["_id"]
2967 deployed_component
= None
2968 ee_level
= EELevel
.get_level(ee_relation
)
2969 if ee_level
== EELevel
.NS
:
2970 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
2972 deployed_component
= DeployedVCA(nsr_id
, vca
)
2973 elif ee_level
== EELevel
.VNF
:
2974 vca
= get_deployed_vca(
2978 "member-vnf-index": ee_relation
.vnf_profile_id
,
2979 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
2983 deployed_component
= DeployedVCA(nsr_id
, vca
)
2984 elif ee_level
== EELevel
.VDU
:
2985 vca
= get_deployed_vca(
2988 "vdu_id": ee_relation
.vdu_profile_id
,
2989 "member-vnf-index": ee_relation
.vnf_profile_id
,
2990 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
2994 deployed_component
= DeployedVCA(nsr_id
, vca
)
2995 elif ee_level
== EELevel
.KDU
:
2996 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
2997 ee_relation
, db_nsr
, cached_vnfds
2999 if kdu_resource_data
:
3000 deployed_component
= DeployedK8sResource(kdu_resource_data
)
3001 return deployed_component
3003 async def _add_relation(
3007 db_nsr
: Dict
[str, Any
],
3008 cached_vnfds
: Dict
[str, Any
],
3009 cached_vnfrs
: Dict
[str, Any
],
3011 deployed_provider
= self
._get
_deployed
_component
(
3012 relation
.provider
, db_nsr
, cached_vnfds
3014 deployed_requirer
= self
._get
_deployed
_component
(
3015 relation
.requirer
, db_nsr
, cached_vnfds
3019 and deployed_requirer
3020 and deployed_provider
.config_sw_installed
3021 and deployed_requirer
.config_sw_installed
3023 provider_db_vnfr
= (
3025 relation
.provider
.nsr_id
,
3026 relation
.provider
.vnf_profile_id
,
3029 if relation
.provider
.vnf_profile_id
3032 requirer_db_vnfr
= (
3034 relation
.requirer
.nsr_id
,
3035 relation
.requirer
.vnf_profile_id
,
3038 if relation
.requirer
.vnf_profile_id
3041 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3042 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3043 provider_relation_endpoint
= RelationEndpoint(
3044 deployed_provider
.ee_id
,
3046 relation
.provider
.endpoint
,
3048 requirer_relation_endpoint
= RelationEndpoint(
3049 deployed_requirer
.ee_id
,
3051 relation
.requirer
.endpoint
,
3053 await self
.vca_map
[vca_type
].add_relation(
3054 provider
=provider_relation_endpoint
,
3055 requirer
=requirer_relation_endpoint
,
3057 # remove entry from relations list
3061 async def _add_vca_relations(
3067 timeout
: int = 3600,
3071 # 1. find all relations for this VCA
3072 # 2. wait for other peers related
3076 # STEP 1: find all relations for this VCA
3079 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3080 nsd
= get_nsd(db_nsr
)
3083 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3084 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3089 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3090 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3092 # if no relations, terminate
3094 self
.logger
.debug(logging_text
+ " No relations")
3097 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3104 if now
- start
>= timeout
:
3105 self
.logger
.error(logging_text
+ " : timeout adding relations")
3108 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3109 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3111 # for each relation, find the VCA's related
3112 for relation
in relations
.copy():
3113 added
= await self
._add
_relation
(
3121 relations
.remove(relation
)
3124 self
.logger
.debug("Relations added")
3126 await asyncio
.sleep(5.0)
3130 except Exception as e
:
3131 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3134 async def _install_kdu(
3142 k8s_instance_info
: dict,
3143 k8params
: dict = None,
3149 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3152 "collection": "nsrs",
3153 "filter": {"_id": nsr_id
},
3154 "path": nsr_db_path
,
3157 if k8s_instance_info
.get("kdu-deployment-name"):
3158 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3160 kdu_instance
= self
.k8scluster_map
[
3162 ].generate_kdu_instance_name(
3163 db_dict
=db_dict_install
,
3164 kdu_model
=k8s_instance_info
["kdu-model"],
3165 kdu_name
=k8s_instance_info
["kdu-name"],
3168 # Update the nsrs table with the kdu-instance value
3172 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3175 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3176 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3177 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3178 # namespace, this first verification could be removed, and the next step would be done for any kind
3180 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3181 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3182 if k8sclustertype
in ("juju", "juju-bundle"):
3183 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3184 # that the user passed a namespace which he wants its KDU to be deployed in)
3190 "_admin.projects_write": k8s_instance_info
["namespace"],
3191 "_admin.projects_read": k8s_instance_info
["namespace"],
3197 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3202 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3204 k8s_instance_info
["namespace"] = kdu_instance
3206 await self
.k8scluster_map
[k8sclustertype
].install(
3207 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3208 kdu_model
=k8s_instance_info
["kdu-model"],
3211 db_dict
=db_dict_install
,
3213 kdu_name
=k8s_instance_info
["kdu-name"],
3214 namespace
=k8s_instance_info
["namespace"],
3215 kdu_instance
=kdu_instance
,
3219 # Obtain services to obtain management service ip
3220 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3221 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3222 kdu_instance
=kdu_instance
,
3223 namespace
=k8s_instance_info
["namespace"],
3226 # Obtain management service info (if exists)
3227 vnfr_update_dict
= {}
3228 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3230 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3235 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3238 for service
in kdud
.get("service", [])
3239 if service
.get("mgmt-service")
3241 for mgmt_service
in mgmt_services
:
3242 for service
in services
:
3243 if service
["name"].startswith(mgmt_service
["name"]):
3244 # Mgmt service found, Obtain service ip
3245 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3246 if isinstance(ip
, list) and len(ip
) == 1:
3250 "kdur.{}.ip-address".format(kdu_index
)
3253 # Check if must update also mgmt ip at the vnf
3254 service_external_cp
= mgmt_service
.get(
3255 "external-connection-point-ref"
3257 if service_external_cp
:
3259 deep_get(vnfd
, ("mgmt-interface", "cp"))
3260 == service_external_cp
3262 vnfr_update_dict
["ip-address"] = ip
3267 "external-connection-point-ref", ""
3269 == service_external_cp
,
3272 "kdur.{}.ip-address".format(kdu_index
)
3277 "Mgmt service name: {} not found".format(
3278 mgmt_service
["name"]
3282 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3283 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3285 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3288 and kdu_config
.get("initial-config-primitive")
3289 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3291 initial_config_primitive_list
= kdu_config
.get(
3292 "initial-config-primitive"
3294 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3296 for initial_config_primitive
in initial_config_primitive_list
:
3297 primitive_params_
= self
._map
_primitive
_params
(
3298 initial_config_primitive
, {}, {}
3301 await asyncio
.wait_for(
3302 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3303 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3304 kdu_instance
=kdu_instance
,
3305 primitive_name
=initial_config_primitive
["name"],
3306 params
=primitive_params_
,
3307 db_dict
=db_dict_install
,
3313 except Exception as e
:
3314 # Prepare update db with error and raise exception
3317 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3321 vnfr_data
.get("_id"),
3322 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3325 # ignore to keep original exception
3327 # reraise original error
3332 async def deploy_kdus(
3339 task_instantiation_info
,
3341 # Launch kdus if present in the descriptor
3343 k8scluster_id_2_uuic
= {
3344 "helm-chart-v3": {},
3349 async def _get_cluster_id(cluster_id
, cluster_type
):
3350 nonlocal k8scluster_id_2_uuic
3351 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3352 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3354 # check if K8scluster is creating and wait look if previous tasks in process
3355 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3356 "k8scluster", cluster_id
3359 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3360 task_name
, cluster_id
3362 self
.logger
.debug(logging_text
+ text
)
3363 await asyncio
.wait(task_dependency
, timeout
=3600)
3365 db_k8scluster
= self
.db
.get_one(
3366 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3368 if not db_k8scluster
:
3369 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3371 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3373 if cluster_type
== "helm-chart-v3":
3375 # backward compatibility for existing clusters that have not been initialized for helm v3
3376 k8s_credentials
= yaml
.safe_dump(
3377 db_k8scluster
.get("credentials")
3379 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3380 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3382 db_k8scluster_update
= {}
3383 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3384 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3385 db_k8scluster_update
[
3386 "_admin.helm-chart-v3.created"
3388 db_k8scluster_update
[
3389 "_admin.helm-chart-v3.operationalState"
3392 "k8sclusters", cluster_id
, db_k8scluster_update
3394 except Exception as e
:
3397 + "error initializing helm-v3 cluster: {}".format(str(e
))
3400 "K8s cluster '{}' has not been initialized for '{}'".format(
3401 cluster_id
, cluster_type
3406 "K8s cluster '{}' has not been initialized for '{}'".format(
3407 cluster_id
, cluster_type
3410 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3413 logging_text
+= "Deploy kdus: "
3416 db_nsr_update
= {"_admin.deployed.K8s": []}
3417 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3420 updated_cluster_list
= []
3421 updated_v3_cluster_list
= []
3423 for vnfr_data
in db_vnfrs
.values():
3424 vca_id
= self
.get_vca_id(vnfr_data
, {})
3425 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3426 # Step 0: Prepare and set parameters
3427 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3428 vnfd_id
= vnfr_data
.get("vnfd-id")
3429 vnfd_with_id
= find_in_list(
3430 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3434 for kdud
in vnfd_with_id
["kdu"]
3435 if kdud
["name"] == kdur
["kdu-name"]
3437 namespace
= kdur
.get("k8s-namespace")
3438 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3439 if kdur
.get("helm-chart"):
3440 kdumodel
= kdur
["helm-chart"]
3441 # Default version: helm3, if helm-version is v2 assign v2
3442 k8sclustertype
= "helm-chart-v3"
3443 self
.logger
.debug("kdur: {}".format(kdur
))
3445 kdur
.get("helm-version")
3446 and kdur
.get("helm-version") == "v2"
3448 k8sclustertype
= "helm-chart"
3449 elif kdur
.get("juju-bundle"):
3450 kdumodel
= kdur
["juju-bundle"]
3451 k8sclustertype
= "juju-bundle"
3454 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3455 "juju-bundle. Maybe an old NBI version is running".format(
3456 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3459 # check if kdumodel is a file and exists
3461 vnfd_with_id
= find_in_list(
3462 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3464 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3465 if storage
: # may be not present if vnfd has not artifacts
3466 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3467 if storage
["pkg-dir"]:
3468 filename
= "{}/{}/{}s/{}".format(
3475 filename
= "{}/Scripts/{}s/{}".format(
3480 if self
.fs
.file_exists(
3481 filename
, mode
="file"
3482 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3483 kdumodel
= self
.fs
.path
+ filename
3484 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3486 except Exception: # it is not a file
3489 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3490 step
= "Synchronize repos for k8s cluster '{}'".format(
3493 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3497 k8sclustertype
== "helm-chart"
3498 and cluster_uuid
not in updated_cluster_list
3500 k8sclustertype
== "helm-chart-v3"
3501 and cluster_uuid
not in updated_v3_cluster_list
3503 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3504 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3505 cluster_uuid
=cluster_uuid
3508 if del_repo_list
or added_repo_dict
:
3509 if k8sclustertype
== "helm-chart":
3511 "_admin.helm_charts_added." + item
: None
3512 for item
in del_repo_list
3515 "_admin.helm_charts_added." + item
: name
3516 for item
, name
in added_repo_dict
.items()
3518 updated_cluster_list
.append(cluster_uuid
)
3519 elif k8sclustertype
== "helm-chart-v3":
3521 "_admin.helm_charts_v3_added." + item
: None
3522 for item
in del_repo_list
3525 "_admin.helm_charts_v3_added." + item
: name
3526 for item
, name
in added_repo_dict
.items()
3528 updated_v3_cluster_list
.append(cluster_uuid
)
3530 logging_text
+ "repos synchronized on k8s cluster "
3531 "'{}' to_delete: {}, to_add: {}".format(
3532 k8s_cluster_id
, del_repo_list
, added_repo_dict
3537 {"_id": k8s_cluster_id
},
3543 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3544 vnfr_data
["member-vnf-index-ref"],
3548 k8s_instance_info
= {
3549 "kdu-instance": None,
3550 "k8scluster-uuid": cluster_uuid
,
3551 "k8scluster-type": k8sclustertype
,
3552 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3553 "kdu-name": kdur
["kdu-name"],
3554 "kdu-model": kdumodel
,
3555 "namespace": namespace
,
3556 "kdu-deployment-name": kdu_deployment_name
,
3558 db_path
= "_admin.deployed.K8s.{}".format(index
)
3559 db_nsr_update
[db_path
] = k8s_instance_info
3560 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3561 vnfd_with_id
= find_in_list(
3562 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3564 task
= asyncio
.ensure_future(
3573 k8params
=desc_params
,
3578 self
.lcm_tasks
.register(
3582 "instantiate_KDU-{}".format(index
),
3585 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3591 except (LcmException
, asyncio
.CancelledError
):
3593 except Exception as e
:
3594 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3595 if isinstance(e
, (N2VCException
, DbException
)):
3596 self
.logger
.error(logging_text
+ msg
)
3598 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3599 raise LcmException(msg
)
3602 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3621 task_instantiation_info
,
3624 # launch instantiate_N2VC in a asyncio task and register task object
3625 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3626 # if not found, create one entry and update database
3627 # fill db_nsr._admin.deployed.VCA.<index>
3630 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3632 if "execution-environment-list" in descriptor_config
:
3633 ee_list
= descriptor_config
.get("execution-environment-list", [])
3634 elif "juju" in descriptor_config
:
3635 ee_list
= [descriptor_config
] # ns charms
3636 else: # other types as script are not supported
3639 for ee_item
in ee_list
:
3642 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3643 ee_item
.get("juju"), ee_item
.get("helm-chart")
3646 ee_descriptor_id
= ee_item
.get("id")
3647 if ee_item
.get("juju"):
3648 vca_name
= ee_item
["juju"].get("charm")
3651 if ee_item
["juju"].get("charm") is not None
3654 if ee_item
["juju"].get("cloud") == "k8s":
3655 vca_type
= "k8s_proxy_charm"
3656 elif ee_item
["juju"].get("proxy") is False:
3657 vca_type
= "native_charm"
3658 elif ee_item
.get("helm-chart"):
3659 vca_name
= ee_item
["helm-chart"]
3660 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3663 vca_type
= "helm-v3"
3666 logging_text
+ "skipping non juju neither charm configuration"
3671 for vca_index
, vca_deployed
in enumerate(
3672 db_nsr
["_admin"]["deployed"]["VCA"]
3674 if not vca_deployed
:
3677 vca_deployed
.get("member-vnf-index") == member_vnf_index
3678 and vca_deployed
.get("vdu_id") == vdu_id
3679 and vca_deployed
.get("kdu_name") == kdu_name
3680 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3681 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3685 # not found, create one.
3687 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3690 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3692 target
+= "/kdu/{}".format(kdu_name
)
3694 "target_element": target
,
3695 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3696 "member-vnf-index": member_vnf_index
,
3698 "kdu_name": kdu_name
,
3699 "vdu_count_index": vdu_index
,
3700 "operational-status": "init", # TODO revise
3701 "detailed-status": "", # TODO revise
3702 "step": "initial-deploy", # TODO revise
3704 "vdu_name": vdu_name
,
3706 "ee_descriptor_id": ee_descriptor_id
,
3710 # create VCA and configurationStatus in db
3712 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3713 "configurationStatus.{}".format(vca_index
): dict(),
3715 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3717 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3719 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3720 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3721 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3724 task_n2vc
= asyncio
.ensure_future(
3725 self
.instantiate_N2VC(
3726 logging_text
=logging_text
,
3727 vca_index
=vca_index
,
3733 vdu_index
=vdu_index
,
3734 deploy_params
=deploy_params
,
3735 config_descriptor
=descriptor_config
,
3736 base_folder
=base_folder
,
3737 nslcmop_id
=nslcmop_id
,
3741 ee_config_descriptor
=ee_item
,
3744 self
.lcm_tasks
.register(
3748 "instantiate_N2VC-{}".format(vca_index
),
3751 task_instantiation_info
[
3753 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3754 member_vnf_index
or "", vdu_id
or ""
3758 def _create_nslcmop(nsr_id
, operation
, params
):
3760 Creates a ns-lcm-opp content to be stored at database.
3761 :param nsr_id: internal id of the instance
3762 :param operation: instantiate, terminate, scale, action, ...
3763 :param params: user parameters for the operation
3764 :return: dictionary following SOL005 format
3766 # Raise exception if invalid arguments
3767 if not (nsr_id
and operation
and params
):
3769 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3776 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3777 "operationState": "PROCESSING",
3778 "statusEnteredTime": now
,
3779 "nsInstanceId": nsr_id
,
3780 "lcmOperationType": operation
,
3782 "isAutomaticInvocation": False,
3783 "operationParams": params
,
3784 "isCancelPending": False,
3786 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3787 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3792 def _format_additional_params(self
, params
):
3793 params
= params
or {}
3794 for key
, value
in params
.items():
3795 if str(value
).startswith("!!yaml "):
3796 params
[key
] = yaml
.safe_load(value
[7:])
3799 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3800 primitive
= seq
.get("name")
3801 primitive_params
= {}
3803 "member_vnf_index": vnf_index
,
3804 "primitive": primitive
,
3805 "primitive_params": primitive_params
,
3808 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
3812 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
3813 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
3814 if op
.get("operationState") == "COMPLETED":
3815 # b. Skip sub-operation
3816 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3817 return self
.SUBOPERATION_STATUS_SKIP
3819 # c. retry executing sub-operation
3820 # The sub-operation exists, and operationState != 'COMPLETED'
3821 # Update operationState = 'PROCESSING' to indicate a retry.
3822 operationState
= "PROCESSING"
3823 detailed_status
= "In progress"
3824 self
._update
_suboperation
_status
(
3825 db_nslcmop
, op_index
, operationState
, detailed_status
3827 # Return the sub-operation index
3828 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3829 # with arguments extracted from the sub-operation
3832 # Find a sub-operation where all keys in a matching dictionary must match
3833 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3834 def _find_suboperation(self
, db_nslcmop
, match
):
3835 if db_nslcmop
and match
:
3836 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3837 for i
, op
in enumerate(op_list
):
3838 if all(op
.get(k
) == match
[k
] for k
in match
):
3840 return self
.SUBOPERATION_STATUS_NOT_FOUND
3842 # Update status for a sub-operation given its index
3843 def _update_suboperation_status(
3844 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3846 # Update DB for HA tasks
3847 q_filter
= {"_id": db_nslcmop
["_id"]}
3849 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3850 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3853 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3856 # Add sub-operation, return the index of the added sub-operation
3857 # Optionally, set operationState, detailed-status, and operationType
3858 # Status and type are currently set for 'scale' sub-operations:
3859 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3860 # 'detailed-status' : status message
3861 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3862 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3863 def _add_suboperation(
3871 mapped_primitive_params
,
3872 operationState
=None,
3873 detailed_status
=None,
3876 RO_scaling_info
=None,
3879 return self
.SUBOPERATION_STATUS_NOT_FOUND
3880 # Get the "_admin.operations" list, if it exists
3881 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3882 op_list
= db_nslcmop_admin
.get("operations")
3883 # Create or append to the "_admin.operations" list
3885 "member_vnf_index": vnf_index
,
3887 "vdu_count_index": vdu_count_index
,
3888 "primitive": primitive
,
3889 "primitive_params": mapped_primitive_params
,
3892 new_op
["operationState"] = operationState
3894 new_op
["detailed-status"] = detailed_status
3896 new_op
["lcmOperationType"] = operationType
3898 new_op
["RO_nsr_id"] = RO_nsr_id
3900 new_op
["RO_scaling_info"] = RO_scaling_info
3902 # No existing operations, create key 'operations' with current operation as first list element
3903 db_nslcmop_admin
.update({"operations": [new_op
]})
3904 op_list
= db_nslcmop_admin
.get("operations")
3906 # Existing operations, append operation to list
3907 op_list
.append(new_op
)
3909 db_nslcmop_update
= {"_admin.operations": op_list
}
3910 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
3911 op_index
= len(op_list
) - 1
3914 # Helper methods for scale() sub-operations
3916 # pre-scale/post-scale:
3917 # Check for 3 different cases:
3918 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3919 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3920 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3921 def _check_or_add_scale_suboperation(
3925 vnf_config_primitive
,
3929 RO_scaling_info
=None,
3931 # Find this sub-operation
3932 if RO_nsr_id
and RO_scaling_info
:
3933 operationType
= "SCALE-RO"
3935 "member_vnf_index": vnf_index
,
3936 "RO_nsr_id": RO_nsr_id
,
3937 "RO_scaling_info": RO_scaling_info
,
3941 "member_vnf_index": vnf_index
,
3942 "primitive": vnf_config_primitive
,
3943 "primitive_params": primitive_params
,
3944 "lcmOperationType": operationType
,
3946 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
3947 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
3948 # a. New sub-operation
3949 # The sub-operation does not exist, add it.
3950 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3951 # The following parameters are set to None for all kind of scaling:
3953 vdu_count_index
= None
3955 if RO_nsr_id
and RO_scaling_info
:
3956 vnf_config_primitive
= None
3957 primitive_params
= None
3960 RO_scaling_info
= None
3961 # Initial status for sub-operation
3962 operationState
= "PROCESSING"
3963 detailed_status
= "In progress"
3964 # Add sub-operation for pre/post-scaling (zero or more operations)
3965 self
._add
_suboperation
(
3971 vnf_config_primitive
,
3979 return self
.SUBOPERATION_STATUS_NEW
3981 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3982 # or op_index (operationState != 'COMPLETED')
3983 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
3985 # Function to return execution_environment id
3987 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
3988 # TODO vdu_index_count
3989 for vca
in vca_deployed_list
:
3990 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
3993 async def destroy_N2VC(
4001 exec_primitives
=True,
4006 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4007 :param logging_text:
4009 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4010 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4011 :param vca_index: index in the database _admin.deployed.VCA
4012 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4013 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4014 not executed properly
4015 :param scaling_in: True destroys the application, False destroys the model
4016 :return: None or exception
4021 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4022 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4026 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4028 # execute terminate_primitives
4030 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4031 config_descriptor
.get("terminate-config-primitive"),
4032 vca_deployed
.get("ee_descriptor_id"),
4034 vdu_id
= vca_deployed
.get("vdu_id")
4035 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4036 vdu_name
= vca_deployed
.get("vdu_name")
4037 vnf_index
= vca_deployed
.get("member-vnf-index")
4038 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4039 for seq
in terminate_primitives
:
4040 # For each sequence in list, get primitive and call _ns_execute_primitive()
4041 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4042 vnf_index
, seq
.get("name")
4044 self
.logger
.debug(logging_text
+ step
)
4045 # Create the primitive for each sequence, i.e. "primitive": "touch"
4046 primitive
= seq
.get("name")
4047 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4052 self
._add
_suboperation
(
4059 mapped_primitive_params
,
4061 # Sub-operations: Call _ns_execute_primitive() instead of action()
4063 result
, result_detail
= await self
._ns
_execute
_primitive
(
4064 vca_deployed
["ee_id"],
4066 mapped_primitive_params
,
4070 except LcmException
:
4071 # this happens when VCA is not deployed. In this case it is not needed to terminate
4073 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4074 if result
not in result_ok
:
4076 "terminate_primitive {} for vnf_member_index={} fails with "
4077 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4079 # set that this VCA do not need terminated
4080 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4084 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4087 # Delete Prometheus Jobs if any
4088 # This uses NSR_ID, so it will destroy any jobs under this index
4089 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4092 await self
.vca_map
[vca_type
].delete_execution_environment(
4093 vca_deployed
["ee_id"],
4094 scaling_in
=scaling_in
,
4099 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4100 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4101 namespace
= "." + db_nsr
["_id"]
4103 await self
.n2vc
.delete_namespace(
4104 namespace
=namespace
,
4105 total_timeout
=self
.timeout_charm_delete
,
4108 except N2VCNotFound
: # already deleted. Skip
4110 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4112 async def _terminate_RO(
4113 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4116 Terminates a deployment from RO
4117 :param logging_text:
4118 :param nsr_deployed: db_nsr._admin.deployed
4121 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4122 this method will update only the index 2, but it will write on database the concatenated content of the list
4127 ro_nsr_id
= ro_delete_action
= None
4128 if nsr_deployed
and nsr_deployed
.get("RO"):
4129 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4130 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4133 stage
[2] = "Deleting ns from VIM."
4134 db_nsr_update
["detailed-status"] = " ".join(stage
)
4135 self
._write
_op
_status
(nslcmop_id
, stage
)
4136 self
.logger
.debug(logging_text
+ stage
[2])
4137 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4138 self
._write
_op
_status
(nslcmop_id
, stage
)
4139 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4140 ro_delete_action
= desc
["action_id"]
4142 "_admin.deployed.RO.nsr_delete_action_id"
4143 ] = ro_delete_action
4144 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4145 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4146 if ro_delete_action
:
4147 # wait until NS is deleted from VIM
4148 stage
[2] = "Waiting ns deleted from VIM."
4149 detailed_status_old
= None
4153 + " RO_id={} ro_delete_action={}".format(
4154 ro_nsr_id
, ro_delete_action
4157 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4158 self
._write
_op
_status
(nslcmop_id
, stage
)
4160 delete_timeout
= 20 * 60 # 20 minutes
4161 while delete_timeout
> 0:
4162 desc
= await self
.RO
.show(
4164 item_id_name
=ro_nsr_id
,
4165 extra_item
="action",
4166 extra_item_id
=ro_delete_action
,
4170 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4172 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4173 if ns_status
== "ERROR":
4174 raise ROclient
.ROClientException(ns_status_info
)
4175 elif ns_status
== "BUILD":
4176 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4177 elif ns_status
== "ACTIVE":
4178 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4179 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4184 ), "ROclient.check_action_status returns unknown {}".format(
4187 if stage
[2] != detailed_status_old
:
4188 detailed_status_old
= stage
[2]
4189 db_nsr_update
["detailed-status"] = " ".join(stage
)
4190 self
._write
_op
_status
(nslcmop_id
, stage
)
4191 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4192 await asyncio
.sleep(5, loop
=self
.loop
)
4194 else: # delete_timeout <= 0:
4195 raise ROclient
.ROClientException(
4196 "Timeout waiting ns deleted from VIM"
4199 except Exception as e
:
4200 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4202 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4204 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4205 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4206 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4208 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4211 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4213 failed_detail
.append("delete conflict: {}".format(e
))
4216 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4219 failed_detail
.append("delete error: {}".format(e
))
4221 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4225 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4226 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4228 stage
[2] = "Deleting nsd from RO."
4229 db_nsr_update
["detailed-status"] = " ".join(stage
)
4230 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4231 self
._write
_op
_status
(nslcmop_id
, stage
)
4232 await self
.RO
.delete("nsd", ro_nsd_id
)
4234 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4236 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4237 except Exception as e
:
4239 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4241 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4243 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4246 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4248 failed_detail
.append(
4249 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4251 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4253 failed_detail
.append(
4254 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4256 self
.logger
.error(logging_text
+ failed_detail
[-1])
4258 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4259 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4260 if not vnf_deployed
or not vnf_deployed
["id"]:
4263 ro_vnfd_id
= vnf_deployed
["id"]
4266 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4267 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4269 db_nsr_update
["detailed-status"] = " ".join(stage
)
4270 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4271 self
._write
_op
_status
(nslcmop_id
, stage
)
4272 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4274 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4276 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4277 except Exception as e
:
4279 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4282 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4286 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4289 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4291 failed_detail
.append(
4292 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4294 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4296 failed_detail
.append(
4297 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4299 self
.logger
.error(logging_text
+ failed_detail
[-1])
4302 stage
[2] = "Error deleting from VIM"
4304 stage
[2] = "Deleted from VIM"
4305 db_nsr_update
["detailed-status"] = " ".join(stage
)
4306 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4307 self
._write
_op
_status
(nslcmop_id
, stage
)
4310 raise LcmException("; ".join(failed_detail
))
4312 async def terminate(self
, nsr_id
, nslcmop_id
):
4313 # Try to lock HA task here
4314 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4315 if not task_is_locked_by_me
:
4318 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4319 self
.logger
.debug(logging_text
+ "Enter")
4320 timeout_ns_terminate
= self
.timeout_ns_terminate
4323 operation_params
= None
4325 error_list
= [] # annotates all failed error messages
4326 db_nslcmop_update
= {}
4327 autoremove
= False # autoremove after terminated
4328 tasks_dict_info
= {}
4331 "Stage 1/3: Preparing task.",
4332 "Waiting for previous operations to terminate.",
4335 # ^ contains [stage, step, VIM-status]
4337 # wait for any previous tasks in process
4338 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4340 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4341 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4342 operation_params
= db_nslcmop
.get("operationParams") or {}
4343 if operation_params
.get("timeout_ns_terminate"):
4344 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4345 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4346 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4348 db_nsr_update
["operational-status"] = "terminating"
4349 db_nsr_update
["config-status"] = "terminating"
4350 self
._write
_ns
_status
(
4352 ns_state
="TERMINATING",
4353 current_operation
="TERMINATING",
4354 current_operation_id
=nslcmop_id
,
4355 other_update
=db_nsr_update
,
4357 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4358 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4359 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4362 stage
[1] = "Getting vnf descriptors from db."
4363 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4365 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4367 db_vnfds_from_id
= {}
4368 db_vnfds_from_member_index
= {}
4370 for vnfr
in db_vnfrs_list
:
4371 vnfd_id
= vnfr
["vnfd-id"]
4372 if vnfd_id
not in db_vnfds_from_id
:
4373 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4374 db_vnfds_from_id
[vnfd_id
] = vnfd
4375 db_vnfds_from_member_index
[
4376 vnfr
["member-vnf-index-ref"]
4377 ] = db_vnfds_from_id
[vnfd_id
]
4379 # Destroy individual execution environments when there are terminating primitives.
4380 # Rest of EE will be deleted at once
4381 # TODO - check before calling _destroy_N2VC
4382 # if not operation_params.get("skip_terminate_primitives"):#
4383 # or not vca.get("needed_terminate"):
4384 stage
[0] = "Stage 2/3 execute terminating primitives."
4385 self
.logger
.debug(logging_text
+ stage
[0])
4386 stage
[1] = "Looking execution environment that needs terminate."
4387 self
.logger
.debug(logging_text
+ stage
[1])
4389 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4390 config_descriptor
= None
4391 vca_member_vnf_index
= vca
.get("member-vnf-index")
4392 vca_id
= self
.get_vca_id(
4393 db_vnfrs_dict
.get(vca_member_vnf_index
)
4394 if vca_member_vnf_index
4398 if not vca
or not vca
.get("ee_id"):
4400 if not vca
.get("member-vnf-index"):
4402 config_descriptor
= db_nsr
.get("ns-configuration")
4403 elif vca
.get("vdu_id"):
4404 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4405 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4406 elif vca
.get("kdu_name"):
4407 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4408 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4410 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4411 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4412 vca_type
= vca
.get("type")
4413 exec_terminate_primitives
= not operation_params
.get(
4414 "skip_terminate_primitives"
4415 ) and vca
.get("needed_terminate")
4416 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4417 # pending native charms
4419 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4421 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4422 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4423 task
= asyncio
.ensure_future(
4431 exec_terminate_primitives
,
4435 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4437 # wait for pending tasks of terminate primitives
4441 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4443 error_list
= await self
._wait
_for
_tasks
(
4446 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4450 tasks_dict_info
.clear()
4452 return # raise LcmException("; ".join(error_list))
4454 # remove All execution environments at once
4455 stage
[0] = "Stage 3/3 delete all."
4457 if nsr_deployed
.get("VCA"):
4458 stage
[1] = "Deleting all execution environments."
4459 self
.logger
.debug(logging_text
+ stage
[1])
4460 vca_id
= self
.get_vca_id({}, db_nsr
)
4461 task_delete_ee
= asyncio
.ensure_future(
4463 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4464 timeout
=self
.timeout_charm_delete
,
4467 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4468 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4470 # Delete from k8scluster
4471 stage
[1] = "Deleting KDUs."
4472 self
.logger
.debug(logging_text
+ stage
[1])
4473 # print(nsr_deployed)
4474 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4475 if not kdu
or not kdu
.get("kdu-instance"):
4477 kdu_instance
= kdu
.get("kdu-instance")
4478 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4479 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4480 vca_id
= self
.get_vca_id({}, db_nsr
)
4481 task_delete_kdu_instance
= asyncio
.ensure_future(
4482 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4483 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4484 kdu_instance
=kdu_instance
,
4491 + "Unknown k8s deployment type {}".format(
4492 kdu
.get("k8scluster-type")
4497 task_delete_kdu_instance
4498 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4501 stage
[1] = "Deleting ns from VIM."
4503 task_delete_ro
= asyncio
.ensure_future(
4504 self
._terminate
_ng
_ro
(
4505 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4509 task_delete_ro
= asyncio
.ensure_future(
4511 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4514 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4516 # rest of staff will be done at finally
4519 ROclient
.ROClientException
,
4524 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4526 except asyncio
.CancelledError
:
4528 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4530 exc
= "Operation was cancelled"
4531 except Exception as e
:
4532 exc
= traceback
.format_exc()
4533 self
.logger
.critical(
4534 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4539 error_list
.append(str(exc
))
4541 # wait for pending tasks
4543 stage
[1] = "Waiting for terminate pending tasks."
4544 self
.logger
.debug(logging_text
+ stage
[1])
4545 error_list
+= await self
._wait
_for
_tasks
(
4548 timeout_ns_terminate
,
4552 stage
[1] = stage
[2] = ""
4553 except asyncio
.CancelledError
:
4554 error_list
.append("Cancelled")
4555 # TODO cancell all tasks
4556 except Exception as exc
:
4557 error_list
.append(str(exc
))
4558 # update status at database
4560 error_detail
= "; ".join(error_list
)
4561 # self.logger.error(logging_text + error_detail)
4562 error_description_nslcmop
= "{} Detail: {}".format(
4563 stage
[0], error_detail
4565 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4566 nslcmop_id
, stage
[0]
4569 db_nsr_update
["operational-status"] = "failed"
4570 db_nsr_update
["detailed-status"] = (
4571 error_description_nsr
+ " Detail: " + error_detail
4573 db_nslcmop_update
["detailed-status"] = error_detail
4574 nslcmop_operation_state
= "FAILED"
4578 error_description_nsr
= error_description_nslcmop
= None
4579 ns_state
= "NOT_INSTANTIATED"
4580 db_nsr_update
["operational-status"] = "terminated"
4581 db_nsr_update
["detailed-status"] = "Done"
4582 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4583 db_nslcmop_update
["detailed-status"] = "Done"
4584 nslcmop_operation_state
= "COMPLETED"
4587 self
._write
_ns
_status
(
4590 current_operation
="IDLE",
4591 current_operation_id
=None,
4592 error_description
=error_description_nsr
,
4593 error_detail
=error_detail
,
4594 other_update
=db_nsr_update
,
4596 self
._write
_op
_status
(
4599 error_message
=error_description_nslcmop
,
4600 operation_state
=nslcmop_operation_state
,
4601 other_update
=db_nslcmop_update
,
4603 if ns_state
== "NOT_INSTANTIATED":
4607 {"nsr-id-ref": nsr_id
},
4608 {"_admin.nsState": "NOT_INSTANTIATED"},
4610 except DbException
as e
:
4613 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4617 if operation_params
:
4618 autoremove
= operation_params
.get("autoremove", False)
4619 if nslcmop_operation_state
:
4621 await self
.msg
.aiowrite(
4626 "nslcmop_id": nslcmop_id
,
4627 "operationState": nslcmop_operation_state
,
4628 "autoremove": autoremove
,
4632 except Exception as e
:
4634 logging_text
+ "kafka_write notification Exception {}".format(e
)
4637 self
.logger
.debug(logging_text
+ "Exit")
4638 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4640 async def _wait_for_tasks(
4641 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4644 error_detail_list
= []
4646 pending_tasks
= list(created_tasks_info
.keys())
4647 num_tasks
= len(pending_tasks
)
4649 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4650 self
._write
_op
_status
(nslcmop_id
, stage
)
4651 while pending_tasks
:
4653 _timeout
= timeout
+ time_start
- time()
4654 done
, pending_tasks
= await asyncio
.wait(
4655 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4657 num_done
+= len(done
)
4658 if not done
: # Timeout
4659 for task
in pending_tasks
:
4660 new_error
= created_tasks_info
[task
] + ": Timeout"
4661 error_detail_list
.append(new_error
)
4662 error_list
.append(new_error
)
4665 if task
.cancelled():
4668 exc
= task
.exception()
4670 if isinstance(exc
, asyncio
.TimeoutError
):
4672 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4673 error_list
.append(created_tasks_info
[task
])
4674 error_detail_list
.append(new_error
)
4681 ROclient
.ROClientException
,
4687 self
.logger
.error(logging_text
+ new_error
)
4689 exc_traceback
= "".join(
4690 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4694 + created_tasks_info
[task
]
4700 logging_text
+ created_tasks_info
[task
] + ": Done"
4702 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4704 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4705 if nsr_id
: # update also nsr
4710 "errorDescription": "Error at: " + ", ".join(error_list
),
4711 "errorDetail": ". ".join(error_detail_list
),
4714 self
._write
_op
_status
(nslcmop_id
, stage
)
4715 return error_detail_list
4718 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4720 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4721 The default-value is used. If it is between < > it look for a value at instantiation_params
4722 :param primitive_desc: portion of VNFD/NSD that describes primitive
4723 :param params: Params provided by user
4724 :param instantiation_params: Instantiation params provided by user
4725 :return: a dictionary with the calculated params
4727 calculated_params
= {}
4728 for parameter
in primitive_desc
.get("parameter", ()):
4729 param_name
= parameter
["name"]
4730 if param_name
in params
:
4731 calculated_params
[param_name
] = params
[param_name
]
4732 elif "default-value" in parameter
or "value" in parameter
:
4733 if "value" in parameter
:
4734 calculated_params
[param_name
] = parameter
["value"]
4736 calculated_params
[param_name
] = parameter
["default-value"]
4738 isinstance(calculated_params
[param_name
], str)
4739 and calculated_params
[param_name
].startswith("<")
4740 and calculated_params
[param_name
].endswith(">")
4742 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4743 calculated_params
[param_name
] = instantiation_params
[
4744 calculated_params
[param_name
][1:-1]
4748 "Parameter {} needed to execute primitive {} not provided".format(
4749 calculated_params
[param_name
], primitive_desc
["name"]
4754 "Parameter {} needed to execute primitive {} not provided".format(
4755 param_name
, primitive_desc
["name"]
4759 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4760 calculated_params
[param_name
] = yaml
.safe_dump(
4761 calculated_params
[param_name
], default_flow_style
=True, width
=256
4763 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4765 ].startswith("!!yaml "):
4766 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4767 if parameter
.get("data-type") == "INTEGER":
4769 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4770 except ValueError: # error converting string to int
4772 "Parameter {} of primitive {} must be integer".format(
4773 param_name
, primitive_desc
["name"]
4776 elif parameter
.get("data-type") == "BOOLEAN":
4777 calculated_params
[param_name
] = not (
4778 (str(calculated_params
[param_name
])).lower() == "false"
4781 # add always ns_config_info if primitive name is config
4782 if primitive_desc
["name"] == "config":
4783 if "ns_config_info" in instantiation_params
:
4784 calculated_params
["ns_config_info"] = instantiation_params
[
4787 return calculated_params
4789 def _look_for_deployed_vca(
4796 ee_descriptor_id
=None,
4798 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4799 for vca
in deployed_vca
:
4802 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4805 vdu_count_index
is not None
4806 and vdu_count_index
!= vca
["vdu_count_index"]
4809 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4811 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4815 # vca_deployed not found
4817 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4818 " is not deployed".format(
4827 ee_id
= vca
.get("ee_id")
4829 "type", "lxc_proxy_charm"
4830 ) # default value for backward compatibility - proxy charm
4833 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4834 "execution environment".format(
4835 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4838 return ee_id
, vca_type
4840 async def _ns_execute_primitive(
4846 retries_interval
=30,
4853 if primitive
== "config":
4854 primitive_params
= {"params": primitive_params
}
4856 vca_type
= vca_type
or "lxc_proxy_charm"
4860 output
= await asyncio
.wait_for(
4861 self
.vca_map
[vca_type
].exec_primitive(
4863 primitive_name
=primitive
,
4864 params_dict
=primitive_params
,
4865 progress_timeout
=self
.timeout_progress_primitive
,
4866 total_timeout
=self
.timeout_primitive
,
4871 timeout
=timeout
or self
.timeout_primitive
,
4875 except asyncio
.CancelledError
:
4877 except Exception as e
: # asyncio.TimeoutError
4878 if isinstance(e
, asyncio
.TimeoutError
):
4883 "Error executing action {} on {} -> {}".format(
4888 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
4890 return "FAILED", str(e
)
4892 return "COMPLETED", output
4894 except (LcmException
, asyncio
.CancelledError
):
4896 except Exception as e
:
4897 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
4899 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
4901 Updating the vca_status with latest juju information in nsrs record
4902 :param: nsr_id: Id of the nsr
4903 :param: nslcmop_id: Id of the nslcmop
4907 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
4908 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4909 vca_id
= self
.get_vca_id({}, db_nsr
)
4910 if db_nsr
["_admin"]["deployed"]["K8s"]:
4911 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
4912 cluster_uuid
, kdu_instance
, cluster_type
= (
4913 k8s
["k8scluster-uuid"],
4914 k8s
["kdu-instance"],
4915 k8s
["k8scluster-type"],
4917 await self
._on
_update
_k
8s
_db
(
4918 cluster_uuid
=cluster_uuid
,
4919 kdu_instance
=kdu_instance
,
4920 filter={"_id": nsr_id
},
4922 cluster_type
=cluster_type
,
4925 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
4926 table
, filter = "nsrs", {"_id": nsr_id
}
4927 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
4928 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
4930 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
4931 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
4933 async def action(self
, nsr_id
, nslcmop_id
):
4934 # Try to lock HA task here
4935 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4936 if not task_is_locked_by_me
:
4939 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
4940 self
.logger
.debug(logging_text
+ "Enter")
4941 # get all needed from database
4945 db_nslcmop_update
= {}
4946 nslcmop_operation_state
= None
4947 error_description_nslcmop
= None
4950 # wait for any previous tasks in process
4951 step
= "Waiting for previous operations to terminate"
4952 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4954 self
._write
_ns
_status
(
4957 current_operation
="RUNNING ACTION",
4958 current_operation_id
=nslcmop_id
,
4961 step
= "Getting information from database"
4962 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4963 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4964 if db_nslcmop
["operationParams"].get("primitive_params"):
4965 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
4966 db_nslcmop
["operationParams"]["primitive_params"]
4969 nsr_deployed
= db_nsr
["_admin"].get("deployed")
4970 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
4971 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
4972 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
4973 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
4974 primitive
= db_nslcmop
["operationParams"]["primitive"]
4975 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
4976 timeout_ns_action
= db_nslcmop
["operationParams"].get(
4977 "timeout_ns_action", self
.timeout_primitive
4981 step
= "Getting vnfr from database"
4982 db_vnfr
= self
.db
.get_one(
4983 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
4985 if db_vnfr
.get("kdur"):
4987 for kdur
in db_vnfr
["kdur"]:
4988 if kdur
.get("additionalParams"):
4989 kdur
["additionalParams"] = json
.loads(
4990 kdur
["additionalParams"]
4992 kdur_list
.append(kdur
)
4993 db_vnfr
["kdur"] = kdur_list
4994 step
= "Getting vnfd from database"
4995 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
4997 step
= "Getting nsd from database"
4998 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5000 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5001 # for backward compatibility
5002 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5003 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5004 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5005 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5007 # look for primitive
5008 config_primitive_desc
= descriptor_configuration
= None
5010 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5012 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5014 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5016 descriptor_configuration
= db_nsd
.get("ns-configuration")
5018 if descriptor_configuration
and descriptor_configuration
.get(
5021 for config_primitive
in descriptor_configuration
["config-primitive"]:
5022 if config_primitive
["name"] == primitive
:
5023 config_primitive_desc
= config_primitive
5026 if not config_primitive_desc
:
5027 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5029 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5033 primitive_name
= primitive
5034 ee_descriptor_id
= None
5036 primitive_name
= config_primitive_desc
.get(
5037 "execution-environment-primitive", primitive
5039 ee_descriptor_id
= config_primitive_desc
.get(
5040 "execution-environment-ref"
5046 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5048 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5051 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5053 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5055 desc_params
= parse_yaml_strings(
5056 db_vnfr
.get("additionalParamsForVnf")
5059 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5060 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5061 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5063 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5064 actions
.add(primitive
["name"])
5065 for primitive
in kdu_configuration
.get("config-primitive", []):
5066 actions
.add(primitive
["name"])
5067 kdu_action
= True if primitive_name
in actions
else False
5069 # TODO check if ns is in a proper status
5071 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5073 # kdur and desc_params already set from before
5074 if primitive_params
:
5075 desc_params
.update(primitive_params
)
5076 # TODO Check if we will need something at vnf level
5077 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5079 kdu_name
== kdu
["kdu-name"]
5080 and kdu
["member-vnf-index"] == vnf_index
5085 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5088 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5089 msg
= "unknown k8scluster-type '{}'".format(
5090 kdu
.get("k8scluster-type")
5092 raise LcmException(msg
)
5095 "collection": "nsrs",
5096 "filter": {"_id": nsr_id
},
5097 "path": "_admin.deployed.K8s.{}".format(index
),
5101 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5103 step
= "Executing kdu {}".format(primitive_name
)
5104 if primitive_name
== "upgrade":
5105 if desc_params
.get("kdu_model"):
5106 kdu_model
= desc_params
.get("kdu_model")
5107 del desc_params
["kdu_model"]
5109 kdu_model
= kdu
.get("kdu-model")
5110 parts
= kdu_model
.split(sep
=":")
5112 kdu_model
= parts
[0]
5114 detailed_status
= await asyncio
.wait_for(
5115 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5116 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5117 kdu_instance
=kdu
.get("kdu-instance"),
5119 kdu_model
=kdu_model
,
5122 timeout
=timeout_ns_action
,
5124 timeout
=timeout_ns_action
+ 10,
5127 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5129 elif primitive_name
== "rollback":
5130 detailed_status
= await asyncio
.wait_for(
5131 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5132 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5133 kdu_instance
=kdu
.get("kdu-instance"),
5136 timeout
=timeout_ns_action
,
5138 elif primitive_name
== "status":
5139 detailed_status
= await asyncio
.wait_for(
5140 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5141 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5142 kdu_instance
=kdu
.get("kdu-instance"),
5145 timeout
=timeout_ns_action
,
5148 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5149 kdu
["kdu-name"], nsr_id
5151 params
= self
._map
_primitive
_params
(
5152 config_primitive_desc
, primitive_params
, desc_params
5155 detailed_status
= await asyncio
.wait_for(
5156 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5157 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5158 kdu_instance
=kdu_instance
,
5159 primitive_name
=primitive_name
,
5162 timeout
=timeout_ns_action
,
5165 timeout
=timeout_ns_action
,
5169 nslcmop_operation_state
= "COMPLETED"
5171 detailed_status
= ""
5172 nslcmop_operation_state
= "FAILED"
5174 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5175 nsr_deployed
["VCA"],
5176 member_vnf_index
=vnf_index
,
5178 vdu_count_index
=vdu_count_index
,
5179 ee_descriptor_id
=ee_descriptor_id
,
5181 for vca_index
, vca_deployed
in enumerate(
5182 db_nsr
["_admin"]["deployed"]["VCA"]
5184 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5186 "collection": "nsrs",
5187 "filter": {"_id": nsr_id
},
5188 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5192 nslcmop_operation_state
,
5194 ) = await self
._ns
_execute
_primitive
(
5196 primitive
=primitive_name
,
5197 primitive_params
=self
._map
_primitive
_params
(
5198 config_primitive_desc
, primitive_params
, desc_params
5200 timeout
=timeout_ns_action
,
5206 db_nslcmop_update
["detailed-status"] = detailed_status
5207 error_description_nslcmop
= (
5208 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5212 + " task Done with result {} {}".format(
5213 nslcmop_operation_state
, detailed_status
5216 return # database update is called inside finally
5218 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5219 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5221 except asyncio
.CancelledError
:
5223 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5225 exc
= "Operation was cancelled"
5226 except asyncio
.TimeoutError
:
5227 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5229 except Exception as e
:
5230 exc
= traceback
.format_exc()
5231 self
.logger
.critical(
5232 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5241 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5242 nslcmop_operation_state
= "FAILED"
5244 self
._write
_ns
_status
(
5248 ], # TODO check if degraded. For the moment use previous status
5249 current_operation
="IDLE",
5250 current_operation_id
=None,
5251 # error_description=error_description_nsr,
5252 # error_detail=error_detail,
5253 other_update
=db_nsr_update
,
5256 self
._write
_op
_status
(
5259 error_message
=error_description_nslcmop
,
5260 operation_state
=nslcmop_operation_state
,
5261 other_update
=db_nslcmop_update
,
5264 if nslcmop_operation_state
:
5266 await self
.msg
.aiowrite(
5271 "nslcmop_id": nslcmop_id
,
5272 "operationState": nslcmop_operation_state
,
5276 except Exception as e
:
5278 logging_text
+ "kafka_write notification Exception {}".format(e
)
5280 self
.logger
.debug(logging_text
+ "Exit")
5281 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5282 return nslcmop_operation_state
, detailed_status
5284 async def scale(self
, nsr_id
, nslcmop_id
):
5285 # Try to lock HA task here
5286 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5287 if not task_is_locked_by_me
:
5290 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
5291 stage
= ["", "", ""]
5292 tasks_dict_info
= {}
5293 # ^ stage, step, VIM progress
5294 self
.logger
.debug(logging_text
+ "Enter")
5295 # get all needed from database
5297 db_nslcmop_update
= {}
5300 # in case of error, indicates what part of scale was failed to put nsr at error status
5301 scale_process
= None
5302 old_operational_status
= ""
5303 old_config_status
= ""
5306 # wait for any previous tasks in process
5307 step
= "Waiting for previous operations to terminate"
5308 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5309 self
._write
_ns
_status
(
5312 current_operation
="SCALING",
5313 current_operation_id
=nslcmop_id
,
5316 step
= "Getting nslcmop from database"
5318 step
+ " after having waited for previous tasks to be completed"
5320 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5322 step
= "Getting nsr from database"
5323 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5324 old_operational_status
= db_nsr
["operational-status"]
5325 old_config_status
= db_nsr
["config-status"]
5327 step
= "Parsing scaling parameters"
5328 db_nsr_update
["operational-status"] = "scaling"
5329 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5330 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5332 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
5334 ]["member-vnf-index"]
5335 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
5337 ]["scaling-group-descriptor"]
5338 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
5339 # for backward compatibility
5340 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5341 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5342 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5343 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5345 step
= "Getting vnfr from database"
5346 db_vnfr
= self
.db
.get_one(
5347 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5350 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5352 step
= "Getting vnfd from database"
5353 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5355 base_folder
= db_vnfd
["_admin"]["storage"]
5357 step
= "Getting scaling-group-descriptor"
5358 scaling_descriptor
= find_in_list(
5359 get_scaling_aspect(db_vnfd
),
5360 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
5362 if not scaling_descriptor
:
5364 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5365 "at vnfd:scaling-group-descriptor".format(scaling_group
)
5368 step
= "Sending scale order to VIM"
5369 # TODO check if ns is in a proper status
5371 if not db_nsr
["_admin"].get("scaling-group"):
5376 "_admin.scaling-group": [
5377 {"name": scaling_group
, "nb-scale-op": 0}
5381 admin_scale_index
= 0
5383 for admin_scale_index
, admin_scale_info
in enumerate(
5384 db_nsr
["_admin"]["scaling-group"]
5386 if admin_scale_info
["name"] == scaling_group
:
5387 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
5389 else: # not found, set index one plus last element and add new entry with the name
5390 admin_scale_index
+= 1
5392 "_admin.scaling-group.{}.name".format(admin_scale_index
)
5395 vca_scaling_info
= []
5396 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
5397 if scaling_type
== "SCALE_OUT":
5398 if "aspect-delta-details" not in scaling_descriptor
:
5400 "Aspect delta details not fount in scaling descriptor {}".format(
5401 scaling_descriptor
["name"]
5404 # count if max-instance-count is reached
5405 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5407 scaling_info
["scaling_direction"] = "OUT"
5408 scaling_info
["vdu-create"] = {}
5409 scaling_info
["kdu-create"] = {}
5410 for delta
in deltas
:
5411 for vdu_delta
in delta
.get("vdu-delta", {}):
5412 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
5413 # vdu_index also provides the number of instance of the targeted vdu
5414 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5415 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5419 additional_params
= (
5420 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
5423 cloud_init_list
= []
5425 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5426 max_instance_count
= 10
5427 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
5428 max_instance_count
= vdu_profile
.get(
5429 "max-number-of-instances", 10
5432 default_instance_num
= get_number_of_instances(
5435 instances_number
= vdu_delta
.get("number-of-instances", 1)
5436 nb_scale_op
+= instances_number
5438 new_instance_count
= nb_scale_op
+ default_instance_num
5439 # Control if new count is over max and vdu count is less than max.
5440 # Then assign new instance count
5441 if new_instance_count
> max_instance_count
> vdu_count
:
5442 instances_number
= new_instance_count
- max_instance_count
5444 instances_number
= instances_number
5446 if new_instance_count
> max_instance_count
:
5448 "reached the limit of {} (max-instance-count) "
5449 "scaling-out operations for the "
5450 "scaling-group-descriptor '{}'".format(
5451 nb_scale_op
, scaling_group
5454 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5456 # TODO Information of its own ip is not available because db_vnfr is not updated.
5457 additional_params
["OSM"] = get_osm_params(
5458 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
5460 cloud_init_list
.append(
5461 self
._parse
_cloud
_init
(
5468 vca_scaling_info
.append(
5470 "osm_vdu_id": vdu_delta
["id"],
5471 "member-vnf-index": vnf_index
,
5473 "vdu_index": vdu_index
+ x
,
5476 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
5477 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5478 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
5479 kdu_name
= kdu_profile
["kdu-name"]
5480 resource_name
= kdu_profile
["resource-name"]
5482 # Might have different kdus in the same delta
5483 # Should have list for each kdu
5484 if not scaling_info
["kdu-create"].get(kdu_name
, None):
5485 scaling_info
["kdu-create"][kdu_name
] = []
5487 kdur
= get_kdur(db_vnfr
, kdu_name
)
5488 if kdur
.get("helm-chart"):
5489 k8s_cluster_type
= "helm-chart-v3"
5490 self
.logger
.debug("kdur: {}".format(kdur
))
5492 kdur
.get("helm-version")
5493 and kdur
.get("helm-version") == "v2"
5495 k8s_cluster_type
= "helm-chart"
5496 raise NotImplementedError
5497 elif kdur
.get("juju-bundle"):
5498 k8s_cluster_type
= "juju-bundle"
5501 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5502 "juju-bundle. Maybe an old NBI version is running".format(
5503 db_vnfr
["member-vnf-index-ref"], kdu_name
5507 max_instance_count
= 10
5508 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
5509 max_instance_count
= kdu_profile
.get(
5510 "max-number-of-instances", 10
5513 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
5514 deployed_kdu
, _
= get_deployed_kdu(
5515 nsr_deployed
, kdu_name
, vnf_index
5517 if deployed_kdu
is None:
5519 "KDU '{}' for vnf '{}' not deployed".format(
5523 kdu_instance
= deployed_kdu
.get("kdu-instance")
5524 instance_num
= await self
.k8scluster_map
[
5526 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5527 kdu_replica_count
= instance_num
+ kdu_delta
.get(
5528 "number-of-instances", 1
5531 # Control if new count is over max and instance_num is less than max.
5532 # Then assign max instance number to kdu replica count
5533 if kdu_replica_count
> max_instance_count
> instance_num
:
5534 kdu_replica_count
= max_instance_count
5535 if kdu_replica_count
> max_instance_count
:
5537 "reached the limit of {} (max-instance-count) "
5538 "scaling-out operations for the "
5539 "scaling-group-descriptor '{}'".format(
5540 instance_num
, scaling_group
5544 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5545 vca_scaling_info
.append(
5547 "osm_kdu_id": kdu_name
,
5548 "member-vnf-index": vnf_index
,
5550 "kdu_index": instance_num
+ x
- 1,
5553 scaling_info
["kdu-create"][kdu_name
].append(
5555 "member-vnf-index": vnf_index
,
5557 "k8s-cluster-type": k8s_cluster_type
,
5558 "resource-name": resource_name
,
5559 "scale": kdu_replica_count
,
5562 elif scaling_type
== "SCALE_IN":
5563 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5565 scaling_info
["scaling_direction"] = "IN"
5566 scaling_info
["vdu-delete"] = {}
5567 scaling_info
["kdu-delete"] = {}
5569 for delta
in deltas
:
5570 for vdu_delta
in delta
.get("vdu-delta", {}):
5571 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5572 min_instance_count
= 0
5573 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5574 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
5575 min_instance_count
= vdu_profile
["min-number-of-instances"]
5577 default_instance_num
= get_number_of_instances(
5578 db_vnfd
, vdu_delta
["id"]
5580 instance_num
= vdu_delta
.get("number-of-instances", 1)
5581 nb_scale_op
-= instance_num
5583 new_instance_count
= nb_scale_op
+ default_instance_num
5585 if new_instance_count
< min_instance_count
< vdu_count
:
5586 instances_number
= min_instance_count
- new_instance_count
5588 instances_number
= instance_num
5590 if new_instance_count
< min_instance_count
:
5592 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5593 "scaling-group-descriptor '{}'".format(
5594 nb_scale_op
, scaling_group
5597 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5598 vca_scaling_info
.append(
5600 "osm_vdu_id": vdu_delta
["id"],
5601 "member-vnf-index": vnf_index
,
5603 "vdu_index": vdu_index
- 1 - x
,
5606 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
5607 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5608 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
5609 kdu_name
= kdu_profile
["kdu-name"]
5610 resource_name
= kdu_profile
["resource-name"]
5612 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
5613 scaling_info
["kdu-delete"][kdu_name
] = []
5615 kdur
= get_kdur(db_vnfr
, kdu_name
)
5616 if kdur
.get("helm-chart"):
5617 k8s_cluster_type
= "helm-chart-v3"
5618 self
.logger
.debug("kdur: {}".format(kdur
))
5620 kdur
.get("helm-version")
5621 and kdur
.get("helm-version") == "v2"
5623 k8s_cluster_type
= "helm-chart"
5624 raise NotImplementedError
5625 elif kdur
.get("juju-bundle"):
5626 k8s_cluster_type
= "juju-bundle"
5629 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5630 "juju-bundle. Maybe an old NBI version is running".format(
5631 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
5635 min_instance_count
= 0
5636 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
5637 min_instance_count
= kdu_profile
["min-number-of-instances"]
5639 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
5640 deployed_kdu
, _
= get_deployed_kdu(
5641 nsr_deployed
, kdu_name
, vnf_index
5643 if deployed_kdu
is None:
5645 "KDU '{}' for vnf '{}' not deployed".format(
5649 kdu_instance
= deployed_kdu
.get("kdu-instance")
5650 instance_num
= await self
.k8scluster_map
[
5652 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5653 kdu_replica_count
= instance_num
- kdu_delta
.get(
5654 "number-of-instances", 1
5657 if kdu_replica_count
< min_instance_count
< instance_num
:
5658 kdu_replica_count
= min_instance_count
5659 if kdu_replica_count
< min_instance_count
:
5661 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5662 "scaling-group-descriptor '{}'".format(
5663 instance_num
, scaling_group
5667 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5668 vca_scaling_info
.append(
5670 "osm_kdu_id": kdu_name
,
5671 "member-vnf-index": vnf_index
,
5673 "kdu_index": instance_num
- x
- 1,
5676 scaling_info
["kdu-delete"][kdu_name
].append(
5678 "member-vnf-index": vnf_index
,
5680 "k8s-cluster-type": k8s_cluster_type
,
5681 "resource-name": resource_name
,
5682 "scale": kdu_replica_count
,
5686 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5687 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
5688 if scaling_info
["scaling_direction"] == "IN":
5689 for vdur
in reversed(db_vnfr
["vdur"]):
5690 if vdu_delete
.get(vdur
["vdu-id-ref"]):
5691 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
5692 scaling_info
["vdu"].append(
5694 "name": vdur
.get("name") or vdur
.get("vdu-name"),
5695 "vdu_id": vdur
["vdu-id-ref"],
5699 for interface
in vdur
["interfaces"]:
5700 scaling_info
["vdu"][-1]["interface"].append(
5702 "name": interface
["name"],
5703 "ip_address": interface
["ip-address"],
5704 "mac_address": interface
.get("mac-address"),
5707 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5710 step
= "Executing pre-scale vnf-config-primitive"
5711 if scaling_descriptor
.get("scaling-config-action"):
5712 for scaling_config_action
in scaling_descriptor
[
5713 "scaling-config-action"
5716 scaling_config_action
.get("trigger") == "pre-scale-in"
5717 and scaling_type
== "SCALE_IN"
5719 scaling_config_action
.get("trigger") == "pre-scale-out"
5720 and scaling_type
== "SCALE_OUT"
5722 vnf_config_primitive
= scaling_config_action
[
5723 "vnf-config-primitive-name-ref"
5725 step
= db_nslcmop_update
[
5727 ] = "executing pre-scale scaling-config-action '{}'".format(
5728 vnf_config_primitive
5731 # look for primitive
5732 for config_primitive
in (
5733 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
5734 ).get("config-primitive", ()):
5735 if config_primitive
["name"] == vnf_config_primitive
:
5739 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5740 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5741 "primitive".format(scaling_group
, vnf_config_primitive
)
5744 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
5745 if db_vnfr
.get("additionalParamsForVnf"):
5746 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
5748 scale_process
= "VCA"
5749 db_nsr_update
["config-status"] = "configuring pre-scaling"
5750 primitive_params
= self
._map
_primitive
_params
(
5751 config_primitive
, {}, vnfr_params
5754 # Pre-scale retry check: Check if this sub-operation has been executed before
5755 op_index
= self
._check
_or
_add
_scale
_suboperation
(
5758 vnf_config_primitive
,
5762 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
5763 # Skip sub-operation
5764 result
= "COMPLETED"
5765 result_detail
= "Done"
5768 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5769 vnf_config_primitive
, result
, result_detail
5773 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
5774 # New sub-operation: Get index of this sub-operation
5776 len(db_nslcmop
.get("_admin", {}).get("operations"))
5781 + "vnf_config_primitive={} New sub-operation".format(
5782 vnf_config_primitive
5786 # retry: Get registered params for this existing sub-operation
5787 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
5790 vnf_index
= op
.get("member_vnf_index")
5791 vnf_config_primitive
= op
.get("primitive")
5792 primitive_params
= op
.get("primitive_params")
5795 + "vnf_config_primitive={} Sub-operation retry".format(
5796 vnf_config_primitive
5799 # Execute the primitive, either with new (first-time) or registered (reintent) args
5800 ee_descriptor_id
= config_primitive
.get(
5801 "execution-environment-ref"
5803 primitive_name
= config_primitive
.get(
5804 "execution-environment-primitive", vnf_config_primitive
5806 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5807 nsr_deployed
["VCA"],
5808 member_vnf_index
=vnf_index
,
5810 vdu_count_index
=None,
5811 ee_descriptor_id
=ee_descriptor_id
,
5813 result
, result_detail
= await self
._ns
_execute
_primitive
(
5822 + "vnf_config_primitive={} Done with result {} {}".format(
5823 vnf_config_primitive
, result
, result_detail
5826 # Update operationState = COMPLETED | FAILED
5827 self
._update
_suboperation
_status
(
5828 db_nslcmop
, op_index
, result
, result_detail
5831 if result
== "FAILED":
5832 raise LcmException(result_detail
)
5833 db_nsr_update
["config-status"] = old_config_status
5834 scale_process
= None
5838 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
5841 "_admin.scaling-group.{}.time".format(admin_scale_index
)
5844 # SCALE-IN VCA - BEGIN
5845 if vca_scaling_info
:
5846 step
= db_nslcmop_update
[
5848 ] = "Deleting the execution environments"
5849 scale_process
= "VCA"
5850 for vca_info
in vca_scaling_info
:
5851 if vca_info
["type"] == "delete":
5852 member_vnf_index
= str(vca_info
["member-vnf-index"])
5854 logging_text
+ "vdu info: {}".format(vca_info
)
5856 if vca_info
.get("osm_vdu_id"):
5857 vdu_id
= vca_info
["osm_vdu_id"]
5858 vdu_index
= int(vca_info
["vdu_index"])
5861 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5862 member_vnf_index
, vdu_id
, vdu_index
5866 kdu_id
= vca_info
["osm_kdu_id"]
5869 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5870 member_vnf_index
, kdu_id
, vdu_index
5872 stage
[2] = step
= "Scaling in VCA"
5873 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
5874 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
5875 config_update
= db_nsr
["configurationStatus"]
5876 for vca_index
, vca
in enumerate(vca_update
):
5878 (vca
or vca
.get("ee_id"))
5879 and vca
["member-vnf-index"] == member_vnf_index
5880 and vca
["vdu_count_index"] == vdu_index
5882 if vca
.get("vdu_id"):
5883 config_descriptor
= get_configuration(
5884 db_vnfd
, vca
.get("vdu_id")
5886 elif vca
.get("kdu_name"):
5887 config_descriptor
= get_configuration(
5888 db_vnfd
, vca
.get("kdu_name")
5891 config_descriptor
= get_configuration(
5892 db_vnfd
, db_vnfd
["id"]
5894 operation_params
= (
5895 db_nslcmop
.get("operationParams") or {}
5897 exec_terminate_primitives
= not operation_params
.get(
5898 "skip_terminate_primitives"
5899 ) and vca
.get("needed_terminate")
5900 task
= asyncio
.ensure_future(
5909 exec_primitives
=exec_terminate_primitives
,
5913 timeout
=self
.timeout_charm_delete
,
5916 tasks_dict_info
[task
] = "Terminating VCA {}".format(
5919 del vca_update
[vca_index
]
5920 del config_update
[vca_index
]
5921 # wait for pending tasks of terminate primitives
5925 + "Waiting for tasks {}".format(
5926 list(tasks_dict_info
.keys())
5929 error_list
= await self
._wait
_for
_tasks
(
5933 self
.timeout_charm_delete
, self
.timeout_ns_terminate
5938 tasks_dict_info
.clear()
5940 raise LcmException("; ".join(error_list
))
5942 db_vca_and_config_update
= {
5943 "_admin.deployed.VCA": vca_update
,
5944 "configurationStatus": config_update
,
5947 "nsrs", db_nsr
["_id"], db_vca_and_config_update
5949 scale_process
= None
5950 # SCALE-IN VCA - END
5953 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
5954 scale_process
= "RO"
5955 if self
.ro_config
.get("ng"):
5956 await self
._scale
_ng
_ro
(
5957 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
5959 scaling_info
.pop("vdu-create", None)
5960 scaling_info
.pop("vdu-delete", None)
5962 scale_process
= None
5966 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
5967 scale_process
= "KDU"
5968 await self
._scale
_kdu
(
5969 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
5971 scaling_info
.pop("kdu-create", None)
5972 scaling_info
.pop("kdu-delete", None)
5974 scale_process
= None
5978 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5980 # SCALE-UP VCA - BEGIN
5981 if vca_scaling_info
:
5982 step
= db_nslcmop_update
[
5984 ] = "Creating new execution environments"
5985 scale_process
= "VCA"
5986 for vca_info
in vca_scaling_info
:
5987 if vca_info
["type"] == "create":
5988 member_vnf_index
= str(vca_info
["member-vnf-index"])
5990 logging_text
+ "vdu info: {}".format(vca_info
)
5992 vnfd_id
= db_vnfr
["vnfd-ref"]
5993 if vca_info
.get("osm_vdu_id"):
5994 vdu_index
= int(vca_info
["vdu_index"])
5995 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
5996 if db_vnfr
.get("additionalParamsForVnf"):
5997 deploy_params
.update(
5999 db_vnfr
["additionalParamsForVnf"].copy()
6002 descriptor_config
= get_configuration(
6003 db_vnfd
, db_vnfd
["id"]
6005 if descriptor_config
:
6010 logging_text
=logging_text
6011 + "member_vnf_index={} ".format(member_vnf_index
),
6014 nslcmop_id
=nslcmop_id
,
6020 member_vnf_index
=member_vnf_index
,
6021 vdu_index
=vdu_index
,
6023 deploy_params
=deploy_params
,
6024 descriptor_config
=descriptor_config
,
6025 base_folder
=base_folder
,
6026 task_instantiation_info
=tasks_dict_info
,
6029 vdu_id
= vca_info
["osm_vdu_id"]
6030 vdur
= find_in_list(
6031 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6033 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6034 if vdur
.get("additionalParams"):
6035 deploy_params_vdu
= parse_yaml_strings(
6036 vdur
["additionalParams"]
6039 deploy_params_vdu
= deploy_params
6040 deploy_params_vdu
["OSM"] = get_osm_params(
6041 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6043 if descriptor_config
:
6048 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6049 member_vnf_index
, vdu_id
, vdu_index
6051 stage
[2] = step
= "Scaling out VCA"
6052 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6054 logging_text
=logging_text
6055 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6056 member_vnf_index
, vdu_id
, vdu_index
6060 nslcmop_id
=nslcmop_id
,
6066 member_vnf_index
=member_vnf_index
,
6067 vdu_index
=vdu_index
,
6069 deploy_params
=deploy_params_vdu
,
6070 descriptor_config
=descriptor_config
,
6071 base_folder
=base_folder
,
6072 task_instantiation_info
=tasks_dict_info
,
6076 kdu_name
= vca_info
["osm_kdu_id"]
6077 descriptor_config
= get_configuration(db_vnfd
, kdu_name
)
6078 if descriptor_config
:
6080 kdu_index
= int(vca_info
["kdu_index"])
6084 for x
in db_vnfr
["kdur"]
6085 if x
["kdu-name"] == kdu_name
6087 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
6088 if kdur
.get("additionalParams"):
6089 deploy_params_kdu
= parse_yaml_strings(
6090 kdur
["additionalParams"]
6094 logging_text
=logging_text
,
6097 nslcmop_id
=nslcmop_id
,
6103 member_vnf_index
=member_vnf_index
,
6104 vdu_index
=kdu_index
,
6106 deploy_params
=deploy_params_kdu
,
6107 descriptor_config
=descriptor_config
,
6108 base_folder
=base_folder
,
6109 task_instantiation_info
=tasks_dict_info
,
6112 # SCALE-UP VCA - END
6113 scale_process
= None
6116 # execute primitive service POST-SCALING
6117 step
= "Executing post-scale vnf-config-primitive"
6118 if scaling_descriptor
.get("scaling-config-action"):
6119 for scaling_config_action
in scaling_descriptor
[
6120 "scaling-config-action"
6123 scaling_config_action
.get("trigger") == "post-scale-in"
6124 and scaling_type
== "SCALE_IN"
6126 scaling_config_action
.get("trigger") == "post-scale-out"
6127 and scaling_type
== "SCALE_OUT"
6129 vnf_config_primitive
= scaling_config_action
[
6130 "vnf-config-primitive-name-ref"
6132 step
= db_nslcmop_update
[
6134 ] = "executing post-scale scaling-config-action '{}'".format(
6135 vnf_config_primitive
6138 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6139 if db_vnfr
.get("additionalParamsForVnf"):
6140 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6142 # look for primitive
6143 for config_primitive
in (
6144 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6145 ).get("config-primitive", ()):
6146 if config_primitive
["name"] == vnf_config_primitive
:
6150 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6151 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6152 "config-primitive".format(
6153 scaling_group
, vnf_config_primitive
6156 scale_process
= "VCA"
6157 db_nsr_update
["config-status"] = "configuring post-scaling"
6158 primitive_params
= self
._map
_primitive
_params
(
6159 config_primitive
, {}, vnfr_params
6162 # Post-scale retry check: Check if this sub-operation has been executed before
6163 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6166 vnf_config_primitive
,
6170 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6171 # Skip sub-operation
6172 result
= "COMPLETED"
6173 result_detail
= "Done"
6176 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6177 vnf_config_primitive
, result
, result_detail
6181 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6182 # New sub-operation: Get index of this sub-operation
6184 len(db_nslcmop
.get("_admin", {}).get("operations"))
6189 + "vnf_config_primitive={} New sub-operation".format(
6190 vnf_config_primitive
6194 # retry: Get registered params for this existing sub-operation
6195 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6198 vnf_index
= op
.get("member_vnf_index")
6199 vnf_config_primitive
= op
.get("primitive")
6200 primitive_params
= op
.get("primitive_params")
6203 + "vnf_config_primitive={} Sub-operation retry".format(
6204 vnf_config_primitive
6207 # Execute the primitive, either with new (first-time) or registered (reintent) args
6208 ee_descriptor_id
= config_primitive
.get(
6209 "execution-environment-ref"
6211 primitive_name
= config_primitive
.get(
6212 "execution-environment-primitive", vnf_config_primitive
6214 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6215 nsr_deployed
["VCA"],
6216 member_vnf_index
=vnf_index
,
6218 vdu_count_index
=None,
6219 ee_descriptor_id
=ee_descriptor_id
,
6221 result
, result_detail
= await self
._ns
_execute
_primitive
(
6230 + "vnf_config_primitive={} Done with result {} {}".format(
6231 vnf_config_primitive
, result
, result_detail
6234 # Update operationState = COMPLETED | FAILED
6235 self
._update
_suboperation
_status
(
6236 db_nslcmop
, op_index
, result
, result_detail
6239 if result
== "FAILED":
6240 raise LcmException(result_detail
)
6241 db_nsr_update
["config-status"] = old_config_status
6242 scale_process
= None
6247 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6248 db_nsr_update
["operational-status"] = (
6250 if old_operational_status
== "failed"
6251 else old_operational_status
6253 db_nsr_update
["config-status"] = old_config_status
6256 ROclient
.ROClientException
,
6261 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6263 except asyncio
.CancelledError
:
6265 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6267 exc
= "Operation was cancelled"
6268 except Exception as e
:
6269 exc
= traceback
.format_exc()
6270 self
.logger
.critical(
6271 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6275 self
._write
_ns
_status
(
6278 current_operation
="IDLE",
6279 current_operation_id
=None,
6282 stage
[1] = "Waiting for instantiate pending tasks."
6283 self
.logger
.debug(logging_text
+ stage
[1])
6284 exc
= await self
._wait
_for
_tasks
(
6287 self
.timeout_ns_deploy
,
6295 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6296 nslcmop_operation_state
= "FAILED"
6298 db_nsr_update
["operational-status"] = old_operational_status
6299 db_nsr_update
["config-status"] = old_config_status
6300 db_nsr_update
["detailed-status"] = ""
6302 if "VCA" in scale_process
:
6303 db_nsr_update
["config-status"] = "failed"
6304 if "RO" in scale_process
:
6305 db_nsr_update
["operational-status"] = "failed"
6308 ] = "FAILED scaling nslcmop={} {}: {}".format(
6309 nslcmop_id
, step
, exc
6312 error_description_nslcmop
= None
6313 nslcmop_operation_state
= "COMPLETED"
6314 db_nslcmop_update
["detailed-status"] = "Done"
6316 self
._write
_op
_status
(
6319 error_message
=error_description_nslcmop
,
6320 operation_state
=nslcmop_operation_state
,
6321 other_update
=db_nslcmop_update
,
6324 self
._write
_ns
_status
(
6327 current_operation
="IDLE",
6328 current_operation_id
=None,
6329 other_update
=db_nsr_update
,
6332 if nslcmop_operation_state
:
6336 "nslcmop_id": nslcmop_id
,
6337 "operationState": nslcmop_operation_state
,
6339 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
6340 except Exception as e
:
6342 logging_text
+ "kafka_write notification Exception {}".format(e
)
6344 self
.logger
.debug(logging_text
+ "Exit")
6345 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
6347 async def _scale_kdu(
6348 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6350 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
6351 for kdu_name
in _scaling_info
:
6352 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
6353 deployed_kdu
, index
= get_deployed_kdu(
6354 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
6356 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
6357 kdu_instance
= deployed_kdu
["kdu-instance"]
6358 scale
= int(kdu_scaling_info
["scale"])
6359 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
6362 "collection": "nsrs",
6363 "filter": {"_id": nsr_id
},
6364 "path": "_admin.deployed.K8s.{}".format(index
),
6367 step
= "scaling application {}".format(
6368 kdu_scaling_info
["resource-name"]
6370 self
.logger
.debug(logging_text
+ step
)
6372 if kdu_scaling_info
["type"] == "delete":
6373 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
6376 and kdu_config
.get("terminate-config-primitive")
6377 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
6379 terminate_config_primitive_list
= kdu_config
.get(
6380 "terminate-config-primitive"
6382 terminate_config_primitive_list
.sort(
6383 key
=lambda val
: int(val
["seq"])
6387 terminate_config_primitive
6388 ) in terminate_config_primitive_list
:
6389 primitive_params_
= self
._map
_primitive
_params
(
6390 terminate_config_primitive
, {}, {}
6392 step
= "execute terminate config primitive"
6393 self
.logger
.debug(logging_text
+ step
)
6394 await asyncio
.wait_for(
6395 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
6396 cluster_uuid
=cluster_uuid
,
6397 kdu_instance
=kdu_instance
,
6398 primitive_name
=terminate_config_primitive
["name"],
6399 params
=primitive_params_
,
6406 await asyncio
.wait_for(
6407 self
.k8scluster_map
[k8s_cluster_type
].scale(
6410 kdu_scaling_info
["resource-name"],
6413 timeout
=self
.timeout_vca_on_error
,
6416 if kdu_scaling_info
["type"] == "create":
6417 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
6420 and kdu_config
.get("initial-config-primitive")
6421 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
6423 initial_config_primitive_list
= kdu_config
.get(
6424 "initial-config-primitive"
6426 initial_config_primitive_list
.sort(
6427 key
=lambda val
: int(val
["seq"])
6430 for initial_config_primitive
in initial_config_primitive_list
:
6431 primitive_params_
= self
._map
_primitive
_params
(
6432 initial_config_primitive
, {}, {}
6434 step
= "execute initial config primitive"
6435 self
.logger
.debug(logging_text
+ step
)
6436 await asyncio
.wait_for(
6437 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
6438 cluster_uuid
=cluster_uuid
,
6439 kdu_instance
=kdu_instance
,
6440 primitive_name
=initial_config_primitive
["name"],
6441 params
=primitive_params_
,
6448 async def _scale_ng_ro(
6449 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
6451 nsr_id
= db_nslcmop
["nsInstanceId"]
6452 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
6455 # read from db: vnfd's for every vnf
6458 # for each vnf in ns, read vnfd
6459 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
6460 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
6461 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
6462 # if we haven't this vnfd, read it from db
6463 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
6465 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
6466 db_vnfds
.append(vnfd
)
6467 n2vc_key
= self
.n2vc
.get_public_key()
6468 n2vc_key_list
= [n2vc_key
]
6471 vdu_scaling_info
.get("vdu-create"),
6472 vdu_scaling_info
.get("vdu-delete"),
6475 # db_vnfr has been updated, update db_vnfrs to use it
6476 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
6477 await self
._instantiate
_ng
_ro
(
6487 start_deploy
=time(),
6488 timeout_ns_deploy
=self
.timeout_ns_deploy
,
6490 if vdu_scaling_info
.get("vdu-delete"):
6492 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
6495 async def extract_prometheus_scrape_jobs(
6496 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
6498 # look if exist a file called 'prometheus*.j2' and
6499 artifact_content
= self
.fs
.dir_ls(artifact_path
)
6503 for f
in artifact_content
6504 if f
.startswith("prometheus") and f
.endswith(".j2")
6510 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
6514 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
6515 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
6517 vnfr_id
= vnfr_id
.replace("-", "")
6519 "JOB_NAME": vnfr_id
,
6520 "TARGET_IP": target_ip
,
6521 "EXPORTER_POD_IP": host_name
,
6522 "EXPORTER_POD_PORT": host_port
,
6524 job_list
= parse_job(job_data
, variables
)
6525 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
6526 for job
in job_list
:
6528 not isinstance(job
.get("job_name"), str)
6529 or vnfr_id
not in job
["job_name"]
6531 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
6532 job
["nsr_id"] = nsr_id
6533 job
["vnfr_id"] = vnfr_id
6536 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
6538 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6540 :param: vim_account_id: VIM Account ID
6542 :return: (cloud_name, cloud_credential)
6544 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
6545 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
6547 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
6549 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6551 :param: vim_account_id: VIM Account ID
6553 :return: (cloud_name, cloud_credential)
6555 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
6556 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")