1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
20 from typing
import Any
, Dict
, List
23 import logging
.handlers
34 from osm_lcm
import ROclient
35 from osm_lcm
.data_utils
.nsr
import (
38 get_deployed_vca_list
,
41 from osm_lcm
.data_utils
.vca
import (
50 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
51 from osm_lcm
.lcm_utils
import (
59 from osm_lcm
.data_utils
.nsd
import (
60 get_ns_configuration_relation_list
,
64 from osm_lcm
.data_utils
.vnfd
import (
68 get_ee_sorted_initial_config_primitive_list
,
69 get_ee_sorted_terminate_config_primitive_list
,
71 get_virtual_link_profiles
,
76 get_number_of_instances
,
78 get_kdu_resource_profile
,
80 from osm_lcm
.data_utils
.list_utils
import find_in_list
81 from osm_lcm
.data_utils
.vnfr
import get_osm_params
, get_vdur_index
, get_kdur
82 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
83 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
84 from n2vc
.definitions
import RelationEndpoint
85 from n2vc
.k8s_helm_conn
import K8sHelmConnector
86 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
87 from n2vc
.k8s_juju_conn
import K8sJujuConnector
89 from osm_common
.dbbase
import DbException
90 from osm_common
.fsbase
import FsException
92 from osm_lcm
.data_utils
.database
.database
import Database
93 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
95 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
96 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
98 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
99 from osm_lcm
.prometheus
import parse_job
101 from copy
import copy
, deepcopy
102 from time
import time
103 from uuid
import uuid4
105 from random
import randint
107 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
110 class NsLcm(LcmBase
):
111 timeout_vca_on_error
= (
113 ) # Time for charm from first time at blocked,error status to mark as failed
114 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
115 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
116 timeout_charm_delete
= 10 * 60
117 timeout_primitive
= 30 * 60 # timeout for primitive execution
118 timeout_progress_primitive
= (
120 ) # timeout for some progress in a primitive execution
122 SUBOPERATION_STATUS_NOT_FOUND
= -1
123 SUBOPERATION_STATUS_NEW
= -2
124 SUBOPERATION_STATUS_SKIP
= -3
125 task_name_deploy_vca
= "Deploying VCA"
127 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
129 Init, Connect to database, filesystem storage, and messaging
130 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
133 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
135 self
.db
= Database().instance
.db
136 self
.fs
= Filesystem().instance
.fs
138 self
.lcm_tasks
= lcm_tasks
139 self
.timeout
= config
["timeout"]
140 self
.ro_config
= config
["ro_config"]
141 self
.ng_ro
= config
["ro_config"].get("ng")
142 self
.vca_config
= config
["VCA"].copy()
144 # create N2VC connector
145 self
.n2vc
= N2VCJujuConnector(
148 on_update_db
=self
._on
_update
_n
2vc
_db
,
153 self
.conn_helm_ee
= LCMHelmConn(
156 vca_config
=self
.vca_config
,
157 on_update_db
=self
._on
_update
_n
2vc
_db
,
160 self
.k8sclusterhelm2
= K8sHelmConnector(
161 kubectl_command
=self
.vca_config
.get("kubectlpath"),
162 helm_command
=self
.vca_config
.get("helmpath"),
169 self
.k8sclusterhelm3
= K8sHelm3Connector(
170 kubectl_command
=self
.vca_config
.get("kubectlpath"),
171 helm_command
=self
.vca_config
.get("helm3path"),
178 self
.k8sclusterjuju
= K8sJujuConnector(
179 kubectl_command
=self
.vca_config
.get("kubectlpath"),
180 juju_command
=self
.vca_config
.get("jujupath"),
183 on_update_db
=self
._on
_update
_k
8s
_db
,
188 self
.k8scluster_map
= {
189 "helm-chart": self
.k8sclusterhelm2
,
190 "helm-chart-v3": self
.k8sclusterhelm3
,
191 "chart": self
.k8sclusterhelm3
,
192 "juju-bundle": self
.k8sclusterjuju
,
193 "juju": self
.k8sclusterjuju
,
197 "lxc_proxy_charm": self
.n2vc
,
198 "native_charm": self
.n2vc
,
199 "k8s_proxy_charm": self
.n2vc
,
200 "helm": self
.conn_helm_ee
,
201 "helm-v3": self
.conn_helm_ee
,
205 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
def increment_ip_mac(ip_mac, vm_index=1):
    """
    Returns the IPv4, IPv6 or MAC address obtained by adding an offset to the last group of ip_mac
    :param ip_mac: address in text format. Any non-string value (e.g. None) is returned unchanged
    :param vm_index: integer offset added to the last group of the address
    :return: the incremented address as a string; None if ip_mac has no "." or ":" separator or its last
        group cannot be parsed as a number
    """
    if not isinstance(ip_mac, str):
        return ip_mac
    try:
        # try with ipv4 look for last dot
        i = ip_mac.rfind(".")
        if i > 0:
            i += 1  # skip the separator, so that ip_mac[i:] contains only the last group
            return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
        # try with ipv6 or mac look for last colon. Operate in hex
        i = ip_mac.rfind(":")
        if i > 0:
            i += 1
            # format in hex, len can be 2 for mac or 4 for ipv6
            return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
                ip_mac[:i], int(ip_mac[i:], 16) + vm_index
            )
    except ValueError:
        # the last group is not a number in the expected base
        pass
    return None
229 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
231 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
234 # TODO filter RO descriptor fields...
238 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
239 db_dict
["deploymentStatus"] = ro_descriptor
240 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
242 except Exception as e
:
244 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
247 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
249 # remove last dot from path (if exists)
250 if path
.endswith("."):
253 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
254 # .format(table, filter, path, updated_data))
257 nsr_id
= filter.get("_id")
259 # read ns record from database
260 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
261 current_ns_status
= nsr
.get("nsState")
263 # get vca status for NS
264 status_dict
= await self
.n2vc
.get_status(
265 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
270 db_dict
["vcaStatus"] = status_dict
271 await self
.n2vc
.update_vca_status(db_dict
["vcaStatus"], vca_id
=vca_id
)
273 # update configurationStatus for this VCA
275 vca_index
= int(path
[path
.rfind(".") + 1 :])
278 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
280 vca_status
= vca_list
[vca_index
].get("status")
282 configuration_status_list
= nsr
.get("configurationStatus")
283 config_status
= configuration_status_list
[vca_index
].get("status")
285 if config_status
== "BROKEN" and vca_status
!= "failed":
286 db_dict
["configurationStatus"][vca_index
] = "READY"
287 elif config_status
!= "BROKEN" and vca_status
== "failed":
288 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
289 except Exception as e
:
290 # not update configurationStatus
291 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
293 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
294 # if nsState = 'DEGRADED' check if all is OK
296 if current_ns_status
in ("READY", "DEGRADED"):
297 error_description
= ""
299 if status_dict
.get("machines"):
300 for machine_id
in status_dict
.get("machines"):
301 machine
= status_dict
.get("machines").get(machine_id
)
302 # check machine agent-status
303 if machine
.get("agent-status"):
304 s
= machine
.get("agent-status").get("status")
307 error_description
+= (
308 "machine {} agent-status={} ; ".format(
312 # check machine instance status
313 if machine
.get("instance-status"):
314 s
= machine
.get("instance-status").get("status")
317 error_description
+= (
318 "machine {} instance-status={} ; ".format(
323 if status_dict
.get("applications"):
324 for app_id
in status_dict
.get("applications"):
325 app
= status_dict
.get("applications").get(app_id
)
326 # check application status
327 if app
.get("status"):
328 s
= app
.get("status").get("status")
331 error_description
+= (
332 "application {} status={} ; ".format(app_id
, s
)
335 if error_description
:
336 db_dict
["errorDescription"] = error_description
337 if current_ns_status
== "READY" and is_degraded
:
338 db_dict
["nsState"] = "DEGRADED"
339 if current_ns_status
== "DEGRADED" and not is_degraded
:
340 db_dict
["nsState"] = "READY"
343 self
.update_db_2("nsrs", nsr_id
, db_dict
)
345 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
347 except Exception as e
:
348 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
350 async def _on_update_k8s_db(
351 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
354 Updating vca status in NSR record
355 :param cluster_uuid: UUID of a k8s cluster
356 :param kdu_instance: The unique name of the KDU instance
357 :param filter: To get nsr_id
358 :cluster_type: The cluster type (juju, k8s)
362 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
363 # .format(cluster_uuid, kdu_instance, filter))
365 nsr_id
= filter.get("_id")
367 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
368 cluster_uuid
=cluster_uuid
,
369 kdu_instance
=kdu_instance
,
371 complete_status
=True,
377 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
379 if cluster_type
in ("juju-bundle", "juju"):
380 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
381 # status in a similar way between Juju Bundles and Helm Charts on this side
382 await self
.k8sclusterjuju
.update_vca_status(
383 db_dict
["vcaStatus"],
389 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
393 self
.update_db_2("nsrs", nsr_id
, db_dict
)
394 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
396 except Exception as e
:
397 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
400 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
402 env
= Environment(undefined
=StrictUndefined
)
403 template
= env
.from_string(cloud_init_text
)
404 return template
.render(additional_params
or {})
405 except UndefinedError
as e
:
407 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
408 "file, must be provided in the instantiation parameters inside the "
409 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
411 except (TemplateError
, TemplateNotFound
) as e
:
413 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
418 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
419 cloud_init_content
= cloud_init_file
= None
421 if vdu
.get("cloud-init-file"):
422 base_folder
= vnfd
["_admin"]["storage"]
423 if base_folder
["pkg-dir"]:
424 cloud_init_file
= "{}/{}/cloud_init/{}".format(
425 base_folder
["folder"],
426 base_folder
["pkg-dir"],
427 vdu
["cloud-init-file"],
430 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
431 base_folder
["folder"],
432 vdu
["cloud-init-file"],
434 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
435 cloud_init_content
= ci_file
.read()
436 elif vdu
.get("cloud-init"):
437 cloud_init_content
= vdu
["cloud-init"]
439 return cloud_init_content
440 except FsException
as e
:
442 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
443 vnfd
["id"], vdu
["id"], cloud_init_file
, e
447 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
449 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]),
452 additional_params
= vdur
.get("additionalParams")
453 return parse_yaml_strings(additional_params
)
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd. The input vnfd is not modified
    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided. Not used by this method
    :param nsrId: Id of the NSR. Not used by this method
    :return: copy of vnfd
    """
    vnfd_RO = deepcopy(vnfd)
    # remove unused by RO configuration, monitoring, scaling and internal keys
    for unused_key in (
        "_id",
        "_admin",
        "monitoring-param",
        "scaling-group-descriptor",
        "kdu",
        "k8s-cluster",
    ):
        vnfd_RO.pop(unused_key, None)
    # keep the id of the descriptor unless an override is provided
    if new_id:
        vnfd_RO["id"] = new_id

    # cloud-init and cloud-init-file are removed from every vdu of the copy
    for vdu in get_iterable(vnfd_RO, "vdu"):
        vdu.pop("cloud-init-file", None)
        vdu.pop("cloud-init", None)
    return vnfd_RO
def ip_profile_2_RO(ip_profile):
    """
    Translates an OSM IM ip-profile into the format used by RO. The input ip_profile is not modified
    :param ip_profile: dictionary with the ip-profile, using OSM IM keys
    :return: a deep copy of ip_profile where:
        "dns-server" is replaced by "dns-address" (a list of {"address": ...} entries becomes a list of addresses,
        any other value is moved as it is);
        "ip-version" ipv4/ipv6 is replaced by IPv4/IPv6;
        "dhcp-params" is renamed to "dhcp"
    """
    RO_ip_profile = deepcopy(ip_profile)
    if "dns-server" in RO_ip_profile:
        # pop only once; the key must not remain in the RO profile in any case
        dns_server = RO_ip_profile.pop("dns-server")
        if isinstance(dns_server, list):
            RO_ip_profile["dns-address"] = [ds["address"] for ds in dns_server]
        else:
            RO_ip_profile["dns-address"] = dns_server
    if RO_ip_profile.get("ip-version") == "ipv4":
        RO_ip_profile["ip-version"] = "IPv4"
    elif RO_ip_profile.get("ip-version") == "ipv6":
        RO_ip_profile["ip-version"] = "IPv6"
    if "dhcp-params" in RO_ip_profile:
        RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
    return RO_ip_profile
499 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
500 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
501 if db_vim
["_admin"]["operationalState"] != "ENABLED":
503 "VIM={} is not available. operationalState={}".format(
504 vim_account
, db_vim
["_admin"]["operationalState"]
507 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
510 def get_ro_wim_id_for_wim_account(self
, wim_account
):
511 if isinstance(wim_account
, str):
512 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
513 if db_wim
["_admin"]["operationalState"] != "ENABLED":
515 "WIM={} is not available. operationalState={}".format(
516 wim_account
, db_wim
["_admin"]["operationalState"]
519 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
524 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
526 db_vdu_push_list
= []
528 db_update
= {"_admin.modified": time()}
530 for vdu_id
, vdu_count
in vdu_create
.items():
534 for vdur
in reversed(db_vnfr
["vdur"])
535 if vdur
["vdu-id-ref"] == vdu_id
540 # Read the template saved in the db:
541 self
.logger
.debug("No vdur in the database. Using the vdur-template to scale")
542 vdur_template
= db_vnfr
.get("vdur-template")
543 if not vdur_template
:
545 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
549 vdur
= vdur_template
[0]
550 # Delete a template from the database after using it
553 {"_id": db_vnfr
["_id"]},
555 pull
={"vdur-template": {"_id": vdur
['_id']}}
557 for count
in range(vdu_count
):
558 vdur_copy
= deepcopy(vdur
)
559 vdur_copy
["status"] = "BUILD"
560 vdur_copy
["status-detailed"] = None
561 vdur_copy
["ip-address"] = None
562 vdur_copy
["_id"] = str(uuid4())
563 vdur_copy
["count-index"] += count
+ 1
564 vdur_copy
["id"] = "{}-{}".format(
565 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
567 vdur_copy
.pop("vim_info", None)
568 for iface
in vdur_copy
["interfaces"]:
569 if iface
.get("fixed-ip"):
570 iface
["ip-address"] = self
.increment_ip_mac(
571 iface
["ip-address"], count
+ 1
574 iface
.pop("ip-address", None)
575 if iface
.get("fixed-mac"):
576 iface
["mac-address"] = self
.increment_ip_mac(
577 iface
["mac-address"], count
+ 1
580 iface
.pop("mac-address", None)
584 ) # only first vdu can be management of vnf
585 db_vdu_push_list
.append(vdur_copy
)
586 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
588 if len(db_vnfr
["vdur"]) == 1:
589 # The scale will move to 0 instances
590 self
.logger
.debug("Scaling to 0 !, creating the template with the last vdur")
591 template_vdur
= [db_vnfr
["vdur"][0]]
592 for vdu_id
, vdu_count
in vdu_delete
.items():
594 indexes_to_delete
= [
596 for iv
in enumerate(db_vnfr
["vdur"])
597 if iv
[1]["vdu-id-ref"] == vdu_id
601 "vdur.{}.status".format(i
): "DELETING"
602 for i
in indexes_to_delete
[-vdu_count
:]
606 # it must be deleted one by one because common.db does not allow otherwise
609 for v
in reversed(db_vnfr
["vdur"])
610 if v
["vdu-id-ref"] == vdu_id
612 for vdu
in vdus_to_delete
[:vdu_count
]:
615 {"_id": db_vnfr
["_id"]},
617 pull
={"vdur": {"_id": vdu
["_id"]}},
621 db_push
["vdur"] = db_vdu_push_list
623 db_push
["vdur-template"] = template_vdur
626 db_vnfr
["vdur-template"] = template_vdur
627 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
628 # modify passed dictionary db_vnfr
629 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
630 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Updates database nsr with the RO info for the created vld
    :param ns_update_nsr: dictionary to be filled with the updated info. One "vld.<index>" entry is added per
        vld of db_nsr
    :param db_nsr: content of db_nsr. This is also modified: vim-id, name, status and status-detailed of
        each vld are overwritten with the values reported by RO
    :param nsr_desc_RO: nsr descriptor from RO. Its "nets" are matched to each vld by "ns_net_osm_id"
    :return: Nothing, LcmException is raised on errors
    """
    for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
        for net_RO in get_iterable(nsr_desc_RO, "nets"):
            if vld["id"] != net_RO.get("ns_net_osm_id"):
                continue
            vld["vim-id"] = net_RO.get("vim_net_id")
            vld["name"] = net_RO.get("vim_name")
            vld["status"] = net_RO.get("status")
            vld["status-detailed"] = net_RO.get("error_msg")
            ns_update_nsr["vld.{}".format(vld_index)] = vld
            # only the first matching net is used
            break
        else:
            # no net of the RO descriptor corresponds to this vld
            raise LcmException(
                "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
            )
656 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
658 for db_vnfr
in db_vnfrs
.values():
659 vnfr_update
= {"status": "ERROR"}
660 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
661 if "status" not in vdur
:
662 vdur
["status"] = "ERROR"
663 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
665 vdur
["status-detailed"] = str(error_text
)
667 "vdur.{}.status-detailed".format(vdu_index
)
669 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
670 except DbException
as e
:
671 self
.logger
.error("Cannot update vnf. {}".format(e
))
673 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
675 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
676 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
677 :param nsr_desc_RO: nsr descriptor from RO
678 :return: Nothing, LcmException is raised on errors
680 for vnf_index
, db_vnfr
in db_vnfrs
.items():
681 for vnf_RO
in nsr_desc_RO
["vnfs"]:
682 if vnf_RO
["member_vnf_index"] != vnf_index
:
685 if vnf_RO
.get("ip_address"):
686 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
689 elif not db_vnfr
.get("ip-address"):
690 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
691 raise LcmExceptionNoMgmtIP(
692 "ns member_vnf_index '{}' has no IP address".format(
697 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
698 vdur_RO_count_index
= 0
699 if vdur
.get("pdu-type"):
701 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
702 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
704 if vdur
["count-index"] != vdur_RO_count_index
:
705 vdur_RO_count_index
+= 1
707 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
708 if vdur_RO
.get("ip_address"):
709 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
711 vdur
["ip-address"] = None
712 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
713 vdur
["name"] = vdur_RO
.get("vim_name")
714 vdur
["status"] = vdur_RO
.get("status")
715 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
716 for ifacer
in get_iterable(vdur
, "interfaces"):
717 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
718 if ifacer
["name"] == interface_RO
.get("internal_name"):
719 ifacer
["ip-address"] = interface_RO
.get(
722 ifacer
["mac-address"] = interface_RO
.get(
728 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
729 "from VIM info".format(
730 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
733 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
737 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
739 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
743 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
744 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
745 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
747 vld
["vim-id"] = net_RO
.get("vim_net_id")
748 vld
["name"] = net_RO
.get("vim_name")
749 vld
["status"] = net_RO
.get("status")
750 vld
["status-detailed"] = net_RO
.get("error_msg")
751 vnfr_update
["vld.{}".format(vld_index
)] = vld
755 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
760 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
765 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
770 def _get_ns_config_info(self
, nsr_id
):
772 Generates a mapping between vnf,vdu elements and the N2VC id
773 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
774 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
775 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
776 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
778 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
779 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
781 ns_config_info
= {"osm-config-mapping": mapping
}
782 for vca
in vca_deployed_list
:
783 if not vca
["member-vnf-index"]:
785 if not vca
["vdu_id"]:
786 mapping
[vca
["member-vnf-index"]] = vca
["application"]
790 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
792 ] = vca
["application"]
793 return ns_config_info
795 async def _instantiate_ng_ro(
812 def get_vim_account(vim_account_id
):
814 if vim_account_id
in db_vims
:
815 return db_vims
[vim_account_id
]
816 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
817 db_vims
[vim_account_id
] = db_vim
820 # modify target_vld info with instantiation parameters
821 def parse_vld_instantiation_params(
822 target_vim
, target_vld
, vld_params
, target_sdn
824 if vld_params
.get("ip-profile"):
825 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
828 if vld_params
.get("provider-network"):
829 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
832 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
833 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
836 if vld_params
.get("wimAccountId"):
837 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
838 target_vld
["vim_info"][target_wim
] = {}
839 for param
in ("vim-network-name", "vim-network-id"):
840 if vld_params
.get(param
):
841 if isinstance(vld_params
[param
], dict):
842 for vim
, vim_net
in vld_params
[param
].items():
843 other_target_vim
= "vim:" + vim
845 target_vld
["vim_info"],
846 (other_target_vim
, param
.replace("-", "_")),
849 else: # isinstance str
850 target_vld
["vim_info"][target_vim
][
851 param
.replace("-", "_")
852 ] = vld_params
[param
]
853 if vld_params
.get("common_id"):
854 target_vld
["common_id"] = vld_params
.get("common_id")
856 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
857 def update_ns_vld_target(target
, ns_params
):
858 for vnf_params
in ns_params
.get("vnf", ()):
859 if vnf_params
.get("vimAccountId"):
863 for vnfr
in db_vnfrs
.values()
864 if vnf_params
["member-vnf-index"]
865 == vnfr
["member-vnf-index-ref"]
869 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
870 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
871 target_vld
= find_in_list(
872 get_iterable(vdur
, "interfaces"),
873 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
876 vld_params
= find_in_list(
877 get_iterable(ns_params
, "vld"),
878 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
882 if vnf_params
.get("vimAccountId") not in a_vld
.get(
885 target_vim_network_list
= [
886 v
for _
, v
in a_vld
.get("vim_info").items()
888 target_vim_network_name
= next(
890 item
.get("vim_network_name", "")
891 for item
in target_vim_network_list
896 target
["ns"]["vld"][a_index
].get("vim_info").update(
898 "vim:{}".format(vnf_params
["vimAccountId"]): {
899 "vim_network_name": target_vim_network_name
,
905 for param
in ("vim-network-name", "vim-network-id"):
906 if vld_params
.get(param
) and isinstance(
907 vld_params
[param
], dict
909 for vim
, vim_net
in vld_params
[
912 other_target_vim
= "vim:" + vim
914 target
["ns"]["vld"][a_index
].get(
919 param
.replace("-", "_"),
924 nslcmop_id
= db_nslcmop
["_id"]
926 "name": db_nsr
["name"],
929 "image": deepcopy(db_nsr
["image"]),
930 "flavor": deepcopy(db_nsr
["flavor"]),
931 "action_id": nslcmop_id
,
932 "cloud_init_content": {},
934 for image
in target
["image"]:
935 image
["vim_info"] = {}
936 for flavor
in target
["flavor"]:
937 flavor
["vim_info"] = {}
938 if db_nsr
.get("affinity-or-anti-affinity-group"):
939 target
["affinity-or-anti-affinity-group"] = deepcopy(db_nsr
["affinity-or-anti-affinity-group"])
940 for affinity_or_anti_affinity_group
in target
["affinity-or-anti-affinity-group"]:
941 affinity_or_anti_affinity_group
["vim_info"] = {}
943 if db_nslcmop
.get("lcmOperationType") != "instantiate":
944 # get parameters of instantiation:
945 db_nslcmop_instantiate
= self
.db
.get_list(
948 "nsInstanceId": db_nslcmop
["nsInstanceId"],
949 "lcmOperationType": "instantiate",
952 ns_params
= db_nslcmop_instantiate
.get("operationParams")
954 ns_params
= db_nslcmop
.get("operationParams")
955 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
956 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
959 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
960 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
964 "mgmt-network": vld
.get("mgmt-network", False),
965 "type": vld
.get("type"),
968 "vim_network_name": vld
.get("vim-network-name"),
969 "vim_account_id": ns_params
["vimAccountId"],
973 # check if this network needs SDN assist
974 if vld
.get("pci-interfaces"):
975 db_vim
= get_vim_account(ns_params
["vimAccountId"])
976 sdnc_id
= db_vim
["config"].get("sdn-controller")
978 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
979 target_sdn
= "sdn:{}".format(sdnc_id
)
980 target_vld
["vim_info"][target_sdn
] = {
982 "target_vim": target_vim
,
984 "type": vld
.get("type"),
987 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
988 for nsd_vnf_profile
in nsd_vnf_profiles
:
989 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
990 if cp
["virtual-link-profile-id"] == vld
["id"]:
992 "member_vnf:{}.{}".format(
993 cp
["constituent-cpd-id"][0][
994 "constituent-base-element-id"
996 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
998 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1000 # check at nsd descriptor, if there is an ip-profile
1002 nsd_vlp
= find_in_list(
1003 get_virtual_link_profiles(nsd
),
1004 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1009 and nsd_vlp
.get("virtual-link-protocol-data")
1010 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1012 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1015 ip_profile_dest_data
= {}
1016 if "ip-version" in ip_profile_source_data
:
1017 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1020 if "cidr" in ip_profile_source_data
:
1021 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1024 if "gateway-ip" in ip_profile_source_data
:
1025 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1028 if "dhcp-enabled" in ip_profile_source_data
:
1029 ip_profile_dest_data
["dhcp-params"] = {
1030 "enabled": ip_profile_source_data
["dhcp-enabled"]
1032 vld_params
["ip-profile"] = ip_profile_dest_data
1034 # update vld_params with instantiation params
1035 vld_instantiation_params
= find_in_list(
1036 get_iterable(ns_params
, "vld"),
1037 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1039 if vld_instantiation_params
:
1040 vld_params
.update(vld_instantiation_params
)
1041 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1042 target
["ns"]["vld"].append(target_vld
)
1043 # Update the target ns_vld if vnf vim_account is overridden by instantiation params
1044 update_ns_vld_target(target
, ns_params
)
1046 for vnfr
in db_vnfrs
.values():
1047 vnfd
= find_in_list(
1048 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1050 vnf_params
= find_in_list(
1051 get_iterable(ns_params
, "vnf"),
1052 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1054 target_vnf
= deepcopy(vnfr
)
1055 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1056 for vld
in target_vnf
.get("vld", ()):
1057 # check if connected to a ns.vld, to fill target'
1058 vnf_cp
= find_in_list(
1059 vnfd
.get("int-virtual-link-desc", ()),
1060 lambda cpd
: cpd
.get("id") == vld
["id"],
1063 ns_cp
= "member_vnf:{}.{}".format(
1064 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1066 if cp2target
.get(ns_cp
):
1067 vld
["target"] = cp2target
[ns_cp
]
1070 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1072 # check if this network needs SDN assist
1074 if vld
.get("pci-interfaces"):
1075 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1076 sdnc_id
= db_vim
["config"].get("sdn-controller")
1078 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1079 target_sdn
= "sdn:{}".format(sdnc_id
)
1080 vld
["vim_info"][target_sdn
] = {
1082 "target_vim": target_vim
,
1084 "type": vld
.get("type"),
1087 # check at vnfd descriptor, if there is an ip-profile
1089 vnfd_vlp
= find_in_list(
1090 get_virtual_link_profiles(vnfd
),
1091 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1095 and vnfd_vlp
.get("virtual-link-protocol-data")
1096 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1098 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1101 ip_profile_dest_data
= {}
1102 if "ip-version" in ip_profile_source_data
:
1103 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1106 if "cidr" in ip_profile_source_data
:
1107 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1110 if "gateway-ip" in ip_profile_source_data
:
1111 ip_profile_dest_data
[
1113 ] = ip_profile_source_data
["gateway-ip"]
1114 if "dhcp-enabled" in ip_profile_source_data
:
1115 ip_profile_dest_data
["dhcp-params"] = {
1116 "enabled": ip_profile_source_data
["dhcp-enabled"]
1119 vld_params
["ip-profile"] = ip_profile_dest_data
1120 # update vld_params with instantiation params
1122 vld_instantiation_params
= find_in_list(
1123 get_iterable(vnf_params
, "internal-vld"),
1124 lambda i_vld
: i_vld
["name"] == vld
["id"],
1126 if vld_instantiation_params
:
1127 vld_params
.update(vld_instantiation_params
)
1128 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1131 for vdur
in target_vnf
.get("vdur", ()):
1132 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1133 continue # This vdu must not be created
1134 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1136 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1139 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1140 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1143 and vdu_configuration
.get("config-access")
1144 and vdu_configuration
.get("config-access").get("ssh-access")
1146 vdur
["ssh-keys"] = ssh_keys_all
1147 vdur
["ssh-access-required"] = vdu_configuration
[
1149 ]["ssh-access"]["required"]
1152 and vnf_configuration
.get("config-access")
1153 and vnf_configuration
.get("config-access").get("ssh-access")
1154 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1156 vdur
["ssh-keys"] = ssh_keys_all
1157 vdur
["ssh-access-required"] = vnf_configuration
[
1159 ]["ssh-access"]["required"]
1160 elif ssh_keys_instantiation
and find_in_list(
1161 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1163 vdur
["ssh-keys"] = ssh_keys_instantiation
1165 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1167 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1169 if vdud
.get("cloud-init-file"):
1170 vdur
["cloud-init"] = "{}:file:{}".format(
1171 vnfd
["_id"], vdud
.get("cloud-init-file")
1173 # read file and put content at target.cloud_init_content, so that ng_ro does not need the shared package filesystem
1174 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1175 base_folder
= vnfd
["_admin"]["storage"]
1176 if base_folder
["pkg-dir"]:
1177 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1178 base_folder
["folder"],
1179 base_folder
["pkg-dir"],
1180 vdud
.get("cloud-init-file"),
1183 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1184 base_folder
["folder"],
1185 vdud
.get("cloud-init-file"),
1187 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1188 target
["cloud_init_content"][
1191 elif vdud
.get("cloud-init"):
1192 vdur
["cloud-init"] = "{}:vdu:{}".format(
1193 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1195 # put content at target.cloud_init_content, so that ng_ro does not need to read the vnfd descriptor
1196 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1199 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1200 deploy_params_vdu
= self
._format
_additional
_params
(
1201 vdur
.get("additionalParams") or {}
1203 deploy_params_vdu
["OSM"] = get_osm_params(
1204 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1206 vdur
["additionalParams"] = deploy_params_vdu
1209 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1210 if target_vim
not in ns_flavor
["vim_info"]:
1211 ns_flavor
["vim_info"][target_vim
] = {}
1214 # in case alternative images are provided we must check if they should be applied
1215 # for the vim_type, modify the vim_type taking into account
1216 ns_image_id
= int(vdur
["ns-image-id"])
1217 if vdur
.get("alt-image-ids"):
1218 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1219 vim_type
= db_vim
["vim_type"]
1220 for alt_image_id
in vdur
.get("alt-image-ids"):
1221 ns_alt_image
= target
["image"][int(alt_image_id
)]
1222 if vim_type
== ns_alt_image
.get("vim-type"):
1223 # must use alternative image
1225 "use alternative image id: {}".format(alt_image_id
)
1227 ns_image_id
= alt_image_id
1228 vdur
["ns-image-id"] = ns_image_id
1230 ns_image
= target
["image"][int(ns_image_id
)]
1231 if target_vim
not in ns_image
["vim_info"]:
1232 ns_image
["vim_info"][target_vim
] = {}
1235 if vdur
.get("affinity-or-anti-affinity-group-id"):
1236 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1237 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1238 if target_vim
not in ns_ags
["vim_info"]:
1239 ns_ags
["vim_info"][target_vim
] = {}
1241 vdur
["vim_info"] = {target_vim
: {}}
1242 # instantiation parameters
1244 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1245 # vdud["id"]), None)
1246 vdur_list
.append(vdur
)
1247 target_vnf
["vdur"] = vdur_list
1248 target
["vnf"].append(target_vnf
)
1250 desc
= await self
.RO
.deploy(nsr_id
, target
)
1251 self
.logger
.debug("RO return > {}".format(desc
))
1252 action_id
= desc
["action_id"]
1253 await self
._wait
_ng
_ro
(
1254 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
1259 "_admin.deployed.RO.operational-status": "running",
1260 "detailed-status": " ".join(stage
),
1262 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1263 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1264 self
._write
_op
_status
(nslcmop_id
, stage
)
1266 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1270 async def _wait_ng_ro(
1279 detailed_status_old
= None
1281 start_time
= start_time
or time()
1282 while time() <= start_time
+ timeout
:
1283 desc_status
= await self
.RO
.status(nsr_id
, action_id
)
1284 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1285 if desc_status
["status"] == "FAILED":
1286 raise NgRoException(desc_status
["details"])
1287 elif desc_status
["status"] == "BUILD":
1289 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1290 elif desc_status
["status"] == "DONE":
1292 stage
[2] = "Deployed at VIM"
1295 assert False, "ROclient.check_ns_status returns unknown {}".format(
1296 desc_status
["status"]
1298 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1299 detailed_status_old
= stage
[2]
1300 db_nsr_update
["detailed-status"] = " ".join(stage
)
1301 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1302 self
._write
_op
_status
(nslcmop_id
, stage
)
1303 await asyncio
.sleep(15, loop
=self
.loop
)
1304 else: # timeout_ns_deploy
1305 raise NgRoException("Timeout waiting ns to deploy")
1307 async def _terminate_ng_ro(
1308 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1313 start_deploy
= time()
1320 "action_id": nslcmop_id
,
1322 desc
= await self
.RO
.deploy(nsr_id
, target
)
1323 action_id
= desc
["action_id"]
1324 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1325 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1328 + "ns terminate action at RO. action_id={}".format(action_id
)
1332 delete_timeout
= 20 * 60 # 20 minutes
1333 await self
._wait
_ng
_ro
(
1334 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
1337 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1338 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1340 await self
.RO
.delete(nsr_id
)
1341 except Exception as e
:
1342 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1343 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1344 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1345 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1347 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1349 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1350 failed_detail
.append("delete conflict: {}".format(e
))
1353 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1356 failed_detail
.append("delete error: {}".format(e
))
1359 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1363 stage
[2] = "Error deleting from VIM"
1365 stage
[2] = "Deleted from VIM"
1366 db_nsr_update
["detailed-status"] = " ".join(stage
)
1367 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1368 self
._write
_op
_status
(nslcmop_id
, stage
)
1371 raise LcmException("; ".join(failed_detail
))
async def instantiate_RO(
    self,
    logging_text,
    nsr_id,
    nsd,
    db_nsr,
    db_nslcmop,
    db_vnfrs,
    db_vnfds,
    n2vc_key_list,
    stage,
):
    """
    Instantiate at RO
    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: database content of ns descriptor
    :param db_nsr: database content of ns record
    :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
    :param db_vnfrs: database content of vnfrs; a dict whose values contain 'vim-account-id'
    :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
    :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: None or exception
    """
    try:
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get(
                "ns_deploy", self.timeout_ns_deploy
            )

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
            # in case of placement, change ns_params["vimAccountId"] if not present at any vnfrs
            for vnfr in db_vnfrs.values():
                if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
                    break
            else:
                # this must be an assignment: a comparison here has no effect at all
                ns_params["vimAccountId"] = vnfr["vim-account-id"]

        return await self._instantiate_ng_ro(
            logging_text,
            nsr_id,
            nsd,
            db_nsr,
            db_nslcmop,
            db_vnfrs,
            db_vnfds,
            n2vc_key_list,
            stage,
            start_deploy,
            timeout_ns_deploy,
        )
    except Exception as e:
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        # the traceback is logged only for exception types that are not listed here
        self.logger.error(
            "Error deploying at VIM {}".format(e),
            exc_info=not isinstance(
                e,
                (
                    ROclient.ROClientException,
                    LcmException,
                    DbException,
                    NgRoException,
                ),
            ),
        )
        raise
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address
    :param logging_text: prefix use for logging
    :param nsr_id: nsr identity (not used inside this method)
    :param vnfr_id: _id of the vnfr that is read from database at each poll
    :param kdu_name: value of 'kdu-name' that selects the entry inside the vnfr 'kdur' list
    :return: content of 'ip-address' of the kdur once its status is READY or ENABLED
    :raises LcmException: kdur not found, kdur with any other non-empty status, or no status after 360 polls
    """

    # self.logger.debug(logging_text + "Starting wait_kdu_up")
    nb_tries = 0

    while nb_tries < 360:
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next(
            (
                x
                for x in get_iterable(db_vnfr, "kdur")
                if x.get("kdu-name") == kdu_name
            ),
            None,
        )
        if not kdur:
            raise LcmException(
                "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
            )
        # an empty status means "not yet processed": keep polling
        if kdur.get("status"):
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address")
            else:
                raise LcmException(
                    "target KDU={} is in error state".format(kdu_name)
                )

        # no "loop" argument: it was removed from asyncio.sleep in python 3.10
        await asyncio.sleep(10)
        nb_tries += 1
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1488 async def wait_vm_up_insert_key_ro(
1489 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1492 Wait for ip address at RO, and optionally, insert public key in virtual machine
1493 :param logging_text: prefix use for logging
1498 :param pub_key: public ssh key to inject, None to skip
1499 :param user: user to apply the public ssh key
1503 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1507 target_vdu_id
= None
1513 if ro_retries
>= 360: # 1 hour
1515 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1518 await asyncio
.sleep(10, loop
=self
.loop
)
1521 if not target_vdu_id
:
1522 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1524 if not vdu_id
: # for the VNF case
1525 if db_vnfr
.get("status") == "ERROR":
1527 "Cannot inject ssh-key because target VNF is in error state"
1529 ip_address
= db_vnfr
.get("ip-address")
1535 for x
in get_iterable(db_vnfr
, "vdur")
1536 if x
.get("ip-address") == ip_address
1544 for x
in get_iterable(db_vnfr
, "vdur")
1545 if x
.get("vdu-id-ref") == vdu_id
1546 and x
.get("count-index") == vdu_index
1552 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1553 ): # If only one, this should be the target vdu
1554 vdur
= db_vnfr
["vdur"][0]
1557 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1558 vnfr_id
, vdu_id
, vdu_index
1561 # New generation RO stores information at "vim_info"
1564 if vdur
.get("vim_info"):
1566 t
for t
in vdur
["vim_info"]
1567 ) # there should be only one key
1568 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1570 vdur
.get("pdu-type")
1571 or vdur
.get("status") == "ACTIVE"
1572 or ng_ro_status
== "ACTIVE"
1574 ip_address
= vdur
.get("ip-address")
1577 target_vdu_id
= vdur
["vdu-id-ref"]
1578 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1580 "Cannot inject ssh-key because target VM is in error state"
1583 if not target_vdu_id
:
1586 # inject public key into machine
1587 if pub_key
and user
:
1588 self
.logger
.debug(logging_text
+ "Inserting RO key")
1589 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1590 if vdur
.get("pdu-type"):
1591 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1594 ro_vm_id
= "{}-{}".format(
1595 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1596 ) # TODO add vdu_index
1600 "action": "inject_ssh_key",
1604 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1606 desc
= await self
.RO
.deploy(nsr_id
, target
)
1607 action_id
= desc
["action_id"]
1608 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600)
1611 # wait until NS is deployed at RO
1613 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1614 ro_nsr_id
= deep_get(
1615 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1619 result_dict
= await self
.RO
.create_action(
1621 item_id_name
=ro_nsr_id
,
1623 "add_public_key": pub_key
,
1628 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1629 if not result_dict
or not isinstance(result_dict
, dict):
1631 "Unknown response from RO when injecting key"
1633 for result
in result_dict
.values():
1634 if result
.get("vim_result") == 200:
1637 raise ROclient
.ROClientException(
1638 "error injecting key: {}".format(
1639 result
.get("description")
1643 except NgRoException
as e
:
1645 "Reaching max tries injecting key. Error: {}".format(e
)
1647 except ROclient
.ROClientException
as e
:
1651 + "error injecting key: {}. Retrying until {} seconds".format(
1658 "Reaching max tries injecting key. Error: {}".format(e
)
1665 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1667 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1669 my_vca
= vca_deployed_list
[vca_index
]
1670 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1671 # vdu or kdu: no dependencies
1675 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1676 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1677 configuration_status_list
= db_nsr
["configurationStatus"]
1678 for index
, vca_deployed
in enumerate(configuration_status_list
):
1679 if index
== vca_index
:
1682 if not my_vca
.get("member-vnf-index") or (
1683 vca_deployed
.get("member-vnf-index")
1684 == my_vca
.get("member-vnf-index")
1686 internal_status
= configuration_status_list
[index
].get("status")
1687 if internal_status
== "READY":
1689 elif internal_status
== "BROKEN":
1691 "Configuration aborted because dependent charm/s has failed"
1696 # no dependencies, return
1698 await asyncio
.sleep(10)
1701 raise LcmException("Configuration aborted because dependent charm/s timeout")
def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
    """
    Obtain the vca identity to use
    :param db_vnfr: vnf record. When not empty, its 'vca-id' is returned and db_nsr is ignored
    :param db_nsr: ns record. Used only when db_vnfr is empty: the 'vca' of the vim account found at
        'instantiate_params.vimAccountId' is returned
    :return: the vca identity, or None when both records are empty
    """
    if db_vnfr:
        return deep_get(db_vnfr, ("vca-id",))
    if db_nsr:
        vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
        vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
        return vim_account.get("vca")
    return None
1712 async def instantiate_N2VC(
1729 ee_config_descriptor
,
1731 nsr_id
= db_nsr
["_id"]
1732 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1733 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1734 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1735 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1737 "collection": "nsrs",
1738 "filter": {"_id": nsr_id
},
1739 "path": db_update_entry
,
1745 element_under_configuration
= nsr_id
1749 vnfr_id
= db_vnfr
["_id"]
1750 osm_config
["osm"]["vnf_id"] = vnfr_id
1752 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1754 if vca_type
== "native_charm":
1757 index_number
= vdu_index
or 0
1760 element_type
= "VNF"
1761 element_under_configuration
= vnfr_id
1762 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1764 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1765 element_type
= "VDU"
1766 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1767 osm_config
["osm"]["vdu_id"] = vdu_id
1769 namespace
+= ".{}".format(kdu_name
)
1770 element_type
= "KDU"
1771 element_under_configuration
= kdu_name
1772 osm_config
["osm"]["kdu_name"] = kdu_name
1775 if base_folder
["pkg-dir"]:
1776 artifact_path
= "{}/{}/{}/{}".format(
1777 base_folder
["folder"],
1778 base_folder
["pkg-dir"],
1780 if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1785 artifact_path
= "{}/Scripts/{}/{}/".format(
1786 base_folder
["folder"],
1788 if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1793 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1795 # get initial_config_primitive_list that applies to this element
1796 initial_config_primitive_list
= config_descriptor
.get(
1797 "initial-config-primitive"
1801 "Initial config primitive list > {}".format(
1802 initial_config_primitive_list
1806 # add config if not present for NS charm
1807 ee_descriptor_id
= ee_config_descriptor
.get("id")
1808 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1809 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1810 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1814 "Initial config primitive list #2 > {}".format(
1815 initial_config_primitive_list
1818 # n2vc_redesign STEP 3.1
1819 # find old ee_id if exists
1820 ee_id
= vca_deployed
.get("ee_id")
1822 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1823 # create or register execution environment in VCA
1824 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1826 self
._write
_configuration
_status
(
1828 vca_index
=vca_index
,
1830 element_under_configuration
=element_under_configuration
,
1831 element_type
=element_type
,
1834 step
= "create execution environment"
1835 self
.logger
.debug(logging_text
+ step
)
1839 if vca_type
== "k8s_proxy_charm":
1840 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1841 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1842 namespace
=namespace
,
1843 artifact_path
=artifact_path
,
1847 elif vca_type
== "helm" or vca_type
== "helm-v3":
1848 ee_id
, credentials
= await self
.vca_map
[
1850 ].create_execution_environment(
1851 namespace
=namespace
,
1855 artifact_path
=artifact_path
,
1859 ee_id
, credentials
= await self
.vca_map
[
1861 ].create_execution_environment(
1862 namespace
=namespace
,
1868 elif vca_type
== "native_charm":
1869 step
= "Waiting to VM being up and getting IP address"
1870 self
.logger
.debug(logging_text
+ step
)
1871 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1880 credentials
= {"hostname": rw_mgmt_ip
}
1882 username
= deep_get(
1883 config_descriptor
, ("config-access", "ssh-access", "default-user")
1885 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1886 # merged. Meanwhile let's get username from initial-config-primitive
1887 if not username
and initial_config_primitive_list
:
1888 for config_primitive
in initial_config_primitive_list
:
1889 for param
in config_primitive
.get("parameter", ()):
1890 if param
["name"] == "ssh-username":
1891 username
= param
["value"]
1895 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1896 "'config-access.ssh-access.default-user'"
1898 credentials
["username"] = username
1899 # n2vc_redesign STEP 3.2
1901 self
._write
_configuration
_status
(
1903 vca_index
=vca_index
,
1904 status
="REGISTERING",
1905 element_under_configuration
=element_under_configuration
,
1906 element_type
=element_type
,
1909 step
= "register execution environment {}".format(credentials
)
1910 self
.logger
.debug(logging_text
+ step
)
1911 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1912 credentials
=credentials
,
1913 namespace
=namespace
,
1918 # for compatibility with MON/POL modules, the need model and application name at database
1919 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1920 ee_id_parts
= ee_id
.split(".")
1921 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1922 if len(ee_id_parts
) >= 2:
1923 model_name
= ee_id_parts
[0]
1924 application_name
= ee_id_parts
[1]
1925 db_nsr_update
[db_update_entry
+ "model"] = model_name
1926 db_nsr_update
[db_update_entry
+ "application"] = application_name
1928 # n2vc_redesign STEP 3.3
1929 step
= "Install configuration Software"
1931 self
._write
_configuration
_status
(
1933 vca_index
=vca_index
,
1934 status
="INSTALLING SW",
1935 element_under_configuration
=element_under_configuration
,
1936 element_type
=element_type
,
1937 other_update
=db_nsr_update
,
1940 # TODO check if already done
1941 self
.logger
.debug(logging_text
+ step
)
1943 if vca_type
== "native_charm":
1944 config_primitive
= next(
1945 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1948 if config_primitive
:
1949 config
= self
._map
_primitive
_params
(
1950 config_primitive
, {}, deploy_params
1953 if vca_type
== "lxc_proxy_charm":
1954 if element_type
== "NS":
1955 num_units
= db_nsr
.get("config-units") or 1
1956 elif element_type
== "VNF":
1957 num_units
= db_vnfr
.get("config-units") or 1
1958 elif element_type
== "VDU":
1959 for v
in db_vnfr
["vdur"]:
1960 if vdu_id
== v
["vdu-id-ref"]:
1961 num_units
= v
.get("config-units") or 1
1963 if vca_type
!= "k8s_proxy_charm":
1964 await self
.vca_map
[vca_type
].install_configuration_sw(
1966 artifact_path
=artifact_path
,
1969 num_units
=num_units
,
1974 # write in db flag of configuration_sw already installed
1976 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1979 # add relations for this VCA (wait for other peers related with this VCA)
1980 await self
._add
_vca
_relations
(
1981 logging_text
=logging_text
,
1984 vca_index
=vca_index
,
1987 # if SSH access is required, then get execution environment SSH public
1988 # if native charm we have waited already to VM be UP
1989 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1992 # self.logger.debug("get ssh key block")
1994 config_descriptor
, ("config-access", "ssh-access", "required")
1996 # self.logger.debug("ssh key needed")
1997 # Needed to inject a ssh key
2000 ("config-access", "ssh-access", "default-user"),
2002 step
= "Install configuration Software, getting public ssh key"
2003 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
2004 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
2007 step
= "Insert public key into VM user={} ssh_key={}".format(
2011 # self.logger.debug("no need to get ssh key")
2012 step
= "Waiting to VM being up and getting IP address"
2013 self
.logger
.debug(logging_text
+ step
)
2015 # default rw_mgmt_ip to None, avoiding the non definition of the variable
2018 # n2vc_redesign STEP 5.1
2019 # wait for RO (ip-address) Insert pub_key into VM
2022 rw_mgmt_ip
= await self
.wait_kdu_up(
2023 logging_text
, nsr_id
, vnfr_id
, kdu_name
2026 # This verification is needed in order to avoid trying to add a public key
2027 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2028 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2029 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2031 elif db_vnfr
.get('vdur'):
2032 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2042 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2044 # store rw_mgmt_ip in deploy params for later replacement
2045 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2047 # n2vc_redesign STEP 6 Execute initial config primitive
2048 step
= "execute initial config primitive"
2050 # wait for dependent primitives execution (NS -> VNF -> VDU)
2051 if initial_config_primitive_list
:
2052 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2054 # stage, in function of element type: vdu, kdu, vnf or ns
2055 my_vca
= vca_deployed_list
[vca_index
]
2056 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2058 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2059 elif my_vca
.get("member-vnf-index"):
2061 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2064 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2066 self
._write
_configuration
_status
(
2067 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2070 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2072 check_if_terminated_needed
= True
2073 for initial_config_primitive
in initial_config_primitive_list
:
2074 # adding information on the vca_deployed if it is a NS execution environment
2075 if not vca_deployed
["member-vnf-index"]:
2076 deploy_params
["ns_config_info"] = json
.dumps(
2077 self
._get
_ns
_config
_info
(nsr_id
)
2079 # TODO check if already done
2080 primitive_params_
= self
._map
_primitive
_params
(
2081 initial_config_primitive
, {}, deploy_params
2084 step
= "execute primitive '{}' params '{}'".format(
2085 initial_config_primitive
["name"], primitive_params_
2087 self
.logger
.debug(logging_text
+ step
)
2088 await self
.vca_map
[vca_type
].exec_primitive(
2090 primitive_name
=initial_config_primitive
["name"],
2091 params_dict
=primitive_params_
,
2096 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2097 if check_if_terminated_needed
:
2098 if config_descriptor
.get("terminate-config-primitive"):
2100 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2102 check_if_terminated_needed
= False
2104 # TODO register in database that primitive is done
2106 # STEP 7 Configure metrics
2107 if vca_type
== "helm" or vca_type
== "helm-v3":
2108 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2110 artifact_path
=artifact_path
,
2111 ee_config_descriptor
=ee_config_descriptor
,
2114 target_ip
=rw_mgmt_ip
,
2120 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2123 for job
in prometheus_jobs
:
2127 "job_name": job
["job_name"]
2131 fail_on_empty
=False,
2134 step
= "instantiated at VCA"
2135 self
.logger
.debug(logging_text
+ step
)
2137 self
._write
_configuration
_status
(
2138 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2141 except Exception as e
: # TODO not use Exception but N2VC exception
2142 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2144 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2147 "Exception while {} : {}".format(step
, e
), exc_info
=True
2149 self
._write
_configuration
_status
(
2150 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2152 raise LcmException("{} {}".format(step
, e
)) from e
2154 def _write_ns_status(
2158 current_operation
: str,
2159 current_operation_id
: str,
2160 error_description
: str = None,
2161 error_detail
: str = None,
2162 other_update
: dict = None,
2165 Update db_nsr fields.
2168 :param current_operation:
2169 :param current_operation_id:
2170 :param error_description:
2171 :param error_detail:
2172 :param other_update: Other required changes at database if provided, will be cleared
2176 db_dict
= other_update
or {}
2179 ] = current_operation_id
# for backward compatibility
2180 db_dict
["_admin.current-operation"] = current_operation_id
2181 db_dict
["_admin.operation-type"] = (
2182 current_operation
if current_operation
!= "IDLE" else None
2184 db_dict
["currentOperation"] = current_operation
2185 db_dict
["currentOperationID"] = current_operation_id
2186 db_dict
["errorDescription"] = error_description
2187 db_dict
["errorDetail"] = error_detail
2190 db_dict
["nsState"] = ns_state
2191 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2192 except DbException
as e
:
2193 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2195 def _write_op_status(
2199 error_message
: str = None,
2200 queuePosition
: int = 0,
2201 operation_state
: str = None,
2202 other_update
: dict = None,
2205 db_dict
= other_update
or {}
2206 db_dict
["queuePosition"] = queuePosition
2207 if isinstance(stage
, list):
2208 db_dict
["stage"] = stage
[0]
2209 db_dict
["detailed-status"] = " ".join(stage
)
2210 elif stage
is not None:
2211 db_dict
["stage"] = str(stage
)
2213 if error_message
is not None:
2214 db_dict
["errorMessage"] = error_message
2215 if operation_state
is not None:
2216 db_dict
["operationState"] = operation_state
2217 db_dict
["statusEnteredTime"] = time()
2218 self
.update_db_2("nslcmops", op_id
, db_dict
)
2219 except DbException
as e
:
2221 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2224 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2226 nsr_id
= db_nsr
["_id"]
2227 # configurationStatus
2228 config_status
= db_nsr
.get("configurationStatus")
2231 "configurationStatus.{}.status".format(index
): status
2232 for index
, v
in enumerate(config_status
)
2236 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2238 except DbException
as e
:
2240 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2243 def _write_configuration_status(
2248 element_under_configuration
: str = None,
2249 element_type
: str = None,
2250 other_update
: dict = None,
2253 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2254 # .format(vca_index, status))
2257 db_path
= "configurationStatus.{}.".format(vca_index
)
2258 db_dict
= other_update
or {}
2260 db_dict
[db_path
+ "status"] = status
2261 if element_under_configuration
:
2263 db_path
+ "elementUnderConfiguration"
2264 ] = element_under_configuration
2266 db_dict
[db_path
+ "elementType"] = element_type
2267 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2268 except DbException
as e
:
2270 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2271 status
, nsr_id
, vca_index
, e
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Check and compute the placement (vim account where to deploy). When the operation asks for the "PLA"
    placement engine, the request is sent through the message bus and the result is polled at database
    (nslcmops '_admin.pla'), as it may have been written by a different LCM worker in case of HA.
    :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
    :param db_nslcmop: database content of nslcmop
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
    :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
        computed 'vim-account-id'
    :raises LcmException: when no placement result is found at database after polling
    """
    nslcmop_id = db_nslcmop["_id"]
    engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
    if engine != "PLA":
        # no external placement requested: nothing to modify
        return False

    self.logger.debug(logging_text + "Invoke and wait for placement optimization")
    await self.msg.aiowrite(
        "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
    )

    db_poll_interval = 5
    max_polls = 11  # one poll every 5 seconds while the 50 seconds budget is >= 0
    pla_result = None
    for _ in range(max_polls):
        await asyncio.sleep(db_poll_interval)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
        if pla_result:
            break

    if not pla_result:
        raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))

    modified = False
    for pla_vnf in pla_result["vnf"]:
        vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
        new_vim_account = pla_vnf.get("vimAccountId")
        if new_vim_account and vnfr:
            modified = True
            self.db.set_one(
                "vnfrs",
                {"_id": vnfr["_id"]},
                {"vim-account-id": pla_vnf["vimAccountId"]},
            )
            # Modifies db_vnfrs
            vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
    return modified
def update_nsrs_with_pla_result(self, params):
    """
    Store a received placement result at the nslcmops record, under '_admin.pla'.
    :param params: dict with a 'placement' item, whose 'nslcmopId' selects the record to update
    :return: None. Best effort: any exception is logged as a warning and not propagated
    """
    # defined before the try block so that the warning below can always report it
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
        self.update_db_2(
            "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
        )
    except Exception as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning(
            "Update failed for nslcmop_id={}:{}".format(nslcmop_id, e)
        )
2333 async def instantiate(self
, nsr_id
, nslcmop_id
):
2336 :param nsr_id: ns instance to deploy
2337 :param nslcmop_id: operation to run
2341 # Try to lock HA task here
2342 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2343 if not task_is_locked_by_me
:
2345 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2349 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2350 self
.logger
.debug(logging_text
+ "Enter")
2352 # get all needed from database
2354 # database nsrs record
2357 # database nslcmops record
2360 # update operation on nsrs
2362 # update operation on nslcmops
2363 db_nslcmop_update
= {}
2365 nslcmop_operation_state
= None
2366 db_vnfrs
= {} # vnf's info indexed by member-index
2368 tasks_dict_info
= {} # from task to info text
2372 "Stage 1/5: preparation of the environment.",
2373 "Waiting for previous operations to terminate.",
2376 # ^ stage, step, VIM progress
2378 # wait for any previous tasks in process
2379 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2381 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2382 stage
[1] = "Reading from database."
2383 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2384 db_nsr_update
["detailed-status"] = "creating"
2385 db_nsr_update
["operational-status"] = "init"
2386 self
._write
_ns
_status
(
2388 ns_state
="BUILDING",
2389 current_operation
="INSTANTIATING",
2390 current_operation_id
=nslcmop_id
,
2391 other_update
=db_nsr_update
,
2393 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2395 # read from db: operation
2396 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2397 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2398 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2399 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2400 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2402 ns_params
= db_nslcmop
.get("operationParams")
2403 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2404 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2406 timeout_ns_deploy
= self
.timeout
.get(
2407 "ns_deploy", self
.timeout_ns_deploy
2411 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2412 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2413 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2414 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2415 self
.fs
.sync(db_nsr
["nsd-id"])
2417 # nsr_name = db_nsr["name"] # TODO short-name??
2419 # read from db: vnf's of this ns
2420 stage
[1] = "Getting vnfrs from db."
2421 self
.logger
.debug(logging_text
+ stage
[1])
2422 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2424 # read from db: vnfd's for every vnf
2425 db_vnfds
= [] # every vnfd data
2427 # for each vnf in ns, read vnfd
2428 for vnfr
in db_vnfrs_list
:
2429 if vnfr
.get("kdur"):
2431 for kdur
in vnfr
["kdur"]:
2432 if kdur
.get("additionalParams"):
2433 kdur
["additionalParams"] = json
.loads(
2434 kdur
["additionalParams"]
2436 kdur_list
.append(kdur
)
2437 vnfr
["kdur"] = kdur_list
2439 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2440 vnfd_id
= vnfr
["vnfd-id"]
2441 vnfd_ref
= vnfr
["vnfd-ref"]
2442 self
.fs
.sync(vnfd_id
)
2444 # if we haven't this vnfd, read it from db
2445 if vnfd_id
not in db_vnfds
:
2447 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2450 self
.logger
.debug(logging_text
+ stage
[1])
2451 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2454 db_vnfds
.append(vnfd
)
2456 # Get or generates the _admin.deployed.VCA list
2457 vca_deployed_list
= None
2458 if db_nsr
["_admin"].get("deployed"):
2459 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2460 if vca_deployed_list
is None:
2461 vca_deployed_list
= []
2462 configuration_status_list
= []
2463 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2464 db_nsr_update
["configurationStatus"] = configuration_status_list
2465 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2466 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2467 elif isinstance(vca_deployed_list
, dict):
2468 # maintain backward compatibility. Change a dict to list at database
2469 vca_deployed_list
= list(vca_deployed_list
.values())
2470 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2471 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2474 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2476 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2477 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2479 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2480 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2481 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2483 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2486 # n2vc_redesign STEP 2 Deploy Network Scenario
2487 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2488 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2490 stage
[1] = "Deploying KDUs."
2491 # self.logger.debug(logging_text + "Before deploy_kdus")
2492 # Call to deploy_kdus in case exists the "vdu:kdu" param
2493 await self
.deploy_kdus(
2494 logging_text
=logging_text
,
2496 nslcmop_id
=nslcmop_id
,
2499 task_instantiation_info
=tasks_dict_info
,
2502 stage
[1] = "Getting VCA public key."
2503 # n2vc_redesign STEP 1 Get VCA public ssh-key
2504 # feature 1429. Add n2vc public key to needed VMs
2505 n2vc_key
= self
.n2vc
.get_public_key()
2506 n2vc_key_list
= [n2vc_key
]
2507 if self
.vca_config
.get("public_key"):
2508 n2vc_key_list
.append(self
.vca_config
["public_key"])
2510 stage
[1] = "Deploying NS at VIM."
2511 task_ro
= asyncio
.ensure_future(
2512 self
.instantiate_RO(
2513 logging_text
=logging_text
,
2517 db_nslcmop
=db_nslcmop
,
2520 n2vc_key_list
=n2vc_key_list
,
2524 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2525 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2527 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2528 stage
[1] = "Deploying Execution Environments."
2529 self
.logger
.debug(logging_text
+ stage
[1])
2531 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2532 for vnf_profile
in get_vnf_profiles(nsd
):
2533 vnfd_id
= vnf_profile
["vnfd-id"]
2534 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2535 member_vnf_index
= str(vnf_profile
["id"])
2536 db_vnfr
= db_vnfrs
[member_vnf_index
]
2537 base_folder
= vnfd
["_admin"]["storage"]
2543 # Get additional parameters
2544 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2545 if db_vnfr
.get("additionalParamsForVnf"):
2546 deploy_params
.update(
2547 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2550 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2551 if descriptor_config
:
2553 logging_text
=logging_text
2554 + "member_vnf_index={} ".format(member_vnf_index
),
2557 nslcmop_id
=nslcmop_id
,
2563 member_vnf_index
=member_vnf_index
,
2564 vdu_index
=vdu_index
,
2566 deploy_params
=deploy_params
,
2567 descriptor_config
=descriptor_config
,
2568 base_folder
=base_folder
,
2569 task_instantiation_info
=tasks_dict_info
,
2573 # Deploy charms for each VDU that supports one.
2574 for vdud
in get_vdu_list(vnfd
):
2576 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2577 vdur
= find_in_list(
2578 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2581 if vdur
.get("additionalParams"):
2582 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2584 deploy_params_vdu
= deploy_params
2585 deploy_params_vdu
["OSM"] = get_osm_params(
2586 db_vnfr
, vdu_id
, vdu_count_index
=0
2588 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2590 self
.logger
.debug("VDUD > {}".format(vdud
))
2592 "Descriptor config > {}".format(descriptor_config
)
2594 if descriptor_config
:
2597 for vdu_index
in range(vdud_count
):
2598 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2600 logging_text
=logging_text
2601 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2602 member_vnf_index
, vdu_id
, vdu_index
2606 nslcmop_id
=nslcmop_id
,
2612 member_vnf_index
=member_vnf_index
,
2613 vdu_index
=vdu_index
,
2615 deploy_params
=deploy_params_vdu
,
2616 descriptor_config
=descriptor_config
,
2617 base_folder
=base_folder
,
2618 task_instantiation_info
=tasks_dict_info
,
2621 for kdud
in get_kdu_list(vnfd
):
2622 kdu_name
= kdud
["name"]
2623 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2624 if descriptor_config
:
2629 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2631 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2632 if kdur
.get("additionalParams"):
2633 deploy_params_kdu
.update(
2634 parse_yaml_strings(kdur
["additionalParams"].copy())
2638 logging_text
=logging_text
,
2641 nslcmop_id
=nslcmop_id
,
2647 member_vnf_index
=member_vnf_index
,
2648 vdu_index
=vdu_index
,
2650 deploy_params
=deploy_params_kdu
,
2651 descriptor_config
=descriptor_config
,
2652 base_folder
=base_folder
,
2653 task_instantiation_info
=tasks_dict_info
,
2657 # Check if this NS has a charm configuration
2658 descriptor_config
= nsd
.get("ns-configuration")
2659 if descriptor_config
and descriptor_config
.get("juju"):
2662 member_vnf_index
= None
2668 # Get additional parameters
2669 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2670 if db_nsr
.get("additionalParamsForNs"):
2671 deploy_params
.update(
2672 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2674 base_folder
= nsd
["_admin"]["storage"]
2676 logging_text
=logging_text
,
2679 nslcmop_id
=nslcmop_id
,
2685 member_vnf_index
=member_vnf_index
,
2686 vdu_index
=vdu_index
,
2688 deploy_params
=deploy_params
,
2689 descriptor_config
=descriptor_config
,
2690 base_folder
=base_folder
,
2691 task_instantiation_info
=tasks_dict_info
,
2695 # rest of staff will be done at finally
2698 ROclient
.ROClientException
,
2704 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2707 except asyncio
.CancelledError
:
2709 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2711 exc
= "Operation was cancelled"
2712 except Exception as e
:
2713 exc
= traceback
.format_exc()
2714 self
.logger
.critical(
2715 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2720 error_list
.append(str(exc
))
2722 # wait for pending tasks
2724 stage
[1] = "Waiting for instantiate pending tasks."
2725 self
.logger
.debug(logging_text
+ stage
[1])
2726 error_list
+= await self
._wait
_for
_tasks
(
2734 stage
[1] = stage
[2] = ""
2735 except asyncio
.CancelledError
:
2736 error_list
.append("Cancelled")
2737 # TODO cancel all tasks
2738 except Exception as exc
:
2739 error_list
.append(str(exc
))
2741 # update operation-status
2742 db_nsr_update
["operational-status"] = "running"
2743 # let's begin with VCA 'configured' status (later we can change it)
2744 db_nsr_update
["config-status"] = "configured"
2745 for task
, task_name
in tasks_dict_info
.items():
2746 if not task
.done() or task
.cancelled() or task
.exception():
2747 if task_name
.startswith(self
.task_name_deploy_vca
):
2748 # A N2VC task is pending
2749 db_nsr_update
["config-status"] = "failed"
2751 # RO or KDU task is pending
2752 db_nsr_update
["operational-status"] = "failed"
2754 # update status at database
2756 error_detail
= ". ".join(error_list
)
2757 self
.logger
.error(logging_text
+ error_detail
)
2758 error_description_nslcmop
= "{} Detail: {}".format(
2759 stage
[0], error_detail
2761 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2762 nslcmop_id
, stage
[0]
2765 db_nsr_update
["detailed-status"] = (
2766 error_description_nsr
+ " Detail: " + error_detail
2768 db_nslcmop_update
["detailed-status"] = error_detail
2769 nslcmop_operation_state
= "FAILED"
2773 error_description_nsr
= error_description_nslcmop
= None
2775 db_nsr_update
["detailed-status"] = "Done"
2776 db_nslcmop_update
["detailed-status"] = "Done"
2777 nslcmop_operation_state
= "COMPLETED"
2780 self
._write
_ns
_status
(
2783 current_operation
="IDLE",
2784 current_operation_id
=None,
2785 error_description
=error_description_nsr
,
2786 error_detail
=error_detail
,
2787 other_update
=db_nsr_update
,
2789 self
._write
_op
_status
(
2792 error_message
=error_description_nslcmop
,
2793 operation_state
=nslcmop_operation_state
,
2794 other_update
=db_nslcmop_update
,
2797 if nslcmop_operation_state
:
2799 await self
.msg
.aiowrite(
2804 "nslcmop_id": nslcmop_id
,
2805 "operationState": nslcmop_operation_state
,
2809 except Exception as e
:
2811 logging_text
+ "kafka_write notification Exception {}".format(e
)
2814 self
.logger
.debug(logging_text
+ "Exit")
2815 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
    """Return the VNFD identified by ``vnfd_id``, using ``cached_vnfds`` as a cache.

    On a cache miss the record is read from the "vnfds" collection, matching
    ``vnfd_id`` against the ``id`` field, and stored in ``cached_vnfds``.

    :param vnfd_id: key of the cache and value of the ``id`` database filter
    :param cached_vnfds: caller-owned dict, updated in place on a miss
    :return: the cached or freshly read VNFD record
    """
    # Cache hit: no database access at all.
    if vnfd_id in cached_vnfds:
        return cached_vnfds[vnfd_id]

    db_vnfd = self.db.get_one("vnfds", {"id": vnfd_id})
    cached_vnfds[vnfd_id] = db_vnfd
    return db_vnfd
def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
    """Return the VNFR of ``vnf_profile_id`` within ``nsr_id``, using ``cached_vnfrs``.

    On a cache miss the record is read from the database with a filter on
    ``member-vnf-index-ref`` and ``nsr-id-ref`` and stored in ``cached_vnfrs``.
    The cache is keyed by ``vnf_profile_id`` only; ``nsr_id`` is not part of
    the key.

    :param nsr_id: value of the ``nsr-id-ref`` database filter
    :param vnf_profile_id: cache key and value of the ``member-vnf-index-ref`` filter
    :param cached_vnfrs: caller-owned dict, updated in place on a miss
    :return: the cached or freshly read VNFR record
    """
    # Cache hit: no database access at all.
    if vnf_profile_id in cached_vnfrs:
        return cached_vnfrs[vnf_profile_id]

    vnfr_filter = {
        "member-vnf-index-ref": vnf_profile_id,
        "nsr-id-ref": nsr_id,
    }
    db_vnfr = self.db.get_one("vnfrs", vnfr_filter)
    cached_vnfrs[vnf_profile_id] = db_vnfr
    return db_vnfr
2833 def _is_deployed_vca_in_relation(
2834 self
, vca
: DeployedVCA
, relation
: Relation
2837 for endpoint
in (relation
.provider
, relation
.requirer
):
2838 if endpoint
["kdu-resource-profile-id"]:
2841 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2842 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2843 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2849 def _update_ee_relation_data_with_implicit_data(
2850 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2852 ee_relation_data
= safe_get_ee_relation(
2853 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2855 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2856 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2857 "execution-environment-ref"
2859 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2860 vnfd_id
= vnf_profile
["vnfd-id"]
2861 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2864 if ee_relation_level
== EELevel
.VNF
2865 else ee_relation_data
["vdu-profile-id"]
2867 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2870 f
"not execution environments found for ee_relation {ee_relation_data}"
2872 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2873 return ee_relation_data
2875 def _get_ns_relations(
2878 nsd
: Dict
[str, Any
],
2880 cached_vnfds
: Dict
[str, Any
],
2881 ) -> List
[Relation
]:
2883 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2884 for r
in db_ns_relations
:
2885 provider_dict
= None
2886 requirer_dict
= None
2887 if all(key
in r
for key
in ("provider", "requirer")):
2888 provider_dict
= r
["provider"]
2889 requirer_dict
= r
["requirer"]
2890 elif "entities" in r
:
2891 provider_id
= r
["entities"][0]["id"]
2894 "endpoint": r
["entities"][0]["endpoint"],
2896 if provider_id
!= nsd
["id"]:
2897 provider_dict
["vnf-profile-id"] = provider_id
2898 requirer_id
= r
["entities"][1]["id"]
2901 "endpoint": r
["entities"][1]["endpoint"],
2903 if requirer_id
!= nsd
["id"]:
2904 requirer_dict
["vnf-profile-id"] = requirer_id
2906 raise Exception("provider/requirer or entities must be included in the relation.")
2907 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2908 nsr_id
, nsd
, provider_dict
, cached_vnfds
2910 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2911 nsr_id
, nsd
, requirer_dict
, cached_vnfds
2913 provider
= EERelation(relation_provider
)
2914 requirer
= EERelation(relation_requirer
)
2915 relation
= Relation(r
["name"], provider
, requirer
)
2916 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2918 relations
.append(relation
)
2921 def _get_vnf_relations(
2924 nsd
: Dict
[str, Any
],
2926 cached_vnfds
: Dict
[str, Any
],
2927 ) -> List
[Relation
]:
2929 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
2930 vnf_profile_id
= vnf_profile
["id"]
2931 vnfd_id
= vnf_profile
["vnfd-id"]
2932 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2933 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
2934 for r
in db_vnf_relations
:
2935 provider_dict
= None
2936 requirer_dict
= None
2937 if all(key
in r
for key
in ("provider", "requirer")):
2938 provider_dict
= r
["provider"]
2939 requirer_dict
= r
["requirer"]
2940 elif "entities" in r
:
2941 provider_id
= r
["entities"][0]["id"]
2944 "vnf-profile-id": vnf_profile_id
,
2945 "endpoint": r
["entities"][0]["endpoint"],
2947 if provider_id
!= vnfd_id
:
2948 provider_dict
["vdu-profile-id"] = provider_id
2949 requirer_id
= r
["entities"][1]["id"]
2952 "vnf-profile-id": vnf_profile_id
,
2953 "endpoint": r
["entities"][1]["endpoint"],
2955 if requirer_id
!= vnfd_id
:
2956 requirer_dict
["vdu-profile-id"] = requirer_id
2958 raise Exception("provider/requirer or entities must be included in the relation.")
2959 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2960 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
2962 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2963 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
2965 provider
= EERelation(relation_provider
)
2966 requirer
= EERelation(relation_requirer
)
2967 relation
= Relation(r
["name"], provider
, requirer
)
2968 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2970 relations
.append(relation
)
2973 def _get_kdu_resource_data(
2975 ee_relation
: EERelation
,
2976 db_nsr
: Dict
[str, Any
],
2977 cached_vnfds
: Dict
[str, Any
],
2978 ) -> DeployedK8sResource
:
2979 nsd
= get_nsd(db_nsr
)
2980 vnf_profiles
= get_vnf_profiles(nsd
)
2981 vnfd_id
= find_in_list(
2983 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
2985 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2986 kdu_resource_profile
= get_kdu_resource_profile(
2987 db_vnfd
, ee_relation
.kdu_resource_profile_id
2989 kdu_name
= kdu_resource_profile
["kdu-name"]
2990 deployed_kdu
, _
= get_deployed_kdu(
2991 db_nsr
.get("_admin", ()).get("deployed", ()),
2993 ee_relation
.vnf_profile_id
,
2995 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
2998 def _get_deployed_component(
3000 ee_relation
: EERelation
,
3001 db_nsr
: Dict
[str, Any
],
3002 cached_vnfds
: Dict
[str, Any
],
3003 ) -> DeployedComponent
:
3004 nsr_id
= db_nsr
["_id"]
3005 deployed_component
= None
3006 ee_level
= EELevel
.get_level(ee_relation
)
3007 if ee_level
== EELevel
.NS
:
3008 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
3010 deployed_component
= DeployedVCA(nsr_id
, vca
)
3011 elif ee_level
== EELevel
.VNF
:
3012 vca
= get_deployed_vca(
3016 "member-vnf-index": ee_relation
.vnf_profile_id
,
3017 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3021 deployed_component
= DeployedVCA(nsr_id
, vca
)
3022 elif ee_level
== EELevel
.VDU
:
3023 vca
= get_deployed_vca(
3026 "vdu_id": ee_relation
.vdu_profile_id
,
3027 "member-vnf-index": ee_relation
.vnf_profile_id
,
3028 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3032 deployed_component
= DeployedVCA(nsr_id
, vca
)
3033 elif ee_level
== EELevel
.KDU
:
3034 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
3035 ee_relation
, db_nsr
, cached_vnfds
3037 if kdu_resource_data
:
3038 deployed_component
= DeployedK8sResource(kdu_resource_data
)
3039 return deployed_component
3041 async def _add_relation(
3045 db_nsr
: Dict
[str, Any
],
3046 cached_vnfds
: Dict
[str, Any
],
3047 cached_vnfrs
: Dict
[str, Any
],
3049 deployed_provider
= self
._get
_deployed
_component
(
3050 relation
.provider
, db_nsr
, cached_vnfds
3052 deployed_requirer
= self
._get
_deployed
_component
(
3053 relation
.requirer
, db_nsr
, cached_vnfds
3057 and deployed_requirer
3058 and deployed_provider
.config_sw_installed
3059 and deployed_requirer
.config_sw_installed
3061 provider_db_vnfr
= (
3063 relation
.provider
.nsr_id
,
3064 relation
.provider
.vnf_profile_id
,
3067 if relation
.provider
.vnf_profile_id
3070 requirer_db_vnfr
= (
3072 relation
.requirer
.nsr_id
,
3073 relation
.requirer
.vnf_profile_id
,
3076 if relation
.requirer
.vnf_profile_id
3079 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3080 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3081 provider_relation_endpoint
= RelationEndpoint(
3082 deployed_provider
.ee_id
,
3084 relation
.provider
.endpoint
,
3086 requirer_relation_endpoint
= RelationEndpoint(
3087 deployed_requirer
.ee_id
,
3089 relation
.requirer
.endpoint
,
3091 await self
.vca_map
[vca_type
].add_relation(
3092 provider
=provider_relation_endpoint
,
3093 requirer
=requirer_relation_endpoint
,
3095 # remove entry from relations list
3099 async def _add_vca_relations(
3105 timeout
: int = 3600,
3109 # 1. find all relations for this VCA
3110 # 2. wait for other peers related
3114 # STEP 1: find all relations for this VCA
3117 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3118 nsd
= get_nsd(db_nsr
)
3121 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3122 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3127 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3128 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3130 # if no relations, terminate
3132 self
.logger
.debug(logging_text
+ " No relations")
3135 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3142 if now
- start
>= timeout
:
3143 self
.logger
.error(logging_text
+ " : timeout adding relations")
3146 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3147 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3149 # for each relation, find the VCA's related
3150 for relation
in relations
.copy():
3151 added
= await self
._add
_relation
(
3159 relations
.remove(relation
)
3162 self
.logger
.debug("Relations added")
3164 await asyncio
.sleep(5.0)
3168 except Exception as e
:
3169 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3172 async def _install_kdu(
3180 k8s_instance_info
: dict,
3181 k8params
: dict = None,
3187 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3190 "collection": "nsrs",
3191 "filter": {"_id": nsr_id
},
3192 "path": nsr_db_path
,
3195 if k8s_instance_info
.get("kdu-deployment-name"):
3196 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3198 kdu_instance
= self
.k8scluster_map
[
3200 ].generate_kdu_instance_name(
3201 db_dict
=db_dict_install
,
3202 kdu_model
=k8s_instance_info
["kdu-model"],
3203 kdu_name
=k8s_instance_info
["kdu-name"],
3206 # Update the nsrs table with the kdu-instance value
3210 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3213 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3214 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3215 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3216 # namespace, this first verification could be removed, and the next step would be done for any kind
3218 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3219 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3220 if k8sclustertype
in ("juju", "juju-bundle"):
3221 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3222 # that the user passed a namespace which he wants its KDU to be deployed in)
3228 "_admin.projects_write": k8s_instance_info
["namespace"],
3229 "_admin.projects_read": k8s_instance_info
["namespace"],
3235 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3240 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3242 k8s_instance_info
["namespace"] = kdu_instance
3244 await self
.k8scluster_map
[k8sclustertype
].install(
3245 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3246 kdu_model
=k8s_instance_info
["kdu-model"],
3249 db_dict
=db_dict_install
,
3251 kdu_name
=k8s_instance_info
["kdu-name"],
3252 namespace
=k8s_instance_info
["namespace"],
3253 kdu_instance
=kdu_instance
,
3257 # Obtain services to obtain management service ip
3258 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3259 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3260 kdu_instance
=kdu_instance
,
3261 namespace
=k8s_instance_info
["namespace"],
3264 # Obtain management service info (if exists)
3265 vnfr_update_dict
= {}
3266 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3268 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3273 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3276 for service
in kdud
.get("service", [])
3277 if service
.get("mgmt-service")
3279 for mgmt_service
in mgmt_services
:
3280 for service
in services
:
3281 if service
["name"].startswith(mgmt_service
["name"]):
3282 # Mgmt service found, Obtain service ip
3283 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3284 if isinstance(ip
, list) and len(ip
) == 1:
3288 "kdur.{}.ip-address".format(kdu_index
)
3291 # Check if must update also mgmt ip at the vnf
3292 service_external_cp
= mgmt_service
.get(
3293 "external-connection-point-ref"
3295 if service_external_cp
:
3297 deep_get(vnfd
, ("mgmt-interface", "cp"))
3298 == service_external_cp
3300 vnfr_update_dict
["ip-address"] = ip
3305 "external-connection-point-ref", ""
3307 == service_external_cp
,
3310 "kdur.{}.ip-address".format(kdu_index
)
3315 "Mgmt service name: {} not found".format(
3316 mgmt_service
["name"]
3320 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3321 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3323 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3326 and kdu_config
.get("initial-config-primitive")
3327 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3329 initial_config_primitive_list
= kdu_config
.get(
3330 "initial-config-primitive"
3332 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3334 for initial_config_primitive
in initial_config_primitive_list
:
3335 primitive_params_
= self
._map
_primitive
_params
(
3336 initial_config_primitive
, {}, {}
3339 await asyncio
.wait_for(
3340 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3341 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3342 kdu_instance
=kdu_instance
,
3343 primitive_name
=initial_config_primitive
["name"],
3344 params
=primitive_params_
,
3345 db_dict
=db_dict_install
,
3351 except Exception as e
:
3352 # Prepare update db with error and raise exception
3355 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3359 vnfr_data
.get("_id"),
3360 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3363 # ignore to keep original exception
3365 # reraise original error
3370 async def deploy_kdus(
3377 task_instantiation_info
,
3379 # Launch kdus if present in the descriptor
3381 k8scluster_id_2_uuic
= {
3382 "helm-chart-v3": {},
3387 async def _get_cluster_id(cluster_id
, cluster_type
):
3388 nonlocal k8scluster_id_2_uuic
3389 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3390 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3392 # check if K8scluster is creating and wait look if previous tasks in process
3393 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3394 "k8scluster", cluster_id
3397 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3398 task_name
, cluster_id
3400 self
.logger
.debug(logging_text
+ text
)
3401 await asyncio
.wait(task_dependency
, timeout
=3600)
3403 db_k8scluster
= self
.db
.get_one(
3404 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3406 if not db_k8scluster
:
3407 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3409 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3411 if cluster_type
== "helm-chart-v3":
3413 # backward compatibility for existing clusters that have not been initialized for helm v3
3414 k8s_credentials
= yaml
.safe_dump(
3415 db_k8scluster
.get("credentials")
3417 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3418 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3420 db_k8scluster_update
= {}
3421 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3422 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3423 db_k8scluster_update
[
3424 "_admin.helm-chart-v3.created"
3426 db_k8scluster_update
[
3427 "_admin.helm-chart-v3.operationalState"
3430 "k8sclusters", cluster_id
, db_k8scluster_update
3432 except Exception as e
:
3435 + "error initializing helm-v3 cluster: {}".format(str(e
))
3438 "K8s cluster '{}' has not been initialized for '{}'".format(
3439 cluster_id
, cluster_type
3444 "K8s cluster '{}' has not been initialized for '{}'".format(
3445 cluster_id
, cluster_type
3448 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3451 logging_text
+= "Deploy kdus: "
3454 db_nsr_update
= {"_admin.deployed.K8s": []}
3455 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3458 updated_cluster_list
= []
3459 updated_v3_cluster_list
= []
3461 for vnfr_data
in db_vnfrs
.values():
3462 vca_id
= self
.get_vca_id(vnfr_data
, {})
3463 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3464 # Step 0: Prepare and set parameters
3465 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3466 vnfd_id
= vnfr_data
.get("vnfd-id")
3467 vnfd_with_id
= find_in_list(
3468 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3472 for kdud
in vnfd_with_id
["kdu"]
3473 if kdud
["name"] == kdur
["kdu-name"]
3475 namespace
= kdur
.get("k8s-namespace")
3476 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3477 if kdur
.get("helm-chart"):
3478 kdumodel
= kdur
["helm-chart"]
3479 # Default version: helm3, if helm-version is v2 assign v2
3480 k8sclustertype
= "helm-chart-v3"
3481 self
.logger
.debug("kdur: {}".format(kdur
))
3483 kdur
.get("helm-version")
3484 and kdur
.get("helm-version") == "v2"
3486 k8sclustertype
= "helm-chart"
3487 elif kdur
.get("juju-bundle"):
3488 kdumodel
= kdur
["juju-bundle"]
3489 k8sclustertype
= "juju-bundle"
3492 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3493 "juju-bundle. Maybe an old NBI version is running".format(
3494 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3497 # check if kdumodel is a file and exists
3499 vnfd_with_id
= find_in_list(
3500 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3502 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3503 if storage
: # may be not present if vnfd has not artifacts
3504 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3505 if storage
["pkg-dir"]:
3506 filename
= "{}/{}/{}s/{}".format(
3513 filename
= "{}/Scripts/{}s/{}".format(
3518 if self
.fs
.file_exists(
3519 filename
, mode
="file"
3520 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3521 kdumodel
= self
.fs
.path
+ filename
3522 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3524 except Exception: # it is not a file
3527 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3528 step
= "Synchronize repos for k8s cluster '{}'".format(
3531 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3535 k8sclustertype
== "helm-chart"
3536 and cluster_uuid
not in updated_cluster_list
3538 k8sclustertype
== "helm-chart-v3"
3539 and cluster_uuid
not in updated_v3_cluster_list
3541 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3542 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3543 cluster_uuid
=cluster_uuid
3546 if del_repo_list
or added_repo_dict
:
3547 if k8sclustertype
== "helm-chart":
3549 "_admin.helm_charts_added." + item
: None
3550 for item
in del_repo_list
3553 "_admin.helm_charts_added." + item
: name
3554 for item
, name
in added_repo_dict
.items()
3556 updated_cluster_list
.append(cluster_uuid
)
3557 elif k8sclustertype
== "helm-chart-v3":
3559 "_admin.helm_charts_v3_added." + item
: None
3560 for item
in del_repo_list
3563 "_admin.helm_charts_v3_added." + item
: name
3564 for item
, name
in added_repo_dict
.items()
3566 updated_v3_cluster_list
.append(cluster_uuid
)
3568 logging_text
+ "repos synchronized on k8s cluster "
3569 "'{}' to_delete: {}, to_add: {}".format(
3570 k8s_cluster_id
, del_repo_list
, added_repo_dict
3575 {"_id": k8s_cluster_id
},
3581 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3582 vnfr_data
["member-vnf-index-ref"],
3586 k8s_instance_info
= {
3587 "kdu-instance": None,
3588 "k8scluster-uuid": cluster_uuid
,
3589 "k8scluster-type": k8sclustertype
,
3590 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3591 "kdu-name": kdur
["kdu-name"],
3592 "kdu-model": kdumodel
,
3593 "namespace": namespace
,
3594 "kdu-deployment-name": kdu_deployment_name
,
3596 db_path
= "_admin.deployed.K8s.{}".format(index
)
3597 db_nsr_update
[db_path
] = k8s_instance_info
3598 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3599 vnfd_with_id
= find_in_list(
3600 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3602 task
= asyncio
.ensure_future(
3611 k8params
=desc_params
,
3616 self
.lcm_tasks
.register(
3620 "instantiate_KDU-{}".format(index
),
3623 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3629 except (LcmException
, asyncio
.CancelledError
):
3631 except Exception as e
:
3632 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3633 if isinstance(e
, (N2VCException
, DbException
)):
3634 self
.logger
.error(logging_text
+ msg
)
3636 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3637 raise LcmException(msg
)
3640 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3659 task_instantiation_info
,
3662 # launch instantiate_N2VC in an asyncio task and register the task object
3663 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3664 # if not found, create one entry and update database
3665 # fill db_nsr._admin.deployed.VCA.<index>
3668 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3670 if "execution-environment-list" in descriptor_config
:
3671 ee_list
= descriptor_config
.get("execution-environment-list", [])
3672 elif "juju" in descriptor_config
:
3673 ee_list
= [descriptor_config
] # ns charms
3674 else: # other types as script are not supported
3677 for ee_item
in ee_list
:
3680 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3681 ee_item
.get("juju"), ee_item
.get("helm-chart")
3684 ee_descriptor_id
= ee_item
.get("id")
3685 if ee_item
.get("juju"):
3686 vca_name
= ee_item
["juju"].get("charm")
3689 if ee_item
["juju"].get("charm") is not None
3692 if ee_item
["juju"].get("cloud") == "k8s":
3693 vca_type
= "k8s_proxy_charm"
3694 elif ee_item
["juju"].get("proxy") is False:
3695 vca_type
= "native_charm"
3696 elif ee_item
.get("helm-chart"):
3697 vca_name
= ee_item
["helm-chart"]
3698 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3701 vca_type
= "helm-v3"
3704 logging_text
+ "skipping non juju neither charm configuration"
3709 for vca_index
, vca_deployed
in enumerate(
3710 db_nsr
["_admin"]["deployed"]["VCA"]
3712 if not vca_deployed
:
3715 vca_deployed
.get("member-vnf-index") == member_vnf_index
3716 and vca_deployed
.get("vdu_id") == vdu_id
3717 and vca_deployed
.get("kdu_name") == kdu_name
3718 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3719 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3723 # not found, create one.
3725 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3728 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3730 target
+= "/kdu/{}".format(kdu_name
)
3732 "target_element": target
,
3733 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3734 "member-vnf-index": member_vnf_index
,
3736 "kdu_name": kdu_name
,
3737 "vdu_count_index": vdu_index
,
3738 "operational-status": "init", # TODO revise
3739 "detailed-status": "", # TODO revise
3740 "step": "initial-deploy", # TODO revise
3742 "vdu_name": vdu_name
,
3744 "ee_descriptor_id": ee_descriptor_id
,
3748 # create VCA and configurationStatus in db
3750 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3751 "configurationStatus.{}".format(vca_index
): dict(),
3753 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3755 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3757 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3758 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3759 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3762 task_n2vc
= asyncio
.ensure_future(
3763 self
.instantiate_N2VC(
3764 logging_text
=logging_text
,
3765 vca_index
=vca_index
,
3771 vdu_index
=vdu_index
,
3772 deploy_params
=deploy_params
,
3773 config_descriptor
=descriptor_config
,
3774 base_folder
=base_folder
,
3775 nslcmop_id
=nslcmop_id
,
3779 ee_config_descriptor
=ee_item
,
3782 self
.lcm_tasks
.register(
3786 "instantiate_N2VC-{}".format(vca_index
),
3789 task_instantiation_info
[
3791 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3792 member_vnf_index
or "", vdu_id
or ""
3796 def _create_nslcmop(nsr_id
, operation
, params
):
3798 Creates the ns-lcm-op content to be stored in the database.
3799 :param nsr_id: internal id of the instance
3800 :param operation: instantiate, terminate, scale, action, ...
3801 :param params: user parameters for the operation
3802 :return: dictionary following SOL005 format
3804 # Raise exception if invalid arguments
3805 if not (nsr_id
and operation
and params
):
3807 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3814 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3815 "operationState": "PROCESSING",
3816 "statusEnteredTime": now
,
3817 "nsInstanceId": nsr_id
,
3818 "lcmOperationType": operation
,
3820 "isAutomaticInvocation": False,
3821 "operationParams": params
,
3822 "isCancelPending": False,
3824 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3825 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3830 def _format_additional_params(self
, params
):
3831 params
= params
or {}
3832 for key
, value
in params
.items():
3833 if str(value
).startswith("!!yaml "):
3834 params
[key
] = yaml
.safe_load(value
[7:])
3837 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3838 primitive
= seq
.get("name")
3839 primitive_params
= {}
3841 "member_vnf_index": vnf_index
,
3842 "primitive": primitive
,
3843 "primitive_params": primitive_params
,
3846 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide whether an already registered sub-operation must be skipped or executed again.
    :param db_nslcmop: nslcmop content; the sub-operation is read from its _admin.operations list
    :param op_index: position of the sub-operation inside that list
    :return: SUBOPERATION_STATUS_SKIP when the sub-operation is already COMPLETED; otherwise op_index,
        after its status has been set back to PROCESSING
    """
    op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
    if op.get("operationState") == "COMPLETED":
        # b. Skip sub-operation
        # _ns_execute_primitive() or RO.create_action() will NOT be executed
        return self.SUBOPERATION_STATUS_SKIP

    # c. retry executing sub-operation
    # The sub-operation exists, and operationState != 'COMPLETED'
    # Update operationState = 'PROCESSING' to indicate a retry.
    operationState = "PROCESSING"
    detailed_status = "In progress"
    self._update_suboperation_status(
        db_nslcmop, op_index, operationState, detailed_status
    )
    # Return the sub-operation index
    # _ns_execute_primitive() or RO.create_action() will be called from scale()
    # with arguments extracted from the sub-operation
    return op_index
3870 # Find a sub-operation where all keys in a matching dictionary must match
3871 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3872 def _find_suboperation(self
, db_nslcmop
, match
):
3873 if db_nslcmop
and match
:
3874 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3875 for i
, op
in enumerate(op_list
):
3876 if all(op
.get(k
) == match
[k
] for k
in match
):
3878 return self
.SUBOPERATION_STATUS_NOT_FOUND
3880 # Update status for a sub-operation given its index
def _update_suboperation_status(
    self, db_nslcmop, op_index, operationState, detailed_status
):
    """
    Write the status of one sub-operation at the "nslcmops" database collection.
    :param db_nslcmop: nslcmop content; only its "_id" is used, to select the database entry
    :param op_index: position of the sub-operation inside _admin.operations
    :param operationState: value stored at _admin.operations.<op_index>.operationState
    :param detailed_status: value stored at _admin.operations.<op_index>.detailed-status
    :return: None
    """
    # Update DB for HA tasks
    q_filter = {"_id": db_nslcmop["_id"]}
    update_dict = {
        "_admin.operations.{}.operationState".format(op_index): operationState,
        "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
    }
    # NOTE(review): the line naming the called method was not visible in the reviewed text;
    # set_one is the database call that accepts these keyword arguments - confirm
    self.db.set_one(
        "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
    )
3894 # Add sub-operation, return the index of the added sub-operation
3895 # Optionally, set operationState, detailed-status, and operationType
3896 # Status and type are currently set for 'scale' sub-operations:
3897 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3898 # 'detailed-status' : status message
3899 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3900 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3901 def _add_suboperation(
3909 mapped_primitive_params
,
3910 operationState
=None,
3911 detailed_status
=None,
3914 RO_scaling_info
=None,
3917 return self
.SUBOPERATION_STATUS_NOT_FOUND
3918 # Get the "_admin.operations" list, if it exists
3919 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3920 op_list
= db_nslcmop_admin
.get("operations")
3921 # Create or append to the "_admin.operations" list
3923 "member_vnf_index": vnf_index
,
3925 "vdu_count_index": vdu_count_index
,
3926 "primitive": primitive
,
3927 "primitive_params": mapped_primitive_params
,
3930 new_op
["operationState"] = operationState
3932 new_op
["detailed-status"] = detailed_status
3934 new_op
["lcmOperationType"] = operationType
3936 new_op
["RO_nsr_id"] = RO_nsr_id
3938 new_op
["RO_scaling_info"] = RO_scaling_info
3940 # No existing operations, create key 'operations' with current operation as first list element
3941 db_nslcmop_admin
.update({"operations": [new_op
]})
3942 op_list
= db_nslcmop_admin
.get("operations")
3944 # Existing operations, append operation to list
3945 op_list
.append(new_op
)
3947 db_nslcmop_update
= {"_admin.operations": op_list
}
3948 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
3949 op_index
= len(op_list
) - 1
3952 # Helper methods for scale() sub-operations
3954 # pre-scale/post-scale:
3955 # Check for 3 different cases:
3956 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3957 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3958 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3959 def _check_or_add_scale_suboperation(
3963 vnf_config_primitive
,
3967 RO_scaling_info
=None,
3969 # Find this sub-operation
3970 if RO_nsr_id
and RO_scaling_info
:
3971 operationType
= "SCALE-RO"
3973 "member_vnf_index": vnf_index
,
3974 "RO_nsr_id": RO_nsr_id
,
3975 "RO_scaling_info": RO_scaling_info
,
3979 "member_vnf_index": vnf_index
,
3980 "primitive": vnf_config_primitive
,
3981 "primitive_params": primitive_params
,
3982 "lcmOperationType": operationType
,
3984 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
3985 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
3986 # a. New sub-operation
3987 # The sub-operation does not exist, add it.
3988 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3989 # The following parameters are set to None for all kind of scaling:
3991 vdu_count_index
= None
3993 if RO_nsr_id
and RO_scaling_info
:
3994 vnf_config_primitive
= None
3995 primitive_params
= None
3998 RO_scaling_info
= None
3999 # Initial status for sub-operation
4000 operationState
= "PROCESSING"
4001 detailed_status
= "In progress"
4002 # Add sub-operation for pre/post-scaling (zero or more operations)
4003 self
._add
_suboperation
(
4009 vnf_config_primitive
,
4017 return self
.SUBOPERATION_STATUS_NEW
4019 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4020 # or op_index (operationState != 'COMPLETED')
4021 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4023 # Function to return execution_environment id
4025 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4026 # TODO vdu_index_count
4027 for vca
in vca_deployed_list
:
4028 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
4031 async def destroy_N2VC(
4039 exec_primitives
=True,
4044 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4045 :param logging_text:
4047 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
4048 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4049 :param vca_index: index in the database _admin.deployed.VCA
4050 :param destroy_ee: False to not destroy it here, because all of them will be destroyed at once
4051 :param exec_primitives: False to not execute terminate primitives, because the config is not completed or has
4052 not been executed properly
4053 :param scaling_in: True destroys the application, False destroys the model
4054 :return: None or exception
4059 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4060 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4064 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4066 # execute terminate_primitives
4068 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4069 config_descriptor
.get("terminate-config-primitive"),
4070 vca_deployed
.get("ee_descriptor_id"),
4072 vdu_id
= vca_deployed
.get("vdu_id")
4073 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4074 vdu_name
= vca_deployed
.get("vdu_name")
4075 vnf_index
= vca_deployed
.get("member-vnf-index")
4076 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4077 for seq
in terminate_primitives
:
4078 # For each sequence in list, get primitive and call _ns_execute_primitive()
4079 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4080 vnf_index
, seq
.get("name")
4082 self
.logger
.debug(logging_text
+ step
)
4083 # Create the primitive for each sequence, i.e. "primitive": "touch"
4084 primitive
= seq
.get("name")
4085 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4090 self
._add
_suboperation
(
4097 mapped_primitive_params
,
4099 # Sub-operations: Call _ns_execute_primitive() instead of action()
4101 result
, result_detail
= await self
._ns
_execute
_primitive
(
4102 vca_deployed
["ee_id"],
4104 mapped_primitive_params
,
4108 except LcmException
:
4109 # this happens when VCA is not deployed. In this case it is not needed to terminate
4111 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4112 if result
not in result_ok
:
4114 "terminate_primitive {} for vnf_member_index={} fails with "
4115 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4117 # mark that this VCA does not need to be terminated anymore
4118 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4122 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4125 # Delete Prometheus Jobs if any
4126 # This uses NSR_ID, so it will destroy any jobs under this index
4127 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4130 await self
.vca_map
[vca_type
].delete_execution_environment(
4131 vca_deployed
["ee_id"],
4132 scaling_in
=scaling_in
,
async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
    """
    Delete the N2VC namespace of a ns, writing the configuration status before and after.
    The status of all configurations is set to TERMINATING before the deletion and to DELETED after it.
    A namespace reported as not found is treated as already deleted.
    :param db_nsr: nsr database content; its "_id" is used to build the namespace "." + _id
    :param vca_id: VCA identifier, or None
    :return: None
    """
    self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
    namespace = "." + db_nsr["_id"]
    try:
        await self.n2vc.delete_namespace(
            namespace=namespace,
            total_timeout=self.timeout_charm_delete,
            # NOTE(review): this argument line was not visible in the reviewed text; forwarding the
            # vca_id parameter is assumed - confirm
            vca_id=vca_id,
        )
    except N2VCNotFound:  # already deleted. Skip
        pass
    self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
4150 async def _terminate_RO(
4151 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4154 Terminates a deployment from RO
4155 :param logging_text:
4156 :param nsr_deployed: db_nsr._admin.deployed
4159 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4160 this method will update only the index 2, but it will write on database the concatenated content of the list
4165 ro_nsr_id
= ro_delete_action
= None
4166 if nsr_deployed
and nsr_deployed
.get("RO"):
4167 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4168 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4171 stage
[2] = "Deleting ns from VIM."
4172 db_nsr_update
["detailed-status"] = " ".join(stage
)
4173 self
._write
_op
_status
(nslcmop_id
, stage
)
4174 self
.logger
.debug(logging_text
+ stage
[2])
4175 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4176 self
._write
_op
_status
(nslcmop_id
, stage
)
4177 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4178 ro_delete_action
= desc
["action_id"]
4180 "_admin.deployed.RO.nsr_delete_action_id"
4181 ] = ro_delete_action
4182 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4183 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4184 if ro_delete_action
:
4185 # wait until NS is deleted from VIM
4186 stage
[2] = "Waiting ns deleted from VIM."
4187 detailed_status_old
= None
4191 + " RO_id={} ro_delete_action={}".format(
4192 ro_nsr_id
, ro_delete_action
4195 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4196 self
._write
_op
_status
(nslcmop_id
, stage
)
4198 delete_timeout
= 20 * 60 # 20 minutes
4199 while delete_timeout
> 0:
4200 desc
= await self
.RO
.show(
4202 item_id_name
=ro_nsr_id
,
4203 extra_item
="action",
4204 extra_item_id
=ro_delete_action
,
4208 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4210 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4211 if ns_status
== "ERROR":
4212 raise ROclient
.ROClientException(ns_status_info
)
4213 elif ns_status
== "BUILD":
4214 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4215 elif ns_status
== "ACTIVE":
4216 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4217 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4222 ), "ROclient.check_action_status returns unknown {}".format(
4225 if stage
[2] != detailed_status_old
:
4226 detailed_status_old
= stage
[2]
4227 db_nsr_update
["detailed-status"] = " ".join(stage
)
4228 self
._write
_op
_status
(nslcmop_id
, stage
)
4229 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4230 await asyncio
.sleep(5, loop
=self
.loop
)
4232 else: # delete_timeout <= 0:
4233 raise ROclient
.ROClientException(
4234 "Timeout waiting ns deleted from VIM"
4237 except Exception as e
:
4238 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4240 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4242 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4243 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4244 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4246 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4249 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4251 failed_detail
.append("delete conflict: {}".format(e
))
4254 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4257 failed_detail
.append("delete error: {}".format(e
))
4259 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4263 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4264 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4266 stage
[2] = "Deleting nsd from RO."
4267 db_nsr_update
["detailed-status"] = " ".join(stage
)
4268 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4269 self
._write
_op
_status
(nslcmop_id
, stage
)
4270 await self
.RO
.delete("nsd", ro_nsd_id
)
4272 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4274 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4275 except Exception as e
:
4277 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4279 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4281 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4284 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4286 failed_detail
.append(
4287 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4289 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4291 failed_detail
.append(
4292 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4294 self
.logger
.error(logging_text
+ failed_detail
[-1])
4296 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4297 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4298 if not vnf_deployed
or not vnf_deployed
["id"]:
4301 ro_vnfd_id
= vnf_deployed
["id"]
4304 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4305 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4307 db_nsr_update
["detailed-status"] = " ".join(stage
)
4308 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4309 self
._write
_op
_status
(nslcmop_id
, stage
)
4310 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4312 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4314 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4315 except Exception as e
:
4317 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4320 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4324 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4327 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4329 failed_detail
.append(
4330 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4332 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4334 failed_detail
.append(
4335 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4337 self
.logger
.error(logging_text
+ failed_detail
[-1])
4340 stage
[2] = "Error deleting from VIM"
4342 stage
[2] = "Deleted from VIM"
4343 db_nsr_update
["detailed-status"] = " ".join(stage
)
4344 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4345 self
._write
_op
_status
(nslcmop_id
, stage
)
4348 raise LcmException("; ".join(failed_detail
))
4350 async def terminate(self
, nsr_id
, nslcmop_id
):
4351 # Try to lock HA task here
4352 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4353 if not task_is_locked_by_me
:
4356 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4357 self
.logger
.debug(logging_text
+ "Enter")
4358 timeout_ns_terminate
= self
.timeout_ns_terminate
4361 operation_params
= None
4363 error_list
= [] # annotates all failed error messages
4364 db_nslcmop_update
= {}
4365 autoremove
= False # autoremove after terminated
4366 tasks_dict_info
= {}
4369 "Stage 1/3: Preparing task.",
4370 "Waiting for previous operations to terminate.",
4373 # ^ contains [stage, step, VIM-status]
4375 # wait for any previous tasks in process
4376 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4378 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4379 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4380 operation_params
= db_nslcmop
.get("operationParams") or {}
4381 if operation_params
.get("timeout_ns_terminate"):
4382 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4383 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4384 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4386 db_nsr_update
["operational-status"] = "terminating"
4387 db_nsr_update
["config-status"] = "terminating"
4388 self
._write
_ns
_status
(
4390 ns_state
="TERMINATING",
4391 current_operation
="TERMINATING",
4392 current_operation_id
=nslcmop_id
,
4393 other_update
=db_nsr_update
,
4395 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4396 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4397 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4400 stage
[1] = "Getting vnf descriptors from db."
4401 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4403 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4405 db_vnfds_from_id
= {}
4406 db_vnfds_from_member_index
= {}
4408 for vnfr
in db_vnfrs_list
:
4409 vnfd_id
= vnfr
["vnfd-id"]
4410 if vnfd_id
not in db_vnfds_from_id
:
4411 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4412 db_vnfds_from_id
[vnfd_id
] = vnfd
4413 db_vnfds_from_member_index
[
4414 vnfr
["member-vnf-index-ref"]
4415 ] = db_vnfds_from_id
[vnfd_id
]
4417 # Destroy individual execution environments when there are terminating primitives.
4418 # Rest of EE will be deleted at once
4419 # TODO - check before calling _destroy_N2VC
4420 # if not operation_params.get("skip_terminate_primitives"):#
4421 # or not vca.get("needed_terminate"):
4422 stage
[0] = "Stage 2/3 execute terminating primitives."
4423 self
.logger
.debug(logging_text
+ stage
[0])
4424 stage
[1] = "Looking execution environment that needs terminate."
4425 self
.logger
.debug(logging_text
+ stage
[1])
4427 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4428 config_descriptor
= None
4429 vca_member_vnf_index
= vca
.get("member-vnf-index")
4430 vca_id
= self
.get_vca_id(
4431 db_vnfrs_dict
.get(vca_member_vnf_index
)
4432 if vca_member_vnf_index
4436 if not vca
or not vca
.get("ee_id"):
4438 if not vca
.get("member-vnf-index"):
4440 config_descriptor
= db_nsr
.get("ns-configuration")
4441 elif vca
.get("vdu_id"):
4442 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4443 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4444 elif vca
.get("kdu_name"):
4445 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4446 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4448 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4449 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4450 vca_type
= vca
.get("type")
4451 exec_terminate_primitives
= not operation_params
.get(
4452 "skip_terminate_primitives"
4453 ) and vca
.get("needed_terminate")
4454 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4455 # pending native charms
4457 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4459 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4460 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4461 task
= asyncio
.ensure_future(
4469 exec_terminate_primitives
,
4473 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4475 # wait for pending tasks of terminate primitives
4479 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4481 error_list
= await self
._wait
_for
_tasks
(
4484 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4488 tasks_dict_info
.clear()
4490 return # raise LcmException("; ".join(error_list))
4492 # remove All execution environments at once
4493 stage
[0] = "Stage 3/3 delete all."
4495 if nsr_deployed
.get("VCA"):
4496 stage
[1] = "Deleting all execution environments."
4497 self
.logger
.debug(logging_text
+ stage
[1])
4498 vca_id
= self
.get_vca_id({}, db_nsr
)
4499 task_delete_ee
= asyncio
.ensure_future(
4501 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4502 timeout
=self
.timeout_charm_delete
,
4505 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4506 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4508 # Delete from k8scluster
4509 stage
[1] = "Deleting KDUs."
4510 self
.logger
.debug(logging_text
+ stage
[1])
4511 # print(nsr_deployed)
4512 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4513 if not kdu
or not kdu
.get("kdu-instance"):
4515 kdu_instance
= kdu
.get("kdu-instance")
4516 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4517 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4518 vca_id
= self
.get_vca_id({}, db_nsr
)
4519 task_delete_kdu_instance
= asyncio
.ensure_future(
4520 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4521 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4522 kdu_instance
=kdu_instance
,
4529 + "Unknown k8s deployment type {}".format(
4530 kdu
.get("k8scluster-type")
4535 task_delete_kdu_instance
4536 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4539 stage
[1] = "Deleting ns from VIM."
4541 task_delete_ro
= asyncio
.ensure_future(
4542 self
._terminate
_ng
_ro
(
4543 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4547 task_delete_ro
= asyncio
.ensure_future(
4549 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4552 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4554 # the rest of the work will be done in the finally block
4557 ROclient
.ROClientException
,
4562 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4564 except asyncio
.CancelledError
:
4566 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4568 exc
= "Operation was cancelled"
4569 except Exception as e
:
4570 exc
= traceback
.format_exc()
4571 self
.logger
.critical(
4572 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4577 error_list
.append(str(exc
))
4579 # wait for pending tasks
4581 stage
[1] = "Waiting for terminate pending tasks."
4582 self
.logger
.debug(logging_text
+ stage
[1])
4583 error_list
+= await self
._wait
_for
_tasks
(
4586 timeout_ns_terminate
,
4590 stage
[1] = stage
[2] = ""
4591 except asyncio
.CancelledError
:
4592 error_list
.append("Cancelled")
4593 # TODO cancel all tasks
4594 except Exception as exc
:
4595 error_list
.append(str(exc
))
4596 # update status at database
4598 error_detail
= "; ".join(error_list
)
4599 # self.logger.error(logging_text + error_detail)
4600 error_description_nslcmop
= "{} Detail: {}".format(
4601 stage
[0], error_detail
4603 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4604 nslcmop_id
, stage
[0]
4607 db_nsr_update
["operational-status"] = "failed"
4608 db_nsr_update
["detailed-status"] = (
4609 error_description_nsr
+ " Detail: " + error_detail
4611 db_nslcmop_update
["detailed-status"] = error_detail
4612 nslcmop_operation_state
= "FAILED"
4616 error_description_nsr
= error_description_nslcmop
= None
4617 ns_state
= "NOT_INSTANTIATED"
4618 db_nsr_update
["operational-status"] = "terminated"
4619 db_nsr_update
["detailed-status"] = "Done"
4620 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4621 db_nslcmop_update
["detailed-status"] = "Done"
4622 nslcmop_operation_state
= "COMPLETED"
4625 self
._write
_ns
_status
(
4628 current_operation
="IDLE",
4629 current_operation_id
=None,
4630 error_description
=error_description_nsr
,
4631 error_detail
=error_detail
,
4632 other_update
=db_nsr_update
,
4634 self
._write
_op
_status
(
4637 error_message
=error_description_nslcmop
,
4638 operation_state
=nslcmop_operation_state
,
4639 other_update
=db_nslcmop_update
,
4641 if ns_state
== "NOT_INSTANTIATED":
4645 {"nsr-id-ref": nsr_id
},
4646 {"_admin.nsState": "NOT_INSTANTIATED"},
4648 except DbException
as e
:
4651 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4655 if operation_params
:
4656 autoremove
= operation_params
.get("autoremove", False)
4657 if nslcmop_operation_state
:
4659 await self
.msg
.aiowrite(
4664 "nslcmop_id": nslcmop_id
,
4665 "operationState": nslcmop_operation_state
,
4666 "autoremove": autoremove
,
4670 except Exception as e
:
4672 logging_text
+ "kafka_write notification Exception {}".format(e
)
4675 self
.logger
.debug(logging_text
+ "Exit")
4676 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4678 async def _wait_for_tasks(
4679 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4682 error_detail_list
= []
4684 pending_tasks
= list(created_tasks_info
.keys())
4685 num_tasks
= len(pending_tasks
)
4687 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4688 self
._write
_op
_status
(nslcmop_id
, stage
)
4689 while pending_tasks
:
4691 _timeout
= timeout
+ time_start
- time()
4692 done
, pending_tasks
= await asyncio
.wait(
4693 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4695 num_done
+= len(done
)
4696 if not done
: # Timeout
4697 for task
in pending_tasks
:
4698 new_error
= created_tasks_info
[task
] + ": Timeout"
4699 error_detail_list
.append(new_error
)
4700 error_list
.append(new_error
)
4703 if task
.cancelled():
4706 exc
= task
.exception()
4708 if isinstance(exc
, asyncio
.TimeoutError
):
4710 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4711 error_list
.append(created_tasks_info
[task
])
4712 error_detail_list
.append(new_error
)
4719 ROclient
.ROClientException
,
4725 self
.logger
.error(logging_text
+ new_error
)
4727 exc_traceback
= "".join(
4728 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4732 + created_tasks_info
[task
]
4738 logging_text
+ created_tasks_info
[task
] + ": Done"
4740 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4742 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4743 if nsr_id
: # update also nsr
4748 "errorDescription": "Error at: " + ", ".join(error_list
),
4749 "errorDetail": ". ".join(error_detail_list
),
4752 self
._write
_op
_status
(nslcmop_id
, stage
)
4753 return error_detail_list
4756 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4758 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4759 The default-value is used. If it is between < > it look for a value at instantiation_params
4760 :param primitive_desc: portion of VNFD/NSD that describes primitive
4761 :param params: Params provided by user
4762 :param instantiation_params: Instantiation params provided by user
4763 :return: a dictionary with the calculated params
4765 calculated_params
= {}
4766 for parameter
in primitive_desc
.get("parameter", ()):
4767 param_name
= parameter
["name"]
4768 if param_name
in params
:
4769 calculated_params
[param_name
] = params
[param_name
]
4770 elif "default-value" in parameter
or "value" in parameter
:
4771 if "value" in parameter
:
4772 calculated_params
[param_name
] = parameter
["value"]
4774 calculated_params
[param_name
] = parameter
["default-value"]
4776 isinstance(calculated_params
[param_name
], str)
4777 and calculated_params
[param_name
].startswith("<")
4778 and calculated_params
[param_name
].endswith(">")
4780 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4781 calculated_params
[param_name
] = instantiation_params
[
4782 calculated_params
[param_name
][1:-1]
4786 "Parameter {} needed to execute primitive {} not provided".format(
4787 calculated_params
[param_name
], primitive_desc
["name"]
4792 "Parameter {} needed to execute primitive {} not provided".format(
4793 param_name
, primitive_desc
["name"]
4797 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4798 calculated_params
[param_name
] = yaml
.safe_dump(
4799 calculated_params
[param_name
], default_flow_style
=True, width
=256
4801 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4803 ].startswith("!!yaml "):
4804 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4805 if parameter
.get("data-type") == "INTEGER":
4807 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4808 except ValueError: # error converting string to int
4810 "Parameter {} of primitive {} must be integer".format(
4811 param_name
, primitive_desc
["name"]
4814 elif parameter
.get("data-type") == "BOOLEAN":
4815 calculated_params
[param_name
] = not (
4816 (str(calculated_params
[param_name
])).lower() == "false"
4819 # add always ns_config_info if primitive name is config
4820 if primitive_desc
["name"] == "config":
4821 if "ns_config_info" in instantiation_params
:
4822 calculated_params
["ns_config_info"] = instantiation_params
[
4825 return calculated_params
4827 def _look_for_deployed_vca(
4834 ee_descriptor_id
=None,
4836 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4837 for vca
in deployed_vca
:
4840 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4843 vdu_count_index
is not None
4844 and vdu_count_index
!= vca
["vdu_count_index"]
4847 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4849 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4853 # vca_deployed not found
4855 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4856 " is not deployed".format(
4865 ee_id
= vca
.get("ee_id")
4867 "type", "lxc_proxy_charm"
4868 ) # default value for backward compatibility - proxy charm
4871 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4872 "execution environment".format(
4873 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4876 return ee_id
, vca_type
4878 async def _ns_execute_primitive(
4884 retries_interval
=30,
4891 if primitive
== "config":
4892 primitive_params
= {"params": primitive_params
}
4894 vca_type
= vca_type
or "lxc_proxy_charm"
4898 output
= await asyncio
.wait_for(
4899 self
.vca_map
[vca_type
].exec_primitive(
4901 primitive_name
=primitive
,
4902 params_dict
=primitive_params
,
4903 progress_timeout
=self
.timeout_progress_primitive
,
4904 total_timeout
=self
.timeout_primitive
,
4909 timeout
=timeout
or self
.timeout_primitive
,
4913 except asyncio
.CancelledError
:
4915 except Exception as e
: # asyncio.TimeoutError
4916 if isinstance(e
, asyncio
.TimeoutError
):
4921 "Error executing action {} on {} -> {}".format(
4926 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
4928 return "FAILED", str(e
)
4930 return "COMPLETED", output
4932 except (LcmException
, asyncio
.CancelledError
):
4934 except Exception as e
:
4935 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
async def vca_status_refresh(self, nsr_id, nslcmop_id):
    """
    Refreshes the status stored at the nsrs record of what is deployed for a ns.
    If the record has K8s deployments, _on_update_k8s_db is called for each of them; otherwise
    _on_update_n2vc_db is called for each entry of _admin.deployed.VCA.
    At the end the task "ns_vca_status_refresh" is removed from lcm_tasks.
    :param: nsr_id: Id of the nsr
    :param: nslcmop_id: Id of the nslcmop
    :return: None
    """

    self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_id = self.get_vca_id({}, db_nsr)
    nsr_filter = {"_id": nsr_id}
    deployed = db_nsr["_admin"]["deployed"]

    if not deployed["K8s"]:
        # no K8s deployments: one update per deployed VCA, addressed by its position
        for vca_index in range(len(deployed["VCA"])):
            vca_path = "_admin.deployed.VCA.{}.".format(vca_index)
            await self._on_update_n2vc_db("nsrs", nsr_filter, vca_path, {})
    else:
        for k8s in deployed["K8s"]:
            # the three keys are read before the call, in this order
            cluster_uuid = k8s["k8scluster-uuid"]
            kdu_instance = k8s["kdu-instance"]
            cluster_type = k8s["k8scluster-type"]
            await self._on_update_k8s_db(
                cluster_uuid=cluster_uuid,
                kdu_instance=kdu_instance,
                filter={"_id": nsr_id},
                vca_id=vca_id,
                cluster_type=cluster_type,
            )

    self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
    self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4971 async def action(self
, nsr_id
, nslcmop_id
):
4972 # Try to lock HA task here
4973 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4974 if not task_is_locked_by_me
:
4977 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
4978 self
.logger
.debug(logging_text
+ "Enter")
4979 # get all needed from database
4983 db_nslcmop_update
= {}
4984 nslcmop_operation_state
= None
4985 error_description_nslcmop
= None
4988 # wait for any previous tasks in process
4989 step
= "Waiting for previous operations to terminate"
4990 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4992 self
._write
_ns
_status
(
4995 current_operation
="RUNNING ACTION",
4996 current_operation_id
=nslcmop_id
,
4999 step
= "Getting information from database"
5000 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5001 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5002 if db_nslcmop
["operationParams"].get("primitive_params"):
5003 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
5004 db_nslcmop
["operationParams"]["primitive_params"]
5007 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5008 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
5009 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
5010 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
5011 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
5012 primitive
= db_nslcmop
["operationParams"]["primitive"]
5013 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
5014 timeout_ns_action
= db_nslcmop
["operationParams"].get(
5015 "timeout_ns_action", self
.timeout_primitive
5019 step
= "Getting vnfr from database"
5020 db_vnfr
= self
.db
.get_one(
5021 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5023 if db_vnfr
.get("kdur"):
5025 for kdur
in db_vnfr
["kdur"]:
5026 if kdur
.get("additionalParams"):
5027 kdur
["additionalParams"] = json
.loads(
5028 kdur
["additionalParams"]
5030 kdur_list
.append(kdur
)
5031 db_vnfr
["kdur"] = kdur_list
5032 step
= "Getting vnfd from database"
5033 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5035 step
= "Getting nsd from database"
5036 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5038 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5039 # for backward compatibility
5040 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5041 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5042 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5043 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5045 # look for primitive
5046 config_primitive_desc
= descriptor_configuration
= None
5048 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5050 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5052 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5054 descriptor_configuration
= db_nsd
.get("ns-configuration")
5056 if descriptor_configuration
and descriptor_configuration
.get(
5059 for config_primitive
in descriptor_configuration
["config-primitive"]:
5060 if config_primitive
["name"] == primitive
:
5061 config_primitive_desc
= config_primitive
5064 if not config_primitive_desc
:
5065 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5067 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5071 primitive_name
= primitive
5072 ee_descriptor_id
= None
5074 primitive_name
= config_primitive_desc
.get(
5075 "execution-environment-primitive", primitive
5077 ee_descriptor_id
= config_primitive_desc
.get(
5078 "execution-environment-ref"
5084 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5086 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5089 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5091 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5093 desc_params
= parse_yaml_strings(
5094 db_vnfr
.get("additionalParamsForVnf")
5097 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5098 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5099 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5101 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5102 actions
.add(primitive
["name"])
5103 for primitive
in kdu_configuration
.get("config-primitive", []):
5104 actions
.add(primitive
["name"])
5105 kdu_action
= True if primitive_name
in actions
else False
5107 # TODO check if ns is in a proper status
5109 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5111 # kdur and desc_params already set from before
5112 if primitive_params
:
5113 desc_params
.update(primitive_params
)
5114 # TODO Check if we will need something at vnf level
5115 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5117 kdu_name
== kdu
["kdu-name"]
5118 and kdu
["member-vnf-index"] == vnf_index
5123 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5126 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5127 msg
= "unknown k8scluster-type '{}'".format(
5128 kdu
.get("k8scluster-type")
5130 raise LcmException(msg
)
5133 "collection": "nsrs",
5134 "filter": {"_id": nsr_id
},
5135 "path": "_admin.deployed.K8s.{}".format(index
),
5139 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5141 step
= "Executing kdu {}".format(primitive_name
)
5142 if primitive_name
== "upgrade":
5143 if desc_params
.get("kdu_model"):
5144 kdu_model
= desc_params
.get("kdu_model")
5145 del desc_params
["kdu_model"]
5147 kdu_model
= kdu
.get("kdu-model")
5148 parts
= kdu_model
.split(sep
=":")
5150 kdu_model
= parts
[0]
5152 detailed_status
= await asyncio
.wait_for(
5153 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5154 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5155 kdu_instance
=kdu
.get("kdu-instance"),
5157 kdu_model
=kdu_model
,
5160 timeout
=timeout_ns_action
,
5162 timeout
=timeout_ns_action
+ 10,
5165 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5167 elif primitive_name
== "rollback":
5168 detailed_status
= await asyncio
.wait_for(
5169 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5170 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5171 kdu_instance
=kdu
.get("kdu-instance"),
5174 timeout
=timeout_ns_action
,
5176 elif primitive_name
== "status":
5177 detailed_status
= await asyncio
.wait_for(
5178 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5179 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5180 kdu_instance
=kdu
.get("kdu-instance"),
5183 timeout
=timeout_ns_action
,
5186 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5187 kdu
["kdu-name"], nsr_id
5189 params
= self
._map
_primitive
_params
(
5190 config_primitive_desc
, primitive_params
, desc_params
5193 detailed_status
= await asyncio
.wait_for(
5194 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5195 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5196 kdu_instance
=kdu_instance
,
5197 primitive_name
=primitive_name
,
5200 timeout
=timeout_ns_action
,
5203 timeout
=timeout_ns_action
,
5207 nslcmop_operation_state
= "COMPLETED"
5209 detailed_status
= ""
5210 nslcmop_operation_state
= "FAILED"
5212 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5213 nsr_deployed
["VCA"],
5214 member_vnf_index
=vnf_index
,
5216 vdu_count_index
=vdu_count_index
,
5217 ee_descriptor_id
=ee_descriptor_id
,
5219 for vca_index
, vca_deployed
in enumerate(
5220 db_nsr
["_admin"]["deployed"]["VCA"]
5222 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5224 "collection": "nsrs",
5225 "filter": {"_id": nsr_id
},
5226 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5230 nslcmop_operation_state
,
5232 ) = await self
._ns
_execute
_primitive
(
5234 primitive
=primitive_name
,
5235 primitive_params
=self
._map
_primitive
_params
(
5236 config_primitive_desc
, primitive_params
, desc_params
5238 timeout
=timeout_ns_action
,
5244 db_nslcmop_update
["detailed-status"] = detailed_status
5245 error_description_nslcmop
= (
5246 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5250 + " task Done with result {} {}".format(
5251 nslcmop_operation_state
, detailed_status
5254 return # database update is called inside finally
5256 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5257 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5259 except asyncio
.CancelledError
:
5261 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5263 exc
= "Operation was cancelled"
5264 except asyncio
.TimeoutError
:
5265 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5267 except Exception as e
:
5268 exc
= traceback
.format_exc()
5269 self
.logger
.critical(
5270 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5279 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5280 nslcmop_operation_state
= "FAILED"
5282 self
._write
_ns
_status
(
5286 ], # TODO check if degraded. For the moment use previous status
5287 current_operation
="IDLE",
5288 current_operation_id
=None,
5289 # error_description=error_description_nsr,
5290 # error_detail=error_detail,
5291 other_update
=db_nsr_update
,
5294 self
._write
_op
_status
(
5297 error_message
=error_description_nslcmop
,
5298 operation_state
=nslcmop_operation_state
,
5299 other_update
=db_nslcmop_update
,
5302 if nslcmop_operation_state
:
5304 await self
.msg
.aiowrite(
5309 "nslcmop_id": nslcmop_id
,
5310 "operationState": nslcmop_operation_state
,
5314 except Exception as e
:
5316 logging_text
+ "kafka_write notification Exception {}".format(e
)
5318 self
.logger
.debug(logging_text
+ "Exit")
5319 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5320 return nslcmop_operation_state
, detailed_status
5322 async def scale(self
, nsr_id
, nslcmop_id
):
5323 # Try to lock HA task here
5324 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5325 if not task_is_locked_by_me
:
5328 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
5329 stage
= ["", "", ""]
5330 tasks_dict_info
= {}
5331 # ^ stage, step, VIM progress
5332 self
.logger
.debug(logging_text
+ "Enter")
5333 # get all needed from database
5335 db_nslcmop_update
= {}
5338 # in case of error, indicates what part of scale was failed to put nsr at error status
5339 scale_process
= None
5340 old_operational_status
= ""
5341 old_config_status
= ""
5344 # wait for any previous tasks in process
5345 step
= "Waiting for previous operations to terminate"
5346 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5347 self
._write
_ns
_status
(
5350 current_operation
="SCALING",
5351 current_operation_id
=nslcmop_id
,
5354 step
= "Getting nslcmop from database"
5356 step
+ " after having waited for previous tasks to be completed"
5358 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5360 step
= "Getting nsr from database"
5361 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5362 old_operational_status
= db_nsr
["operational-status"]
5363 old_config_status
= db_nsr
["config-status"]
5365 step
= "Parsing scaling parameters"
5366 db_nsr_update
["operational-status"] = "scaling"
5367 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5368 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5370 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
5372 ]["member-vnf-index"]
5373 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
5375 ]["scaling-group-descriptor"]
5376 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
5377 # for backward compatibility
5378 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5379 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5380 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5381 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5383 step
= "Getting vnfr from database"
5384 db_vnfr
= self
.db
.get_one(
5385 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5388 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5390 step
= "Getting vnfd from database"
5391 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5393 base_folder
= db_vnfd
["_admin"]["storage"]
5395 step
= "Getting scaling-group-descriptor"
5396 scaling_descriptor
= find_in_list(
5397 get_scaling_aspect(db_vnfd
),
5398 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
5400 if not scaling_descriptor
:
5402 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5403 "at vnfd:scaling-group-descriptor".format(scaling_group
)
5406 step
= "Sending scale order to VIM"
5407 # TODO check if ns is in a proper status
5409 if not db_nsr
["_admin"].get("scaling-group"):
5414 "_admin.scaling-group": [
5415 {"name": scaling_group
, "nb-scale-op": 0}
5419 admin_scale_index
= 0
5421 for admin_scale_index
, admin_scale_info
in enumerate(
5422 db_nsr
["_admin"]["scaling-group"]
5424 if admin_scale_info
["name"] == scaling_group
:
5425 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
5427 else: # not found, set index one plus last element and add new entry with the name
5428 admin_scale_index
+= 1
5430 "_admin.scaling-group.{}.name".format(admin_scale_index
)
5433 vca_scaling_info
= []
5434 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
5435 if scaling_type
== "SCALE_OUT":
5436 if "aspect-delta-details" not in scaling_descriptor
:
5438 "Aspect delta details not fount in scaling descriptor {}".format(
5439 scaling_descriptor
["name"]
5442 # count if max-instance-count is reached
5443 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5445 scaling_info
["scaling_direction"] = "OUT"
5446 scaling_info
["vdu-create"] = {}
5447 scaling_info
["kdu-create"] = {}
5448 for delta
in deltas
:
5449 for vdu_delta
in delta
.get("vdu-delta", {}):
5450 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
5451 # vdu_index also provides the number of instance of the targeted vdu
5452 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5453 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5457 additional_params
= (
5458 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
5461 cloud_init_list
= []
5463 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5464 max_instance_count
= 10
5465 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
5466 max_instance_count
= vdu_profile
.get(
5467 "max-number-of-instances", 10
5470 default_instance_num
= get_number_of_instances(
5473 instances_number
= vdu_delta
.get("number-of-instances", 1)
5474 nb_scale_op
+= instances_number
5476 new_instance_count
= nb_scale_op
+ default_instance_num
5477 # Control if new count is over max and vdu count is less than max.
5478 # Then assign new instance count
5479 if new_instance_count
> max_instance_count
> vdu_count
:
5480 instances_number
= new_instance_count
- max_instance_count
5482 instances_number
= instances_number
5484 if new_instance_count
> max_instance_count
:
5486 "reached the limit of {} (max-instance-count) "
5487 "scaling-out operations for the "
5488 "scaling-group-descriptor '{}'".format(
5489 nb_scale_op
, scaling_group
5492 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5494 # TODO Information of its own ip is not available because db_vnfr is not updated.
5495 additional_params
["OSM"] = get_osm_params(
5496 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
5498 cloud_init_list
.append(
5499 self
._parse
_cloud
_init
(
5506 vca_scaling_info
.append(
5508 "osm_vdu_id": vdu_delta
["id"],
5509 "member-vnf-index": vnf_index
,
5511 "vdu_index": vdu_index
+ x
,
5514 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
5515 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5516 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
5517 kdu_name
= kdu_profile
["kdu-name"]
5518 resource_name
= kdu_profile
["resource-name"]
5520 # Might have different kdus in the same delta
5521 # Should have list for each kdu
5522 if not scaling_info
["kdu-create"].get(kdu_name
, None):
5523 scaling_info
["kdu-create"][kdu_name
] = []
5525 kdur
= get_kdur(db_vnfr
, kdu_name
)
5526 if kdur
.get("helm-chart"):
5527 k8s_cluster_type
= "helm-chart-v3"
5528 self
.logger
.debug("kdur: {}".format(kdur
))
5530 kdur
.get("helm-version")
5531 and kdur
.get("helm-version") == "v2"
5533 k8s_cluster_type
= "helm-chart"
5534 raise NotImplementedError
5535 elif kdur
.get("juju-bundle"):
5536 k8s_cluster_type
= "juju-bundle"
5539 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5540 "juju-bundle. Maybe an old NBI version is running".format(
5541 db_vnfr
["member-vnf-index-ref"], kdu_name
5545 max_instance_count
= 10
5546 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
5547 max_instance_count
= kdu_profile
.get(
5548 "max-number-of-instances", 10
5551 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
5552 deployed_kdu
, _
= get_deployed_kdu(
5553 nsr_deployed
, kdu_name
, vnf_index
5555 if deployed_kdu
is None:
5557 "KDU '{}' for vnf '{}' not deployed".format(
5561 kdu_instance
= deployed_kdu
.get("kdu-instance")
5562 instance_num
= await self
.k8scluster_map
[
5564 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5565 kdu_replica_count
= instance_num
+ kdu_delta
.get(
5566 "number-of-instances", 1
5569 # Control if new count is over max and instance_num is less than max.
5570 # Then assign max instance number to kdu replica count
5571 if kdu_replica_count
> max_instance_count
> instance_num
:
5572 kdu_replica_count
= max_instance_count
5573 if kdu_replica_count
> max_instance_count
:
5575 "reached the limit of {} (max-instance-count) "
5576 "scaling-out operations for the "
5577 "scaling-group-descriptor '{}'".format(
5578 instance_num
, scaling_group
5582 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5583 vca_scaling_info
.append(
5585 "osm_kdu_id": kdu_name
,
5586 "member-vnf-index": vnf_index
,
5588 "kdu_index": instance_num
+ x
- 1,
5591 scaling_info
["kdu-create"][kdu_name
].append(
5593 "member-vnf-index": vnf_index
,
5595 "k8s-cluster-type": k8s_cluster_type
,
5596 "resource-name": resource_name
,
5597 "scale": kdu_replica_count
,
5600 elif scaling_type
== "SCALE_IN":
5601 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5603 scaling_info
["scaling_direction"] = "IN"
5604 scaling_info
["vdu-delete"] = {}
5605 scaling_info
["kdu-delete"] = {}
5607 for delta
in deltas
:
5608 for vdu_delta
in delta
.get("vdu-delta", {}):
5609 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5610 min_instance_count
= 0
5611 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5612 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
5613 min_instance_count
= vdu_profile
["min-number-of-instances"]
5615 default_instance_num
= get_number_of_instances(
5616 db_vnfd
, vdu_delta
["id"]
5618 instance_num
= vdu_delta
.get("number-of-instances", 1)
5619 nb_scale_op
-= instance_num
5621 new_instance_count
= nb_scale_op
+ default_instance_num
5623 if new_instance_count
< min_instance_count
< vdu_count
:
5624 instances_number
= min_instance_count
- new_instance_count
5626 instances_number
= instance_num
5628 if new_instance_count
< min_instance_count
:
5630 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5631 "scaling-group-descriptor '{}'".format(
5632 nb_scale_op
, scaling_group
5635 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5636 vca_scaling_info
.append(
5638 "osm_vdu_id": vdu_delta
["id"],
5639 "member-vnf-index": vnf_index
,
5641 "vdu_index": vdu_index
- 1 - x
,
5644 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
5645 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5646 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
5647 kdu_name
= kdu_profile
["kdu-name"]
5648 resource_name
= kdu_profile
["resource-name"]
5650 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
5651 scaling_info
["kdu-delete"][kdu_name
] = []
5653 kdur
= get_kdur(db_vnfr
, kdu_name
)
5654 if kdur
.get("helm-chart"):
5655 k8s_cluster_type
= "helm-chart-v3"
5656 self
.logger
.debug("kdur: {}".format(kdur
))
5658 kdur
.get("helm-version")
5659 and kdur
.get("helm-version") == "v2"
5661 k8s_cluster_type
= "helm-chart"
5662 raise NotImplementedError
5663 elif kdur
.get("juju-bundle"):
5664 k8s_cluster_type
= "juju-bundle"
5667 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5668 "juju-bundle. Maybe an old NBI version is running".format(
5669 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
5673 min_instance_count
= 0
5674 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
5675 min_instance_count
= kdu_profile
["min-number-of-instances"]
5677 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
5678 deployed_kdu
, _
= get_deployed_kdu(
5679 nsr_deployed
, kdu_name
, vnf_index
5681 if deployed_kdu
is None:
5683 "KDU '{}' for vnf '{}' not deployed".format(
5687 kdu_instance
= deployed_kdu
.get("kdu-instance")
5688 instance_num
= await self
.k8scluster_map
[
5690 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5691 kdu_replica_count
= instance_num
- kdu_delta
.get(
5692 "number-of-instances", 1
5695 if kdu_replica_count
< min_instance_count
< instance_num
:
5696 kdu_replica_count
= min_instance_count
5697 if kdu_replica_count
< min_instance_count
:
5699 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5700 "scaling-group-descriptor '{}'".format(
5701 instance_num
, scaling_group
5705 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5706 vca_scaling_info
.append(
5708 "osm_kdu_id": kdu_name
,
5709 "member-vnf-index": vnf_index
,
5711 "kdu_index": instance_num
- x
- 1,
5714 scaling_info
["kdu-delete"][kdu_name
].append(
5716 "member-vnf-index": vnf_index
,
5718 "k8s-cluster-type": k8s_cluster_type
,
5719 "resource-name": resource_name
,
5720 "scale": kdu_replica_count
,
5724 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5725 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
5726 if scaling_info
["scaling_direction"] == "IN":
5727 for vdur
in reversed(db_vnfr
["vdur"]):
5728 if vdu_delete
.get(vdur
["vdu-id-ref"]):
5729 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
5730 scaling_info
["vdu"].append(
5732 "name": vdur
.get("name") or vdur
.get("vdu-name"),
5733 "vdu_id": vdur
["vdu-id-ref"],
5737 for interface
in vdur
["interfaces"]:
5738 scaling_info
["vdu"][-1]["interface"].append(
5740 "name": interface
["name"],
5741 "ip_address": interface
["ip-address"],
5742 "mac_address": interface
.get("mac-address"),
5745 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5748 step
= "Executing pre-scale vnf-config-primitive"
5749 if scaling_descriptor
.get("scaling-config-action"):
5750 for scaling_config_action
in scaling_descriptor
[
5751 "scaling-config-action"
5754 scaling_config_action
.get("trigger") == "pre-scale-in"
5755 and scaling_type
== "SCALE_IN"
5757 scaling_config_action
.get("trigger") == "pre-scale-out"
5758 and scaling_type
== "SCALE_OUT"
5760 vnf_config_primitive
= scaling_config_action
[
5761 "vnf-config-primitive-name-ref"
5763 step
= db_nslcmop_update
[
5765 ] = "executing pre-scale scaling-config-action '{}'".format(
5766 vnf_config_primitive
5769 # look for primitive
5770 for config_primitive
in (
5771 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
5772 ).get("config-primitive", ()):
5773 if config_primitive
["name"] == vnf_config_primitive
:
5777 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5778 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5779 "primitive".format(scaling_group
, vnf_config_primitive
)
5782 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
5783 if db_vnfr
.get("additionalParamsForVnf"):
5784 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
5786 scale_process
= "VCA"
5787 db_nsr_update
["config-status"] = "configuring pre-scaling"
5788 primitive_params
= self
._map
_primitive
_params
(
5789 config_primitive
, {}, vnfr_params
5792 # Pre-scale retry check: Check if this sub-operation has been executed before
5793 op_index
= self
._check
_or
_add
_scale
_suboperation
(
5796 vnf_config_primitive
,
5800 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
5801 # Skip sub-operation
5802 result
= "COMPLETED"
5803 result_detail
= "Done"
5806 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5807 vnf_config_primitive
, result
, result_detail
5811 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
5812 # New sub-operation: Get index of this sub-operation
5814 len(db_nslcmop
.get("_admin", {}).get("operations"))
5819 + "vnf_config_primitive={} New sub-operation".format(
5820 vnf_config_primitive
5824 # retry: Get registered params for this existing sub-operation
5825 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
5828 vnf_index
= op
.get("member_vnf_index")
5829 vnf_config_primitive
= op
.get("primitive")
5830 primitive_params
= op
.get("primitive_params")
5833 + "vnf_config_primitive={} Sub-operation retry".format(
5834 vnf_config_primitive
5837 # Execute the primitive, either with new (first-time) or registered (reintent) args
5838 ee_descriptor_id
= config_primitive
.get(
5839 "execution-environment-ref"
5841 primitive_name
= config_primitive
.get(
5842 "execution-environment-primitive", vnf_config_primitive
5844 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5845 nsr_deployed
["VCA"],
5846 member_vnf_index
=vnf_index
,
5848 vdu_count_index
=None,
5849 ee_descriptor_id
=ee_descriptor_id
,
5851 result
, result_detail
= await self
._ns
_execute
_primitive
(
5860 + "vnf_config_primitive={} Done with result {} {}".format(
5861 vnf_config_primitive
, result
, result_detail
5864 # Update operationState = COMPLETED | FAILED
5865 self
._update
_suboperation
_status
(
5866 db_nslcmop
, op_index
, result
, result_detail
5869 if result
== "FAILED":
5870 raise LcmException(result_detail
)
5871 db_nsr_update
["config-status"] = old_config_status
5872 scale_process
= None
5876 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
5879 "_admin.scaling-group.{}.time".format(admin_scale_index
)
5882 # SCALE-IN VCA - BEGIN
5883 if vca_scaling_info
:
5884 step
= db_nslcmop_update
[
5886 ] = "Deleting the execution environments"
5887 scale_process
= "VCA"
5888 for vca_info
in vca_scaling_info
:
5889 if vca_info
["type"] == "delete":
5890 member_vnf_index
= str(vca_info
["member-vnf-index"])
5892 logging_text
+ "vdu info: {}".format(vca_info
)
5894 if vca_info
.get("osm_vdu_id"):
5895 vdu_id
= vca_info
["osm_vdu_id"]
5896 vdu_index
= int(vca_info
["vdu_index"])
5899 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5900 member_vnf_index
, vdu_id
, vdu_index
5904 kdu_id
= vca_info
["osm_kdu_id"]
5907 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5908 member_vnf_index
, kdu_id
, vdu_index
5910 stage
[2] = step
= "Scaling in VCA"
5911 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
5912 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
5913 config_update
= db_nsr
["configurationStatus"]
5914 for vca_index
, vca
in enumerate(vca_update
):
5916 (vca
or vca
.get("ee_id"))
5917 and vca
["member-vnf-index"] == member_vnf_index
5918 and vca
["vdu_count_index"] == vdu_index
5920 if vca
.get("vdu_id"):
5921 config_descriptor
= get_configuration(
5922 db_vnfd
, vca
.get("vdu_id")
5924 elif vca
.get("kdu_name"):
5925 config_descriptor
= get_configuration(
5926 db_vnfd
, vca
.get("kdu_name")
5929 config_descriptor
= get_configuration(
5930 db_vnfd
, db_vnfd
["id"]
5932 operation_params
= (
5933 db_nslcmop
.get("operationParams") or {}
5935 exec_terminate_primitives
= not operation_params
.get(
5936 "skip_terminate_primitives"
5937 ) and vca
.get("needed_terminate")
5938 task
= asyncio
.ensure_future(
5947 exec_primitives
=exec_terminate_primitives
,
5951 timeout
=self
.timeout_charm_delete
,
5954 tasks_dict_info
[task
] = "Terminating VCA {}".format(
5957 del vca_update
[vca_index
]
5958 del config_update
[vca_index
]
5959 # wait for pending tasks of terminate primitives
5963 + "Waiting for tasks {}".format(
5964 list(tasks_dict_info
.keys())
5967 error_list
= await self
._wait
_for
_tasks
(
5971 self
.timeout_charm_delete
, self
.timeout_ns_terminate
5976 tasks_dict_info
.clear()
5978 raise LcmException("; ".join(error_list
))
5980 db_vca_and_config_update
= {
5981 "_admin.deployed.VCA": vca_update
,
5982 "configurationStatus": config_update
,
5985 "nsrs", db_nsr
["_id"], db_vca_and_config_update
5987 scale_process
= None
5988 # SCALE-IN VCA - END
5991 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
5992 scale_process
= "RO"
5993 if self
.ro_config
.get("ng"):
5994 await self
._scale
_ng
_ro
(
5995 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
5997 scaling_info
.pop("vdu-create", None)
5998 scaling_info
.pop("vdu-delete", None)
6000 scale_process
= None
6004 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6005 scale_process
= "KDU"
6006 await self
._scale
_kdu
(
6007 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6009 scaling_info
.pop("kdu-create", None)
6010 scaling_info
.pop("kdu-delete", None)
6012 scale_process
= None
6016 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6018 # SCALE-UP VCA - BEGIN
6019 if vca_scaling_info
:
6020 step
= db_nslcmop_update
[
6022 ] = "Creating new execution environments"
6023 scale_process
= "VCA"
6024 for vca_info
in vca_scaling_info
:
6025 if vca_info
["type"] == "create":
6026 member_vnf_index
= str(vca_info
["member-vnf-index"])
6028 logging_text
+ "vdu info: {}".format(vca_info
)
6030 vnfd_id
= db_vnfr
["vnfd-ref"]
6031 if vca_info
.get("osm_vdu_id"):
6032 vdu_index
= int(vca_info
["vdu_index"])
6033 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6034 if db_vnfr
.get("additionalParamsForVnf"):
6035 deploy_params
.update(
6037 db_vnfr
["additionalParamsForVnf"].copy()
6040 descriptor_config
= get_configuration(
6041 db_vnfd
, db_vnfd
["id"]
6043 if descriptor_config
:
6048 logging_text
=logging_text
6049 + "member_vnf_index={} ".format(member_vnf_index
),
6052 nslcmop_id
=nslcmop_id
,
6058 member_vnf_index
=member_vnf_index
,
6059 vdu_index
=vdu_index
,
6061 deploy_params
=deploy_params
,
6062 descriptor_config
=descriptor_config
,
6063 base_folder
=base_folder
,
6064 task_instantiation_info
=tasks_dict_info
,
6067 vdu_id
= vca_info
["osm_vdu_id"]
6068 vdur
= find_in_list(
6069 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6071 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6072 if vdur
.get("additionalParams"):
6073 deploy_params_vdu
= parse_yaml_strings(
6074 vdur
["additionalParams"]
6077 deploy_params_vdu
= deploy_params
6078 deploy_params_vdu
["OSM"] = get_osm_params(
6079 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6081 if descriptor_config
:
6086 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6087 member_vnf_index
, vdu_id
, vdu_index
6089 stage
[2] = step
= "Scaling out VCA"
6090 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6092 logging_text
=logging_text
6093 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6094 member_vnf_index
, vdu_id
, vdu_index
6098 nslcmop_id
=nslcmop_id
,
6104 member_vnf_index
=member_vnf_index
,
6105 vdu_index
=vdu_index
,
6107 deploy_params
=deploy_params_vdu
,
6108 descriptor_config
=descriptor_config
,
6109 base_folder
=base_folder
,
6110 task_instantiation_info
=tasks_dict_info
,
6114 kdu_name
= vca_info
["osm_kdu_id"]
6115 descriptor_config
= get_configuration(db_vnfd
, kdu_name
)
6116 if descriptor_config
:
6118 kdu_index
= int(vca_info
["kdu_index"])
6122 for x
in db_vnfr
["kdur"]
6123 if x
["kdu-name"] == kdu_name
6125 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
6126 if kdur
.get("additionalParams"):
6127 deploy_params_kdu
= parse_yaml_strings(
6128 kdur
["additionalParams"]
6132 logging_text
=logging_text
,
6135 nslcmop_id
=nslcmop_id
,
6141 member_vnf_index
=member_vnf_index
,
6142 vdu_index
=kdu_index
,
6144 deploy_params
=deploy_params_kdu
,
6145 descriptor_config
=descriptor_config
,
6146 base_folder
=base_folder
,
6147 task_instantiation_info
=tasks_dict_info
,
6150 # SCALE-UP VCA - END
6151 scale_process
= None
6154 # execute primitive service POST-SCALING
6155 step
= "Executing post-scale vnf-config-primitive"
6156 if scaling_descriptor
.get("scaling-config-action"):
6157 for scaling_config_action
in scaling_descriptor
[
6158 "scaling-config-action"
6161 scaling_config_action
.get("trigger") == "post-scale-in"
6162 and scaling_type
== "SCALE_IN"
6164 scaling_config_action
.get("trigger") == "post-scale-out"
6165 and scaling_type
== "SCALE_OUT"
6167 vnf_config_primitive
= scaling_config_action
[
6168 "vnf-config-primitive-name-ref"
6170 step
= db_nslcmop_update
[
6172 ] = "executing post-scale scaling-config-action '{}'".format(
6173 vnf_config_primitive
6176 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6177 if db_vnfr
.get("additionalParamsForVnf"):
6178 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6180 # look for primitive
6181 for config_primitive
in (
6182 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6183 ).get("config-primitive", ()):
6184 if config_primitive
["name"] == vnf_config_primitive
:
6188 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6189 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6190 "config-primitive".format(
6191 scaling_group
, vnf_config_primitive
6194 scale_process
= "VCA"
6195 db_nsr_update
["config-status"] = "configuring post-scaling"
6196 primitive_params
= self
._map
_primitive
_params
(
6197 config_primitive
, {}, vnfr_params
6200 # Post-scale retry check: Check if this sub-operation has been executed before
6201 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6204 vnf_config_primitive
,
6208 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6209 # Skip sub-operation
6210 result
= "COMPLETED"
6211 result_detail
= "Done"
6214 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6215 vnf_config_primitive
, result
, result_detail
6219 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6220 # New sub-operation: Get index of this sub-operation
6222 len(db_nslcmop
.get("_admin", {}).get("operations"))
6227 + "vnf_config_primitive={} New sub-operation".format(
6228 vnf_config_primitive
6232 # retry: Get registered params for this existing sub-operation
6233 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6236 vnf_index
= op
.get("member_vnf_index")
6237 vnf_config_primitive
= op
.get("primitive")
6238 primitive_params
= op
.get("primitive_params")
6241 + "vnf_config_primitive={} Sub-operation retry".format(
6242 vnf_config_primitive
6245 # Execute the primitive, either with new (first-time) or registered (reintent) args
6246 ee_descriptor_id
= config_primitive
.get(
6247 "execution-environment-ref"
6249 primitive_name
= config_primitive
.get(
6250 "execution-environment-primitive", vnf_config_primitive
6252 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6253 nsr_deployed
["VCA"],
6254 member_vnf_index
=vnf_index
,
6256 vdu_count_index
=None,
6257 ee_descriptor_id
=ee_descriptor_id
,
6259 result
, result_detail
= await self
._ns
_execute
_primitive
(
6268 + "vnf_config_primitive={} Done with result {} {}".format(
6269 vnf_config_primitive
, result
, result_detail
6272 # Update operationState = COMPLETED | FAILED
6273 self
._update
_suboperation
_status
(
6274 db_nslcmop
, op_index
, result
, result_detail
6277 if result
== "FAILED":
6278 raise LcmException(result_detail
)
6279 db_nsr_update
["config-status"] = old_config_status
6280 scale_process
= None
6285 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6286 db_nsr_update
["operational-status"] = (
6288 if old_operational_status
== "failed"
6289 else old_operational_status
6291 db_nsr_update
["config-status"] = old_config_status
6294 ROclient
.ROClientException
,
6299 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6301 except asyncio
.CancelledError
:
6303 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6305 exc
= "Operation was cancelled"
6306 except Exception as e
:
6307 exc
= traceback
.format_exc()
6308 self
.logger
.critical(
6309 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6313 self
._write
_ns
_status
(
6316 current_operation
="IDLE",
6317 current_operation_id
=None,
6320 stage
[1] = "Waiting for instantiate pending tasks."
6321 self
.logger
.debug(logging_text
+ stage
[1])
6322 exc
= await self
._wait
_for
_tasks
(
6325 self
.timeout_ns_deploy
,
6333 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6334 nslcmop_operation_state
= "FAILED"
6336 db_nsr_update
["operational-status"] = old_operational_status
6337 db_nsr_update
["config-status"] = old_config_status
6338 db_nsr_update
["detailed-status"] = ""
6340 if "VCA" in scale_process
:
6341 db_nsr_update
["config-status"] = "failed"
6342 if "RO" in scale_process
:
6343 db_nsr_update
["operational-status"] = "failed"
6346 ] = "FAILED scaling nslcmop={} {}: {}".format(
6347 nslcmop_id
, step
, exc
6350 error_description_nslcmop
= None
6351 nslcmop_operation_state
= "COMPLETED"
6352 db_nslcmop_update
["detailed-status"] = "Done"
6354 self
._write
_op
_status
(
6357 error_message
=error_description_nslcmop
,
6358 operation_state
=nslcmop_operation_state
,
6359 other_update
=db_nslcmop_update
,
6362 self
._write
_ns
_status
(
6365 current_operation
="IDLE",
6366 current_operation_id
=None,
6367 other_update
=db_nsr_update
,
6370 if nslcmop_operation_state
:
6374 "nslcmop_id": nslcmop_id
,
6375 "operationState": nslcmop_operation_state
,
6377 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
6378 except Exception as e
:
6380 logging_text
+ "kafka_write notification Exception {}".format(e
)
6382 self
.logger
.debug(logging_text
+ "Exit")
6383 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
async def _scale_kdu(
    self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
):
    """
    Scale the KDU resources listed in scaling_info through the K8s connectors.

    The entries are taken from scaling_info["kdu-create"] or, when that value
    is missing/empty, from scaling_info["kdu-delete"]. For every entry:
      1. if its type is "delete", the KDU terminate-config-primitives are run;
      2. the connector "scale" operation is invoked for the resource;
      3. if its type is "create", the KDU initial-config-primitives are run.
    Config primitives are executed only when the KDU has a configuration that
    holds the primitive list and get_juju_ee_ref() returns None for the KDU.

    :param logging_text: prefix prepended to every debug message
    :param nsr_id: NS record id, used as filter of the db_dict sent to the connector
    :param nsr_deployed: deployed information handed over to get_deployed_kdu()
    :param db_vnfd: VNF descriptor where the KDU configuration is looked up
    :param vca_id: VCA id forwarded to the connector calls
    :param scaling_info: dict with "kdu-create" and/or "kdu-delete"; the chosen
        value maps each kdu name to a list of scaling entries
    """

    async def _exec_config_primitives(
        kdu_name, primitives_key, step, cluster_type, cluster_uuid, kdu_instance, db_dict
    ):
        # Run, ordered by "seq", the primitives stored under primitives_key.
        kdu_config = get_configuration(db_vnfd, kdu_name)
        primitives = kdu_config.get(primitives_key) if kdu_config else None
        if not primitives or get_juju_ee_ref(db_vnfd, kdu_name) is not None:
            return
        # in-place sort on purpose: the descriptor list itself ends up ordered
        primitives.sort(key=lambda val: int(val["seq"]))
        for primitive in primitives:
            mapped_params = self._map_primitive_params(primitive, {}, {})
            self.logger.debug(logging_text + step)
            await asyncio.wait_for(
                self.k8scluster_map[cluster_type].exec_primitive(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    primitive_name=primitive["name"],
                    params=mapped_params,
                    db_dict=db_dict,
                    vca_id=vca_id,
                ),
                timeout=600,
            )

    kdu_requests = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
    for kdu_name, entries in kdu_requests.items():
        for entry in entries:
            deployed_kdu, index = get_deployed_kdu(
                nsr_deployed, kdu_name, entry["member-vnf-index"]
            )
            cluster_uuid = deployed_kdu["k8scluster-uuid"]
            kdu_instance = deployed_kdu["kdu-instance"]
            scale = int(entry["scale"])
            k8s_cluster_type = entry["k8s-cluster-type"]

            db_dict = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": "_admin.deployed.K8s.{}".format(index),
            }

            self.logger.debug(
                logging_text
                + "scaling application {}".format(entry["resource-name"])
            )

            # scale-in: terminate primitives go before the resource is scaled
            if entry["type"] == "delete":
                await _exec_config_primitives(
                    kdu_name,
                    "terminate-config-primitive",
                    "execute terminate config primitive",
                    k8s_cluster_type,
                    cluster_uuid,
                    kdu_instance,
                    db_dict,
                )

            await asyncio.wait_for(
                self.k8scluster_map[k8s_cluster_type].scale(
                    kdu_instance,
                    scale,
                    entry["resource-name"],
                    vca_id=vca_id,
                ),
                timeout=self.timeout_vca_on_error,
            )

            # scale-out: initial primitives go once the resource has been scaled
            if entry["type"] == "create":
                await _exec_config_primitives(
                    kdu_name,
                    "initial-config-primitive",
                    "execute initial config primitive",
                    k8s_cluster_type,
                    cluster_uuid,
                    kdu_instance,
                    db_dict,
                )
async def _scale_ng_ro(
    self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
):
    """
    Apply a VDU scaling operation by re-deploying the NS through NG-RO.

    Steps:
      1. read the NSD, every VNFR of the NS and the VNFD of each VNFR from the db;
      2. call scale_vnfr() with mark_delete=True for the requested changes;
      3. call _instantiate_ng_ro() with the refreshed records;
      4. when there are VDUs to delete, call scale_vnfr() again with
         mark_delete=False for the "vdu-delete" entries.

    :param logging_text: prefix for log messages, forwarded to _instantiate_ng_ro()
    :param db_nsr: NS record; its "nsd-id" selects the NSD to read
    :param db_nslcmop: operation record; its "nsInstanceId" is the NS id
    :param db_vnfr: record of the VNF being scaled (passed to scale_vnfr())
    :param vdu_scaling_info: dict that may contain "vdu-create" and "vdu-delete"
    :param stage: stage list forwarded to _instantiate_ng_ro()
    """
    nsr_id = db_nslcmop["nsInstanceId"]
    db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

    db_vnfrs = {}  # member-vnf-index-ref -> vnfr
    db_vnfds = []  # one vnfd document per distinct vnfd uuid
    loaded_vnfd_ids = set()

    # for each vnf in ns, read its vnfd
    for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
        db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
        vnfd_id = vnfr["vnfd-id"]  # vnfd uuid for this vnf
        # Track what was loaded by the same key used in the query ("_id").
        # Comparing that uuid with the descriptor "id" field does not detect
        # an already loaded vnfd, which causes one db read per vnfr and
        # duplicated entries in db_vnfds.
        if vnfd_id not in loaded_vnfd_ids:
            db_vnfds.append(self.db.get_one("vnfds", {"_id": vnfd_id}))
            loaded_vnfd_ids.add(vnfd_id)

    n2vc_key_list = [self.n2vc.get_public_key()]

    self.scale_vnfr(
        db_vnfr,
        vdu_scaling_info.get("vdu-create"),
        vdu_scaling_info.get("vdu-delete"),
        mark_delete=True,
    )
    # db_vnfr has been updated, update db_vnfrs to use it
    db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr

    await self._instantiate_ng_ro(
        logging_text,
        nsr_id,
        db_nsd,
        db_nsr,
        db_nslcmop,
        db_vnfrs,
        db_vnfds,
        n2vc_key_list,
        stage=stage,
        start_deploy=time(),
        timeout_ns_deploy=self.timeout_ns_deploy,
    )

    if vdu_scaling_info.get("vdu-delete"):
        self.scale_vnfr(
            db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
        )
async def extract_prometheus_scrape_jobs(
    self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
):
    """
    Build the prometheus scrape jobs described by a 'prometheus*.j2' template.

    The first entry of artifact_path whose name starts with "prometheus" and
    ends with ".j2" is read and handed to parse_job() together with the
    variables JOB_NAME, TARGET_IP, EXPORTER_POD_IP and EXPORTER_POD_PORT.

    :param ee_id: execution environment id; the text after its first "." is
        used to compose the exporter host name
    :param artifact_path: folder, inside self.fs, where the template is searched
    :param ee_config_descriptor: descriptor providing the "metric-service" name
    :param vnfr_id: VNF record id; used without "-" characters as job name
    :param nsr_id: NS record id stored in every job
    :param target_ip: value of the TARGET_IP template variable
    :return: the list of jobs returned by parse_job(), each one tagged with
        "nsr_id" and "vnfr_id"; None when no template file is found
    """
    template_name = None
    for entry in self.fs.dir_ls(artifact_path):
        if entry.startswith("prometheus") and entry.endswith(".j2"):
            template_name = entry
            break
    if not template_name:
        return None

    with self.fs.file_open((artifact_path, template_name), "r") as template_file:
        job_data = template_file.read()

    # drop the "namespace." prefix of the execution environment id
    service = ee_id.partition(".")[2]
    host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
    host_port = "80"
    vnfr_id = vnfr_id.replace("-", "")
    variables = {
        "JOB_NAME": vnfr_id,
        "TARGET_IP": target_ip,
        "EXPORTER_POD_IP": host_name,
        "EXPORTER_POD_PORT": host_port,
    }
    job_list = parse_job(job_data, variables)

    # every job name must contain the vnfr_id; otherwise a new one is generated
    for job in job_list:
        current_name = job.get("job_name")
        if not (isinstance(current_name, str) and vnfr_id in current_name):
            job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
        job["nsr_id"] = nsr_id
        job["vnfr_id"] = vnfr_id
    return job_list
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA cloud name and VCA cloud credential of a VIM account.

    Both values are taken from the "config" section of the VIM account
    (keys "vca_cloud" and "vca_cloud_credential").

    :param vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential)
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_cloud")
    cloud_credential = vim_config.get("vca_cloud_credential")
    return cloud_name, cloud_credential
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA K8s cloud name and VCA K8s cloud credential of a VIM account.

    Both values are taken from the "config" section of the VIM account
    (keys "vca_k8s_cloud" and "vca_k8s_cloud_credential").

    :param vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential)
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    k8s_cloud_name = vim_config.get("vca_k8s_cloud")
    k8s_cloud_credential = vim_config.get("vca_k8s_cloud_credential")
    return k8s_cloud_name, k8s_cloud_credential