1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
20 from typing
import Any
, Dict
, List
23 import logging
.handlers
34 from osm_lcm
import ROclient
35 from osm_lcm
.data_utils
.nsr
import (
38 get_deployed_vca_list
,
41 from osm_lcm
.data_utils
.vca
import (
50 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
51 from osm_lcm
.lcm_utils
import (
59 from osm_lcm
.data_utils
.nsd
import (
60 get_ns_configuration_relation_list
,
64 from osm_lcm
.data_utils
.vnfd
import (
68 get_ee_sorted_initial_config_primitive_list
,
69 get_ee_sorted_terminate_config_primitive_list
,
71 get_virtual_link_profiles
,
76 get_number_of_instances
,
78 get_kdu_resource_profile
,
80 from osm_lcm
.data_utils
.list_utils
import find_in_list
81 from osm_lcm
.data_utils
.vnfr
import get_osm_params
, get_vdur_index
, get_kdur
82 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
83 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
84 from n2vc
.definitions
import RelationEndpoint
85 from n2vc
.k8s_helm_conn
import K8sHelmConnector
86 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
87 from n2vc
.k8s_juju_conn
import K8sJujuConnector
89 from osm_common
.dbbase
import DbException
90 from osm_common
.fsbase
import FsException
92 from osm_lcm
.data_utils
.database
.database
import Database
93 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
95 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
96 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
98 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
99 from osm_lcm
.prometheus
import parse_job
101 from copy
import copy
, deepcopy
102 from time
import time
103 from uuid
import uuid4
105 from random
import randint
107 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
110 class NsLcm(LcmBase
):
111 timeout_vca_on_error
= (
113 ) # Time for charm from first time at blocked,error status to mark as failed
114 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
115 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
116 timeout_charm_delete
= 10 * 60
117 timeout_primitive
= 30 * 60 # timeout for primitive execution
118 timeout_progress_primitive
= (
120 ) # timeout for some progress in a primitive execution
122 SUBOPERATION_STATUS_NOT_FOUND
= -1
123 SUBOPERATION_STATUS_NEW
= -2
124 SUBOPERATION_STATUS_SKIP
= -3
125 task_name_deploy_vca
= "Deploying VCA"
127 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
129 Init, Connect to database, filesystem storage, and messaging
130 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
133 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
135 self
.db
= Database().instance
.db
136 self
.fs
= Filesystem().instance
.fs
138 self
.lcm_tasks
= lcm_tasks
139 self
.timeout
= config
["timeout"]
140 self
.ro_config
= config
["ro_config"]
141 self
.ng_ro
= config
["ro_config"].get("ng")
142 self
.vca_config
= config
["VCA"].copy()
144 # create N2VC connector
145 self
.n2vc
= N2VCJujuConnector(
148 on_update_db
=self
._on
_update
_n
2vc
_db
,
153 self
.conn_helm_ee
= LCMHelmConn(
156 vca_config
=self
.vca_config
,
157 on_update_db
=self
._on
_update
_n
2vc
_db
,
160 self
.k8sclusterhelm2
= K8sHelmConnector(
161 kubectl_command
=self
.vca_config
.get("kubectlpath"),
162 helm_command
=self
.vca_config
.get("helmpath"),
169 self
.k8sclusterhelm3
= K8sHelm3Connector(
170 kubectl_command
=self
.vca_config
.get("kubectlpath"),
171 helm_command
=self
.vca_config
.get("helm3path"),
178 self
.k8sclusterjuju
= K8sJujuConnector(
179 kubectl_command
=self
.vca_config
.get("kubectlpath"),
180 juju_command
=self
.vca_config
.get("jujupath"),
183 on_update_db
=self
._on
_update
_k
8s
_db
,
188 self
.k8scluster_map
= {
189 "helm-chart": self
.k8sclusterhelm2
,
190 "helm-chart-v3": self
.k8sclusterhelm3
,
191 "chart": self
.k8sclusterhelm3
,
192 "juju-bundle": self
.k8sclusterjuju
,
193 "juju": self
.k8sclusterjuju
,
197 "lxc_proxy_charm": self
.n2vc
,
198 "native_charm": self
.n2vc
,
199 "k8s_proxy_charm": self
.n2vc
,
200 "helm": self
.conn_helm_ee
,
201 "helm-v3": self
.conn_helm_ee
,
205 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
208 def increment_ip_mac(ip_mac
, vm_index
=1):
209 if not isinstance(ip_mac
, str):
212 # try with ipv4 look for last dot
213 i
= ip_mac
.rfind(".")
216 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
217 # try with ipv6 or mac look for last colon. Operate in hex
218 i
= ip_mac
.rfind(":")
221 # format in hex, len can be 2 for mac or 4 for ipv6
222 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
223 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
229 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
231 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
234 # TODO filter RO descriptor fields...
238 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
239 db_dict
["deploymentStatus"] = ro_descriptor
240 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
242 except Exception as e
:
244 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
247 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
249 # remove last dot from path (if exists)
250 if path
.endswith("."):
253 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
254 # .format(table, filter, path, updated_data))
257 nsr_id
= filter.get("_id")
259 # read ns record from database
260 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
261 current_ns_status
= nsr
.get("nsState")
263 # get vca status for NS
264 status_dict
= await self
.n2vc
.get_status(
265 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
270 db_dict
["vcaStatus"] = status_dict
271 await self
.n2vc
.update_vca_status(db_dict
["vcaStatus"], vca_id
=vca_id
)
273 # update configurationStatus for this VCA
275 vca_index
= int(path
[path
.rfind(".") + 1 :])
278 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
280 vca_status
= vca_list
[vca_index
].get("status")
282 configuration_status_list
= nsr
.get("configurationStatus")
283 config_status
= configuration_status_list
[vca_index
].get("status")
285 if config_status
== "BROKEN" and vca_status
!= "failed":
286 db_dict
["configurationStatus"][vca_index
] = "READY"
287 elif config_status
!= "BROKEN" and vca_status
== "failed":
288 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
289 except Exception as e
:
290 # not update configurationStatus
291 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
293 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
294 # if nsState = 'DEGRADED' check if all is OK
296 if current_ns_status
in ("READY", "DEGRADED"):
297 error_description
= ""
299 if status_dict
.get("machines"):
300 for machine_id
in status_dict
.get("machines"):
301 machine
= status_dict
.get("machines").get(machine_id
)
302 # check machine agent-status
303 if machine
.get("agent-status"):
304 s
= machine
.get("agent-status").get("status")
307 error_description
+= (
308 "machine {} agent-status={} ; ".format(
312 # check machine instance status
313 if machine
.get("instance-status"):
314 s
= machine
.get("instance-status").get("status")
317 error_description
+= (
318 "machine {} instance-status={} ; ".format(
323 if status_dict
.get("applications"):
324 for app_id
in status_dict
.get("applications"):
325 app
= status_dict
.get("applications").get(app_id
)
326 # check application status
327 if app
.get("status"):
328 s
= app
.get("status").get("status")
331 error_description
+= (
332 "application {} status={} ; ".format(app_id
, s
)
335 if error_description
:
336 db_dict
["errorDescription"] = error_description
337 if current_ns_status
== "READY" and is_degraded
:
338 db_dict
["nsState"] = "DEGRADED"
339 if current_ns_status
== "DEGRADED" and not is_degraded
:
340 db_dict
["nsState"] = "READY"
343 self
.update_db_2("nsrs", nsr_id
, db_dict
)
345 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
347 except Exception as e
:
348 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
350 async def _on_update_k8s_db(
351 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
354 Updating vca status in NSR record
355 :param cluster_uuid: UUID of a k8s cluster
356 :param kdu_instance: The unique name of the KDU instance
357 :param filter: To get nsr_id
358 :cluster_type: The cluster type (juju, k8s)
362 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
363 # .format(cluster_uuid, kdu_instance, filter))
365 nsr_id
= filter.get("_id")
367 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
368 cluster_uuid
=cluster_uuid
,
369 kdu_instance
=kdu_instance
,
371 complete_status
=True,
377 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
379 if cluster_type
in ("juju-bundle", "juju"):
380 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
381 # status in a similar way between Juju Bundles and Helm Charts on this side
382 await self
.k8sclusterjuju
.update_vca_status(
383 db_dict
["vcaStatus"],
389 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
393 self
.update_db_2("nsrs", nsr_id
, db_dict
)
394 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
396 except Exception as e
:
397 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
400 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
402 env
= Environment(undefined
=StrictUndefined
)
403 template
= env
.from_string(cloud_init_text
)
404 return template
.render(additional_params
or {})
405 except UndefinedError
as e
:
407 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
408 "file, must be provided in the instantiation parameters inside the "
409 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
411 except (TemplateError
, TemplateNotFound
) as e
:
413 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
418 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
419 cloud_init_content
= cloud_init_file
= None
421 if vdu
.get("cloud-init-file"):
422 base_folder
= vnfd
["_admin"]["storage"]
423 if base_folder
["pkg-dir"]:
424 cloud_init_file
= "{}/{}/cloud_init/{}".format(
425 base_folder
["folder"],
426 base_folder
["pkg-dir"],
427 vdu
["cloud-init-file"],
430 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
431 base_folder
["folder"],
432 vdu
["cloud-init-file"],
434 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
435 cloud_init_content
= ci_file
.read()
436 elif vdu
.get("cloud-init"):
437 cloud_init_content
= vdu
["cloud-init"]
439 return cloud_init_content
440 except FsException
as e
:
442 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
443 vnfd
["id"], vdu
["id"], cloud_init_file
, e
447 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
449 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]),
452 additional_params
= vdur
.get("additionalParams")
453 return parse_yaml_strings(additional_params
)
455 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
457 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
458 :param vnfd: input vnfd
459 :param new_id: overrides vnf id if provided
460 :param additionalParams: Instantiation params for VNFs provided
461 :param nsrId: Id of the NSR
462 :return: copy of vnfd
464 vnfd_RO
= deepcopy(vnfd
)
465 # remove unused by RO configuration, monitoring, scaling and internal keys
466 vnfd_RO
.pop("_id", None)
467 vnfd_RO
.pop("_admin", None)
468 vnfd_RO
.pop("monitoring-param", None)
469 vnfd_RO
.pop("scaling-group-descriptor", None)
470 vnfd_RO
.pop("kdu", None)
471 vnfd_RO
.pop("k8s-cluster", None)
473 vnfd_RO
["id"] = new_id
475 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
476 for vdu
in get_iterable(vnfd_RO
, "vdu"):
477 vdu
.pop("cloud-init-file", None)
478 vdu
.pop("cloud-init", None)
482 def ip_profile_2_RO(ip_profile
):
483 RO_ip_profile
= deepcopy(ip_profile
)
484 if "dns-server" in RO_ip_profile
:
485 if isinstance(RO_ip_profile
["dns-server"], list):
486 RO_ip_profile
["dns-address"] = []
487 for ds
in RO_ip_profile
.pop("dns-server"):
488 RO_ip_profile
["dns-address"].append(ds
["address"])
490 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
491 if RO_ip_profile
.get("ip-version") == "ipv4":
492 RO_ip_profile
["ip-version"] = "IPv4"
493 if RO_ip_profile
.get("ip-version") == "ipv6":
494 RO_ip_profile
["ip-version"] = "IPv6"
495 if "dhcp-params" in RO_ip_profile
:
496 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
499 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
500 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
501 if db_vim
["_admin"]["operationalState"] != "ENABLED":
503 "VIM={} is not available. operationalState={}".format(
504 vim_account
, db_vim
["_admin"]["operationalState"]
507 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
510 def get_ro_wim_id_for_wim_account(self
, wim_account
):
511 if isinstance(wim_account
, str):
512 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
513 if db_wim
["_admin"]["operationalState"] != "ENABLED":
515 "WIM={} is not available. operationalState={}".format(
516 wim_account
, db_wim
["_admin"]["operationalState"]
519 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
524 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
526 db_vdu_push_list
= []
528 db_update
= {"_admin.modified": time()}
530 for vdu_id
, vdu_count
in vdu_create
.items():
534 for vdur
in reversed(db_vnfr
["vdur"])
535 if vdur
["vdu-id-ref"] == vdu_id
540 # Read the template saved in the db:
541 self
.logger
.debug("No vdur in the database. Using the vdur-template to scale")
542 vdur_template
= db_vnfr
.get("vdur-template")
543 if not vdur_template
:
545 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
549 vdur
= vdur_template
[0]
550 # Delete a template from the database after using it
553 {"_id": db_vnfr
["_id"]},
555 pull
={"vdur-template": {"_id": vdur
['_id']}}
557 for count
in range(vdu_count
):
558 vdur_copy
= deepcopy(vdur
)
559 vdur_copy
["status"] = "BUILD"
560 vdur_copy
["status-detailed"] = None
561 vdur_copy
["ip-address"] = None
562 vdur_copy
["_id"] = str(uuid4())
563 vdur_copy
["count-index"] += count
+ 1
564 vdur_copy
["id"] = "{}-{}".format(
565 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
567 vdur_copy
.pop("vim_info", None)
568 for iface
in vdur_copy
["interfaces"]:
569 if iface
.get("fixed-ip"):
570 iface
["ip-address"] = self
.increment_ip_mac(
571 iface
["ip-address"], count
+ 1
574 iface
.pop("ip-address", None)
575 if iface
.get("fixed-mac"):
576 iface
["mac-address"] = self
.increment_ip_mac(
577 iface
["mac-address"], count
+ 1
580 iface
.pop("mac-address", None)
584 ) # only first vdu can be managment of vnf
585 db_vdu_push_list
.append(vdur_copy
)
586 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
588 if len(db_vnfr
["vdur"]) == 1:
589 # The scale will move to 0 instances
590 self
.logger
.debug("Scaling to 0 !, creating the template with the last vdur")
591 template_vdur
= [db_vnfr
["vdur"][0]]
592 for vdu_id
, vdu_count
in vdu_delete
.items():
594 indexes_to_delete
= [
596 for iv
in enumerate(db_vnfr
["vdur"])
597 if iv
[1]["vdu-id-ref"] == vdu_id
601 "vdur.{}.status".format(i
): "DELETING"
602 for i
in indexes_to_delete
[-vdu_count
:]
606 # it must be deleted one by one because common.db does not allow otherwise
609 for v
in reversed(db_vnfr
["vdur"])
610 if v
["vdu-id-ref"] == vdu_id
612 for vdu
in vdus_to_delete
[:vdu_count
]:
615 {"_id": db_vnfr
["_id"]},
617 pull
={"vdur": {"_id": vdu
["_id"]}},
621 db_push
["vdur"] = db_vdu_push_list
623 db_push
["vdur-template"] = template_vdur
626 db_vnfr
["vdur-template"] = template_vdur
627 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
628 # modify passed dictionary db_vnfr
629 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
630 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
632 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
634 Updates database nsr with the RO info for the created vld
635 :param ns_update_nsr: dictionary to be filled with the updated info
636 :param db_nsr: content of db_nsr. This is also modified
637 :param nsr_desc_RO: nsr descriptor from RO
638 :return: Nothing, LcmException is raised on errors
641 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
642 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
643 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
645 vld
["vim-id"] = net_RO
.get("vim_net_id")
646 vld
["name"] = net_RO
.get("vim_name")
647 vld
["status"] = net_RO
.get("status")
648 vld
["status-detailed"] = net_RO
.get("error_msg")
649 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
653 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
656 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
658 for db_vnfr
in db_vnfrs
.values():
659 vnfr_update
= {"status": "ERROR"}
660 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
661 if "status" not in vdur
:
662 vdur
["status"] = "ERROR"
663 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
665 vdur
["status-detailed"] = str(error_text
)
667 "vdur.{}.status-detailed".format(vdu_index
)
669 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
670 except DbException
as e
:
671 self
.logger
.error("Cannot update vnf. {}".format(e
))
673 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
675 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
676 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
677 :param nsr_desc_RO: nsr descriptor from RO
678 :return: Nothing, LcmException is raised on errors
680 for vnf_index
, db_vnfr
in db_vnfrs
.items():
681 for vnf_RO
in nsr_desc_RO
["vnfs"]:
682 if vnf_RO
["member_vnf_index"] != vnf_index
:
685 if vnf_RO
.get("ip_address"):
686 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
689 elif not db_vnfr
.get("ip-address"):
690 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
691 raise LcmExceptionNoMgmtIP(
692 "ns member_vnf_index '{}' has no IP address".format(
697 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
698 vdur_RO_count_index
= 0
699 if vdur
.get("pdu-type"):
701 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
702 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
704 if vdur
["count-index"] != vdur_RO_count_index
:
705 vdur_RO_count_index
+= 1
707 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
708 if vdur_RO
.get("ip_address"):
709 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
711 vdur
["ip-address"] = None
712 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
713 vdur
["name"] = vdur_RO
.get("vim_name")
714 vdur
["status"] = vdur_RO
.get("status")
715 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
716 for ifacer
in get_iterable(vdur
, "interfaces"):
717 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
718 if ifacer
["name"] == interface_RO
.get("internal_name"):
719 ifacer
["ip-address"] = interface_RO
.get(
722 ifacer
["mac-address"] = interface_RO
.get(
728 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
729 "from VIM info".format(
730 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
733 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
737 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
739 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
743 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
744 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
745 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
747 vld
["vim-id"] = net_RO
.get("vim_net_id")
748 vld
["name"] = net_RO
.get("vim_name")
749 vld
["status"] = net_RO
.get("status")
750 vld
["status-detailed"] = net_RO
.get("error_msg")
751 vnfr_update
["vld.{}".format(vld_index
)] = vld
755 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
760 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
765 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
770 def _get_ns_config_info(self
, nsr_id
):
772 Generates a mapping between vnf,vdu elements and the N2VC id
773 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
774 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
775 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
776 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
778 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
779 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
781 ns_config_info
= {"osm-config-mapping": mapping
}
782 for vca
in vca_deployed_list
:
783 if not vca
["member-vnf-index"]:
785 if not vca
["vdu_id"]:
786 mapping
[vca
["member-vnf-index"]] = vca
["application"]
790 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
792 ] = vca
["application"]
793 return ns_config_info
795 async def _instantiate_ng_ro(
812 def get_vim_account(vim_account_id
):
814 if vim_account_id
in db_vims
:
815 return db_vims
[vim_account_id
]
816 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
817 db_vims
[vim_account_id
] = db_vim
820 # modify target_vld info with instantiation parameters
821 def parse_vld_instantiation_params(
822 target_vim
, target_vld
, vld_params
, target_sdn
824 if vld_params
.get("ip-profile"):
825 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
828 if vld_params
.get("provider-network"):
829 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
832 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
833 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
836 if vld_params
.get("wimAccountId"):
837 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
838 target_vld
["vim_info"][target_wim
] = {}
839 for param
in ("vim-network-name", "vim-network-id"):
840 if vld_params
.get(param
):
841 if isinstance(vld_params
[param
], dict):
842 for vim
, vim_net
in vld_params
[param
].items():
843 other_target_vim
= "vim:" + vim
845 target_vld
["vim_info"],
846 (other_target_vim
, param
.replace("-", "_")),
849 else: # isinstance str
850 target_vld
["vim_info"][target_vim
][
851 param
.replace("-", "_")
852 ] = vld_params
[param
]
853 if vld_params
.get("common_id"):
854 target_vld
["common_id"] = vld_params
.get("common_id")
856 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
857 def update_ns_vld_target(target
, ns_params
):
858 for vnf_params
in ns_params
.get("vnf", ()):
859 if vnf_params
.get("vimAccountId"):
863 for vnfr
in db_vnfrs
.values()
864 if vnf_params
["member-vnf-index"]
865 == vnfr
["member-vnf-index-ref"]
869 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
870 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
871 target_vld
= find_in_list(
872 get_iterable(vdur
, "interfaces"),
873 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
876 if vnf_params
.get("vimAccountId") not in a_vld
.get(
879 target
["ns"]["vld"][a_index
].get("vim_info").update(
881 "vim:{}".format(vnf_params
["vimAccountId"]): {
882 "vim_network_name": ""
887 nslcmop_id
= db_nslcmop
["_id"]
889 "name": db_nsr
["name"],
892 "image": deepcopy(db_nsr
["image"]),
893 "flavor": deepcopy(db_nsr
["flavor"]),
894 "action_id": nslcmop_id
,
895 "cloud_init_content": {},
897 for image
in target
["image"]:
898 image
["vim_info"] = {}
899 for flavor
in target
["flavor"]:
900 flavor
["vim_info"] = {}
901 if db_nsr
.get("affinity-or-anti-affinity-group"):
902 target
["affinity-or-anti-affinity-group"] = deepcopy(db_nsr
["affinity-or-anti-affinity-group"])
903 for affinity_or_anti_affinity_group
in target
["affinity-or-anti-affinity-group"]:
904 affinity_or_anti_affinity_group
["vim_info"] = {}
906 if db_nslcmop
.get("lcmOperationType") != "instantiate":
907 # get parameters of instantiation:
908 db_nslcmop_instantiate
= self
.db
.get_list(
911 "nsInstanceId": db_nslcmop
["nsInstanceId"],
912 "lcmOperationType": "instantiate",
915 ns_params
= db_nslcmop_instantiate
.get("operationParams")
917 ns_params
= db_nslcmop
.get("operationParams")
918 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
919 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
922 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
923 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
927 "mgmt-network": vld
.get("mgmt-network", False),
928 "type": vld
.get("type"),
931 "vim_network_name": vld
.get("vim-network-name"),
932 "vim_account_id": ns_params
["vimAccountId"],
936 # check if this network needs SDN assist
937 if vld
.get("pci-interfaces"):
938 db_vim
= get_vim_account(ns_params
["vimAccountId"])
939 sdnc_id
= db_vim
["config"].get("sdn-controller")
941 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
942 target_sdn
= "sdn:{}".format(sdnc_id
)
943 target_vld
["vim_info"][target_sdn
] = {
945 "target_vim": target_vim
,
947 "type": vld
.get("type"),
950 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
951 for nsd_vnf_profile
in nsd_vnf_profiles
:
952 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
953 if cp
["virtual-link-profile-id"] == vld
["id"]:
955 "member_vnf:{}.{}".format(
956 cp
["constituent-cpd-id"][0][
957 "constituent-base-element-id"
959 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
961 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
963 # check at nsd descriptor, if there is an ip-profile
965 nsd_vlp
= find_in_list(
966 get_virtual_link_profiles(nsd
),
967 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
972 and nsd_vlp
.get("virtual-link-protocol-data")
973 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
975 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
978 ip_profile_dest_data
= {}
979 if "ip-version" in ip_profile_source_data
:
980 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
983 if "cidr" in ip_profile_source_data
:
984 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
987 if "gateway-ip" in ip_profile_source_data
:
988 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
991 if "dhcp-enabled" in ip_profile_source_data
:
992 ip_profile_dest_data
["dhcp-params"] = {
993 "enabled": ip_profile_source_data
["dhcp-enabled"]
995 vld_params
["ip-profile"] = ip_profile_dest_data
997 # update vld_params with instantiation params
998 vld_instantiation_params
= find_in_list(
999 get_iterable(ns_params
, "vld"),
1000 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1002 if vld_instantiation_params
:
1003 vld_params
.update(vld_instantiation_params
)
1004 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1005 target
["ns"]["vld"].append(target_vld
)
1006 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1007 update_ns_vld_target(target
, ns_params
)
1009 for vnfr
in db_vnfrs
.values():
1010 vnfd
= find_in_list(
1011 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1013 vnf_params
= find_in_list(
1014 get_iterable(ns_params
, "vnf"),
1015 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1017 target_vnf
= deepcopy(vnfr
)
1018 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1019 for vld
in target_vnf
.get("vld", ()):
1020 # check if connected to a ns.vld, to fill target'
1021 vnf_cp
= find_in_list(
1022 vnfd
.get("int-virtual-link-desc", ()),
1023 lambda cpd
: cpd
.get("id") == vld
["id"],
1026 ns_cp
= "member_vnf:{}.{}".format(
1027 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1029 if cp2target
.get(ns_cp
):
1030 vld
["target"] = cp2target
[ns_cp
]
1033 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1035 # check if this network needs SDN assist
1037 if vld
.get("pci-interfaces"):
1038 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1039 sdnc_id
= db_vim
["config"].get("sdn-controller")
1041 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1042 target_sdn
= "sdn:{}".format(sdnc_id
)
1043 vld
["vim_info"][target_sdn
] = {
1045 "target_vim": target_vim
,
1047 "type": vld
.get("type"),
1050 # check at vnfd descriptor, if there is an ip-profile
1052 vnfd_vlp
= find_in_list(
1053 get_virtual_link_profiles(vnfd
),
1054 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1058 and vnfd_vlp
.get("virtual-link-protocol-data")
1059 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1061 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1064 ip_profile_dest_data
= {}
1065 if "ip-version" in ip_profile_source_data
:
1066 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1069 if "cidr" in ip_profile_source_data
:
1070 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1073 if "gateway-ip" in ip_profile_source_data
:
1074 ip_profile_dest_data
[
1076 ] = ip_profile_source_data
["gateway-ip"]
1077 if "dhcp-enabled" in ip_profile_source_data
:
1078 ip_profile_dest_data
["dhcp-params"] = {
1079 "enabled": ip_profile_source_data
["dhcp-enabled"]
1082 vld_params
["ip-profile"] = ip_profile_dest_data
1083 # update vld_params with instantiation params
1085 vld_instantiation_params
= find_in_list(
1086 get_iterable(vnf_params
, "internal-vld"),
1087 lambda i_vld
: i_vld
["name"] == vld
["id"],
1089 if vld_instantiation_params
:
1090 vld_params
.update(vld_instantiation_params
)
1091 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1094 for vdur
in target_vnf
.get("vdur", ()):
1095 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1096 continue # This vdu must not be created
1097 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1099 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1102 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1103 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1106 and vdu_configuration
.get("config-access")
1107 and vdu_configuration
.get("config-access").get("ssh-access")
1109 vdur
["ssh-keys"] = ssh_keys_all
1110 vdur
["ssh-access-required"] = vdu_configuration
[
1112 ]["ssh-access"]["required"]
1115 and vnf_configuration
.get("config-access")
1116 and vnf_configuration
.get("config-access").get("ssh-access")
1117 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1119 vdur
["ssh-keys"] = ssh_keys_all
1120 vdur
["ssh-access-required"] = vnf_configuration
[
1122 ]["ssh-access"]["required"]
1123 elif ssh_keys_instantiation
and find_in_list(
1124 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1126 vdur
["ssh-keys"] = ssh_keys_instantiation
1128 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1130 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1132 if vdud
.get("cloud-init-file"):
1133 vdur
["cloud-init"] = "{}:file:{}".format(
1134 vnfd
["_id"], vdud
.get("cloud-init-file")
1136 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1137 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1138 base_folder
= vnfd
["_admin"]["storage"]
1139 if base_folder
["pkg-dir"]:
1140 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1141 base_folder
["folder"],
1142 base_folder
["pkg-dir"],
1143 vdud
.get("cloud-init-file"),
1146 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1147 base_folder
["folder"],
1148 vdud
.get("cloud-init-file"),
1150 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1151 target
["cloud_init_content"][
1154 elif vdud
.get("cloud-init"):
1155 vdur
["cloud-init"] = "{}:vdu:{}".format(
1156 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1158 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1159 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1162 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1163 deploy_params_vdu
= self
._format
_additional
_params
(
1164 vdur
.get("additionalParams") or {}
1166 deploy_params_vdu
["OSM"] = get_osm_params(
1167 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1169 vdur
["additionalParams"] = deploy_params_vdu
1172 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1173 if target_vim
not in ns_flavor
["vim_info"]:
1174 ns_flavor
["vim_info"][target_vim
] = {}
1177 # in case alternative images are provided we must check if they should be applied
1178 # for the vim_type, modify the vim_type taking into account
1179 ns_image_id
= int(vdur
["ns-image-id"])
1180 if vdur
.get("alt-image-ids"):
1181 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1182 vim_type
= db_vim
["vim_type"]
1183 for alt_image_id
in vdur
.get("alt-image-ids"):
1184 ns_alt_image
= target
["image"][int(alt_image_id
)]
1185 if vim_type
== ns_alt_image
.get("vim-type"):
1186 # must use alternative image
1188 "use alternative image id: {}".format(alt_image_id
)
1190 ns_image_id
= alt_image_id
1191 vdur
["ns-image-id"] = ns_image_id
1193 ns_image
= target
["image"][int(ns_image_id
)]
1194 if target_vim
not in ns_image
["vim_info"]:
1195 ns_image
["vim_info"][target_vim
] = {}
1198 if vdur
.get("affinity-or-anti-affinity-group-id"):
1199 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1200 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1201 if target_vim
not in ns_ags
["vim_info"]:
1202 ns_ags
["vim_info"][target_vim
] = {}
1204 vdur
["vim_info"] = {target_vim
: {}}
1205 # instantiation parameters
1207 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1208 # vdud["id"]), None)
1209 vdur_list
.append(vdur
)
1210 target_vnf
["vdur"] = vdur_list
1211 target
["vnf"].append(target_vnf
)
1213 desc
= await self
.RO
.deploy(nsr_id
, target
)
1214 self
.logger
.debug("RO return > {}".format(desc
))
1215 action_id
= desc
["action_id"]
1216 await self
._wait
_ng
_ro
(
1217 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
1222 "_admin.deployed.RO.operational-status": "running",
1223 "detailed-status": " ".join(stage
),
1225 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1226 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1227 self
._write
_op
_status
(nslcmop_id
, stage
)
1229 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1233 async def _wait_ng_ro(
1242 detailed_status_old
= None
1244 start_time
= start_time
or time()
1245 while time() <= start_time
+ timeout
:
1246 desc_status
= await self
.RO
.status(nsr_id
, action_id
)
1247 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1248 if desc_status
["status"] == "FAILED":
1249 raise NgRoException(desc_status
["details"])
1250 elif desc_status
["status"] == "BUILD":
1252 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1253 elif desc_status
["status"] == "DONE":
1255 stage
[2] = "Deployed at VIM"
1258 assert False, "ROclient.check_ns_status returns unknown {}".format(
1259 desc_status
["status"]
1261 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1262 detailed_status_old
= stage
[2]
1263 db_nsr_update
["detailed-status"] = " ".join(stage
)
1264 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1265 self
._write
_op
_status
(nslcmop_id
, stage
)
1266 await asyncio
.sleep(15, loop
=self
.loop
)
1267 else: # timeout_ns_deploy
1268 raise NgRoException("Timeout waiting ns to deploy")
1270 async def _terminate_ng_ro(
1271 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1276 start_deploy
= time()
1283 "action_id": nslcmop_id
,
1285 desc
= await self
.RO
.deploy(nsr_id
, target
)
1286 action_id
= desc
["action_id"]
1287 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1288 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1291 + "ns terminate action at RO. action_id={}".format(action_id
)
1295 delete_timeout
= 20 * 60 # 20 minutes
1296 await self
._wait
_ng
_ro
(
1297 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
1300 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1301 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1303 await self
.RO
.delete(nsr_id
)
1304 except Exception as e
:
1305 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1306 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1307 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1308 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1310 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1312 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1313 failed_detail
.append("delete conflict: {}".format(e
))
1316 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1319 failed_detail
.append("delete error: {}".format(e
))
1322 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1326 stage
[2] = "Error deleting from VIM"
1328 stage
[2] = "Deleted from VIM"
1329 db_nsr_update
["detailed-status"] = " ".join(stage
)
1330 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1331 self
._write
_op
_status
(nslcmop_id
, stage
)
1334 raise LcmException("; ".join(failed_detail
))
1337 async def instantiate_RO(
1351 :param logging_text: preffix text to use at logging
1352 :param nsr_id: nsr identity
1353 :param nsd: database content of ns descriptor
1354 :param db_nsr: database content of ns record
1355 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1357 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1358 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1359 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1360 :return: None or exception
1363 start_deploy
= time()
1364 ns_params
= db_nslcmop
.get("operationParams")
1365 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1366 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1368 timeout_ns_deploy
= self
.timeout
.get(
1369 "ns_deploy", self
.timeout_ns_deploy
1372 # Check for and optionally request placement optimization. Database will be updated if placement activated
1373 stage
[2] = "Waiting for Placement."
1374 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1375 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1376 for vnfr
in db_vnfrs
.values():
1377 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1380 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1382 return await self
._instantiate
_ng
_ro
(
1395 except Exception as e
:
1396 stage
[2] = "ERROR deploying at VIM"
1397 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1399 "Error deploying at VIM {}".format(e
),
1400 exc_info
=not isinstance(
1403 ROclient
.ROClientException
,
1412 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1414 Wait for kdu to be up, get ip address
1415 :param logging_text: prefix use for logging
1422 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1425 while nb_tries
< 360:
1426 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1430 for x
in get_iterable(db_vnfr
, "kdur")
1431 if x
.get("kdu-name") == kdu_name
1437 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1439 if kdur
.get("status"):
1440 if kdur
["status"] in ("READY", "ENABLED"):
1441 return kdur
.get("ip-address")
1444 "target KDU={} is in error state".format(kdu_name
)
1447 await asyncio
.sleep(10, loop
=self
.loop
)
1449 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1451 async def wait_vm_up_insert_key_ro(
1452 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1455 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1456 :param logging_text: prefix use for logging
1461 :param pub_key: public ssh key to inject, None to skip
1462 :param user: user to apply the public ssh key
1466 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1470 target_vdu_id
= None
1476 if ro_retries
>= 360: # 1 hour
1478 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1481 await asyncio
.sleep(10, loop
=self
.loop
)
1484 if not target_vdu_id
:
1485 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1487 if not vdu_id
: # for the VNF case
1488 if db_vnfr
.get("status") == "ERROR":
1490 "Cannot inject ssh-key because target VNF is in error state"
1492 ip_address
= db_vnfr
.get("ip-address")
1498 for x
in get_iterable(db_vnfr
, "vdur")
1499 if x
.get("ip-address") == ip_address
1507 for x
in get_iterable(db_vnfr
, "vdur")
1508 if x
.get("vdu-id-ref") == vdu_id
1509 and x
.get("count-index") == vdu_index
1515 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1516 ): # If only one, this should be the target vdu
1517 vdur
= db_vnfr
["vdur"][0]
1520 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1521 vnfr_id
, vdu_id
, vdu_index
1524 # New generation RO stores information at "vim_info"
1527 if vdur
.get("vim_info"):
1529 t
for t
in vdur
["vim_info"]
1530 ) # there should be only one key
1531 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1533 vdur
.get("pdu-type")
1534 or vdur
.get("status") == "ACTIVE"
1535 or ng_ro_status
== "ACTIVE"
1537 ip_address
= vdur
.get("ip-address")
1540 target_vdu_id
= vdur
["vdu-id-ref"]
1541 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1543 "Cannot inject ssh-key because target VM is in error state"
1546 if not target_vdu_id
:
1549 # inject public key into machine
1550 if pub_key
and user
:
1551 self
.logger
.debug(logging_text
+ "Inserting RO key")
1552 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1553 if vdur
.get("pdu-type"):
1554 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1557 ro_vm_id
= "{}-{}".format(
1558 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1559 ) # TODO add vdu_index
1563 "action": "inject_ssh_key",
1567 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1569 desc
= await self
.RO
.deploy(nsr_id
, target
)
1570 action_id
= desc
["action_id"]
1571 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600)
1574 # wait until NS is deployed at RO
1576 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1577 ro_nsr_id
= deep_get(
1578 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1582 result_dict
= await self
.RO
.create_action(
1584 item_id_name
=ro_nsr_id
,
1586 "add_public_key": pub_key
,
1591 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1592 if not result_dict
or not isinstance(result_dict
, dict):
1594 "Unknown response from RO when injecting key"
1596 for result
in result_dict
.values():
1597 if result
.get("vim_result") == 200:
1600 raise ROclient
.ROClientException(
1601 "error injecting key: {}".format(
1602 result
.get("description")
1606 except NgRoException
as e
:
1608 "Reaching max tries injecting key. Error: {}".format(e
)
1610 except ROclient
.ROClientException
as e
:
1614 + "error injecting key: {}. Retrying until {} seconds".format(
1621 "Reaching max tries injecting key. Error: {}".format(e
)
1628 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1630 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1632 my_vca
= vca_deployed_list
[vca_index
]
1633 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1634 # vdu or kdu: no dependencies
1638 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1639 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1640 configuration_status_list
= db_nsr
["configurationStatus"]
1641 for index
, vca_deployed
in enumerate(configuration_status_list
):
1642 if index
== vca_index
:
1645 if not my_vca
.get("member-vnf-index") or (
1646 vca_deployed
.get("member-vnf-index")
1647 == my_vca
.get("member-vnf-index")
1649 internal_status
= configuration_status_list
[index
].get("status")
1650 if internal_status
== "READY":
1652 elif internal_status
== "BROKEN":
1654 "Configuration aborted because dependent charm/s has failed"
1659 # no dependencies, return
1661 await asyncio
.sleep(10)
1664 raise LcmException("Configuration aborted because dependent charm/s timeout")
1666 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1669 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1671 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1672 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1675 async def instantiate_N2VC(
1692 ee_config_descriptor
,
1694 nsr_id
= db_nsr
["_id"]
1695 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1696 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1697 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1698 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1700 "collection": "nsrs",
1701 "filter": {"_id": nsr_id
},
1702 "path": db_update_entry
,
1708 element_under_configuration
= nsr_id
1712 vnfr_id
= db_vnfr
["_id"]
1713 osm_config
["osm"]["vnf_id"] = vnfr_id
1715 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1717 if vca_type
== "native_charm":
1720 index_number
= vdu_index
or 0
1723 element_type
= "VNF"
1724 element_under_configuration
= vnfr_id
1725 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1727 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1728 element_type
= "VDU"
1729 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1730 osm_config
["osm"]["vdu_id"] = vdu_id
1732 namespace
+= ".{}".format(kdu_name
)
1733 element_type
= "KDU"
1734 element_under_configuration
= kdu_name
1735 osm_config
["osm"]["kdu_name"] = kdu_name
1738 if base_folder
["pkg-dir"]:
1739 artifact_path
= "{}/{}/{}/{}".format(
1740 base_folder
["folder"],
1741 base_folder
["pkg-dir"],
1743 if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1748 artifact_path
= "{}/Scripts/{}/{}/".format(
1749 base_folder
["folder"],
1751 if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1756 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1758 # get initial_config_primitive_list that applies to this element
1759 initial_config_primitive_list
= config_descriptor
.get(
1760 "initial-config-primitive"
1764 "Initial config primitive list > {}".format(
1765 initial_config_primitive_list
1769 # add config if not present for NS charm
1770 ee_descriptor_id
= ee_config_descriptor
.get("id")
1771 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1772 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1773 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1777 "Initial config primitive list #2 > {}".format(
1778 initial_config_primitive_list
1781 # n2vc_redesign STEP 3.1
1782 # find old ee_id if exists
1783 ee_id
= vca_deployed
.get("ee_id")
1785 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1786 # create or register execution environment in VCA
1787 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1789 self
._write
_configuration
_status
(
1791 vca_index
=vca_index
,
1793 element_under_configuration
=element_under_configuration
,
1794 element_type
=element_type
,
1797 step
= "create execution environment"
1798 self
.logger
.debug(logging_text
+ step
)
1802 if vca_type
== "k8s_proxy_charm":
1803 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1804 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1805 namespace
=namespace
,
1806 artifact_path
=artifact_path
,
1810 elif vca_type
== "helm" or vca_type
== "helm-v3":
1811 ee_id
, credentials
= await self
.vca_map
[
1813 ].create_execution_environment(
1814 namespace
=namespace
,
1818 artifact_path
=artifact_path
,
1822 ee_id
, credentials
= await self
.vca_map
[
1824 ].create_execution_environment(
1825 namespace
=namespace
,
1831 elif vca_type
== "native_charm":
1832 step
= "Waiting to VM being up and getting IP address"
1833 self
.logger
.debug(logging_text
+ step
)
1834 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1843 credentials
= {"hostname": rw_mgmt_ip
}
1845 username
= deep_get(
1846 config_descriptor
, ("config-access", "ssh-access", "default-user")
1848 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1849 # merged. Meanwhile let's get username from initial-config-primitive
1850 if not username
and initial_config_primitive_list
:
1851 for config_primitive
in initial_config_primitive_list
:
1852 for param
in config_primitive
.get("parameter", ()):
1853 if param
["name"] == "ssh-username":
1854 username
= param
["value"]
1858 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1859 "'config-access.ssh-access.default-user'"
1861 credentials
["username"] = username
1862 # n2vc_redesign STEP 3.2
1864 self
._write
_configuration
_status
(
1866 vca_index
=vca_index
,
1867 status
="REGISTERING",
1868 element_under_configuration
=element_under_configuration
,
1869 element_type
=element_type
,
1872 step
= "register execution environment {}".format(credentials
)
1873 self
.logger
.debug(logging_text
+ step
)
1874 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1875 credentials
=credentials
,
1876 namespace
=namespace
,
1881 # for compatibility with MON/POL modules, the need model and application name at database
1882 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1883 ee_id_parts
= ee_id
.split(".")
1884 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1885 if len(ee_id_parts
) >= 2:
1886 model_name
= ee_id_parts
[0]
1887 application_name
= ee_id_parts
[1]
1888 db_nsr_update
[db_update_entry
+ "model"] = model_name
1889 db_nsr_update
[db_update_entry
+ "application"] = application_name
1891 # n2vc_redesign STEP 3.3
1892 step
= "Install configuration Software"
1894 self
._write
_configuration
_status
(
1896 vca_index
=vca_index
,
1897 status
="INSTALLING SW",
1898 element_under_configuration
=element_under_configuration
,
1899 element_type
=element_type
,
1900 other_update
=db_nsr_update
,
1903 # TODO check if already done
1904 self
.logger
.debug(logging_text
+ step
)
1906 if vca_type
== "native_charm":
1907 config_primitive
= next(
1908 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1911 if config_primitive
:
1912 config
= self
._map
_primitive
_params
(
1913 config_primitive
, {}, deploy_params
1916 if vca_type
== "lxc_proxy_charm":
1917 if element_type
== "NS":
1918 num_units
= db_nsr
.get("config-units") or 1
1919 elif element_type
== "VNF":
1920 num_units
= db_vnfr
.get("config-units") or 1
1921 elif element_type
== "VDU":
1922 for v
in db_vnfr
["vdur"]:
1923 if vdu_id
== v
["vdu-id-ref"]:
1924 num_units
= v
.get("config-units") or 1
1926 if vca_type
!= "k8s_proxy_charm":
1927 await self
.vca_map
[vca_type
].install_configuration_sw(
1929 artifact_path
=artifact_path
,
1932 num_units
=num_units
,
1937 # write in db flag of configuration_sw already installed
1939 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1942 # add relations for this VCA (wait for other peers related with this VCA)
1943 await self
._add
_vca
_relations
(
1944 logging_text
=logging_text
,
1947 vca_index
=vca_index
,
1950 # if SSH access is required, then get execution environment SSH public
1951 # if native charm we have waited already to VM be UP
1952 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1955 # self.logger.debug("get ssh key block")
1957 config_descriptor
, ("config-access", "ssh-access", "required")
1959 # self.logger.debug("ssh key needed")
1960 # Needed to inject a ssh key
1963 ("config-access", "ssh-access", "default-user"),
1965 step
= "Install configuration Software, getting public ssh key"
1966 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
1967 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
1970 step
= "Insert public key into VM user={} ssh_key={}".format(
1974 # self.logger.debug("no need to get ssh key")
1975 step
= "Waiting to VM being up and getting IP address"
1976 self
.logger
.debug(logging_text
+ step
)
1978 # default rw_mgmt_ip to None, avoiding the non definition of the variable
1981 # n2vc_redesign STEP 5.1
1982 # wait for RO (ip-address) Insert pub_key into VM
1985 rw_mgmt_ip
= await self
.wait_kdu_up(
1986 logging_text
, nsr_id
, vnfr_id
, kdu_name
1989 # This verification is needed in order to avoid trying to add a public key
1990 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
1991 # for a KNF and not for its KDUs, the previous verification gives False, and the code
1992 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
1994 elif db_vnfr
.get('vdur'):
1995 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2005 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2007 # store rw_mgmt_ip in deploy params for later replacement
2008 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2010 # n2vc_redesign STEP 6 Execute initial config primitive
2011 step
= "execute initial config primitive"
2013 # wait for dependent primitives execution (NS -> VNF -> VDU)
2014 if initial_config_primitive_list
:
2015 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2017 # stage, in function of element type: vdu, kdu, vnf or ns
2018 my_vca
= vca_deployed_list
[vca_index
]
2019 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2021 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2022 elif my_vca
.get("member-vnf-index"):
2024 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2027 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2029 self
._write
_configuration
_status
(
2030 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2033 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2035 check_if_terminated_needed
= True
2036 for initial_config_primitive
in initial_config_primitive_list
:
2037 # adding information on the vca_deployed if it is a NS execution environment
2038 if not vca_deployed
["member-vnf-index"]:
2039 deploy_params
["ns_config_info"] = json
.dumps(
2040 self
._get
_ns
_config
_info
(nsr_id
)
2042 # TODO check if already done
2043 primitive_params_
= self
._map
_primitive
_params
(
2044 initial_config_primitive
, {}, deploy_params
2047 step
= "execute primitive '{}' params '{}'".format(
2048 initial_config_primitive
["name"], primitive_params_
2050 self
.logger
.debug(logging_text
+ step
)
2051 await self
.vca_map
[vca_type
].exec_primitive(
2053 primitive_name
=initial_config_primitive
["name"],
2054 params_dict
=primitive_params_
,
2059 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2060 if check_if_terminated_needed
:
2061 if config_descriptor
.get("terminate-config-primitive"):
2063 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2065 check_if_terminated_needed
= False
2067 # TODO register in database that primitive is done
2069 # STEP 7 Configure metrics
2070 if vca_type
== "helm" or vca_type
== "helm-v3":
2071 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2073 artifact_path
=artifact_path
,
2074 ee_config_descriptor
=ee_config_descriptor
,
2077 target_ip
=rw_mgmt_ip
,
2083 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2086 for job
in prometheus_jobs
:
2090 "job_name": job
["job_name"]
2094 fail_on_empty
=False,
2097 step
= "instantiated at VCA"
2098 self
.logger
.debug(logging_text
+ step
)
2100 self
._write
_configuration
_status
(
2101 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2104 except Exception as e
: # TODO not use Exception but N2VC exception
2105 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2107 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2110 "Exception while {} : {}".format(step
, e
), exc_info
=True
2112 self
._write
_configuration
_status
(
2113 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2115 raise LcmException("{} {}".format(step
, e
)) from e
2117 def _write_ns_status(
2121 current_operation
: str,
2122 current_operation_id
: str,
2123 error_description
: str = None,
2124 error_detail
: str = None,
2125 other_update
: dict = None,
2128 Update db_nsr fields.
2131 :param current_operation:
2132 :param current_operation_id:
2133 :param error_description:
2134 :param error_detail:
2135 :param other_update: Other required changes at database if provided, will be cleared
2139 db_dict
= other_update
or {}
2142 ] = current_operation_id
# for backward compatibility
2143 db_dict
["_admin.current-operation"] = current_operation_id
2144 db_dict
["_admin.operation-type"] = (
2145 current_operation
if current_operation
!= "IDLE" else None
2147 db_dict
["currentOperation"] = current_operation
2148 db_dict
["currentOperationID"] = current_operation_id
2149 db_dict
["errorDescription"] = error_description
2150 db_dict
["errorDetail"] = error_detail
2153 db_dict
["nsState"] = ns_state
2154 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2155 except DbException
as e
:
2156 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2158 def _write_op_status(
2162 error_message
: str = None,
2163 queuePosition
: int = 0,
2164 operation_state
: str = None,
2165 other_update
: dict = None,
2168 db_dict
= other_update
or {}
2169 db_dict
["queuePosition"] = queuePosition
2170 if isinstance(stage
, list):
2171 db_dict
["stage"] = stage
[0]
2172 db_dict
["detailed-status"] = " ".join(stage
)
2173 elif stage
is not None:
2174 db_dict
["stage"] = str(stage
)
2176 if error_message
is not None:
2177 db_dict
["errorMessage"] = error_message
2178 if operation_state
is not None:
2179 db_dict
["operationState"] = operation_state
2180 db_dict
["statusEnteredTime"] = time()
2181 self
.update_db_2("nslcmops", op_id
, db_dict
)
2182 except DbException
as e
:
2184 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2187 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2189 nsr_id
= db_nsr
["_id"]
2190 # configurationStatus
2191 config_status
= db_nsr
.get("configurationStatus")
2194 "configurationStatus.{}.status".format(index
): status
2195 for index
, v
in enumerate(config_status
)
2199 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2201 except DbException
as e
:
2203 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2206 def _write_configuration_status(
2211 element_under_configuration
: str = None,
2212 element_type
: str = None,
2213 other_update
: dict = None,
2216 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2217 # .format(vca_index, status))
2220 db_path
= "configurationStatus.{}.".format(vca_index
)
2221 db_dict
= other_update
or {}
2223 db_dict
[db_path
+ "status"] = status
2224 if element_under_configuration
:
2226 db_path
+ "elementUnderConfiguration"
2227 ] = element_under_configuration
2229 db_dict
[db_path
+ "elementType"] = element_type
2230 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2231 except DbException
as e
:
2233 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2234 status
, nsr_id
, vca_index
, e
2238 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2240 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2241 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2242 Database is used because the result can be obtained from a different LCM worker in case of HA.
2243 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2244 :param db_nslcmop: database content of nslcmop
2245 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2246 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2247 computed 'vim-account-id'
2250 nslcmop_id
= db_nslcmop
["_id"]
2251 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2252 if placement_engine
== "PLA":
2254 logging_text
+ "Invoke and wait for placement optimization"
2256 await self
.msg
.aiowrite(
2257 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2259 db_poll_interval
= 5
2260 wait
= db_poll_interval
* 10
2262 while not pla_result
and wait
>= 0:
2263 await asyncio
.sleep(db_poll_interval
)
2264 wait
-= db_poll_interval
2265 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2266 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2270 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2273 for pla_vnf
in pla_result
["vnf"]:
2274 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2275 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2280 {"_id": vnfr
["_id"]},
2281 {"vim-account-id": pla_vnf
["vimAccountId"]},
2284 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2287 def update_nsrs_with_pla_result(self
, params
):
2289 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2291 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2293 except Exception as e
:
2294 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2296 async def instantiate(self
, nsr_id
, nslcmop_id
):
2299 :param nsr_id: ns instance to deploy
2300 :param nslcmop_id: operation to run
2304 # Try to lock HA task here
2305 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2306 if not task_is_locked_by_me
:
2308 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2312 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2313 self
.logger
.debug(logging_text
+ "Enter")
2315 # get all needed from database
2317 # database nsrs record
2320 # database nslcmops record
2323 # update operation on nsrs
2325 # update operation on nslcmops
2326 db_nslcmop_update
= {}
2328 nslcmop_operation_state
= None
2329 db_vnfrs
= {} # vnf's info indexed by member-index
2331 tasks_dict_info
= {} # from task to info text
2335 "Stage 1/5: preparation of the environment.",
2336 "Waiting for previous operations to terminate.",
2339 # ^ stage, step, VIM progress
2341 # wait for any previous tasks in process
2342 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2344 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2345 stage
[1] = "Reading from database."
2346 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2347 db_nsr_update
["detailed-status"] = "creating"
2348 db_nsr_update
["operational-status"] = "init"
2349 self
._write
_ns
_status
(
2351 ns_state
="BUILDING",
2352 current_operation
="INSTANTIATING",
2353 current_operation_id
=nslcmop_id
,
2354 other_update
=db_nsr_update
,
2356 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2358 # read from db: operation
2359 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2360 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2361 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2362 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2363 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2365 ns_params
= db_nslcmop
.get("operationParams")
2366 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2367 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2369 timeout_ns_deploy
= self
.timeout
.get(
2370 "ns_deploy", self
.timeout_ns_deploy
2374 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2375 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2376 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2377 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2378 self
.fs
.sync(db_nsr
["nsd-id"])
2380 # nsr_name = db_nsr["name"] # TODO short-name??
2382 # read from db: vnf's of this ns
2383 stage
[1] = "Getting vnfrs from db."
2384 self
.logger
.debug(logging_text
+ stage
[1])
2385 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2387 # read from db: vnfd's for every vnf
2388 db_vnfds
= [] # every vnfd data
2390 # for each vnf in ns, read vnfd
2391 for vnfr
in db_vnfrs_list
:
2392 if vnfr
.get("kdur"):
2394 for kdur
in vnfr
["kdur"]:
2395 if kdur
.get("additionalParams"):
2396 kdur
["additionalParams"] = json
.loads(
2397 kdur
["additionalParams"]
2399 kdur_list
.append(kdur
)
2400 vnfr
["kdur"] = kdur_list
2402 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2403 vnfd_id
= vnfr
["vnfd-id"]
2404 vnfd_ref
= vnfr
["vnfd-ref"]
2405 self
.fs
.sync(vnfd_id
)
2407 # if we haven't this vnfd, read it from db
2408 if vnfd_id
not in db_vnfds
:
2410 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2413 self
.logger
.debug(logging_text
+ stage
[1])
2414 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2417 db_vnfds
.append(vnfd
)
2419 # Get or generates the _admin.deployed.VCA list
2420 vca_deployed_list
= None
2421 if db_nsr
["_admin"].get("deployed"):
2422 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2423 if vca_deployed_list
is None:
2424 vca_deployed_list
= []
2425 configuration_status_list
= []
2426 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2427 db_nsr_update
["configurationStatus"] = configuration_status_list
2428 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2429 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2430 elif isinstance(vca_deployed_list
, dict):
2431 # maintain backward compatibility. Change a dict to list at database
2432 vca_deployed_list
= list(vca_deployed_list
.values())
2433 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2434 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2437 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2439 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2440 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2442 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2443 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2444 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2446 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2449 # n2vc_redesign STEP 2 Deploy Network Scenario
2450 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2451 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2453 stage
[1] = "Deploying KDUs."
2454 # self.logger.debug(logging_text + "Before deploy_kdus")
2455 # Call to deploy_kdus in case exists the "vdu:kdu" param
2456 await self
.deploy_kdus(
2457 logging_text
=logging_text
,
2459 nslcmop_id
=nslcmop_id
,
2462 task_instantiation_info
=tasks_dict_info
,
2465 stage
[1] = "Getting VCA public key."
2466 # n2vc_redesign STEP 1 Get VCA public ssh-key
2467 # feature 1429. Add n2vc public key to needed VMs
2468 n2vc_key
= self
.n2vc
.get_public_key()
2469 n2vc_key_list
= [n2vc_key
]
2470 if self
.vca_config
.get("public_key"):
2471 n2vc_key_list
.append(self
.vca_config
["public_key"])
2473 stage
[1] = "Deploying NS at VIM."
2474 task_ro
= asyncio
.ensure_future(
2475 self
.instantiate_RO(
2476 logging_text
=logging_text
,
2480 db_nslcmop
=db_nslcmop
,
2483 n2vc_key_list
=n2vc_key_list
,
2487 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2488 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2490 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2491 stage
[1] = "Deploying Execution Environments."
2492 self
.logger
.debug(logging_text
+ stage
[1])
2494 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2495 for vnf_profile
in get_vnf_profiles(nsd
):
2496 vnfd_id
= vnf_profile
["vnfd-id"]
2497 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2498 member_vnf_index
= str(vnf_profile
["id"])
2499 db_vnfr
= db_vnfrs
[member_vnf_index
]
2500 base_folder
= vnfd
["_admin"]["storage"]
2506 # Get additional parameters
2507 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2508 if db_vnfr
.get("additionalParamsForVnf"):
2509 deploy_params
.update(
2510 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2513 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2514 if descriptor_config
:
2516 logging_text
=logging_text
2517 + "member_vnf_index={} ".format(member_vnf_index
),
2520 nslcmop_id
=nslcmop_id
,
2526 member_vnf_index
=member_vnf_index
,
2527 vdu_index
=vdu_index
,
2529 deploy_params
=deploy_params
,
2530 descriptor_config
=descriptor_config
,
2531 base_folder
=base_folder
,
2532 task_instantiation_info
=tasks_dict_info
,
2536 # Deploy charms for each VDU that supports one.
2537 for vdud
in get_vdu_list(vnfd
):
2539 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2540 vdur
= find_in_list(
2541 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2544 if vdur
.get("additionalParams"):
2545 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2547 deploy_params_vdu
= deploy_params
2548 deploy_params_vdu
["OSM"] = get_osm_params(
2549 db_vnfr
, vdu_id
, vdu_count_index
=0
2551 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2553 self
.logger
.debug("VDUD > {}".format(vdud
))
2555 "Descriptor config > {}".format(descriptor_config
)
2557 if descriptor_config
:
2560 for vdu_index
in range(vdud_count
):
2561 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2563 logging_text
=logging_text
2564 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2565 member_vnf_index
, vdu_id
, vdu_index
2569 nslcmop_id
=nslcmop_id
,
2575 member_vnf_index
=member_vnf_index
,
2576 vdu_index
=vdu_index
,
2578 deploy_params
=deploy_params_vdu
,
2579 descriptor_config
=descriptor_config
,
2580 base_folder
=base_folder
,
2581 task_instantiation_info
=tasks_dict_info
,
2584 for kdud
in get_kdu_list(vnfd
):
2585 kdu_name
= kdud
["name"]
2586 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2587 if descriptor_config
:
2592 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2594 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2595 if kdur
.get("additionalParams"):
2596 deploy_params_kdu
.update(
2597 parse_yaml_strings(kdur
["additionalParams"].copy())
2601 logging_text
=logging_text
,
2604 nslcmop_id
=nslcmop_id
,
2610 member_vnf_index
=member_vnf_index
,
2611 vdu_index
=vdu_index
,
2613 deploy_params
=deploy_params_kdu
,
2614 descriptor_config
=descriptor_config
,
2615 base_folder
=base_folder
,
2616 task_instantiation_info
=tasks_dict_info
,
2620 # Check if this NS has a charm configuration
2621 descriptor_config
= nsd
.get("ns-configuration")
2622 if descriptor_config
and descriptor_config
.get("juju"):
2625 member_vnf_index
= None
2631 # Get additional parameters
2632 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2633 if db_nsr
.get("additionalParamsForNs"):
2634 deploy_params
.update(
2635 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2637 base_folder
= nsd
["_admin"]["storage"]
2639 logging_text
=logging_text
,
2642 nslcmop_id
=nslcmop_id
,
2648 member_vnf_index
=member_vnf_index
,
2649 vdu_index
=vdu_index
,
2651 deploy_params
=deploy_params
,
2652 descriptor_config
=descriptor_config
,
2653 base_folder
=base_folder
,
2654 task_instantiation_info
=tasks_dict_info
,
2658 # rest of staff will be done at finally
2661 ROclient
.ROClientException
,
2667 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2670 except asyncio
.CancelledError
:
2672 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2674 exc
= "Operation was cancelled"
2675 except Exception as e
:
2676 exc
= traceback
.format_exc()
2677 self
.logger
.critical(
2678 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2683 error_list
.append(str(exc
))
2685 # wait for pending tasks
2687 stage
[1] = "Waiting for instantiate pending tasks."
2688 self
.logger
.debug(logging_text
+ stage
[1])
2689 error_list
+= await self
._wait
_for
_tasks
(
2697 stage
[1] = stage
[2] = ""
2698 except asyncio
.CancelledError
:
2699 error_list
.append("Cancelled")
2700 # TODO cancel all tasks
2701 except Exception as exc
:
2702 error_list
.append(str(exc
))
2704 # update operation-status
2705 db_nsr_update
["operational-status"] = "running"
2706 # let's begin with VCA 'configured' status (later we can change it)
2707 db_nsr_update
["config-status"] = "configured"
2708 for task
, task_name
in tasks_dict_info
.items():
2709 if not task
.done() or task
.cancelled() or task
.exception():
2710 if task_name
.startswith(self
.task_name_deploy_vca
):
2711 # A N2VC task is pending
2712 db_nsr_update
["config-status"] = "failed"
2714 # RO or KDU task is pending
2715 db_nsr_update
["operational-status"] = "failed"
2717 # update status at database
2719 error_detail
= ". ".join(error_list
)
2720 self
.logger
.error(logging_text
+ error_detail
)
2721 error_description_nslcmop
= "{} Detail: {}".format(
2722 stage
[0], error_detail
2724 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2725 nslcmop_id
, stage
[0]
2728 db_nsr_update
["detailed-status"] = (
2729 error_description_nsr
+ " Detail: " + error_detail
2731 db_nslcmop_update
["detailed-status"] = error_detail
2732 nslcmop_operation_state
= "FAILED"
2736 error_description_nsr
= error_description_nslcmop
= None
2738 db_nsr_update
["detailed-status"] = "Done"
2739 db_nslcmop_update
["detailed-status"] = "Done"
2740 nslcmop_operation_state
= "COMPLETED"
2743 self
._write
_ns
_status
(
2746 current_operation
="IDLE",
2747 current_operation_id
=None,
2748 error_description
=error_description_nsr
,
2749 error_detail
=error_detail
,
2750 other_update
=db_nsr_update
,
2752 self
._write
_op
_status
(
2755 error_message
=error_description_nslcmop
,
2756 operation_state
=nslcmop_operation_state
,
2757 other_update
=db_nslcmop_update
,
2760 if nslcmop_operation_state
:
2762 await self
.msg
.aiowrite(
2767 "nslcmop_id": nslcmop_id
,
2768 "operationState": nslcmop_operation_state
,
2772 except Exception as e
:
2774 logging_text
+ "kafka_write notification Exception {}".format(e
)
2777 self
.logger
.debug(logging_text
+ "Exit")
2778 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2780 def _get_vnfd(self
, vnfd_id
: str, cached_vnfds
: Dict
[str, Any
]):
2781 if vnfd_id
not in cached_vnfds
:
2782 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
2783 return cached_vnfds
[vnfd_id
]
2785 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2786 if vnf_profile_id
not in cached_vnfrs
:
2787 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2790 "member-vnf-index-ref": vnf_profile_id
,
2791 "nsr-id-ref": nsr_id
,
2794 return cached_vnfrs
[vnf_profile_id
]
2796 def _is_deployed_vca_in_relation(
2797 self
, vca
: DeployedVCA
, relation
: Relation
2800 for endpoint
in (relation
.provider
, relation
.requirer
):
2801 if endpoint
["kdu-resource-profile-id"]:
2804 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2805 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2806 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2812 def _update_ee_relation_data_with_implicit_data(
2813 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2815 ee_relation_data
= safe_get_ee_relation(
2816 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2818 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2819 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2820 "execution-environment-ref"
2822 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2823 vnfd_id
= vnf_profile
["vnfd-id"]
2824 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2827 if ee_relation_level
== EELevel
.VNF
2828 else ee_relation_data
["vdu-profile-id"]
2830 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2833 f
"not execution environments found for ee_relation {ee_relation_data}"
2835 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2836 return ee_relation_data
2838 def _get_ns_relations(
2841 nsd
: Dict
[str, Any
],
2843 cached_vnfds
: Dict
[str, Any
],
2844 ) -> List
[Relation
]:
2846 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2847 for r
in db_ns_relations
:
2848 provider_dict
= None
2849 requirer_dict
= None
2850 if all(key
in r
for key
in ("provider", "requirer")):
2851 provider_dict
= r
["provider"]
2852 requirer_dict
= r
["requirer"]
2853 elif "entities" in r
:
2854 provider_id
= r
["entities"][0]["id"]
2857 "endpoint": r
["entities"][0]["endpoint"],
2859 if provider_id
!= nsd
["id"]:
2860 provider_dict
["vnf-profile-id"] = provider_id
2861 requirer_id
= r
["entities"][1]["id"]
2864 "endpoint": r
["entities"][1]["endpoint"],
2866 if requirer_id
!= nsd
["id"]:
2867 requirer_dict
["vnf-profile-id"] = requirer_id
2869 raise Exception("provider/requirer or entities must be included in the relation.")
2870 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2871 nsr_id
, nsd
, provider_dict
, cached_vnfds
2873 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2874 nsr_id
, nsd
, requirer_dict
, cached_vnfds
2876 provider
= EERelation(relation_provider
)
2877 requirer
= EERelation(relation_requirer
)
2878 relation
= Relation(r
["name"], provider
, requirer
)
2879 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2881 relations
.append(relation
)
2884 def _get_vnf_relations(
2887 nsd
: Dict
[str, Any
],
2889 cached_vnfds
: Dict
[str, Any
],
2890 ) -> List
[Relation
]:
2892 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
2893 vnf_profile_id
= vnf_profile
["id"]
2894 vnfd_id
= vnf_profile
["vnfd-id"]
2895 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2896 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
2897 for r
in db_vnf_relations
:
2898 provider_dict
= None
2899 requirer_dict
= None
2900 if all(key
in r
for key
in ("provider", "requirer")):
2901 provider_dict
= r
["provider"]
2902 requirer_dict
= r
["requirer"]
2903 elif "entities" in r
:
2904 provider_id
= r
["entities"][0]["id"]
2907 "vnf-profile-id": vnf_profile_id
,
2908 "endpoint": r
["entities"][0]["endpoint"],
2910 if provider_id
!= vnfd_id
:
2911 provider_dict
["vdu-profile-id"] = provider_id
2912 requirer_id
= r
["entities"][1]["id"]
2915 "vnf-profile-id": vnf_profile_id
,
2916 "endpoint": r
["entities"][1]["endpoint"],
2918 if requirer_id
!= vnfd_id
:
2919 requirer_dict
["vdu-profile-id"] = requirer_id
2921 raise Exception("provider/requirer or entities must be included in the relation.")
2922 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2923 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
2925 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2926 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
2928 provider
= EERelation(relation_provider
)
2929 requirer
= EERelation(relation_requirer
)
2930 relation
= Relation(r
["name"], provider
, requirer
)
2931 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2933 relations
.append(relation
)
2936 def _get_kdu_resource_data(
2938 ee_relation
: EERelation
,
2939 db_nsr
: Dict
[str, Any
],
2940 cached_vnfds
: Dict
[str, Any
],
2941 ) -> DeployedK8sResource
:
2942 nsd
= get_nsd(db_nsr
)
2943 vnf_profiles
= get_vnf_profiles(nsd
)
2944 vnfd_id
= find_in_list(
2946 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
2948 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2949 kdu_resource_profile
= get_kdu_resource_profile(
2950 db_vnfd
, ee_relation
.kdu_resource_profile_id
2952 kdu_name
= kdu_resource_profile
["kdu-name"]
2953 deployed_kdu
, _
= get_deployed_kdu(
2954 db_nsr
.get("_admin", ()).get("deployed", ()),
2956 ee_relation
.vnf_profile_id
,
2958 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
2961 def _get_deployed_component(
2963 ee_relation
: EERelation
,
2964 db_nsr
: Dict
[str, Any
],
2965 cached_vnfds
: Dict
[str, Any
],
2966 ) -> DeployedComponent
:
2967 nsr_id
= db_nsr
["_id"]
2968 deployed_component
= None
2969 ee_level
= EELevel
.get_level(ee_relation
)
2970 if ee_level
== EELevel
.NS
:
2971 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
2973 deployed_component
= DeployedVCA(nsr_id
, vca
)
2974 elif ee_level
== EELevel
.VNF
:
2975 vca
= get_deployed_vca(
2979 "member-vnf-index": ee_relation
.vnf_profile_id
,
2980 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
2984 deployed_component
= DeployedVCA(nsr_id
, vca
)
2985 elif ee_level
== EELevel
.VDU
:
2986 vca
= get_deployed_vca(
2989 "vdu_id": ee_relation
.vdu_profile_id
,
2990 "member-vnf-index": ee_relation
.vnf_profile_id
,
2991 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
2995 deployed_component
= DeployedVCA(nsr_id
, vca
)
2996 elif ee_level
== EELevel
.KDU
:
2997 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
2998 ee_relation
, db_nsr
, cached_vnfds
3000 if kdu_resource_data
:
3001 deployed_component
= DeployedK8sResource(kdu_resource_data
)
3002 return deployed_component
3004 async def _add_relation(
3008 db_nsr
: Dict
[str, Any
],
3009 cached_vnfds
: Dict
[str, Any
],
3010 cached_vnfrs
: Dict
[str, Any
],
3012 deployed_provider
= self
._get
_deployed
_component
(
3013 relation
.provider
, db_nsr
, cached_vnfds
3015 deployed_requirer
= self
._get
_deployed
_component
(
3016 relation
.requirer
, db_nsr
, cached_vnfds
3020 and deployed_requirer
3021 and deployed_provider
.config_sw_installed
3022 and deployed_requirer
.config_sw_installed
3024 provider_db_vnfr
= (
3026 relation
.provider
.nsr_id
,
3027 relation
.provider
.vnf_profile_id
,
3030 if relation
.provider
.vnf_profile_id
3033 requirer_db_vnfr
= (
3035 relation
.requirer
.nsr_id
,
3036 relation
.requirer
.vnf_profile_id
,
3039 if relation
.requirer
.vnf_profile_id
3042 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3043 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3044 provider_relation_endpoint
= RelationEndpoint(
3045 deployed_provider
.ee_id
,
3047 relation
.provider
.endpoint
,
3049 requirer_relation_endpoint
= RelationEndpoint(
3050 deployed_requirer
.ee_id
,
3052 relation
.requirer
.endpoint
,
3054 await self
.vca_map
[vca_type
].add_relation(
3055 provider
=provider_relation_endpoint
,
3056 requirer
=requirer_relation_endpoint
,
3058 # remove entry from relations list
3062 async def _add_vca_relations(
3068 timeout
: int = 3600,
3072 # 1. find all relations for this VCA
3073 # 2. wait for other peers related
3077 # STEP 1: find all relations for this VCA
3080 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3081 nsd
= get_nsd(db_nsr
)
3084 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3085 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3090 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3091 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3093 # if no relations, terminate
3095 self
.logger
.debug(logging_text
+ " No relations")
3098 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3105 if now
- start
>= timeout
:
3106 self
.logger
.error(logging_text
+ " : timeout adding relations")
3109 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3110 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3112 # for each relation, find the VCA's related
3113 for relation
in relations
.copy():
3114 added
= await self
._add
_relation
(
3122 relations
.remove(relation
)
3125 self
.logger
.debug("Relations added")
3127 await asyncio
.sleep(5.0)
3131 except Exception as e
:
3132 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3135 async def _install_kdu(
3143 k8s_instance_info
: dict,
3144 k8params
: dict = None,
3150 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3153 "collection": "nsrs",
3154 "filter": {"_id": nsr_id
},
3155 "path": nsr_db_path
,
3158 if k8s_instance_info
.get("kdu-deployment-name"):
3159 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3161 kdu_instance
= self
.k8scluster_map
[
3163 ].generate_kdu_instance_name(
3164 db_dict
=db_dict_install
,
3165 kdu_model
=k8s_instance_info
["kdu-model"],
3166 kdu_name
=k8s_instance_info
["kdu-name"],
3169 # Update the nsrs table with the kdu-instance value
3173 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3176 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3177 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3178 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3179 # namespace, this first verification could be removed, and the next step would be done for any kind
3181 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3182 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3183 if k8sclustertype
in ("juju", "juju-bundle"):
3184 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3185 # that the user passed a namespace which he wants its KDU to be deployed in)
3191 "_admin.projects_write": k8s_instance_info
["namespace"],
3192 "_admin.projects_read": k8s_instance_info
["namespace"],
3198 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3203 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3205 k8s_instance_info
["namespace"] = kdu_instance
3207 await self
.k8scluster_map
[k8sclustertype
].install(
3208 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3209 kdu_model
=k8s_instance_info
["kdu-model"],
3212 db_dict
=db_dict_install
,
3214 kdu_name
=k8s_instance_info
["kdu-name"],
3215 namespace
=k8s_instance_info
["namespace"],
3216 kdu_instance
=kdu_instance
,
3220 # Obtain services to obtain management service ip
3221 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3222 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3223 kdu_instance
=kdu_instance
,
3224 namespace
=k8s_instance_info
["namespace"],
3227 # Obtain management service info (if exists)
3228 vnfr_update_dict
= {}
3229 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3231 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3236 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3239 for service
in kdud
.get("service", [])
3240 if service
.get("mgmt-service")
3242 for mgmt_service
in mgmt_services
:
3243 for service
in services
:
3244 if service
["name"].startswith(mgmt_service
["name"]):
3245 # Mgmt service found, Obtain service ip
3246 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3247 if isinstance(ip
, list) and len(ip
) == 1:
3251 "kdur.{}.ip-address".format(kdu_index
)
3254 # Check if must update also mgmt ip at the vnf
3255 service_external_cp
= mgmt_service
.get(
3256 "external-connection-point-ref"
3258 if service_external_cp
:
3260 deep_get(vnfd
, ("mgmt-interface", "cp"))
3261 == service_external_cp
3263 vnfr_update_dict
["ip-address"] = ip
3268 "external-connection-point-ref", ""
3270 == service_external_cp
,
3273 "kdur.{}.ip-address".format(kdu_index
)
3278 "Mgmt service name: {} not found".format(
3279 mgmt_service
["name"]
3283 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3284 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3286 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3289 and kdu_config
.get("initial-config-primitive")
3290 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3292 initial_config_primitive_list
= kdu_config
.get(
3293 "initial-config-primitive"
3295 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3297 for initial_config_primitive
in initial_config_primitive_list
:
3298 primitive_params_
= self
._map
_primitive
_params
(
3299 initial_config_primitive
, {}, {}
3302 await asyncio
.wait_for(
3303 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3304 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3305 kdu_instance
=kdu_instance
,
3306 primitive_name
=initial_config_primitive
["name"],
3307 params
=primitive_params_
,
3308 db_dict
=db_dict_install
,
3314 except Exception as e
:
3315 # Prepare update db with error and raise exception
3318 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3322 vnfr_data
.get("_id"),
3323 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3326 # ignore to keep original exception
3328 # reraise original error
3333 async def deploy_kdus(
3340 task_instantiation_info
,
3342 # Launch kdus if present in the descriptor
3344 k8scluster_id_2_uuic
= {
3345 "helm-chart-v3": {},
3350 async def _get_cluster_id(cluster_id
, cluster_type
):
3351 nonlocal k8scluster_id_2_uuic
3352 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3353 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3355 # check if K8scluster is creating and wait look if previous tasks in process
3356 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3357 "k8scluster", cluster_id
3360 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3361 task_name
, cluster_id
3363 self
.logger
.debug(logging_text
+ text
)
3364 await asyncio
.wait(task_dependency
, timeout
=3600)
3366 db_k8scluster
= self
.db
.get_one(
3367 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3369 if not db_k8scluster
:
3370 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3372 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3374 if cluster_type
== "helm-chart-v3":
3376 # backward compatibility for existing clusters that have not been initialized for helm v3
3377 k8s_credentials
= yaml
.safe_dump(
3378 db_k8scluster
.get("credentials")
3380 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3381 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3383 db_k8scluster_update
= {}
3384 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3385 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3386 db_k8scluster_update
[
3387 "_admin.helm-chart-v3.created"
3389 db_k8scluster_update
[
3390 "_admin.helm-chart-v3.operationalState"
3393 "k8sclusters", cluster_id
, db_k8scluster_update
3395 except Exception as e
:
3398 + "error initializing helm-v3 cluster: {}".format(str(e
))
3401 "K8s cluster '{}' has not been initialized for '{}'".format(
3402 cluster_id
, cluster_type
3407 "K8s cluster '{}' has not been initialized for '{}'".format(
3408 cluster_id
, cluster_type
3411 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3414 logging_text
+= "Deploy kdus: "
3417 db_nsr_update
= {"_admin.deployed.K8s": []}
3418 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3421 updated_cluster_list
= []
3422 updated_v3_cluster_list
= []
3424 for vnfr_data
in db_vnfrs
.values():
3425 vca_id
= self
.get_vca_id(vnfr_data
, {})
3426 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3427 # Step 0: Prepare and set parameters
3428 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3429 vnfd_id
= vnfr_data
.get("vnfd-id")
3430 vnfd_with_id
= find_in_list(
3431 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3435 for kdud
in vnfd_with_id
["kdu"]
3436 if kdud
["name"] == kdur
["kdu-name"]
3438 namespace
= kdur
.get("k8s-namespace")
3439 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3440 if kdur
.get("helm-chart"):
3441 kdumodel
= kdur
["helm-chart"]
3442 # Default version: helm3, if helm-version is v2 assign v2
3443 k8sclustertype
= "helm-chart-v3"
3444 self
.logger
.debug("kdur: {}".format(kdur
))
3446 kdur
.get("helm-version")
3447 and kdur
.get("helm-version") == "v2"
3449 k8sclustertype
= "helm-chart"
3450 elif kdur
.get("juju-bundle"):
3451 kdumodel
= kdur
["juju-bundle"]
3452 k8sclustertype
= "juju-bundle"
3455 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3456 "juju-bundle. Maybe an old NBI version is running".format(
3457 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3460 # check if kdumodel is a file and exists
3462 vnfd_with_id
= find_in_list(
3463 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3465 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3466 if storage
: # may be not present if vnfd has not artifacts
3467 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3468 if storage
["pkg-dir"]:
3469 filename
= "{}/{}/{}s/{}".format(
3476 filename
= "{}/Scripts/{}s/{}".format(
3481 if self
.fs
.file_exists(
3482 filename
, mode
="file"
3483 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3484 kdumodel
= self
.fs
.path
+ filename
3485 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3487 except Exception: # it is not a file
3490 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3491 step
= "Synchronize repos for k8s cluster '{}'".format(
3494 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3498 k8sclustertype
== "helm-chart"
3499 and cluster_uuid
not in updated_cluster_list
3501 k8sclustertype
== "helm-chart-v3"
3502 and cluster_uuid
not in updated_v3_cluster_list
3504 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3505 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3506 cluster_uuid
=cluster_uuid
3509 if del_repo_list
or added_repo_dict
:
3510 if k8sclustertype
== "helm-chart":
3512 "_admin.helm_charts_added." + item
: None
3513 for item
in del_repo_list
3516 "_admin.helm_charts_added." + item
: name
3517 for item
, name
in added_repo_dict
.items()
3519 updated_cluster_list
.append(cluster_uuid
)
3520 elif k8sclustertype
== "helm-chart-v3":
3522 "_admin.helm_charts_v3_added." + item
: None
3523 for item
in del_repo_list
3526 "_admin.helm_charts_v3_added." + item
: name
3527 for item
, name
in added_repo_dict
.items()
3529 updated_v3_cluster_list
.append(cluster_uuid
)
3531 logging_text
+ "repos synchronized on k8s cluster "
3532 "'{}' to_delete: {}, to_add: {}".format(
3533 k8s_cluster_id
, del_repo_list
, added_repo_dict
3538 {"_id": k8s_cluster_id
},
3544 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3545 vnfr_data
["member-vnf-index-ref"],
3549 k8s_instance_info
= {
3550 "kdu-instance": None,
3551 "k8scluster-uuid": cluster_uuid
,
3552 "k8scluster-type": k8sclustertype
,
3553 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3554 "kdu-name": kdur
["kdu-name"],
3555 "kdu-model": kdumodel
,
3556 "namespace": namespace
,
3557 "kdu-deployment-name": kdu_deployment_name
,
3559 db_path
= "_admin.deployed.K8s.{}".format(index
)
3560 db_nsr_update
[db_path
] = k8s_instance_info
3561 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3562 vnfd_with_id
= find_in_list(
3563 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3565 task
= asyncio
.ensure_future(
3574 k8params
=desc_params
,
3579 self
.lcm_tasks
.register(
3583 "instantiate_KDU-{}".format(index
),
3586 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3592 except (LcmException
, asyncio
.CancelledError
):
3594 except Exception as e
:
3595 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3596 if isinstance(e
, (N2VCException
, DbException
)):
3597 self
.logger
.error(logging_text
+ msg
)
3599 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3600 raise LcmException(msg
)
3603 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3622 task_instantiation_info
,
3625 # launch instantiate_N2VC in a asyncio task and register task object
3626 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3627 # if not found, create one entry and update database
3628 # fill db_nsr._admin.deployed.VCA.<index>
3631 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3633 if "execution-environment-list" in descriptor_config
:
3634 ee_list
= descriptor_config
.get("execution-environment-list", [])
3635 elif "juju" in descriptor_config
:
3636 ee_list
= [descriptor_config
] # ns charms
3637 else: # other types as script are not supported
3640 for ee_item
in ee_list
:
3643 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3644 ee_item
.get("juju"), ee_item
.get("helm-chart")
3647 ee_descriptor_id
= ee_item
.get("id")
3648 if ee_item
.get("juju"):
3649 vca_name
= ee_item
["juju"].get("charm")
3652 if ee_item
["juju"].get("charm") is not None
3655 if ee_item
["juju"].get("cloud") == "k8s":
3656 vca_type
= "k8s_proxy_charm"
3657 elif ee_item
["juju"].get("proxy") is False:
3658 vca_type
= "native_charm"
3659 elif ee_item
.get("helm-chart"):
3660 vca_name
= ee_item
["helm-chart"]
3661 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3664 vca_type
= "helm-v3"
3667 logging_text
+ "skipping non juju neither charm configuration"
3672 for vca_index
, vca_deployed
in enumerate(
3673 db_nsr
["_admin"]["deployed"]["VCA"]
3675 if not vca_deployed
:
3678 vca_deployed
.get("member-vnf-index") == member_vnf_index
3679 and vca_deployed
.get("vdu_id") == vdu_id
3680 and vca_deployed
.get("kdu_name") == kdu_name
3681 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3682 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3686 # not found, create one.
3688 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3691 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3693 target
+= "/kdu/{}".format(kdu_name
)
3695 "target_element": target
,
3696 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3697 "member-vnf-index": member_vnf_index
,
3699 "kdu_name": kdu_name
,
3700 "vdu_count_index": vdu_index
,
3701 "operational-status": "init", # TODO revise
3702 "detailed-status": "", # TODO revise
3703 "step": "initial-deploy", # TODO revise
3705 "vdu_name": vdu_name
,
3707 "ee_descriptor_id": ee_descriptor_id
,
3711 # create VCA and configurationStatus in db
3713 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3714 "configurationStatus.{}".format(vca_index
): dict(),
3716 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3718 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3720 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3721 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3722 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3725 task_n2vc
= asyncio
.ensure_future(
3726 self
.instantiate_N2VC(
3727 logging_text
=logging_text
,
3728 vca_index
=vca_index
,
3734 vdu_index
=vdu_index
,
3735 deploy_params
=deploy_params
,
3736 config_descriptor
=descriptor_config
,
3737 base_folder
=base_folder
,
3738 nslcmop_id
=nslcmop_id
,
3742 ee_config_descriptor
=ee_item
,
3745 self
.lcm_tasks
.register(
3749 "instantiate_N2VC-{}".format(vca_index
),
3752 task_instantiation_info
[
3754 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3755 member_vnf_index
or "", vdu_id
or ""
3759 def _create_nslcmop(nsr_id
, operation
, params
):
3761 Creates a ns-lcm-opp content to be stored at database.
3762 :param nsr_id: internal id of the instance
3763 :param operation: instantiate, terminate, scale, action, ...
3764 :param params: user parameters for the operation
3765 :return: dictionary following SOL005 format
3767 # Raise exception if invalid arguments
3768 if not (nsr_id
and operation
and params
):
3770 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3777 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3778 "operationState": "PROCESSING",
3779 "statusEnteredTime": now
,
3780 "nsInstanceId": nsr_id
,
3781 "lcmOperationType": operation
,
3783 "isAutomaticInvocation": False,
3784 "operationParams": params
,
3785 "isCancelPending": False,
3787 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3788 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3793 def _format_additional_params(self
, params
):
3794 params
= params
or {}
3795 for key
, value
in params
.items():
3796 if str(value
).startswith("!!yaml "):
3797 params
[key
] = yaml
.safe_load(value
[7:])
3800 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3801 primitive
= seq
.get("name")
3802 primitive_params
= {}
3804 "member_vnf_index": vnf_index
,
3805 "primitive": primitive
,
3806 "primitive_params": primitive_params
,
3809 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
3813 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
3814 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
3815 if op
.get("operationState") == "COMPLETED":
3816 # b. Skip sub-operation
3817 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3818 return self
.SUBOPERATION_STATUS_SKIP
3820 # c. retry executing sub-operation
3821 # The sub-operation exists, and operationState != 'COMPLETED'
3822 # Update operationState = 'PROCESSING' to indicate a retry.
3823 operationState
= "PROCESSING"
3824 detailed_status
= "In progress"
3825 self
._update
_suboperation
_status
(
3826 db_nslcmop
, op_index
, operationState
, detailed_status
3828 # Return the sub-operation index
3829 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3830 # with arguments extracted from the sub-operation
3833 # Find a sub-operation where all keys in a matching dictionary must match
3834 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3835 def _find_suboperation(self
, db_nslcmop
, match
):
3836 if db_nslcmop
and match
:
3837 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3838 for i
, op
in enumerate(op_list
):
3839 if all(op
.get(k
) == match
[k
] for k
in match
):
3841 return self
.SUBOPERATION_STATUS_NOT_FOUND
3843 # Update status for a sub-operation given its index
3844 def _update_suboperation_status(
3845 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3847 # Update DB for HA tasks
3848 q_filter
= {"_id": db_nslcmop
["_id"]}
3850 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3851 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3854 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3857 # Add sub-operation, return the index of the added sub-operation
3858 # Optionally, set operationState, detailed-status, and operationType
3859 # Status and type are currently set for 'scale' sub-operations:
3860 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3861 # 'detailed-status' : status message
3862 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3863 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3864 def _add_suboperation(
3872 mapped_primitive_params
,
3873 operationState
=None,
3874 detailed_status
=None,
3877 RO_scaling_info
=None,
3880 return self
.SUBOPERATION_STATUS_NOT_FOUND
3881 # Get the "_admin.operations" list, if it exists
3882 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3883 op_list
= db_nslcmop_admin
.get("operations")
3884 # Create or append to the "_admin.operations" list
3886 "member_vnf_index": vnf_index
,
3888 "vdu_count_index": vdu_count_index
,
3889 "primitive": primitive
,
3890 "primitive_params": mapped_primitive_params
,
3893 new_op
["operationState"] = operationState
3895 new_op
["detailed-status"] = detailed_status
3897 new_op
["lcmOperationType"] = operationType
3899 new_op
["RO_nsr_id"] = RO_nsr_id
3901 new_op
["RO_scaling_info"] = RO_scaling_info
3903 # No existing operations, create key 'operations' with current operation as first list element
3904 db_nslcmop_admin
.update({"operations": [new_op
]})
3905 op_list
= db_nslcmop_admin
.get("operations")
3907 # Existing operations, append operation to list
3908 op_list
.append(new_op
)
3910 db_nslcmop_update
= {"_admin.operations": op_list
}
3911 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
3912 op_index
= len(op_list
) - 1
3915 # Helper methods for scale() sub-operations
3917 # pre-scale/post-scale:
3918 # Check for 3 different cases:
3919 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3920 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3921 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3922 def _check_or_add_scale_suboperation(
3926 vnf_config_primitive
,
3930 RO_scaling_info
=None,
3932 # Find this sub-operation
3933 if RO_nsr_id
and RO_scaling_info
:
3934 operationType
= "SCALE-RO"
3936 "member_vnf_index": vnf_index
,
3937 "RO_nsr_id": RO_nsr_id
,
3938 "RO_scaling_info": RO_scaling_info
,
3942 "member_vnf_index": vnf_index
,
3943 "primitive": vnf_config_primitive
,
3944 "primitive_params": primitive_params
,
3945 "lcmOperationType": operationType
,
3947 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
3948 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
3949 # a. New sub-operation
3950 # The sub-operation does not exist, add it.
3951 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3952 # The following parameters are set to None for all kind of scaling:
3954 vdu_count_index
= None
3956 if RO_nsr_id
and RO_scaling_info
:
3957 vnf_config_primitive
= None
3958 primitive_params
= None
3961 RO_scaling_info
= None
3962 # Initial status for sub-operation
3963 operationState
= "PROCESSING"
3964 detailed_status
= "In progress"
3965 # Add sub-operation for pre/post-scaling (zero or more operations)
3966 self
._add
_suboperation
(
3972 vnf_config_primitive
,
3980 return self
.SUBOPERATION_STATUS_NEW
3982 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3983 # or op_index (operationState != 'COMPLETED')
3984 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
3986 # Function to return execution_environment id
3988 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
3989 # TODO vdu_index_count
3990 for vca
in vca_deployed_list
:
3991 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
3994 async def destroy_N2VC(
4002 exec_primitives
=True,
4007 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4008 :param logging_text:
4010 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4011 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4012 :param vca_index: index in the database _admin.deployed.VCA
4013 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4014 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4015 not executed properly
4016 :param scaling_in: True destroys the application, False destroys the model
4017 :return: None or exception
4022 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4023 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4027 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4029 # execute terminate_primitives
4031 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4032 config_descriptor
.get("terminate-config-primitive"),
4033 vca_deployed
.get("ee_descriptor_id"),
4035 vdu_id
= vca_deployed
.get("vdu_id")
4036 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4037 vdu_name
= vca_deployed
.get("vdu_name")
4038 vnf_index
= vca_deployed
.get("member-vnf-index")
4039 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4040 for seq
in terminate_primitives
:
4041 # For each sequence in list, get primitive and call _ns_execute_primitive()
4042 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4043 vnf_index
, seq
.get("name")
4045 self
.logger
.debug(logging_text
+ step
)
4046 # Create the primitive for each sequence, i.e. "primitive": "touch"
4047 primitive
= seq
.get("name")
4048 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4053 self
._add
_suboperation
(
4060 mapped_primitive_params
,
4062 # Sub-operations: Call _ns_execute_primitive() instead of action()
4064 result
, result_detail
= await self
._ns
_execute
_primitive
(
4065 vca_deployed
["ee_id"],
4067 mapped_primitive_params
,
4071 except LcmException
:
4072 # this happens when VCA is not deployed. In this case it is not needed to terminate
4074 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4075 if result
not in result_ok
:
4077 "terminate_primitive {} for vnf_member_index={} fails with "
4078 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4080 # set that this VCA do not need terminated
4081 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4085 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4088 # Delete Prometheus Jobs if any
4089 # This uses NSR_ID, so it will destroy any jobs under this index
4090 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4093 await self
.vca_map
[vca_type
].delete_execution_environment(
4094 vca_deployed
["ee_id"],
4095 scaling_in
=scaling_in
,
4100 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4101 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4102 namespace
= "." + db_nsr
["_id"]
4104 await self
.n2vc
.delete_namespace(
4105 namespace
=namespace
,
4106 total_timeout
=self
.timeout_charm_delete
,
4109 except N2VCNotFound
: # already deleted. Skip
4111 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4113 async def _terminate_RO(
4114 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4117 Terminates a deployment from RO
4118 :param logging_text:
4119 :param nsr_deployed: db_nsr._admin.deployed
4122 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4123 this method will update only the index 2, but it will write on database the concatenated content of the list
4128 ro_nsr_id
= ro_delete_action
= None
4129 if nsr_deployed
and nsr_deployed
.get("RO"):
4130 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4131 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4134 stage
[2] = "Deleting ns from VIM."
4135 db_nsr_update
["detailed-status"] = " ".join(stage
)
4136 self
._write
_op
_status
(nslcmop_id
, stage
)
4137 self
.logger
.debug(logging_text
+ stage
[2])
4138 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4139 self
._write
_op
_status
(nslcmop_id
, stage
)
4140 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4141 ro_delete_action
= desc
["action_id"]
4143 "_admin.deployed.RO.nsr_delete_action_id"
4144 ] = ro_delete_action
4145 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4146 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4147 if ro_delete_action
:
4148 # wait until NS is deleted from VIM
4149 stage
[2] = "Waiting ns deleted from VIM."
4150 detailed_status_old
= None
4154 + " RO_id={} ro_delete_action={}".format(
4155 ro_nsr_id
, ro_delete_action
4158 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4159 self
._write
_op
_status
(nslcmop_id
, stage
)
4161 delete_timeout
= 20 * 60 # 20 minutes
4162 while delete_timeout
> 0:
4163 desc
= await self
.RO
.show(
4165 item_id_name
=ro_nsr_id
,
4166 extra_item
="action",
4167 extra_item_id
=ro_delete_action
,
4171 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4173 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4174 if ns_status
== "ERROR":
4175 raise ROclient
.ROClientException(ns_status_info
)
4176 elif ns_status
== "BUILD":
4177 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4178 elif ns_status
== "ACTIVE":
4179 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4180 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4185 ), "ROclient.check_action_status returns unknown {}".format(
4188 if stage
[2] != detailed_status_old
:
4189 detailed_status_old
= stage
[2]
4190 db_nsr_update
["detailed-status"] = " ".join(stage
)
4191 self
._write
_op
_status
(nslcmop_id
, stage
)
4192 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4193 await asyncio
.sleep(5, loop
=self
.loop
)
4195 else: # delete_timeout <= 0:
4196 raise ROclient
.ROClientException(
4197 "Timeout waiting ns deleted from VIM"
4200 except Exception as e
:
4201 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4203 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4205 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4206 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4207 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4209 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4212 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4214 failed_detail
.append("delete conflict: {}".format(e
))
4217 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4220 failed_detail
.append("delete error: {}".format(e
))
4222 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4226 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4227 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4229 stage
[2] = "Deleting nsd from RO."
4230 db_nsr_update
["detailed-status"] = " ".join(stage
)
4231 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4232 self
._write
_op
_status
(nslcmop_id
, stage
)
4233 await self
.RO
.delete("nsd", ro_nsd_id
)
4235 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4237 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4238 except Exception as e
:
4240 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4242 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4244 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4247 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4249 failed_detail
.append(
4250 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4252 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4254 failed_detail
.append(
4255 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4257 self
.logger
.error(logging_text
+ failed_detail
[-1])
4259 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4260 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4261 if not vnf_deployed
or not vnf_deployed
["id"]:
4264 ro_vnfd_id
= vnf_deployed
["id"]
4267 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4268 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4270 db_nsr_update
["detailed-status"] = " ".join(stage
)
4271 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4272 self
._write
_op
_status
(nslcmop_id
, stage
)
4273 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4275 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4277 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4278 except Exception as e
:
4280 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4283 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4287 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4290 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4292 failed_detail
.append(
4293 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4295 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4297 failed_detail
.append(
4298 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4300 self
.logger
.error(logging_text
+ failed_detail
[-1])
4303 stage
[2] = "Error deleting from VIM"
4305 stage
[2] = "Deleted from VIM"
4306 db_nsr_update
["detailed-status"] = " ".join(stage
)
4307 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4308 self
._write
_op
_status
(nslcmop_id
, stage
)
4311 raise LcmException("; ".join(failed_detail
))
4313 async def terminate(self
, nsr_id
, nslcmop_id
):
4314 # Try to lock HA task here
4315 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4316 if not task_is_locked_by_me
:
4319 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4320 self
.logger
.debug(logging_text
+ "Enter")
4321 timeout_ns_terminate
= self
.timeout_ns_terminate
4324 operation_params
= None
4326 error_list
= [] # annotates all failed error messages
4327 db_nslcmop_update
= {}
4328 autoremove
= False # autoremove after terminated
4329 tasks_dict_info
= {}
4332 "Stage 1/3: Preparing task.",
4333 "Waiting for previous operations to terminate.",
4336 # ^ contains [stage, step, VIM-status]
4338 # wait for any previous tasks in process
4339 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4341 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4342 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4343 operation_params
= db_nslcmop
.get("operationParams") or {}
4344 if operation_params
.get("timeout_ns_terminate"):
4345 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4346 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4347 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4349 db_nsr_update
["operational-status"] = "terminating"
4350 db_nsr_update
["config-status"] = "terminating"
4351 self
._write
_ns
_status
(
4353 ns_state
="TERMINATING",
4354 current_operation
="TERMINATING",
4355 current_operation_id
=nslcmop_id
,
4356 other_update
=db_nsr_update
,
4358 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4359 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4360 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4363 stage
[1] = "Getting vnf descriptors from db."
4364 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4366 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4368 db_vnfds_from_id
= {}
4369 db_vnfds_from_member_index
= {}
4371 for vnfr
in db_vnfrs_list
:
4372 vnfd_id
= vnfr
["vnfd-id"]
4373 if vnfd_id
not in db_vnfds_from_id
:
4374 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4375 db_vnfds_from_id
[vnfd_id
] = vnfd
4376 db_vnfds_from_member_index
[
4377 vnfr
["member-vnf-index-ref"]
4378 ] = db_vnfds_from_id
[vnfd_id
]
4380 # Destroy individual execution environments when there are terminating primitives.
4381 # Rest of EE will be deleted at once
4382 # TODO - check before calling _destroy_N2VC
4383 # if not operation_params.get("skip_terminate_primitives"):#
4384 # or not vca.get("needed_terminate"):
4385 stage
[0] = "Stage 2/3 execute terminating primitives."
4386 self
.logger
.debug(logging_text
+ stage
[0])
4387 stage
[1] = "Looking execution environment that needs terminate."
4388 self
.logger
.debug(logging_text
+ stage
[1])
4390 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4391 config_descriptor
= None
4392 vca_member_vnf_index
= vca
.get("member-vnf-index")
4393 vca_id
= self
.get_vca_id(
4394 db_vnfrs_dict
.get(vca_member_vnf_index
)
4395 if vca_member_vnf_index
4399 if not vca
or not vca
.get("ee_id"):
4401 if not vca
.get("member-vnf-index"):
4403 config_descriptor
= db_nsr
.get("ns-configuration")
4404 elif vca
.get("vdu_id"):
4405 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4406 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4407 elif vca
.get("kdu_name"):
4408 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4409 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4411 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4412 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4413 vca_type
= vca
.get("type")
4414 exec_terminate_primitives
= not operation_params
.get(
4415 "skip_terminate_primitives"
4416 ) and vca
.get("needed_terminate")
4417 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4418 # pending native charms
4420 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4422 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4423 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4424 task
= asyncio
.ensure_future(
4432 exec_terminate_primitives
,
4436 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4438 # wait for pending tasks of terminate primitives
4442 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4444 error_list
= await self
._wait
_for
_tasks
(
4447 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4451 tasks_dict_info
.clear()
4453 return # raise LcmException("; ".join(error_list))
4455 # remove All execution environments at once
4456 stage
[0] = "Stage 3/3 delete all."
4458 if nsr_deployed
.get("VCA"):
4459 stage
[1] = "Deleting all execution environments."
4460 self
.logger
.debug(logging_text
+ stage
[1])
4461 vca_id
= self
.get_vca_id({}, db_nsr
)
4462 task_delete_ee
= asyncio
.ensure_future(
4464 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4465 timeout
=self
.timeout_charm_delete
,
4468 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4469 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4471 # Delete from k8scluster
4472 stage
[1] = "Deleting KDUs."
4473 self
.logger
.debug(logging_text
+ stage
[1])
4474 # print(nsr_deployed)
4475 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4476 if not kdu
or not kdu
.get("kdu-instance"):
4478 kdu_instance
= kdu
.get("kdu-instance")
4479 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4480 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4481 vca_id
= self
.get_vca_id({}, db_nsr
)
4482 task_delete_kdu_instance
= asyncio
.ensure_future(
4483 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4484 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4485 kdu_instance
=kdu_instance
,
4492 + "Unknown k8s deployment type {}".format(
4493 kdu
.get("k8scluster-type")
4498 task_delete_kdu_instance
4499 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4502 stage
[1] = "Deleting ns from VIM."
4504 task_delete_ro
= asyncio
.ensure_future(
4505 self
._terminate
_ng
_ro
(
4506 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4510 task_delete_ro
= asyncio
.ensure_future(
4512 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4515 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4517 # rest of staff will be done at finally
4520 ROclient
.ROClientException
,
4525 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4527 except asyncio
.CancelledError
:
4529 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4531 exc
= "Operation was cancelled"
4532 except Exception as e
:
4533 exc
= traceback
.format_exc()
4534 self
.logger
.critical(
4535 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4540 error_list
.append(str(exc
))
4542 # wait for pending tasks
4544 stage
[1] = "Waiting for terminate pending tasks."
4545 self
.logger
.debug(logging_text
+ stage
[1])
4546 error_list
+= await self
._wait
_for
_tasks
(
4549 timeout_ns_terminate
,
4553 stage
[1] = stage
[2] = ""
4554 except asyncio
.CancelledError
:
4555 error_list
.append("Cancelled")
4556 # TODO cancell all tasks
4557 except Exception as exc
:
4558 error_list
.append(str(exc
))
4559 # update status at database
4561 error_detail
= "; ".join(error_list
)
4562 # self.logger.error(logging_text + error_detail)
4563 error_description_nslcmop
= "{} Detail: {}".format(
4564 stage
[0], error_detail
4566 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4567 nslcmop_id
, stage
[0]
4570 db_nsr_update
["operational-status"] = "failed"
4571 db_nsr_update
["detailed-status"] = (
4572 error_description_nsr
+ " Detail: " + error_detail
4574 db_nslcmop_update
["detailed-status"] = error_detail
4575 nslcmop_operation_state
= "FAILED"
4579 error_description_nsr
= error_description_nslcmop
= None
4580 ns_state
= "NOT_INSTANTIATED"
4581 db_nsr_update
["operational-status"] = "terminated"
4582 db_nsr_update
["detailed-status"] = "Done"
4583 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4584 db_nslcmop_update
["detailed-status"] = "Done"
4585 nslcmop_operation_state
= "COMPLETED"
4588 self
._write
_ns
_status
(
4591 current_operation
="IDLE",
4592 current_operation_id
=None,
4593 error_description
=error_description_nsr
,
4594 error_detail
=error_detail
,
4595 other_update
=db_nsr_update
,
4597 self
._write
_op
_status
(
4600 error_message
=error_description_nslcmop
,
4601 operation_state
=nslcmop_operation_state
,
4602 other_update
=db_nslcmop_update
,
4604 if ns_state
== "NOT_INSTANTIATED":
4608 {"nsr-id-ref": nsr_id
},
4609 {"_admin.nsState": "NOT_INSTANTIATED"},
4611 except DbException
as e
:
4614 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4618 if operation_params
:
4619 autoremove
= operation_params
.get("autoremove", False)
4620 if nslcmop_operation_state
:
4622 await self
.msg
.aiowrite(
4627 "nslcmop_id": nslcmop_id
,
4628 "operationState": nslcmop_operation_state
,
4629 "autoremove": autoremove
,
4633 except Exception as e
:
4635 logging_text
+ "kafka_write notification Exception {}".format(e
)
4638 self
.logger
.debug(logging_text
+ "Exit")
4639 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4641 async def _wait_for_tasks(
4642 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4645 error_detail_list
= []
4647 pending_tasks
= list(created_tasks_info
.keys())
4648 num_tasks
= len(pending_tasks
)
4650 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4651 self
._write
_op
_status
(nslcmop_id
, stage
)
4652 while pending_tasks
:
4654 _timeout
= timeout
+ time_start
- time()
4655 done
, pending_tasks
= await asyncio
.wait(
4656 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4658 num_done
+= len(done
)
4659 if not done
: # Timeout
4660 for task
in pending_tasks
:
4661 new_error
= created_tasks_info
[task
] + ": Timeout"
4662 error_detail_list
.append(new_error
)
4663 error_list
.append(new_error
)
4666 if task
.cancelled():
4669 exc
= task
.exception()
4671 if isinstance(exc
, asyncio
.TimeoutError
):
4673 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4674 error_list
.append(created_tasks_info
[task
])
4675 error_detail_list
.append(new_error
)
4682 ROclient
.ROClientException
,
4688 self
.logger
.error(logging_text
+ new_error
)
4690 exc_traceback
= "".join(
4691 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4695 + created_tasks_info
[task
]
4701 logging_text
+ created_tasks_info
[task
] + ": Done"
4703 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4705 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4706 if nsr_id
: # update also nsr
4711 "errorDescription": "Error at: " + ", ".join(error_list
),
4712 "errorDetail": ". ".join(error_detail_list
),
4715 self
._write
_op
_status
(nslcmop_id
, stage
)
4716 return error_detail_list
4719 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4721 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4722 The default-value is used. If it is between < > it look for a value at instantiation_params
4723 :param primitive_desc: portion of VNFD/NSD that describes primitive
4724 :param params: Params provided by user
4725 :param instantiation_params: Instantiation params provided by user
4726 :return: a dictionary with the calculated params
4728 calculated_params
= {}
4729 for parameter
in primitive_desc
.get("parameter", ()):
4730 param_name
= parameter
["name"]
4731 if param_name
in params
:
4732 calculated_params
[param_name
] = params
[param_name
]
4733 elif "default-value" in parameter
or "value" in parameter
:
4734 if "value" in parameter
:
4735 calculated_params
[param_name
] = parameter
["value"]
4737 calculated_params
[param_name
] = parameter
["default-value"]
4739 isinstance(calculated_params
[param_name
], str)
4740 and calculated_params
[param_name
].startswith("<")
4741 and calculated_params
[param_name
].endswith(">")
4743 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4744 calculated_params
[param_name
] = instantiation_params
[
4745 calculated_params
[param_name
][1:-1]
4749 "Parameter {} needed to execute primitive {} not provided".format(
4750 calculated_params
[param_name
], primitive_desc
["name"]
4755 "Parameter {} needed to execute primitive {} not provided".format(
4756 param_name
, primitive_desc
["name"]
4760 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4761 calculated_params
[param_name
] = yaml
.safe_dump(
4762 calculated_params
[param_name
], default_flow_style
=True, width
=256
4764 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4766 ].startswith("!!yaml "):
4767 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4768 if parameter
.get("data-type") == "INTEGER":
4770 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4771 except ValueError: # error converting string to int
4773 "Parameter {} of primitive {} must be integer".format(
4774 param_name
, primitive_desc
["name"]
4777 elif parameter
.get("data-type") == "BOOLEAN":
4778 calculated_params
[param_name
] = not (
4779 (str(calculated_params
[param_name
])).lower() == "false"
4782 # add always ns_config_info if primitive name is config
4783 if primitive_desc
["name"] == "config":
4784 if "ns_config_info" in instantiation_params
:
4785 calculated_params
["ns_config_info"] = instantiation_params
[
4788 return calculated_params
4790 def _look_for_deployed_vca(
4797 ee_descriptor_id
=None,
4799 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4800 for vca
in deployed_vca
:
4803 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4806 vdu_count_index
is not None
4807 and vdu_count_index
!= vca
["vdu_count_index"]
4810 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4812 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4816 # vca_deployed not found
4818 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4819 " is not deployed".format(
4828 ee_id
= vca
.get("ee_id")
4830 "type", "lxc_proxy_charm"
4831 ) # default value for backward compatibility - proxy charm
4834 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4835 "execution environment".format(
4836 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4839 return ee_id
, vca_type
4841 async def _ns_execute_primitive(
4847 retries_interval
=30,
4854 if primitive
== "config":
4855 primitive_params
= {"params": primitive_params
}
4857 vca_type
= vca_type
or "lxc_proxy_charm"
4861 output
= await asyncio
.wait_for(
4862 self
.vca_map
[vca_type
].exec_primitive(
4864 primitive_name
=primitive
,
4865 params_dict
=primitive_params
,
4866 progress_timeout
=self
.timeout_progress_primitive
,
4867 total_timeout
=self
.timeout_primitive
,
4872 timeout
=timeout
or self
.timeout_primitive
,
4876 except asyncio
.CancelledError
:
4878 except Exception as e
: # asyncio.TimeoutError
4879 if isinstance(e
, asyncio
.TimeoutError
):
4884 "Error executing action {} on {} -> {}".format(
4889 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
4891 return "FAILED", str(e
)
4893 return "COMPLETED", output
4895 except (LcmException
, asyncio
.CancelledError
):
4897 except Exception as e
:
4898 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
4900 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
4902 Updating the vca_status with latest juju information in nsrs record
4903 :param: nsr_id: Id of the nsr
4904 :param: nslcmop_id: Id of the nslcmop
4908 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
4909 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4910 vca_id
= self
.get_vca_id({}, db_nsr
)
4911 if db_nsr
["_admin"]["deployed"]["K8s"]:
4912 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
4913 cluster_uuid
, kdu_instance
, cluster_type
= (
4914 k8s
["k8scluster-uuid"],
4915 k8s
["kdu-instance"],
4916 k8s
["k8scluster-type"],
4918 await self
._on
_update
_k
8s
_db
(
4919 cluster_uuid
=cluster_uuid
,
4920 kdu_instance
=kdu_instance
,
4921 filter={"_id": nsr_id
},
4923 cluster_type
=cluster_type
,
4926 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
4927 table
, filter = "nsrs", {"_id": nsr_id
}
4928 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
4929 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
4931 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
4932 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
4934 async def action(self
, nsr_id
, nslcmop_id
):
4935 # Try to lock HA task here
4936 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4937 if not task_is_locked_by_me
:
4940 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
4941 self
.logger
.debug(logging_text
+ "Enter")
4942 # get all needed from database
4946 db_nslcmop_update
= {}
4947 nslcmop_operation_state
= None
4948 error_description_nslcmop
= None
4951 # wait for any previous tasks in process
4952 step
= "Waiting for previous operations to terminate"
4953 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4955 self
._write
_ns
_status
(
4958 current_operation
="RUNNING ACTION",
4959 current_operation_id
=nslcmop_id
,
4962 step
= "Getting information from database"
4963 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4964 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4965 if db_nslcmop
["operationParams"].get("primitive_params"):
4966 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
4967 db_nslcmop
["operationParams"]["primitive_params"]
4970 nsr_deployed
= db_nsr
["_admin"].get("deployed")
4971 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
4972 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
4973 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
4974 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
4975 primitive
= db_nslcmop
["operationParams"]["primitive"]
4976 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
4977 timeout_ns_action
= db_nslcmop
["operationParams"].get(
4978 "timeout_ns_action", self
.timeout_primitive
4982 step
= "Getting vnfr from database"
4983 db_vnfr
= self
.db
.get_one(
4984 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
4986 if db_vnfr
.get("kdur"):
4988 for kdur
in db_vnfr
["kdur"]:
4989 if kdur
.get("additionalParams"):
4990 kdur
["additionalParams"] = json
.loads(
4991 kdur
["additionalParams"]
4993 kdur_list
.append(kdur
)
4994 db_vnfr
["kdur"] = kdur_list
4995 step
= "Getting vnfd from database"
4996 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
4998 step
= "Getting nsd from database"
4999 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5001 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5002 # for backward compatibility
5003 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5004 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5005 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5006 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5008 # look for primitive
5009 config_primitive_desc
= descriptor_configuration
= None
5011 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5013 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5015 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5017 descriptor_configuration
= db_nsd
.get("ns-configuration")
5019 if descriptor_configuration
and descriptor_configuration
.get(
5022 for config_primitive
in descriptor_configuration
["config-primitive"]:
5023 if config_primitive
["name"] == primitive
:
5024 config_primitive_desc
= config_primitive
5027 if not config_primitive_desc
:
5028 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5030 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5034 primitive_name
= primitive
5035 ee_descriptor_id
= None
5037 primitive_name
= config_primitive_desc
.get(
5038 "execution-environment-primitive", primitive
5040 ee_descriptor_id
= config_primitive_desc
.get(
5041 "execution-environment-ref"
5047 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5049 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5052 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5054 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5056 desc_params
= parse_yaml_strings(
5057 db_vnfr
.get("additionalParamsForVnf")
5060 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5061 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5062 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5064 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5065 actions
.add(primitive
["name"])
5066 for primitive
in kdu_configuration
.get("config-primitive", []):
5067 actions
.add(primitive
["name"])
5068 kdu_action
= True if primitive_name
in actions
else False
5070 # TODO check if ns is in a proper status
5072 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5074 # kdur and desc_params already set from before
5075 if primitive_params
:
5076 desc_params
.update(primitive_params
)
5077 # TODO Check if we will need something at vnf level
5078 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5080 kdu_name
== kdu
["kdu-name"]
5081 and kdu
["member-vnf-index"] == vnf_index
5086 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5089 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5090 msg
= "unknown k8scluster-type '{}'".format(
5091 kdu
.get("k8scluster-type")
5093 raise LcmException(msg
)
5096 "collection": "nsrs",
5097 "filter": {"_id": nsr_id
},
5098 "path": "_admin.deployed.K8s.{}".format(index
),
5102 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5104 step
= "Executing kdu {}".format(primitive_name
)
5105 if primitive_name
== "upgrade":
5106 if desc_params
.get("kdu_model"):
5107 kdu_model
= desc_params
.get("kdu_model")
5108 del desc_params
["kdu_model"]
5110 kdu_model
= kdu
.get("kdu-model")
5111 parts
= kdu_model
.split(sep
=":")
5113 kdu_model
= parts
[0]
5115 detailed_status
= await asyncio
.wait_for(
5116 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5117 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5118 kdu_instance
=kdu
.get("kdu-instance"),
5120 kdu_model
=kdu_model
,
5123 timeout
=timeout_ns_action
,
5125 timeout
=timeout_ns_action
+ 10,
5128 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5130 elif primitive_name
== "rollback":
5131 detailed_status
= await asyncio
.wait_for(
5132 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5133 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5134 kdu_instance
=kdu
.get("kdu-instance"),
5137 timeout
=timeout_ns_action
,
5139 elif primitive_name
== "status":
5140 detailed_status
= await asyncio
.wait_for(
5141 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5142 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5143 kdu_instance
=kdu
.get("kdu-instance"),
5146 timeout
=timeout_ns_action
,
5149 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5150 kdu
["kdu-name"], nsr_id
5152 params
= self
._map
_primitive
_params
(
5153 config_primitive_desc
, primitive_params
, desc_params
5156 detailed_status
= await asyncio
.wait_for(
5157 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5158 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5159 kdu_instance
=kdu_instance
,
5160 primitive_name
=primitive_name
,
5163 timeout
=timeout_ns_action
,
5166 timeout
=timeout_ns_action
,
5170 nslcmop_operation_state
= "COMPLETED"
5172 detailed_status
= ""
5173 nslcmop_operation_state
= "FAILED"
5175 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5176 nsr_deployed
["VCA"],
5177 member_vnf_index
=vnf_index
,
5179 vdu_count_index
=vdu_count_index
,
5180 ee_descriptor_id
=ee_descriptor_id
,
5182 for vca_index
, vca_deployed
in enumerate(
5183 db_nsr
["_admin"]["deployed"]["VCA"]
5185 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5187 "collection": "nsrs",
5188 "filter": {"_id": nsr_id
},
5189 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5193 nslcmop_operation_state
,
5195 ) = await self
._ns
_execute
_primitive
(
5197 primitive
=primitive_name
,
5198 primitive_params
=self
._map
_primitive
_params
(
5199 config_primitive_desc
, primitive_params
, desc_params
5201 timeout
=timeout_ns_action
,
5207 db_nslcmop_update
["detailed-status"] = detailed_status
5208 error_description_nslcmop
= (
5209 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5213 + " task Done with result {} {}".format(
5214 nslcmop_operation_state
, detailed_status
5217 return # database update is called inside finally
5219 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5220 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5222 except asyncio
.CancelledError
:
5224 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5226 exc
= "Operation was cancelled"
5227 except asyncio
.TimeoutError
:
5228 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5230 except Exception as e
:
5231 exc
= traceback
.format_exc()
5232 self
.logger
.critical(
5233 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5242 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5243 nslcmop_operation_state
= "FAILED"
5245 self
._write
_ns
_status
(
5249 ], # TODO check if degraded. For the moment use previous status
5250 current_operation
="IDLE",
5251 current_operation_id
=None,
5252 # error_description=error_description_nsr,
5253 # error_detail=error_detail,
5254 other_update
=db_nsr_update
,
5257 self
._write
_op
_status
(
5260 error_message
=error_description_nslcmop
,
5261 operation_state
=nslcmop_operation_state
,
5262 other_update
=db_nslcmop_update
,
5265 if nslcmop_operation_state
:
5267 await self
.msg
.aiowrite(
5272 "nslcmop_id": nslcmop_id
,
5273 "operationState": nslcmop_operation_state
,
5277 except Exception as e
:
5279 logging_text
+ "kafka_write notification Exception {}".format(e
)
5281 self
.logger
.debug(logging_text
+ "Exit")
5282 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5283 return nslcmop_operation_state
, detailed_status
5285 async def scale(self
, nsr_id
, nslcmop_id
):
5286 # Try to lock HA task here
5287 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5288 if not task_is_locked_by_me
:
5291 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
5292 stage
= ["", "", ""]
5293 tasks_dict_info
= {}
5294 # ^ stage, step, VIM progress
5295 self
.logger
.debug(logging_text
+ "Enter")
5296 # get all needed from database
5298 db_nslcmop_update
= {}
5301 # in case of error, indicates what part of scale was failed to put nsr at error status
5302 scale_process
= None
5303 old_operational_status
= ""
5304 old_config_status
= ""
5307 # wait for any previous tasks in process
5308 step
= "Waiting for previous operations to terminate"
5309 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5310 self
._write
_ns
_status
(
5313 current_operation
="SCALING",
5314 current_operation_id
=nslcmop_id
,
5317 step
= "Getting nslcmop from database"
5319 step
+ " after having waited for previous tasks to be completed"
5321 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5323 step
= "Getting nsr from database"
5324 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5325 old_operational_status
= db_nsr
["operational-status"]
5326 old_config_status
= db_nsr
["config-status"]
5328 step
= "Parsing scaling parameters"
5329 db_nsr_update
["operational-status"] = "scaling"
5330 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5331 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5333 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
5335 ]["member-vnf-index"]
5336 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
5338 ]["scaling-group-descriptor"]
5339 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
5340 # for backward compatibility
5341 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5342 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5343 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5344 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5346 step
= "Getting vnfr from database"
5347 db_vnfr
= self
.db
.get_one(
5348 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5351 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5353 step
= "Getting vnfd from database"
5354 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5356 base_folder
= db_vnfd
["_admin"]["storage"]
5358 step
= "Getting scaling-group-descriptor"
5359 scaling_descriptor
= find_in_list(
5360 get_scaling_aspect(db_vnfd
),
5361 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
5363 if not scaling_descriptor
:
5365 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5366 "at vnfd:scaling-group-descriptor".format(scaling_group
)
5369 step
= "Sending scale order to VIM"
5370 # TODO check if ns is in a proper status
5372 if not db_nsr
["_admin"].get("scaling-group"):
5377 "_admin.scaling-group": [
5378 {"name": scaling_group
, "nb-scale-op": 0}
5382 admin_scale_index
= 0
5384 for admin_scale_index
, admin_scale_info
in enumerate(
5385 db_nsr
["_admin"]["scaling-group"]
5387 if admin_scale_info
["name"] == scaling_group
:
5388 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
5390 else: # not found, set index one plus last element and add new entry with the name
5391 admin_scale_index
+= 1
5393 "_admin.scaling-group.{}.name".format(admin_scale_index
)
5396 vca_scaling_info
= []
5397 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
5398 if scaling_type
== "SCALE_OUT":
5399 if "aspect-delta-details" not in scaling_descriptor
:
5401 "Aspect delta details not fount in scaling descriptor {}".format(
5402 scaling_descriptor
["name"]
5405 # count if max-instance-count is reached
5406 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5408 scaling_info
["scaling_direction"] = "OUT"
5409 scaling_info
["vdu-create"] = {}
5410 scaling_info
["kdu-create"] = {}
5411 for delta
in deltas
:
5412 for vdu_delta
in delta
.get("vdu-delta", {}):
5413 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
5414 # vdu_index also provides the number of instance of the targeted vdu
5415 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5416 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5420 additional_params
= (
5421 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
5424 cloud_init_list
= []
5426 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5427 max_instance_count
= 10
5428 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
5429 max_instance_count
= vdu_profile
.get(
5430 "max-number-of-instances", 10
5433 default_instance_num
= get_number_of_instances(
5436 instances_number
= vdu_delta
.get("number-of-instances", 1)
5437 nb_scale_op
+= instances_number
5439 new_instance_count
= nb_scale_op
+ default_instance_num
5440 # Control if new count is over max and vdu count is less than max.
5441 # Then assign new instance count
5442 if new_instance_count
> max_instance_count
> vdu_count
:
5443 instances_number
= new_instance_count
- max_instance_count
5445 instances_number
= instances_number
5447 if new_instance_count
> max_instance_count
:
5449 "reached the limit of {} (max-instance-count) "
5450 "scaling-out operations for the "
5451 "scaling-group-descriptor '{}'".format(
5452 nb_scale_op
, scaling_group
5455 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5457 # TODO Information of its own ip is not available because db_vnfr is not updated.
5458 additional_params
["OSM"] = get_osm_params(
5459 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
5461 cloud_init_list
.append(
5462 self
._parse
_cloud
_init
(
5469 vca_scaling_info
.append(
5471 "osm_vdu_id": vdu_delta
["id"],
5472 "member-vnf-index": vnf_index
,
5474 "vdu_index": vdu_index
+ x
,
5477 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
5478 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5479 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
5480 kdu_name
= kdu_profile
["kdu-name"]
5481 resource_name
= kdu_profile
["resource-name"]
5483 # Might have different kdus in the same delta
5484 # Should have list for each kdu
5485 if not scaling_info
["kdu-create"].get(kdu_name
, None):
5486 scaling_info
["kdu-create"][kdu_name
] = []
5488 kdur
= get_kdur(db_vnfr
, kdu_name
)
5489 if kdur
.get("helm-chart"):
5490 k8s_cluster_type
= "helm-chart-v3"
5491 self
.logger
.debug("kdur: {}".format(kdur
))
5493 kdur
.get("helm-version")
5494 and kdur
.get("helm-version") == "v2"
5496 k8s_cluster_type
= "helm-chart"
5497 raise NotImplementedError
5498 elif kdur
.get("juju-bundle"):
5499 k8s_cluster_type
= "juju-bundle"
5502 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5503 "juju-bundle. Maybe an old NBI version is running".format(
5504 db_vnfr
["member-vnf-index-ref"], kdu_name
5508 max_instance_count
= 10
5509 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
5510 max_instance_count
= kdu_profile
.get(
5511 "max-number-of-instances", 10
5514 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
5515 deployed_kdu
, _
= get_deployed_kdu(
5516 nsr_deployed
, kdu_name
, vnf_index
5518 if deployed_kdu
is None:
5520 "KDU '{}' for vnf '{}' not deployed".format(
5524 kdu_instance
= deployed_kdu
.get("kdu-instance")
5525 instance_num
= await self
.k8scluster_map
[
5527 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5528 kdu_replica_count
= instance_num
+ kdu_delta
.get(
5529 "number-of-instances", 1
5532 # Control if new count is over max and instance_num is less than max.
5533 # Then assign max instance number to kdu replica count
5534 if kdu_replica_count
> max_instance_count
> instance_num
:
5535 kdu_replica_count
= max_instance_count
5536 if kdu_replica_count
> max_instance_count
:
5538 "reached the limit of {} (max-instance-count) "
5539 "scaling-out operations for the "
5540 "scaling-group-descriptor '{}'".format(
5541 instance_num
, scaling_group
5545 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5546 vca_scaling_info
.append(
5548 "osm_kdu_id": kdu_name
,
5549 "member-vnf-index": vnf_index
,
5551 "kdu_index": instance_num
+ x
- 1,
5554 scaling_info
["kdu-create"][kdu_name
].append(
5556 "member-vnf-index": vnf_index
,
5558 "k8s-cluster-type": k8s_cluster_type
,
5559 "resource-name": resource_name
,
5560 "scale": kdu_replica_count
,
5563 elif scaling_type
== "SCALE_IN":
5564 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5566 scaling_info
["scaling_direction"] = "IN"
5567 scaling_info
["vdu-delete"] = {}
5568 scaling_info
["kdu-delete"] = {}
5570 for delta
in deltas
:
5571 for vdu_delta
in delta
.get("vdu-delta", {}):
5572 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5573 min_instance_count
= 0
5574 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5575 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
5576 min_instance_count
= vdu_profile
["min-number-of-instances"]
5578 default_instance_num
= get_number_of_instances(
5579 db_vnfd
, vdu_delta
["id"]
5581 instance_num
= vdu_delta
.get("number-of-instances", 1)
5582 nb_scale_op
-= instance_num
5584 new_instance_count
= nb_scale_op
+ default_instance_num
5586 if new_instance_count
< min_instance_count
< vdu_count
:
5587 instances_number
= min_instance_count
- new_instance_count
5589 instances_number
= instance_num
5591 if new_instance_count
< min_instance_count
:
5593 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5594 "scaling-group-descriptor '{}'".format(
5595 nb_scale_op
, scaling_group
5598 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5599 vca_scaling_info
.append(
5601 "osm_vdu_id": vdu_delta
["id"],
5602 "member-vnf-index": vnf_index
,
5604 "vdu_index": vdu_index
- 1 - x
,
5607 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
5608 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5609 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
5610 kdu_name
= kdu_profile
["kdu-name"]
5611 resource_name
= kdu_profile
["resource-name"]
5613 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
5614 scaling_info
["kdu-delete"][kdu_name
] = []
5616 kdur
= get_kdur(db_vnfr
, kdu_name
)
5617 if kdur
.get("helm-chart"):
5618 k8s_cluster_type
= "helm-chart-v3"
5619 self
.logger
.debug("kdur: {}".format(kdur
))
5621 kdur
.get("helm-version")
5622 and kdur
.get("helm-version") == "v2"
5624 k8s_cluster_type
= "helm-chart"
5625 raise NotImplementedError
5626 elif kdur
.get("juju-bundle"):
5627 k8s_cluster_type
= "juju-bundle"
5630 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5631 "juju-bundle. Maybe an old NBI version is running".format(
5632 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
5636 min_instance_count
= 0
5637 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
5638 min_instance_count
= kdu_profile
["min-number-of-instances"]
5640 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
5641 deployed_kdu
, _
= get_deployed_kdu(
5642 nsr_deployed
, kdu_name
, vnf_index
5644 if deployed_kdu
is None:
5646 "KDU '{}' for vnf '{}' not deployed".format(
5650 kdu_instance
= deployed_kdu
.get("kdu-instance")
5651 instance_num
= await self
.k8scluster_map
[
5653 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5654 kdu_replica_count
= instance_num
- kdu_delta
.get(
5655 "number-of-instances", 1
5658 if kdu_replica_count
< min_instance_count
< instance_num
:
5659 kdu_replica_count
= min_instance_count
5660 if kdu_replica_count
< min_instance_count
:
5662 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5663 "scaling-group-descriptor '{}'".format(
5664 instance_num
, scaling_group
5668 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5669 vca_scaling_info
.append(
5671 "osm_kdu_id": kdu_name
,
5672 "member-vnf-index": vnf_index
,
5674 "kdu_index": instance_num
- x
- 1,
5677 scaling_info
["kdu-delete"][kdu_name
].append(
5679 "member-vnf-index": vnf_index
,
5681 "k8s-cluster-type": k8s_cluster_type
,
5682 "resource-name": resource_name
,
5683 "scale": kdu_replica_count
,
5687 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5688 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
5689 if scaling_info
["scaling_direction"] == "IN":
5690 for vdur
in reversed(db_vnfr
["vdur"]):
5691 if vdu_delete
.get(vdur
["vdu-id-ref"]):
5692 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
5693 scaling_info
["vdu"].append(
5695 "name": vdur
.get("name") or vdur
.get("vdu-name"),
5696 "vdu_id": vdur
["vdu-id-ref"],
5700 for interface
in vdur
["interfaces"]:
5701 scaling_info
["vdu"][-1]["interface"].append(
5703 "name": interface
["name"],
5704 "ip_address": interface
["ip-address"],
5705 "mac_address": interface
.get("mac-address"),
5708 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5711 step
= "Executing pre-scale vnf-config-primitive"
5712 if scaling_descriptor
.get("scaling-config-action"):
5713 for scaling_config_action
in scaling_descriptor
[
5714 "scaling-config-action"
5717 scaling_config_action
.get("trigger") == "pre-scale-in"
5718 and scaling_type
== "SCALE_IN"
5720 scaling_config_action
.get("trigger") == "pre-scale-out"
5721 and scaling_type
== "SCALE_OUT"
5723 vnf_config_primitive
= scaling_config_action
[
5724 "vnf-config-primitive-name-ref"
5726 step
= db_nslcmop_update
[
5728 ] = "executing pre-scale scaling-config-action '{}'".format(
5729 vnf_config_primitive
5732 # look for primitive
5733 for config_primitive
in (
5734 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
5735 ).get("config-primitive", ()):
5736 if config_primitive
["name"] == vnf_config_primitive
:
5740 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5741 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5742 "primitive".format(scaling_group
, vnf_config_primitive
)
5745 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
5746 if db_vnfr
.get("additionalParamsForVnf"):
5747 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
5749 scale_process
= "VCA"
5750 db_nsr_update
["config-status"] = "configuring pre-scaling"
5751 primitive_params
= self
._map
_primitive
_params
(
5752 config_primitive
, {}, vnfr_params
5755 # Pre-scale retry check: Check if this sub-operation has been executed before
5756 op_index
= self
._check
_or
_add
_scale
_suboperation
(
5759 vnf_config_primitive
,
5763 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
5764 # Skip sub-operation
5765 result
= "COMPLETED"
5766 result_detail
= "Done"
5769 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5770 vnf_config_primitive
, result
, result_detail
5774 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
5775 # New sub-operation: Get index of this sub-operation
5777 len(db_nslcmop
.get("_admin", {}).get("operations"))
5782 + "vnf_config_primitive={} New sub-operation".format(
5783 vnf_config_primitive
5787 # retry: Get registered params for this existing sub-operation
5788 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
5791 vnf_index
= op
.get("member_vnf_index")
5792 vnf_config_primitive
= op
.get("primitive")
5793 primitive_params
= op
.get("primitive_params")
5796 + "vnf_config_primitive={} Sub-operation retry".format(
5797 vnf_config_primitive
5800 # Execute the primitive, either with new (first-time) or registered (reintent) args
5801 ee_descriptor_id
= config_primitive
.get(
5802 "execution-environment-ref"
5804 primitive_name
= config_primitive
.get(
5805 "execution-environment-primitive", vnf_config_primitive
5807 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5808 nsr_deployed
["VCA"],
5809 member_vnf_index
=vnf_index
,
5811 vdu_count_index
=None,
5812 ee_descriptor_id
=ee_descriptor_id
,
5814 result
, result_detail
= await self
._ns
_execute
_primitive
(
5823 + "vnf_config_primitive={} Done with result {} {}".format(
5824 vnf_config_primitive
, result
, result_detail
5827 # Update operationState = COMPLETED | FAILED
5828 self
._update
_suboperation
_status
(
5829 db_nslcmop
, op_index
, result
, result_detail
5832 if result
== "FAILED":
5833 raise LcmException(result_detail
)
5834 db_nsr_update
["config-status"] = old_config_status
5835 scale_process
= None
5839 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
5842 "_admin.scaling-group.{}.time".format(admin_scale_index
)
5845 # SCALE-IN VCA - BEGIN
5846 if vca_scaling_info
:
5847 step
= db_nslcmop_update
[
5849 ] = "Deleting the execution environments"
5850 scale_process
= "VCA"
5851 for vca_info
in vca_scaling_info
:
5852 if vca_info
["type"] == "delete":
5853 member_vnf_index
= str(vca_info
["member-vnf-index"])
5855 logging_text
+ "vdu info: {}".format(vca_info
)
5857 if vca_info
.get("osm_vdu_id"):
5858 vdu_id
= vca_info
["osm_vdu_id"]
5859 vdu_index
= int(vca_info
["vdu_index"])
5862 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5863 member_vnf_index
, vdu_id
, vdu_index
5867 kdu_id
= vca_info
["osm_kdu_id"]
5870 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5871 member_vnf_index
, kdu_id
, vdu_index
5873 stage
[2] = step
= "Scaling in VCA"
5874 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
5875 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
5876 config_update
= db_nsr
["configurationStatus"]
5877 for vca_index
, vca
in enumerate(vca_update
):
5879 (vca
or vca
.get("ee_id"))
5880 and vca
["member-vnf-index"] == member_vnf_index
5881 and vca
["vdu_count_index"] == vdu_index
5883 if vca
.get("vdu_id"):
5884 config_descriptor
= get_configuration(
5885 db_vnfd
, vca
.get("vdu_id")
5887 elif vca
.get("kdu_name"):
5888 config_descriptor
= get_configuration(
5889 db_vnfd
, vca
.get("kdu_name")
5892 config_descriptor
= get_configuration(
5893 db_vnfd
, db_vnfd
["id"]
5895 operation_params
= (
5896 db_nslcmop
.get("operationParams") or {}
5898 exec_terminate_primitives
= not operation_params
.get(
5899 "skip_terminate_primitives"
5900 ) and vca
.get("needed_terminate")
5901 task
= asyncio
.ensure_future(
5910 exec_primitives
=exec_terminate_primitives
,
5914 timeout
=self
.timeout_charm_delete
,
5917 tasks_dict_info
[task
] = "Terminating VCA {}".format(
5920 del vca_update
[vca_index
]
5921 del config_update
[vca_index
]
5922 # wait for pending tasks of terminate primitives
5926 + "Waiting for tasks {}".format(
5927 list(tasks_dict_info
.keys())
5930 error_list
= await self
._wait
_for
_tasks
(
5934 self
.timeout_charm_delete
, self
.timeout_ns_terminate
5939 tasks_dict_info
.clear()
5941 raise LcmException("; ".join(error_list
))
5943 db_vca_and_config_update
= {
5944 "_admin.deployed.VCA": vca_update
,
5945 "configurationStatus": config_update
,
5948 "nsrs", db_nsr
["_id"], db_vca_and_config_update
5950 scale_process
= None
5951 # SCALE-IN VCA - END
5954 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
5955 scale_process
= "RO"
5956 if self
.ro_config
.get("ng"):
5957 await self
._scale
_ng
_ro
(
5958 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
5960 scaling_info
.pop("vdu-create", None)
5961 scaling_info
.pop("vdu-delete", None)
5963 scale_process
= None
5967 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
5968 scale_process
= "KDU"
5969 await self
._scale
_kdu
(
5970 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
5972 scaling_info
.pop("kdu-create", None)
5973 scaling_info
.pop("kdu-delete", None)
5975 scale_process
= None
5979 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5981 # SCALE-UP VCA - BEGIN
5982 if vca_scaling_info
:
5983 step
= db_nslcmop_update
[
5985 ] = "Creating new execution environments"
5986 scale_process
= "VCA"
5987 for vca_info
in vca_scaling_info
:
5988 if vca_info
["type"] == "create":
5989 member_vnf_index
= str(vca_info
["member-vnf-index"])
5991 logging_text
+ "vdu info: {}".format(vca_info
)
5993 vnfd_id
= db_vnfr
["vnfd-ref"]
5994 if vca_info
.get("osm_vdu_id"):
5995 vdu_index
= int(vca_info
["vdu_index"])
5996 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
5997 if db_vnfr
.get("additionalParamsForVnf"):
5998 deploy_params
.update(
6000 db_vnfr
["additionalParamsForVnf"].copy()
6003 descriptor_config
= get_configuration(
6004 db_vnfd
, db_vnfd
["id"]
6006 if descriptor_config
:
6011 logging_text
=logging_text
6012 + "member_vnf_index={} ".format(member_vnf_index
),
6015 nslcmop_id
=nslcmop_id
,
6021 member_vnf_index
=member_vnf_index
,
6022 vdu_index
=vdu_index
,
6024 deploy_params
=deploy_params
,
6025 descriptor_config
=descriptor_config
,
6026 base_folder
=base_folder
,
6027 task_instantiation_info
=tasks_dict_info
,
6030 vdu_id
= vca_info
["osm_vdu_id"]
6031 vdur
= find_in_list(
6032 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6034 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6035 if vdur
.get("additionalParams"):
6036 deploy_params_vdu
= parse_yaml_strings(
6037 vdur
["additionalParams"]
6040 deploy_params_vdu
= deploy_params
6041 deploy_params_vdu
["OSM"] = get_osm_params(
6042 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6044 if descriptor_config
:
6049 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6050 member_vnf_index
, vdu_id
, vdu_index
6052 stage
[2] = step
= "Scaling out VCA"
6053 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6055 logging_text
=logging_text
6056 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6057 member_vnf_index
, vdu_id
, vdu_index
6061 nslcmop_id
=nslcmop_id
,
6067 member_vnf_index
=member_vnf_index
,
6068 vdu_index
=vdu_index
,
6070 deploy_params
=deploy_params_vdu
,
6071 descriptor_config
=descriptor_config
,
6072 base_folder
=base_folder
,
6073 task_instantiation_info
=tasks_dict_info
,
6077 kdu_name
= vca_info
["osm_kdu_id"]
6078 descriptor_config
= get_configuration(db_vnfd
, kdu_name
)
6079 if descriptor_config
:
6081 kdu_index
= int(vca_info
["kdu_index"])
6085 for x
in db_vnfr
["kdur"]
6086 if x
["kdu-name"] == kdu_name
6088 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
6089 if kdur
.get("additionalParams"):
6090 deploy_params_kdu
= parse_yaml_strings(
6091 kdur
["additionalParams"]
6095 logging_text
=logging_text
,
6098 nslcmop_id
=nslcmop_id
,
6104 member_vnf_index
=member_vnf_index
,
6105 vdu_index
=kdu_index
,
6107 deploy_params
=deploy_params_kdu
,
6108 descriptor_config
=descriptor_config
,
6109 base_folder
=base_folder
,
6110 task_instantiation_info
=tasks_dict_info
,
6113 # SCALE-UP VCA - END
6114 scale_process
= None
6117 # execute primitive service POST-SCALING
6118 step
= "Executing post-scale vnf-config-primitive"
6119 if scaling_descriptor
.get("scaling-config-action"):
6120 for scaling_config_action
in scaling_descriptor
[
6121 "scaling-config-action"
6124 scaling_config_action
.get("trigger") == "post-scale-in"
6125 and scaling_type
== "SCALE_IN"
6127 scaling_config_action
.get("trigger") == "post-scale-out"
6128 and scaling_type
== "SCALE_OUT"
6130 vnf_config_primitive
= scaling_config_action
[
6131 "vnf-config-primitive-name-ref"
6133 step
= db_nslcmop_update
[
6135 ] = "executing post-scale scaling-config-action '{}'".format(
6136 vnf_config_primitive
6139 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6140 if db_vnfr
.get("additionalParamsForVnf"):
6141 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6143 # look for primitive
6144 for config_primitive
in (
6145 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6146 ).get("config-primitive", ()):
6147 if config_primitive
["name"] == vnf_config_primitive
:
6151 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6152 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6153 "config-primitive".format(
6154 scaling_group
, vnf_config_primitive
6157 scale_process
= "VCA"
6158 db_nsr_update
["config-status"] = "configuring post-scaling"
6159 primitive_params
= self
._map
_primitive
_params
(
6160 config_primitive
, {}, vnfr_params
6163 # Post-scale retry check: Check if this sub-operation has been executed before
6164 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6167 vnf_config_primitive
,
6171 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6172 # Skip sub-operation
6173 result
= "COMPLETED"
6174 result_detail
= "Done"
6177 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6178 vnf_config_primitive
, result
, result_detail
6182 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6183 # New sub-operation: Get index of this sub-operation
6185 len(db_nslcmop
.get("_admin", {}).get("operations"))
6190 + "vnf_config_primitive={} New sub-operation".format(
6191 vnf_config_primitive
6195 # retry: Get registered params for this existing sub-operation
6196 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6199 vnf_index
= op
.get("member_vnf_index")
6200 vnf_config_primitive
= op
.get("primitive")
6201 primitive_params
= op
.get("primitive_params")
6204 + "vnf_config_primitive={} Sub-operation retry".format(
6205 vnf_config_primitive
6208 # Execute the primitive, either with new (first-time) or registered (reintent) args
6209 ee_descriptor_id
= config_primitive
.get(
6210 "execution-environment-ref"
6212 primitive_name
= config_primitive
.get(
6213 "execution-environment-primitive", vnf_config_primitive
6215 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6216 nsr_deployed
["VCA"],
6217 member_vnf_index
=vnf_index
,
6219 vdu_count_index
=None,
6220 ee_descriptor_id
=ee_descriptor_id
,
6222 result
, result_detail
= await self
._ns
_execute
_primitive
(
6231 + "vnf_config_primitive={} Done with result {} {}".format(
6232 vnf_config_primitive
, result
, result_detail
6235 # Update operationState = COMPLETED | FAILED
6236 self
._update
_suboperation
_status
(
6237 db_nslcmop
, op_index
, result
, result_detail
6240 if result
== "FAILED":
6241 raise LcmException(result_detail
)
6242 db_nsr_update
["config-status"] = old_config_status
6243 scale_process
= None
6248 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6249 db_nsr_update
["operational-status"] = (
6251 if old_operational_status
== "failed"
6252 else old_operational_status
6254 db_nsr_update
["config-status"] = old_config_status
6257 ROclient
.ROClientException
,
6262 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6264 except asyncio
.CancelledError
:
6266 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6268 exc
= "Operation was cancelled"
6269 except Exception as e
:
6270 exc
= traceback
.format_exc()
6271 self
.logger
.critical(
6272 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6276 self
._write
_ns
_status
(
6279 current_operation
="IDLE",
6280 current_operation_id
=None,
6283 stage
[1] = "Waiting for instantiate pending tasks."
6284 self
.logger
.debug(logging_text
+ stage
[1])
6285 exc
= await self
._wait
_for
_tasks
(
6288 self
.timeout_ns_deploy
,
6296 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6297 nslcmop_operation_state
= "FAILED"
6299 db_nsr_update
["operational-status"] = old_operational_status
6300 db_nsr_update
["config-status"] = old_config_status
6301 db_nsr_update
["detailed-status"] = ""
6303 if "VCA" in scale_process
:
6304 db_nsr_update
["config-status"] = "failed"
6305 if "RO" in scale_process
:
6306 db_nsr_update
["operational-status"] = "failed"
6309 ] = "FAILED scaling nslcmop={} {}: {}".format(
6310 nslcmop_id
, step
, exc
6313 error_description_nslcmop
= None
6314 nslcmop_operation_state
= "COMPLETED"
6315 db_nslcmop_update
["detailed-status"] = "Done"
6317 self
._write
_op
_status
(
6320 error_message
=error_description_nslcmop
,
6321 operation_state
=nslcmop_operation_state
,
6322 other_update
=db_nslcmop_update
,
6325 self
._write
_ns
_status
(
6328 current_operation
="IDLE",
6329 current_operation_id
=None,
6330 other_update
=db_nsr_update
,
6333 if nslcmop_operation_state
:
6337 "nslcmop_id": nslcmop_id
,
6338 "operationState": nslcmop_operation_state
,
6340 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
6341 except Exception as e
:
6343 logging_text
+ "kafka_write notification Exception {}".format(e
)
6345 self
.logger
.debug(logging_text
+ "Exit")
6346 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
6348 async def _scale_kdu(
6349 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6351 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
6352 for kdu_name
in _scaling_info
:
6353 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
6354 deployed_kdu
, index
= get_deployed_kdu(
6355 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
6357 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
6358 kdu_instance
= deployed_kdu
["kdu-instance"]
6359 scale
= int(kdu_scaling_info
["scale"])
6360 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
6363 "collection": "nsrs",
6364 "filter": {"_id": nsr_id
},
6365 "path": "_admin.deployed.K8s.{}".format(index
),
6368 step
= "scaling application {}".format(
6369 kdu_scaling_info
["resource-name"]
6371 self
.logger
.debug(logging_text
+ step
)
6373 if kdu_scaling_info
["type"] == "delete":
6374 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
6377 and kdu_config
.get("terminate-config-primitive")
6378 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
6380 terminate_config_primitive_list
= kdu_config
.get(
6381 "terminate-config-primitive"
6383 terminate_config_primitive_list
.sort(
6384 key
=lambda val
: int(val
["seq"])
6388 terminate_config_primitive
6389 ) in terminate_config_primitive_list
:
6390 primitive_params_
= self
._map
_primitive
_params
(
6391 terminate_config_primitive
, {}, {}
6393 step
= "execute terminate config primitive"
6394 self
.logger
.debug(logging_text
+ step
)
6395 await asyncio
.wait_for(
6396 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
6397 cluster_uuid
=cluster_uuid
,
6398 kdu_instance
=kdu_instance
,
6399 primitive_name
=terminate_config_primitive
["name"],
6400 params
=primitive_params_
,
6407 await asyncio
.wait_for(
6408 self
.k8scluster_map
[k8s_cluster_type
].scale(
6411 kdu_scaling_info
["resource-name"],
6414 timeout
=self
.timeout_vca_on_error
,
6417 if kdu_scaling_info
["type"] == "create":
6418 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
6421 and kdu_config
.get("initial-config-primitive")
6422 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
6424 initial_config_primitive_list
= kdu_config
.get(
6425 "initial-config-primitive"
6427 initial_config_primitive_list
.sort(
6428 key
=lambda val
: int(val
["seq"])
6431 for initial_config_primitive
in initial_config_primitive_list
:
6432 primitive_params_
= self
._map
_primitive
_params
(
6433 initial_config_primitive
, {}, {}
6435 step
= "execute initial config primitive"
6436 self
.logger
.debug(logging_text
+ step
)
6437 await asyncio
.wait_for(
6438 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
6439 cluster_uuid
=cluster_uuid
,
6440 kdu_instance
=kdu_instance
,
6441 primitive_name
=initial_config_primitive
["name"],
6442 params
=primitive_params_
,
6449 async def _scale_ng_ro(
6450 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
6452 nsr_id
= db_nslcmop
["nsInstanceId"]
6453 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
6456 # read from db: vnfd's for every vnf
6459 # for each vnf in ns, read vnfd
6460 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
6461 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
6462 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
6463 # if we haven't this vnfd, read it from db
6464 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
6466 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
6467 db_vnfds
.append(vnfd
)
6468 n2vc_key
= self
.n2vc
.get_public_key()
6469 n2vc_key_list
= [n2vc_key
]
6472 vdu_scaling_info
.get("vdu-create"),
6473 vdu_scaling_info
.get("vdu-delete"),
6476 # db_vnfr has been updated, update db_vnfrs to use it
6477 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
6478 await self
._instantiate
_ng
_ro
(
6488 start_deploy
=time(),
6489 timeout_ns_deploy
=self
.timeout_ns_deploy
,
6491 if vdu_scaling_info
.get("vdu-delete"):
6493 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
6496 async def extract_prometheus_scrape_jobs(
6497 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
6499 # look if exist a file called 'prometheus*.j2' and
6500 artifact_content
= self
.fs
.dir_ls(artifact_path
)
6504 for f
in artifact_content
6505 if f
.startswith("prometheus") and f
.endswith(".j2")
6511 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
6515 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
6516 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
6518 vnfr_id
= vnfr_id
.replace("-", "")
6520 "JOB_NAME": vnfr_id
,
6521 "TARGET_IP": target_ip
,
6522 "EXPORTER_POD_IP": host_name
,
6523 "EXPORTER_POD_PORT": host_port
,
6525 job_list
= parse_job(job_data
, variables
)
6526 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
6527 for job
in job_list
:
6529 not isinstance(job
.get("job_name"), str)
6530 or vnfr_id
not in job
["job_name"]
6532 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
6533 job
["nsr_id"] = nsr_id
6534 job
["vnfr_id"] = vnfr_id
6537 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
6539 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6541 :param: vim_account_id: VIM Account ID
6543 :return: (cloud_name, cloud_credential)
6545 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
6546 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
6548 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
6550 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6552 :param: vim_account_id: VIM Account ID
6554 :return: (cloud_name, cloud_credential)
6556 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
6557 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")