1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
20 from typing
import Any
, Dict
, List
23 import logging
.handlers
34 from osm_lcm
import ROclient
35 from osm_lcm
.data_utils
.nsr
import (
38 get_deployed_vca_list
,
41 from osm_lcm
.data_utils
.vca
import (
50 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
51 from osm_lcm
.lcm_utils
import (
59 from osm_lcm
.data_utils
.nsd
import (
60 get_ns_configuration_relation_list
,
64 from osm_lcm
.data_utils
.vnfd
import (
68 get_ee_sorted_initial_config_primitive_list
,
69 get_ee_sorted_terminate_config_primitive_list
,
71 get_virtual_link_profiles
,
76 get_number_of_instances
,
78 get_kdu_resource_profile
,
80 from osm_lcm
.data_utils
.list_utils
import find_in_list
81 from osm_lcm
.data_utils
.vnfr
import get_osm_params
, get_vdur_index
, get_kdur
82 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
83 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
84 from n2vc
.definitions
import RelationEndpoint
85 from n2vc
.k8s_helm_conn
import K8sHelmConnector
86 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
87 from n2vc
.k8s_juju_conn
import K8sJujuConnector
89 from osm_common
.dbbase
import DbException
90 from osm_common
.fsbase
import FsException
92 from osm_lcm
.data_utils
.database
.database
import Database
93 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
95 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
96 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
98 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
99 from osm_lcm
.prometheus
import parse_job
101 from copy
import copy
, deepcopy
102 from time
import time
103 from uuid
import uuid4
105 from random
import randint
107 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
110 class NsLcm(LcmBase
):
111 timeout_vca_on_error
= (
113 ) # Time for charm from first time at blocked,error status to mark as failed
114 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
115 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
116 timeout_charm_delete
= 10 * 60
117 timeout_primitive
= 30 * 60 # timeout for primitive execution
118 timeout_progress_primitive
= (
120 ) # timeout for some progress in a primitive execution
122 SUBOPERATION_STATUS_NOT_FOUND
= -1
123 SUBOPERATION_STATUS_NEW
= -2
124 SUBOPERATION_STATUS_SKIP
= -3
125 task_name_deploy_vca
= "Deploying VCA"
127 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
129 Init, Connect to database, filesystem storage, and messaging
130 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
133 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
135 self
.db
= Database().instance
.db
136 self
.fs
= Filesystem().instance
.fs
138 self
.lcm_tasks
= lcm_tasks
139 self
.timeout
= config
["timeout"]
140 self
.ro_config
= config
["ro_config"]
141 self
.ng_ro
= config
["ro_config"].get("ng")
142 self
.vca_config
= config
["VCA"].copy()
144 # create N2VC connector
145 self
.n2vc
= N2VCJujuConnector(
148 on_update_db
=self
._on
_update
_n
2vc
_db
,
153 self
.conn_helm_ee
= LCMHelmConn(
156 vca_config
=self
.vca_config
,
157 on_update_db
=self
._on
_update
_n
2vc
_db
,
160 self
.k8sclusterhelm2
= K8sHelmConnector(
161 kubectl_command
=self
.vca_config
.get("kubectlpath"),
162 helm_command
=self
.vca_config
.get("helmpath"),
169 self
.k8sclusterhelm3
= K8sHelm3Connector(
170 kubectl_command
=self
.vca_config
.get("kubectlpath"),
171 helm_command
=self
.vca_config
.get("helm3path"),
178 self
.k8sclusterjuju
= K8sJujuConnector(
179 kubectl_command
=self
.vca_config
.get("kubectlpath"),
180 juju_command
=self
.vca_config
.get("jujupath"),
183 on_update_db
=self
._on
_update
_k
8s
_db
,
188 self
.k8scluster_map
= {
189 "helm-chart": self
.k8sclusterhelm2
,
190 "helm-chart-v3": self
.k8sclusterhelm3
,
191 "chart": self
.k8sclusterhelm3
,
192 "juju-bundle": self
.k8sclusterjuju
,
193 "juju": self
.k8sclusterjuju
,
197 "lxc_proxy_charm": self
.n2vc
,
198 "native_charm": self
.n2vc
,
199 "k8s_proxy_charm": self
.n2vc
,
200 "helm": self
.conn_helm_ee
,
201 "helm-v3": self
.conn_helm_ee
,
205 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
208 def increment_ip_mac(ip_mac
, vm_index
=1):
209 if not isinstance(ip_mac
, str):
212 # try with ipv4 look for last dot
213 i
= ip_mac
.rfind(".")
216 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
217 # try with ipv6 or mac look for last colon. Operate in hex
218 i
= ip_mac
.rfind(":")
221 # format in hex, len can be 2 for mac or 4 for ipv6
222 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
223 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
229 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
231 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
234 # TODO filter RO descriptor fields...
238 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
239 db_dict
["deploymentStatus"] = ro_descriptor
240 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
242 except Exception as e
:
244 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
247 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
249 # remove last dot from path (if exists)
250 if path
.endswith("."):
253 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
254 # .format(table, filter, path, updated_data))
257 nsr_id
= filter.get("_id")
259 # read ns record from database
260 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
261 current_ns_status
= nsr
.get("nsState")
263 # get vca status for NS
264 status_dict
= await self
.n2vc
.get_status(
265 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
270 db_dict
["vcaStatus"] = status_dict
271 await self
.n2vc
.update_vca_status(db_dict
["vcaStatus"], vca_id
=vca_id
)
273 # update configurationStatus for this VCA
275 vca_index
= int(path
[path
.rfind(".") + 1 :])
278 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
280 vca_status
= vca_list
[vca_index
].get("status")
282 configuration_status_list
= nsr
.get("configurationStatus")
283 config_status
= configuration_status_list
[vca_index
].get("status")
285 if config_status
== "BROKEN" and vca_status
!= "failed":
286 db_dict
["configurationStatus"][vca_index
] = "READY"
287 elif config_status
!= "BROKEN" and vca_status
== "failed":
288 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
289 except Exception as e
:
290 # not update configurationStatus
291 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
293 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
294 # if nsState = 'DEGRADED' check if all is OK
296 if current_ns_status
in ("READY", "DEGRADED"):
297 error_description
= ""
299 if status_dict
.get("machines"):
300 for machine_id
in status_dict
.get("machines"):
301 machine
= status_dict
.get("machines").get(machine_id
)
302 # check machine agent-status
303 if machine
.get("agent-status"):
304 s
= machine
.get("agent-status").get("status")
307 error_description
+= (
308 "machine {} agent-status={} ; ".format(
312 # check machine instance status
313 if machine
.get("instance-status"):
314 s
= machine
.get("instance-status").get("status")
317 error_description
+= (
318 "machine {} instance-status={} ; ".format(
323 if status_dict
.get("applications"):
324 for app_id
in status_dict
.get("applications"):
325 app
= status_dict
.get("applications").get(app_id
)
326 # check application status
327 if app
.get("status"):
328 s
= app
.get("status").get("status")
331 error_description
+= (
332 "application {} status={} ; ".format(app_id
, s
)
335 if error_description
:
336 db_dict
["errorDescription"] = error_description
337 if current_ns_status
== "READY" and is_degraded
:
338 db_dict
["nsState"] = "DEGRADED"
339 if current_ns_status
== "DEGRADED" and not is_degraded
:
340 db_dict
["nsState"] = "READY"
343 self
.update_db_2("nsrs", nsr_id
, db_dict
)
345 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
347 except Exception as e
:
348 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
350 async def _on_update_k8s_db(
351 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
354 Updating vca status in NSR record
355 :param cluster_uuid: UUID of a k8s cluster
356 :param kdu_instance: The unique name of the KDU instance
357 :param filter: To get nsr_id
358 :cluster_type: The cluster type (juju, k8s)
362 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
363 # .format(cluster_uuid, kdu_instance, filter))
365 nsr_id
= filter.get("_id")
367 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
368 cluster_uuid
=cluster_uuid
,
369 kdu_instance
=kdu_instance
,
371 complete_status
=True,
377 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
379 if cluster_type
in ("juju-bundle", "juju"):
380 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
381 # status in a similar way between Juju Bundles and Helm Charts on this side
382 await self
.k8sclusterjuju
.update_vca_status(
383 db_dict
["vcaStatus"],
389 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
393 self
.update_db_2("nsrs", nsr_id
, db_dict
)
394 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
396 except Exception as e
:
397 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
400 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
402 env
= Environment(undefined
=StrictUndefined
)
403 template
= env
.from_string(cloud_init_text
)
404 return template
.render(additional_params
or {})
405 except UndefinedError
as e
:
407 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
408 "file, must be provided in the instantiation parameters inside the "
409 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
411 except (TemplateError
, TemplateNotFound
) as e
:
413 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
418 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
419 cloud_init_content
= cloud_init_file
= None
421 if vdu
.get("cloud-init-file"):
422 base_folder
= vnfd
["_admin"]["storage"]
423 if base_folder
["pkg-dir"]:
424 cloud_init_file
= "{}/{}/cloud_init/{}".format(
425 base_folder
["folder"],
426 base_folder
["pkg-dir"],
427 vdu
["cloud-init-file"],
430 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
431 base_folder
["folder"],
432 vdu
["cloud-init-file"],
434 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
435 cloud_init_content
= ci_file
.read()
436 elif vdu
.get("cloud-init"):
437 cloud_init_content
= vdu
["cloud-init"]
439 return cloud_init_content
440 except FsException
as e
:
442 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
443 vnfd
["id"], vdu
["id"], cloud_init_file
, e
447 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
449 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]),
452 additional_params
= vdur
.get("additionalParams")
453 return parse_yaml_strings(additional_params
)
455 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
457 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
458 :param vnfd: input vnfd
459 :param new_id: overrides vnf id if provided
460 :param additionalParams: Instantiation params for VNFs provided
461 :param nsrId: Id of the NSR
462 :return: copy of vnfd
464 vnfd_RO
= deepcopy(vnfd
)
465 # remove unused by RO configuration, monitoring, scaling and internal keys
466 vnfd_RO
.pop("_id", None)
467 vnfd_RO
.pop("_admin", None)
468 vnfd_RO
.pop("monitoring-param", None)
469 vnfd_RO
.pop("scaling-group-descriptor", None)
470 vnfd_RO
.pop("kdu", None)
471 vnfd_RO
.pop("k8s-cluster", None)
473 vnfd_RO
["id"] = new_id
475 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
476 for vdu
in get_iterable(vnfd_RO
, "vdu"):
477 vdu
.pop("cloud-init-file", None)
478 vdu
.pop("cloud-init", None)
482 def ip_profile_2_RO(ip_profile
):
483 RO_ip_profile
= deepcopy(ip_profile
)
484 if "dns-server" in RO_ip_profile
:
485 if isinstance(RO_ip_profile
["dns-server"], list):
486 RO_ip_profile
["dns-address"] = []
487 for ds
in RO_ip_profile
.pop("dns-server"):
488 RO_ip_profile
["dns-address"].append(ds
["address"])
490 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
491 if RO_ip_profile
.get("ip-version") == "ipv4":
492 RO_ip_profile
["ip-version"] = "IPv4"
493 if RO_ip_profile
.get("ip-version") == "ipv6":
494 RO_ip_profile
["ip-version"] = "IPv6"
495 if "dhcp-params" in RO_ip_profile
:
496 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
499 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
500 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
501 if db_vim
["_admin"]["operationalState"] != "ENABLED":
503 "VIM={} is not available. operationalState={}".format(
504 vim_account
, db_vim
["_admin"]["operationalState"]
507 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
510 def get_ro_wim_id_for_wim_account(self
, wim_account
):
511 if isinstance(wim_account
, str):
512 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
513 if db_wim
["_admin"]["operationalState"] != "ENABLED":
515 "WIM={} is not available. operationalState={}".format(
516 wim_account
, db_wim
["_admin"]["operationalState"]
519 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
524 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
526 db_vdu_push_list
= []
528 db_update
= {"_admin.modified": time()}
530 for vdu_id
, vdu_count
in vdu_create
.items():
534 for vdur
in reversed(db_vnfr
["vdur"])
535 if vdur
["vdu-id-ref"] == vdu_id
540 # Read the template saved in the db:
541 self
.logger
.debug(f
"No vdur in the database. Using the vdur-template to scale")
542 vdur_template
= db_vnfr
.get("vdur-template")
543 if not vdur_template
:
545 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
549 vdur
= vdur_template
[0]
550 #Delete a template from the database after using it
551 self
.db
.set_one("vnfrs",
552 {"_id": db_vnfr
["_id"]},
554 pull
={"vdur-template": {"_id": vdur
['_id']}}
556 for count
in range(vdu_count
):
557 vdur_copy
= deepcopy(vdur
)
558 vdur_copy
["status"] = "BUILD"
559 vdur_copy
["status-detailed"] = None
560 vdur_copy
["ip-address"] = None
561 vdur_copy
["_id"] = str(uuid4())
562 vdur_copy
["count-index"] += count
+ 1
563 vdur_copy
["id"] = "{}-{}".format(
564 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
566 vdur_copy
.pop("vim_info", None)
567 for iface
in vdur_copy
["interfaces"]:
568 if iface
.get("fixed-ip"):
569 iface
["ip-address"] = self
.increment_ip_mac(
570 iface
["ip-address"], count
+ 1
573 iface
.pop("ip-address", None)
574 if iface
.get("fixed-mac"):
575 iface
["mac-address"] = self
.increment_ip_mac(
576 iface
["mac-address"], count
+ 1
579 iface
.pop("mac-address", None)
583 ) # only first vdu can be managment of vnf
584 db_vdu_push_list
.append(vdur_copy
)
585 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
587 if len(db_vnfr
["vdur"]) == 1:
588 # The scale will move to 0 instances
589 self
.logger
.debug(f
"Scaling to 0 !, creating the template with the last vdur")
590 template_vdur
= [db_vnfr
["vdur"][0]]
591 for vdu_id
, vdu_count
in vdu_delete
.items():
593 indexes_to_delete
= [
595 for iv
in enumerate(db_vnfr
["vdur"])
596 if iv
[1]["vdu-id-ref"] == vdu_id
600 "vdur.{}.status".format(i
): "DELETING"
601 for i
in indexes_to_delete
[-vdu_count
:]
605 # it must be deleted one by one because common.db does not allow otherwise
608 for v
in reversed(db_vnfr
["vdur"])
609 if v
["vdu-id-ref"] == vdu_id
611 for vdu
in vdus_to_delete
[:vdu_count
]:
614 {"_id": db_vnfr
["_id"]},
616 pull
={"vdur": {"_id": vdu
["_id"]}},
620 db_push
["vdur"] = db_vdu_push_list
622 db_push
["vdur-template"] = template_vdur
625 db_vnfr
["vdur-template"] = template_vdur
626 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
627 # modify passed dictionary db_vnfr
628 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
629 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
631 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
633 Updates database nsr with the RO info for the created vld
634 :param ns_update_nsr: dictionary to be filled with the updated info
635 :param db_nsr: content of db_nsr. This is also modified
636 :param nsr_desc_RO: nsr descriptor from RO
637 :return: Nothing, LcmException is raised on errors
640 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
641 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
642 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
644 vld
["vim-id"] = net_RO
.get("vim_net_id")
645 vld
["name"] = net_RO
.get("vim_name")
646 vld
["status"] = net_RO
.get("status")
647 vld
["status-detailed"] = net_RO
.get("error_msg")
648 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
652 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
655 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
657 for db_vnfr
in db_vnfrs
.values():
658 vnfr_update
= {"status": "ERROR"}
659 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
660 if "status" not in vdur
:
661 vdur
["status"] = "ERROR"
662 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
664 vdur
["status-detailed"] = str(error_text
)
666 "vdur.{}.status-detailed".format(vdu_index
)
668 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
669 except DbException
as e
:
670 self
.logger
.error("Cannot update vnf. {}".format(e
))
672 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
674 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
675 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
676 :param nsr_desc_RO: nsr descriptor from RO
677 :return: Nothing, LcmException is raised on errors
679 for vnf_index
, db_vnfr
in db_vnfrs
.items():
680 for vnf_RO
in nsr_desc_RO
["vnfs"]:
681 if vnf_RO
["member_vnf_index"] != vnf_index
:
684 if vnf_RO
.get("ip_address"):
685 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
688 elif not db_vnfr
.get("ip-address"):
689 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
690 raise LcmExceptionNoMgmtIP(
691 "ns member_vnf_index '{}' has no IP address".format(
696 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
697 vdur_RO_count_index
= 0
698 if vdur
.get("pdu-type"):
700 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
701 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
703 if vdur
["count-index"] != vdur_RO_count_index
:
704 vdur_RO_count_index
+= 1
706 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
707 if vdur_RO
.get("ip_address"):
708 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
710 vdur
["ip-address"] = None
711 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
712 vdur
["name"] = vdur_RO
.get("vim_name")
713 vdur
["status"] = vdur_RO
.get("status")
714 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
715 for ifacer
in get_iterable(vdur
, "interfaces"):
716 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
717 if ifacer
["name"] == interface_RO
.get("internal_name"):
718 ifacer
["ip-address"] = interface_RO
.get(
721 ifacer
["mac-address"] = interface_RO
.get(
727 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
728 "from VIM info".format(
729 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
732 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
736 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
738 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
742 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
743 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
744 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
746 vld
["vim-id"] = net_RO
.get("vim_net_id")
747 vld
["name"] = net_RO
.get("vim_name")
748 vld
["status"] = net_RO
.get("status")
749 vld
["status-detailed"] = net_RO
.get("error_msg")
750 vnfr_update
["vld.{}".format(vld_index
)] = vld
754 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
759 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
764 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
769 def _get_ns_config_info(self
, nsr_id
):
771 Generates a mapping between vnf,vdu elements and the N2VC id
772 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
773 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
774 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
775 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
777 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
778 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
780 ns_config_info
= {"osm-config-mapping": mapping
}
781 for vca
in vca_deployed_list
:
782 if not vca
["member-vnf-index"]:
784 if not vca
["vdu_id"]:
785 mapping
[vca
["member-vnf-index"]] = vca
["application"]
789 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
791 ] = vca
["application"]
792 return ns_config_info
794 async def _instantiate_ng_ro(
811 def get_vim_account(vim_account_id
):
813 if vim_account_id
in db_vims
:
814 return db_vims
[vim_account_id
]
815 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
816 db_vims
[vim_account_id
] = db_vim
819 # modify target_vld info with instantiation parameters
820 def parse_vld_instantiation_params(
821 target_vim
, target_vld
, vld_params
, target_sdn
823 if vld_params
.get("ip-profile"):
824 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
827 if vld_params
.get("provider-network"):
828 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
831 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
832 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
835 if vld_params
.get("wimAccountId"):
836 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
837 target_vld
["vim_info"][target_wim
] = {}
838 for param
in ("vim-network-name", "vim-network-id"):
839 if vld_params
.get(param
):
840 if isinstance(vld_params
[param
], dict):
841 for vim
, vim_net
in vld_params
[param
].items():
842 other_target_vim
= "vim:" + vim
844 target_vld
["vim_info"],
845 (other_target_vim
, param
.replace("-", "_")),
848 else: # isinstance str
849 target_vld
["vim_info"][target_vim
][
850 param
.replace("-", "_")
851 ] = vld_params
[param
]
852 if vld_params
.get("common_id"):
853 target_vld
["common_id"] = vld_params
.get("common_id")
855 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
856 def update_ns_vld_target(target
, ns_params
):
857 for vnf_params
in ns_params
.get("vnf", ()):
858 if vnf_params
.get("vimAccountId"):
862 for vnfr
in db_vnfrs
.values()
863 if vnf_params
["member-vnf-index"]
864 == vnfr
["member-vnf-index-ref"]
868 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
869 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
870 target_vld
= find_in_list(
871 get_iterable(vdur
, "interfaces"),
872 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
875 if vnf_params
.get("vimAccountId") not in a_vld
.get(
878 target
["ns"]["vld"][a_index
].get("vim_info").update(
880 "vim:{}".format(vnf_params
["vimAccountId"]): {
881 "vim_network_name": ""
886 nslcmop_id
= db_nslcmop
["_id"]
888 "name": db_nsr
["name"],
891 "image": deepcopy(db_nsr
["image"]),
892 "flavor": deepcopy(db_nsr
["flavor"]),
893 "action_id": nslcmop_id
,
894 "cloud_init_content": {},
896 for image
in target
["image"]:
897 image
["vim_info"] = {}
898 for flavor
in target
["flavor"]:
899 flavor
["vim_info"] = {}
900 if db_nsr
.get("affinity-or-anti-affinity-group"):
901 target
["affinity-or-anti-affinity-group"] = deepcopy(db_nsr
["affinity-or-anti-affinity-group"])
902 for affinity_or_anti_affinity_group
in target
["affinity-or-anti-affinity-group"]:
903 affinity_or_anti_affinity_group
["vim_info"] = {}
905 if db_nslcmop
.get("lcmOperationType") != "instantiate":
906 # get parameters of instantiation:
907 db_nslcmop_instantiate
= self
.db
.get_list(
910 "nsInstanceId": db_nslcmop
["nsInstanceId"],
911 "lcmOperationType": "instantiate",
914 ns_params
= db_nslcmop_instantiate
.get("operationParams")
916 ns_params
= db_nslcmop
.get("operationParams")
917 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
918 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
921 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
922 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
926 "mgmt-network": vld
.get("mgmt-network", False),
927 "type": vld
.get("type"),
930 "vim_network_name": vld
.get("vim-network-name"),
931 "vim_account_id": ns_params
["vimAccountId"],
935 # check if this network needs SDN assist
936 if vld
.get("pci-interfaces"):
937 db_vim
= get_vim_account(ns_params
["vimAccountId"])
938 sdnc_id
= db_vim
["config"].get("sdn-controller")
940 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
941 target_sdn
= "sdn:{}".format(sdnc_id
)
942 target_vld
["vim_info"][target_sdn
] = {
944 "target_vim": target_vim
,
946 "type": vld
.get("type"),
949 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
950 for nsd_vnf_profile
in nsd_vnf_profiles
:
951 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
952 if cp
["virtual-link-profile-id"] == vld
["id"]:
954 "member_vnf:{}.{}".format(
955 cp
["constituent-cpd-id"][0][
956 "constituent-base-element-id"
958 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
960 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
962 # check at nsd descriptor, if there is an ip-profile
964 nsd_vlp
= find_in_list(
965 get_virtual_link_profiles(nsd
),
966 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
971 and nsd_vlp
.get("virtual-link-protocol-data")
972 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
974 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
977 ip_profile_dest_data
= {}
978 if "ip-version" in ip_profile_source_data
:
979 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
982 if "cidr" in ip_profile_source_data
:
983 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
986 if "gateway-ip" in ip_profile_source_data
:
987 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
990 if "dhcp-enabled" in ip_profile_source_data
:
991 ip_profile_dest_data
["dhcp-params"] = {
992 "enabled": ip_profile_source_data
["dhcp-enabled"]
994 vld_params
["ip-profile"] = ip_profile_dest_data
996 # update vld_params with instantiation params
997 vld_instantiation_params
= find_in_list(
998 get_iterable(ns_params
, "vld"),
999 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1001 if vld_instantiation_params
:
1002 vld_params
.update(vld_instantiation_params
)
1003 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1004 target
["ns"]["vld"].append(target_vld
)
1005 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1006 update_ns_vld_target(target
, ns_params
)
1008 for vnfr
in db_vnfrs
.values():
1009 vnfd
= find_in_list(
1010 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1012 vnf_params
= find_in_list(
1013 get_iterable(ns_params
, "vnf"),
1014 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1016 target_vnf
= deepcopy(vnfr
)
1017 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1018 for vld
in target_vnf
.get("vld", ()):
1019 # check if connected to a ns.vld, to fill target'
1020 vnf_cp
= find_in_list(
1021 vnfd
.get("int-virtual-link-desc", ()),
1022 lambda cpd
: cpd
.get("id") == vld
["id"],
1025 ns_cp
= "member_vnf:{}.{}".format(
1026 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1028 if cp2target
.get(ns_cp
):
1029 vld
["target"] = cp2target
[ns_cp
]
1032 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1034 # check if this network needs SDN assist
1036 if vld
.get("pci-interfaces"):
1037 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1038 sdnc_id
= db_vim
["config"].get("sdn-controller")
1040 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1041 target_sdn
= "sdn:{}".format(sdnc_id
)
1042 vld
["vim_info"][target_sdn
] = {
1044 "target_vim": target_vim
,
1046 "type": vld
.get("type"),
1049 # check at vnfd descriptor, if there is an ip-profile
1051 vnfd_vlp
= find_in_list(
1052 get_virtual_link_profiles(vnfd
),
1053 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1057 and vnfd_vlp
.get("virtual-link-protocol-data")
1058 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1060 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1063 ip_profile_dest_data
= {}
1064 if "ip-version" in ip_profile_source_data
:
1065 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1068 if "cidr" in ip_profile_source_data
:
1069 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1072 if "gateway-ip" in ip_profile_source_data
:
1073 ip_profile_dest_data
[
1075 ] = ip_profile_source_data
["gateway-ip"]
1076 if "dhcp-enabled" in ip_profile_source_data
:
1077 ip_profile_dest_data
["dhcp-params"] = {
1078 "enabled": ip_profile_source_data
["dhcp-enabled"]
1081 vld_params
["ip-profile"] = ip_profile_dest_data
1082 # update vld_params with instantiation params
1084 vld_instantiation_params
= find_in_list(
1085 get_iterable(vnf_params
, "internal-vld"),
1086 lambda i_vld
: i_vld
["name"] == vld
["id"],
1088 if vld_instantiation_params
:
1089 vld_params
.update(vld_instantiation_params
)
1090 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1093 for vdur
in target_vnf
.get("vdur", ()):
1094 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1095 continue # This vdu must not be created
1096 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1098 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1101 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1102 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1105 and vdu_configuration
.get("config-access")
1106 and vdu_configuration
.get("config-access").get("ssh-access")
1108 vdur
["ssh-keys"] = ssh_keys_all
1109 vdur
["ssh-access-required"] = vdu_configuration
[
1111 ]["ssh-access"]["required"]
1114 and vnf_configuration
.get("config-access")
1115 and vnf_configuration
.get("config-access").get("ssh-access")
1116 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1118 vdur
["ssh-keys"] = ssh_keys_all
1119 vdur
["ssh-access-required"] = vnf_configuration
[
1121 ]["ssh-access"]["required"]
1122 elif ssh_keys_instantiation
and find_in_list(
1123 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1125 vdur
["ssh-keys"] = ssh_keys_instantiation
1127 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1129 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1131 if vdud
.get("cloud-init-file"):
1132 vdur
["cloud-init"] = "{}:file:{}".format(
1133 vnfd
["_id"], vdud
.get("cloud-init-file")
1135 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1136 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1137 base_folder
= vnfd
["_admin"]["storage"]
1138 if base_folder
["pkg-dir"]:
1139 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1140 base_folder
["folder"],
1141 base_folder
["pkg-dir"],
1142 vdud
.get("cloud-init-file"),
1145 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1146 base_folder
["folder"],
1147 vdud
.get("cloud-init-file"),
1149 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1150 target
["cloud_init_content"][
1153 elif vdud
.get("cloud-init"):
1154 vdur
["cloud-init"] = "{}:vdu:{}".format(
1155 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1157 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1158 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1161 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1162 deploy_params_vdu
= self
._format
_additional
_params
(
1163 vdur
.get("additionalParams") or {}
1165 deploy_params_vdu
["OSM"] = get_osm_params(
1166 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1168 vdur
["additionalParams"] = deploy_params_vdu
1171 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1172 if target_vim
not in ns_flavor
["vim_info"]:
1173 ns_flavor
["vim_info"][target_vim
] = {}
1176 # in case alternative images are provided we must check if they should be applied
1177 # for the vim_type, modify the vim_type taking into account
1178 ns_image_id
= int(vdur
["ns-image-id"])
1179 if vdur
.get("alt-image-ids"):
1180 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1181 vim_type
= db_vim
["vim_type"]
1182 for alt_image_id
in vdur
.get("alt-image-ids"):
1183 ns_alt_image
= target
["image"][int(alt_image_id
)]
1184 if vim_type
== ns_alt_image
.get("vim-type"):
1185 # must use alternative image
1187 "use alternative image id: {}".format(alt_image_id
)
1189 ns_image_id
= alt_image_id
1190 vdur
["ns-image-id"] = ns_image_id
1192 ns_image
= target
["image"][int(ns_image_id
)]
1193 if target_vim
not in ns_image
["vim_info"]:
1194 ns_image
["vim_info"][target_vim
] = {}
1197 if vdur
.get("affinity-or-anti-affinity-group-id"):
1198 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1199 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1200 if target_vim
not in ns_ags
["vim_info"]:
1201 ns_ags
["vim_info"][target_vim
] = {}
1203 vdur
["vim_info"] = {target_vim
: {}}
1204 # instantiation parameters
1206 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1207 # vdud["id"]), None)
1208 vdur_list
.append(vdur
)
1209 target_vnf
["vdur"] = vdur_list
1210 target
["vnf"].append(target_vnf
)
1212 desc
= await self
.RO
.deploy(nsr_id
, target
)
1213 self
.logger
.debug("RO return > {}".format(desc
))
1214 action_id
= desc
["action_id"]
1215 await self
._wait
_ng
_ro
(
1216 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
1221 "_admin.deployed.RO.operational-status": "running",
1222 "detailed-status": " ".join(stage
),
1224 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1225 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1226 self
._write
_op
_status
(nslcmop_id
, stage
)
1228 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1232 async def _wait_ng_ro(
1241 detailed_status_old
= None
1243 start_time
= start_time
or time()
1244 while time() <= start_time
+ timeout
:
1245 desc_status
= await self
.RO
.status(nsr_id
, action_id
)
1246 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1247 if desc_status
["status"] == "FAILED":
1248 raise NgRoException(desc_status
["details"])
1249 elif desc_status
["status"] == "BUILD":
1251 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1252 elif desc_status
["status"] == "DONE":
1254 stage
[2] = "Deployed at VIM"
1257 assert False, "ROclient.check_ns_status returns unknown {}".format(
1258 desc_status
["status"]
1260 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1261 detailed_status_old
= stage
[2]
1262 db_nsr_update
["detailed-status"] = " ".join(stage
)
1263 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1264 self
._write
_op
_status
(nslcmop_id
, stage
)
1265 await asyncio
.sleep(15, loop
=self
.loop
)
1266 else: # timeout_ns_deploy
1267 raise NgRoException("Timeout waiting ns to deploy")
1269 async def _terminate_ng_ro(
1270 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1275 start_deploy
= time()
1282 "action_id": nslcmop_id
,
1284 desc
= await self
.RO
.deploy(nsr_id
, target
)
1285 action_id
= desc
["action_id"]
1286 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1287 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1290 + "ns terminate action at RO. action_id={}".format(action_id
)
1294 delete_timeout
= 20 * 60 # 20 minutes
1295 await self
._wait
_ng
_ro
(
1296 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
1299 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1300 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1302 await self
.RO
.delete(nsr_id
)
1303 except Exception as e
:
1304 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1305 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1306 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1307 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1309 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1311 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1312 failed_detail
.append("delete conflict: {}".format(e
))
1315 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1318 failed_detail
.append("delete error: {}".format(e
))
1321 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1325 stage
[2] = "Error deleting from VIM"
1327 stage
[2] = "Deleted from VIM"
1328 db_nsr_update
["detailed-status"] = " ".join(stage
)
1329 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1330 self
._write
_op
_status
(nslcmop_id
, stage
)
1333 raise LcmException("; ".join(failed_detail
))
1336 async def instantiate_RO(
1350 :param logging_text: preffix text to use at logging
1351 :param nsr_id: nsr identity
1352 :param nsd: database content of ns descriptor
1353 :param db_nsr: database content of ns record
1354 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1356 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1357 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1358 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1359 :return: None or exception
1362 start_deploy
= time()
1363 ns_params
= db_nslcmop
.get("operationParams")
1364 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1365 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1367 timeout_ns_deploy
= self
.timeout
.get(
1368 "ns_deploy", self
.timeout_ns_deploy
1371 # Check for and optionally request placement optimization. Database will be updated if placement activated
1372 stage
[2] = "Waiting for Placement."
1373 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1374 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1375 for vnfr
in db_vnfrs
.values():
1376 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1379 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1381 return await self
._instantiate
_ng
_ro
(
1394 except Exception as e
:
1395 stage
[2] = "ERROR deploying at VIM"
1396 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1398 "Error deploying at VIM {}".format(e
),
1399 exc_info
=not isinstance(
1402 ROclient
.ROClientException
,
1411 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1413 Wait for kdu to be up, get ip address
1414 :param logging_text: prefix use for logging
1421 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1424 while nb_tries
< 360:
1425 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1429 for x
in get_iterable(db_vnfr
, "kdur")
1430 if x
.get("kdu-name") == kdu_name
1436 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1438 if kdur
.get("status"):
1439 if kdur
["status"] in ("READY", "ENABLED"):
1440 return kdur
.get("ip-address")
1443 "target KDU={} is in error state".format(kdu_name
)
1446 await asyncio
.sleep(10, loop
=self
.loop
)
1448 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1450 async def wait_vm_up_insert_key_ro(
1451 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1454 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1455 :param logging_text: prefix use for logging
1460 :param pub_key: public ssh key to inject, None to skip
1461 :param user: user to apply the public ssh key
1465 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1469 target_vdu_id
= None
1475 if ro_retries
>= 360: # 1 hour
1477 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1480 await asyncio
.sleep(10, loop
=self
.loop
)
1483 if not target_vdu_id
:
1484 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1486 if not vdu_id
: # for the VNF case
1487 if db_vnfr
.get("status") == "ERROR":
1489 "Cannot inject ssh-key because target VNF is in error state"
1491 ip_address
= db_vnfr
.get("ip-address")
1497 for x
in get_iterable(db_vnfr
, "vdur")
1498 if x
.get("ip-address") == ip_address
1506 for x
in get_iterable(db_vnfr
, "vdur")
1507 if x
.get("vdu-id-ref") == vdu_id
1508 and x
.get("count-index") == vdu_index
1514 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1515 ): # If only one, this should be the target vdu
1516 vdur
= db_vnfr
["vdur"][0]
1519 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1520 vnfr_id
, vdu_id
, vdu_index
1523 # New generation RO stores information at "vim_info"
1526 if vdur
.get("vim_info"):
1528 t
for t
in vdur
["vim_info"]
1529 ) # there should be only one key
1530 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1532 vdur
.get("pdu-type")
1533 or vdur
.get("status") == "ACTIVE"
1534 or ng_ro_status
== "ACTIVE"
1536 ip_address
= vdur
.get("ip-address")
1539 target_vdu_id
= vdur
["vdu-id-ref"]
1540 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1542 "Cannot inject ssh-key because target VM is in error state"
1545 if not target_vdu_id
:
1548 # inject public key into machine
1549 if pub_key
and user
:
1550 self
.logger
.debug(logging_text
+ "Inserting RO key")
1551 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1552 if vdur
.get("pdu-type"):
1553 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1556 ro_vm_id
= "{}-{}".format(
1557 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1558 ) # TODO add vdu_index
1562 "action": "inject_ssh_key",
1566 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1568 desc
= await self
.RO
.deploy(nsr_id
, target
)
1569 action_id
= desc
["action_id"]
1570 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600)
1573 # wait until NS is deployed at RO
1575 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1576 ro_nsr_id
= deep_get(
1577 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1581 result_dict
= await self
.RO
.create_action(
1583 item_id_name
=ro_nsr_id
,
1585 "add_public_key": pub_key
,
1590 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1591 if not result_dict
or not isinstance(result_dict
, dict):
1593 "Unknown response from RO when injecting key"
1595 for result
in result_dict
.values():
1596 if result
.get("vim_result") == 200:
1599 raise ROclient
.ROClientException(
1600 "error injecting key: {}".format(
1601 result
.get("description")
1605 except NgRoException
as e
:
1607 "Reaching max tries injecting key. Error: {}".format(e
)
1609 except ROclient
.ROClientException
as e
:
1613 + "error injecting key: {}. Retrying until {} seconds".format(
1620 "Reaching max tries injecting key. Error: {}".format(e
)
1627 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1629 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1631 my_vca
= vca_deployed_list
[vca_index
]
1632 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1633 # vdu or kdu: no dependencies
1637 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1638 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1639 configuration_status_list
= db_nsr
["configurationStatus"]
1640 for index
, vca_deployed
in enumerate(configuration_status_list
):
1641 if index
== vca_index
:
1644 if not my_vca
.get("member-vnf-index") or (
1645 vca_deployed
.get("member-vnf-index")
1646 == my_vca
.get("member-vnf-index")
1648 internal_status
= configuration_status_list
[index
].get("status")
1649 if internal_status
== "READY":
1651 elif internal_status
== "BROKEN":
1653 "Configuration aborted because dependent charm/s has failed"
1658 # no dependencies, return
1660 await asyncio
.sleep(10)
1663 raise LcmException("Configuration aborted because dependent charm/s timeout")
1665 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1668 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1670 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1671 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1674 async def instantiate_N2VC(
1691 ee_config_descriptor
,
1693 nsr_id
= db_nsr
["_id"]
1694 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1695 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1696 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1697 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1699 "collection": "nsrs",
1700 "filter": {"_id": nsr_id
},
1701 "path": db_update_entry
,
1707 element_under_configuration
= nsr_id
1711 vnfr_id
= db_vnfr
["_id"]
1712 osm_config
["osm"]["vnf_id"] = vnfr_id
1714 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1716 if vca_type
== "native_charm":
1719 index_number
= vdu_index
or 0
1722 element_type
= "VNF"
1723 element_under_configuration
= vnfr_id
1724 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1726 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1727 element_type
= "VDU"
1728 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1729 osm_config
["osm"]["vdu_id"] = vdu_id
1731 namespace
+= ".{}".format(kdu_name
)
1732 element_type
= "KDU"
1733 element_under_configuration
= kdu_name
1734 osm_config
["osm"]["kdu_name"] = kdu_name
1737 if base_folder
["pkg-dir"]:
1738 artifact_path
= "{}/{}/{}/{}".format(
1739 base_folder
["folder"],
1740 base_folder
["pkg-dir"],
1742 if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1747 artifact_path
= "{}/Scripts/{}/{}/".format(
1748 base_folder
["folder"],
1750 if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1755 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1757 # get initial_config_primitive_list that applies to this element
1758 initial_config_primitive_list
= config_descriptor
.get(
1759 "initial-config-primitive"
1763 "Initial config primitive list > {}".format(
1764 initial_config_primitive_list
1768 # add config if not present for NS charm
1769 ee_descriptor_id
= ee_config_descriptor
.get("id")
1770 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1771 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1772 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1776 "Initial config primitive list #2 > {}".format(
1777 initial_config_primitive_list
1780 # n2vc_redesign STEP 3.1
1781 # find old ee_id if exists
1782 ee_id
= vca_deployed
.get("ee_id")
1784 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1785 # create or register execution environment in VCA
1786 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1788 self
._write
_configuration
_status
(
1790 vca_index
=vca_index
,
1792 element_under_configuration
=element_under_configuration
,
1793 element_type
=element_type
,
1796 step
= "create execution environment"
1797 self
.logger
.debug(logging_text
+ step
)
1801 if vca_type
== "k8s_proxy_charm":
1802 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1803 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1804 namespace
=namespace
,
1805 artifact_path
=artifact_path
,
1809 elif vca_type
== "helm" or vca_type
== "helm-v3":
1810 ee_id
, credentials
= await self
.vca_map
[
1812 ].create_execution_environment(
1813 namespace
=namespace
,
1817 artifact_path
=artifact_path
,
1821 ee_id
, credentials
= await self
.vca_map
[
1823 ].create_execution_environment(
1824 namespace
=namespace
,
1830 elif vca_type
== "native_charm":
1831 step
= "Waiting to VM being up and getting IP address"
1832 self
.logger
.debug(logging_text
+ step
)
1833 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1842 credentials
= {"hostname": rw_mgmt_ip
}
1844 username
= deep_get(
1845 config_descriptor
, ("config-access", "ssh-access", "default-user")
1847 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1848 # merged. Meanwhile let's get username from initial-config-primitive
1849 if not username
and initial_config_primitive_list
:
1850 for config_primitive
in initial_config_primitive_list
:
1851 for param
in config_primitive
.get("parameter", ()):
1852 if param
["name"] == "ssh-username":
1853 username
= param
["value"]
1857 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1858 "'config-access.ssh-access.default-user'"
1860 credentials
["username"] = username
1861 # n2vc_redesign STEP 3.2
1863 self
._write
_configuration
_status
(
1865 vca_index
=vca_index
,
1866 status
="REGISTERING",
1867 element_under_configuration
=element_under_configuration
,
1868 element_type
=element_type
,
1871 step
= "register execution environment {}".format(credentials
)
1872 self
.logger
.debug(logging_text
+ step
)
1873 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1874 credentials
=credentials
,
1875 namespace
=namespace
,
1880 # for compatibility with MON/POL modules, the need model and application name at database
1881 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1882 ee_id_parts
= ee_id
.split(".")
1883 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1884 if len(ee_id_parts
) >= 2:
1885 model_name
= ee_id_parts
[0]
1886 application_name
= ee_id_parts
[1]
1887 db_nsr_update
[db_update_entry
+ "model"] = model_name
1888 db_nsr_update
[db_update_entry
+ "application"] = application_name
1890 # n2vc_redesign STEP 3.3
1891 step
= "Install configuration Software"
1893 self
._write
_configuration
_status
(
1895 vca_index
=vca_index
,
1896 status
="INSTALLING SW",
1897 element_under_configuration
=element_under_configuration
,
1898 element_type
=element_type
,
1899 other_update
=db_nsr_update
,
1902 # TODO check if already done
1903 self
.logger
.debug(logging_text
+ step
)
1905 if vca_type
== "native_charm":
1906 config_primitive
= next(
1907 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1910 if config_primitive
:
1911 config
= self
._map
_primitive
_params
(
1912 config_primitive
, {}, deploy_params
1915 if vca_type
== "lxc_proxy_charm":
1916 if element_type
== "NS":
1917 num_units
= db_nsr
.get("config-units") or 1
1918 elif element_type
== "VNF":
1919 num_units
= db_vnfr
.get("config-units") or 1
1920 elif element_type
== "VDU":
1921 for v
in db_vnfr
["vdur"]:
1922 if vdu_id
== v
["vdu-id-ref"]:
1923 num_units
= v
.get("config-units") or 1
1925 if vca_type
!= "k8s_proxy_charm":
1926 await self
.vca_map
[vca_type
].install_configuration_sw(
1928 artifact_path
=artifact_path
,
1931 num_units
=num_units
,
1936 # write in db flag of configuration_sw already installed
1938 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1941 # add relations for this VCA (wait for other peers related with this VCA)
1942 await self
._add
_vca
_relations
(
1943 logging_text
=logging_text
,
1946 vca_index
=vca_index
,
1949 # if SSH access is required, then get execution environment SSH public
1950 # if native charm we have waited already to VM be UP
1951 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1954 # self.logger.debug("get ssh key block")
1956 config_descriptor
, ("config-access", "ssh-access", "required")
1958 # self.logger.debug("ssh key needed")
1959 # Needed to inject a ssh key
1962 ("config-access", "ssh-access", "default-user"),
1964 step
= "Install configuration Software, getting public ssh key"
1965 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
1966 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
1969 step
= "Insert public key into VM user={} ssh_key={}".format(
1973 # self.logger.debug("no need to get ssh key")
1974 step
= "Waiting to VM being up and getting IP address"
1975 self
.logger
.debug(logging_text
+ step
)
1977 # n2vc_redesign STEP 5.1
1978 # wait for RO (ip-address) Insert pub_key into VM
1981 rw_mgmt_ip
= await self
.wait_kdu_up(
1982 logging_text
, nsr_id
, vnfr_id
, kdu_name
1985 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1995 rw_mgmt_ip
= None # This is for a NS configuration
1997 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
1999 # store rw_mgmt_ip in deploy params for later replacement
2000 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2002 # n2vc_redesign STEP 6 Execute initial config primitive
2003 step
= "execute initial config primitive"
2005 # wait for dependent primitives execution (NS -> VNF -> VDU)
2006 if initial_config_primitive_list
:
2007 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2009 # stage, in function of element type: vdu, kdu, vnf or ns
2010 my_vca
= vca_deployed_list
[vca_index
]
2011 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2013 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2014 elif my_vca
.get("member-vnf-index"):
2016 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2019 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2021 self
._write
_configuration
_status
(
2022 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2025 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2027 check_if_terminated_needed
= True
2028 for initial_config_primitive
in initial_config_primitive_list
:
2029 # adding information on the vca_deployed if it is a NS execution environment
2030 if not vca_deployed
["member-vnf-index"]:
2031 deploy_params
["ns_config_info"] = json
.dumps(
2032 self
._get
_ns
_config
_info
(nsr_id
)
2034 # TODO check if already done
2035 primitive_params_
= self
._map
_primitive
_params
(
2036 initial_config_primitive
, {}, deploy_params
2039 step
= "execute primitive '{}' params '{}'".format(
2040 initial_config_primitive
["name"], primitive_params_
2042 self
.logger
.debug(logging_text
+ step
)
2043 await self
.vca_map
[vca_type
].exec_primitive(
2045 primitive_name
=initial_config_primitive
["name"],
2046 params_dict
=primitive_params_
,
2051 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2052 if check_if_terminated_needed
:
2053 if config_descriptor
.get("terminate-config-primitive"):
2055 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2057 check_if_terminated_needed
= False
2059 # TODO register in database that primitive is done
2061 # STEP 7 Configure metrics
2062 if vca_type
== "helm" or vca_type
== "helm-v3":
2063 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2065 artifact_path
=artifact_path
,
2066 ee_config_descriptor
=ee_config_descriptor
,
2069 target_ip
=rw_mgmt_ip
,
2075 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2078 for job
in prometheus_jobs
:
2082 "job_name": job
["job_name"]
2086 fail_on_empty
=False,
2089 step
= "instantiated at VCA"
2090 self
.logger
.debug(logging_text
+ step
)
2092 self
._write
_configuration
_status
(
2093 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2096 except Exception as e
: # TODO not use Exception but N2VC exception
2097 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2099 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2102 "Exception while {} : {}".format(step
, e
), exc_info
=True
2104 self
._write
_configuration
_status
(
2105 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2107 raise LcmException("{} {}".format(step
, e
)) from e
2109 def _write_ns_status(
2113 current_operation
: str,
2114 current_operation_id
: str,
2115 error_description
: str = None,
2116 error_detail
: str = None,
2117 other_update
: dict = None,
2120 Update db_nsr fields.
2123 :param current_operation:
2124 :param current_operation_id:
2125 :param error_description:
2126 :param error_detail:
2127 :param other_update: Other required changes at database if provided, will be cleared
2131 db_dict
= other_update
or {}
2134 ] = current_operation_id
# for backward compatibility
2135 db_dict
["_admin.current-operation"] = current_operation_id
2136 db_dict
["_admin.operation-type"] = (
2137 current_operation
if current_operation
!= "IDLE" else None
2139 db_dict
["currentOperation"] = current_operation
2140 db_dict
["currentOperationID"] = current_operation_id
2141 db_dict
["errorDescription"] = error_description
2142 db_dict
["errorDetail"] = error_detail
2145 db_dict
["nsState"] = ns_state
2146 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2147 except DbException
as e
:
2148 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2150 def _write_op_status(
2154 error_message
: str = None,
2155 queuePosition
: int = 0,
2156 operation_state
: str = None,
2157 other_update
: dict = None,
2160 db_dict
= other_update
or {}
2161 db_dict
["queuePosition"] = queuePosition
2162 if isinstance(stage
, list):
2163 db_dict
["stage"] = stage
[0]
2164 db_dict
["detailed-status"] = " ".join(stage
)
2165 elif stage
is not None:
2166 db_dict
["stage"] = str(stage
)
2168 if error_message
is not None:
2169 db_dict
["errorMessage"] = error_message
2170 if operation_state
is not None:
2171 db_dict
["operationState"] = operation_state
2172 db_dict
["statusEnteredTime"] = time()
2173 self
.update_db_2("nslcmops", op_id
, db_dict
)
2174 except DbException
as e
:
2176 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2179 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2181 nsr_id
= db_nsr
["_id"]
2182 # configurationStatus
2183 config_status
= db_nsr
.get("configurationStatus")
2186 "configurationStatus.{}.status".format(index
): status
2187 for index
, v
in enumerate(config_status
)
2191 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2193 except DbException
as e
:
2195 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2198 def _write_configuration_status(
2203 element_under_configuration
: str = None,
2204 element_type
: str = None,
2205 other_update
: dict = None,
2208 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2209 # .format(vca_index, status))
2212 db_path
= "configurationStatus.{}.".format(vca_index
)
2213 db_dict
= other_update
or {}
2215 db_dict
[db_path
+ "status"] = status
2216 if element_under_configuration
:
2218 db_path
+ "elementUnderConfiguration"
2219 ] = element_under_configuration
2221 db_dict
[db_path
+ "elementType"] = element_type
2222 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2223 except DbException
as e
:
2225 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2226 status
, nsr_id
, vca_index
, e
2230 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2232 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2233 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2234 Database is used because the result can be obtained from a different LCM worker in case of HA.
2235 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2236 :param db_nslcmop: database content of nslcmop
2237 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2238 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2239 computed 'vim-account-id'
2242 nslcmop_id
= db_nslcmop
["_id"]
2243 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2244 if placement_engine
== "PLA":
2246 logging_text
+ "Invoke and wait for placement optimization"
2248 await self
.msg
.aiowrite(
2249 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2251 db_poll_interval
= 5
2252 wait
= db_poll_interval
* 10
2254 while not pla_result
and wait
>= 0:
2255 await asyncio
.sleep(db_poll_interval
)
2256 wait
-= db_poll_interval
2257 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2258 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2262 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2265 for pla_vnf
in pla_result
["vnf"]:
2266 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2267 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2272 {"_id": vnfr
["_id"]},
2273 {"vim-account-id": pla_vnf
["vimAccountId"]},
2276 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2279 def update_nsrs_with_pla_result(self
, params
):
2281 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2283 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2285 except Exception as e
:
2286 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2288 async def instantiate(self
, nsr_id
, nslcmop_id
):
2291 :param nsr_id: ns instance to deploy
2292 :param nslcmop_id: operation to run
2296 # Try to lock HA task here
2297 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2298 if not task_is_locked_by_me
:
2300 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2304 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2305 self
.logger
.debug(logging_text
+ "Enter")
2307 # get all needed from database
2309 # database nsrs record
2312 # database nslcmops record
2315 # update operation on nsrs
2317 # update operation on nslcmops
2318 db_nslcmop_update
= {}
2320 nslcmop_operation_state
= None
2321 db_vnfrs
= {} # vnf's info indexed by member-index
2323 tasks_dict_info
= {} # from task to info text
2327 "Stage 1/5: preparation of the environment.",
2328 "Waiting for previous operations to terminate.",
2331 # ^ stage, step, VIM progress
2333 # wait for any previous tasks in process
2334 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2336 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2337 stage
[1] = "Reading from database."
2338 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2339 db_nsr_update
["detailed-status"] = "creating"
2340 db_nsr_update
["operational-status"] = "init"
2341 self
._write
_ns
_status
(
2343 ns_state
="BUILDING",
2344 current_operation
="INSTANTIATING",
2345 current_operation_id
=nslcmop_id
,
2346 other_update
=db_nsr_update
,
2348 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2350 # read from db: operation
2351 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2352 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2353 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2354 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2355 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2357 ns_params
= db_nslcmop
.get("operationParams")
2358 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2359 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2361 timeout_ns_deploy
= self
.timeout
.get(
2362 "ns_deploy", self
.timeout_ns_deploy
2366 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2367 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2368 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2369 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2370 self
.fs
.sync(db_nsr
["nsd-id"])
2372 # nsr_name = db_nsr["name"] # TODO short-name??
2374 # read from db: vnf's of this ns
2375 stage
[1] = "Getting vnfrs from db."
2376 self
.logger
.debug(logging_text
+ stage
[1])
2377 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2379 # read from db: vnfd's for every vnf
2380 db_vnfds
= [] # every vnfd data
2382 # for each vnf in ns, read vnfd
2383 for vnfr
in db_vnfrs_list
:
2384 if vnfr
.get("kdur"):
2386 for kdur
in vnfr
["kdur"]:
2387 if kdur
.get("additionalParams"):
2388 kdur
["additionalParams"] = json
.loads(
2389 kdur
["additionalParams"]
2391 kdur_list
.append(kdur
)
2392 vnfr
["kdur"] = kdur_list
2394 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2395 vnfd_id
= vnfr
["vnfd-id"]
2396 vnfd_ref
= vnfr
["vnfd-ref"]
2397 self
.fs
.sync(vnfd_id
)
2399 # if we haven't this vnfd, read it from db
2400 if vnfd_id
not in db_vnfds
:
2402 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2405 self
.logger
.debug(logging_text
+ stage
[1])
2406 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2409 db_vnfds
.append(vnfd
)
2411 # Get or generates the _admin.deployed.VCA list
2412 vca_deployed_list
= None
2413 if db_nsr
["_admin"].get("deployed"):
2414 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2415 if vca_deployed_list
is None:
2416 vca_deployed_list
= []
2417 configuration_status_list
= []
2418 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2419 db_nsr_update
["configurationStatus"] = configuration_status_list
2420 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2421 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2422 elif isinstance(vca_deployed_list
, dict):
2423 # maintain backward compatibility. Change a dict to list at database
2424 vca_deployed_list
= list(vca_deployed_list
.values())
2425 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2426 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2429 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2431 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2432 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2434 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2435 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2436 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2438 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2441 # n2vc_redesign STEP 2 Deploy Network Scenario
2442 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2443 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2445 stage
[1] = "Deploying KDUs."
2446 # self.logger.debug(logging_text + "Before deploy_kdus")
2447 # Call to deploy_kdus in case exists the "vdu:kdu" param
2448 await self
.deploy_kdus(
2449 logging_text
=logging_text
,
2451 nslcmop_id
=nslcmop_id
,
2454 task_instantiation_info
=tasks_dict_info
,
2457 stage
[1] = "Getting VCA public key."
2458 # n2vc_redesign STEP 1 Get VCA public ssh-key
2459 # feature 1429. Add n2vc public key to needed VMs
2460 n2vc_key
= self
.n2vc
.get_public_key()
2461 n2vc_key_list
= [n2vc_key
]
2462 if self
.vca_config
.get("public_key"):
2463 n2vc_key_list
.append(self
.vca_config
["public_key"])
2465 stage
[1] = "Deploying NS at VIM."
2466 task_ro
= asyncio
.ensure_future(
2467 self
.instantiate_RO(
2468 logging_text
=logging_text
,
2472 db_nslcmop
=db_nslcmop
,
2475 n2vc_key_list
=n2vc_key_list
,
2479 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2480 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2482 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2483 stage
[1] = "Deploying Execution Environments."
2484 self
.logger
.debug(logging_text
+ stage
[1])
2486 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2487 for vnf_profile
in get_vnf_profiles(nsd
):
2488 vnfd_id
= vnf_profile
["vnfd-id"]
2489 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2490 member_vnf_index
= str(vnf_profile
["id"])
2491 db_vnfr
= db_vnfrs
[member_vnf_index
]
2492 base_folder
= vnfd
["_admin"]["storage"]
2498 # Get additional parameters
2499 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2500 if db_vnfr
.get("additionalParamsForVnf"):
2501 deploy_params
.update(
2502 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2505 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2506 if descriptor_config
:
2508 logging_text
=logging_text
2509 + "member_vnf_index={} ".format(member_vnf_index
),
2512 nslcmop_id
=nslcmop_id
,
2518 member_vnf_index
=member_vnf_index
,
2519 vdu_index
=vdu_index
,
2521 deploy_params
=deploy_params
,
2522 descriptor_config
=descriptor_config
,
2523 base_folder
=base_folder
,
2524 task_instantiation_info
=tasks_dict_info
,
2528 # Deploy charms for each VDU that supports one.
2529 for vdud
in get_vdu_list(vnfd
):
2531 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2532 vdur
= find_in_list(
2533 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2536 if vdur
.get("additionalParams"):
2537 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2539 deploy_params_vdu
= deploy_params
2540 deploy_params_vdu
["OSM"] = get_osm_params(
2541 db_vnfr
, vdu_id
, vdu_count_index
=0
2543 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2545 self
.logger
.debug("VDUD > {}".format(vdud
))
2547 "Descriptor config > {}".format(descriptor_config
)
2549 if descriptor_config
:
2552 for vdu_index
in range(vdud_count
):
2553 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2555 logging_text
=logging_text
2556 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2557 member_vnf_index
, vdu_id
, vdu_index
2561 nslcmop_id
=nslcmop_id
,
2567 member_vnf_index
=member_vnf_index
,
2568 vdu_index
=vdu_index
,
2570 deploy_params
=deploy_params_vdu
,
2571 descriptor_config
=descriptor_config
,
2572 base_folder
=base_folder
,
2573 task_instantiation_info
=tasks_dict_info
,
2576 for kdud
in get_kdu_list(vnfd
):
2577 kdu_name
= kdud
["name"]
2578 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2579 if descriptor_config
:
2584 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2586 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2587 if kdur
.get("additionalParams"):
2588 deploy_params_kdu
.update(
2589 parse_yaml_strings(kdur
["additionalParams"].copy())
2593 logging_text
=logging_text
,
2596 nslcmop_id
=nslcmop_id
,
2602 member_vnf_index
=member_vnf_index
,
2603 vdu_index
=vdu_index
,
2605 deploy_params
=deploy_params_kdu
,
2606 descriptor_config
=descriptor_config
,
2607 base_folder
=base_folder
,
2608 task_instantiation_info
=tasks_dict_info
,
2612 # Check if this NS has a charm configuration
2613 descriptor_config
= nsd
.get("ns-configuration")
2614 if descriptor_config
and descriptor_config
.get("juju"):
2617 member_vnf_index
= None
2623 # Get additional parameters
2624 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2625 if db_nsr
.get("additionalParamsForNs"):
2626 deploy_params
.update(
2627 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2629 base_folder
= nsd
["_admin"]["storage"]
2631 logging_text
=logging_text
,
2634 nslcmop_id
=nslcmop_id
,
2640 member_vnf_index
=member_vnf_index
,
2641 vdu_index
=vdu_index
,
2643 deploy_params
=deploy_params
,
2644 descriptor_config
=descriptor_config
,
2645 base_folder
=base_folder
,
2646 task_instantiation_info
=tasks_dict_info
,
2650 # rest of staff will be done at finally
2653 ROclient
.ROClientException
,
2659 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2662 except asyncio
.CancelledError
:
2664 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2666 exc
= "Operation was cancelled"
2667 except Exception as e
:
2668 exc
= traceback
.format_exc()
2669 self
.logger
.critical(
2670 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2675 error_list
.append(str(exc
))
2677 # wait for pending tasks
2679 stage
[1] = "Waiting for instantiate pending tasks."
2680 self
.logger
.debug(logging_text
+ stage
[1])
2681 error_list
+= await self
._wait
_for
_tasks
(
2689 stage
[1] = stage
[2] = ""
2690 except asyncio
.CancelledError
:
2691 error_list
.append("Cancelled")
2692 # TODO cancel all tasks
2693 except Exception as exc
:
2694 error_list
.append(str(exc
))
2696 # update operation-status
2697 db_nsr_update
["operational-status"] = "running"
2698 # let's begin with VCA 'configured' status (later we can change it)
2699 db_nsr_update
["config-status"] = "configured"
2700 for task
, task_name
in tasks_dict_info
.items():
2701 if not task
.done() or task
.cancelled() or task
.exception():
2702 if task_name
.startswith(self
.task_name_deploy_vca
):
2703 # A N2VC task is pending
2704 db_nsr_update
["config-status"] = "failed"
2706 # RO or KDU task is pending
2707 db_nsr_update
["operational-status"] = "failed"
2709 # update status at database
2711 error_detail
= ". ".join(error_list
)
2712 self
.logger
.error(logging_text
+ error_detail
)
2713 error_description_nslcmop
= "{} Detail: {}".format(
2714 stage
[0], error_detail
2716 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2717 nslcmop_id
, stage
[0]
2720 db_nsr_update
["detailed-status"] = (
2721 error_description_nsr
+ " Detail: " + error_detail
2723 db_nslcmop_update
["detailed-status"] = error_detail
2724 nslcmop_operation_state
= "FAILED"
2728 error_description_nsr
= error_description_nslcmop
= None
2730 db_nsr_update
["detailed-status"] = "Done"
2731 db_nslcmop_update
["detailed-status"] = "Done"
2732 nslcmop_operation_state
= "COMPLETED"
2735 self
._write
_ns
_status
(
2738 current_operation
="IDLE",
2739 current_operation_id
=None,
2740 error_description
=error_description_nsr
,
2741 error_detail
=error_detail
,
2742 other_update
=db_nsr_update
,
2744 self
._write
_op
_status
(
2747 error_message
=error_description_nslcmop
,
2748 operation_state
=nslcmop_operation_state
,
2749 other_update
=db_nslcmop_update
,
2752 if nslcmop_operation_state
:
2754 await self
.msg
.aiowrite(
2759 "nslcmop_id": nslcmop_id
,
2760 "operationState": nslcmop_operation_state
,
2764 except Exception as e
:
2766 logging_text
+ "kafka_write notification Exception {}".format(e
)
2769 self
.logger
.debug(logging_text
+ "Exit")
2770 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2772 def _get_vnfd(self
, vnfd_id
: str, cached_vnfds
: Dict
[str, Any
]):
2773 if vnfd_id
not in cached_vnfds
:
2774 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
2775 return cached_vnfds
[vnfd_id
]
2777 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2778 if vnf_profile_id
not in cached_vnfrs
:
2779 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2782 "member-vnf-index-ref": vnf_profile_id
,
2783 "nsr-id-ref": nsr_id
,
2786 return cached_vnfrs
[vnf_profile_id
]
2788 def _is_deployed_vca_in_relation(
2789 self
, vca
: DeployedVCA
, relation
: Relation
2792 for endpoint
in (relation
.provider
, relation
.requirer
):
2793 if endpoint
["kdu-resource-profile-id"]:
2796 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2797 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2798 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2804 def _update_ee_relation_data_with_implicit_data(
2805 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2807 ee_relation_data
= safe_get_ee_relation(
2808 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2810 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2811 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2812 "execution-environment-ref"
2814 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2815 vnfd_id
= vnf_profile
["vnfd-id"]
2816 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2819 if ee_relation_level
== EELevel
.VNF
2820 else ee_relation_data
["vdu-profile-id"]
2822 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2825 f
"not execution environments found for ee_relation {ee_relation_data}"
2827 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2828 return ee_relation_data
2830 def _get_ns_relations(
2833 nsd
: Dict
[str, Any
],
2835 cached_vnfds
: Dict
[str, Any
],
2836 ) -> List
[Relation
]:
2838 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2839 for r
in db_ns_relations
:
2840 provider_dict
= None
2841 requirer_dict
= None
2842 if all(key
in r
for key
in ("provider", "requirer")):
2843 provider_dict
= r
["provider"]
2844 requirer_dict
= r
["requirer"]
2845 elif "entities" in r
:
2846 provider_id
= r
["entities"][0]["id"]
2849 "endpoint": r
["entities"][0]["endpoint"],
2851 if provider_id
!= nsd
["id"]:
2852 provider_dict
["vnf-profile-id"] = provider_id
2853 requirer_id
= r
["entities"][1]["id"]
2856 "endpoint": r
["entities"][1]["endpoint"],
2858 if requirer_id
!= nsd
["id"]:
2859 requirer_dict
["vnf-profile-id"] = requirer_id
2861 raise Exception("provider/requirer or entities must be included in the relation.")
2862 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2863 nsr_id
, nsd
, provider_dict
, cached_vnfds
2865 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2866 nsr_id
, nsd
, requirer_dict
, cached_vnfds
2868 provider
= EERelation(relation_provider
)
2869 requirer
= EERelation(relation_requirer
)
2870 relation
= Relation(r
["name"], provider
, requirer
)
2871 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2873 relations
.append(relation
)
2876 def _get_vnf_relations(
2879 nsd
: Dict
[str, Any
],
2881 cached_vnfds
: Dict
[str, Any
],
2882 ) -> List
[Relation
]:
2884 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
2885 vnf_profile_id
= vnf_profile
["id"]
2886 vnfd_id
= vnf_profile
["vnfd-id"]
2887 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2888 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
2889 for r
in db_vnf_relations
:
2890 provider_dict
= None
2891 requirer_dict
= None
2892 if all(key
in r
for key
in ("provider", "requirer")):
2893 provider_dict
= r
["provider"]
2894 requirer_dict
= r
["requirer"]
2895 elif "entities" in r
:
2896 provider_id
= r
["entities"][0]["id"]
2899 "vnf-profile-id": vnf_profile_id
,
2900 "endpoint": r
["entities"][0]["endpoint"],
2902 if provider_id
!= vnfd_id
:
2903 provider_dict
["vdu-profile-id"] = provider_id
2904 requirer_id
= r
["entities"][1]["id"]
2907 "vnf-profile-id": vnf_profile_id
,
2908 "endpoint": r
["entities"][1]["endpoint"],
2910 if requirer_id
!= vnfd_id
:
2911 requirer_dict
["vdu-profile-id"] = requirer_id
2913 raise Exception("provider/requirer or entities must be included in the relation.")
2914 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2915 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
2917 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2918 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
2920 provider
= EERelation(relation_provider
)
2921 requirer
= EERelation(relation_requirer
)
2922 relation
= Relation(r
["name"], provider
, requirer
)
2923 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2925 relations
.append(relation
)
2928 def _get_kdu_resource_data(
2930 ee_relation
: EERelation
,
2931 db_nsr
: Dict
[str, Any
],
2932 cached_vnfds
: Dict
[str, Any
],
2933 ) -> DeployedK8sResource
:
2934 nsd
= get_nsd(db_nsr
)
2935 vnf_profiles
= get_vnf_profiles(nsd
)
2936 vnfd_id
= find_in_list(
2938 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
2940 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2941 kdu_resource_profile
= get_kdu_resource_profile(
2942 db_vnfd
, ee_relation
.kdu_resource_profile_id
2944 kdu_name
= kdu_resource_profile
["kdu-name"]
2945 deployed_kdu
, _
= get_deployed_kdu(
2946 db_nsr
.get("_admin", ()).get("deployed", ()),
2948 ee_relation
.vnf_profile_id
,
2950 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
2953 def _get_deployed_component(
2955 ee_relation
: EERelation
,
2956 db_nsr
: Dict
[str, Any
],
2957 cached_vnfds
: Dict
[str, Any
],
2958 ) -> DeployedComponent
:
2959 nsr_id
= db_nsr
["_id"]
2960 deployed_component
= None
2961 ee_level
= EELevel
.get_level(ee_relation
)
2962 if ee_level
== EELevel
.NS
:
2963 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
2965 deployed_component
= DeployedVCA(nsr_id
, vca
)
2966 elif ee_level
== EELevel
.VNF
:
2967 vca
= get_deployed_vca(
2971 "member-vnf-index": ee_relation
.vnf_profile_id
,
2972 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
2976 deployed_component
= DeployedVCA(nsr_id
, vca
)
2977 elif ee_level
== EELevel
.VDU
:
2978 vca
= get_deployed_vca(
2981 "vdu_id": ee_relation
.vdu_profile_id
,
2982 "member-vnf-index": ee_relation
.vnf_profile_id
,
2983 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
2987 deployed_component
= DeployedVCA(nsr_id
, vca
)
2988 elif ee_level
== EELevel
.KDU
:
2989 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
2990 ee_relation
, db_nsr
, cached_vnfds
2992 if kdu_resource_data
:
2993 deployed_component
= DeployedK8sResource(kdu_resource_data
)
2994 return deployed_component
2996 async def _add_relation(
3000 db_nsr
: Dict
[str, Any
],
3001 cached_vnfds
: Dict
[str, Any
],
3002 cached_vnfrs
: Dict
[str, Any
],
3004 deployed_provider
= self
._get
_deployed
_component
(
3005 relation
.provider
, db_nsr
, cached_vnfds
3007 deployed_requirer
= self
._get
_deployed
_component
(
3008 relation
.requirer
, db_nsr
, cached_vnfds
3012 and deployed_requirer
3013 and deployed_provider
.config_sw_installed
3014 and deployed_requirer
.config_sw_installed
3016 provider_db_vnfr
= (
3018 relation
.provider
.nsr_id
,
3019 relation
.provider
.vnf_profile_id
,
3022 if relation
.provider
.vnf_profile_id
3025 requirer_db_vnfr
= (
3027 relation
.requirer
.nsr_id
,
3028 relation
.requirer
.vnf_profile_id
,
3031 if relation
.requirer
.vnf_profile_id
3034 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3035 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3036 provider_relation_endpoint
= RelationEndpoint(
3037 deployed_provider
.ee_id
,
3039 relation
.provider
.endpoint
,
3041 requirer_relation_endpoint
= RelationEndpoint(
3042 deployed_requirer
.ee_id
,
3044 relation
.requirer
.endpoint
,
3046 await self
.vca_map
[vca_type
].add_relation(
3047 provider
=provider_relation_endpoint
,
3048 requirer
=requirer_relation_endpoint
,
3050 # remove entry from relations list
3054 async def _add_vca_relations(
3060 timeout
: int = 3600,
3064 # 1. find all relations for this VCA
3065 # 2. wait for other peers related
3069 # STEP 1: find all relations for this VCA
3072 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3073 nsd
= get_nsd(db_nsr
)
3076 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3077 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3082 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3083 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3085 # if no relations, terminate
3087 self
.logger
.debug(logging_text
+ " No relations")
3090 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3097 if now
- start
>= timeout
:
3098 self
.logger
.error(logging_text
+ " : timeout adding relations")
3101 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3102 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3104 # for each relation, find the VCA's related
3105 for relation
in relations
.copy():
3106 added
= await self
._add
_relation
(
3114 relations
.remove(relation
)
3117 self
.logger
.debug("Relations added")
3119 await asyncio
.sleep(5.0)
3123 except Exception as e
:
3124 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3127 async def _install_kdu(
3135 k8s_instance_info
: dict,
3136 k8params
: dict = None,
3142 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3145 "collection": "nsrs",
3146 "filter": {"_id": nsr_id
},
3147 "path": nsr_db_path
,
3150 if k8s_instance_info
.get("kdu-deployment-name"):
3151 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3153 kdu_instance
= self
.k8scluster_map
[
3155 ].generate_kdu_instance_name(
3156 db_dict
=db_dict_install
,
3157 kdu_model
=k8s_instance_info
["kdu-model"],
3158 kdu_name
=k8s_instance_info
["kdu-name"],
3161 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
3163 await self
.k8scluster_map
[k8sclustertype
].install(
3164 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3165 kdu_model
=k8s_instance_info
["kdu-model"],
3168 db_dict
=db_dict_install
,
3170 kdu_name
=k8s_instance_info
["kdu-name"],
3171 namespace
=k8s_instance_info
["namespace"],
3172 kdu_instance
=kdu_instance
,
3176 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
3179 # Obtain services to obtain management service ip
3180 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3181 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3182 kdu_instance
=kdu_instance
,
3183 namespace
=k8s_instance_info
["namespace"],
3186 # Obtain management service info (if exists)
3187 vnfr_update_dict
= {}
3188 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3190 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3195 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3198 for service
in kdud
.get("service", [])
3199 if service
.get("mgmt-service")
3201 for mgmt_service
in mgmt_services
:
3202 for service
in services
:
3203 if service
["name"].startswith(mgmt_service
["name"]):
3204 # Mgmt service found, Obtain service ip
3205 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3206 if isinstance(ip
, list) and len(ip
) == 1:
3210 "kdur.{}.ip-address".format(kdu_index
)
3213 # Check if must update also mgmt ip at the vnf
3214 service_external_cp
= mgmt_service
.get(
3215 "external-connection-point-ref"
3217 if service_external_cp
:
3219 deep_get(vnfd
, ("mgmt-interface", "cp"))
3220 == service_external_cp
3222 vnfr_update_dict
["ip-address"] = ip
3227 "external-connection-point-ref", ""
3229 == service_external_cp
,
3232 "kdur.{}.ip-address".format(kdu_index
)
3237 "Mgmt service name: {} not found".format(
3238 mgmt_service
["name"]
3242 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3243 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3245 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3248 and kdu_config
.get("initial-config-primitive")
3249 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3251 initial_config_primitive_list
= kdu_config
.get(
3252 "initial-config-primitive"
3254 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3256 for initial_config_primitive
in initial_config_primitive_list
:
3257 primitive_params_
= self
._map
_primitive
_params
(
3258 initial_config_primitive
, {}, {}
3261 await asyncio
.wait_for(
3262 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3263 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3264 kdu_instance
=kdu_instance
,
3265 primitive_name
=initial_config_primitive
["name"],
3266 params
=primitive_params_
,
3267 db_dict
=db_dict_install
,
3273 except Exception as e
:
3274 # Prepare update db with error and raise exception
3277 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3281 vnfr_data
.get("_id"),
3282 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3285 # ignore to keep original exception
3287 # reraise original error
3292 async def deploy_kdus(
3299 task_instantiation_info
,
3301 # Launch kdus if present in the descriptor
3303 k8scluster_id_2_uuic
= {
3304 "helm-chart-v3": {},
3309 async def _get_cluster_id(cluster_id
, cluster_type
):
3310 nonlocal k8scluster_id_2_uuic
3311 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3312 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3314 # check if K8scluster is creating and wait look if previous tasks in process
3315 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3316 "k8scluster", cluster_id
3319 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3320 task_name
, cluster_id
3322 self
.logger
.debug(logging_text
+ text
)
3323 await asyncio
.wait(task_dependency
, timeout
=3600)
3325 db_k8scluster
= self
.db
.get_one(
3326 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3328 if not db_k8scluster
:
3329 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3331 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3333 if cluster_type
== "helm-chart-v3":
3335 # backward compatibility for existing clusters that have not been initialized for helm v3
3336 k8s_credentials
= yaml
.safe_dump(
3337 db_k8scluster
.get("credentials")
3339 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3340 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3342 db_k8scluster_update
= {}
3343 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3344 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3345 db_k8scluster_update
[
3346 "_admin.helm-chart-v3.created"
3348 db_k8scluster_update
[
3349 "_admin.helm-chart-v3.operationalState"
3352 "k8sclusters", cluster_id
, db_k8scluster_update
3354 except Exception as e
:
3357 + "error initializing helm-v3 cluster: {}".format(str(e
))
3360 "K8s cluster '{}' has not been initialized for '{}'".format(
3361 cluster_id
, cluster_type
3366 "K8s cluster '{}' has not been initialized for '{}'".format(
3367 cluster_id
, cluster_type
3370 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3373 logging_text
+= "Deploy kdus: "
3376 db_nsr_update
= {"_admin.deployed.K8s": []}
3377 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3380 updated_cluster_list
= []
3381 updated_v3_cluster_list
= []
3383 for vnfr_data
in db_vnfrs
.values():
3384 vca_id
= self
.get_vca_id(vnfr_data
, {})
3385 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3386 # Step 0: Prepare and set parameters
3387 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3388 vnfd_id
= vnfr_data
.get("vnfd-id")
3389 vnfd_with_id
= find_in_list(
3390 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3394 for kdud
in vnfd_with_id
["kdu"]
3395 if kdud
["name"] == kdur
["kdu-name"]
3397 namespace
= kdur
.get("k8s-namespace")
3398 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3399 if kdur
.get("helm-chart"):
3400 kdumodel
= kdur
["helm-chart"]
3401 # Default version: helm3, if helm-version is v2 assign v2
3402 k8sclustertype
= "helm-chart-v3"
3403 self
.logger
.debug("kdur: {}".format(kdur
))
3405 kdur
.get("helm-version")
3406 and kdur
.get("helm-version") == "v2"
3408 k8sclustertype
= "helm-chart"
3409 elif kdur
.get("juju-bundle"):
3410 kdumodel
= kdur
["juju-bundle"]
3411 k8sclustertype
= "juju-bundle"
3414 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3415 "juju-bundle. Maybe an old NBI version is running".format(
3416 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3419 # check if kdumodel is a file and exists
3421 vnfd_with_id
= find_in_list(
3422 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3424 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3425 if storage
: # may be not present if vnfd has not artifacts
3426 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3427 if storage
["pkg-dir"]:
3428 filename
= "{}/{}/{}s/{}".format(
3435 filename
= "{}/Scripts/{}s/{}".format(
3440 if self
.fs
.file_exists(
3441 filename
, mode
="file"
3442 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3443 kdumodel
= self
.fs
.path
+ filename
3444 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3446 except Exception: # it is not a file
3449 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3450 step
= "Synchronize repos for k8s cluster '{}'".format(
3453 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3457 k8sclustertype
== "helm-chart"
3458 and cluster_uuid
not in updated_cluster_list
3460 k8sclustertype
== "helm-chart-v3"
3461 and cluster_uuid
not in updated_v3_cluster_list
3463 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3464 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3465 cluster_uuid
=cluster_uuid
3468 if del_repo_list
or added_repo_dict
:
3469 if k8sclustertype
== "helm-chart":
3471 "_admin.helm_charts_added." + item
: None
3472 for item
in del_repo_list
3475 "_admin.helm_charts_added." + item
: name
3476 for item
, name
in added_repo_dict
.items()
3478 updated_cluster_list
.append(cluster_uuid
)
3479 elif k8sclustertype
== "helm-chart-v3":
3481 "_admin.helm_charts_v3_added." + item
: None
3482 for item
in del_repo_list
3485 "_admin.helm_charts_v3_added." + item
: name
3486 for item
, name
in added_repo_dict
.items()
3488 updated_v3_cluster_list
.append(cluster_uuid
)
3490 logging_text
+ "repos synchronized on k8s cluster "
3491 "'{}' to_delete: {}, to_add: {}".format(
3492 k8s_cluster_id
, del_repo_list
, added_repo_dict
3497 {"_id": k8s_cluster_id
},
3503 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3504 vnfr_data
["member-vnf-index-ref"],
3508 k8s_instance_info
= {
3509 "kdu-instance": None,
3510 "k8scluster-uuid": cluster_uuid
,
3511 "k8scluster-type": k8sclustertype
,
3512 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3513 "kdu-name": kdur
["kdu-name"],
3514 "kdu-model": kdumodel
,
3515 "namespace": namespace
,
3516 "kdu-deployment-name": kdu_deployment_name
,
3518 db_path
= "_admin.deployed.K8s.{}".format(index
)
3519 db_nsr_update
[db_path
] = k8s_instance_info
3520 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3521 vnfd_with_id
= find_in_list(
3522 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3524 task
= asyncio
.ensure_future(
3533 k8params
=desc_params
,
3538 self
.lcm_tasks
.register(
3542 "instantiate_KDU-{}".format(index
),
3545 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3551 except (LcmException
, asyncio
.CancelledError
):
3553 except Exception as e
:
3554 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3555 if isinstance(e
, (N2VCException
, DbException
)):
3556 self
.logger
.error(logging_text
+ msg
)
3558 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3559 raise LcmException(msg
)
3562 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3581 task_instantiation_info
,
3584 # launch instantiate_N2VC in a asyncio task and register task object
3585 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3586 # if not found, create one entry and update database
3587 # fill db_nsr._admin.deployed.VCA.<index>
3590 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3592 if "execution-environment-list" in descriptor_config
:
3593 ee_list
= descriptor_config
.get("execution-environment-list", [])
3594 elif "juju" in descriptor_config
:
3595 ee_list
= [descriptor_config
] # ns charms
3596 else: # other types as script are not supported
3599 for ee_item
in ee_list
:
3602 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3603 ee_item
.get("juju"), ee_item
.get("helm-chart")
3606 ee_descriptor_id
= ee_item
.get("id")
3607 if ee_item
.get("juju"):
3608 vca_name
= ee_item
["juju"].get("charm")
3611 if ee_item
["juju"].get("charm") is not None
3614 if ee_item
["juju"].get("cloud") == "k8s":
3615 vca_type
= "k8s_proxy_charm"
3616 elif ee_item
["juju"].get("proxy") is False:
3617 vca_type
= "native_charm"
3618 elif ee_item
.get("helm-chart"):
3619 vca_name
= ee_item
["helm-chart"]
3620 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3623 vca_type
= "helm-v3"
3626 logging_text
+ "skipping non juju neither charm configuration"
3631 for vca_index
, vca_deployed
in enumerate(
3632 db_nsr
["_admin"]["deployed"]["VCA"]
3634 if not vca_deployed
:
3637 vca_deployed
.get("member-vnf-index") == member_vnf_index
3638 and vca_deployed
.get("vdu_id") == vdu_id
3639 and vca_deployed
.get("kdu_name") == kdu_name
3640 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3641 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3645 # not found, create one.
3647 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3650 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3652 target
+= "/kdu/{}".format(kdu_name
)
3654 "target_element": target
,
3655 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3656 "member-vnf-index": member_vnf_index
,
3658 "kdu_name": kdu_name
,
3659 "vdu_count_index": vdu_index
,
3660 "operational-status": "init", # TODO revise
3661 "detailed-status": "", # TODO revise
3662 "step": "initial-deploy", # TODO revise
3664 "vdu_name": vdu_name
,
3666 "ee_descriptor_id": ee_descriptor_id
,
3670 # create VCA and configurationStatus in db
3672 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3673 "configurationStatus.{}".format(vca_index
): dict(),
3675 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3677 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3679 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3680 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3681 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3684 task_n2vc
= asyncio
.ensure_future(
3685 self
.instantiate_N2VC(
3686 logging_text
=logging_text
,
3687 vca_index
=vca_index
,
3693 vdu_index
=vdu_index
,
3694 deploy_params
=deploy_params
,
3695 config_descriptor
=descriptor_config
,
3696 base_folder
=base_folder
,
3697 nslcmop_id
=nslcmop_id
,
3701 ee_config_descriptor
=ee_item
,
3704 self
.lcm_tasks
.register(
3708 "instantiate_N2VC-{}".format(vca_index
),
3711 task_instantiation_info
[
3713 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3714 member_vnf_index
or "", vdu_id
or ""
3718 def _create_nslcmop(nsr_id
, operation
, params
):
3720 Creates a ns-lcm-opp content to be stored at database.
3721 :param nsr_id: internal id of the instance
3722 :param operation: instantiate, terminate, scale, action, ...
3723 :param params: user parameters for the operation
3724 :return: dictionary following SOL005 format
3726 # Raise exception if invalid arguments
3727 if not (nsr_id
and operation
and params
):
3729 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3736 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3737 "operationState": "PROCESSING",
3738 "statusEnteredTime": now
,
3739 "nsInstanceId": nsr_id
,
3740 "lcmOperationType": operation
,
3742 "isAutomaticInvocation": False,
3743 "operationParams": params
,
3744 "isCancelPending": False,
3746 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3747 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3752 def _format_additional_params(self
, params
):
3753 params
= params
or {}
3754 for key
, value
in params
.items():
3755 if str(value
).startswith("!!yaml "):
3756 params
[key
] = yaml
.safe_load(value
[7:])
3759 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3760 primitive
= seq
.get("name")
3761 primitive_params
= {}
3763 "member_vnf_index": vnf_index
,
3764 "primitive": primitive
,
3765 "primitive_params": primitive_params
,
3768 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
3772 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
3773 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
3774 if op
.get("operationState") == "COMPLETED":
3775 # b. Skip sub-operation
3776 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3777 return self
.SUBOPERATION_STATUS_SKIP
3779 # c. retry executing sub-operation
3780 # The sub-operation exists, and operationState != 'COMPLETED'
3781 # Update operationState = 'PROCESSING' to indicate a retry.
3782 operationState
= "PROCESSING"
3783 detailed_status
= "In progress"
3784 self
._update
_suboperation
_status
(
3785 db_nslcmop
, op_index
, operationState
, detailed_status
3787 # Return the sub-operation index
3788 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3789 # with arguments extracted from the sub-operation
3792 # Find a sub-operation where all keys in a matching dictionary must match
3793 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3794 def _find_suboperation(self
, db_nslcmop
, match
):
3795 if db_nslcmop
and match
:
3796 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3797 for i
, op
in enumerate(op_list
):
3798 if all(op
.get(k
) == match
[k
] for k
in match
):
3800 return self
.SUBOPERATION_STATUS_NOT_FOUND
3802 # Update status for a sub-operation given its index
3803 def _update_suboperation_status(
3804 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3806 # Update DB for HA tasks
3807 q_filter
= {"_id": db_nslcmop
["_id"]}
3809 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3810 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3813 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3816 # Add sub-operation, return the index of the added sub-operation
3817 # Optionally, set operationState, detailed-status, and operationType
3818 # Status and type are currently set for 'scale' sub-operations:
3819 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3820 # 'detailed-status' : status message
3821 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3822 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3823 def _add_suboperation(
3831 mapped_primitive_params
,
3832 operationState
=None,
3833 detailed_status
=None,
3836 RO_scaling_info
=None,
3839 return self
.SUBOPERATION_STATUS_NOT_FOUND
3840 # Get the "_admin.operations" list, if it exists
3841 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3842 op_list
= db_nslcmop_admin
.get("operations")
3843 # Create or append to the "_admin.operations" list
3845 "member_vnf_index": vnf_index
,
3847 "vdu_count_index": vdu_count_index
,
3848 "primitive": primitive
,
3849 "primitive_params": mapped_primitive_params
,
3852 new_op
["operationState"] = operationState
3854 new_op
["detailed-status"] = detailed_status
3856 new_op
["lcmOperationType"] = operationType
3858 new_op
["RO_nsr_id"] = RO_nsr_id
3860 new_op
["RO_scaling_info"] = RO_scaling_info
3862 # No existing operations, create key 'operations' with current operation as first list element
3863 db_nslcmop_admin
.update({"operations": [new_op
]})
3864 op_list
= db_nslcmop_admin
.get("operations")
3866 # Existing operations, append operation to list
3867 op_list
.append(new_op
)
3869 db_nslcmop_update
= {"_admin.operations": op_list
}
3870 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
3871 op_index
= len(op_list
) - 1
3874 # Helper methods for scale() sub-operations
3876 # pre-scale/post-scale:
3877 # Check for 3 different cases:
3878 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3879 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3880 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3881 def _check_or_add_scale_suboperation(
3885 vnf_config_primitive
,
3889 RO_scaling_info
=None,
3891 # Find this sub-operation
3892 if RO_nsr_id
and RO_scaling_info
:
3893 operationType
= "SCALE-RO"
3895 "member_vnf_index": vnf_index
,
3896 "RO_nsr_id": RO_nsr_id
,
3897 "RO_scaling_info": RO_scaling_info
,
3901 "member_vnf_index": vnf_index
,
3902 "primitive": vnf_config_primitive
,
3903 "primitive_params": primitive_params
,
3904 "lcmOperationType": operationType
,
3906 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
3907 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
3908 # a. New sub-operation
3909 # The sub-operation does not exist, add it.
3910 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3911 # The following parameters are set to None for all kind of scaling:
3913 vdu_count_index
= None
3915 if RO_nsr_id
and RO_scaling_info
:
3916 vnf_config_primitive
= None
3917 primitive_params
= None
3920 RO_scaling_info
= None
3921 # Initial status for sub-operation
3922 operationState
= "PROCESSING"
3923 detailed_status
= "In progress"
3924 # Add sub-operation for pre/post-scaling (zero or more operations)
3925 self
._add
_suboperation
(
3931 vnf_config_primitive
,
3939 return self
.SUBOPERATION_STATUS_NEW
3941 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3942 # or op_index (operationState != 'COMPLETED')
3943 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
3945 # Function to return execution_environment id
3947 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
3948 # TODO vdu_index_count
3949 for vca
in vca_deployed_list
:
3950 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
3953 async def destroy_N2VC(
3961 exec_primitives
=True,
3966 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
3967 :param logging_text:
3969 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
3970 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3971 :param vca_index: index in the database _admin.deployed.VCA
3972 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
3973 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
3974 not executed properly
3975 :param scaling_in: True destroys the application, False destroys the model
3976 :return: None or exception
3981 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3982 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
3986 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
3988 # execute terminate_primitives
3990 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
3991 config_descriptor
.get("terminate-config-primitive"),
3992 vca_deployed
.get("ee_descriptor_id"),
3994 vdu_id
= vca_deployed
.get("vdu_id")
3995 vdu_count_index
= vca_deployed
.get("vdu_count_index")
3996 vdu_name
= vca_deployed
.get("vdu_name")
3997 vnf_index
= vca_deployed
.get("member-vnf-index")
3998 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
3999 for seq
in terminate_primitives
:
4000 # For each sequence in list, get primitive and call _ns_execute_primitive()
4001 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4002 vnf_index
, seq
.get("name")
4004 self
.logger
.debug(logging_text
+ step
)
4005 # Create the primitive for each sequence, i.e. "primitive": "touch"
4006 primitive
= seq
.get("name")
4007 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4012 self
._add
_suboperation
(
4019 mapped_primitive_params
,
4021 # Sub-operations: Call _ns_execute_primitive() instead of action()
4023 result
, result_detail
= await self
._ns
_execute
_primitive
(
4024 vca_deployed
["ee_id"],
4026 mapped_primitive_params
,
4030 except LcmException
:
4031 # this happens when VCA is not deployed. In this case it is not needed to terminate
4033 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4034 if result
not in result_ok
:
4036 "terminate_primitive {} for vnf_member_index={} fails with "
4037 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4039 # set that this VCA do not need terminated
4040 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4044 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4047 # Delete Prometheus Jobs if any
4048 # This uses NSR_ID, so it will destroy any jobs under this index
4049 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4052 await self
.vca_map
[vca_type
].delete_execution_environment(
4053 vca_deployed
["ee_id"],
4054 scaling_in
=scaling_in
,
4059 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4060 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4061 namespace
= "." + db_nsr
["_id"]
4063 await self
.n2vc
.delete_namespace(
4064 namespace
=namespace
,
4065 total_timeout
=self
.timeout_charm_delete
,
4068 except N2VCNotFound
: # already deleted. Skip
4070 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4072 async def _terminate_RO(
4073 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4076 Terminates a deployment from RO
4077 :param logging_text:
4078 :param nsr_deployed: db_nsr._admin.deployed
4081 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4082 this method will update only the index 2, but it will write on database the concatenated content of the list
4087 ro_nsr_id
= ro_delete_action
= None
4088 if nsr_deployed
and nsr_deployed
.get("RO"):
4089 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4090 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4093 stage
[2] = "Deleting ns from VIM."
4094 db_nsr_update
["detailed-status"] = " ".join(stage
)
4095 self
._write
_op
_status
(nslcmop_id
, stage
)
4096 self
.logger
.debug(logging_text
+ stage
[2])
4097 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4098 self
._write
_op
_status
(nslcmop_id
, stage
)
4099 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4100 ro_delete_action
= desc
["action_id"]
4102 "_admin.deployed.RO.nsr_delete_action_id"
4103 ] = ro_delete_action
4104 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4105 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4106 if ro_delete_action
:
4107 # wait until NS is deleted from VIM
4108 stage
[2] = "Waiting ns deleted from VIM."
4109 detailed_status_old
= None
4113 + " RO_id={} ro_delete_action={}".format(
4114 ro_nsr_id
, ro_delete_action
4117 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4118 self
._write
_op
_status
(nslcmop_id
, stage
)
4120 delete_timeout
= 20 * 60 # 20 minutes
4121 while delete_timeout
> 0:
4122 desc
= await self
.RO
.show(
4124 item_id_name
=ro_nsr_id
,
4125 extra_item
="action",
4126 extra_item_id
=ro_delete_action
,
4130 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4132 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4133 if ns_status
== "ERROR":
4134 raise ROclient
.ROClientException(ns_status_info
)
4135 elif ns_status
== "BUILD":
4136 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4137 elif ns_status
== "ACTIVE":
4138 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4139 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4144 ), "ROclient.check_action_status returns unknown {}".format(
4147 if stage
[2] != detailed_status_old
:
4148 detailed_status_old
= stage
[2]
4149 db_nsr_update
["detailed-status"] = " ".join(stage
)
4150 self
._write
_op
_status
(nslcmop_id
, stage
)
4151 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4152 await asyncio
.sleep(5, loop
=self
.loop
)
4154 else: # delete_timeout <= 0:
4155 raise ROclient
.ROClientException(
4156 "Timeout waiting ns deleted from VIM"
4159 except Exception as e
:
4160 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4162 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4164 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4165 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4166 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4168 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4171 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4173 failed_detail
.append("delete conflict: {}".format(e
))
4176 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4179 failed_detail
.append("delete error: {}".format(e
))
4181 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4185 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4186 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4188 stage
[2] = "Deleting nsd from RO."
4189 db_nsr_update
["detailed-status"] = " ".join(stage
)
4190 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4191 self
._write
_op
_status
(nslcmop_id
, stage
)
4192 await self
.RO
.delete("nsd", ro_nsd_id
)
4194 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4196 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4197 except Exception as e
:
4199 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4201 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4203 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4206 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4208 failed_detail
.append(
4209 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4211 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4213 failed_detail
.append(
4214 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4216 self
.logger
.error(logging_text
+ failed_detail
[-1])
4218 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4219 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4220 if not vnf_deployed
or not vnf_deployed
["id"]:
4223 ro_vnfd_id
= vnf_deployed
["id"]
4226 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4227 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4229 db_nsr_update
["detailed-status"] = " ".join(stage
)
4230 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4231 self
._write
_op
_status
(nslcmop_id
, stage
)
4232 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4234 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4236 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4237 except Exception as e
:
4239 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4242 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4246 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4249 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4251 failed_detail
.append(
4252 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4254 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4256 failed_detail
.append(
4257 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4259 self
.logger
.error(logging_text
+ failed_detail
[-1])
4262 stage
[2] = "Error deleting from VIM"
4264 stage
[2] = "Deleted from VIM"
4265 db_nsr_update
["detailed-status"] = " ".join(stage
)
4266 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4267 self
._write
_op
_status
(nslcmop_id
, stage
)
4270 raise LcmException("; ".join(failed_detail
))
4272 async def terminate(self
, nsr_id
, nslcmop_id
):
4273 # Try to lock HA task here
4274 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4275 if not task_is_locked_by_me
:
4278 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4279 self
.logger
.debug(logging_text
+ "Enter")
4280 timeout_ns_terminate
= self
.timeout_ns_terminate
4283 operation_params
= None
4285 error_list
= [] # annotates all failed error messages
4286 db_nslcmop_update
= {}
4287 autoremove
= False # autoremove after terminated
4288 tasks_dict_info
= {}
4291 "Stage 1/3: Preparing task.",
4292 "Waiting for previous operations to terminate.",
4295 # ^ contains [stage, step, VIM-status]
4297 # wait for any previous tasks in process
4298 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4300 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4301 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4302 operation_params
= db_nslcmop
.get("operationParams") or {}
4303 if operation_params
.get("timeout_ns_terminate"):
4304 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4305 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4306 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4308 db_nsr_update
["operational-status"] = "terminating"
4309 db_nsr_update
["config-status"] = "terminating"
4310 self
._write
_ns
_status
(
4312 ns_state
="TERMINATING",
4313 current_operation
="TERMINATING",
4314 current_operation_id
=nslcmop_id
,
4315 other_update
=db_nsr_update
,
4317 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4318 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4319 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4322 stage
[1] = "Getting vnf descriptors from db."
4323 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4325 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4327 db_vnfds_from_id
= {}
4328 db_vnfds_from_member_index
= {}
4330 for vnfr
in db_vnfrs_list
:
4331 vnfd_id
= vnfr
["vnfd-id"]
4332 if vnfd_id
not in db_vnfds_from_id
:
4333 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4334 db_vnfds_from_id
[vnfd_id
] = vnfd
4335 db_vnfds_from_member_index
[
4336 vnfr
["member-vnf-index-ref"]
4337 ] = db_vnfds_from_id
[vnfd_id
]
4339 # Destroy individual execution environments when there are terminating primitives.
4340 # Rest of EE will be deleted at once
4341 # TODO - check before calling _destroy_N2VC
4342 # if not operation_params.get("skip_terminate_primitives"):#
4343 # or not vca.get("needed_terminate"):
4344 stage
[0] = "Stage 2/3 execute terminating primitives."
4345 self
.logger
.debug(logging_text
+ stage
[0])
4346 stage
[1] = "Looking execution environment that needs terminate."
4347 self
.logger
.debug(logging_text
+ stage
[1])
4349 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4350 config_descriptor
= None
4351 vca_member_vnf_index
= vca
.get("member-vnf-index")
4352 vca_id
= self
.get_vca_id(
4353 db_vnfrs_dict
.get(vca_member_vnf_index
)
4354 if vca_member_vnf_index
4358 if not vca
or not vca
.get("ee_id"):
4360 if not vca
.get("member-vnf-index"):
4362 config_descriptor
= db_nsr
.get("ns-configuration")
4363 elif vca
.get("vdu_id"):
4364 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4365 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4366 elif vca
.get("kdu_name"):
4367 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4368 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4370 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4371 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4372 vca_type
= vca
.get("type")
4373 exec_terminate_primitives
= not operation_params
.get(
4374 "skip_terminate_primitives"
4375 ) and vca
.get("needed_terminate")
4376 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4377 # pending native charms
4379 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4381 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4382 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4383 task
= asyncio
.ensure_future(
4391 exec_terminate_primitives
,
4395 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4397 # wait for pending tasks of terminate primitives
4401 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4403 error_list
= await self
._wait
_for
_tasks
(
4406 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4410 tasks_dict_info
.clear()
4412 return # raise LcmException("; ".join(error_list))
4414 # remove All execution environments at once
4415 stage
[0] = "Stage 3/3 delete all."
4417 if nsr_deployed
.get("VCA"):
4418 stage
[1] = "Deleting all execution environments."
4419 self
.logger
.debug(logging_text
+ stage
[1])
4420 vca_id
= self
.get_vca_id({}, db_nsr
)
4421 task_delete_ee
= asyncio
.ensure_future(
4423 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4424 timeout
=self
.timeout_charm_delete
,
4427 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4428 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4430 # Delete from k8scluster
4431 stage
[1] = "Deleting KDUs."
4432 self
.logger
.debug(logging_text
+ stage
[1])
4433 # print(nsr_deployed)
4434 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4435 if not kdu
or not kdu
.get("kdu-instance"):
4437 kdu_instance
= kdu
.get("kdu-instance")
4438 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4439 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4440 vca_id
= self
.get_vca_id({}, db_nsr
)
4441 task_delete_kdu_instance
= asyncio
.ensure_future(
4442 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4443 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4444 kdu_instance
=kdu_instance
,
4451 + "Unknown k8s deployment type {}".format(
4452 kdu
.get("k8scluster-type")
4457 task_delete_kdu_instance
4458 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4461 stage
[1] = "Deleting ns from VIM."
4463 task_delete_ro
= asyncio
.ensure_future(
4464 self
._terminate
_ng
_ro
(
4465 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4469 task_delete_ro
= asyncio
.ensure_future(
4471 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4474 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4476 # rest of staff will be done at finally
4479 ROclient
.ROClientException
,
4484 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4486 except asyncio
.CancelledError
:
4488 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4490 exc
= "Operation was cancelled"
4491 except Exception as e
:
4492 exc
= traceback
.format_exc()
4493 self
.logger
.critical(
4494 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4499 error_list
.append(str(exc
))
4501 # wait for pending tasks
4503 stage
[1] = "Waiting for terminate pending tasks."
4504 self
.logger
.debug(logging_text
+ stage
[1])
4505 error_list
+= await self
._wait
_for
_tasks
(
4508 timeout_ns_terminate
,
4512 stage
[1] = stage
[2] = ""
4513 except asyncio
.CancelledError
:
4514 error_list
.append("Cancelled")
4515 # TODO cancell all tasks
4516 except Exception as exc
:
4517 error_list
.append(str(exc
))
4518 # update status at database
4520 error_detail
= "; ".join(error_list
)
4521 # self.logger.error(logging_text + error_detail)
4522 error_description_nslcmop
= "{} Detail: {}".format(
4523 stage
[0], error_detail
4525 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4526 nslcmop_id
, stage
[0]
4529 db_nsr_update
["operational-status"] = "failed"
4530 db_nsr_update
["detailed-status"] = (
4531 error_description_nsr
+ " Detail: " + error_detail
4533 db_nslcmop_update
["detailed-status"] = error_detail
4534 nslcmop_operation_state
= "FAILED"
4538 error_description_nsr
= error_description_nslcmop
= None
4539 ns_state
= "NOT_INSTANTIATED"
4540 db_nsr_update
["operational-status"] = "terminated"
4541 db_nsr_update
["detailed-status"] = "Done"
4542 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4543 db_nslcmop_update
["detailed-status"] = "Done"
4544 nslcmop_operation_state
= "COMPLETED"
4547 self
._write
_ns
_status
(
4550 current_operation
="IDLE",
4551 current_operation_id
=None,
4552 error_description
=error_description_nsr
,
4553 error_detail
=error_detail
,
4554 other_update
=db_nsr_update
,
4556 self
._write
_op
_status
(
4559 error_message
=error_description_nslcmop
,
4560 operation_state
=nslcmop_operation_state
,
4561 other_update
=db_nslcmop_update
,
4563 if ns_state
== "NOT_INSTANTIATED":
4567 {"nsr-id-ref": nsr_id
},
4568 {"_admin.nsState": "NOT_INSTANTIATED"},
4570 except DbException
as e
:
4573 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4577 if operation_params
:
4578 autoremove
= operation_params
.get("autoremove", False)
4579 if nslcmop_operation_state
:
4581 await self
.msg
.aiowrite(
4586 "nslcmop_id": nslcmop_id
,
4587 "operationState": nslcmop_operation_state
,
4588 "autoremove": autoremove
,
4592 except Exception as e
:
4594 logging_text
+ "kafka_write notification Exception {}".format(e
)
4597 self
.logger
.debug(logging_text
+ "Exit")
4598 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4600 async def _wait_for_tasks(
4601 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4604 error_detail_list
= []
4606 pending_tasks
= list(created_tasks_info
.keys())
4607 num_tasks
= len(pending_tasks
)
4609 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4610 self
._write
_op
_status
(nslcmop_id
, stage
)
4611 while pending_tasks
:
4613 _timeout
= timeout
+ time_start
- time()
4614 done
, pending_tasks
= await asyncio
.wait(
4615 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4617 num_done
+= len(done
)
4618 if not done
: # Timeout
4619 for task
in pending_tasks
:
4620 new_error
= created_tasks_info
[task
] + ": Timeout"
4621 error_detail_list
.append(new_error
)
4622 error_list
.append(new_error
)
4625 if task
.cancelled():
4628 exc
= task
.exception()
4630 if isinstance(exc
, asyncio
.TimeoutError
):
4632 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4633 error_list
.append(created_tasks_info
[task
])
4634 error_detail_list
.append(new_error
)
4641 ROclient
.ROClientException
,
4647 self
.logger
.error(logging_text
+ new_error
)
4649 exc_traceback
= "".join(
4650 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4654 + created_tasks_info
[task
]
4660 logging_text
+ created_tasks_info
[task
] + ": Done"
4662 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4664 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4665 if nsr_id
: # update also nsr
4670 "errorDescription": "Error at: " + ", ".join(error_list
),
4671 "errorDetail": ". ".join(error_detail_list
),
4674 self
._write
_op
_status
(nslcmop_id
, stage
)
4675 return error_detail_list
4678 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4680 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4681 The default-value is used. If it is between < > it look for a value at instantiation_params
4682 :param primitive_desc: portion of VNFD/NSD that describes primitive
4683 :param params: Params provided by user
4684 :param instantiation_params: Instantiation params provided by user
4685 :return: a dictionary with the calculated params
4687 calculated_params
= {}
4688 for parameter
in primitive_desc
.get("parameter", ()):
4689 param_name
= parameter
["name"]
4690 if param_name
in params
:
4691 calculated_params
[param_name
] = params
[param_name
]
4692 elif "default-value" in parameter
or "value" in parameter
:
4693 if "value" in parameter
:
4694 calculated_params
[param_name
] = parameter
["value"]
4696 calculated_params
[param_name
] = parameter
["default-value"]
4698 isinstance(calculated_params
[param_name
], str)
4699 and calculated_params
[param_name
].startswith("<")
4700 and calculated_params
[param_name
].endswith(">")
4702 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4703 calculated_params
[param_name
] = instantiation_params
[
4704 calculated_params
[param_name
][1:-1]
4708 "Parameter {} needed to execute primitive {} not provided".format(
4709 calculated_params
[param_name
], primitive_desc
["name"]
4714 "Parameter {} needed to execute primitive {} not provided".format(
4715 param_name
, primitive_desc
["name"]
4719 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4720 calculated_params
[param_name
] = yaml
.safe_dump(
4721 calculated_params
[param_name
], default_flow_style
=True, width
=256
4723 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4725 ].startswith("!!yaml "):
4726 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4727 if parameter
.get("data-type") == "INTEGER":
4729 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4730 except ValueError: # error converting string to int
4732 "Parameter {} of primitive {} must be integer".format(
4733 param_name
, primitive_desc
["name"]
4736 elif parameter
.get("data-type") == "BOOLEAN":
4737 calculated_params
[param_name
] = not (
4738 (str(calculated_params
[param_name
])).lower() == "false"
4741 # add always ns_config_info if primitive name is config
4742 if primitive_desc
["name"] == "config":
4743 if "ns_config_info" in instantiation_params
:
4744 calculated_params
["ns_config_info"] = instantiation_params
[
4747 return calculated_params
4749 def _look_for_deployed_vca(
4756 ee_descriptor_id
=None,
4758 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4759 for vca
in deployed_vca
:
4762 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4765 vdu_count_index
is not None
4766 and vdu_count_index
!= vca
["vdu_count_index"]
4769 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4771 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4775 # vca_deployed not found
4777 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4778 " is not deployed".format(
4787 ee_id
= vca
.get("ee_id")
4789 "type", "lxc_proxy_charm"
4790 ) # default value for backward compatibility - proxy charm
4793 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4794 "execution environment".format(
4795 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4798 return ee_id
, vca_type
4800 async def _ns_execute_primitive(
4806 retries_interval
=30,
4813 if primitive
== "config":
4814 primitive_params
= {"params": primitive_params
}
4816 vca_type
= vca_type
or "lxc_proxy_charm"
4820 output
= await asyncio
.wait_for(
4821 self
.vca_map
[vca_type
].exec_primitive(
4823 primitive_name
=primitive
,
4824 params_dict
=primitive_params
,
4825 progress_timeout
=self
.timeout_progress_primitive
,
4826 total_timeout
=self
.timeout_primitive
,
4831 timeout
=timeout
or self
.timeout_primitive
,
4835 except asyncio
.CancelledError
:
4837 except Exception as e
: # asyncio.TimeoutError
4838 if isinstance(e
, asyncio
.TimeoutError
):
4843 "Error executing action {} on {} -> {}".format(
4848 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
4850 return "FAILED", str(e
)
4852 return "COMPLETED", output
4854 except (LcmException
, asyncio
.CancelledError
):
4856 except Exception as e
:
4857 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
4859 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
4861 Updating the vca_status with latest juju information in nsrs record
4862 :param: nsr_id: Id of the nsr
4863 :param: nslcmop_id: Id of the nslcmop
4867 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
4868 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4869 vca_id
= self
.get_vca_id({}, db_nsr
)
4870 if db_nsr
["_admin"]["deployed"]["K8s"]:
4871 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
4872 cluster_uuid
, kdu_instance
, cluster_type
= (
4873 k8s
["k8scluster-uuid"],
4874 k8s
["kdu-instance"],
4875 k8s
["k8scluster-type"],
4877 await self
._on
_update
_k
8s
_db
(
4878 cluster_uuid
=cluster_uuid
,
4879 kdu_instance
=kdu_instance
,
4880 filter={"_id": nsr_id
},
4882 cluster_type
=cluster_type
,
4885 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
4886 table
, filter = "nsrs", {"_id": nsr_id
}
4887 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
4888 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
4890 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
4891 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
4893 async def action(self
, nsr_id
, nslcmop_id
):
4894 # Try to lock HA task here
4895 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4896 if not task_is_locked_by_me
:
4899 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
4900 self
.logger
.debug(logging_text
+ "Enter")
4901 # get all needed from database
4905 db_nslcmop_update
= {}
4906 nslcmop_operation_state
= None
4907 error_description_nslcmop
= None
4910 # wait for any previous tasks in process
4911 step
= "Waiting for previous operations to terminate"
4912 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4914 self
._write
_ns
_status
(
4917 current_operation
="RUNNING ACTION",
4918 current_operation_id
=nslcmop_id
,
4921 step
= "Getting information from database"
4922 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4923 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4924 if db_nslcmop
["operationParams"].get("primitive_params"):
4925 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
4926 db_nslcmop
["operationParams"]["primitive_params"]
4929 nsr_deployed
= db_nsr
["_admin"].get("deployed")
4930 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
4931 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
4932 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
4933 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
4934 primitive
= db_nslcmop
["operationParams"]["primitive"]
4935 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
4936 timeout_ns_action
= db_nslcmop
["operationParams"].get(
4937 "timeout_ns_action", self
.timeout_primitive
4941 step
= "Getting vnfr from database"
4942 db_vnfr
= self
.db
.get_one(
4943 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
4945 if db_vnfr
.get("kdur"):
4947 for kdur
in db_vnfr
["kdur"]:
4948 if kdur
.get("additionalParams"):
4949 kdur
["additionalParams"] = json
.loads(
4950 kdur
["additionalParams"]
4952 kdur_list
.append(kdur
)
4953 db_vnfr
["kdur"] = kdur_list
4954 step
= "Getting vnfd from database"
4955 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
4957 step
= "Getting nsd from database"
4958 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
4960 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
4961 # for backward compatibility
4962 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
4963 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
4964 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
4965 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4967 # look for primitive
4968 config_primitive_desc
= descriptor_configuration
= None
4970 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
4972 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
4974 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
4976 descriptor_configuration
= db_nsd
.get("ns-configuration")
4978 if descriptor_configuration
and descriptor_configuration
.get(
4981 for config_primitive
in descriptor_configuration
["config-primitive"]:
4982 if config_primitive
["name"] == primitive
:
4983 config_primitive_desc
= config_primitive
4986 if not config_primitive_desc
:
4987 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
4989 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4993 primitive_name
= primitive
4994 ee_descriptor_id
= None
4996 primitive_name
= config_primitive_desc
.get(
4997 "execution-environment-primitive", primitive
4999 ee_descriptor_id
= config_primitive_desc
.get(
5000 "execution-environment-ref"
5006 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5008 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5011 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5013 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5015 desc_params
= parse_yaml_strings(
5016 db_vnfr
.get("additionalParamsForVnf")
5019 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5020 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5021 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5023 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5024 actions
.add(primitive
["name"])
5025 for primitive
in kdu_configuration
.get("config-primitive", []):
5026 actions
.add(primitive
["name"])
5027 kdu_action
= True if primitive_name
in actions
else False
5029 # TODO check if ns is in a proper status
5031 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5033 # kdur and desc_params already set from before
5034 if primitive_params
:
5035 desc_params
.update(primitive_params
)
5036 # TODO Check if we will need something at vnf level
5037 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5039 kdu_name
== kdu
["kdu-name"]
5040 and kdu
["member-vnf-index"] == vnf_index
5045 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5048 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5049 msg
= "unknown k8scluster-type '{}'".format(
5050 kdu
.get("k8scluster-type")
5052 raise LcmException(msg
)
5055 "collection": "nsrs",
5056 "filter": {"_id": nsr_id
},
5057 "path": "_admin.deployed.K8s.{}".format(index
),
5061 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5063 step
= "Executing kdu {}".format(primitive_name
)
5064 if primitive_name
== "upgrade":
5065 if desc_params
.get("kdu_model"):
5066 kdu_model
= desc_params
.get("kdu_model")
5067 del desc_params
["kdu_model"]
5069 kdu_model
= kdu
.get("kdu-model")
5070 parts
= kdu_model
.split(sep
=":")
5072 kdu_model
= parts
[0]
5074 detailed_status
= await asyncio
.wait_for(
5075 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5076 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5077 kdu_instance
=kdu
.get("kdu-instance"),
5079 kdu_model
=kdu_model
,
5082 timeout
=timeout_ns_action
,
5084 timeout
=timeout_ns_action
+ 10,
5087 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5089 elif primitive_name
== "rollback":
5090 detailed_status
= await asyncio
.wait_for(
5091 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5092 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5093 kdu_instance
=kdu
.get("kdu-instance"),
5096 timeout
=timeout_ns_action
,
5098 elif primitive_name
== "status":
5099 detailed_status
= await asyncio
.wait_for(
5100 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5101 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5102 kdu_instance
=kdu
.get("kdu-instance"),
5105 timeout
=timeout_ns_action
,
5108 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5109 kdu
["kdu-name"], nsr_id
5111 params
= self
._map
_primitive
_params
(
5112 config_primitive_desc
, primitive_params
, desc_params
5115 detailed_status
= await asyncio
.wait_for(
5116 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5117 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5118 kdu_instance
=kdu_instance
,
5119 primitive_name
=primitive_name
,
5122 timeout
=timeout_ns_action
,
5125 timeout
=timeout_ns_action
,
5129 nslcmop_operation_state
= "COMPLETED"
5131 detailed_status
= ""
5132 nslcmop_operation_state
= "FAILED"
5134 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5135 nsr_deployed
["VCA"],
5136 member_vnf_index
=vnf_index
,
5138 vdu_count_index
=vdu_count_index
,
5139 ee_descriptor_id
=ee_descriptor_id
,
5141 for vca_index
, vca_deployed
in enumerate(
5142 db_nsr
["_admin"]["deployed"]["VCA"]
5144 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5146 "collection": "nsrs",
5147 "filter": {"_id": nsr_id
},
5148 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5152 nslcmop_operation_state
,
5154 ) = await self
._ns
_execute
_primitive
(
5156 primitive
=primitive_name
,
5157 primitive_params
=self
._map
_primitive
_params
(
5158 config_primitive_desc
, primitive_params
, desc_params
5160 timeout
=timeout_ns_action
,
5166 db_nslcmop_update
["detailed-status"] = detailed_status
5167 error_description_nslcmop
= (
5168 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5172 + " task Done with result {} {}".format(
5173 nslcmop_operation_state
, detailed_status
5176 return # database update is called inside finally
5178 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5179 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5181 except asyncio
.CancelledError
:
5183 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5185 exc
= "Operation was cancelled"
5186 except asyncio
.TimeoutError
:
5187 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5189 except Exception as e
:
5190 exc
= traceback
.format_exc()
5191 self
.logger
.critical(
5192 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5201 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5202 nslcmop_operation_state
= "FAILED"
5204 self
._write
_ns
_status
(
5208 ], # TODO check if degraded. For the moment use previous status
5209 current_operation
="IDLE",
5210 current_operation_id
=None,
5211 # error_description=error_description_nsr,
5212 # error_detail=error_detail,
5213 other_update
=db_nsr_update
,
5216 self
._write
_op
_status
(
5219 error_message
=error_description_nslcmop
,
5220 operation_state
=nslcmop_operation_state
,
5221 other_update
=db_nslcmop_update
,
5224 if nslcmop_operation_state
:
5226 await self
.msg
.aiowrite(
5231 "nslcmop_id": nslcmop_id
,
5232 "operationState": nslcmop_operation_state
,
5236 except Exception as e
:
5238 logging_text
+ "kafka_write notification Exception {}".format(e
)
5240 self
.logger
.debug(logging_text
+ "Exit")
5241 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5242 return nslcmop_operation_state
, detailed_status
5244 async def scale(self
, nsr_id
, nslcmop_id
):
5245 # Try to lock HA task here
5246 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5247 if not task_is_locked_by_me
:
5250 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
5251 stage
= ["", "", ""]
5252 tasks_dict_info
= {}
5253 # ^ stage, step, VIM progress
5254 self
.logger
.debug(logging_text
+ "Enter")
5255 # get all needed from database
5257 db_nslcmop_update
= {}
5260 # in case of error, indicates what part of scale was failed to put nsr at error status
5261 scale_process
= None
5262 old_operational_status
= ""
5263 old_config_status
= ""
5266 # wait for any previous tasks in process
5267 step
= "Waiting for previous operations to terminate"
5268 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5269 self
._write
_ns
_status
(
5272 current_operation
="SCALING",
5273 current_operation_id
=nslcmop_id
,
5276 step
= "Getting nslcmop from database"
5278 step
+ " after having waited for previous tasks to be completed"
5280 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5282 step
= "Getting nsr from database"
5283 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5284 old_operational_status
= db_nsr
["operational-status"]
5285 old_config_status
= db_nsr
["config-status"]
5287 step
= "Parsing scaling parameters"
5288 db_nsr_update
["operational-status"] = "scaling"
5289 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5290 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5292 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
5294 ]["member-vnf-index"]
5295 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
5297 ]["scaling-group-descriptor"]
5298 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
5299 # for backward compatibility
5300 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5301 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5302 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5303 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5305 step
= "Getting vnfr from database"
5306 db_vnfr
= self
.db
.get_one(
5307 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5310 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5312 step
= "Getting vnfd from database"
5313 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5315 base_folder
= db_vnfd
["_admin"]["storage"]
5317 step
= "Getting scaling-group-descriptor"
5318 scaling_descriptor
= find_in_list(
5319 get_scaling_aspect(db_vnfd
),
5320 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
5322 if not scaling_descriptor
:
5324 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5325 "at vnfd:scaling-group-descriptor".format(scaling_group
)
5328 step
= "Sending scale order to VIM"
5329 # TODO check if ns is in a proper status
5331 if not db_nsr
["_admin"].get("scaling-group"):
5336 "_admin.scaling-group": [
5337 {"name": scaling_group
, "nb-scale-op": 0}
5341 admin_scale_index
= 0
5343 for admin_scale_index
, admin_scale_info
in enumerate(
5344 db_nsr
["_admin"]["scaling-group"]
5346 if admin_scale_info
["name"] == scaling_group
:
5347 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
5349 else: # not found, set index one plus last element and add new entry with the name
5350 admin_scale_index
+= 1
5352 "_admin.scaling-group.{}.name".format(admin_scale_index
)
5355 vca_scaling_info
= []
5356 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
5357 if scaling_type
== "SCALE_OUT":
5358 if "aspect-delta-details" not in scaling_descriptor
:
5360 "Aspect delta details not fount in scaling descriptor {}".format(
5361 scaling_descriptor
["name"]
5364 # count if max-instance-count is reached
5365 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5367 scaling_info
["scaling_direction"] = "OUT"
5368 scaling_info
["vdu-create"] = {}
5369 scaling_info
["kdu-create"] = {}
5370 for delta
in deltas
:
5371 for vdu_delta
in delta
.get("vdu-delta", {}):
5372 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
5373 # vdu_index also provides the number of instance of the targeted vdu
5374 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5375 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5379 additional_params
= (
5380 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
5383 cloud_init_list
= []
5385 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5386 max_instance_count
= 10
5387 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
5388 max_instance_count
= vdu_profile
.get(
5389 "max-number-of-instances", 10
5392 default_instance_num
= get_number_of_instances(
5395 instances_number
= vdu_delta
.get("number-of-instances", 1)
5396 nb_scale_op
+= instances_number
5398 new_instance_count
= nb_scale_op
+ default_instance_num
5399 # Control if new count is over max and vdu count is less than max.
5400 # Then assign new instance count
5401 if new_instance_count
> max_instance_count
> vdu_count
:
5402 instances_number
= new_instance_count
- max_instance_count
5404 instances_number
= instances_number
5406 if new_instance_count
> max_instance_count
:
5408 "reached the limit of {} (max-instance-count) "
5409 "scaling-out operations for the "
5410 "scaling-group-descriptor '{}'".format(
5411 nb_scale_op
, scaling_group
5414 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5416 # TODO Information of its own ip is not available because db_vnfr is not updated.
5417 additional_params
["OSM"] = get_osm_params(
5418 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
5420 cloud_init_list
.append(
5421 self
._parse
_cloud
_init
(
5428 vca_scaling_info
.append(
5430 "osm_vdu_id": vdu_delta
["id"],
5431 "member-vnf-index": vnf_index
,
5433 "vdu_index": vdu_index
+ x
,
5436 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
5437 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5438 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
5439 kdu_name
= kdu_profile
["kdu-name"]
5440 resource_name
= kdu_profile
["resource-name"]
5442 # Might have different kdus in the same delta
5443 # Should have list for each kdu
5444 if not scaling_info
["kdu-create"].get(kdu_name
, None):
5445 scaling_info
["kdu-create"][kdu_name
] = []
5447 kdur
= get_kdur(db_vnfr
, kdu_name
)
5448 if kdur
.get("helm-chart"):
5449 k8s_cluster_type
= "helm-chart-v3"
5450 self
.logger
.debug("kdur: {}".format(kdur
))
5452 kdur
.get("helm-version")
5453 and kdur
.get("helm-version") == "v2"
5455 k8s_cluster_type
= "helm-chart"
5456 raise NotImplementedError
5457 elif kdur
.get("juju-bundle"):
5458 k8s_cluster_type
= "juju-bundle"
5461 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5462 "juju-bundle. Maybe an old NBI version is running".format(
5463 db_vnfr
["member-vnf-index-ref"], kdu_name
5467 max_instance_count
= 10
5468 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
5469 max_instance_count
= kdu_profile
.get(
5470 "max-number-of-instances", 10
5473 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
5474 deployed_kdu
, _
= get_deployed_kdu(
5475 nsr_deployed
, kdu_name
, vnf_index
5477 if deployed_kdu
is None:
5479 "KDU '{}' for vnf '{}' not deployed".format(
5483 kdu_instance
= deployed_kdu
.get("kdu-instance")
5484 instance_num
= await self
.k8scluster_map
[
5486 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5487 kdu_replica_count
= instance_num
+ kdu_delta
.get(
5488 "number-of-instances", 1
5491 # Control if new count is over max and instance_num is less than max.
5492 # Then assign max instance number to kdu replica count
5493 if kdu_replica_count
> max_instance_count
> instance_num
:
5494 kdu_replica_count
= max_instance_count
5495 if kdu_replica_count
> max_instance_count
:
5497 "reached the limit of {} (max-instance-count) "
5498 "scaling-out operations for the "
5499 "scaling-group-descriptor '{}'".format(
5500 instance_num
, scaling_group
5504 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5505 vca_scaling_info
.append(
5507 "osm_kdu_id": kdu_name
,
5508 "member-vnf-index": vnf_index
,
5510 "kdu_index": instance_num
+ x
- 1,
5513 scaling_info
["kdu-create"][kdu_name
].append(
5515 "member-vnf-index": vnf_index
,
5517 "k8s-cluster-type": k8s_cluster_type
,
5518 "resource-name": resource_name
,
5519 "scale": kdu_replica_count
,
5522 elif scaling_type
== "SCALE_IN":
5523 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5525 scaling_info
["scaling_direction"] = "IN"
5526 scaling_info
["vdu-delete"] = {}
5527 scaling_info
["kdu-delete"] = {}
5529 for delta
in deltas
:
5530 for vdu_delta
in delta
.get("vdu-delta", {}):
5531 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5532 min_instance_count
= 0
5533 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5534 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
5535 min_instance_count
= vdu_profile
["min-number-of-instances"]
5537 default_instance_num
= get_number_of_instances(
5538 db_vnfd
, vdu_delta
["id"]
5540 instance_num
= vdu_delta
.get("number-of-instances", 1)
5541 nb_scale_op
-= instance_num
5543 new_instance_count
= nb_scale_op
+ default_instance_num
5545 if new_instance_count
< min_instance_count
< vdu_count
:
5546 instances_number
= min_instance_count
- new_instance_count
5548 instances_number
= instance_num
5550 if new_instance_count
< min_instance_count
:
5552 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5553 "scaling-group-descriptor '{}'".format(
5554 nb_scale_op
, scaling_group
5557 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5558 vca_scaling_info
.append(
5560 "osm_vdu_id": vdu_delta
["id"],
5561 "member-vnf-index": vnf_index
,
5563 "vdu_index": vdu_index
- 1 - x
,
5566 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
5567 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5568 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
5569 kdu_name
= kdu_profile
["kdu-name"]
5570 resource_name
= kdu_profile
["resource-name"]
5572 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
5573 scaling_info
["kdu-delete"][kdu_name
] = []
5575 kdur
= get_kdur(db_vnfr
, kdu_name
)
5576 if kdur
.get("helm-chart"):
5577 k8s_cluster_type
= "helm-chart-v3"
5578 self
.logger
.debug("kdur: {}".format(kdur
))
5580 kdur
.get("helm-version")
5581 and kdur
.get("helm-version") == "v2"
5583 k8s_cluster_type
= "helm-chart"
5584 raise NotImplementedError
5585 elif kdur
.get("juju-bundle"):
5586 k8s_cluster_type
= "juju-bundle"
5589 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5590 "juju-bundle. Maybe an old NBI version is running".format(
5591 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
5595 min_instance_count
= 0
5596 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
5597 min_instance_count
= kdu_profile
["min-number-of-instances"]
5599 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
5600 deployed_kdu
, _
= get_deployed_kdu(
5601 nsr_deployed
, kdu_name
, vnf_index
5603 if deployed_kdu
is None:
5605 "KDU '{}' for vnf '{}' not deployed".format(
5609 kdu_instance
= deployed_kdu
.get("kdu-instance")
5610 instance_num
= await self
.k8scluster_map
[
5612 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5613 kdu_replica_count
= instance_num
- kdu_delta
.get(
5614 "number-of-instances", 1
5617 if kdu_replica_count
< min_instance_count
< instance_num
:
5618 kdu_replica_count
= min_instance_count
5619 if kdu_replica_count
< min_instance_count
:
5621 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5622 "scaling-group-descriptor '{}'".format(
5623 instance_num
, scaling_group
5627 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5628 vca_scaling_info
.append(
5630 "osm_kdu_id": kdu_name
,
5631 "member-vnf-index": vnf_index
,
5633 "kdu_index": instance_num
- x
- 1,
5636 scaling_info
["kdu-delete"][kdu_name
].append(
5638 "member-vnf-index": vnf_index
,
5640 "k8s-cluster-type": k8s_cluster_type
,
5641 "resource-name": resource_name
,
5642 "scale": kdu_replica_count
,
5646 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5647 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
5648 if scaling_info
["scaling_direction"] == "IN":
5649 for vdur
in reversed(db_vnfr
["vdur"]):
5650 if vdu_delete
.get(vdur
["vdu-id-ref"]):
5651 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
5652 scaling_info
["vdu"].append(
5654 "name": vdur
.get("name") or vdur
.get("vdu-name"),
5655 "vdu_id": vdur
["vdu-id-ref"],
5659 for interface
in vdur
["interfaces"]:
5660 scaling_info
["vdu"][-1]["interface"].append(
5662 "name": interface
["name"],
5663 "ip_address": interface
["ip-address"],
5664 "mac_address": interface
.get("mac-address"),
5667 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5670 step
= "Executing pre-scale vnf-config-primitive"
5671 if scaling_descriptor
.get("scaling-config-action"):
5672 for scaling_config_action
in scaling_descriptor
[
5673 "scaling-config-action"
5676 scaling_config_action
.get("trigger") == "pre-scale-in"
5677 and scaling_type
== "SCALE_IN"
5679 scaling_config_action
.get("trigger") == "pre-scale-out"
5680 and scaling_type
== "SCALE_OUT"
5682 vnf_config_primitive
= scaling_config_action
[
5683 "vnf-config-primitive-name-ref"
5685 step
= db_nslcmop_update
[
5687 ] = "executing pre-scale scaling-config-action '{}'".format(
5688 vnf_config_primitive
5691 # look for primitive
5692 for config_primitive
in (
5693 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
5694 ).get("config-primitive", ()):
5695 if config_primitive
["name"] == vnf_config_primitive
:
5699 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5700 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5701 "primitive".format(scaling_group
, vnf_config_primitive
)
5704 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
5705 if db_vnfr
.get("additionalParamsForVnf"):
5706 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
5708 scale_process
= "VCA"
5709 db_nsr_update
["config-status"] = "configuring pre-scaling"
5710 primitive_params
= self
._map
_primitive
_params
(
5711 config_primitive
, {}, vnfr_params
5714 # Pre-scale retry check: Check if this sub-operation has been executed before
5715 op_index
= self
._check
_or
_add
_scale
_suboperation
(
5718 vnf_config_primitive
,
5722 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
5723 # Skip sub-operation
5724 result
= "COMPLETED"
5725 result_detail
= "Done"
5728 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5729 vnf_config_primitive
, result
, result_detail
5733 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
5734 # New sub-operation: Get index of this sub-operation
5736 len(db_nslcmop
.get("_admin", {}).get("operations"))
5741 + "vnf_config_primitive={} New sub-operation".format(
5742 vnf_config_primitive
5746 # retry: Get registered params for this existing sub-operation
5747 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
5750 vnf_index
= op
.get("member_vnf_index")
5751 vnf_config_primitive
= op
.get("primitive")
5752 primitive_params
= op
.get("primitive_params")
5755 + "vnf_config_primitive={} Sub-operation retry".format(
5756 vnf_config_primitive
5759 # Execute the primitive, either with new (first-time) or registered (reintent) args
5760 ee_descriptor_id
= config_primitive
.get(
5761 "execution-environment-ref"
5763 primitive_name
= config_primitive
.get(
5764 "execution-environment-primitive", vnf_config_primitive
5766 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5767 nsr_deployed
["VCA"],
5768 member_vnf_index
=vnf_index
,
5770 vdu_count_index
=None,
5771 ee_descriptor_id
=ee_descriptor_id
,
5773 result
, result_detail
= await self
._ns
_execute
_primitive
(
5782 + "vnf_config_primitive={} Done with result {} {}".format(
5783 vnf_config_primitive
, result
, result_detail
5786 # Update operationState = COMPLETED | FAILED
5787 self
._update
_suboperation
_status
(
5788 db_nslcmop
, op_index
, result
, result_detail
5791 if result
== "FAILED":
5792 raise LcmException(result_detail
)
5793 db_nsr_update
["config-status"] = old_config_status
5794 scale_process
= None
5798 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
5801 "_admin.scaling-group.{}.time".format(admin_scale_index
)
5804 # SCALE-IN VCA - BEGIN
5805 if vca_scaling_info
:
5806 step
= db_nslcmop_update
[
5808 ] = "Deleting the execution environments"
5809 scale_process
= "VCA"
5810 for vca_info
in vca_scaling_info
:
5811 if vca_info
["type"] == "delete":
5812 member_vnf_index
= str(vca_info
["member-vnf-index"])
5814 logging_text
+ "vdu info: {}".format(vca_info
)
5816 if vca_info
.get("osm_vdu_id"):
5817 vdu_id
= vca_info
["osm_vdu_id"]
5818 vdu_index
= int(vca_info
["vdu_index"])
5821 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5822 member_vnf_index
, vdu_id
, vdu_index
5826 kdu_id
= vca_info
["osm_kdu_id"]
5829 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5830 member_vnf_index
, kdu_id
, vdu_index
5832 stage
[2] = step
= "Scaling in VCA"
5833 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
5834 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
5835 config_update
= db_nsr
["configurationStatus"]
5836 for vca_index
, vca
in enumerate(vca_update
):
5838 (vca
or vca
.get("ee_id"))
5839 and vca
["member-vnf-index"] == member_vnf_index
5840 and vca
["vdu_count_index"] == vdu_index
5842 if vca
.get("vdu_id"):
5843 config_descriptor
= get_configuration(
5844 db_vnfd
, vca
.get("vdu_id")
5846 elif vca
.get("kdu_name"):
5847 config_descriptor
= get_configuration(
5848 db_vnfd
, vca
.get("kdu_name")
5851 config_descriptor
= get_configuration(
5852 db_vnfd
, db_vnfd
["id"]
5854 operation_params
= (
5855 db_nslcmop
.get("operationParams") or {}
5857 exec_terminate_primitives
= not operation_params
.get(
5858 "skip_terminate_primitives"
5859 ) and vca
.get("needed_terminate")
5860 task
= asyncio
.ensure_future(
5869 exec_primitives
=exec_terminate_primitives
,
5873 timeout
=self
.timeout_charm_delete
,
5876 tasks_dict_info
[task
] = "Terminating VCA {}".format(
5879 del vca_update
[vca_index
]
5880 del config_update
[vca_index
]
5881 # wait for pending tasks of terminate primitives
5885 + "Waiting for tasks {}".format(
5886 list(tasks_dict_info
.keys())
5889 error_list
= await self
._wait
_for
_tasks
(
5893 self
.timeout_charm_delete
, self
.timeout_ns_terminate
5898 tasks_dict_info
.clear()
5900 raise LcmException("; ".join(error_list
))
5902 db_vca_and_config_update
= {
5903 "_admin.deployed.VCA": vca_update
,
5904 "configurationStatus": config_update
,
5907 "nsrs", db_nsr
["_id"], db_vca_and_config_update
5909 scale_process
= None
5910 # SCALE-IN VCA - END
5913 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
5914 scale_process
= "RO"
5915 if self
.ro_config
.get("ng"):
5916 await self
._scale
_ng
_ro
(
5917 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
5919 scaling_info
.pop("vdu-create", None)
5920 scaling_info
.pop("vdu-delete", None)
5922 scale_process
= None
5926 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
5927 scale_process
= "KDU"
5928 await self
._scale
_kdu
(
5929 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
5931 scaling_info
.pop("kdu-create", None)
5932 scaling_info
.pop("kdu-delete", None)
5934 scale_process
= None
5938 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5940 # SCALE-UP VCA - BEGIN
5941 if vca_scaling_info
:
5942 step
= db_nslcmop_update
[
5944 ] = "Creating new execution environments"
5945 scale_process
= "VCA"
5946 for vca_info
in vca_scaling_info
:
5947 if vca_info
["type"] == "create":
5948 member_vnf_index
= str(vca_info
["member-vnf-index"])
5950 logging_text
+ "vdu info: {}".format(vca_info
)
5952 vnfd_id
= db_vnfr
["vnfd-ref"]
5953 if vca_info
.get("osm_vdu_id"):
5954 vdu_index
= int(vca_info
["vdu_index"])
5955 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
5956 if db_vnfr
.get("additionalParamsForVnf"):
5957 deploy_params
.update(
5959 db_vnfr
["additionalParamsForVnf"].copy()
5962 descriptor_config
= get_configuration(
5963 db_vnfd
, db_vnfd
["id"]
5965 if descriptor_config
:
5970 logging_text
=logging_text
5971 + "member_vnf_index={} ".format(member_vnf_index
),
5974 nslcmop_id
=nslcmop_id
,
5980 member_vnf_index
=member_vnf_index
,
5981 vdu_index
=vdu_index
,
5983 deploy_params
=deploy_params
,
5984 descriptor_config
=descriptor_config
,
5985 base_folder
=base_folder
,
5986 task_instantiation_info
=tasks_dict_info
,
5989 vdu_id
= vca_info
["osm_vdu_id"]
5990 vdur
= find_in_list(
5991 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
5993 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
5994 if vdur
.get("additionalParams"):
5995 deploy_params_vdu
= parse_yaml_strings(
5996 vdur
["additionalParams"]
5999 deploy_params_vdu
= deploy_params
6000 deploy_params_vdu
["OSM"] = get_osm_params(
6001 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6003 if descriptor_config
:
6008 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6009 member_vnf_index
, vdu_id
, vdu_index
6011 stage
[2] = step
= "Scaling out VCA"
6012 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6014 logging_text
=logging_text
6015 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6016 member_vnf_index
, vdu_id
, vdu_index
6020 nslcmop_id
=nslcmop_id
,
6026 member_vnf_index
=member_vnf_index
,
6027 vdu_index
=vdu_index
,
6029 deploy_params
=deploy_params_vdu
,
6030 descriptor_config
=descriptor_config
,
6031 base_folder
=base_folder
,
6032 task_instantiation_info
=tasks_dict_info
,
6036 kdu_name
= vca_info
["osm_kdu_id"]
6037 descriptor_config
= get_configuration(db_vnfd
, kdu_name
)
6038 if descriptor_config
:
6040 kdu_index
= int(vca_info
["kdu_index"])
6044 for x
in db_vnfr
["kdur"]
6045 if x
["kdu-name"] == kdu_name
6047 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
6048 if kdur
.get("additionalParams"):
6049 deploy_params_kdu
= parse_yaml_strings(
6050 kdur
["additionalParams"]
6054 logging_text
=logging_text
,
6057 nslcmop_id
=nslcmop_id
,
6063 member_vnf_index
=member_vnf_index
,
6064 vdu_index
=kdu_index
,
6066 deploy_params
=deploy_params_kdu
,
6067 descriptor_config
=descriptor_config
,
6068 base_folder
=base_folder
,
6069 task_instantiation_info
=tasks_dict_info
,
6072 # SCALE-UP VCA - END
6073 scale_process
= None
6076 # execute primitive service POST-SCALING
6077 step
= "Executing post-scale vnf-config-primitive"
6078 if scaling_descriptor
.get("scaling-config-action"):
6079 for scaling_config_action
in scaling_descriptor
[
6080 "scaling-config-action"
6083 scaling_config_action
.get("trigger") == "post-scale-in"
6084 and scaling_type
== "SCALE_IN"
6086 scaling_config_action
.get("trigger") == "post-scale-out"
6087 and scaling_type
== "SCALE_OUT"
6089 vnf_config_primitive
= scaling_config_action
[
6090 "vnf-config-primitive-name-ref"
6092 step
= db_nslcmop_update
[
6094 ] = "executing post-scale scaling-config-action '{}'".format(
6095 vnf_config_primitive
6098 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6099 if db_vnfr
.get("additionalParamsForVnf"):
6100 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6102 # look for primitive
6103 for config_primitive
in (
6104 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6105 ).get("config-primitive", ()):
6106 if config_primitive
["name"] == vnf_config_primitive
:
6110 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6111 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6112 "config-primitive".format(
6113 scaling_group
, vnf_config_primitive
6116 scale_process
= "VCA"
6117 db_nsr_update
["config-status"] = "configuring post-scaling"
6118 primitive_params
= self
._map
_primitive
_params
(
6119 config_primitive
, {}, vnfr_params
6122 # Post-scale retry check: Check if this sub-operation has been executed before
6123 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6126 vnf_config_primitive
,
6130 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6131 # Skip sub-operation
6132 result
= "COMPLETED"
6133 result_detail
= "Done"
6136 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6137 vnf_config_primitive
, result
, result_detail
6141 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6142 # New sub-operation: Get index of this sub-operation
6144 len(db_nslcmop
.get("_admin", {}).get("operations"))
6149 + "vnf_config_primitive={} New sub-operation".format(
6150 vnf_config_primitive
6154 # retry: Get registered params for this existing sub-operation
6155 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6158 vnf_index
= op
.get("member_vnf_index")
6159 vnf_config_primitive
= op
.get("primitive")
6160 primitive_params
= op
.get("primitive_params")
6163 + "vnf_config_primitive={} Sub-operation retry".format(
6164 vnf_config_primitive
6167 # Execute the primitive, either with new (first-time) or registered (reintent) args
6168 ee_descriptor_id
= config_primitive
.get(
6169 "execution-environment-ref"
6171 primitive_name
= config_primitive
.get(
6172 "execution-environment-primitive", vnf_config_primitive
6174 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6175 nsr_deployed
["VCA"],
6176 member_vnf_index
=vnf_index
,
6178 vdu_count_index
=None,
6179 ee_descriptor_id
=ee_descriptor_id
,
6181 result
, result_detail
= await self
._ns
_execute
_primitive
(
6190 + "vnf_config_primitive={} Done with result {} {}".format(
6191 vnf_config_primitive
, result
, result_detail
6194 # Update operationState = COMPLETED | FAILED
6195 self
._update
_suboperation
_status
(
6196 db_nslcmop
, op_index
, result
, result_detail
6199 if result
== "FAILED":
6200 raise LcmException(result_detail
)
6201 db_nsr_update
["config-status"] = old_config_status
6202 scale_process
= None
6207 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6208 db_nsr_update
["operational-status"] = (
6210 if old_operational_status
== "failed"
6211 else old_operational_status
6213 db_nsr_update
["config-status"] = old_config_status
6216 ROclient
.ROClientException
,
6221 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6223 except asyncio
.CancelledError
:
6225 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6227 exc
= "Operation was cancelled"
6228 except Exception as e
:
6229 exc
= traceback
.format_exc()
6230 self
.logger
.critical(
6231 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6235 self
._write
_ns
_status
(
6238 current_operation
="IDLE",
6239 current_operation_id
=None,
6242 stage
[1] = "Waiting for instantiate pending tasks."
6243 self
.logger
.debug(logging_text
+ stage
[1])
6244 exc
= await self
._wait
_for
_tasks
(
6247 self
.timeout_ns_deploy
,
6255 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6256 nslcmop_operation_state
= "FAILED"
6258 db_nsr_update
["operational-status"] = old_operational_status
6259 db_nsr_update
["config-status"] = old_config_status
6260 db_nsr_update
["detailed-status"] = ""
6262 if "VCA" in scale_process
:
6263 db_nsr_update
["config-status"] = "failed"
6264 if "RO" in scale_process
:
6265 db_nsr_update
["operational-status"] = "failed"
6268 ] = "FAILED scaling nslcmop={} {}: {}".format(
6269 nslcmop_id
, step
, exc
6272 error_description_nslcmop
= None
6273 nslcmop_operation_state
= "COMPLETED"
6274 db_nslcmop_update
["detailed-status"] = "Done"
6276 self
._write
_op
_status
(
6279 error_message
=error_description_nslcmop
,
6280 operation_state
=nslcmop_operation_state
,
6281 other_update
=db_nslcmop_update
,
6284 self
._write
_ns
_status
(
6287 current_operation
="IDLE",
6288 current_operation_id
=None,
6289 other_update
=db_nsr_update
,
6292 if nslcmop_operation_state
:
6296 "nslcmop_id": nslcmop_id
,
6297 "operationState": nslcmop_operation_state
,
6299 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
6300 except Exception as e
:
6302 logging_text
+ "kafka_write notification Exception {}".format(e
)
6304 self
.logger
.debug(logging_text
+ "Exit")
6305 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
6307 async def _scale_kdu(
6308 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6310 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
6311 for kdu_name
in _scaling_info
:
6312 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
6313 deployed_kdu
, index
= get_deployed_kdu(
6314 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
6316 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
6317 kdu_instance
= deployed_kdu
["kdu-instance"]
6318 scale
= int(kdu_scaling_info
["scale"])
6319 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
6322 "collection": "nsrs",
6323 "filter": {"_id": nsr_id
},
6324 "path": "_admin.deployed.K8s.{}".format(index
),
6327 step
= "scaling application {}".format(
6328 kdu_scaling_info
["resource-name"]
6330 self
.logger
.debug(logging_text
+ step
)
6332 if kdu_scaling_info
["type"] == "delete":
6333 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
6336 and kdu_config
.get("terminate-config-primitive")
6337 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
6339 terminate_config_primitive_list
= kdu_config
.get(
6340 "terminate-config-primitive"
6342 terminate_config_primitive_list
.sort(
6343 key
=lambda val
: int(val
["seq"])
6347 terminate_config_primitive
6348 ) in terminate_config_primitive_list
:
6349 primitive_params_
= self
._map
_primitive
_params
(
6350 terminate_config_primitive
, {}, {}
6352 step
= "execute terminate config primitive"
6353 self
.logger
.debug(logging_text
+ step
)
6354 await asyncio
.wait_for(
6355 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
6356 cluster_uuid
=cluster_uuid
,
6357 kdu_instance
=kdu_instance
,
6358 primitive_name
=terminate_config_primitive
["name"],
6359 params
=primitive_params_
,
6366 await asyncio
.wait_for(
6367 self
.k8scluster_map
[k8s_cluster_type
].scale(
6370 kdu_scaling_info
["resource-name"],
6373 timeout
=self
.timeout_vca_on_error
,
6376 if kdu_scaling_info
["type"] == "create":
6377 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
6380 and kdu_config
.get("initial-config-primitive")
6381 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
6383 initial_config_primitive_list
= kdu_config
.get(
6384 "initial-config-primitive"
6386 initial_config_primitive_list
.sort(
6387 key
=lambda val
: int(val
["seq"])
6390 for initial_config_primitive
in initial_config_primitive_list
:
6391 primitive_params_
= self
._map
_primitive
_params
(
6392 initial_config_primitive
, {}, {}
6394 step
= "execute initial config primitive"
6395 self
.logger
.debug(logging_text
+ step
)
6396 await asyncio
.wait_for(
6397 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
6398 cluster_uuid
=cluster_uuid
,
6399 kdu_instance
=kdu_instance
,
6400 primitive_name
=initial_config_primitive
["name"],
6401 params
=primitive_params_
,
6408 async def _scale_ng_ro(
6409 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
6411 nsr_id
= db_nslcmop
["nsInstanceId"]
6412 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
6415 # read from db: vnfd's for every vnf
6418 # for each vnf in ns, read vnfd
6419 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
6420 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
6421 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
6422 # if we haven't this vnfd, read it from db
6423 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
6425 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
6426 db_vnfds
.append(vnfd
)
6427 n2vc_key
= self
.n2vc
.get_public_key()
6428 n2vc_key_list
= [n2vc_key
]
6431 vdu_scaling_info
.get("vdu-create"),
6432 vdu_scaling_info
.get("vdu-delete"),
6435 # db_vnfr has been updated, update db_vnfrs to use it
6436 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
6437 await self
._instantiate
_ng
_ro
(
6447 start_deploy
=time(),
6448 timeout_ns_deploy
=self
.timeout_ns_deploy
,
6450 if vdu_scaling_info
.get("vdu-delete"):
6452 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
6455 async def extract_prometheus_scrape_jobs(
6456 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
6458 # look if exist a file called 'prometheus*.j2' and
6459 artifact_content
= self
.fs
.dir_ls(artifact_path
)
6463 for f
in artifact_content
6464 if f
.startswith("prometheus") and f
.endswith(".j2")
6470 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
6474 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
6475 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
6477 vnfr_id
= vnfr_id
.replace("-", "")
6479 "JOB_NAME": vnfr_id
,
6480 "TARGET_IP": target_ip
,
6481 "EXPORTER_POD_IP": host_name
,
6482 "EXPORTER_POD_PORT": host_port
,
6484 job_list
= parse_job(job_data
, variables
)
6485 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
6486 for job
in job_list
:
6488 not isinstance(job
.get("job_name"), str)
6489 or vnfr_id
not in job
["job_name"]
6491 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
6492 job
["nsr_id"] = nsr_id
6493 job
["vnfr_id"] = vnfr_id
6496 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
6498 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6500 :param: vim_account_id: VIM Account ID
6502 :return: (cloud_name, cloud_credential)
6504 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
6505 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
6507 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
6509 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6511 :param: vim_account_id: VIM Account ID
6513 :return: (cloud_name, cloud_credential)
6515 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
6516 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")