1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
20 from typing
import Any
, Dict
, List
23 import logging
.handlers
34 from osm_lcm
import ROclient
35 from osm_lcm
.data_utils
.nsr
import (
38 get_deployed_vca_list
,
41 from osm_lcm
.data_utils
.vca
import (
50 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
51 from osm_lcm
.lcm_utils
import (
59 from osm_lcm
.data_utils
.nsd
import (
60 get_ns_configuration_relation_list
,
64 from osm_lcm
.data_utils
.vnfd
import (
68 get_ee_sorted_initial_config_primitive_list
,
69 get_ee_sorted_terminate_config_primitive_list
,
71 get_virtual_link_profiles
,
76 get_number_of_instances
,
78 get_kdu_resource_profile
,
80 from osm_lcm
.data_utils
.list_utils
import find_in_list
81 from osm_lcm
.data_utils
.vnfr
import get_osm_params
, get_vdur_index
, get_kdur
82 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
83 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
84 from n2vc
.definitions
import RelationEndpoint
85 from n2vc
.k8s_helm_conn
import K8sHelmConnector
86 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
87 from n2vc
.k8s_juju_conn
import K8sJujuConnector
89 from osm_common
.dbbase
import DbException
90 from osm_common
.fsbase
import FsException
92 from osm_lcm
.data_utils
.database
.database
import Database
93 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
95 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
96 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
98 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
99 from osm_lcm
.prometheus
import parse_job
101 from copy
import copy
, deepcopy
102 from time
import time
103 from uuid
import uuid4
105 from random
import randint
107 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
110 class NsLcm(LcmBase
):
111 timeout_vca_on_error
= (
113 ) # Time for charm from first time at blocked,error status to mark as failed
114 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
115 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
116 timeout_charm_delete
= 10 * 60
117 timeout_primitive
= 30 * 60 # timeout for primitive execution
118 timeout_progress_primitive
= (
120 ) # timeout for some progress in a primitive execution
122 SUBOPERATION_STATUS_NOT_FOUND
= -1
123 SUBOPERATION_STATUS_NEW
= -2
124 SUBOPERATION_STATUS_SKIP
= -3
125 task_name_deploy_vca
= "Deploying VCA"
127 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
129 Init, Connect to database, filesystem storage, and messaging
130 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
133 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
135 self
.db
= Database().instance
.db
136 self
.fs
= Filesystem().instance
.fs
138 self
.lcm_tasks
= lcm_tasks
139 self
.timeout
= config
["timeout"]
140 self
.ro_config
= config
["ro_config"]
141 self
.ng_ro
= config
["ro_config"].get("ng")
142 self
.vca_config
= config
["VCA"].copy()
144 # create N2VC connector
145 self
.n2vc
= N2VCJujuConnector(
148 on_update_db
=self
._on
_update
_n
2vc
_db
,
153 self
.conn_helm_ee
= LCMHelmConn(
156 vca_config
=self
.vca_config
,
157 on_update_db
=self
._on
_update
_n
2vc
_db
,
160 self
.k8sclusterhelm2
= K8sHelmConnector(
161 kubectl_command
=self
.vca_config
.get("kubectlpath"),
162 helm_command
=self
.vca_config
.get("helmpath"),
169 self
.k8sclusterhelm3
= K8sHelm3Connector(
170 kubectl_command
=self
.vca_config
.get("kubectlpath"),
171 helm_command
=self
.vca_config
.get("helm3path"),
178 self
.k8sclusterjuju
= K8sJujuConnector(
179 kubectl_command
=self
.vca_config
.get("kubectlpath"),
180 juju_command
=self
.vca_config
.get("jujupath"),
183 on_update_db
=self
._on
_update
_k
8s
_db
,
188 self
.k8scluster_map
= {
189 "helm-chart": self
.k8sclusterhelm2
,
190 "helm-chart-v3": self
.k8sclusterhelm3
,
191 "chart": self
.k8sclusterhelm3
,
192 "juju-bundle": self
.k8sclusterjuju
,
193 "juju": self
.k8sclusterjuju
,
197 "lxc_proxy_charm": self
.n2vc
,
198 "native_charm": self
.n2vc
,
199 "k8s_proxy_charm": self
.n2vc
,
200 "helm": self
.conn_helm_ee
,
201 "helm-v3": self
.conn_helm_ee
,
205 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
def increment_ip_mac(ip_mac, vm_index=1):
    """Return the address ``ip_mac`` advanced by ``vm_index`` positions.

    The value is treated as text: a dotted value is handled as IPv4 (decimal
    last octet), a colon-separated value as a MAC or IPv6 address (hexadecimal
    last group). When the incremented last group still fits, only that group
    is rewritten, so the rest of the string keeps its exact spelling and the
    hex group keeps its zero-padded width. When it does not fit, the overflow
    is carried into the preceding groups instead of emitting an invalid
    address such as ``10.0.0.256`` or ``..:ee:100``.

    NOTE(review): this is invoked elsewhere in the file as
    ``self.increment_ip_mac(address, offset)`` although it takes no ``self``;
    presumably it is decorated with ``@staticmethod`` on a line outside this
    block -- confirm before moving it.

    :param ip_mac: IPv4, IPv6 or MAC address as a string. Any non-string
        value is returned unchanged.
    :param vm_index: offset to add to the address (default 1).
    :return: the incremented address as a string, or None when the text
        cannot be parsed as an address or the result is out of range.
    """
    import ipaddress  # local import: only used on the carry path

    if not isinstance(ip_mac, str):
        return ip_mac
    try:
        # IPv4: everything after the last dot is the decimal host octet
        dot = ip_mac.rfind(".")
        if dot > 0:
            head, tail = ip_mac[: dot + 1], ip_mac[dot + 1 :]
            octet = int(tail) + vm_index
            if 0 <= octet <= 255:
                return "{}{}".format(head, octet)
            # the octet overflowed: carry over the whole 32-bit address
            return str(ipaddress.IPv4Address(ip_mac) + vm_index)

        # IPv6 or MAC: everything after the last colon is a hex group
        colon = ip_mac.rfind(":")
        if colon > 0:
            head, tail = ip_mac[: colon + 1], ip_mac[colon + 1 :]
            groups = ip_mac.split(":")
            is_mac = len(groups) == 6 and all(len(group) == 2 for group in groups)
            # a MAC group holds one byte, an IPv6 group holds 16 bits
            group_max = 0xFF if is_mac else 0xFFFF
            value = int(tail, 16) + vm_index
            if 0 <= value <= group_max:
                # keep the original width: 2 digits for MAC, up to 4 for IPv6
                return ("{}{:0" + str(len(tail)) + "x}").format(head, value)
            if is_mac:
                total = int("".join(groups), 16) + vm_index
                if not 0 <= total < (1 << 48):
                    return None
                digits = "{:012x}".format(total)
                return ":".join(digits[pos : pos + 2] for pos in range(0, 12, 2))
            return str(ipaddress.IPv6Address(ip_mac) + vm_index)
    except ValueError:
        # not a parsable address (or the increment left the address space)
        pass
    return None
229 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
231 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
234 # TODO filter RO descriptor fields...
238 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
239 db_dict
["deploymentStatus"] = ro_descriptor
240 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
242 except Exception as e
:
244 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
247 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
249 # remove last dot from path (if exists)
250 if path
.endswith("."):
253 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
254 # .format(table, filter, path, updated_data))
257 nsr_id
= filter.get("_id")
259 # read ns record from database
260 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
261 current_ns_status
= nsr
.get("nsState")
263 # get vca status for NS
264 status_dict
= await self
.n2vc
.get_status(
265 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
270 db_dict
["vcaStatus"] = status_dict
271 await self
.n2vc
.update_vca_status(db_dict
["vcaStatus"], vca_id
=vca_id
)
273 # update configurationStatus for this VCA
275 vca_index
= int(path
[path
.rfind(".") + 1 :])
278 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
280 vca_status
= vca_list
[vca_index
].get("status")
282 configuration_status_list
= nsr
.get("configurationStatus")
283 config_status
= configuration_status_list
[vca_index
].get("status")
285 if config_status
== "BROKEN" and vca_status
!= "failed":
286 db_dict
["configurationStatus"][vca_index
] = "READY"
287 elif config_status
!= "BROKEN" and vca_status
== "failed":
288 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
289 except Exception as e
:
290 # not update configurationStatus
291 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
293 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
294 # if nsState = 'DEGRADED' check if all is OK
296 if current_ns_status
in ("READY", "DEGRADED"):
297 error_description
= ""
299 if status_dict
.get("machines"):
300 for machine_id
in status_dict
.get("machines"):
301 machine
= status_dict
.get("machines").get(machine_id
)
302 # check machine agent-status
303 if machine
.get("agent-status"):
304 s
= machine
.get("agent-status").get("status")
307 error_description
+= (
308 "machine {} agent-status={} ; ".format(
312 # check machine instance status
313 if machine
.get("instance-status"):
314 s
= machine
.get("instance-status").get("status")
317 error_description
+= (
318 "machine {} instance-status={} ; ".format(
323 if status_dict
.get("applications"):
324 for app_id
in status_dict
.get("applications"):
325 app
= status_dict
.get("applications").get(app_id
)
326 # check application status
327 if app
.get("status"):
328 s
= app
.get("status").get("status")
331 error_description
+= (
332 "application {} status={} ; ".format(app_id
, s
)
335 if error_description
:
336 db_dict
["errorDescription"] = error_description
337 if current_ns_status
== "READY" and is_degraded
:
338 db_dict
["nsState"] = "DEGRADED"
339 if current_ns_status
== "DEGRADED" and not is_degraded
:
340 db_dict
["nsState"] = "READY"
343 self
.update_db_2("nsrs", nsr_id
, db_dict
)
345 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
347 except Exception as e
:
348 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
350 async def _on_update_k8s_db(
351 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None
354 Updating vca status in NSR record
355 :param cluster_uuid: UUID of a k8s cluster
356 :param kdu_instance: The unique name of the KDU instance
357 :param filter: To get nsr_id
361 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
362 # .format(cluster_uuid, kdu_instance, filter))
365 nsr_id
= filter.get("_id")
367 # get vca status for NS
368 vca_status
= await self
.k8sclusterjuju
.status_kdu(
371 complete_status
=True,
377 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
379 await self
.k8sclusterjuju
.update_vca_status(
380 db_dict
["vcaStatus"],
386 self
.update_db_2("nsrs", nsr_id
, db_dict
)
388 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
390 except Exception as e
:
391 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
394 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
396 env
= Environment(undefined
=StrictUndefined
)
397 template
= env
.from_string(cloud_init_text
)
398 return template
.render(additional_params
or {})
399 except UndefinedError
as e
:
401 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
402 "file, must be provided in the instantiation parameters inside the "
403 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
405 except (TemplateError
, TemplateNotFound
) as e
:
407 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
412 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
413 cloud_init_content
= cloud_init_file
= None
415 if vdu
.get("cloud-init-file"):
416 base_folder
= vnfd
["_admin"]["storage"]
417 if base_folder
["pkg-dir"]:
418 cloud_init_file
= "{}/{}/cloud_init/{}".format(
419 base_folder
["folder"],
420 base_folder
["pkg-dir"],
421 vdu
["cloud-init-file"],
424 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
425 base_folder
["folder"],
426 vdu
["cloud-init-file"],
428 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
429 cloud_init_content
= ci_file
.read()
430 elif vdu
.get("cloud-init"):
431 cloud_init_content
= vdu
["cloud-init"]
433 return cloud_init_content
434 except FsException
as e
:
436 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
437 vnfd
["id"], vdu
["id"], cloud_init_file
, e
441 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
443 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]),
446 additional_params
= vdur
.get("additionalParams")
447 return parse_yaml_strings(additional_params
)
449 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
452 :param vnfd: input vnfd
453 :param new_id: overrides vnf id if provided
454 :param additionalParams: Instantiation params for VNFs provided
455 :param nsrId: Id of the NSR
456 :return: copy of vnfd
458 vnfd_RO
= deepcopy(vnfd
)
459 # remove unused by RO configuration, monitoring, scaling and internal keys
460 vnfd_RO
.pop("_id", None)
461 vnfd_RO
.pop("_admin", None)
462 vnfd_RO
.pop("monitoring-param", None)
463 vnfd_RO
.pop("scaling-group-descriptor", None)
464 vnfd_RO
.pop("kdu", None)
465 vnfd_RO
.pop("k8s-cluster", None)
467 vnfd_RO
["id"] = new_id
469 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
470 for vdu
in get_iterable(vnfd_RO
, "vdu"):
471 vdu
.pop("cloud-init-file", None)
472 vdu
.pop("cloud-init", None)
def ip_profile_2_RO(ip_profile):
    """Build the RO flavour of an OSM ip-profile without touching the input.

    Works on a deep copy of ``ip_profile`` and renames / respells the keys
    that differ between the two formats:

    * ``dns-server`` becomes ``dns-address``; a list of ``{"address": ...}``
      entries is flattened to a list of the addresses, any other value is
      moved over as it is.
    * ``ip-version`` values ``ipv4`` / ``ipv6`` become ``IPv4`` / ``IPv6``.
    * ``dhcp-params`` becomes ``dhcp``.

    :param ip_profile: ip-profile dictionary in OSM format
    :return: a new dictionary in RO format
    """
    ro_profile = deepcopy(ip_profile)

    if "dns-server" in ro_profile:
        servers = ro_profile.pop("dns-server")
        if isinstance(servers, list):
            servers = [entry["address"] for entry in servers]
        ro_profile["dns-address"] = servers

    # RO expects the capitalised spelling of the IP version
    for osm_spelling, ro_spelling in (("ipv4", "IPv4"), ("ipv6", "IPv6")):
        if ro_profile.get("ip-version") == osm_spelling:
            ro_profile["ip-version"] = ro_spelling

    if "dhcp-params" in ro_profile:
        ro_profile["dhcp"] = ro_profile.pop("dhcp-params")

    return ro_profile
493 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
494 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
495 if db_vim
["_admin"]["operationalState"] != "ENABLED":
497 "VIM={} is not available. operationalState={}".format(
498 vim_account
, db_vim
["_admin"]["operationalState"]
501 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
504 def get_ro_wim_id_for_wim_account(self
, wim_account
):
505 if isinstance(wim_account
, str):
506 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
507 if db_wim
["_admin"]["operationalState"] != "ENABLED":
509 "WIM={} is not available. operationalState={}".format(
510 wim_account
, db_wim
["_admin"]["operationalState"]
513 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
518 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
520 db_vdu_push_list
= []
522 db_update
= {"_admin.modified": time()}
524 for vdu_id
, vdu_count
in vdu_create
.items():
528 for vdur
in reversed(db_vnfr
["vdur"])
529 if vdur
["vdu-id-ref"] == vdu_id
534 # Read the template saved in the db:
535 self
.logger
.debug(f
"No vdur in the database. Using the vdur-template to scale")
536 vdur_template
= db_vnfr
.get("vdur-template")
537 if not vdur_template
:
539 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
543 vdur
= vdur_template
[0]
# Delete the template from the database after using it
545 self
.db
.set_one("vnfrs",
546 {"_id": db_vnfr
["_id"]},
548 pull
={"vdur-template": {"_id": vdur
['_id']}}
550 for count
in range(vdu_count
):
551 vdur_copy
= deepcopy(vdur
)
552 vdur_copy
["status"] = "BUILD"
553 vdur_copy
["status-detailed"] = None
554 vdur_copy
["ip-address"] = None
555 vdur_copy
["_id"] = str(uuid4())
556 vdur_copy
["count-index"] += count
+ 1
557 vdur_copy
["id"] = "{}-{}".format(
558 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
560 vdur_copy
.pop("vim_info", None)
561 for iface
in vdur_copy
["interfaces"]:
562 if iface
.get("fixed-ip"):
563 iface
["ip-address"] = self
.increment_ip_mac(
564 iface
["ip-address"], count
+ 1
567 iface
.pop("ip-address", None)
568 if iface
.get("fixed-mac"):
569 iface
["mac-address"] = self
.increment_ip_mac(
570 iface
["mac-address"], count
+ 1
573 iface
.pop("mac-address", None)
577 ) # only first vdu can be managment of vnf
578 db_vdu_push_list
.append(vdur_copy
)
579 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
581 if len(db_vnfr
["vdur"]) == 1:
582 # The scale will move to 0 instances
583 self
.logger
.debug(f
"Scaling to 0 !, creating the template with the last vdur")
584 template_vdur
= [db_vnfr
["vdur"][0]]
585 for vdu_id
, vdu_count
in vdu_delete
.items():
587 indexes_to_delete
= [
589 for iv
in enumerate(db_vnfr
["vdur"])
590 if iv
[1]["vdu-id-ref"] == vdu_id
594 "vdur.{}.status".format(i
): "DELETING"
595 for i
in indexes_to_delete
[-vdu_count
:]
599 # it must be deleted one by one because common.db does not allow otherwise
602 for v
in reversed(db_vnfr
["vdur"])
603 if v
["vdu-id-ref"] == vdu_id
605 for vdu
in vdus_to_delete
[:vdu_count
]:
608 {"_id": db_vnfr
["_id"]},
610 pull
={"vdur": {"_id": vdu
["_id"]}},
614 db_push
["vdur"] = db_vdu_push_list
616 db_push
["vdur-template"] = template_vdur
619 db_vnfr
["vdur-template"] = template_vdur
620 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
621 # modify passed dictionary db_vnfr
622 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
623 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
625 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
627 Updates database nsr with the RO info for the created vld
628 :param ns_update_nsr: dictionary to be filled with the updated info
629 :param db_nsr: content of db_nsr. This is also modified
630 :param nsr_desc_RO: nsr descriptor from RO
631 :return: Nothing, LcmException is raised on errors
634 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
635 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
636 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
638 vld
["vim-id"] = net_RO
.get("vim_net_id")
639 vld
["name"] = net_RO
.get("vim_name")
640 vld
["status"] = net_RO
.get("status")
641 vld
["status-detailed"] = net_RO
.get("error_msg")
642 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
646 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
649 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
651 for db_vnfr
in db_vnfrs
.values():
652 vnfr_update
= {"status": "ERROR"}
653 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
654 if "status" not in vdur
:
655 vdur
["status"] = "ERROR"
656 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
658 vdur
["status-detailed"] = str(error_text
)
660 "vdur.{}.status-detailed".format(vdu_index
)
662 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
663 except DbException
as e
:
664 self
.logger
.error("Cannot update vnf. {}".format(e
))
666 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
668 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
669 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
670 :param nsr_desc_RO: nsr descriptor from RO
671 :return: Nothing, LcmException is raised on errors
673 for vnf_index
, db_vnfr
in db_vnfrs
.items():
674 for vnf_RO
in nsr_desc_RO
["vnfs"]:
675 if vnf_RO
["member_vnf_index"] != vnf_index
:
678 if vnf_RO
.get("ip_address"):
679 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
682 elif not db_vnfr
.get("ip-address"):
683 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
684 raise LcmExceptionNoMgmtIP(
685 "ns member_vnf_index '{}' has no IP address".format(
690 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
691 vdur_RO_count_index
= 0
692 if vdur
.get("pdu-type"):
694 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
695 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
697 if vdur
["count-index"] != vdur_RO_count_index
:
698 vdur_RO_count_index
+= 1
700 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
701 if vdur_RO
.get("ip_address"):
702 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
704 vdur
["ip-address"] = None
705 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
706 vdur
["name"] = vdur_RO
.get("vim_name")
707 vdur
["status"] = vdur_RO
.get("status")
708 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
709 for ifacer
in get_iterable(vdur
, "interfaces"):
710 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
711 if ifacer
["name"] == interface_RO
.get("internal_name"):
712 ifacer
["ip-address"] = interface_RO
.get(
715 ifacer
["mac-address"] = interface_RO
.get(
721 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
722 "from VIM info".format(
723 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
726 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
730 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
732 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
736 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
737 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
738 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
740 vld
["vim-id"] = net_RO
.get("vim_net_id")
741 vld
["name"] = net_RO
.get("vim_name")
742 vld
["status"] = net_RO
.get("status")
743 vld
["status-detailed"] = net_RO
.get("error_msg")
744 vnfr_update
["vld.{}".format(vld_index
)] = vld
748 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
753 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
758 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
763 def _get_ns_config_info(self
, nsr_id
):
765 Generates a mapping between vnf,vdu elements and the N2VC id
766 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
767 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
768 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
769 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
771 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
772 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
774 ns_config_info
= {"osm-config-mapping": mapping
}
775 for vca
in vca_deployed_list
:
776 if not vca
["member-vnf-index"]:
778 if not vca
["vdu_id"]:
779 mapping
[vca
["member-vnf-index"]] = vca
["application"]
783 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
785 ] = vca
["application"]
786 return ns_config_info
788 async def _instantiate_ng_ro(
805 def get_vim_account(vim_account_id
):
807 if vim_account_id
in db_vims
:
808 return db_vims
[vim_account_id
]
809 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
810 db_vims
[vim_account_id
] = db_vim
813 # modify target_vld info with instantiation parameters
814 def parse_vld_instantiation_params(
815 target_vim
, target_vld
, vld_params
, target_sdn
817 if vld_params
.get("ip-profile"):
818 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
821 if vld_params
.get("provider-network"):
822 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
825 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
826 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
829 if vld_params
.get("wimAccountId"):
830 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
831 target_vld
["vim_info"][target_wim
] = {}
832 for param
in ("vim-network-name", "vim-network-id"):
833 if vld_params
.get(param
):
834 if isinstance(vld_params
[param
], dict):
835 for vim
, vim_net
in vld_params
[param
].items():
836 other_target_vim
= "vim:" + vim
838 target_vld
["vim_info"],
839 (other_target_vim
, param
.replace("-", "_")),
842 else: # isinstance str
843 target_vld
["vim_info"][target_vim
][
844 param
.replace("-", "_")
845 ] = vld_params
[param
]
846 if vld_params
.get("common_id"):
847 target_vld
["common_id"] = vld_params
.get("common_id")
849 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
850 def update_ns_vld_target(target
, ns_params
):
851 for vnf_params
in ns_params
.get("vnf", ()):
852 if vnf_params
.get("vimAccountId"):
856 for vnfr
in db_vnfrs
.values()
857 if vnf_params
["member-vnf-index"]
858 == vnfr
["member-vnf-index-ref"]
862 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
863 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
864 target_vld
= find_in_list(
865 get_iterable(vdur
, "interfaces"),
866 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
869 if vnf_params
.get("vimAccountId") not in a_vld
.get(
872 target
["ns"]["vld"][a_index
].get("vim_info").update(
874 "vim:{}".format(vnf_params
["vimAccountId"]): {
875 "vim_network_name": ""
880 nslcmop_id
= db_nslcmop
["_id"]
882 "name": db_nsr
["name"],
885 "image": deepcopy(db_nsr
["image"]),
886 "flavor": deepcopy(db_nsr
["flavor"]),
887 "action_id": nslcmop_id
,
888 "cloud_init_content": {},
890 for image
in target
["image"]:
891 image
["vim_info"] = {}
892 for flavor
in target
["flavor"]:
893 flavor
["vim_info"] = {}
894 if db_nsr
.get("affinity-or-anti-affinity-group"):
895 target
["affinity-or-anti-affinity-group"] = deepcopy(
896 db_nsr
["affinity-or-anti-affinity-group"]
898 for affinity_or_anti_affinity_group
in target
[
899 "affinity-or-anti-affinity-group"
901 affinity_or_anti_affinity_group
["vim_info"] = {}
903 if db_nslcmop
.get("lcmOperationType") != "instantiate":
904 # get parameters of instantiation:
905 db_nslcmop_instantiate
= self
.db
.get_list(
908 "nsInstanceId": db_nslcmop
["nsInstanceId"],
909 "lcmOperationType": "instantiate",
912 ns_params
= db_nslcmop_instantiate
.get("operationParams")
914 ns_params
= db_nslcmop
.get("operationParams")
915 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
916 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
919 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
920 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
924 "mgmt-network": vld
.get("mgmt-network", False),
925 "type": vld
.get("type"),
928 "vim_network_name": vld
.get("vim-network-name"),
929 "vim_account_id": ns_params
["vimAccountId"],
933 # check if this network needs SDN assist
934 if vld
.get("pci-interfaces"):
935 db_vim
= get_vim_account(ns_params
["vimAccountId"])
936 sdnc_id
= db_vim
["config"].get("sdn-controller")
938 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
939 target_sdn
= "sdn:{}".format(sdnc_id
)
940 target_vld
["vim_info"][target_sdn
] = {
942 "target_vim": target_vim
,
944 "type": vld
.get("type"),
947 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
948 for nsd_vnf_profile
in nsd_vnf_profiles
:
949 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
950 if cp
["virtual-link-profile-id"] == vld
["id"]:
952 "member_vnf:{}.{}".format(
953 cp
["constituent-cpd-id"][0][
954 "constituent-base-element-id"
956 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
958 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
960 # check at nsd descriptor, if there is an ip-profile
962 nsd_vlp
= find_in_list(
963 get_virtual_link_profiles(nsd
),
964 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
969 and nsd_vlp
.get("virtual-link-protocol-data")
970 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
972 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
975 ip_profile_dest_data
= {}
976 if "ip-version" in ip_profile_source_data
:
977 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
980 if "cidr" in ip_profile_source_data
:
981 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
984 if "gateway-ip" in ip_profile_source_data
:
985 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
988 if "dhcp-enabled" in ip_profile_source_data
:
989 ip_profile_dest_data
["dhcp-params"] = {
990 "enabled": ip_profile_source_data
["dhcp-enabled"]
992 vld_params
["ip-profile"] = ip_profile_dest_data
994 # update vld_params with instantiation params
995 vld_instantiation_params
= find_in_list(
996 get_iterable(ns_params
, "vld"),
997 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
999 if vld_instantiation_params
:
1000 vld_params
.update(vld_instantiation_params
)
1001 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1002 target
["ns"]["vld"].append(target_vld
)
# Update the target ns_vld if vnf vim_account is overridden by instantiation params
1004 update_ns_vld_target(target
, ns_params
)
1006 for vnfr
in db_vnfrs
.values():
1007 vnfd
= find_in_list(
1008 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1010 vnf_params
= find_in_list(
1011 get_iterable(ns_params
, "vnf"),
1012 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1014 target_vnf
= deepcopy(vnfr
)
1015 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1016 for vld
in target_vnf
.get("vld", ()):
# check if connected to a ns.vld, to fill target
1018 vnf_cp
= find_in_list(
1019 vnfd
.get("int-virtual-link-desc", ()),
1020 lambda cpd
: cpd
.get("id") == vld
["id"],
1023 ns_cp
= "member_vnf:{}.{}".format(
1024 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1026 if cp2target
.get(ns_cp
):
1027 vld
["target"] = cp2target
[ns_cp
]
1030 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1032 # check if this network needs SDN assist
1034 if vld
.get("pci-interfaces"):
1035 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1036 sdnc_id
= db_vim
["config"].get("sdn-controller")
1038 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1039 target_sdn
= "sdn:{}".format(sdnc_id
)
1040 vld
["vim_info"][target_sdn
] = {
1042 "target_vim": target_vim
,
1044 "type": vld
.get("type"),
1047 # check at vnfd descriptor, if there is an ip-profile
1049 vnfd_vlp
= find_in_list(
1050 get_virtual_link_profiles(vnfd
),
1051 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1055 and vnfd_vlp
.get("virtual-link-protocol-data")
1056 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1058 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1061 ip_profile_dest_data
= {}
1062 if "ip-version" in ip_profile_source_data
:
1063 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1066 if "cidr" in ip_profile_source_data
:
1067 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1070 if "gateway-ip" in ip_profile_source_data
:
1071 ip_profile_dest_data
[
1073 ] = ip_profile_source_data
["gateway-ip"]
1074 if "dhcp-enabled" in ip_profile_source_data
:
1075 ip_profile_dest_data
["dhcp-params"] = {
1076 "enabled": ip_profile_source_data
["dhcp-enabled"]
1079 vld_params
["ip-profile"] = ip_profile_dest_data
1080 # update vld_params with instantiation params
1082 vld_instantiation_params
= find_in_list(
1083 get_iterable(vnf_params
, "internal-vld"),
1084 lambda i_vld
: i_vld
["name"] == vld
["id"],
1086 if vld_instantiation_params
:
1087 vld_params
.update(vld_instantiation_params
)
1088 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1091 for vdur
in target_vnf
.get("vdur", ()):
1092 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1093 continue # This vdu must not be created
1094 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1096 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1099 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1100 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1103 and vdu_configuration
.get("config-access")
1104 and vdu_configuration
.get("config-access").get("ssh-access")
1106 vdur
["ssh-keys"] = ssh_keys_all
1107 vdur
["ssh-access-required"] = vdu_configuration
[
1109 ]["ssh-access"]["required"]
1112 and vnf_configuration
.get("config-access")
1113 and vnf_configuration
.get("config-access").get("ssh-access")
1114 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1116 vdur
["ssh-keys"] = ssh_keys_all
1117 vdur
["ssh-access-required"] = vnf_configuration
[
1119 ]["ssh-access"]["required"]
1120 elif ssh_keys_instantiation
and find_in_list(
1121 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1123 vdur
["ssh-keys"] = ssh_keys_instantiation
1125 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1127 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1129 if vdud
.get("cloud-init-file"):
1130 vdur
["cloud-init"] = "{}:file:{}".format(
1131 vnfd
["_id"], vdud
.get("cloud-init-file")
# read the file and put its content at target.cloud_init_content, so that ng_ro does not have to use the shared package system
1134 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1135 base_folder
= vnfd
["_admin"]["storage"]
1136 if base_folder
["pkg-dir"]:
1137 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1138 base_folder
["folder"],
1139 base_folder
["pkg-dir"],
1140 vdud
.get("cloud-init-file"),
1143 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1144 base_folder
["folder"],
1145 vdud
.get("cloud-init-file"),
1147 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1148 target
["cloud_init_content"][
1151 elif vdud
.get("cloud-init"):
1152 vdur
["cloud-init"] = "{}:vdu:{}".format(
1153 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1155 # put the content at target.cloud_init_content, so that ng_ro does not have to read the vnfd descriptor
1156 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1159 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1160 deploy_params_vdu
= self
._format
_additional
_params
(
1161 vdur
.get("additionalParams") or {}
1163 deploy_params_vdu
["OSM"] = get_osm_params(
1164 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1166 vdur
["additionalParams"] = deploy_params_vdu
1169 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1170 if target_vim
not in ns_flavor
["vim_info"]:
1171 ns_flavor
["vim_info"][target_vim
] = {}
1174 # in case alternative images are provided we must check if they should be applied
1175 # for the vim_type, modify the vim_type taking into account
1176 ns_image_id
= int(vdur
["ns-image-id"])
1177 if vdur
.get("alt-image-ids"):
1178 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1179 vim_type
= db_vim
["vim_type"]
1180 for alt_image_id
in vdur
.get("alt-image-ids"):
1181 ns_alt_image
= target
["image"][int(alt_image_id
)]
1182 if vim_type
== ns_alt_image
.get("vim-type"):
1183 # must use alternative image
1185 "use alternative image id: {}".format(alt_image_id
)
1187 ns_image_id
= alt_image_id
1188 vdur
["ns-image-id"] = ns_image_id
1190 ns_image
= target
["image"][int(ns_image_id
)]
1191 if target_vim
not in ns_image
["vim_info"]:
1192 ns_image
["vim_info"][target_vim
] = {}
1195 if vdur
.get("affinity-or-anti-affinity-group-id"):
1196 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1197 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1198 if target_vim
not in ns_ags
["vim_info"]:
1199 ns_ags
["vim_info"][target_vim
] = {}
1201 vdur
["vim_info"] = {target_vim
: {}}
1202 # instantiation parameters
1204 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1205 # vdud["id"]), None)
1206 vdur_list
.append(vdur
)
1207 target_vnf
["vdur"] = vdur_list
1208 target
["vnf"].append(target_vnf
)
1210 desc
= await self
.RO
.deploy(nsr_id
, target
)
1211 self
.logger
.debug("RO return > {}".format(desc
))
1212 action_id
= desc
["action_id"]
1213 await self
._wait
_ng
_ro
(
1214 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
1219 "_admin.deployed.RO.operational-status": "running",
1220 "detailed-status": " ".join(stage
),
1222 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1223 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1224 self
._write
_op
_status
(nslcmop_id
, stage
)
1226 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1230 async def _wait_ng_ro(
1239 detailed_status_old
= None
1241 start_time
= start_time
or time()
1242 while time() <= start_time
+ timeout
:
1243 desc_status
= await self
.RO
.status(nsr_id
, action_id
)
1244 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1245 if desc_status
["status"] == "FAILED":
1246 raise NgRoException(desc_status
["details"])
1247 elif desc_status
["status"] == "BUILD":
1249 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1250 elif desc_status
["status"] == "DONE":
1252 stage
[2] = "Deployed at VIM"
1255 assert False, "ROclient.check_ns_status returns unknown {}".format(
1256 desc_status
["status"]
1258 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1259 detailed_status_old
= stage
[2]
1260 db_nsr_update
["detailed-status"] = " ".join(stage
)
1261 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1262 self
._write
_op
_status
(nslcmop_id
, stage
)
1263 await asyncio
.sleep(15, loop
=self
.loop
)
1264 else: # timeout_ns_deploy
1265 raise NgRoException("Timeout waiting ns to deploy")
1267 async def _terminate_ng_ro(
1268 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1273 start_deploy
= time()
1280 "action_id": nslcmop_id
,
1282 desc
= await self
.RO
.deploy(nsr_id
, target
)
1283 action_id
= desc
["action_id"]
1284 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1285 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1288 + "ns terminate action at RO. action_id={}".format(action_id
)
1292 delete_timeout
= 20 * 60 # 20 minutes
1293 await self
._wait
_ng
_ro
(
1294 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
1297 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1298 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1300 await self
.RO
.delete(nsr_id
)
1301 except Exception as e
:
1302 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1303 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1304 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1305 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1307 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1309 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1310 failed_detail
.append("delete conflict: {}".format(e
))
1313 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1316 failed_detail
.append("delete error: {}".format(e
))
1319 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1323 stage
[2] = "Error deleting from VIM"
1325 stage
[2] = "Deleted from VIM"
1326 db_nsr_update
["detailed-status"] = " ".join(stage
)
1327 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1328 self
._write
_op
_status
(nslcmop_id
, stage
)
1331 raise LcmException("; ".join(failed_detail
))
1334 async def instantiate_RO(
1348 :param logging_text: prefix text to use at logging
1349 :param nsr_id: nsr identity
1350 :param nsd: database content of ns descriptor
1351 :param db_nsr: database content of ns record
1352 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1354 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1355 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1356 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1357 :return: None or exception
1360 start_deploy
= time()
1361 ns_params
= db_nslcmop
.get("operationParams")
1362 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1363 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1365 timeout_ns_deploy
= self
.timeout
.get(
1366 "ns_deploy", self
.timeout_ns_deploy
1369 # Check for and optionally request placement optimization. Database will be updated if placement activated
1370 stage
[2] = "Waiting for Placement."
1371 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1372 # in case of placement, change ns_params["vimAccountId"] if it is not present at any vnfr
1373 for vnfr
in db_vnfrs
.values():
1374 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1377 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1379 return await self
._instantiate
_ng
_ro
(
1392 except Exception as e
:
1393 stage
[2] = "ERROR deploying at VIM"
1394 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1396 "Error deploying at VIM {}".format(e
),
1397 exc_info
=not isinstance(
1400 ROclient
.ROClientException
,
1409 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1411 Wait for kdu to be up, get ip address
1412 :param logging_text: prefix use for logging
1419 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1422 while nb_tries
< 360:
1423 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1427 for x
in get_iterable(db_vnfr
, "kdur")
1428 if x
.get("kdu-name") == kdu_name
1434 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1436 if kdur
.get("status"):
1437 if kdur
["status"] in ("READY", "ENABLED"):
1438 return kdur
.get("ip-address")
1441 "target KDU={} is in error state".format(kdu_name
)
1444 await asyncio
.sleep(10, loop
=self
.loop
)
1446 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1448 async def wait_vm_up_insert_key_ro(
1449 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1452 Wait for ip address at RO, and optionally, insert public key in virtual machine
1453 :param logging_text: prefix use for logging
1458 :param pub_key: public ssh key to inject, None to skip
1459 :param user: user to apply the public ssh key
1463 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1467 target_vdu_id
= None
1473 if ro_retries
>= 360: # 1 hour
1475 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1478 await asyncio
.sleep(10, loop
=self
.loop
)
1481 if not target_vdu_id
:
1482 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1484 if not vdu_id
: # for the VNF case
1485 if db_vnfr
.get("status") == "ERROR":
1487 "Cannot inject ssh-key because target VNF is in error state"
1489 ip_address
= db_vnfr
.get("ip-address")
1495 for x
in get_iterable(db_vnfr
, "vdur")
1496 if x
.get("ip-address") == ip_address
1504 for x
in get_iterable(db_vnfr
, "vdur")
1505 if x
.get("vdu-id-ref") == vdu_id
1506 and x
.get("count-index") == vdu_index
1512 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1513 ): # If only one, this should be the target vdu
1514 vdur
= db_vnfr
["vdur"][0]
1517 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1518 vnfr_id
, vdu_id
, vdu_index
1521 # New generation RO stores information at "vim_info"
1524 if vdur
.get("vim_info"):
1526 t
for t
in vdur
["vim_info"]
1527 ) # there should be only one key
1528 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1530 vdur
.get("pdu-type")
1531 or vdur
.get("status") == "ACTIVE"
1532 or ng_ro_status
== "ACTIVE"
1534 ip_address
= vdur
.get("ip-address")
1537 target_vdu_id
= vdur
["vdu-id-ref"]
1538 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1540 "Cannot inject ssh-key because target VM is in error state"
1543 if not target_vdu_id
:
1546 # inject public key into machine
1547 if pub_key
and user
:
1548 self
.logger
.debug(logging_text
+ "Inserting RO key")
1549 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1550 if vdur
.get("pdu-type"):
1551 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1554 ro_vm_id
= "{}-{}".format(
1555 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1556 ) # TODO add vdu_index
1560 "action": "inject_ssh_key",
1564 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1566 desc
= await self
.RO
.deploy(nsr_id
, target
)
1567 action_id
= desc
["action_id"]
1568 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600)
1571 # wait until NS is deployed at RO
1573 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1574 ro_nsr_id
= deep_get(
1575 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1579 result_dict
= await self
.RO
.create_action(
1581 item_id_name
=ro_nsr_id
,
1583 "add_public_key": pub_key
,
1588 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1589 if not result_dict
or not isinstance(result_dict
, dict):
1591 "Unknown response from RO when injecting key"
1593 for result
in result_dict
.values():
1594 if result
.get("vim_result") == 200:
1597 raise ROclient
.ROClientException(
1598 "error injecting key: {}".format(
1599 result
.get("description")
1603 except NgRoException
as e
:
1605 "Reaching max tries injecting key. Error: {}".format(e
)
1607 except ROclient
.ROClientException
as e
:
1611 + "error injecting key: {}. Retrying until {} seconds".format(
1618 "Reaching max tries injecting key. Error: {}".format(e
)
1625 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1627 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1629 my_vca
= vca_deployed_list
[vca_index
]
1630 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1631 # vdu or kdu: no dependencies
1635 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1636 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1637 configuration_status_list
= db_nsr
["configurationStatus"]
1638 for index
, vca_deployed
in enumerate(configuration_status_list
):
1639 if index
== vca_index
:
1642 if not my_vca
.get("member-vnf-index") or (
1643 vca_deployed
.get("member-vnf-index")
1644 == my_vca
.get("member-vnf-index")
1646 internal_status
= configuration_status_list
[index
].get("status")
1647 if internal_status
== "READY":
1649 elif internal_status
== "BROKEN":
1651 "Configuration aborted because dependent charm/s has failed"
1656 # no dependencies, return
1658 await asyncio
.sleep(10)
1661 raise LcmException("Configuration aborted because dependent charm/s timeout")
1663 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1666 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1668 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1669 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1672 async def instantiate_N2VC(
1689 ee_config_descriptor
,
1691 nsr_id
= db_nsr
["_id"]
1692 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1693 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1694 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1695 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1697 "collection": "nsrs",
1698 "filter": {"_id": nsr_id
},
1699 "path": db_update_entry
,
1705 element_under_configuration
= nsr_id
1709 vnfr_id
= db_vnfr
["_id"]
1710 osm_config
["osm"]["vnf_id"] = vnfr_id
1712 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1714 if vca_type
== "native_charm":
1717 index_number
= vdu_index
or 0
1720 element_type
= "VNF"
1721 element_under_configuration
= vnfr_id
1722 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1724 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1725 element_type
= "VDU"
1726 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1727 osm_config
["osm"]["vdu_id"] = vdu_id
1729 namespace
+= ".{}".format(kdu_name
)
1730 element_type
= "KDU"
1731 element_under_configuration
= kdu_name
1732 osm_config
["osm"]["kdu_name"] = kdu_name
1735 if base_folder
["pkg-dir"]:
1736 artifact_path
= "{}/{}/{}/{}".format(
1737 base_folder
["folder"],
1738 base_folder
["pkg-dir"],
1741 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1746 artifact_path
= "{}/Scripts/{}/{}/".format(
1747 base_folder
["folder"],
1750 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1755 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1757 # get initial_config_primitive_list that applies to this element
1758 initial_config_primitive_list
= config_descriptor
.get(
1759 "initial-config-primitive"
1763 "Initial config primitive list > {}".format(
1764 initial_config_primitive_list
1768 # add config if not present for NS charm
1769 ee_descriptor_id
= ee_config_descriptor
.get("id")
1770 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1771 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1772 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1776 "Initial config primitive list #2 > {}".format(
1777 initial_config_primitive_list
1780 # n2vc_redesign STEP 3.1
1781 # find old ee_id if exists
1782 ee_id
= vca_deployed
.get("ee_id")
1784 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1785 # create or register execution environment in VCA
1786 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1788 self
._write
_configuration
_status
(
1790 vca_index
=vca_index
,
1792 element_under_configuration
=element_under_configuration
,
1793 element_type
=element_type
,
1796 step
= "create execution environment"
1797 self
.logger
.debug(logging_text
+ step
)
1801 if vca_type
== "k8s_proxy_charm":
1802 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1803 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1804 namespace
=namespace
,
1805 artifact_path
=artifact_path
,
1809 elif vca_type
== "helm" or vca_type
== "helm-v3":
1810 ee_id
, credentials
= await self
.vca_map
[
1812 ].create_execution_environment(
1813 namespace
=namespace
,
1817 artifact_path
=artifact_path
,
1821 ee_id
, credentials
= await self
.vca_map
[
1823 ].create_execution_environment(
1824 namespace
=namespace
,
1830 elif vca_type
== "native_charm":
1831 step
= "Waiting to VM being up and getting IP address"
1832 self
.logger
.debug(logging_text
+ step
)
1833 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1842 credentials
= {"hostname": rw_mgmt_ip
}
1844 username
= deep_get(
1845 config_descriptor
, ("config-access", "ssh-access", "default-user")
1847 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1848 # merged. Meanwhile let's get username from initial-config-primitive
1849 if not username
and initial_config_primitive_list
:
1850 for config_primitive
in initial_config_primitive_list
:
1851 for param
in config_primitive
.get("parameter", ()):
1852 if param
["name"] == "ssh-username":
1853 username
= param
["value"]
1857 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1858 "'config-access.ssh-access.default-user'"
1860 credentials
["username"] = username
1861 # n2vc_redesign STEP 3.2
1863 self
._write
_configuration
_status
(
1865 vca_index
=vca_index
,
1866 status
="REGISTERING",
1867 element_under_configuration
=element_under_configuration
,
1868 element_type
=element_type
,
1871 step
= "register execution environment {}".format(credentials
)
1872 self
.logger
.debug(logging_text
+ step
)
1873 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1874 credentials
=credentials
,
1875 namespace
=namespace
,
1880 # for compatibility with MON/POL modules, they need the model and application name at database
1881 # TODO ask MON/POL whether it is still needed to assume the format "model_name.application_name"
1882 ee_id_parts
= ee_id
.split(".")
1883 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1884 if len(ee_id_parts
) >= 2:
1885 model_name
= ee_id_parts
[0]
1886 application_name
= ee_id_parts
[1]
1887 db_nsr_update
[db_update_entry
+ "model"] = model_name
1888 db_nsr_update
[db_update_entry
+ "application"] = application_name
1890 # n2vc_redesign STEP 3.3
1891 step
= "Install configuration Software"
1893 self
._write
_configuration
_status
(
1895 vca_index
=vca_index
,
1896 status
="INSTALLING SW",
1897 element_under_configuration
=element_under_configuration
,
1898 element_type
=element_type
,
1899 other_update
=db_nsr_update
,
1902 # TODO check if already done
1903 self
.logger
.debug(logging_text
+ step
)
1905 if vca_type
== "native_charm":
1906 config_primitive
= next(
1907 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1910 if config_primitive
:
1911 config
= self
._map
_primitive
_params
(
1912 config_primitive
, {}, deploy_params
1915 if vca_type
== "lxc_proxy_charm":
1916 if element_type
== "NS":
1917 num_units
= db_nsr
.get("config-units") or 1
1918 elif element_type
== "VNF":
1919 num_units
= db_vnfr
.get("config-units") or 1
1920 elif element_type
== "VDU":
1921 for v
in db_vnfr
["vdur"]:
1922 if vdu_id
== v
["vdu-id-ref"]:
1923 num_units
= v
.get("config-units") or 1
1925 if vca_type
!= "k8s_proxy_charm":
1926 await self
.vca_map
[vca_type
].install_configuration_sw(
1928 artifact_path
=artifact_path
,
1931 num_units
=num_units
,
1936 # write in db flag of configuration_sw already installed
1938 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1941 # add relations for this VCA (wait for other peers related with this VCA)
1942 await self
._add
_vca
_relations
(
1943 logging_text
=logging_text
,
1946 vca_index
=vca_index
,
1949 # if SSH access is required, then get the execution environment SSH public key
1950 # for a native charm we have already waited for the VM to be up
1951 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1954 # self.logger.debug("get ssh key block")
1956 config_descriptor
, ("config-access", "ssh-access", "required")
1958 # self.logger.debug("ssh key needed")
1959 # Needed to inject a ssh key
1962 ("config-access", "ssh-access", "default-user"),
1964 step
= "Install configuration Software, getting public ssh key"
1965 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
1966 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
1969 step
= "Insert public key into VM user={} ssh_key={}".format(
1973 # self.logger.debug("no need to get ssh key")
1974 step
= "Waiting to VM being up and getting IP address"
1975 self
.logger
.debug(logging_text
+ step
)
1977 # n2vc_redesign STEP 5.1
1978 # wait for RO (ip-address) Insert pub_key into VM
1981 rw_mgmt_ip
= await self
.wait_kdu_up(
1982 logging_text
, nsr_id
, vnfr_id
, kdu_name
1985 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1995 rw_mgmt_ip
= None # This is for a NS configuration
1997 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
1999 # store rw_mgmt_ip in deploy params for later replacement
2000 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2002 # n2vc_redesign STEP 6 Execute initial config primitive
2003 step
= "execute initial config primitive"
2005 # wait for dependent primitives execution (NS -> VNF -> VDU)
2006 if initial_config_primitive_list
:
2007 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2009 # stage, in function of element type: vdu, kdu, vnf or ns
2010 my_vca
= vca_deployed_list
[vca_index
]
2011 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2013 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2014 elif my_vca
.get("member-vnf-index"):
2016 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2019 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2021 self
._write
_configuration
_status
(
2022 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2025 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2027 check_if_terminated_needed
= True
2028 for initial_config_primitive
in initial_config_primitive_list
:
2029 # adding information on the vca_deployed if it is a NS execution environment
2030 if not vca_deployed
["member-vnf-index"]:
2031 deploy_params
["ns_config_info"] = json
.dumps(
2032 self
._get
_ns
_config
_info
(nsr_id
)
2034 # TODO check if already done
2035 primitive_params_
= self
._map
_primitive
_params
(
2036 initial_config_primitive
, {}, deploy_params
2039 step
= "execute primitive '{}' params '{}'".format(
2040 initial_config_primitive
["name"], primitive_params_
2042 self
.logger
.debug(logging_text
+ step
)
2043 await self
.vca_map
[vca_type
].exec_primitive(
2045 primitive_name
=initial_config_primitive
["name"],
2046 params_dict
=primitive_params_
,
2051 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2052 if check_if_terminated_needed
:
2053 if config_descriptor
.get("terminate-config-primitive"):
2055 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2057 check_if_terminated_needed
= False
2059 # TODO register in database that primitive is done
2061 # STEP 7 Configure metrics
2062 if vca_type
== "helm" or vca_type
== "helm-v3":
2063 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2065 artifact_path
=artifact_path
,
2066 ee_config_descriptor
=ee_config_descriptor
,
2069 target_ip
=rw_mgmt_ip
,
2075 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2078 for job
in prometheus_jobs
:
2081 {"job_name": job
["job_name"]},
2084 fail_on_empty
=False,
2087 step
= "instantiated at VCA"
2088 self
.logger
.debug(logging_text
+ step
)
2090 self
._write
_configuration
_status
(
2091 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2094 except Exception as e
: # TODO not use Exception but N2VC exception
2095 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2097 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2100 "Exception while {} : {}".format(step
, e
), exc_info
=True
2102 self
._write
_configuration
_status
(
2103 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2105 raise LcmException("{} {}".format(step
, e
)) from e
2107 def _write_ns_status(
2111 current_operation
: str,
2112 current_operation_id
: str,
2113 error_description
: str = None,
2114 error_detail
: str = None,
2115 other_update
: dict = None,
2118 Update db_nsr fields.
2121 :param current_operation:
2122 :param current_operation_id:
2123 :param error_description:
2124 :param error_detail:
2125 :param other_update: Other required changes at database if provided, will be cleared
2129 db_dict
= other_update
or {}
2132 ] = current_operation_id
# for backward compatibility
2133 db_dict
["_admin.current-operation"] = current_operation_id
2134 db_dict
["_admin.operation-type"] = (
2135 current_operation
if current_operation
!= "IDLE" else None
2137 db_dict
["currentOperation"] = current_operation
2138 db_dict
["currentOperationID"] = current_operation_id
2139 db_dict
["errorDescription"] = error_description
2140 db_dict
["errorDetail"] = error_detail
2143 db_dict
["nsState"] = ns_state
2144 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2145 except DbException
as e
:
2146 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2148 def _write_op_status(
2152 error_message
: str = None,
2153 queuePosition
: int = 0,
2154 operation_state
: str = None,
2155 other_update
: dict = None,
2158 db_dict
= other_update
or {}
2159 db_dict
["queuePosition"] = queuePosition
2160 if isinstance(stage
, list):
2161 db_dict
["stage"] = stage
[0]
2162 db_dict
["detailed-status"] = " ".join(stage
)
2163 elif stage
is not None:
2164 db_dict
["stage"] = str(stage
)
2166 if error_message
is not None:
2167 db_dict
["errorMessage"] = error_message
2168 if operation_state
is not None:
2169 db_dict
["operationState"] = operation_state
2170 db_dict
["statusEnteredTime"] = time()
2171 self
.update_db_2("nslcmops", op_id
, db_dict
)
2172 except DbException
as e
:
2174 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2177 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2179 nsr_id
= db_nsr
["_id"]
2180 # configurationStatus
2181 config_status
= db_nsr
.get("configurationStatus")
2184 "configurationStatus.{}.status".format(index
): status
2185 for index
, v
in enumerate(config_status
)
2189 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2191 except DbException
as e
:
2193 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2196 def _write_configuration_status(
2201 element_under_configuration
: str = None,
2202 element_type
: str = None,
2203 other_update
: dict = None,
2206 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2207 # .format(vca_index, status))
2210 db_path
= "configurationStatus.{}.".format(vca_index
)
2211 db_dict
= other_update
or {}
2213 db_dict
[db_path
+ "status"] = status
2214 if element_under_configuration
:
2216 db_path
+ "elementUnderConfiguration"
2217 ] = element_under_configuration
2219 db_dict
[db_path
+ "elementType"] = element_type
2220 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2221 except DbException
as e
:
2223 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2224 status
, nsr_id
, vca_index
, e
2228 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2230 Checks and computes the placement (vim account where to deploy). If it is decided by an external tool, it
2231 sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
2232 Database is used because the result can be obtained from a different LCM worker in case of HA.
2233 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2234 :param db_nslcmop: database content of nslcmop
2235 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2236 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
2237 computed 'vim-account-id'
2240 nslcmop_id
= db_nslcmop
["_id"]
2241 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2242 if placement_engine
== "PLA":
2244 logging_text
+ "Invoke and wait for placement optimization"
2246 await self
.msg
.aiowrite(
2247 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2249 db_poll_interval
= 5
2250 wait
= db_poll_interval
* 10
2252 while not pla_result
and wait
>= 0:
2253 await asyncio
.sleep(db_poll_interval
)
2254 wait
-= db_poll_interval
2255 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2256 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2260 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2263 for pla_vnf
in pla_result
["vnf"]:
2264 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2265 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2270 {"_id": vnfr
["_id"]},
2271 {"vim-account-id": pla_vnf
["vimAccountId"]},
2274 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2277 def update_nsrs_with_pla_result(self
, params
):
2279 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2281 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2283 except Exception as e
:
2284 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2286 async def instantiate(self
, nsr_id
, nslcmop_id
):
2289 :param nsr_id: ns instance to deploy
2290 :param nslcmop_id: operation to run
2294 # Try to lock HA task here
2295 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2296 if not task_is_locked_by_me
:
2298 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2302 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2303 self
.logger
.debug(logging_text
+ "Enter")
2305 # get all needed from database
2307 # database nsrs record
2310 # database nslcmops record
2313 # update operation on nsrs
2315 # update operation on nslcmops
2316 db_nslcmop_update
= {}
2318 nslcmop_operation_state
= None
2319 db_vnfrs
= {} # vnf's info indexed by member-index
2321 tasks_dict_info
= {} # from task to info text
2325 "Stage 1/5: preparation of the environment.",
2326 "Waiting for previous operations to terminate.",
2329 # ^ stage, step, VIM progress
2331 # wait for any previous tasks in process
2332 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2334 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2335 stage
[1] = "Reading from database."
2336 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2337 db_nsr_update
["detailed-status"] = "creating"
2338 db_nsr_update
["operational-status"] = "init"
2339 self
._write
_ns
_status
(
2341 ns_state
="BUILDING",
2342 current_operation
="INSTANTIATING",
2343 current_operation_id
=nslcmop_id
,
2344 other_update
=db_nsr_update
,
2346 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2348 # read from db: operation
2349 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2350 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2351 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2352 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2353 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2355 ns_params
= db_nslcmop
.get("operationParams")
2356 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2357 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2359 timeout_ns_deploy
= self
.timeout
.get(
2360 "ns_deploy", self
.timeout_ns_deploy
2364 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2365 self
.logger
.debug(logging_text
+ stage
[1])
2366 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2367 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2368 self
.logger
.debug(logging_text
+ stage
[1])
2369 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2370 self
.fs
.sync(db_nsr
["nsd-id"])
2372 # nsr_name = db_nsr["name"] # TODO short-name??
2374 # read from db: vnf's of this ns
2375 stage
[1] = "Getting vnfrs from db."
2376 self
.logger
.debug(logging_text
+ stage
[1])
2377 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2379 # read from db: vnfd's for every vnf
2380 db_vnfds
= [] # every vnfd data
2382 # for each vnf in ns, read vnfd
2383 for vnfr
in db_vnfrs_list
:
2384 if vnfr
.get("kdur"):
2386 for kdur
in vnfr
["kdur"]:
2387 if kdur
.get("additionalParams"):
2388 kdur
["additionalParams"] = json
.loads(
2389 kdur
["additionalParams"]
2391 kdur_list
.append(kdur
)
2392 vnfr
["kdur"] = kdur_list
2394 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2395 vnfd_id
= vnfr
["vnfd-id"]
2396 vnfd_ref
= vnfr
["vnfd-ref"]
2397 self
.fs
.sync(vnfd_id
)
2399 # if we haven't this vnfd, read it from db
2400 if vnfd_id
not in db_vnfds
:
2402 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2405 self
.logger
.debug(logging_text
+ stage
[1])
2406 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2409 db_vnfds
.append(vnfd
)
2411 # Get or generates the _admin.deployed.VCA list
2412 vca_deployed_list
= None
2413 if db_nsr
["_admin"].get("deployed"):
2414 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2415 if vca_deployed_list
is None:
2416 vca_deployed_list
= []
2417 configuration_status_list
= []
2418 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2419 db_nsr_update
["configurationStatus"] = configuration_status_list
2420 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2421 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2422 elif isinstance(vca_deployed_list
, dict):
2423 # maintain backward compatibility. Change a dict to list at database
2424 vca_deployed_list
= list(vca_deployed_list
.values())
2425 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2426 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2429 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2431 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2432 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2434 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2435 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2436 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2438 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2441 # n2vc_redesign STEP 2 Deploy Network Scenario
2442 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2443 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2445 stage
[1] = "Deploying KDUs."
2446 # self.logger.debug(logging_text + "Before deploy_kdus")
2447 # Call to deploy_kdus in case exists the "vdu:kdu" param
2448 await self
.deploy_kdus(
2449 logging_text
=logging_text
,
2451 nslcmop_id
=nslcmop_id
,
2454 task_instantiation_info
=tasks_dict_info
,
2457 stage
[1] = "Getting VCA public key."
2458 # n2vc_redesign STEP 1 Get VCA public ssh-key
2459 # feature 1429. Add n2vc public key to needed VMs
2460 n2vc_key
= self
.n2vc
.get_public_key()
2461 n2vc_key_list
= [n2vc_key
]
2462 if self
.vca_config
.get("public_key"):
2463 n2vc_key_list
.append(self
.vca_config
["public_key"])
2465 stage
[1] = "Deploying NS at VIM."
2466 task_ro
= asyncio
.ensure_future(
2467 self
.instantiate_RO(
2468 logging_text
=logging_text
,
2472 db_nslcmop
=db_nslcmop
,
2475 n2vc_key_list
=n2vc_key_list
,
2479 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2480 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2482 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2483 stage
[1] = "Deploying Execution Environments."
2484 self
.logger
.debug(logging_text
+ stage
[1])
2486 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2487 for vnf_profile
in get_vnf_profiles(nsd
):
2488 vnfd_id
= vnf_profile
["vnfd-id"]
2489 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2490 member_vnf_index
= str(vnf_profile
["id"])
2491 db_vnfr
= db_vnfrs
[member_vnf_index
]
2492 base_folder
= vnfd
["_admin"]["storage"]
2498 # Get additional parameters
2499 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2500 if db_vnfr
.get("additionalParamsForVnf"):
2501 deploy_params
.update(
2502 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2505 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2506 if descriptor_config
:
2508 logging_text
=logging_text
2509 + "member_vnf_index={} ".format(member_vnf_index
),
2512 nslcmop_id
=nslcmop_id
,
2518 member_vnf_index
=member_vnf_index
,
2519 vdu_index
=vdu_index
,
2521 deploy_params
=deploy_params
,
2522 descriptor_config
=descriptor_config
,
2523 base_folder
=base_folder
,
2524 task_instantiation_info
=tasks_dict_info
,
2528 # Deploy charms for each VDU that supports one.
2529 for vdud
in get_vdu_list(vnfd
):
2531 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2532 vdur
= find_in_list(
2533 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2536 if vdur
.get("additionalParams"):
2537 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2539 deploy_params_vdu
= deploy_params
2540 deploy_params_vdu
["OSM"] = get_osm_params(
2541 db_vnfr
, vdu_id
, vdu_count_index
=0
2543 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2545 self
.logger
.debug("VDUD > {}".format(vdud
))
2547 "Descriptor config > {}".format(descriptor_config
)
2549 if descriptor_config
:
2552 for vdu_index
in range(vdud_count
):
2553 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2555 logging_text
=logging_text
2556 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2557 member_vnf_index
, vdu_id
, vdu_index
2561 nslcmop_id
=nslcmop_id
,
2567 member_vnf_index
=member_vnf_index
,
2568 vdu_index
=vdu_index
,
2570 deploy_params
=deploy_params_vdu
,
2571 descriptor_config
=descriptor_config
,
2572 base_folder
=base_folder
,
2573 task_instantiation_info
=tasks_dict_info
,
2576 for kdud
in get_kdu_list(vnfd
):
2577 kdu_name
= kdud
["name"]
2578 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2579 if descriptor_config
:
2584 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2586 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2587 if kdur
.get("additionalParams"):
2588 deploy_params_kdu
.update(
2589 parse_yaml_strings(kdur
["additionalParams"].copy())
2593 logging_text
=logging_text
,
2596 nslcmop_id
=nslcmop_id
,
2602 member_vnf_index
=member_vnf_index
,
2603 vdu_index
=vdu_index
,
2605 deploy_params
=deploy_params_kdu
,
2606 descriptor_config
=descriptor_config
,
2607 base_folder
=base_folder
,
2608 task_instantiation_info
=tasks_dict_info
,
2612 # Check if this NS has a charm configuration
2613 descriptor_config
= nsd
.get("ns-configuration")
2614 if descriptor_config
and descriptor_config
.get("juju"):
2617 member_vnf_index
= None
2623 # Get additional parameters
2624 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2625 if db_nsr
.get("additionalParamsForNs"):
2626 deploy_params
.update(
2627 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2629 base_folder
= nsd
["_admin"]["storage"]
2631 logging_text
=logging_text
,
2634 nslcmop_id
=nslcmop_id
,
2640 member_vnf_index
=member_vnf_index
,
2641 vdu_index
=vdu_index
,
2643 deploy_params
=deploy_params
,
2644 descriptor_config
=descriptor_config
,
2645 base_folder
=base_folder
,
2646 task_instantiation_info
=tasks_dict_info
,
2650 # rest of staff will be done at finally
2653 ROclient
.ROClientException
,
2659 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2662 except asyncio
.CancelledError
:
2664 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2666 exc
= "Operation was cancelled"
2667 except Exception as e
:
2668 exc
= traceback
.format_exc()
2669 self
.logger
.critical(
2670 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2675 error_list
.append(str(exc
))
2677 # wait for pending tasks
2679 stage
[1] = "Waiting for instantiate pending tasks."
2680 self
.logger
.debug(logging_text
+ stage
[1])
2681 error_list
+= await self
._wait
_for
_tasks
(
2689 stage
[1] = stage
[2] = ""
2690 except asyncio
.CancelledError
:
2691 error_list
.append("Cancelled")
2692 # TODO cancel all tasks
2693 except Exception as exc
:
2694 error_list
.append(str(exc
))
2696 # update operation-status
2697 db_nsr_update
["operational-status"] = "running"
2698 # let's begin with VCA 'configured' status (later we can change it)
2699 db_nsr_update
["config-status"] = "configured"
2700 for task
, task_name
in tasks_dict_info
.items():
2701 if not task
.done() or task
.cancelled() or task
.exception():
2702 if task_name
.startswith(self
.task_name_deploy_vca
):
2703 # A N2VC task is pending
2704 db_nsr_update
["config-status"] = "failed"
2706 # RO or KDU task is pending
2707 db_nsr_update
["operational-status"] = "failed"
2709 # update status at database
2711 error_detail
= ". ".join(error_list
)
2712 self
.logger
.error(logging_text
+ error_detail
)
2713 error_description_nslcmop
= "{} Detail: {}".format(
2714 stage
[0], error_detail
2716 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2717 nslcmop_id
, stage
[0]
2720 db_nsr_update
["detailed-status"] = (
2721 error_description_nsr
+ " Detail: " + error_detail
2723 db_nslcmop_update
["detailed-status"] = error_detail
2724 nslcmop_operation_state
= "FAILED"
2728 error_description_nsr
= error_description_nslcmop
= None
2730 db_nsr_update
["detailed-status"] = "Done"
2731 db_nslcmop_update
["detailed-status"] = "Done"
2732 nslcmop_operation_state
= "COMPLETED"
2735 self
._write
_ns
_status
(
2738 current_operation
="IDLE",
2739 current_operation_id
=None,
2740 error_description
=error_description_nsr
,
2741 error_detail
=error_detail
,
2742 other_update
=db_nsr_update
,
2744 self
._write
_op
_status
(
2747 error_message
=error_description_nslcmop
,
2748 operation_state
=nslcmop_operation_state
,
2749 other_update
=db_nslcmop_update
,
2752 if nslcmop_operation_state
:
2754 await self
.msg
.aiowrite(
2759 "nslcmop_id": nslcmop_id
,
2760 "operationState": nslcmop_operation_state
,
2764 except Exception as e
:
2766 logging_text
+ "kafka_write notification Exception {}".format(e
)
2769 self
.logger
.debug(logging_text
+ "Exit")
2770 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2772 def _get_vnfd(self
, vnfd_id
: str, cached_vnfds
: Dict
[str, Any
]):
2773 if vnfd_id
not in cached_vnfds
:
2774 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
2775 return cached_vnfds
[vnfd_id
]
2777 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2778 if vnf_profile_id
not in cached_vnfrs
:
2779 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2782 "member-vnf-index-ref": vnf_profile_id
,
2783 "nsr-id-ref": nsr_id
,
2786 return cached_vnfrs
[vnf_profile_id
]
2788 def _is_deployed_vca_in_relation(
2789 self
, vca
: DeployedVCA
, relation
: Relation
2792 for endpoint
in (relation
.provider
, relation
.requirer
):
2793 if endpoint
["kdu-resource-profile-id"]:
2796 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2797 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2798 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2804 def _update_ee_relation_data_with_implicit_data(
2805 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2807 ee_relation_data
= safe_get_ee_relation(
2808 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2810 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2811 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2812 "execution-environment-ref"
2814 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2815 vnfd_id
= vnf_profile
["vnfd-id"]
2816 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2819 if ee_relation_level
== EELevel
.VNF
2820 else ee_relation_data
["vdu-profile-id"]
2822 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2825 f
"not execution environments found for ee_relation {ee_relation_data}"
2827 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2828 return ee_relation_data
2830 def _get_ns_relations(
2833 nsd
: Dict
[str, Any
],
2835 cached_vnfds
: Dict
[str, Any
],
2836 ) -> List
[Relation
]:
2838 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2839 for r
in db_ns_relations
:
2840 provider_dict
= None
2841 requirer_dict
= None
2842 if all(key
in r
for key
in ("provider", "requirer")):
2843 provider_dict
= r
["provider"]
2844 requirer_dict
= r
["requirer"]
2845 elif "entities" in r
:
2846 provider_id
= r
["entities"][0]["id"]
2849 "endpoint": r
["entities"][0]["endpoint"],
2851 if provider_id
!= nsd
["id"]:
2852 provider_dict
["vnf-profile-id"] = provider_id
2853 requirer_id
= r
["entities"][1]["id"]
2856 "endpoint": r
["entities"][1]["endpoint"],
2858 if requirer_id
!= nsd
["id"]:
2859 requirer_dict
["vnf-profile-id"] = requirer_id
2862 "provider/requirer or entities must be included in the relation."
2864 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2865 nsr_id
, nsd
, provider_dict
, cached_vnfds
2867 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2868 nsr_id
, nsd
, requirer_dict
, cached_vnfds
2870 provider
= EERelation(relation_provider
)
2871 requirer
= EERelation(relation_requirer
)
2872 relation
= Relation(r
["name"], provider
, requirer
)
2873 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2875 relations
.append(relation
)
2878 def _get_vnf_relations(
2881 nsd
: Dict
[str, Any
],
2883 cached_vnfds
: Dict
[str, Any
],
2884 ) -> List
[Relation
]:
2886 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
2887 vnf_profile_id
= vnf_profile
["id"]
2888 vnfd_id
= vnf_profile
["vnfd-id"]
2889 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2890 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
2891 for r
in db_vnf_relations
:
2892 provider_dict
= None
2893 requirer_dict
= None
2894 if all(key
in r
for key
in ("provider", "requirer")):
2895 provider_dict
= r
["provider"]
2896 requirer_dict
= r
["requirer"]
2897 elif "entities" in r
:
2898 provider_id
= r
["entities"][0]["id"]
2901 "vnf-profile-id": vnf_profile_id
,
2902 "endpoint": r
["entities"][0]["endpoint"],
2904 if provider_id
!= vnfd_id
:
2905 provider_dict
["vdu-profile-id"] = provider_id
2906 requirer_id
= r
["entities"][1]["id"]
2909 "vnf-profile-id": vnf_profile_id
,
2910 "endpoint": r
["entities"][1]["endpoint"],
2912 if requirer_id
!= vnfd_id
:
2913 requirer_dict
["vdu-profile-id"] = requirer_id
2916 "provider/requirer or entities must be included in the relation."
2918 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2919 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
2921 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2922 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
2924 provider
= EERelation(relation_provider
)
2925 requirer
= EERelation(relation_requirer
)
2926 relation
= Relation(r
["name"], provider
, requirer
)
2927 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2929 relations
.append(relation
)
2932 def _get_kdu_resource_data(
2934 ee_relation
: EERelation
,
2935 db_nsr
: Dict
[str, Any
],
2936 cached_vnfds
: Dict
[str, Any
],
2937 ) -> DeployedK8sResource
:
2938 nsd
= get_nsd(db_nsr
)
2939 vnf_profiles
= get_vnf_profiles(nsd
)
2940 vnfd_id
= find_in_list(
2942 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
2944 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2945 kdu_resource_profile
= get_kdu_resource_profile(
2946 db_vnfd
, ee_relation
.kdu_resource_profile_id
2948 kdu_name
= kdu_resource_profile
["kdu-name"]
2949 deployed_kdu
, _
= get_deployed_kdu(
2950 db_nsr
.get("_admin", ()).get("deployed", ()),
2952 ee_relation
.vnf_profile_id
,
2954 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
2957 def _get_deployed_component(
2959 ee_relation
: EERelation
,
2960 db_nsr
: Dict
[str, Any
],
2961 cached_vnfds
: Dict
[str, Any
],
2962 ) -> DeployedComponent
:
2963 nsr_id
= db_nsr
["_id"]
2964 deployed_component
= None
2965 ee_level
= EELevel
.get_level(ee_relation
)
2966 if ee_level
== EELevel
.NS
:
2967 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
2969 deployed_component
= DeployedVCA(nsr_id
, vca
)
2970 elif ee_level
== EELevel
.VNF
:
2971 vca
= get_deployed_vca(
2975 "member-vnf-index": ee_relation
.vnf_profile_id
,
2976 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
2980 deployed_component
= DeployedVCA(nsr_id
, vca
)
2981 elif ee_level
== EELevel
.VDU
:
2982 vca
= get_deployed_vca(
2985 "vdu_id": ee_relation
.vdu_profile_id
,
2986 "member-vnf-index": ee_relation
.vnf_profile_id
,
2987 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
2991 deployed_component
= DeployedVCA(nsr_id
, vca
)
2992 elif ee_level
== EELevel
.KDU
:
2993 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
2994 ee_relation
, db_nsr
, cached_vnfds
2996 if kdu_resource_data
:
2997 deployed_component
= DeployedK8sResource(kdu_resource_data
)
2998 return deployed_component
3000 async def _add_relation(
3004 db_nsr
: Dict
[str, Any
],
3005 cached_vnfds
: Dict
[str, Any
],
3006 cached_vnfrs
: Dict
[str, Any
],
3008 deployed_provider
= self
._get
_deployed
_component
(
3009 relation
.provider
, db_nsr
, cached_vnfds
3011 deployed_requirer
= self
._get
_deployed
_component
(
3012 relation
.requirer
, db_nsr
, cached_vnfds
3016 and deployed_requirer
3017 and deployed_provider
.config_sw_installed
3018 and deployed_requirer
.config_sw_installed
3020 provider_db_vnfr
= (
3022 relation
.provider
.nsr_id
,
3023 relation
.provider
.vnf_profile_id
,
3026 if relation
.provider
.vnf_profile_id
3029 requirer_db_vnfr
= (
3031 relation
.requirer
.nsr_id
,
3032 relation
.requirer
.vnf_profile_id
,
3035 if relation
.requirer
.vnf_profile_id
3038 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3039 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3040 provider_relation_endpoint
= RelationEndpoint(
3041 deployed_provider
.ee_id
,
3043 relation
.provider
.endpoint
,
3045 requirer_relation_endpoint
= RelationEndpoint(
3046 deployed_requirer
.ee_id
,
3048 relation
.requirer
.endpoint
,
3050 await self
.vca_map
[vca_type
].add_relation(
3051 provider
=provider_relation_endpoint
,
3052 requirer
=requirer_relation_endpoint
,
3054 # remove entry from relations list
3058 async def _add_vca_relations(
3064 timeout
: int = 3600,
3068 # 1. find all relations for this VCA
3069 # 2. wait for other peers related
3073 # STEP 1: find all relations for this VCA
3076 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3077 nsd
= get_nsd(db_nsr
)
3080 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3081 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3086 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3087 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3089 # if no relations, terminate
3091 self
.logger
.debug(logging_text
+ " No relations")
3094 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3101 if now
- start
>= timeout
:
3102 self
.logger
.error(logging_text
+ " : timeout adding relations")
3105 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3106 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3108 # for each relation, find the VCA's related
3109 for relation
in relations
.copy():
3110 added
= await self
._add
_relation
(
3118 relations
.remove(relation
)
3121 self
.logger
.debug("Relations added")
3123 await asyncio
.sleep(5.0)
3127 except Exception as e
:
3128 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3131 async def _install_kdu(
3139 k8s_instance_info
: dict,
3140 k8params
: dict = None,
3146 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3149 "collection": "nsrs",
3150 "filter": {"_id": nsr_id
},
3151 "path": nsr_db_path
,
3154 if k8s_instance_info
.get("kdu-deployment-name"):
3155 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3157 kdu_instance
= self
.k8scluster_map
[
3159 ].generate_kdu_instance_name(
3160 db_dict
=db_dict_install
,
3161 kdu_model
=k8s_instance_info
["kdu-model"],
3162 kdu_name
=k8s_instance_info
["kdu-name"],
3165 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
3167 await self
.k8scluster_map
[k8sclustertype
].install(
3168 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3169 kdu_model
=k8s_instance_info
["kdu-model"],
3172 db_dict
=db_dict_install
,
3174 kdu_name
=k8s_instance_info
["kdu-name"],
3175 namespace
=k8s_instance_info
["namespace"],
3176 kdu_instance
=kdu_instance
,
3180 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
3183 # Obtain services to obtain management service ip
3184 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3185 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3186 kdu_instance
=kdu_instance
,
3187 namespace
=k8s_instance_info
["namespace"],
3190 # Obtain management service info (if exists)
3191 vnfr_update_dict
= {}
3192 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3194 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3199 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3202 for service
in kdud
.get("service", [])
3203 if service
.get("mgmt-service")
3205 for mgmt_service
in mgmt_services
:
3206 for service
in services
:
3207 if service
["name"].startswith(mgmt_service
["name"]):
3208 # Mgmt service found, Obtain service ip
3209 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3210 if isinstance(ip
, list) and len(ip
) == 1:
3214 "kdur.{}.ip-address".format(kdu_index
)
3217 # Check if must update also mgmt ip at the vnf
3218 service_external_cp
= mgmt_service
.get(
3219 "external-connection-point-ref"
3221 if service_external_cp
:
3223 deep_get(vnfd
, ("mgmt-interface", "cp"))
3224 == service_external_cp
3226 vnfr_update_dict
["ip-address"] = ip
3231 "external-connection-point-ref", ""
3233 == service_external_cp
,
3236 "kdur.{}.ip-address".format(kdu_index
)
3241 "Mgmt service name: {} not found".format(
3242 mgmt_service
["name"]
3246 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3247 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3249 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3252 and kdu_config
.get("initial-config-primitive")
3253 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3255 initial_config_primitive_list
= kdu_config
.get(
3256 "initial-config-primitive"
3258 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3260 for initial_config_primitive
in initial_config_primitive_list
:
3261 primitive_params_
= self
._map
_primitive
_params
(
3262 initial_config_primitive
, {}, {}
3265 await asyncio
.wait_for(
3266 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3267 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3268 kdu_instance
=kdu_instance
,
3269 primitive_name
=initial_config_primitive
["name"],
3270 params
=primitive_params_
,
3271 db_dict
=db_dict_install
,
3277 except Exception as e
:
3278 # Prepare update db with error and raise exception
3281 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3285 vnfr_data
.get("_id"),
3286 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3289 # ignore to keep original exception
3291 # reraise original error
3296 async def deploy_kdus(
3303 task_instantiation_info
,
3305 # Launch kdus if present in the descriptor
3307 k8scluster_id_2_uuic
= {
3308 "helm-chart-v3": {},
3313 async def _get_cluster_id(cluster_id
, cluster_type
):
3314 nonlocal k8scluster_id_2_uuic
3315 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3316 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3318 # check if K8scluster is creating and wait look if previous tasks in process
3319 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3320 "k8scluster", cluster_id
3323 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3324 task_name
, cluster_id
3326 self
.logger
.debug(logging_text
+ text
)
3327 await asyncio
.wait(task_dependency
, timeout
=3600)
3329 db_k8scluster
= self
.db
.get_one(
3330 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3332 if not db_k8scluster
:
3333 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3335 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3337 if cluster_type
== "helm-chart-v3":
3339 # backward compatibility for existing clusters that have not been initialized for helm v3
3340 k8s_credentials
= yaml
.safe_dump(
3341 db_k8scluster
.get("credentials")
3343 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3344 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3346 db_k8scluster_update
= {}
3347 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3348 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3349 db_k8scluster_update
[
3350 "_admin.helm-chart-v3.created"
3352 db_k8scluster_update
[
3353 "_admin.helm-chart-v3.operationalState"
3356 "k8sclusters", cluster_id
, db_k8scluster_update
3358 except Exception as e
:
3361 + "error initializing helm-v3 cluster: {}".format(str(e
))
3364 "K8s cluster '{}' has not been initialized for '{}'".format(
3365 cluster_id
, cluster_type
3370 "K8s cluster '{}' has not been initialized for '{}'".format(
3371 cluster_id
, cluster_type
3374 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3377 logging_text
+= "Deploy kdus: "
3380 db_nsr_update
= {"_admin.deployed.K8s": []}
3381 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3384 updated_cluster_list
= []
3385 updated_v3_cluster_list
= []
3387 for vnfr_data
in db_vnfrs
.values():
3388 vca_id
= self
.get_vca_id(vnfr_data
, {})
3389 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3390 # Step 0: Prepare and set parameters
3391 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3392 vnfd_id
= vnfr_data
.get("vnfd-id")
3393 vnfd_with_id
= find_in_list(
3394 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3398 for kdud
in vnfd_with_id
["kdu"]
3399 if kdud
["name"] == kdur
["kdu-name"]
3401 namespace
= kdur
.get("k8s-namespace")
3402 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3403 if kdur
.get("helm-chart"):
3404 kdumodel
= kdur
["helm-chart"]
3405 # Default version: helm3, if helm-version is v2 assign v2
3406 k8sclustertype
= "helm-chart-v3"
3407 self
.logger
.debug("kdur: {}".format(kdur
))
3409 kdur
.get("helm-version")
3410 and kdur
.get("helm-version") == "v2"
3412 k8sclustertype
= "helm-chart"
3413 elif kdur
.get("juju-bundle"):
3414 kdumodel
= kdur
["juju-bundle"]
3415 k8sclustertype
= "juju-bundle"
3418 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3419 "juju-bundle. Maybe an old NBI version is running".format(
3420 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3423 # check if kdumodel is a file and exists
3425 vnfd_with_id
= find_in_list(
3426 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3428 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3429 if storage
: # may be not present if vnfd has not artifacts
3430 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3431 if storage
["pkg-dir"]:
3432 filename
= "{}/{}/{}s/{}".format(
3439 filename
= "{}/Scripts/{}s/{}".format(
3444 if self
.fs
.file_exists(
3445 filename
, mode
="file"
3446 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3447 kdumodel
= self
.fs
.path
+ filename
3448 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3450 except Exception: # it is not a file
3453 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3454 step
= "Synchronize repos for k8s cluster '{}'".format(
3457 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3461 k8sclustertype
== "helm-chart"
3462 and cluster_uuid
not in updated_cluster_list
3464 k8sclustertype
== "helm-chart-v3"
3465 and cluster_uuid
not in updated_v3_cluster_list
3467 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3468 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3469 cluster_uuid
=cluster_uuid
3472 if del_repo_list
or added_repo_dict
:
3473 if k8sclustertype
== "helm-chart":
3475 "_admin.helm_charts_added." + item
: None
3476 for item
in del_repo_list
3479 "_admin.helm_charts_added." + item
: name
3480 for item
, name
in added_repo_dict
.items()
3482 updated_cluster_list
.append(cluster_uuid
)
3483 elif k8sclustertype
== "helm-chart-v3":
3485 "_admin.helm_charts_v3_added." + item
: None
3486 for item
in del_repo_list
3489 "_admin.helm_charts_v3_added." + item
: name
3490 for item
, name
in added_repo_dict
.items()
3492 updated_v3_cluster_list
.append(cluster_uuid
)
3494 logging_text
+ "repos synchronized on k8s cluster "
3495 "'{}' to_delete: {}, to_add: {}".format(
3496 k8s_cluster_id
, del_repo_list
, added_repo_dict
3501 {"_id": k8s_cluster_id
},
3507 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3508 vnfr_data
["member-vnf-index-ref"],
3512 k8s_instance_info
= {
3513 "kdu-instance": None,
3514 "k8scluster-uuid": cluster_uuid
,
3515 "k8scluster-type": k8sclustertype
,
3516 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3517 "kdu-name": kdur
["kdu-name"],
3518 "kdu-model": kdumodel
,
3519 "namespace": namespace
,
3520 "kdu-deployment-name": kdu_deployment_name
,
3522 db_path
= "_admin.deployed.K8s.{}".format(index
)
3523 db_nsr_update
[db_path
] = k8s_instance_info
3524 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3525 vnfd_with_id
= find_in_list(
3526 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3528 task
= asyncio
.ensure_future(
3537 k8params
=desc_params
,
3542 self
.lcm_tasks
.register(
3546 "instantiate_KDU-{}".format(index
),
3549 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3555 except (LcmException
, asyncio
.CancelledError
):
3557 except Exception as e
:
3558 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3559 if isinstance(e
, (N2VCException
, DbException
)):
3560 self
.logger
.error(logging_text
+ msg
)
3562 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3563 raise LcmException(msg
)
3566 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3585 task_instantiation_info
,
3588 # launch instantiate_N2VC in a asyncio task and register task object
3589 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3590 # if not found, create one entry and update database
3591 # fill db_nsr._admin.deployed.VCA.<index>
3594 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3596 if "execution-environment-list" in descriptor_config
:
3597 ee_list
= descriptor_config
.get("execution-environment-list", [])
3598 elif "juju" in descriptor_config
:
3599 ee_list
= [descriptor_config
] # ns charms
3600 else: # other types as script are not supported
3603 for ee_item
in ee_list
:
3606 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3607 ee_item
.get("juju"), ee_item
.get("helm-chart")
3610 ee_descriptor_id
= ee_item
.get("id")
3611 if ee_item
.get("juju"):
3612 vca_name
= ee_item
["juju"].get("charm")
3615 if ee_item
["juju"].get("charm") is not None
3618 if ee_item
["juju"].get("cloud") == "k8s":
3619 vca_type
= "k8s_proxy_charm"
3620 elif ee_item
["juju"].get("proxy") is False:
3621 vca_type
= "native_charm"
3622 elif ee_item
.get("helm-chart"):
3623 vca_name
= ee_item
["helm-chart"]
3624 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3627 vca_type
= "helm-v3"
3630 logging_text
+ "skipping non juju neither charm configuration"
3635 for vca_index
, vca_deployed
in enumerate(
3636 db_nsr
["_admin"]["deployed"]["VCA"]
3638 if not vca_deployed
:
3641 vca_deployed
.get("member-vnf-index") == member_vnf_index
3642 and vca_deployed
.get("vdu_id") == vdu_id
3643 and vca_deployed
.get("kdu_name") == kdu_name
3644 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3645 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3649 # not found, create one.
3651 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3654 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3656 target
+= "/kdu/{}".format(kdu_name
)
3658 "target_element": target
,
3659 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3660 "member-vnf-index": member_vnf_index
,
3662 "kdu_name": kdu_name
,
3663 "vdu_count_index": vdu_index
,
3664 "operational-status": "init", # TODO revise
3665 "detailed-status": "", # TODO revise
3666 "step": "initial-deploy", # TODO revise
3668 "vdu_name": vdu_name
,
3670 "ee_descriptor_id": ee_descriptor_id
,
3674 # create VCA and configurationStatus in db
3676 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3677 "configurationStatus.{}".format(vca_index
): dict(),
3679 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3681 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3683 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3684 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3685 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3688 task_n2vc
= asyncio
.ensure_future(
3689 self
.instantiate_N2VC(
3690 logging_text
=logging_text
,
3691 vca_index
=vca_index
,
3697 vdu_index
=vdu_index
,
3698 deploy_params
=deploy_params
,
3699 config_descriptor
=descriptor_config
,
3700 base_folder
=base_folder
,
3701 nslcmop_id
=nslcmop_id
,
3705 ee_config_descriptor
=ee_item
,
3708 self
.lcm_tasks
.register(
3712 "instantiate_N2VC-{}".format(vca_index
),
3715 task_instantiation_info
[
3717 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3718 member_vnf_index
or "", vdu_id
or ""
def _create_nslcmop(nsr_id, operation, params):
    """
    Creates a ns-lcm-op (NS LCM operation occurrence) content to be stored at database.
    :param nsr_id: internal id of the instance
    :param operation: instantiate, terminate, scale, action, ...
    :param params: user parameters for the operation
    :return: dictionary following SOL005 format. "id" and "_id" hold the same freshly generated uuid, and
        "startTime" and "statusEnteredTime" hold the same creation timestamp
    :raises LcmException: if any of nsr_id, operation or params is empty
    """
    # local import, so that this block does not depend on the module-level import list
    from uuid import uuid4

    # Raise exception if invalid arguments
    if not (nsr_id and operation and params):
        raise LcmException(
            "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
        )
    now = time()
    _id = str(uuid4())
    nslcmop = {
        "id": _id,
        "_id": _id,
        # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
        "operationState": "PROCESSING",
        "statusEnteredTime": now,
        "nsInstanceId": nsr_id,
        "lcmOperationType": operation,
        "startTime": now,
        "isAutomaticInvocation": False,
        "operationParams": params,
        "isCancelPending": False,
        "links": {
            "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
            "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
        },
    }
    return nslcmop
def _format_additional_params(self, params):
    """
    Decodes the values of a params dictionary that are marked as yaml text.
    :param params: dictionary of additional params, or None
    :return: the received dictionary, updated in place: every string value starting with "!!yaml " is replaced
        by the result of yaml.safe_load over the rest of the string. An empty dictionary if params is empty
    """
    params = params or {}
    for key, value in params.items():
        # only a real string can carry the marker; checking the type keeps the slice below always valid
        if isinstance(value, str) and value.startswith("!!yaml "):
            # replacing the value of an existing key does not alter the dict size, so it is safe while iterating
            params[key] = yaml.safe_load(value[7:])
    return params
def _get_terminate_primitive_params(self, seq, vnf_index):
    """
    Calculates the params of one terminate primitive.
    :param seq: item of the terminate primitive list; its "name" is the primitive to execute
    :param vnf_index: member vnf index this primitive belongs to
    :return: what _map_primitive_params returns for this primitive
    """
    params = {
        "member_vnf_index": vnf_index,
        "primitive": seq.get("name"),
        "primitive_params": {},
    }
    # no instantiation params are provided for terminate primitives
    desc_params = {}
    return self._map_primitive_params(seq, params, desc_params)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decides what to do with an already registered sub-operation.
    :param db_nslcmop: nslcmop database content, sub-operations are read from _admin.operations
    :param op_index: index of the sub-operation inside _admin.operations
    :return: SUBOPERATION_STATUS_SKIP if the sub-operation is already COMPLETED; otherwise op_index, after
        marking the sub-operation as PROCESSING
    :raises IndexError: if op_index is not a valid position of the operations list
    """
    op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
    if op.get("operationState") == "COMPLETED":
        # b. Skip sub-operation
        # _ns_execute_primitive() or RO.create_action() will NOT be executed
        return self.SUBOPERATION_STATUS_SKIP

    # c. retry executing sub-operation
    # The sub-operation exists, and operationState != 'COMPLETED'
    # Update operationState = 'PROCESSING' to indicate a retry.
    self._update_suboperation_status(
        db_nslcmop, op_index, "PROCESSING", "In progress"
    )
    # Return the sub-operation index
    # _ns_execute_primitive() or RO.create_action() will be called from scale()
    # with arguments extracted from the sub-operation
    return op_index
3796 # Find a sub-operation where all keys in a matching dictionary must match
3797 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
def _find_suboperation(self, db_nslcmop, match):
    """
    Looks for the first sub-operation whose content matches all the provided keys.
    :param db_nslcmop: nslcmop database content, sub-operations are read from _admin.operations
    :param match: dictionary with the keys and values that the sub-operation must contain
    :return: index of the first matching sub-operation. SUBOPERATION_STATUS_NOT_FOUND if there is no match or
        if db_nslcmop or match are empty
    """
    if db_nslcmop and match:
        # "_admin" or "operations" can be missing, but also present with a None value
        op_list = (db_nslcmop.get("_admin") or {}).get("operations") or []
        for i, op in enumerate(op_list):
            if all(op.get(k) == match[k] for k in match):
                return i
    return self.SUBOPERATION_STATUS_NOT_FOUND
3806 # Update status for a sub-operation given its index
def _update_suboperation_status(
    self, db_nslcmop, op_index, operationState, detailed_status
):
    """
    Writes at database the status of a sub-operation given its index.
    :param db_nslcmop: nslcmop database content, only its "_id" is used
    :param op_index: index of the sub-operation inside _admin.operations
    :param operationState: value to store at _admin.operations.<op_index>.operationState
    :param detailed_status: value to store at _admin.operations.<op_index>.detailed-status
    :return: None
    """
    # Update DB for HA tasks
    q_filter = {"_id": db_nslcmop["_id"]}
    update_dict = {
        "_admin.operations.{}.operationState".format(op_index): operationState,
        "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
    }
    # NOTE(review): fail_on_empty=False presumably avoids an exception when the nslcmop is not found - confirm
    self.db.set_one(
        "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
    )
3820 # Add sub-operation, return the index of the added sub-operation
3821 # Optionally, set operationState, detailed-status, and operationType
3822 # Status and type are currently set for 'scale' sub-operations:
3823 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3824 # 'detailed-status' : status message
3825 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3826 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
def _add_suboperation(
    self,
    db_nslcmop,
    vnf_index,
    vdu_id,
    vdu_count_index,
    vdu_name,
    primitive,
    mapped_primitive_params,
    operationState=None,
    detailed_status=None,
    operationType=None,
    RO_nsr_id=None,
    RO_scaling_info=None,
):
    """
    Adds a sub-operation to _admin.operations, both at the received nslcmop content and at database.
    :param db_nslcmop: nslcmop database content. It is updated in place
    :param vnf_index: stored as "member_vnf_index"
    :param vdu_id: stored as "vdu_id"
    :param vdu_count_index: stored as "vdu_count_index"
    :param vdu_name: accepted for interface compatibility, it is not stored
    :param primitive: stored as "primitive"
    :param mapped_primitive_params: stored as "primitive_params"
    :param operationState: if not empty, stored as "operationState"
    :param detailed_status: if not empty, stored as "detailed-status"
    :param operationType: if not empty, stored as "lcmOperationType"
    :param RO_nsr_id: if not empty, stored as "RO_nsr_id"
    :param RO_scaling_info: if not empty, stored as "RO_scaling_info"
    :return: index of the added sub-operation. SUBOPERATION_STATUS_NOT_FOUND if db_nslcmop is empty
    """
    if not db_nslcmop:
        return self.SUBOPERATION_STATUS_NOT_FOUND

    # Get the "_admin" content, creating it INSIDE db_nslcmop when missing. Otherwise the new operation would be
    # written at database but not at the received content, where _find_suboperation looks for it
    db_nslcmop_admin = db_nslcmop.get("_admin")
    if db_nslcmop_admin is None:
        db_nslcmop_admin = db_nslcmop["_admin"] = {}

    new_op = {
        "member_vnf_index": vnf_index,
        "vdu_id": vdu_id,
        "vdu_count_index": vdu_count_index,
        "primitive": primitive,
        "primitive_params": mapped_primitive_params,
    }
    # optional fields are stored only when they carry a value
    optional_fields = (
        ("operationState", operationState),
        ("detailed-status", detailed_status),
        ("lcmOperationType", operationType),
        ("RO_nsr_id", RO_nsr_id),
        ("RO_scaling_info", RO_scaling_info),
    )
    new_op.update({key: value for key, value in optional_fields if value})

    # Create or append to the "_admin.operations" list
    op_list = db_nslcmop_admin.get("operations")
    if not op_list:
        # No existing operations, create key 'operations' with current operation as first list element
        op_list = db_nslcmop_admin["operations"] = [new_op]
    else:
        # Existing operations, append operation to list
        op_list.append(new_op)

    self.update_db_2("nslcmops", db_nslcmop["_id"], {"_admin.operations": op_list})
    return len(op_list) - 1
3878 # Helper methods for scale() sub-operations
3880 # pre-scale/post-scale:
3881 # Check for 3 different cases:
3882 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3883 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3884 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
def _check_or_add_scale_suboperation(
    self,
    db_nslcmop,
    vnf_index,
    vnf_config_primitive,
    primitive_params,
    operationType,
    RO_nsr_id=None,
    RO_scaling_info=None,
):
    """
    Looks for a scale sub-operation and registers it when it does not exist yet.
    :param db_nslcmop: nslcmop database content
    :param vnf_index: member vnf index of the sub-operation
    :param vnf_config_primitive: primitive name. Ignored when both RO_nsr_id and RO_scaling_info are provided
    :param primitive_params: primitive params. Ignored when both RO_nsr_id and RO_scaling_info are provided
    :param operationType: stored as "lcmOperationType". Replaced by "SCALE-RO" when both RO_nsr_id and
        RO_scaling_info are provided
    :param RO_nsr_id: used, together with RO_scaling_info, for RO scaling sub-operations
    :param RO_scaling_info: used, together with RO_nsr_id, for RO scaling sub-operations
    :return: SUBOPERATION_STATUS_NEW if the sub-operation has been added; otherwise what
        _retry_or_skip_suboperation returns for the sub-operation found
    """
    # RO arguments are taken into account only if both of them are provided
    is_ro_scaling = bool(RO_nsr_id and RO_scaling_info)

    # Find this sub-operation
    if is_ro_scaling:
        operationType = "SCALE-RO"
        match = {
            "member_vnf_index": vnf_index,
            "RO_nsr_id": RO_nsr_id,
            "RO_scaling_info": RO_scaling_info,
        }
    else:
        match = {
            "member_vnf_index": vnf_index,
            "primitive": vnf_config_primitive,
            "primitive_params": primitive_params,
            "lcmOperationType": operationType,
        }
    op_index = self._find_suboperation(db_nslcmop, match)
    if op_index != self.SUBOPERATION_STATUS_NOT_FOUND:
        # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
        # or op_index (operationState != 'COMPLETED')
        return self._retry_or_skip_suboperation(db_nslcmop, op_index)

    # a. New sub-operation
    # The sub-operation does not exist, add it.
    # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
    if is_ro_scaling:
        vnf_config_primitive = primitive_params = None
    else:
        RO_nsr_id = RO_scaling_info = None
    # Add sub-operation for pre/post-scaling (zero or more operations)
    self._add_suboperation(
        db_nslcmop,
        vnf_index,
        None,  # vdu_id, set to None for all kind of scaling
        None,  # vdu_count_index, set to None for all kind of scaling
        None,  # vdu_name, set to None for all kind of scaling
        vnf_config_primitive,
        primitive_params,
        "PROCESSING",  # initial operationState
        "In progress",  # initial detailed_status
        operationType,
        RO_nsr_id,
        RO_scaling_info,
    )
    return self.SUBOPERATION_STATUS_NEW
3949 # Function to return execution_environment id
def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
    """
    Looks for the execution environment id of a deployed VCA.
    :param vnf_index: member vnf index to look for
    :param vdu_id: vdu id to look for
    :param vca_deployed_list: list of deployed VCA records. Empty entries are ignored
    :return: "ee_id" of the first record matching both vnf_index and vdu_id, None if not found
    """
    # TODO vdu_index_count
    for vca in vca_deployed_list or ():
        if not vca:
            # the deployed VCA list can contain empty positions
            continue
        if vca.get("member-vnf-index") == vnf_index and vca.get("vdu_id") == vdu_id:
            return vca.get("ee_id")
    return None
3957 async def destroy_N2VC(
3965 exec_primitives
=True,
3970 Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
3971 :param logging_text:
3973 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
3974 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3975 :param vca_index: index in the database _admin.deployed.VCA
3976 :param destroy_ee: False to not destroy it here, because all of them will be destroyed at once
3977 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
3978 not executed properly
3979 :param scaling_in: True destroys the application, False destroys the model
3980 :return: None or exception
3985 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3986 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
3990 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
3992 # execute terminate_primitives
3994 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
3995 config_descriptor
.get("terminate-config-primitive"),
3996 vca_deployed
.get("ee_descriptor_id"),
3998 vdu_id
= vca_deployed
.get("vdu_id")
3999 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4000 vdu_name
= vca_deployed
.get("vdu_name")
4001 vnf_index
= vca_deployed
.get("member-vnf-index")
4002 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4003 for seq
in terminate_primitives
:
4004 # For each sequence in list, get primitive and call _ns_execute_primitive()
4005 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4006 vnf_index
, seq
.get("name")
4008 self
.logger
.debug(logging_text
+ step
)
4009 # Create the primitive for each sequence, i.e. "primitive": "touch"
4010 primitive
= seq
.get("name")
4011 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4016 self
._add
_suboperation
(
4023 mapped_primitive_params
,
4025 # Sub-operations: Call _ns_execute_primitive() instead of action()
4027 result
, result_detail
= await self
._ns
_execute
_primitive
(
4028 vca_deployed
["ee_id"],
4030 mapped_primitive_params
,
4034 except LcmException
:
4035 # this happens when VCA is not deployed. In this case it is not needed to terminate
4037 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4038 if result
not in result_ok
:
4040 "terminate_primitive {} for vnf_member_index={} fails with "
4041 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4043 # set that this VCA do not need terminated
4044 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4048 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4051 # Delete Prometheus Jobs if any
4052 # This uses NSR_ID, so it will destroy any jobs under this index
4053 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4056 await self
.vca_map
[vca_type
].delete_execution_environment(
4057 vca_deployed
["ee_id"],
4058 scaling_in
=scaling_in
,
async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
    """
    Deletes the whole N2VC namespace of a ns, that is, all its execution environments at once.
    :param db_nsr: nsr database content. Its "_id" is used to compose the namespace
    :param vca_id: passed through to n2vc.delete_namespace
    :return: None
    """
    self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
    namespace = "." + db_nsr["_id"]
    try:
        await self.n2vc.delete_namespace(
            namespace=namespace,
            total_timeout=self.timeout_charm_delete,
            vca_id=vca_id,
        )
    except N2VCNotFound:  # already deleted. Skip
        pass
    self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
4076 async def _terminate_RO(
4077 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4080 Terminates a deployment from RO
4081 :param logging_text:
4082 :param nsr_deployed: db_nsr._admin.deployed
4085 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4086 this method will update only the index 2, but it will write on database the concatenated content of the list
4091 ro_nsr_id
= ro_delete_action
= None
4092 if nsr_deployed
and nsr_deployed
.get("RO"):
4093 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4094 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4097 stage
[2] = "Deleting ns from VIM."
4098 db_nsr_update
["detailed-status"] = " ".join(stage
)
4099 self
._write
_op
_status
(nslcmop_id
, stage
)
4100 self
.logger
.debug(logging_text
+ stage
[2])
4101 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4102 self
._write
_op
_status
(nslcmop_id
, stage
)
4103 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4104 ro_delete_action
= desc
["action_id"]
4106 "_admin.deployed.RO.nsr_delete_action_id"
4107 ] = ro_delete_action
4108 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4109 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4110 if ro_delete_action
:
4111 # wait until NS is deleted from VIM
4112 stage
[2] = "Waiting ns deleted from VIM."
4113 detailed_status_old
= None
4117 + " RO_id={} ro_delete_action={}".format(
4118 ro_nsr_id
, ro_delete_action
4121 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4122 self
._write
_op
_status
(nslcmop_id
, stage
)
4124 delete_timeout
= 20 * 60 # 20 minutes
4125 while delete_timeout
> 0:
4126 desc
= await self
.RO
.show(
4128 item_id_name
=ro_nsr_id
,
4129 extra_item
="action",
4130 extra_item_id
=ro_delete_action
,
4134 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4136 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4137 if ns_status
== "ERROR":
4138 raise ROclient
.ROClientException(ns_status_info
)
4139 elif ns_status
== "BUILD":
4140 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4141 elif ns_status
== "ACTIVE":
4142 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4143 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4148 ), "ROclient.check_action_status returns unknown {}".format(
4151 if stage
[2] != detailed_status_old
:
4152 detailed_status_old
= stage
[2]
4153 db_nsr_update
["detailed-status"] = " ".join(stage
)
4154 self
._write
_op
_status
(nslcmop_id
, stage
)
4155 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4156 await asyncio
.sleep(5, loop
=self
.loop
)
4158 else: # delete_timeout <= 0:
4159 raise ROclient
.ROClientException(
4160 "Timeout waiting ns deleted from VIM"
4163 except Exception as e
:
4164 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4166 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4168 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4169 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4170 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4172 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4175 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4177 failed_detail
.append("delete conflict: {}".format(e
))
4180 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4183 failed_detail
.append("delete error: {}".format(e
))
4185 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4189 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4190 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4192 stage
[2] = "Deleting nsd from RO."
4193 db_nsr_update
["detailed-status"] = " ".join(stage
)
4194 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4195 self
._write
_op
_status
(nslcmop_id
, stage
)
4196 await self
.RO
.delete("nsd", ro_nsd_id
)
4198 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4200 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4201 except Exception as e
:
4203 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4205 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4207 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4210 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4212 failed_detail
.append(
4213 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4215 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4217 failed_detail
.append(
4218 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4220 self
.logger
.error(logging_text
+ failed_detail
[-1])
4222 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4223 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4224 if not vnf_deployed
or not vnf_deployed
["id"]:
4227 ro_vnfd_id
= vnf_deployed
["id"]
4230 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4231 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4233 db_nsr_update
["detailed-status"] = " ".join(stage
)
4234 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4235 self
._write
_op
_status
(nslcmop_id
, stage
)
4236 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4238 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4240 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4241 except Exception as e
:
4243 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4246 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4250 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4253 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4255 failed_detail
.append(
4256 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4258 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4260 failed_detail
.append(
4261 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4263 self
.logger
.error(logging_text
+ failed_detail
[-1])
4266 stage
[2] = "Error deleting from VIM"
4268 stage
[2] = "Deleted from VIM"
4269 db_nsr_update
["detailed-status"] = " ".join(stage
)
4270 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4271 self
._write
_op
_status
(nslcmop_id
, stage
)
4274 raise LcmException("; ".join(failed_detail
))
4276 async def terminate(self
, nsr_id
, nslcmop_id
):
4277 # Try to lock HA task here
4278 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4279 if not task_is_locked_by_me
:
4282 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4283 self
.logger
.debug(logging_text
+ "Enter")
4284 timeout_ns_terminate
= self
.timeout_ns_terminate
4287 operation_params
= None
4289 error_list
= [] # annotates all failed error messages
4290 db_nslcmop_update
= {}
4291 autoremove
= False # autoremove after terminated
4292 tasks_dict_info
= {}
4295 "Stage 1/3: Preparing task.",
4296 "Waiting for previous operations to terminate.",
4299 # ^ contains [stage, step, VIM-status]
4301 # wait for any previous tasks in process
4302 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4304 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4305 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4306 operation_params
= db_nslcmop
.get("operationParams") or {}
4307 if operation_params
.get("timeout_ns_terminate"):
4308 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4309 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4310 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4312 db_nsr_update
["operational-status"] = "terminating"
4313 db_nsr_update
["config-status"] = "terminating"
4314 self
._write
_ns
_status
(
4316 ns_state
="TERMINATING",
4317 current_operation
="TERMINATING",
4318 current_operation_id
=nslcmop_id
,
4319 other_update
=db_nsr_update
,
4321 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4322 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4323 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4326 stage
[1] = "Getting vnf descriptors from db."
4327 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4329 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4331 db_vnfds_from_id
= {}
4332 db_vnfds_from_member_index
= {}
4334 for vnfr
in db_vnfrs_list
:
4335 vnfd_id
= vnfr
["vnfd-id"]
4336 if vnfd_id
not in db_vnfds_from_id
:
4337 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4338 db_vnfds_from_id
[vnfd_id
] = vnfd
4339 db_vnfds_from_member_index
[
4340 vnfr
["member-vnf-index-ref"]
4341 ] = db_vnfds_from_id
[vnfd_id
]
4343 # Destroy individual execution environments when there are terminating primitives.
4344 # Rest of EE will be deleted at once
4345 # TODO - check before calling _destroy_N2VC
4346 # if not operation_params.get("skip_terminate_primitives"):#
4347 # or not vca.get("needed_terminate"):
4348 stage
[0] = "Stage 2/3 execute terminating primitives."
4349 self
.logger
.debug(logging_text
+ stage
[0])
4350 stage
[1] = "Looking execution environment that needs terminate."
4351 self
.logger
.debug(logging_text
+ stage
[1])
4353 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4354 config_descriptor
= None
4355 vca_member_vnf_index
= vca
.get("member-vnf-index")
4356 vca_id
= self
.get_vca_id(
4357 db_vnfrs_dict
.get(vca_member_vnf_index
)
4358 if vca_member_vnf_index
4362 if not vca
or not vca
.get("ee_id"):
4364 if not vca
.get("member-vnf-index"):
4366 config_descriptor
= db_nsr
.get("ns-configuration")
4367 elif vca
.get("vdu_id"):
4368 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4369 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4370 elif vca
.get("kdu_name"):
4371 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4372 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4374 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4375 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4376 vca_type
= vca
.get("type")
4377 exec_terminate_primitives
= not operation_params
.get(
4378 "skip_terminate_primitives"
4379 ) and vca
.get("needed_terminate")
4380 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4381 # pending native charms
4383 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4385 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4386 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4387 task
= asyncio
.ensure_future(
4395 exec_terminate_primitives
,
4399 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4401 # wait for pending tasks of terminate primitives
4405 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4407 error_list
= await self
._wait
_for
_tasks
(
4410 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4414 tasks_dict_info
.clear()
4416 return # raise LcmException("; ".join(error_list))
4418 # remove All execution environments at once
4419 stage
[0] = "Stage 3/3 delete all."
4421 if nsr_deployed
.get("VCA"):
4422 stage
[1] = "Deleting all execution environments."
4423 self
.logger
.debug(logging_text
+ stage
[1])
4424 vca_id
= self
.get_vca_id({}, db_nsr
)
4425 task_delete_ee
= asyncio
.ensure_future(
4427 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4428 timeout
=self
.timeout_charm_delete
,
4431 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4432 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4434 # Delete from k8scluster
4435 stage
[1] = "Deleting KDUs."
4436 self
.logger
.debug(logging_text
+ stage
[1])
4437 # print(nsr_deployed)
4438 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4439 if not kdu
or not kdu
.get("kdu-instance"):
4441 kdu_instance
= kdu
.get("kdu-instance")
4442 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4443 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4444 vca_id
= self
.get_vca_id({}, db_nsr
)
4445 task_delete_kdu_instance
= asyncio
.ensure_future(
4446 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4447 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4448 kdu_instance
=kdu_instance
,
4455 + "Unknown k8s deployment type {}".format(
4456 kdu
.get("k8scluster-type")
4461 task_delete_kdu_instance
4462 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4465 stage
[1] = "Deleting ns from VIM."
4467 task_delete_ro
= asyncio
.ensure_future(
4468 self
._terminate
_ng
_ro
(
4469 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4473 task_delete_ro
= asyncio
.ensure_future(
4475 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4478 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4480 # rest of stuff will be done at finally
4483 ROclient
.ROClientException
,
4488 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4490 except asyncio
.CancelledError
:
4492 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4494 exc
= "Operation was cancelled"
4495 except Exception as e
:
4496 exc
= traceback
.format_exc()
4497 self
.logger
.critical(
4498 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4503 error_list
.append(str(exc
))
4505 # wait for pending tasks
4507 stage
[1] = "Waiting for terminate pending tasks."
4508 self
.logger
.debug(logging_text
+ stage
[1])
4509 error_list
+= await self
._wait
_for
_tasks
(
4512 timeout_ns_terminate
,
4516 stage
[1] = stage
[2] = ""
4517 except asyncio
.CancelledError
:
4518 error_list
.append("Cancelled")
4519 # TODO cancel all tasks
4520 except Exception as exc
:
4521 error_list
.append(str(exc
))
4522 # update status at database
4524 error_detail
= "; ".join(error_list
)
4525 # self.logger.error(logging_text + error_detail)
4526 error_description_nslcmop
= "{} Detail: {}".format(
4527 stage
[0], error_detail
4529 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4530 nslcmop_id
, stage
[0]
4533 db_nsr_update
["operational-status"] = "failed"
4534 db_nsr_update
["detailed-status"] = (
4535 error_description_nsr
+ " Detail: " + error_detail
4537 db_nslcmop_update
["detailed-status"] = error_detail
4538 nslcmop_operation_state
= "FAILED"
4542 error_description_nsr
= error_description_nslcmop
= None
4543 ns_state
= "NOT_INSTANTIATED"
4544 db_nsr_update
["operational-status"] = "terminated"
4545 db_nsr_update
["detailed-status"] = "Done"
4546 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4547 db_nslcmop_update
["detailed-status"] = "Done"
4548 nslcmop_operation_state
= "COMPLETED"
4551 self
._write
_ns
_status
(
4554 current_operation
="IDLE",
4555 current_operation_id
=None,
4556 error_description
=error_description_nsr
,
4557 error_detail
=error_detail
,
4558 other_update
=db_nsr_update
,
4560 self
._write
_op
_status
(
4563 error_message
=error_description_nslcmop
,
4564 operation_state
=nslcmop_operation_state
,
4565 other_update
=db_nslcmop_update
,
4567 if ns_state
== "NOT_INSTANTIATED":
4571 {"nsr-id-ref": nsr_id
},
4572 {"_admin.nsState": "NOT_INSTANTIATED"},
4574 except DbException
as e
:
4577 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4581 if operation_params
:
4582 autoremove
= operation_params
.get("autoremove", False)
4583 if nslcmop_operation_state
:
4585 await self
.msg
.aiowrite(
4590 "nslcmop_id": nslcmop_id
,
4591 "operationState": nslcmop_operation_state
,
4592 "autoremove": autoremove
,
4596 except Exception as e
:
4598 logging_text
+ "kafka_write notification Exception {}".format(e
)
4601 self
.logger
.debug(logging_text
+ "Exit")
4602 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4604 async def _wait_for_tasks(
4605 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4608 error_detail_list
= []
4610 pending_tasks
= list(created_tasks_info
.keys())
4611 num_tasks
= len(pending_tasks
)
4613 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4614 self
._write
_op
_status
(nslcmop_id
, stage
)
4615 while pending_tasks
:
4617 _timeout
= timeout
+ time_start
- time()
4618 done
, pending_tasks
= await asyncio
.wait(
4619 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4621 num_done
+= len(done
)
4622 if not done
: # Timeout
4623 for task
in pending_tasks
:
4624 new_error
= created_tasks_info
[task
] + ": Timeout"
4625 error_detail_list
.append(new_error
)
4626 error_list
.append(new_error
)
4629 if task
.cancelled():
4632 exc
= task
.exception()
4634 if isinstance(exc
, asyncio
.TimeoutError
):
4636 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4637 error_list
.append(created_tasks_info
[task
])
4638 error_detail_list
.append(new_error
)
4645 ROclient
.ROClientException
,
4651 self
.logger
.error(logging_text
+ new_error
)
4653 exc_traceback
= "".join(
4654 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4658 + created_tasks_info
[task
]
4664 logging_text
+ created_tasks_info
[task
] + ": Done"
4666 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4668 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4669 if nsr_id
: # update also nsr
4674 "errorDescription": "Error at: " + ", ".join(error_list
),
4675 "errorDetail": ". ".join(error_detail_list
),
4678 self
._write
_op
_status
(nslcmop_id
, stage
)
4679 return error_detail_list
def _map_primitive_params(primitive_desc, params, instantiation_params):
    """
    Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
    the "value" or, if missing, the "default-value" of the descriptor is used. If it is between < > it looks for
    a value at instantiation_params
    :param primitive_desc: portion of VNFD/NSD that describes primitive
    :param params: Params provided by user. None is treated as empty
    :param instantiation_params: Instantiation params provided by user. None is treated as empty
    :return: a dictionary with the calculated params
    :raises LcmException: if a parameter has no value, if a < > reference is not found at instantiation_params
        or if an INTEGER parameter cannot be converted to int
    """
    params = params or {}
    instantiation_params = instantiation_params or {}

    calculated_params = {}
    for parameter in primitive_desc.get("parameter", ()):
        param_name = parameter["name"]
        if param_name in params:
            value = params[param_name]
        elif "value" in parameter or "default-value" in parameter:
            # "value" has precedence over "default-value"
            if "value" in parameter:
                value = parameter["value"]
            else:
                value = parameter["default-value"]
            if (
                isinstance(value, str)
                and value.startswith("<")
                and value.endswith(">")
            ):
                # "<name>" is a reference to the instantiation param "name"
                if value[1:-1] not in instantiation_params:
                    raise LcmException(
                        "Parameter {} needed to execute primitive {} not provided".format(
                            value, primitive_desc["name"]
                        )
                    )
                value = instantiation_params[value[1:-1]]
        else:
            raise LcmException(
                "Parameter {} needed to execute primitive {} not provided".format(
                    param_name, primitive_desc["name"]
                )
            )

        if isinstance(value, (dict, list, tuple)):
            # structured values are provided as a single-line yaml text
            value = yaml.safe_dump(value, default_flow_style=True, width=256)
        elif isinstance(value, str) and value.startswith("!!yaml "):
            # already a yaml text, only the marker is removed
            value = value[7:]

        data_type = parameter.get("data-type")
        if data_type == "INTEGER":
            try:
                value = int(value)
            except (ValueError, TypeError) as e:
                # ValueError: text that is not a number. TypeError: a value, as None, that int() does not accept
                raise LcmException(
                    "Parameter {} of primitive {} must be integer".format(
                        param_name, primitive_desc["name"]
                    )
                ) from e
        elif data_type == "BOOLEAN":
            # anything but the text "false", in any case, is True
            value = str(value).lower() != "false"
        calculated_params[param_name] = value

    # add always ns_config_info if primitive name is config
    if primitive_desc["name"] == "config":
        if "ns_config_info" in instantiation_params:
            calculated_params["ns_config_info"] = instantiation_params[
                "ns_config_info"
            ]
    return calculated_params
def _look_for_deployed_vca(
    self,
    deployed_vca,
    member_vnf_index,
    vdu_id,
    vdu_count_index,
    kdu_name=None,
    ee_descriptor_id=None,
):
    """
    Find the deployed VCA record that must execute an action and return its execution environment.

    The first non-empty record of deployed_vca that passes all the filters is selected:
    "member-vnf-index" and "vdu_id" must be equal to the provided ones (None included);
    "vdu_count_index", "kdu_name" and "ee_descriptor_id" are only compared when provided.

    :param deployed_vca: list of VCA records (content of nsr _admin.deployed.VCA); empty entries are
        ignored. None is treated as an empty list
    :param member_vnf_index: member vnf index of the record to look for
    :param vdu_id: vdu id of the record to look for
    :param vdu_count_index: vdu count index to look for; None means any
    :param kdu_name: kdu name to look for; empty means any
    :param ee_descriptor_id: execution environment id of the descriptor to look for; empty means any
    :return: tuple (ee_id, vca_type). vca_type is "lxc_proxy_charm" when the record has no "type"
    :raises LcmException: if no record matches or the matching record has no "ee_id"
    """
    # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
    for vca in deployed_vca or ():
        if not vca:
            continue
        if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
            continue
        if (
            vdu_count_index is not None
            and vdu_count_index != vca["vdu_count_index"]
        ):
            continue
        # Optional filters use .get(): a record that lacks the key cannot be the one requested, so it
        # is skipped instead of aborting the search with KeyError
        if kdu_name and kdu_name != vca.get("kdu_name"):
            continue
        if ee_descriptor_id and ee_descriptor_id != vca.get("ee_descriptor_id"):
            continue
        break
    else:
        # vca_deployed not found
        raise LcmException(
            "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
            " is not deployed".format(
                member_vnf_index,
                vdu_id,
                vdu_count_index,
                kdu_name,
                ee_descriptor_id,
            )
        )
    # get ee_id
    ee_id = vca.get("ee_id")
    vca_type = vca.get(
        "type", "lxc_proxy_charm"
    )  # default value for backward compatibility - proxy charm
    if not ee_id:
        raise LcmException(
            "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
            "execution environment".format(
                member_vnf_index, vdu_id, kdu_name, vdu_count_index
            )
        )
    return ee_id, vca_type
async def _ns_execute_primitive(
    self,
    ee_id,
    primitive,
    primitive_params,
    retries=0,
    retries_interval=30,
    timeout=None,
    vca_type=None,
    db_dict=None,
    vca_id: str = None,
) -> (str, str):
    """
    Execute a primitive on an execution environment, retrying on error.

    The call is delegated to self.vca_map[vca_type].exec_primitive and bounded with
    asyncio.wait_for. When primitive is "config" the params are wrapped as {"params": ...}.

    :param ee_id: execution environment id where the primitive is executed
    :param primitive: name of the primitive
    :param primitive_params: params of the primitive
    :param retries: number of extra attempts after a failed one. 0 means a single attempt
    :param retries_interval: seconds to wait between attempts
    :param timeout: seconds for each attempt. If empty, self.timeout_primitive is used
    :param vca_type: key of self.vca_map. If empty, "lxc_proxy_charm" is used
    :param db_dict: passed as it is to exec_primitive
    :param vca_id: passed as it is to exec_primitive
    :return: tuple (state, detail): ("COMPLETED", <output of the primitive>) or ("FAILED", <error text>)
    :raises LcmException, asyncio.CancelledError: these are never converted into a "FAILED" result
    """
    try:
        if primitive == "config":
            primitive_params = {"params": primitive_params}

        vca_type = vca_type or "lxc_proxy_charm"

        while retries >= 0:
            try:
                output = await asyncio.wait_for(
                    self.vca_map[vca_type].exec_primitive(
                        ee_id=ee_id,
                        primitive_name=primitive,
                        params_dict=primitive_params,
                        progress_timeout=self.timeout_progress_primitive,
                        total_timeout=self.timeout_primitive,
                        db_dict=db_dict,
                        vca_id=vca_id,
                        vca_type=vca_type,
                    ),
                    timeout=timeout or self.timeout_primitive,
                )
                # execution was OK
                break
            except asyncio.CancelledError:
                raise
            except Exception as e:  # asyncio.TimeoutError
                # a timeout carries no message; report it with a readable text
                error = "Timeout" if isinstance(e, asyncio.TimeoutError) else e
                retries -= 1
                if retries < 0:
                    return "FAILED", str(error)
                self.logger.debug(
                    "Error executing action {} on {} -> {}".format(
                        primitive, ee_id, error
                    )
                )
                # wait and retry. The "loop" argument of asyncio.sleep is not passed: it was removed
                # in python 3.10 and the running loop is used anyway
                await asyncio.sleep(retries_interval)

        return "COMPLETED", output

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        # Same state text as above: callers compare the state against "FAILED"
        return "FAILED", "Error executing action {}: {}".format(primitive, e)
async def vca_status_refresh(self, nsr_id, nslcmop_id):
    """
    Updating the vca_status with latest juju information in nsrs record

    If the nsr has deployed K8s entries, self._on_update_k8s_db is awaited for each of them;
    otherwise self._on_update_n2vc_db is awaited for each entry of _admin.deployed.VCA.
    The task is always removed from self.lcm_tasks, also when the refresh fails.

    :param: nsr_id: Id of the nsr
    :param: nslcmop_id: Id of the nslcmop
    :return: None
    """

    self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
    try:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        vca_id = self.get_vca_id({}, db_nsr)
        deployed = db_nsr["_admin"]["deployed"]
        if deployed["K8s"]:
            for k8s in deployed["K8s"]:
                await self._on_update_k8s_db(
                    k8s["k8scluster-uuid"],
                    k8s["kdu-instance"],
                    filter={"_id": nsr_id},
                    vca_id=vca_id,
                )
        else:
            # only the position of each VCA record is needed, to build its database path
            for vca_index in range(len(deployed["VCA"])):
                path = "_admin.deployed.VCA.{}.".format(vca_index)
                await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, path, {})

        self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
    finally:
        # deregister the task whatever the result, as the other ns tasks do
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
async def action(self, nsr_id, nslcmop_id):
    """
    Execute the primitive (action) described by the nslcmop record nslcmop_id over the ns nsr_id.

    The operation params of the nslcmop select the target: "member_vnf_index", "vdu_id" and
    "kdu_name" are optional; when none is provided the primitive is looked for at the
    "ns-configuration" of the nsd. The primitive is executed:
      - through self.k8scluster_map when "kdu_name" is provided and the primitive is "upgrade",
        "rollback", "status" or one of the primitives declared at the kdu configuration;
      - through self._ns_execute_primitive, over the deployed VCA, in any other case.
    Whatever the result, the "finally" part writes the ns status and the operation status, sends the
    "actioned" notification and removes the task from self.lcm_tasks.

    :param nsr_id: Id of the nsr
    :param nslcmop_id: Id of the nslcmop
    :return: None if the HA lock is not obtained; otherwise the tuple
        (nslcmop_operation_state, detailed_status). Errors are not raised: they are reported
        with the state "FAILED"
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    error_description_nslcmop = None
    exc = None
    # Records that are loaded only for some targets. They are bound here so that an ns-level action
    # (no member_vnf_index) does not fail with UnboundLocalError when they are read later.
    # An empty vnfr is what vca_status_refresh also passes to get_vca_id when there is no vnf
    db_vnfr = {}
    db_vnfd = None
    db_nsd = None
    kdu_action = False
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id,
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        if db_nslcmop["operationParams"].get("primitive_params"):
            # primitive_params is stored as a json text
            db_nslcmop["operationParams"]["primitive_params"] = json.loads(
                db_nslcmop["operationParams"]["primitive_params"]
            )

        nsr_deployed = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        kdu_name = db_nslcmop["operationParams"].get("kdu_name")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]
        timeout_ns_action = db_nslcmop["operationParams"].get(
            "timeout_ns_action", self.timeout_primitive
        )

        if (vdu_id or kdu_name) and not vnf_index:
            # the vnfd, needed for vdu/kdu targets, can only be located through the vnfr
            raise LcmException(
                "member_vnf_index must be provided when vdu_id or kdu_name is set"
            )

        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one(
                "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
            )
            if db_vnfr.get("kdur"):
                # additionalParams of each kdur is stored as a json text
                kdur_list = []
                for kdur in db_vnfr["kdur"]:
                    if kdur.get("additionalParams"):
                        kdur["additionalParams"] = json.loads(
                            kdur["additionalParams"]
                        )
                    kdur_list.append(kdur)
                db_vnfr["kdur"] = kdur_list
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

            # Sync filesystem before running a primitive
            self.fs.sync(db_vnfr["vnfd-id"])
        else:
            step = "Getting nsd from database"
            db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        vca_id = self.get_vca_id(db_vnfr, db_nsr)
        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # look for primitive
        config_primitive_desc = descriptor_configuration = None
        if vdu_id:
            descriptor_configuration = get_configuration(db_vnfd, vdu_id)
        elif kdu_name:
            descriptor_configuration = get_configuration(db_vnfd, kdu_name)
        elif vnf_index:
            descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
        else:
            descriptor_configuration = db_nsd.get("ns-configuration")

        if descriptor_configuration and descriptor_configuration.get(
            "config-primitive"
        ):
            for config_primitive in descriptor_configuration["config-primitive"]:
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            # only the kdu built-in operations may be missing at the descriptor
            if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
                raise LcmException(
                    "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
                        primitive
                    )
                )
            primitive_name = primitive
            ee_descriptor_id = None
        else:
            primitive_name = config_primitive_desc.get(
                "execution-environment-primitive", primitive
            )
            ee_descriptor_id = config_primitive_desc.get(
                "execution-environment-ref"
            )

        # get the additional params provided at instantiation for the target
        if vnf_index:
            if vdu_id:
                vdur = next(
                    (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
                )
                if vdur is None:
                    raise LcmException(
                        "vdu_id '{}' not found at vnfr of member_vnf_index '{}'".format(
                            vdu_id, vnf_index
                        )
                    )
                desc_params = parse_yaml_strings(vdur.get("additionalParams"))
            elif kdu_name:
                kdur = next(
                    (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
                )
                if kdur is None:
                    raise LcmException(
                        "kdu_name '{}' not found at vnfr of member_vnf_index '{}'".format(
                            kdu_name, vnf_index
                        )
                    )
                desc_params = parse_yaml_strings(kdur.get("additionalParams"))
            else:
                desc_params = parse_yaml_strings(
                    db_vnfr.get("additionalParamsForVnf")
                )
        else:
            desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))

        kdu_configuration = (
            get_configuration(db_vnfd, kdu_name) if kdu_name else None
        )
        if kdu_configuration:
            # names of all the primitives declared for the kdu. A dedicated loop variable is used
            # so that "primitive" (the requested action) is not overwritten
            actions = {
                kdu_primitive["name"]
                for section in ("initial-config-primitive", "config-primitive")
                for kdu_primitive in kdu_configuration.get(section, [])
            }
            kdu_action = primitive_name in actions

        # TODO check if ns is in a proper status
        if kdu_name and (
            primitive_name in ("upgrade", "rollback", "status") or kdu_action
        ):
            # kdur and desc_params already set from before
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                if (
                    kdu_name == kdu["kdu-name"]
                    and kdu["member-vnf-index"] == vnf_index
                ):
                    break
            else:
                raise LcmException(
                    "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
                )

            if kdu.get("k8scluster-type") not in self.k8scluster_map:
                msg = "unknown k8scluster-type '{}'".format(
                    kdu.get("k8scluster-type")
                )
                raise LcmException(msg)

            db_dict = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": "_admin.deployed.K8s.{}".format(index),
            }
            self.logger.debug(
                logging_text
                + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
            )
            step = "Executing kdu {}".format(primitive_name)
            if primitive_name == "upgrade":
                if desc_params.get("kdu_model"):
                    # model provided by the user; it is not a param of the kdu itself
                    kdu_model = desc_params.get("kdu_model")
                    del desc_params["kdu_model"]
                else:
                    kdu_model = kdu.get("kdu-model")
                    parts = kdu_model.split(sep=":")
                    if len(parts) == 2:
                        # "<model>:<suffix>": only the part before the colon is used
                        kdu_model = parts[0]

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        atomic=True,
                        kdu_model=kdu_model,
                        params=desc_params,
                        db_dict=db_dict,
                        timeout=timeout_ns_action,
                    ),
                    timeout=timeout_ns_action + 10,
                )
                self.logger.debug(
                    logging_text + " Upgrade of kdu {} done".format(detailed_status)
                )
            elif primitive_name == "rollback":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].rollback(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        db_dict=db_dict,
                    ),
                    timeout=timeout_ns_action,
                )
            elif primitive_name == "status":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        vca_id=vca_id,
                    ),
                    timeout=timeout_ns_action,
                )
            else:
                kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
                    kdu["kdu-name"], nsr_id
                )
                params = self._map_primitive_params(
                    config_primitive_desc, primitive_params, desc_params
                )

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        primitive_name=primitive_name,
                        params=params,
                        db_dict=db_dict,
                        timeout=timeout_ns_action,
                        vca_id=vca_id,
                    ),
                    timeout=timeout_ns_action,
                )

            # an empty result of the k8s connector is considered a failure
            if detailed_status:
                nslcmop_operation_state = "COMPLETED"
            else:
                detailed_status = ""
                nslcmop_operation_state = "FAILED"
        else:
            ee_id, vca_type = self._look_for_deployed_vca(
                nsr_deployed["VCA"],
                member_vnf_index=vnf_index,
                vdu_id=vdu_id,
                vdu_count_index=vdu_count_index,
                ee_descriptor_id=ee_descriptor_id,
            )
            for vca_index, vca_deployed in enumerate(
                db_nsr["_admin"]["deployed"]["VCA"]
            ):
                if vca_deployed.get("member-vnf-index") == vnf_index:
                    db_dict = {
                        "collection": "nsrs",
                        "filter": {"_id": nsr_id},
                        "path": "_admin.deployed.VCA.{}.".format(vca_index),
                    }
                    break
            (
                nslcmop_operation_state,
                detailed_status,
            ) = await self._ns_execute_primitive(
                ee_id,
                primitive=primitive_name,
                primitive_params=self._map_primitive_params(
                    config_primitive_desc, primitive_params, desc_params
                ),
                timeout=timeout_ns_action,
                vca_type=vca_type,
                db_dict=db_dict,
                vca_id=vca_id,
            )

        db_nslcmop_update["detailed-status"] = detailed_status
        error_description_nslcmop = (
            detailed_status if nslcmop_operation_state == "FAILED" else ""
        )
        self.logger.debug(
            logging_text
            + " task Done with result {} {}".format(
                nslcmop_operation_state, detailed_status
            )
        )
        return  # database update is called inside finally

    except (DbException, LcmException, N2VCException, K8sException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(step)
        )
        exc = "Operation was cancelled"
    except asyncio.TimeoutError:
        self.logger.error(logging_text + "Timeout while '{}'".format(step))
        exc = "Timeout"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
            exc_info=True,
        )
    finally:
        if exc:
            db_nslcmop_update[
                "detailed-status"
            ] = (
                detailed_status
            ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=db_nsr[
                    "nsState"
                ],  # TODO check if degraded. For the moment use previous status
                current_operation="IDLE",
                current_operation_id=None,
                # error_description=error_description_nsr,
                # error_detail=error_detail,
                other_update=db_nsr_update,
            )

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite(
                    "ns",
                    "actioned",
                    {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                    },
                    loop=self.loop,
                )
            except Exception as e:
                # the notification is best effort: the result is already written at database
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        # this return inside "finally" is what gives the result to the caller in all the cases
        return nslcmop_operation_state, detailed_status
5243 async def scale(self
, nsr_id
, nslcmop_id
):
5244 # Try to lock HA task here
5245 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5246 if not task_is_locked_by_me
:
5249 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
5250 stage
= ["", "", ""]
5251 tasks_dict_info
= {}
5252 # ^ stage, step, VIM progress
5253 self
.logger
.debug(logging_text
+ "Enter")
5254 # get all needed from database
5256 db_nslcmop_update
= {}
5259 # in case of error, indicates what part of scale was failed to put nsr at error status
5260 scale_process
= None
5261 old_operational_status
= ""
5262 old_config_status
= ""
5265 # wait for any previous tasks in process
5266 step
= "Waiting for previous operations to terminate"
5267 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5268 self
._write
_ns
_status
(
5271 current_operation
="SCALING",
5272 current_operation_id
=nslcmop_id
,
5275 step
= "Getting nslcmop from database"
5277 step
+ " after having waited for previous tasks to be completed"
5279 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5281 step
= "Getting nsr from database"
5282 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5283 old_operational_status
= db_nsr
["operational-status"]
5284 old_config_status
= db_nsr
["config-status"]
5286 step
= "Parsing scaling parameters"
5287 db_nsr_update
["operational-status"] = "scaling"
5288 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5289 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5291 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
5293 ]["member-vnf-index"]
5294 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
5296 ]["scaling-group-descriptor"]
5297 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
5298 # for backward compatibility
5299 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5300 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5301 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5302 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5304 step
= "Getting vnfr from database"
5305 db_vnfr
= self
.db
.get_one(
5306 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5309 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5311 step
= "Getting vnfd from database"
5312 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5314 base_folder
= db_vnfd
["_admin"]["storage"]
5316 step
= "Getting scaling-group-descriptor"
5317 scaling_descriptor
= find_in_list(
5318 get_scaling_aspect(db_vnfd
),
5319 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
5321 if not scaling_descriptor
:
5323 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5324 "at vnfd:scaling-group-descriptor".format(scaling_group
)
5327 step
= "Sending scale order to VIM"
5328 # TODO check if ns is in a proper status
5330 if not db_nsr
["_admin"].get("scaling-group"):
5335 "_admin.scaling-group": [
5336 {"name": scaling_group
, "nb-scale-op": 0}
5340 admin_scale_index
= 0
5342 for admin_scale_index
, admin_scale_info
in enumerate(
5343 db_nsr
["_admin"]["scaling-group"]
5345 if admin_scale_info
["name"] == scaling_group
:
5346 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
5348 else: # not found, set index one plus last element and add new entry with the name
5349 admin_scale_index
+= 1
5351 "_admin.scaling-group.{}.name".format(admin_scale_index
)
5354 vca_scaling_info
= []
5355 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
5356 if scaling_type
== "SCALE_OUT":
5357 if "aspect-delta-details" not in scaling_descriptor
:
5359 "Aspect delta details not fount in scaling descriptor {}".format(
5360 scaling_descriptor
["name"]
5363 # count if max-instance-count is reached
5364 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5366 scaling_info
["scaling_direction"] = "OUT"
5367 scaling_info
["vdu-create"] = {}
5368 scaling_info
["kdu-create"] = {}
5369 for delta
in deltas
:
5370 for vdu_delta
in delta
.get("vdu-delta", {}):
5371 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
5372 # vdu_index also provides the number of instance of the targeted vdu
5373 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5374 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5378 additional_params
= (
5379 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
5382 cloud_init_list
= []
5384 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5385 max_instance_count
= 10
5386 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
5387 max_instance_count
= vdu_profile
.get(
5388 "max-number-of-instances", 10
5391 default_instance_num
= get_number_of_instances(
5394 instances_number
= vdu_delta
.get("number-of-instances", 1)
5395 nb_scale_op
+= instances_number
5397 new_instance_count
= nb_scale_op
+ default_instance_num
5398 # Control if new count is over max and vdu count is less than max.
5399 # Then assign new instance count
5400 if new_instance_count
> max_instance_count
> vdu_count
:
5401 instances_number
= new_instance_count
- max_instance_count
5403 instances_number
= instances_number
5405 if new_instance_count
> max_instance_count
:
5407 "reached the limit of {} (max-instance-count) "
5408 "scaling-out operations for the "
5409 "scaling-group-descriptor '{}'".format(
5410 nb_scale_op
, scaling_group
5413 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5415 # TODO Information of its own ip is not available because db_vnfr is not updated.
5416 additional_params
["OSM"] = get_osm_params(
5417 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
5419 cloud_init_list
.append(
5420 self
._parse
_cloud
_init
(
5427 vca_scaling_info
.append(
5429 "osm_vdu_id": vdu_delta
["id"],
5430 "member-vnf-index": vnf_index
,
5432 "vdu_index": vdu_index
+ x
,
5435 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
5436 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5437 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
5438 kdu_name
= kdu_profile
["kdu-name"]
5439 resource_name
= kdu_profile
.get("resource-name", "")
5441 # Might have different kdus in the same delta
5442 # Should have list for each kdu
5443 if not scaling_info
["kdu-create"].get(kdu_name
, None):
5444 scaling_info
["kdu-create"][kdu_name
] = []
5446 kdur
= get_kdur(db_vnfr
, kdu_name
)
5447 if kdur
.get("helm-chart"):
5448 k8s_cluster_type
= "helm-chart-v3"
5449 self
.logger
.debug("kdur: {}".format(kdur
))
5451 kdur
.get("helm-version")
5452 and kdur
.get("helm-version") == "v2"
5454 k8s_cluster_type
= "helm-chart"
5455 elif kdur
.get("juju-bundle"):
5456 k8s_cluster_type
= "juju-bundle"
5459 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5460 "juju-bundle. Maybe an old NBI version is running".format(
5461 db_vnfr
["member-vnf-index-ref"], kdu_name
5465 max_instance_count
= 10
5466 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
5467 max_instance_count
= kdu_profile
.get(
5468 "max-number-of-instances", 10
5471 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
5472 deployed_kdu
, _
= get_deployed_kdu(
5473 nsr_deployed
, kdu_name
, vnf_index
5475 if deployed_kdu
is None:
5477 "KDU '{}' for vnf '{}' not deployed".format(
5481 kdu_instance
= deployed_kdu
.get("kdu-instance")
5482 instance_num
= await self
.k8scluster_map
[
5488 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
5489 kdu_model
=deployed_kdu
.get("kdu-model"),
5491 kdu_replica_count
= instance_num
+ kdu_delta
.get(
5492 "number-of-instances", 1
5495 # Control if new count is over max and instance_num is less than max.
5496 # Then assign max instance number to kdu replica count
5497 if kdu_replica_count
> max_instance_count
> instance_num
:
5498 kdu_replica_count
= max_instance_count
5499 if kdu_replica_count
> max_instance_count
:
5501 "reached the limit of {} (max-instance-count) "
5502 "scaling-out operations for the "
5503 "scaling-group-descriptor '{}'".format(
5504 instance_num
, scaling_group
5508 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5509 vca_scaling_info
.append(
5511 "osm_kdu_id": kdu_name
,
5512 "member-vnf-index": vnf_index
,
5514 "kdu_index": instance_num
+ x
- 1,
5517 scaling_info
["kdu-create"][kdu_name
].append(
5519 "member-vnf-index": vnf_index
,
5521 "k8s-cluster-type": k8s_cluster_type
,
5522 "resource-name": resource_name
,
5523 "scale": kdu_replica_count
,
5526 elif scaling_type
== "SCALE_IN":
5527 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5529 scaling_info
["scaling_direction"] = "IN"
5530 scaling_info
["vdu-delete"] = {}
5531 scaling_info
["kdu-delete"] = {}
5533 for delta
in deltas
:
5534 for vdu_delta
in delta
.get("vdu-delta", {}):
5535 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5536 min_instance_count
= 0
5537 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5538 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
5539 min_instance_count
= vdu_profile
["min-number-of-instances"]
5541 default_instance_num
= get_number_of_instances(
5542 db_vnfd
, vdu_delta
["id"]
5544 instance_num
= vdu_delta
.get("number-of-instances", 1)
5545 nb_scale_op
-= instance_num
5547 new_instance_count
= nb_scale_op
+ default_instance_num
5549 if new_instance_count
< min_instance_count
< vdu_count
:
5550 instances_number
= min_instance_count
- new_instance_count
5552 instances_number
= instance_num
5554 if new_instance_count
< min_instance_count
:
5556 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5557 "scaling-group-descriptor '{}'".format(
5558 nb_scale_op
, scaling_group
5561 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5562 vca_scaling_info
.append(
5564 "osm_vdu_id": vdu_delta
["id"],
5565 "member-vnf-index": vnf_index
,
5567 "vdu_index": vdu_index
- 1 - x
,
5570 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
5571 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5572 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
5573 kdu_name
= kdu_profile
["kdu-name"]
5574 resource_name
= kdu_profile
.get("resource-name", "")
5576 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
5577 scaling_info
["kdu-delete"][kdu_name
] = []
5579 kdur
= get_kdur(db_vnfr
, kdu_name
)
5580 if kdur
.get("helm-chart"):
5581 k8s_cluster_type
= "helm-chart-v3"
5582 self
.logger
.debug("kdur: {}".format(kdur
))
5584 kdur
.get("helm-version")
5585 and kdur
.get("helm-version") == "v2"
5587 k8s_cluster_type
= "helm-chart"
5588 elif kdur
.get("juju-bundle"):
5589 k8s_cluster_type
= "juju-bundle"
5592 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5593 "juju-bundle. Maybe an old NBI version is running".format(
5594 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
5598 min_instance_count
= 0
5599 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
5600 min_instance_count
= kdu_profile
["min-number-of-instances"]
5602 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
5603 deployed_kdu
, _
= get_deployed_kdu(
5604 nsr_deployed
, kdu_name
, vnf_index
5606 if deployed_kdu
is None:
5608 "KDU '{}' for vnf '{}' not deployed".format(
5612 kdu_instance
= deployed_kdu
.get("kdu-instance")
5613 instance_num
= await self
.k8scluster_map
[
5619 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
5620 kdu_model
=deployed_kdu
.get("kdu-model"),
5622 kdu_replica_count
= instance_num
- kdu_delta
.get(
5623 "number-of-instances", 1
5626 if kdu_replica_count
< min_instance_count
< instance_num
:
5627 kdu_replica_count
= min_instance_count
5628 if kdu_replica_count
< min_instance_count
:
5630 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5631 "scaling-group-descriptor '{}'".format(
5632 instance_num
, scaling_group
5636 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5637 vca_scaling_info
.append(
5639 "osm_kdu_id": kdu_name
,
5640 "member-vnf-index": vnf_index
,
5642 "kdu_index": instance_num
- x
- 1,
5645 scaling_info
["kdu-delete"][kdu_name
].append(
5647 "member-vnf-index": vnf_index
,
5649 "k8s-cluster-type": k8s_cluster_type
,
5650 "resource-name": resource_name
,
5651 "scale": kdu_replica_count
,
5655 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5656 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
5657 if scaling_info
["scaling_direction"] == "IN":
5658 for vdur
in reversed(db_vnfr
["vdur"]):
5659 if vdu_delete
.get(vdur
["vdu-id-ref"]):
5660 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
5661 scaling_info
["vdu"].append(
5663 "name": vdur
.get("name") or vdur
.get("vdu-name"),
5664 "vdu_id": vdur
["vdu-id-ref"],
5668 for interface
in vdur
["interfaces"]:
5669 scaling_info
["vdu"][-1]["interface"].append(
5671 "name": interface
["name"],
5672 "ip_address": interface
["ip-address"],
5673 "mac_address": interface
.get("mac-address"),
5676 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5679 step
= "Executing pre-scale vnf-config-primitive"
5680 if scaling_descriptor
.get("scaling-config-action"):
5681 for scaling_config_action
in scaling_descriptor
[
5682 "scaling-config-action"
5685 scaling_config_action
.get("trigger") == "pre-scale-in"
5686 and scaling_type
== "SCALE_IN"
5688 scaling_config_action
.get("trigger") == "pre-scale-out"
5689 and scaling_type
== "SCALE_OUT"
5691 vnf_config_primitive
= scaling_config_action
[
5692 "vnf-config-primitive-name-ref"
5694 step
= db_nslcmop_update
[
5696 ] = "executing pre-scale scaling-config-action '{}'".format(
5697 vnf_config_primitive
5700 # look for primitive
5701 for config_primitive
in (
5702 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
5703 ).get("config-primitive", ()):
5704 if config_primitive
["name"] == vnf_config_primitive
:
5708 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5709 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5710 "primitive".format(scaling_group
, vnf_config_primitive
)
5713 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
5714 if db_vnfr
.get("additionalParamsForVnf"):
5715 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
5717 scale_process
= "VCA"
5718 db_nsr_update
["config-status"] = "configuring pre-scaling"
5719 primitive_params
= self
._map
_primitive
_params
(
5720 config_primitive
, {}, vnfr_params
5723 # Pre-scale retry check: Check if this sub-operation has been executed before
5724 op_index
= self
._check
_or
_add
_scale
_suboperation
(
5727 vnf_config_primitive
,
5731 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
5732 # Skip sub-operation
5733 result
= "COMPLETED"
5734 result_detail
= "Done"
5737 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5738 vnf_config_primitive
, result
, result_detail
5742 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
5743 # New sub-operation: Get index of this sub-operation
5745 len(db_nslcmop
.get("_admin", {}).get("operations"))
5750 + "vnf_config_primitive={} New sub-operation".format(
5751 vnf_config_primitive
5755 # retry: Get registered params for this existing sub-operation
5756 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
5759 vnf_index
= op
.get("member_vnf_index")
5760 vnf_config_primitive
= op
.get("primitive")
5761 primitive_params
= op
.get("primitive_params")
5764 + "vnf_config_primitive={} Sub-operation retry".format(
5765 vnf_config_primitive
5768 # Execute the primitive, either with new (first-time) or registered (reintent) args
5769 ee_descriptor_id
= config_primitive
.get(
5770 "execution-environment-ref"
5772 primitive_name
= config_primitive
.get(
5773 "execution-environment-primitive", vnf_config_primitive
5775 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5776 nsr_deployed
["VCA"],
5777 member_vnf_index
=vnf_index
,
5779 vdu_count_index
=None,
5780 ee_descriptor_id
=ee_descriptor_id
,
5782 result
, result_detail
= await self
._ns
_execute
_primitive
(
5791 + "vnf_config_primitive={} Done with result {} {}".format(
5792 vnf_config_primitive
, result
, result_detail
5795 # Update operationState = COMPLETED | FAILED
5796 self
._update
_suboperation
_status
(
5797 db_nslcmop
, op_index
, result
, result_detail
5800 if result
== "FAILED":
5801 raise LcmException(result_detail
)
5802 db_nsr_update
["config-status"] = old_config_status
5803 scale_process
= None
5807 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
5810 "_admin.scaling-group.{}.time".format(admin_scale_index
)
5813 # SCALE-IN VCA - BEGIN
5814 if vca_scaling_info
:
5815 step
= db_nslcmop_update
[
5817 ] = "Deleting the execution environments"
5818 scale_process
= "VCA"
5819 for vca_info
in vca_scaling_info
:
5820 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
5821 member_vnf_index
= str(vca_info
["member-vnf-index"])
5823 logging_text
+ "vdu info: {}".format(vca_info
)
5825 if vca_info
.get("osm_vdu_id"):
5826 vdu_id
= vca_info
["osm_vdu_id"]
5827 vdu_index
= int(vca_info
["vdu_index"])
5830 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5831 member_vnf_index
, vdu_id
, vdu_index
5833 stage
[2] = step
= "Scaling in VCA"
5834 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
5835 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
5836 config_update
= db_nsr
["configurationStatus"]
5837 for vca_index
, vca
in enumerate(vca_update
):
5839 (vca
or vca
.get("ee_id"))
5840 and vca
["member-vnf-index"] == member_vnf_index
5841 and vca
["vdu_count_index"] == vdu_index
5843 if vca
.get("vdu_id"):
5844 config_descriptor
= get_configuration(
5845 db_vnfd
, vca
.get("vdu_id")
5847 elif vca
.get("kdu_name"):
5848 config_descriptor
= get_configuration(
5849 db_vnfd
, vca
.get("kdu_name")
5852 config_descriptor
= get_configuration(
5853 db_vnfd
, db_vnfd
["id"]
5855 operation_params
= (
5856 db_nslcmop
.get("operationParams") or {}
5858 exec_terminate_primitives
= not operation_params
.get(
5859 "skip_terminate_primitives"
5860 ) and vca
.get("needed_terminate")
5861 task
= asyncio
.ensure_future(
5870 exec_primitives
=exec_terminate_primitives
,
5874 timeout
=self
.timeout_charm_delete
,
5877 tasks_dict_info
[task
] = "Terminating VCA {}".format(
5880 del vca_update
[vca_index
]
5881 del config_update
[vca_index
]
5882 # wait for pending tasks of terminate primitives
5886 + "Waiting for tasks {}".format(
5887 list(tasks_dict_info
.keys())
5890 error_list
= await self
._wait
_for
_tasks
(
5894 self
.timeout_charm_delete
, self
.timeout_ns_terminate
5899 tasks_dict_info
.clear()
5901 raise LcmException("; ".join(error_list
))
5903 db_vca_and_config_update
= {
5904 "_admin.deployed.VCA": vca_update
,
5905 "configurationStatus": config_update
,
5908 "nsrs", db_nsr
["_id"], db_vca_and_config_update
5910 scale_process
= None
5911 # SCALE-IN VCA - END
5914 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
5915 scale_process
= "RO"
5916 if self
.ro_config
.get("ng"):
5917 await self
._scale
_ng
_ro
(
5918 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
5920 scaling_info
.pop("vdu-create", None)
5921 scaling_info
.pop("vdu-delete", None)
5923 scale_process
= None
5927 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
5928 scale_process
= "KDU"
5929 await self
._scale
_kdu
(
5930 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
5932 scaling_info
.pop("kdu-create", None)
5933 scaling_info
.pop("kdu-delete", None)
5935 scale_process
= None
5939 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5941 # SCALE-UP VCA - BEGIN
5942 if vca_scaling_info
:
5943 step
= db_nslcmop_update
[
5945 ] = "Creating new execution environments"
5946 scale_process
= "VCA"
5947 for vca_info
in vca_scaling_info
:
5948 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
5949 member_vnf_index
= str(vca_info
["member-vnf-index"])
5951 logging_text
+ "vdu info: {}".format(vca_info
)
5953 vnfd_id
= db_vnfr
["vnfd-ref"]
5954 if vca_info
.get("osm_vdu_id"):
5955 vdu_index
= int(vca_info
["vdu_index"])
5956 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
5957 if db_vnfr
.get("additionalParamsForVnf"):
5958 deploy_params
.update(
5960 db_vnfr
["additionalParamsForVnf"].copy()
5963 descriptor_config
= get_configuration(
5964 db_vnfd
, db_vnfd
["id"]
5966 if descriptor_config
:
5971 logging_text
=logging_text
5972 + "member_vnf_index={} ".format(member_vnf_index
),
5975 nslcmop_id
=nslcmop_id
,
5981 member_vnf_index
=member_vnf_index
,
5982 vdu_index
=vdu_index
,
5984 deploy_params
=deploy_params
,
5985 descriptor_config
=descriptor_config
,
5986 base_folder
=base_folder
,
5987 task_instantiation_info
=tasks_dict_info
,
5990 vdu_id
= vca_info
["osm_vdu_id"]
5991 vdur
= find_in_list(
5992 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
5994 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
5995 if vdur
.get("additionalParams"):
5996 deploy_params_vdu
= parse_yaml_strings(
5997 vdur
["additionalParams"]
6000 deploy_params_vdu
= deploy_params
6001 deploy_params_vdu
["OSM"] = get_osm_params(
6002 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6004 if descriptor_config
:
6009 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6010 member_vnf_index
, vdu_id
, vdu_index
6012 stage
[2] = step
= "Scaling out VCA"
6013 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6015 logging_text
=logging_text
6016 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6017 member_vnf_index
, vdu_id
, vdu_index
6021 nslcmop_id
=nslcmop_id
,
6027 member_vnf_index
=member_vnf_index
,
6028 vdu_index
=vdu_index
,
6030 deploy_params
=deploy_params_vdu
,
6031 descriptor_config
=descriptor_config
,
6032 base_folder
=base_folder
,
6033 task_instantiation_info
=tasks_dict_info
,
6036 # SCALE-UP VCA - END
6037 scale_process
= None
6040 # execute primitive service POST-SCALING
6041 step
= "Executing post-scale vnf-config-primitive"
6042 if scaling_descriptor
.get("scaling-config-action"):
6043 for scaling_config_action
in scaling_descriptor
[
6044 "scaling-config-action"
6047 scaling_config_action
.get("trigger") == "post-scale-in"
6048 and scaling_type
== "SCALE_IN"
6050 scaling_config_action
.get("trigger") == "post-scale-out"
6051 and scaling_type
== "SCALE_OUT"
6053 vnf_config_primitive
= scaling_config_action
[
6054 "vnf-config-primitive-name-ref"
6056 step
= db_nslcmop_update
[
6058 ] = "executing post-scale scaling-config-action '{}'".format(
6059 vnf_config_primitive
6062 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6063 if db_vnfr
.get("additionalParamsForVnf"):
6064 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6066 # look for primitive
6067 for config_primitive
in (
6068 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6069 ).get("config-primitive", ()):
6070 if config_primitive
["name"] == vnf_config_primitive
:
6074 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6075 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6076 "config-primitive".format(
6077 scaling_group
, vnf_config_primitive
6080 scale_process
= "VCA"
6081 db_nsr_update
["config-status"] = "configuring post-scaling"
6082 primitive_params
= self
._map
_primitive
_params
(
6083 config_primitive
, {}, vnfr_params
6086 # Post-scale retry check: Check if this sub-operation has been executed before
6087 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6090 vnf_config_primitive
,
6094 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6095 # Skip sub-operation
6096 result
= "COMPLETED"
6097 result_detail
= "Done"
6100 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6101 vnf_config_primitive
, result
, result_detail
6105 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6106 # New sub-operation: Get index of this sub-operation
6108 len(db_nslcmop
.get("_admin", {}).get("operations"))
6113 + "vnf_config_primitive={} New sub-operation".format(
6114 vnf_config_primitive
6118 # retry: Get registered params for this existing sub-operation
6119 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6122 vnf_index
= op
.get("member_vnf_index")
6123 vnf_config_primitive
= op
.get("primitive")
6124 primitive_params
= op
.get("primitive_params")
6127 + "vnf_config_primitive={} Sub-operation retry".format(
6128 vnf_config_primitive
6131 # Execute the primitive, either with new (first-time) or registered (reintent) args
6132 ee_descriptor_id
= config_primitive
.get(
6133 "execution-environment-ref"
6135 primitive_name
= config_primitive
.get(
6136 "execution-environment-primitive", vnf_config_primitive
6138 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6139 nsr_deployed
["VCA"],
6140 member_vnf_index
=vnf_index
,
6142 vdu_count_index
=None,
6143 ee_descriptor_id
=ee_descriptor_id
,
6145 result
, result_detail
= await self
._ns
_execute
_primitive
(
6154 + "vnf_config_primitive={} Done with result {} {}".format(
6155 vnf_config_primitive
, result
, result_detail
6158 # Update operationState = COMPLETED | FAILED
6159 self
._update
_suboperation
_status
(
6160 db_nslcmop
, op_index
, result
, result_detail
6163 if result
== "FAILED":
6164 raise LcmException(result_detail
)
6165 db_nsr_update
["config-status"] = old_config_status
6166 scale_process
= None
6171 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6172 db_nsr_update
["operational-status"] = (
6174 if old_operational_status
== "failed"
6175 else old_operational_status
6177 db_nsr_update
["config-status"] = old_config_status
6180 ROclient
.ROClientException
,
6185 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6187 except asyncio
.CancelledError
:
6189 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6191 exc
= "Operation was cancelled"
6192 except Exception as e
:
6193 exc
= traceback
.format_exc()
6194 self
.logger
.critical(
6195 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6199 self
._write
_ns
_status
(
6202 current_operation
="IDLE",
6203 current_operation_id
=None,
6206 stage
[1] = "Waiting for instantiate pending tasks."
6207 self
.logger
.debug(logging_text
+ stage
[1])
6208 exc
= await self
._wait
_for
_tasks
(
6211 self
.timeout_ns_deploy
,
6219 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6220 nslcmop_operation_state
= "FAILED"
6222 db_nsr_update
["operational-status"] = old_operational_status
6223 db_nsr_update
["config-status"] = old_config_status
6224 db_nsr_update
["detailed-status"] = ""
6226 if "VCA" in scale_process
:
6227 db_nsr_update
["config-status"] = "failed"
6228 if "RO" in scale_process
:
6229 db_nsr_update
["operational-status"] = "failed"
6232 ] = "FAILED scaling nslcmop={} {}: {}".format(
6233 nslcmop_id
, step
, exc
6236 error_description_nslcmop
= None
6237 nslcmop_operation_state
= "COMPLETED"
6238 db_nslcmop_update
["detailed-status"] = "Done"
6240 self
._write
_op
_status
(
6243 error_message
=error_description_nslcmop
,
6244 operation_state
=nslcmop_operation_state
,
6245 other_update
=db_nslcmop_update
,
6248 self
._write
_ns
_status
(
6251 current_operation
="IDLE",
6252 current_operation_id
=None,
6253 other_update
=db_nsr_update
,
6256 if nslcmop_operation_state
:
6260 "nslcmop_id": nslcmop_id
,
6261 "operationState": nslcmop_operation_state
,
6263 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
6264 except Exception as e
:
6266 logging_text
+ "kafka_write notification Exception {}".format(e
)
6268 self
.logger
.debug(logging_text
+ "Exit")
6269 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
6271 async def _scale_kdu(
6272 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6274 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
6275 for kdu_name
in _scaling_info
:
6276 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
6277 deployed_kdu
, index
= get_deployed_kdu(
6278 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
6280 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
6281 kdu_instance
= deployed_kdu
["kdu-instance"]
6282 kdu_model
= deployed_kdu
.get("kdu-model")
6283 scale
= int(kdu_scaling_info
["scale"])
6284 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
6287 "collection": "nsrs",
6288 "filter": {"_id": nsr_id
},
6289 "path": "_admin.deployed.K8s.{}".format(index
),
6292 step
= "scaling application {}".format(
6293 kdu_scaling_info
["resource-name"]
6295 self
.logger
.debug(logging_text
+ step
)
6297 if kdu_scaling_info
["type"] == "delete":
6298 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
6301 and kdu_config
.get("terminate-config-primitive")
6302 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
6304 terminate_config_primitive_list
= kdu_config
.get(
6305 "terminate-config-primitive"
6307 terminate_config_primitive_list
.sort(
6308 key
=lambda val
: int(val
["seq"])
6312 terminate_config_primitive
6313 ) in terminate_config_primitive_list
:
6314 primitive_params_
= self
._map
_primitive
_params
(
6315 terminate_config_primitive
, {}, {}
6317 step
= "execute terminate config primitive"
6318 self
.logger
.debug(logging_text
+ step
)
6319 await asyncio
.wait_for(
6320 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
6321 cluster_uuid
=cluster_uuid
,
6322 kdu_instance
=kdu_instance
,
6323 primitive_name
=terminate_config_primitive
["name"],
6324 params
=primitive_params_
,
6331 await asyncio
.wait_for(
6332 self
.k8scluster_map
[k8s_cluster_type
].scale(
6335 kdu_scaling_info
["resource-name"],
6337 cluster_uuid
=cluster_uuid
,
6338 kdu_model
=kdu_model
,
6342 timeout
=self
.timeout_vca_on_error
,
6345 if kdu_scaling_info
["type"] == "create":
6346 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
6349 and kdu_config
.get("initial-config-primitive")
6350 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
6352 initial_config_primitive_list
= kdu_config
.get(
6353 "initial-config-primitive"
6355 initial_config_primitive_list
.sort(
6356 key
=lambda val
: int(val
["seq"])
6359 for initial_config_primitive
in initial_config_primitive_list
:
6360 primitive_params_
= self
._map
_primitive
_params
(
6361 initial_config_primitive
, {}, {}
6363 step
= "execute initial config primitive"
6364 self
.logger
.debug(logging_text
+ step
)
6365 await asyncio
.wait_for(
6366 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
6367 cluster_uuid
=cluster_uuid
,
6368 kdu_instance
=kdu_instance
,
6369 primitive_name
=initial_config_primitive
["name"],
6370 params
=primitive_params_
,
6377 async def _scale_ng_ro(
6378 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
6380 nsr_id
= db_nslcmop
["nsInstanceId"]
6381 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
6384 # read from db: vnfd's for every vnf
6387 # for each vnf in ns, read vnfd
6388 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
6389 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
6390 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
6391 # if we haven't this vnfd, read it from db
6392 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
6394 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
6395 db_vnfds
.append(vnfd
)
6396 n2vc_key
= self
.n2vc
.get_public_key()
6397 n2vc_key_list
= [n2vc_key
]
6400 vdu_scaling_info
.get("vdu-create"),
6401 vdu_scaling_info
.get("vdu-delete"),
6404 # db_vnfr has been updated, update db_vnfrs to use it
6405 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
6406 await self
._instantiate
_ng
_ro
(
6416 start_deploy
=time(),
6417 timeout_ns_deploy
=self
.timeout_ns_deploy
,
6419 if vdu_scaling_info
.get("vdu-delete"):
6421 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
6424 async def extract_prometheus_scrape_jobs(
6425 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
6427 # look if exist a file called 'prometheus*.j2' and
6428 artifact_content
= self
.fs
.dir_ls(artifact_path
)
6432 for f
in artifact_content
6433 if f
.startswith("prometheus") and f
.endswith(".j2")
6439 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
6443 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
6444 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
6446 vnfr_id
= vnfr_id
.replace("-", "")
6448 "JOB_NAME": vnfr_id
,
6449 "TARGET_IP": target_ip
,
6450 "EXPORTER_POD_IP": host_name
,
6451 "EXPORTER_POD_PORT": host_port
,
6453 job_list
= parse_job(job_data
, variables
)
6454 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
6455 for job
in job_list
:
6457 not isinstance(job
.get("job_name"), str)
6458 or vnfr_id
not in job
["job_name"]
6460 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
6461 job
["nsr_id"] = nsr_id
6462 job
["vnfr_id"] = vnfr_id
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Fetch the VCA cloud name and VCA cloud credential of a VIM account.

    Both values are read from the "config" section of the VIM account record
    returned by VimAccountDB.

    :param vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential); each entry is whatever
        ``config.get`` yields for its key (None when the key is not set)
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    # A record without a "config" section is treated as an empty configuration.
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_cloud")
    cloud_credential = vim_config.get("vca_cloud_credential")
    return cloud_name, cloud_credential
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Fetch the VCA K8s cloud name and VCA K8s cloud credential of a VIM account.

    Both values are read from the "config" section of the VIM account record
    returned by VimAccountDB.

    :param vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential); each entry is whatever
        ``config.get`` yields for its key (None when the key is not set)
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    # A record without a "config" section is treated as an empty configuration.
    k8s_config = vim_account.get("config", {})
    return (
        k8s_config.get("vca_k8s_cloud"),
        k8s_config.get("vca_k8s_cloud_credential"),
    )