1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
20 from typing
import Any
, Dict
, List
23 import logging
.handlers
34 from osm_lcm
import ROclient
35 from osm_lcm
.data_utils
.nsr
import (
38 get_deployed_vca_list
,
41 from osm_lcm
.data_utils
.vca
import (
50 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
51 from osm_lcm
.lcm_utils
import (
59 from osm_lcm
.data_utils
.nsd
import (
60 get_ns_configuration_relation_list
,
64 from osm_lcm
.data_utils
.vnfd
import (
68 get_ee_sorted_initial_config_primitive_list
,
69 get_ee_sorted_terminate_config_primitive_list
,
71 get_virtual_link_profiles
,
76 get_number_of_instances
,
78 get_kdu_resource_profile
,
80 from osm_lcm
.data_utils
.list_utils
import find_in_list
81 from osm_lcm
.data_utils
.vnfr
import get_osm_params
, get_vdur_index
, get_kdur
82 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
83 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
84 from n2vc
.definitions
import RelationEndpoint
85 from n2vc
.k8s_helm_conn
import K8sHelmConnector
86 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
87 from n2vc
.k8s_juju_conn
import K8sJujuConnector
89 from osm_common
.dbbase
import DbException
90 from osm_common
.fsbase
import FsException
92 from osm_lcm
.data_utils
.database
.database
import Database
93 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
95 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
96 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
98 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
100 from copy
import copy
, deepcopy
101 from time
import time
102 from uuid
import uuid4
104 from random
import randint
106 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
109 class NsLcm(LcmBase
):
110 timeout_vca_on_error
= (
112 ) # Time for charm from first time at blocked,error status to mark as failed
113 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
114 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
115 timeout_charm_delete
= 10 * 60
116 timeout_primitive
= 30 * 60 # timeout for primitive execution
117 timeout_progress_primitive
= (
119 ) # timeout for some progress in a primitive execution
121 SUBOPERATION_STATUS_NOT_FOUND
= -1
122 SUBOPERATION_STATUS_NEW
= -2
123 SUBOPERATION_STATUS_SKIP
= -3
124 task_name_deploy_vca
= "Deploying VCA"
126 def __init__(self
, msg
, lcm_tasks
, config
, loop
, prometheus
=None):
128 Init, Connect to database, filesystem storage, and messaging
129 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
132 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
134 self
.db
= Database().instance
.db
135 self
.fs
= Filesystem().instance
.fs
137 self
.lcm_tasks
= lcm_tasks
138 self
.timeout
= config
["timeout"]
139 self
.ro_config
= config
["ro_config"]
140 self
.ng_ro
= config
["ro_config"].get("ng")
141 self
.vca_config
= config
["VCA"].copy()
143 # create N2VC connector
144 self
.n2vc
= N2VCJujuConnector(
147 on_update_db
=self
._on
_update
_n
2vc
_db
,
152 self
.conn_helm_ee
= LCMHelmConn(
155 vca_config
=self
.vca_config
,
156 on_update_db
=self
._on
_update
_n
2vc
_db
,
159 self
.k8sclusterhelm2
= K8sHelmConnector(
160 kubectl_command
=self
.vca_config
.get("kubectlpath"),
161 helm_command
=self
.vca_config
.get("helmpath"),
168 self
.k8sclusterhelm3
= K8sHelm3Connector(
169 kubectl_command
=self
.vca_config
.get("kubectlpath"),
170 helm_command
=self
.vca_config
.get("helm3path"),
177 self
.k8sclusterjuju
= K8sJujuConnector(
178 kubectl_command
=self
.vca_config
.get("kubectlpath"),
179 juju_command
=self
.vca_config
.get("jujupath"),
182 on_update_db
=self
._on
_update
_k
8s
_db
,
187 self
.k8scluster_map
= {
188 "helm-chart": self
.k8sclusterhelm2
,
189 "helm-chart-v3": self
.k8sclusterhelm3
,
190 "chart": self
.k8sclusterhelm3
,
191 "juju-bundle": self
.k8sclusterjuju
,
192 "juju": self
.k8sclusterjuju
,
196 "lxc_proxy_charm": self
.n2vc
,
197 "native_charm": self
.n2vc
,
198 "k8s_proxy_charm": self
.n2vc
,
199 "helm": self
.conn_helm_ee
,
200 "helm-v3": self
.conn_helm_ee
,
203 self
.prometheus
= prometheus
206 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
209 def increment_ip_mac(ip_mac
, vm_index
=1):
210 if not isinstance(ip_mac
, str):
213 # try with ipv4 look for last dot
214 i
= ip_mac
.rfind(".")
217 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
218 # try with ipv6 or mac look for last colon. Operate in hex
219 i
= ip_mac
.rfind(":")
222 # format in hex, len can be 2 for mac or 4 for ipv6
223 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
224 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
230 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
232 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
235 # TODO filter RO descriptor fields...
239 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
240 db_dict
["deploymentStatus"] = ro_descriptor
241 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
243 except Exception as e
:
245 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
248 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
250 # remove last dot from path (if exists)
251 if path
.endswith("."):
254 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
255 # .format(table, filter, path, updated_data))
258 nsr_id
= filter.get("_id")
260 # read ns record from database
261 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
262 current_ns_status
= nsr
.get("nsState")
264 # get vca status for NS
265 status_dict
= await self
.n2vc
.get_status(
266 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
271 db_dict
["vcaStatus"] = status_dict
272 await self
.n2vc
.update_vca_status(db_dict
["vcaStatus"], vca_id
=vca_id
)
274 # update configurationStatus for this VCA
276 vca_index
= int(path
[path
.rfind(".") + 1 :])
279 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
281 vca_status
= vca_list
[vca_index
].get("status")
283 configuration_status_list
= nsr
.get("configurationStatus")
284 config_status
= configuration_status_list
[vca_index
].get("status")
286 if config_status
== "BROKEN" and vca_status
!= "failed":
287 db_dict
["configurationStatus"][vca_index
] = "READY"
288 elif config_status
!= "BROKEN" and vca_status
== "failed":
289 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
290 except Exception as e
:
291 # not update configurationStatus
292 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
294 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
295 # if nsState = 'DEGRADED' check if all is OK
297 if current_ns_status
in ("READY", "DEGRADED"):
298 error_description
= ""
300 if status_dict
.get("machines"):
301 for machine_id
in status_dict
.get("machines"):
302 machine
= status_dict
.get("machines").get(machine_id
)
303 # check machine agent-status
304 if machine
.get("agent-status"):
305 s
= machine
.get("agent-status").get("status")
308 error_description
+= (
309 "machine {} agent-status={} ; ".format(
313 # check machine instance status
314 if machine
.get("instance-status"):
315 s
= machine
.get("instance-status").get("status")
318 error_description
+= (
319 "machine {} instance-status={} ; ".format(
324 if status_dict
.get("applications"):
325 for app_id
in status_dict
.get("applications"):
326 app
= status_dict
.get("applications").get(app_id
)
327 # check application status
328 if app
.get("status"):
329 s
= app
.get("status").get("status")
332 error_description
+= (
333 "application {} status={} ; ".format(app_id
, s
)
336 if error_description
:
337 db_dict
["errorDescription"] = error_description
338 if current_ns_status
== "READY" and is_degraded
:
339 db_dict
["nsState"] = "DEGRADED"
340 if current_ns_status
== "DEGRADED" and not is_degraded
:
341 db_dict
["nsState"] = "READY"
344 self
.update_db_2("nsrs", nsr_id
, db_dict
)
346 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
348 except Exception as e
:
349 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
351 async def _on_update_k8s_db(
352 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None
355 Updating vca status in NSR record
356 :param cluster_uuid: UUID of a k8s cluster
357 :param kdu_instance: The unique name of the KDU instance
358 :param filter: To get nsr_id
362 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
363 # .format(cluster_uuid, kdu_instance, filter))
366 nsr_id
= filter.get("_id")
368 # get vca status for NS
369 vca_status
= await self
.k8sclusterjuju
.status_kdu(
372 complete_status
=True,
378 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
380 await self
.k8sclusterjuju
.update_vca_status(
381 db_dict
["vcaStatus"],
387 self
.update_db_2("nsrs", nsr_id
, db_dict
)
389 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
391 except Exception as e
:
392 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
395 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
397 env
= Environment(undefined
=StrictUndefined
)
398 template
= env
.from_string(cloud_init_text
)
399 return template
.render(additional_params
or {})
400 except UndefinedError
as e
:
402 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
403 "file, must be provided in the instantiation parameters inside the "
404 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
406 except (TemplateError
, TemplateNotFound
) as e
:
408 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
413 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
414 cloud_init_content
= cloud_init_file
= None
416 if vdu
.get("cloud-init-file"):
417 base_folder
= vnfd
["_admin"]["storage"]
418 cloud_init_file
= "{}/{}/cloud_init/{}".format(
419 base_folder
["folder"],
420 base_folder
["pkg-dir"],
421 vdu
["cloud-init-file"],
423 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
424 cloud_init_content
= ci_file
.read()
425 elif vdu
.get("cloud-init"):
426 cloud_init_content
= vdu
["cloud-init"]
428 return cloud_init_content
429 except FsException
as e
:
431 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
432 vnfd
["id"], vdu
["id"], cloud_init_file
, e
436 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
438 vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]
440 additional_params
= vdur
.get("additionalParams")
441 return parse_yaml_strings(additional_params
)
443 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
445 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
446 :param vnfd: input vnfd
447 :param new_id: overrides vnf id if provided
448 :param additionalParams: Instantiation params for VNFs provided
449 :param nsrId: Id of the NSR
450 :return: copy of vnfd
452 vnfd_RO
= deepcopy(vnfd
)
453 # remove unused by RO configuration, monitoring, scaling and internal keys
454 vnfd_RO
.pop("_id", None)
455 vnfd_RO
.pop("_admin", None)
456 vnfd_RO
.pop("monitoring-param", None)
457 vnfd_RO
.pop("scaling-group-descriptor", None)
458 vnfd_RO
.pop("kdu", None)
459 vnfd_RO
.pop("k8s-cluster", None)
461 vnfd_RO
["id"] = new_id
463 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
464 for vdu
in get_iterable(vnfd_RO
, "vdu"):
465 vdu
.pop("cloud-init-file", None)
466 vdu
.pop("cloud-init", None)
470 def ip_profile_2_RO(ip_profile
):
471 RO_ip_profile
= deepcopy(ip_profile
)
472 if "dns-server" in RO_ip_profile
:
473 if isinstance(RO_ip_profile
["dns-server"], list):
474 RO_ip_profile
["dns-address"] = []
475 for ds
in RO_ip_profile
.pop("dns-server"):
476 RO_ip_profile
["dns-address"].append(ds
["address"])
478 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
479 if RO_ip_profile
.get("ip-version") == "ipv4":
480 RO_ip_profile
["ip-version"] = "IPv4"
481 if RO_ip_profile
.get("ip-version") == "ipv6":
482 RO_ip_profile
["ip-version"] = "IPv6"
483 if "dhcp-params" in RO_ip_profile
:
484 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
487 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
488 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
489 if db_vim
["_admin"]["operationalState"] != "ENABLED":
491 "VIM={} is not available. operationalState={}".format(
492 vim_account
, db_vim
["_admin"]["operationalState"]
495 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
498 def get_ro_wim_id_for_wim_account(self
, wim_account
):
499 if isinstance(wim_account
, str):
500 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
501 if db_wim
["_admin"]["operationalState"] != "ENABLED":
503 "WIM={} is not available. operationalState={}".format(
504 wim_account
, db_wim
["_admin"]["operationalState"]
507 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
512 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
514 db_vdu_push_list
= []
515 db_update
= {"_admin.modified": time()}
517 for vdu_id
, vdu_count
in vdu_create
.items():
521 for vdur
in reversed(db_vnfr
["vdur"])
522 if vdur
["vdu-id-ref"] == vdu_id
528 "Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
533 for count
in range(vdu_count
):
534 vdur_copy
= deepcopy(vdur
)
535 vdur_copy
["status"] = "BUILD"
536 vdur_copy
["status-detailed"] = None
537 vdur_copy
["ip-address"]: None
538 vdur_copy
["_id"] = str(uuid4())
539 vdur_copy
["count-index"] += count
+ 1
540 vdur_copy
["id"] = "{}-{}".format(
541 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
543 vdur_copy
.pop("vim_info", None)
544 for iface
in vdur_copy
["interfaces"]:
545 if iface
.get("fixed-ip"):
546 iface
["ip-address"] = self
.increment_ip_mac(
547 iface
["ip-address"], count
+ 1
550 iface
.pop("ip-address", None)
551 if iface
.get("fixed-mac"):
552 iface
["mac-address"] = self
.increment_ip_mac(
553 iface
["mac-address"], count
+ 1
556 iface
.pop("mac-address", None)
559 ) # only first vdu can be managment of vnf
560 db_vdu_push_list
.append(vdur_copy
)
561 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
563 for vdu_id
, vdu_count
in vdu_delete
.items():
565 indexes_to_delete
= [
567 for iv
in enumerate(db_vnfr
["vdur"])
568 if iv
[1]["vdu-id-ref"] == vdu_id
572 "vdur.{}.status".format(i
): "DELETING"
573 for i
in indexes_to_delete
[-vdu_count
:]
577 # it must be deleted one by one because common.db does not allow otherwise
580 for v
in reversed(db_vnfr
["vdur"])
581 if v
["vdu-id-ref"] == vdu_id
583 for vdu
in vdus_to_delete
[:vdu_count
]:
586 {"_id": db_vnfr
["_id"]},
588 pull
={"vdur": {"_id": vdu
["_id"]}},
590 db_push
= {"vdur": db_vdu_push_list
} if db_vdu_push_list
else None
591 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
592 # modify passed dictionary db_vnfr
593 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
594 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
596 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
598 Updates database nsr with the RO info for the created vld
599 :param ns_update_nsr: dictionary to be filled with the updated info
600 :param db_nsr: content of db_nsr. This is also modified
601 :param nsr_desc_RO: nsr descriptor from RO
602 :return: Nothing, LcmException is raised on errors
605 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
606 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
607 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
609 vld
["vim-id"] = net_RO
.get("vim_net_id")
610 vld
["name"] = net_RO
.get("vim_name")
611 vld
["status"] = net_RO
.get("status")
612 vld
["status-detailed"] = net_RO
.get("error_msg")
613 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
617 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
620 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
622 for db_vnfr
in db_vnfrs
.values():
623 vnfr_update
= {"status": "ERROR"}
624 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
625 if "status" not in vdur
:
626 vdur
["status"] = "ERROR"
627 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
629 vdur
["status-detailed"] = str(error_text
)
631 "vdur.{}.status-detailed".format(vdu_index
)
633 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
634 except DbException
as e
:
635 self
.logger
.error("Cannot update vnf. {}".format(e
))
637 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
639 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
640 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
641 :param nsr_desc_RO: nsr descriptor from RO
642 :return: Nothing, LcmException is raised on errors
644 for vnf_index
, db_vnfr
in db_vnfrs
.items():
645 for vnf_RO
in nsr_desc_RO
["vnfs"]:
646 if vnf_RO
["member_vnf_index"] != vnf_index
:
649 if vnf_RO
.get("ip_address"):
650 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
653 elif not db_vnfr
.get("ip-address"):
654 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
655 raise LcmExceptionNoMgmtIP(
656 "ns member_vnf_index '{}' has no IP address".format(
661 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
662 vdur_RO_count_index
= 0
663 if vdur
.get("pdu-type"):
665 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
666 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
668 if vdur
["count-index"] != vdur_RO_count_index
:
669 vdur_RO_count_index
+= 1
671 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
672 if vdur_RO
.get("ip_address"):
673 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
675 vdur
["ip-address"] = None
676 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
677 vdur
["name"] = vdur_RO
.get("vim_name")
678 vdur
["status"] = vdur_RO
.get("status")
679 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
680 for ifacer
in get_iterable(vdur
, "interfaces"):
681 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
682 if ifacer
["name"] == interface_RO
.get("internal_name"):
683 ifacer
["ip-address"] = interface_RO
.get(
686 ifacer
["mac-address"] = interface_RO
.get(
692 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
693 "from VIM info".format(
694 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
697 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
701 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
703 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
707 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
708 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
709 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
711 vld
["vim-id"] = net_RO
.get("vim_net_id")
712 vld
["name"] = net_RO
.get("vim_name")
713 vld
["status"] = net_RO
.get("status")
714 vld
["status-detailed"] = net_RO
.get("error_msg")
715 vnfr_update
["vld.{}".format(vld_index
)] = vld
719 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
724 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
729 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
734 def _get_ns_config_info(self
, nsr_id
):
736 Generates a mapping between vnf,vdu elements and the N2VC id
737 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
738 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
739 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
740 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
742 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
743 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
745 ns_config_info
= {"osm-config-mapping": mapping
}
746 for vca
in vca_deployed_list
:
747 if not vca
["member-vnf-index"]:
749 if not vca
["vdu_id"]:
750 mapping
[vca
["member-vnf-index"]] = vca
["application"]
754 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
756 ] = vca
["application"]
757 return ns_config_info
759 async def _instantiate_ng_ro(
776 def get_vim_account(vim_account_id
):
778 if vim_account_id
in db_vims
:
779 return db_vims
[vim_account_id
]
780 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
781 db_vims
[vim_account_id
] = db_vim
784 # modify target_vld info with instantiation parameters
785 def parse_vld_instantiation_params(
786 target_vim
, target_vld
, vld_params
, target_sdn
788 if vld_params
.get("ip-profile"):
789 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
792 if vld_params
.get("provider-network"):
793 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
796 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
797 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
800 if vld_params
.get("wimAccountId"):
801 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
802 target_vld
["vim_info"][target_wim
] = {}
803 for param
in ("vim-network-name", "vim-network-id"):
804 if vld_params
.get(param
):
805 if isinstance(vld_params
[param
], dict):
806 for vim
, vim_net
in vld_params
[param
].items():
807 other_target_vim
= "vim:" + vim
809 target_vld
["vim_info"],
810 (other_target_vim
, param
.replace("-", "_")),
813 else: # isinstance str
814 target_vld
["vim_info"][target_vim
][
815 param
.replace("-", "_")
816 ] = vld_params
[param
]
817 if vld_params
.get("common_id"):
818 target_vld
["common_id"] = vld_params
.get("common_id")
820 nslcmop_id
= db_nslcmop
["_id"]
822 "name": db_nsr
["name"],
825 "image": deepcopy(db_nsr
["image"]),
826 "flavor": deepcopy(db_nsr
["flavor"]),
827 "action_id": nslcmop_id
,
828 "cloud_init_content": {},
830 for image
in target
["image"]:
831 image
["vim_info"] = {}
832 for flavor
in target
["flavor"]:
833 flavor
["vim_info"] = {}
835 if db_nslcmop
.get("lcmOperationType") != "instantiate":
836 # get parameters of instantiation:
837 db_nslcmop_instantiate
= self
.db
.get_list(
840 "nsInstanceId": db_nslcmop
["nsInstanceId"],
841 "lcmOperationType": "instantiate",
844 ns_params
= db_nslcmop_instantiate
.get("operationParams")
846 ns_params
= db_nslcmop
.get("operationParams")
847 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
848 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
851 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
852 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
856 "mgmt-network": vld
.get("mgmt-network", False),
857 "type": vld
.get("type"),
860 "vim_network_name": vld
.get("vim-network-name"),
861 "vim_account_id": ns_params
["vimAccountId"],
865 # check if this network needs SDN assist
866 if vld
.get("pci-interfaces"):
867 db_vim
= get_vim_account(ns_params
["vimAccountId"])
868 sdnc_id
= db_vim
["config"].get("sdn-controller")
870 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
871 target_sdn
= "sdn:{}".format(sdnc_id
)
872 target_vld
["vim_info"][target_sdn
] = {
874 "target_vim": target_vim
,
876 "type": vld
.get("type"),
879 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
880 for nsd_vnf_profile
in nsd_vnf_profiles
:
881 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
882 if cp
["virtual-link-profile-id"] == vld
["id"]:
884 "member_vnf:{}.{}".format(
885 cp
["constituent-cpd-id"][0][
886 "constituent-base-element-id"
888 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
890 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
892 # check at nsd descriptor, if there is an ip-profile
894 nsd_vlp
= find_in_list(
895 get_virtual_link_profiles(nsd
),
896 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
901 and nsd_vlp
.get("virtual-link-protocol-data")
902 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
904 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
907 ip_profile_dest_data
= {}
908 if "ip-version" in ip_profile_source_data
:
909 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
912 if "cidr" in ip_profile_source_data
:
913 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
916 if "gateway-ip" in ip_profile_source_data
:
917 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
920 if "dhcp-enabled" in ip_profile_source_data
:
921 ip_profile_dest_data
["dhcp-params"] = {
922 "enabled": ip_profile_source_data
["dhcp-enabled"]
924 vld_params
["ip-profile"] = ip_profile_dest_data
926 # update vld_params with instantiation params
927 vld_instantiation_params
= find_in_list(
928 get_iterable(ns_params
, "vld"),
929 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
931 if vld_instantiation_params
:
932 vld_params
.update(vld_instantiation_params
)
933 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
934 target
["ns"]["vld"].append(target_vld
)
936 for vnfr
in db_vnfrs
.values():
938 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
940 vnf_params
= find_in_list(
941 get_iterable(ns_params
, "vnf"),
942 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
944 target_vnf
= deepcopy(vnfr
)
945 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
946 for vld
in target_vnf
.get("vld", ()):
947 # check if connected to a ns.vld, to fill target'
948 vnf_cp
= find_in_list(
949 vnfd
.get("int-virtual-link-desc", ()),
950 lambda cpd
: cpd
.get("id") == vld
["id"],
953 ns_cp
= "member_vnf:{}.{}".format(
954 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
956 if cp2target
.get(ns_cp
):
957 vld
["target"] = cp2target
[ns_cp
]
960 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
962 # check if this network needs SDN assist
964 if vld
.get("pci-interfaces"):
965 db_vim
= get_vim_account(vnfr
["vim-account-id"])
966 sdnc_id
= db_vim
["config"].get("sdn-controller")
968 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
969 target_sdn
= "sdn:{}".format(sdnc_id
)
970 vld
["vim_info"][target_sdn
] = {
972 "target_vim": target_vim
,
974 "type": vld
.get("type"),
977 # check at vnfd descriptor, if there is an ip-profile
979 vnfd_vlp
= find_in_list(
980 get_virtual_link_profiles(vnfd
),
981 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
985 and vnfd_vlp
.get("virtual-link-protocol-data")
986 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
988 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
991 ip_profile_dest_data
= {}
992 if "ip-version" in ip_profile_source_data
:
993 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
996 if "cidr" in ip_profile_source_data
:
997 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1000 if "gateway-ip" in ip_profile_source_data
:
1001 ip_profile_dest_data
[
1003 ] = ip_profile_source_data
["gateway-ip"]
1004 if "dhcp-enabled" in ip_profile_source_data
:
1005 ip_profile_dest_data
["dhcp-params"] = {
1006 "enabled": ip_profile_source_data
["dhcp-enabled"]
1009 vld_params
["ip-profile"] = ip_profile_dest_data
1010 # update vld_params with instantiation params
1012 vld_instantiation_params
= find_in_list(
1013 get_iterable(vnf_params
, "internal-vld"),
1014 lambda i_vld
: i_vld
["name"] == vld
["id"],
1016 if vld_instantiation_params
:
1017 vld_params
.update(vld_instantiation_params
)
1018 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1021 for vdur
in target_vnf
.get("vdur", ()):
1022 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1023 continue # This vdu must not be created
1024 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1026 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1029 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1030 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1033 and vdu_configuration
.get("config-access")
1034 and vdu_configuration
.get("config-access").get("ssh-access")
1036 vdur
["ssh-keys"] = ssh_keys_all
1037 vdur
["ssh-access-required"] = vdu_configuration
[
1039 ]["ssh-access"]["required"]
1042 and vnf_configuration
.get("config-access")
1043 and vnf_configuration
.get("config-access").get("ssh-access")
1044 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1046 vdur
["ssh-keys"] = ssh_keys_all
1047 vdur
["ssh-access-required"] = vnf_configuration
[
1049 ]["ssh-access"]["required"]
1050 elif ssh_keys_instantiation
and find_in_list(
1051 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1053 vdur
["ssh-keys"] = ssh_keys_instantiation
1055 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1057 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1059 if vdud
.get("cloud-init-file"):
1060 vdur
["cloud-init"] = "{}:file:{}".format(
1061 vnfd
["_id"], vdud
.get("cloud-init-file")
1063 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1064 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1065 base_folder
= vnfd
["_admin"]["storage"]
1066 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1067 base_folder
["folder"],
1068 base_folder
["pkg-dir"],
1069 vdud
.get("cloud-init-file"),
1071 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1072 target
["cloud_init_content"][
1075 elif vdud
.get("cloud-init"):
1076 vdur
["cloud-init"] = "{}:vdu:{}".format(
1077 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1079 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1080 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1083 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1084 deploy_params_vdu
= self
._format
_additional
_params
(
1085 vdur
.get("additionalParams") or {}
1087 deploy_params_vdu
["OSM"] = get_osm_params(
1088 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1090 vdur
["additionalParams"] = deploy_params_vdu
1093 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1094 if target_vim
not in ns_flavor
["vim_info"]:
1095 ns_flavor
["vim_info"][target_vim
] = {}
1098 # in case alternative images are provided we must check if they should be applied
1099 # for the vim_type, modify the vim_type taking into account
1100 ns_image_id
= int(vdur
["ns-image-id"])
1101 if vdur
.get("alt-image-ids"):
1102 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1103 vim_type
= db_vim
["vim_type"]
1104 for alt_image_id
in vdur
.get("alt-image-ids"):
1105 ns_alt_image
= target
["image"][int(alt_image_id
)]
1106 if vim_type
== ns_alt_image
.get("vim-type"):
1107 # must use alternative image
1109 "use alternative image id: {}".format(alt_image_id
)
1111 ns_image_id
= alt_image_id
1112 vdur
["ns-image-id"] = ns_image_id
1114 ns_image
= target
["image"][int(ns_image_id
)]
1115 if target_vim
not in ns_image
["vim_info"]:
1116 ns_image
["vim_info"][target_vim
] = {}
1118 vdur
["vim_info"] = {target_vim
: {}}
1119 # instantiation parameters
1121 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1122 # vdud["id"]), None)
1123 vdur_list
.append(vdur
)
1124 target_vnf
["vdur"] = vdur_list
1125 target
["vnf"].append(target_vnf
)
1127 desc
= await self
.RO
.deploy(nsr_id
, target
)
1128 self
.logger
.debug("RO return > {}".format(desc
))
1129 action_id
= desc
["action_id"]
1130 await self
._wait
_ng
_ro
(
1131 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
1136 "_admin.deployed.RO.operational-status": "running",
1137 "detailed-status": " ".join(stage
),
1139 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1140 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1141 self
._write
_op
_status
(nslcmop_id
, stage
)
1143 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1147 async def _wait_ng_ro(
1156 detailed_status_old
= None
1158 start_time
= start_time
or time()
1159 while time() <= start_time
+ timeout
:
1160 desc_status
= await self
.RO
.status(nsr_id
, action_id
)
1161 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1162 if desc_status
["status"] == "FAILED":
1163 raise NgRoException(desc_status
["details"])
1164 elif desc_status
["status"] == "BUILD":
1166 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1167 elif desc_status
["status"] == "DONE":
1169 stage
[2] = "Deployed at VIM"
1172 assert False, "ROclient.check_ns_status returns unknown {}".format(
1173 desc_status
["status"]
1175 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1176 detailed_status_old
= stage
[2]
1177 db_nsr_update
["detailed-status"] = " ".join(stage
)
1178 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1179 self
._write
_op
_status
(nslcmop_id
, stage
)
1180 await asyncio
.sleep(15, loop
=self
.loop
)
1181 else: # timeout_ns_deploy
1182 raise NgRoException("Timeout waiting ns to deploy")
1184 async def _terminate_ng_ro(
1185 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1190 start_deploy
= time()
1197 "action_id": nslcmop_id
,
1199 desc
= await self
.RO
.deploy(nsr_id
, target
)
1200 action_id
= desc
["action_id"]
1201 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1202 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1205 + "ns terminate action at RO. action_id={}".format(action_id
)
1209 delete_timeout
= 20 * 60 # 20 minutes
1210 await self
._wait
_ng
_ro
(
1211 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
1214 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1215 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1217 await self
.RO
.delete(nsr_id
)
1218 except Exception as e
:
1219 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1220 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1221 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1222 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1224 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1226 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1227 failed_detail
.append("delete conflict: {}".format(e
))
1230 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1233 failed_detail
.append("delete error: {}".format(e
))
1236 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1240 stage
[2] = "Error deleting from VIM"
1242 stage
[2] = "Deleted from VIM"
1243 db_nsr_update
["detailed-status"] = " ".join(stage
)
1244 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1245 self
._write
_op
_status
(nslcmop_id
, stage
)
1248 raise LcmException("; ".join(failed_detail
))
1251 async def instantiate_RO(
1265 :param logging_text: preffix text to use at logging
1266 :param nsr_id: nsr identity
1267 :param nsd: database content of ns descriptor
1268 :param db_nsr: database content of ns record
1269 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1271 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1272 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1273 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1274 :return: None or exception
1277 start_deploy
= time()
1278 ns_params
= db_nslcmop
.get("operationParams")
1279 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1280 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1282 timeout_ns_deploy
= self
.timeout
.get(
1283 "ns_deploy", self
.timeout_ns_deploy
1286 # Check for and optionally request placement optimization. Database will be updated if placement activated
1287 stage
[2] = "Waiting for Placement."
1288 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1289 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1290 for vnfr
in db_vnfrs
.values():
1291 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1294 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1296 return await self
._instantiate
_ng
_ro
(
1309 except Exception as e
:
1310 stage
[2] = "ERROR deploying at VIM"
1311 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1313 "Error deploying at VIM {}".format(e
),
1314 exc_info
=not isinstance(
1317 ROclient
.ROClientException
,
1326 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1328 Wait for kdu to be up, get ip address
1329 :param logging_text: prefix use for logging
1336 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1339 while nb_tries
< 360:
1340 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1344 for x
in get_iterable(db_vnfr
, "kdur")
1345 if x
.get("kdu-name") == kdu_name
1351 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1353 if kdur
.get("status"):
1354 if kdur
["status"] in ("READY", "ENABLED"):
1355 return kdur
.get("ip-address")
1358 "target KDU={} is in error state".format(kdu_name
)
1361 await asyncio
.sleep(10, loop
=self
.loop
)
1363 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1365 async def wait_vm_up_insert_key_ro(
1366 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1369 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1370 :param logging_text: prefix use for logging
1375 :param pub_key: public ssh key to inject, None to skip
1376 :param user: user to apply the public ssh key
1380 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1384 target_vdu_id
= None
1390 if ro_retries
>= 360: # 1 hour
1392 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1395 await asyncio
.sleep(10, loop
=self
.loop
)
1398 if not target_vdu_id
:
1399 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1401 if not vdu_id
: # for the VNF case
1402 if db_vnfr
.get("status") == "ERROR":
1404 "Cannot inject ssh-key because target VNF is in error state"
1406 ip_address
= db_vnfr
.get("ip-address")
1412 for x
in get_iterable(db_vnfr
, "vdur")
1413 if x
.get("ip-address") == ip_address
1421 for x
in get_iterable(db_vnfr
, "vdur")
1422 if x
.get("vdu-id-ref") == vdu_id
1423 and x
.get("count-index") == vdu_index
1429 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1430 ): # If only one, this should be the target vdu
1431 vdur
= db_vnfr
["vdur"][0]
1434 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1435 vnfr_id
, vdu_id
, vdu_index
1438 # New generation RO stores information at "vim_info"
1441 if vdur
.get("vim_info"):
1443 t
for t
in vdur
["vim_info"]
1444 ) # there should be only one key
1445 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1447 vdur
.get("pdu-type")
1448 or vdur
.get("status") == "ACTIVE"
1449 or ng_ro_status
== "ACTIVE"
1451 ip_address
= vdur
.get("ip-address")
1454 target_vdu_id
= vdur
["vdu-id-ref"]
1455 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1457 "Cannot inject ssh-key because target VM is in error state"
1460 if not target_vdu_id
:
1463 # inject public key into machine
1464 if pub_key
and user
:
1465 self
.logger
.debug(logging_text
+ "Inserting RO key")
1466 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1467 if vdur
.get("pdu-type"):
1468 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1471 ro_vm_id
= "{}-{}".format(
1472 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1473 ) # TODO add vdu_index
1477 "action": "inject_ssh_key",
1481 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1483 desc
= await self
.RO
.deploy(nsr_id
, target
)
1484 action_id
= desc
["action_id"]
1485 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600)
1488 # wait until NS is deployed at RO
1490 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1491 ro_nsr_id
= deep_get(
1492 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1496 result_dict
= await self
.RO
.create_action(
1498 item_id_name
=ro_nsr_id
,
1500 "add_public_key": pub_key
,
1505 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1506 if not result_dict
or not isinstance(result_dict
, dict):
1508 "Unknown response from RO when injecting key"
1510 for result
in result_dict
.values():
1511 if result
.get("vim_result") == 200:
1514 raise ROclient
.ROClientException(
1515 "error injecting key: {}".format(
1516 result
.get("description")
1520 except NgRoException
as e
:
1522 "Reaching max tries injecting key. Error: {}".format(e
)
1524 except ROclient
.ROClientException
as e
:
1528 + "error injecting key: {}. Retrying until {} seconds".format(
1535 "Reaching max tries injecting key. Error: {}".format(e
)
1542 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1544 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1546 my_vca
= vca_deployed_list
[vca_index
]
1547 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1548 # vdu or kdu: no dependencies
1552 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1553 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1554 configuration_status_list
= db_nsr
["configurationStatus"]
1555 for index
, vca_deployed
in enumerate(configuration_status_list
):
1556 if index
== vca_index
:
1559 if not my_vca
.get("member-vnf-index") or (
1560 vca_deployed
.get("member-vnf-index")
1561 == my_vca
.get("member-vnf-index")
1563 internal_status
= configuration_status_list
[index
].get("status")
1564 if internal_status
== "READY":
1566 elif internal_status
== "BROKEN":
1568 "Configuration aborted because dependent charm/s has failed"
1573 # no dependencies, return
1575 await asyncio
.sleep(10)
1578 raise LcmException("Configuration aborted because dependent charm/s timeout")
1580 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1583 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1585 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1586 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1589 async def instantiate_N2VC(
1606 ee_config_descriptor
,
1608 nsr_id
= db_nsr
["_id"]
1609 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1610 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1611 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1612 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1614 "collection": "nsrs",
1615 "filter": {"_id": nsr_id
},
1616 "path": db_update_entry
,
1622 element_under_configuration
= nsr_id
1626 vnfr_id
= db_vnfr
["_id"]
1627 osm_config
["osm"]["vnf_id"] = vnfr_id
1629 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1631 if vca_type
== "native_charm":
1634 index_number
= vdu_index
or 0
1637 element_type
= "VNF"
1638 element_under_configuration
= vnfr_id
1639 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1641 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1642 element_type
= "VDU"
1643 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1644 osm_config
["osm"]["vdu_id"] = vdu_id
1646 namespace
+= ".{}".format(kdu_name
)
1647 element_type
= "KDU"
1648 element_under_configuration
= kdu_name
1649 osm_config
["osm"]["kdu_name"] = kdu_name
1652 artifact_path
= "{}/{}/{}/{}".format(
1653 base_folder
["folder"],
1654 base_folder
["pkg-dir"],
1656 if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1661 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1663 # get initial_config_primitive_list that applies to this element
1664 initial_config_primitive_list
= config_descriptor
.get(
1665 "initial-config-primitive"
1669 "Initial config primitive list > {}".format(
1670 initial_config_primitive_list
1674 # add config if not present for NS charm
1675 ee_descriptor_id
= ee_config_descriptor
.get("id")
1676 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1677 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1678 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1682 "Initial config primitive list #2 > {}".format(
1683 initial_config_primitive_list
1686 # n2vc_redesign STEP 3.1
1687 # find old ee_id if exists
1688 ee_id
= vca_deployed
.get("ee_id")
1690 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1691 # create or register execution environment in VCA
1692 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1694 self
._write
_configuration
_status
(
1696 vca_index
=vca_index
,
1698 element_under_configuration
=element_under_configuration
,
1699 element_type
=element_type
,
1702 step
= "create execution environment"
1703 self
.logger
.debug(logging_text
+ step
)
1707 if vca_type
== "k8s_proxy_charm":
1708 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1709 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1710 namespace
=namespace
,
1711 artifact_path
=artifact_path
,
1715 elif vca_type
== "helm" or vca_type
== "helm-v3":
1716 ee_id
, credentials
= await self
.vca_map
[
1718 ].create_execution_environment(
1719 namespace
=namespace
,
1723 artifact_path
=artifact_path
,
1727 ee_id
, credentials
= await self
.vca_map
[
1729 ].create_execution_environment(
1730 namespace
=namespace
,
1736 elif vca_type
== "native_charm":
1737 step
= "Waiting to VM being up and getting IP address"
1738 self
.logger
.debug(logging_text
+ step
)
1739 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1748 credentials
= {"hostname": rw_mgmt_ip
}
1750 username
= deep_get(
1751 config_descriptor
, ("config-access", "ssh-access", "default-user")
1753 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1754 # merged. Meanwhile let's get username from initial-config-primitive
1755 if not username
and initial_config_primitive_list
:
1756 for config_primitive
in initial_config_primitive_list
:
1757 for param
in config_primitive
.get("parameter", ()):
1758 if param
["name"] == "ssh-username":
1759 username
= param
["value"]
1763 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1764 "'config-access.ssh-access.default-user'"
1766 credentials
["username"] = username
1767 # n2vc_redesign STEP 3.2
1769 self
._write
_configuration
_status
(
1771 vca_index
=vca_index
,
1772 status
="REGISTERING",
1773 element_under_configuration
=element_under_configuration
,
1774 element_type
=element_type
,
1777 step
= "register execution environment {}".format(credentials
)
1778 self
.logger
.debug(logging_text
+ step
)
1779 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1780 credentials
=credentials
,
1781 namespace
=namespace
,
1786 # for compatibility with MON/POL modules, the need model and application name at database
1787 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1788 ee_id_parts
= ee_id
.split(".")
1789 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1790 if len(ee_id_parts
) >= 2:
1791 model_name
= ee_id_parts
[0]
1792 application_name
= ee_id_parts
[1]
1793 db_nsr_update
[db_update_entry
+ "model"] = model_name
1794 db_nsr_update
[db_update_entry
+ "application"] = application_name
1796 # n2vc_redesign STEP 3.3
1797 step
= "Install configuration Software"
1799 self
._write
_configuration
_status
(
1801 vca_index
=vca_index
,
1802 status
="INSTALLING SW",
1803 element_under_configuration
=element_under_configuration
,
1804 element_type
=element_type
,
1805 other_update
=db_nsr_update
,
1808 # TODO check if already done
1809 self
.logger
.debug(logging_text
+ step
)
1811 if vca_type
== "native_charm":
1812 config_primitive
= next(
1813 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1816 if config_primitive
:
1817 config
= self
._map
_primitive
_params
(
1818 config_primitive
, {}, deploy_params
1821 if vca_type
== "lxc_proxy_charm":
1822 if element_type
== "NS":
1823 num_units
= db_nsr
.get("config-units") or 1
1824 elif element_type
== "VNF":
1825 num_units
= db_vnfr
.get("config-units") or 1
1826 elif element_type
== "VDU":
1827 for v
in db_vnfr
["vdur"]:
1828 if vdu_id
== v
["vdu-id-ref"]:
1829 num_units
= v
.get("config-units") or 1
1831 if vca_type
!= "k8s_proxy_charm":
1832 await self
.vca_map
[vca_type
].install_configuration_sw(
1834 artifact_path
=artifact_path
,
1837 num_units
=num_units
,
1842 # write in db flag of configuration_sw already installed
1844 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1847 # add relations for this VCA (wait for other peers related with this VCA)
1848 await self
._add
_vca
_relations
(
1849 logging_text
=logging_text
,
1852 vca_index
=vca_index
,
1855 # if SSH access is required, then get execution environment SSH public
1856 # if native charm we have waited already to VM be UP
1857 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1860 # self.logger.debug("get ssh key block")
1862 config_descriptor
, ("config-access", "ssh-access", "required")
1864 # self.logger.debug("ssh key needed")
1865 # Needed to inject a ssh key
1868 ("config-access", "ssh-access", "default-user"),
1870 step
= "Install configuration Software, getting public ssh key"
1871 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
1872 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
1875 step
= "Insert public key into VM user={} ssh_key={}".format(
1879 # self.logger.debug("no need to get ssh key")
1880 step
= "Waiting to VM being up and getting IP address"
1881 self
.logger
.debug(logging_text
+ step
)
1883 # n2vc_redesign STEP 5.1
1884 # wait for RO (ip-address) Insert pub_key into VM
1887 rw_mgmt_ip
= await self
.wait_kdu_up(
1888 logging_text
, nsr_id
, vnfr_id
, kdu_name
1891 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1901 rw_mgmt_ip
= None # This is for a NS configuration
1903 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
1905 # store rw_mgmt_ip in deploy params for later replacement
1906 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
1908 # n2vc_redesign STEP 6 Execute initial config primitive
1909 step
= "execute initial config primitive"
1911 # wait for dependent primitives execution (NS -> VNF -> VDU)
1912 if initial_config_primitive_list
:
1913 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
1915 # stage, in function of element type: vdu, kdu, vnf or ns
1916 my_vca
= vca_deployed_list
[vca_index
]
1917 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1919 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
1920 elif my_vca
.get("member-vnf-index"):
1922 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
1925 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
1927 self
._write
_configuration
_status
(
1928 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
1931 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
1933 check_if_terminated_needed
= True
1934 for initial_config_primitive
in initial_config_primitive_list
:
1935 # adding information on the vca_deployed if it is a NS execution environment
1936 if not vca_deployed
["member-vnf-index"]:
1937 deploy_params
["ns_config_info"] = json
.dumps(
1938 self
._get
_ns
_config
_info
(nsr_id
)
1940 # TODO check if already done
1941 primitive_params_
= self
._map
_primitive
_params
(
1942 initial_config_primitive
, {}, deploy_params
1945 step
= "execute primitive '{}' params '{}'".format(
1946 initial_config_primitive
["name"], primitive_params_
1948 self
.logger
.debug(logging_text
+ step
)
1949 await self
.vca_map
[vca_type
].exec_primitive(
1951 primitive_name
=initial_config_primitive
["name"],
1952 params_dict
=primitive_params_
,
1957 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
1958 if check_if_terminated_needed
:
1959 if config_descriptor
.get("terminate-config-primitive"):
1961 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
1963 check_if_terminated_needed
= False
1965 # TODO register in database that primitive is done
1967 # STEP 7 Configure metrics
1968 if vca_type
== "helm" or vca_type
== "helm-v3":
1969 prometheus_jobs
= await self
.add_prometheus_metrics(
1971 artifact_path
=artifact_path
,
1972 ee_config_descriptor
=ee_config_descriptor
,
1975 target_ip
=rw_mgmt_ip
,
1981 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
1984 step
= "instantiated at VCA"
1985 self
.logger
.debug(logging_text
+ step
)
1987 self
._write
_configuration
_status
(
1988 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
1991 except Exception as e
: # TODO not use Exception but N2VC exception
1992 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1994 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
1997 "Exception while {} : {}".format(step
, e
), exc_info
=True
1999 self
._write
_configuration
_status
(
2000 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2002 raise LcmException("{} {}".format(step
, e
)) from e
2004 def _write_ns_status(
2008 current_operation
: str,
2009 current_operation_id
: str,
2010 error_description
: str = None,
2011 error_detail
: str = None,
2012 other_update
: dict = None,
2015 Update db_nsr fields.
2018 :param current_operation:
2019 :param current_operation_id:
2020 :param error_description:
2021 :param error_detail:
2022 :param other_update: Other required changes at database if provided, will be cleared
2026 db_dict
= other_update
or {}
2029 ] = current_operation_id
# for backward compatibility
2030 db_dict
["_admin.current-operation"] = current_operation_id
2031 db_dict
["_admin.operation-type"] = (
2032 current_operation
if current_operation
!= "IDLE" else None
2034 db_dict
["currentOperation"] = current_operation
2035 db_dict
["currentOperationID"] = current_operation_id
2036 db_dict
["errorDescription"] = error_description
2037 db_dict
["errorDetail"] = error_detail
2040 db_dict
["nsState"] = ns_state
2041 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2042 except DbException
as e
:
2043 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2045 def _write_op_status(
2049 error_message
: str = None,
2050 queuePosition
: int = 0,
2051 operation_state
: str = None,
2052 other_update
: dict = None,
2055 db_dict
= other_update
or {}
2056 db_dict
["queuePosition"] = queuePosition
2057 if isinstance(stage
, list):
2058 db_dict
["stage"] = stage
[0]
2059 db_dict
["detailed-status"] = " ".join(stage
)
2060 elif stage
is not None:
2061 db_dict
["stage"] = str(stage
)
2063 if error_message
is not None:
2064 db_dict
["errorMessage"] = error_message
2065 if operation_state
is not None:
2066 db_dict
["operationState"] = operation_state
2067 db_dict
["statusEnteredTime"] = time()
2068 self
.update_db_2("nslcmops", op_id
, db_dict
)
2069 except DbException
as e
:
2071 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2074 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2076 nsr_id
= db_nsr
["_id"]
2077 # configurationStatus
2078 config_status
= db_nsr
.get("configurationStatus")
2081 "configurationStatus.{}.status".format(index
): status
2082 for index
, v
in enumerate(config_status
)
2086 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2088 except DbException
as e
:
2090 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2093 def _write_configuration_status(
2098 element_under_configuration
: str = None,
2099 element_type
: str = None,
2100 other_update
: dict = None,
2103 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2104 # .format(vca_index, status))
2107 db_path
= "configurationStatus.{}.".format(vca_index
)
2108 db_dict
= other_update
or {}
2110 db_dict
[db_path
+ "status"] = status
2111 if element_under_configuration
:
2113 db_path
+ "elementUnderConfiguration"
2114 ] = element_under_configuration
2116 db_dict
[db_path
+ "elementType"] = element_type
2117 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2118 except DbException
as e
:
2120 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2121 status
, nsr_id
, vca_index
, e
2125 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2127 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2128 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2129 Database is used because the result can be obtained from a different LCM worker in case of HA.
2130 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2131 :param db_nslcmop: database content of nslcmop
2132 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2133 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2134 computed 'vim-account-id'
2137 nslcmop_id
= db_nslcmop
["_id"]
2138 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2139 if placement_engine
== "PLA":
2141 logging_text
+ "Invoke and wait for placement optimization"
2143 await self
.msg
.aiowrite(
2144 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2146 db_poll_interval
= 5
2147 wait
= db_poll_interval
* 10
2149 while not pla_result
and wait
>= 0:
2150 await asyncio
.sleep(db_poll_interval
)
2151 wait
-= db_poll_interval
2152 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2153 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2157 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2160 for pla_vnf
in pla_result
["vnf"]:
2161 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2162 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2167 {"_id": vnfr
["_id"]},
2168 {"vim-account-id": pla_vnf
["vimAccountId"]},
2171 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2174 def update_nsrs_with_pla_result(self
, params
):
2176 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2178 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2180 except Exception as e
:
2181 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2183 async def instantiate(self
, nsr_id
, nslcmop_id
):
2186 :param nsr_id: ns instance to deploy
2187 :param nslcmop_id: operation to run
2191 # Try to lock HA task here
2192 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2193 if not task_is_locked_by_me
:
2195 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2199 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2200 self
.logger
.debug(logging_text
+ "Enter")
2202 # get all needed from database
2204 # database nsrs record
2207 # database nslcmops record
2210 # update operation on nsrs
2212 # update operation on nslcmops
2213 db_nslcmop_update
= {}
2215 nslcmop_operation_state
= None
2216 db_vnfrs
= {} # vnf's info indexed by member-index
2218 tasks_dict_info
= {} # from task to info text
2222 "Stage 1/5: preparation of the environment.",
2223 "Waiting for previous operations to terminate.",
2226 # ^ stage, step, VIM progress
2228 # wait for any previous tasks in process
2229 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2231 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2232 stage
[1] = "Reading from database."
2233 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2234 db_nsr_update
["detailed-status"] = "creating"
2235 db_nsr_update
["operational-status"] = "init"
2236 self
._write
_ns
_status
(
2238 ns_state
="BUILDING",
2239 current_operation
="INSTANTIATING",
2240 current_operation_id
=nslcmop_id
,
2241 other_update
=db_nsr_update
,
2243 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2245 # read from db: operation
2246 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2247 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2248 ns_params
= db_nslcmop
.get("operationParams")
2249 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2250 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2252 timeout_ns_deploy
= self
.timeout
.get(
2253 "ns_deploy", self
.timeout_ns_deploy
2257 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2258 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2259 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2260 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2261 self
.fs
.sync(db_nsr
["nsd-id"])
2263 # nsr_name = db_nsr["name"] # TODO short-name??
2265 # read from db: vnf's of this ns
2266 stage
[1] = "Getting vnfrs from db."
2267 self
.logger
.debug(logging_text
+ stage
[1])
2268 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2270 # read from db: vnfd's for every vnf
2271 db_vnfds
= [] # every vnfd data
2273 # for each vnf in ns, read vnfd
2274 for vnfr
in db_vnfrs_list
:
2275 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2276 vnfd_id
= vnfr
["vnfd-id"]
2277 vnfd_ref
= vnfr
["vnfd-ref"]
2278 self
.fs
.sync(vnfd_id
)
2280 # if we haven't this vnfd, read it from db
2281 if vnfd_id
not in db_vnfds
:
2283 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2286 self
.logger
.debug(logging_text
+ stage
[1])
2287 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2290 db_vnfds
.append(vnfd
)
2292 # Get or generates the _admin.deployed.VCA list
2293 vca_deployed_list
= None
2294 if db_nsr
["_admin"].get("deployed"):
2295 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2296 if vca_deployed_list
is None:
2297 vca_deployed_list
= []
2298 configuration_status_list
= []
2299 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2300 db_nsr_update
["configurationStatus"] = configuration_status_list
2301 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2302 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2303 elif isinstance(vca_deployed_list
, dict):
2304 # maintain backward compatibility. Change a dict to list at database
2305 vca_deployed_list
= list(vca_deployed_list
.values())
2306 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2307 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2310 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2312 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2313 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2315 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2316 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2317 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2319 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2322 # n2vc_redesign STEP 2 Deploy Network Scenario
2323 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2324 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2326 stage
[1] = "Deploying KDUs."
2327 # self.logger.debug(logging_text + "Before deploy_kdus")
2328 # Call to deploy_kdus in case exists the "vdu:kdu" param
2329 await self
.deploy_kdus(
2330 logging_text
=logging_text
,
2332 nslcmop_id
=nslcmop_id
,
2335 task_instantiation_info
=tasks_dict_info
,
2338 stage
[1] = "Getting VCA public key."
2339 # n2vc_redesign STEP 1 Get VCA public ssh-key
2340 # feature 1429. Add n2vc public key to needed VMs
2341 n2vc_key
= self
.n2vc
.get_public_key()
2342 n2vc_key_list
= [n2vc_key
]
2343 if self
.vca_config
.get("public_key"):
2344 n2vc_key_list
.append(self
.vca_config
["public_key"])
2346 stage
[1] = "Deploying NS at VIM."
2347 task_ro
= asyncio
.ensure_future(
2348 self
.instantiate_RO(
2349 logging_text
=logging_text
,
2353 db_nslcmop
=db_nslcmop
,
2356 n2vc_key_list
=n2vc_key_list
,
2360 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2361 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2363 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2364 stage
[1] = "Deploying Execution Environments."
2365 self
.logger
.debug(logging_text
+ stage
[1])
2367 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2368 for vnf_profile
in get_vnf_profiles(nsd
):
2369 vnfd_id
= vnf_profile
["vnfd-id"]
2370 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2371 member_vnf_index
= str(vnf_profile
["id"])
2372 db_vnfr
= db_vnfrs
[member_vnf_index
]
2373 base_folder
= vnfd
["_admin"]["storage"]
2379 # Get additional parameters
2380 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2381 if db_vnfr
.get("additionalParamsForVnf"):
2382 deploy_params
.update(
2383 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2386 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2387 if descriptor_config
:
2389 logging_text
=logging_text
2390 + "member_vnf_index={} ".format(member_vnf_index
),
2393 nslcmop_id
=nslcmop_id
,
2399 member_vnf_index
=member_vnf_index
,
2400 vdu_index
=vdu_index
,
2402 deploy_params
=deploy_params
,
2403 descriptor_config
=descriptor_config
,
2404 base_folder
=base_folder
,
2405 task_instantiation_info
=tasks_dict_info
,
2409 # Deploy charms for each VDU that supports one.
2410 for vdud
in get_vdu_list(vnfd
):
2412 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2413 vdur
= find_in_list(
2414 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2417 if vdur
.get("additionalParams"):
2418 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2420 deploy_params_vdu
= deploy_params
2421 deploy_params_vdu
["OSM"] = get_osm_params(
2422 db_vnfr
, vdu_id
, vdu_count_index
=0
2424 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2426 self
.logger
.debug("VDUD > {}".format(vdud
))
2428 "Descriptor config > {}".format(descriptor_config
)
2430 if descriptor_config
:
2433 for vdu_index
in range(vdud_count
):
2434 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2436 logging_text
=logging_text
2437 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2438 member_vnf_index
, vdu_id
, vdu_index
2442 nslcmop_id
=nslcmop_id
,
2448 member_vnf_index
=member_vnf_index
,
2449 vdu_index
=vdu_index
,
2451 deploy_params
=deploy_params_vdu
,
2452 descriptor_config
=descriptor_config
,
2453 base_folder
=base_folder
,
2454 task_instantiation_info
=tasks_dict_info
,
2457 for kdud
in get_kdu_list(vnfd
):
2458 kdu_name
= kdud
["name"]
2459 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2460 if descriptor_config
:
2465 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2467 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2468 if kdur
.get("additionalParams"):
2469 deploy_params_kdu
= parse_yaml_strings(
2470 kdur
["additionalParams"]
2474 logging_text
=logging_text
,
2477 nslcmop_id
=nslcmop_id
,
2483 member_vnf_index
=member_vnf_index
,
2484 vdu_index
=vdu_index
,
2486 deploy_params
=deploy_params_kdu
,
2487 descriptor_config
=descriptor_config
,
2488 base_folder
=base_folder
,
2489 task_instantiation_info
=tasks_dict_info
,
2493 # Check if this NS has a charm configuration
2494 descriptor_config
= nsd
.get("ns-configuration")
2495 if descriptor_config
and descriptor_config
.get("juju"):
2498 member_vnf_index
= None
2504 # Get additional parameters
2505 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2506 if db_nsr
.get("additionalParamsForNs"):
2507 deploy_params
.update(
2508 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2510 base_folder
= nsd
["_admin"]["storage"]
2512 logging_text
=logging_text
,
2515 nslcmop_id
=nslcmop_id
,
2521 member_vnf_index
=member_vnf_index
,
2522 vdu_index
=vdu_index
,
2524 deploy_params
=deploy_params
,
2525 descriptor_config
=descriptor_config
,
2526 base_folder
=base_folder
,
2527 task_instantiation_info
=tasks_dict_info
,
2531 # rest of staff will be done at finally
2534 ROclient
.ROClientException
,
2540 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2543 except asyncio
.CancelledError
:
2545 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2547 exc
= "Operation was cancelled"
2548 except Exception as e
:
2549 exc
= traceback
.format_exc()
2550 self
.logger
.critical(
2551 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2556 error_list
.append(str(exc
))
2558 # wait for pending tasks
2560 stage
[1] = "Waiting for instantiate pending tasks."
2561 self
.logger
.debug(logging_text
+ stage
[1])
2562 error_list
+= await self
._wait
_for
_tasks
(
2570 stage
[1] = stage
[2] = ""
2571 except asyncio
.CancelledError
:
2572 error_list
.append("Cancelled")
2573 # TODO cancel all tasks
2574 except Exception as exc
:
2575 error_list
.append(str(exc
))
2577 # update operation-status
2578 db_nsr_update
["operational-status"] = "running"
2579 # let's begin with VCA 'configured' status (later we can change it)
2580 db_nsr_update
["config-status"] = "configured"
2581 for task
, task_name
in tasks_dict_info
.items():
2582 if not task
.done() or task
.cancelled() or task
.exception():
2583 if task_name
.startswith(self
.task_name_deploy_vca
):
2584 # A N2VC task is pending
2585 db_nsr_update
["config-status"] = "failed"
2587 # RO or KDU task is pending
2588 db_nsr_update
["operational-status"] = "failed"
2590 # update status at database
2592 error_detail
= ". ".join(error_list
)
2593 self
.logger
.error(logging_text
+ error_detail
)
2594 error_description_nslcmop
= "{} Detail: {}".format(
2595 stage
[0], error_detail
2597 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2598 nslcmop_id
, stage
[0]
2601 db_nsr_update
["detailed-status"] = (
2602 error_description_nsr
+ " Detail: " + error_detail
2604 db_nslcmop_update
["detailed-status"] = error_detail
2605 nslcmop_operation_state
= "FAILED"
2609 error_description_nsr
= error_description_nslcmop
= None
2611 db_nsr_update
["detailed-status"] = "Done"
2612 db_nslcmop_update
["detailed-status"] = "Done"
2613 nslcmop_operation_state
= "COMPLETED"
2616 self
._write
_ns
_status
(
2619 current_operation
="IDLE",
2620 current_operation_id
=None,
2621 error_description
=error_description_nsr
,
2622 error_detail
=error_detail
,
2623 other_update
=db_nsr_update
,
2625 self
._write
_op
_status
(
2628 error_message
=error_description_nslcmop
,
2629 operation_state
=nslcmop_operation_state
,
2630 other_update
=db_nslcmop_update
,
2633 if nslcmop_operation_state
:
2635 await self
.msg
.aiowrite(
2640 "nslcmop_id": nslcmop_id
,
2641 "operationState": nslcmop_operation_state
,
2645 except Exception as e
:
2647 logging_text
+ "kafka_write notification Exception {}".format(e
)
2650 self
.logger
.debug(logging_text
+ "Exit")
2651 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2653 def _get_vnfd(self
, vnfd_id
: str, cached_vnfds
: Dict
[str, Any
]):
2654 if vnfd_id
not in cached_vnfds
:
2655 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
2656 return cached_vnfds
[vnfd_id
]
2658 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2659 if vnf_profile_id
not in cached_vnfrs
:
2660 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2663 "member-vnf-index-ref": vnf_profile_id
,
2664 "nsr-id-ref": nsr_id
,
2667 return cached_vnfrs
[vnf_profile_id
]
2669 def _is_deployed_vca_in_relation(
2670 self
, vca
: DeployedVCA
, relation
: Relation
2673 for endpoint
in (relation
.provider
, relation
.requirer
):
2674 if endpoint
["kdu-resource-profile-id"]:
2677 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2678 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2679 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2685 def _update_ee_relation_data_with_implicit_data(
2686 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2688 ee_relation_data
= safe_get_ee_relation(
2689 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2691 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2692 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2693 "execution-environment-ref"
2695 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2696 vnfd_id
= vnf_profile
["vnfd-id"]
2697 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2700 if ee_relation_level
== EELevel
.VNF
2701 else ee_relation_data
["vdu-profile-id"]
2703 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2706 f
"not execution environments found for ee_relation {ee_relation_data}"
2708 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2709 return ee_relation_data
2711 def _get_ns_relations(
2714 nsd
: Dict
[str, Any
],
2716 cached_vnfds
: Dict
[str, Any
],
2717 ) -> List
[Relation
]:
2719 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2720 for r
in db_ns_relations
:
2721 provider_dict
= None
2722 requirer_dict
= None
2723 if all(key
in r
for key
in ("provider", "requirer")):
2724 provider_dict
= r
["provider"]
2725 requirer_dict
= r
["requirer"]
2726 elif "entities" in r
:
2727 provider_id
= r
["entities"][0]["id"]
2730 "endpoint": r
["entities"][0]["endpoint"],
2732 if provider_id
!= nsd
["id"]:
2733 provider_dict
["vnf-profile-id"] = provider_id
2734 requirer_id
= r
["entities"][1]["id"]
2737 "endpoint": r
["entities"][1]["endpoint"],
2739 if requirer_id
!= nsd
["id"]:
2740 requirer_dict
["vnf-profile-id"] = requirer_id
2742 raise Exception("provider/requirer or entities must be included in the relation.")
2743 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2744 nsr_id
, nsd
, provider_dict
, cached_vnfds
2746 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2747 nsr_id
, nsd
, requirer_dict
, cached_vnfds
2749 provider
= EERelation(relation_provider
)
2750 requirer
= EERelation(relation_requirer
)
2751 relation
= Relation(r
["name"], provider
, requirer
)
2752 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2754 relations
.append(relation
)
2757 def _get_vnf_relations(
2760 nsd
: Dict
[str, Any
],
2762 cached_vnfds
: Dict
[str, Any
],
2763 ) -> List
[Relation
]:
2765 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
2766 vnf_profile_id
= vnf_profile
["id"]
2767 vnfd_id
= vnf_profile
["vnfd-id"]
2768 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2769 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
2770 for r
in db_vnf_relations
:
2771 provider_dict
= None
2772 requirer_dict
= None
2773 if all(key
in r
for key
in ("provider", "requirer")):
2774 provider_dict
= r
["provider"]
2775 requirer_dict
= r
["requirer"]
2776 elif "entities" in r
:
2777 provider_id
= r
["entities"][0]["id"]
2780 "vnf-profile-id": vnf_profile_id
,
2781 "endpoint": r
["entities"][0]["endpoint"],
2783 if provider_id
!= vnfd_id
:
2784 provider_dict
["vdu-profile-id"] = provider_id
2785 requirer_id
= r
["entities"][1]["id"]
2788 "vnf-profile-id": vnf_profile_id
,
2789 "endpoint": r
["entities"][1]["endpoint"],
2791 if requirer_id
!= vnfd_id
:
2792 requirer_dict
["vdu-profile-id"] = requirer_id
2794 raise Exception("provider/requirer or entities must be included in the relation.")
2795 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2796 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
2798 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2799 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
2801 provider
= EERelation(relation_provider
)
2802 requirer
= EERelation(relation_requirer
)
2803 relation
= Relation(r
["name"], provider
, requirer
)
2804 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2806 relations
.append(relation
)
2809 def _get_kdu_resource_data(
2811 ee_relation
: EERelation
,
2812 db_nsr
: Dict
[str, Any
],
2813 cached_vnfds
: Dict
[str, Any
],
2814 ) -> DeployedK8sResource
:
2815 nsd
= get_nsd(db_nsr
)
2816 vnf_profiles
= get_vnf_profiles(nsd
)
2817 vnfd_id
= find_in_list(
2819 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
2821 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2822 kdu_resource_profile
= get_kdu_resource_profile(
2823 db_vnfd
, ee_relation
.kdu_resource_profile_id
2825 kdu_name
= kdu_resource_profile
["kdu-name"]
2826 deployed_kdu
, _
= get_deployed_kdu(
2827 db_nsr
.get("_admin", ()).get("deployed", ()),
2829 ee_relation
.vnf_profile_id
,
2831 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
2834 def _get_deployed_component(
2836 ee_relation
: EERelation
,
2837 db_nsr
: Dict
[str, Any
],
2838 cached_vnfds
: Dict
[str, Any
],
2839 ) -> DeployedComponent
:
2840 nsr_id
= db_nsr
["_id"]
2841 deployed_component
= None
2842 ee_level
= EELevel
.get_level(ee_relation
)
2843 if ee_level
== EELevel
.NS
:
2844 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
2846 deployed_component
= DeployedVCA(nsr_id
, vca
)
2847 elif ee_level
== EELevel
.VNF
:
2848 vca
= get_deployed_vca(
2852 "member-vnf-index": ee_relation
.vnf_profile_id
,
2853 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
2857 deployed_component
= DeployedVCA(nsr_id
, vca
)
2858 elif ee_level
== EELevel
.VDU
:
2859 vca
= get_deployed_vca(
2862 "vdu_id": ee_relation
.vdu_profile_id
,
2863 "member-vnf-index": ee_relation
.vnf_profile_id
,
2864 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
2868 deployed_component
= DeployedVCA(nsr_id
, vca
)
2869 elif ee_level
== EELevel
.KDU
:
2870 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
2871 ee_relation
, db_nsr
, cached_vnfds
2873 if kdu_resource_data
:
2874 deployed_component
= DeployedK8sResource(kdu_resource_data
)
2875 return deployed_component
2877 async def _add_relation(
2881 db_nsr
: Dict
[str, Any
],
2882 cached_vnfds
: Dict
[str, Any
],
2883 cached_vnfrs
: Dict
[str, Any
],
2885 deployed_provider
= self
._get
_deployed
_component
(
2886 relation
.provider
, db_nsr
, cached_vnfds
2888 deployed_requirer
= self
._get
_deployed
_component
(
2889 relation
.requirer
, db_nsr
, cached_vnfds
2893 and deployed_requirer
2894 and deployed_provider
.config_sw_installed
2895 and deployed_requirer
.config_sw_installed
2897 provider_db_vnfr
= (
2899 relation
.provider
.nsr_id
,
2900 relation
.provider
.vnf_profile_id
,
2903 if relation
.provider
.vnf_profile_id
2906 requirer_db_vnfr
= (
2908 relation
.requirer
.nsr_id
,
2909 relation
.requirer
.vnf_profile_id
,
2912 if relation
.requirer
.vnf_profile_id
2915 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
2916 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
2917 provider_relation_endpoint
= RelationEndpoint(
2918 deployed_provider
.ee_id
,
2920 relation
.provider
.endpoint
,
2922 requirer_relation_endpoint
= RelationEndpoint(
2923 deployed_requirer
.ee_id
,
2925 relation
.requirer
.endpoint
,
2927 await self
.vca_map
[vca_type
].add_relation(
2928 provider
=provider_relation_endpoint
,
2929 requirer
=requirer_relation_endpoint
,
2931 # remove entry from relations list
2935 async def _add_vca_relations(
2941 timeout
: int = 3600,
2945 # 1. find all relations for this VCA
2946 # 2. wait for other peers related
2950 # STEP 1: find all relations for this VCA
2953 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2954 nsd
= get_nsd(db_nsr
)
2957 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
2958 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
2963 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
2964 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
2966 # if no relations, terminate
2968 self
.logger
.debug(logging_text
+ " No relations")
2971 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
2978 if now
- start
>= timeout
:
2979 self
.logger
.error(logging_text
+ " : timeout adding relations")
2982 # reload nsr from database (we need to update record: _admin.deployed.VCA)
2983 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2985 # for each relation, find the VCA's related
2986 for relation
in relations
.copy():
2987 added
= await self
._add
_relation
(
2995 relations
.remove(relation
)
2998 self
.logger
.debug("Relations added")
3000 await asyncio
.sleep(5.0)
3004 except Exception as e
:
3005 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3008 async def _install_kdu(
3016 k8s_instance_info
: dict,
3017 k8params
: dict = None,
3023 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3026 "collection": "nsrs",
3027 "filter": {"_id": nsr_id
},
3028 "path": nsr_db_path
,
3031 if k8s_instance_info
.get("kdu-deployment-name"):
3032 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3034 kdu_instance
= self
.k8scluster_map
[
3036 ].generate_kdu_instance_name(
3037 db_dict
=db_dict_install
,
3038 kdu_model
=k8s_instance_info
["kdu-model"],
3039 kdu_name
=k8s_instance_info
["kdu-name"],
3042 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
3044 await self
.k8scluster_map
[k8sclustertype
].install(
3045 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3046 kdu_model
=k8s_instance_info
["kdu-model"],
3049 db_dict
=db_dict_install
,
3051 kdu_name
=k8s_instance_info
["kdu-name"],
3052 namespace
=k8s_instance_info
["namespace"],
3053 kdu_instance
=kdu_instance
,
3057 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
3060 # Obtain services to obtain management service ip
3061 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3062 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3063 kdu_instance
=kdu_instance
,
3064 namespace
=k8s_instance_info
["namespace"],
3067 # Obtain management service info (if exists)
3068 vnfr_update_dict
= {}
3069 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3071 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3076 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3079 for service
in kdud
.get("service", [])
3080 if service
.get("mgmt-service")
3082 for mgmt_service
in mgmt_services
:
3083 for service
in services
:
3084 if service
["name"].startswith(mgmt_service
["name"]):
3085 # Mgmt service found, Obtain service ip
3086 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3087 if isinstance(ip
, list) and len(ip
) == 1:
3091 "kdur.{}.ip-address".format(kdu_index
)
3094 # Check if must update also mgmt ip at the vnf
3095 service_external_cp
= mgmt_service
.get(
3096 "external-connection-point-ref"
3098 if service_external_cp
:
3100 deep_get(vnfd
, ("mgmt-interface", "cp"))
3101 == service_external_cp
3103 vnfr_update_dict
["ip-address"] = ip
3108 "external-connection-point-ref", ""
3110 == service_external_cp
,
3113 "kdur.{}.ip-address".format(kdu_index
)
3118 "Mgmt service name: {} not found".format(
3119 mgmt_service
["name"]
3123 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3124 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3126 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3129 and kdu_config
.get("initial-config-primitive")
3130 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3132 initial_config_primitive_list
= kdu_config
.get(
3133 "initial-config-primitive"
3135 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3137 for initial_config_primitive
in initial_config_primitive_list
:
3138 primitive_params_
= self
._map
_primitive
_params
(
3139 initial_config_primitive
, {}, {}
3142 await asyncio
.wait_for(
3143 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3144 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3145 kdu_instance
=kdu_instance
,
3146 primitive_name
=initial_config_primitive
["name"],
3147 params
=primitive_params_
,
3148 db_dict
=db_dict_install
,
3154 except Exception as e
:
3155 # Prepare update db with error and raise exception
3158 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3162 vnfr_data
.get("_id"),
3163 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3166 # ignore to keep original exception
3168 # reraise original error
3173 async def deploy_kdus(
3180 task_instantiation_info
,
3182 # Launch kdus if present in the descriptor
3184 k8scluster_id_2_uuic
= {
3185 "helm-chart-v3": {},
3190 async def _get_cluster_id(cluster_id
, cluster_type
):
3191 nonlocal k8scluster_id_2_uuic
3192 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3193 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3195 # check if K8scluster is creating and wait look if previous tasks in process
3196 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3197 "k8scluster", cluster_id
3200 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3201 task_name
, cluster_id
3203 self
.logger
.debug(logging_text
+ text
)
3204 await asyncio
.wait(task_dependency
, timeout
=3600)
3206 db_k8scluster
= self
.db
.get_one(
3207 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3209 if not db_k8scluster
:
3210 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3212 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3214 if cluster_type
== "helm-chart-v3":
3216 # backward compatibility for existing clusters that have not been initialized for helm v3
3217 k8s_credentials
= yaml
.safe_dump(
3218 db_k8scluster
.get("credentials")
3220 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3221 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3223 db_k8scluster_update
= {}
3224 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3225 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3226 db_k8scluster_update
[
3227 "_admin.helm-chart-v3.created"
3229 db_k8scluster_update
[
3230 "_admin.helm-chart-v3.operationalState"
3233 "k8sclusters", cluster_id
, db_k8scluster_update
3235 except Exception as e
:
3238 + "error initializing helm-v3 cluster: {}".format(str(e
))
3241 "K8s cluster '{}' has not been initialized for '{}'".format(
3242 cluster_id
, cluster_type
3247 "K8s cluster '{}' has not been initialized for '{}'".format(
3248 cluster_id
, cluster_type
3251 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3254 logging_text
+= "Deploy kdus: "
3257 db_nsr_update
= {"_admin.deployed.K8s": []}
3258 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3261 updated_cluster_list
= []
3262 updated_v3_cluster_list
= []
3264 for vnfr_data
in db_vnfrs
.values():
3265 vca_id
= self
.get_vca_id(vnfr_data
, {})
3266 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3267 # Step 0: Prepare and set parameters
3268 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3269 vnfd_id
= vnfr_data
.get("vnfd-id")
3270 vnfd_with_id
= find_in_list(
3271 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3275 for kdud
in vnfd_with_id
["kdu"]
3276 if kdud
["name"] == kdur
["kdu-name"]
3278 namespace
= kdur
.get("k8s-namespace")
3279 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3280 if kdur
.get("helm-chart"):
3281 kdumodel
= kdur
["helm-chart"]
3282 # Default version: helm3, if helm-version is v2 assign v2
3283 k8sclustertype
= "helm-chart-v3"
3284 self
.logger
.debug("kdur: {}".format(kdur
))
3286 kdur
.get("helm-version")
3287 and kdur
.get("helm-version") == "v2"
3289 k8sclustertype
= "helm-chart"
3290 elif kdur
.get("juju-bundle"):
3291 kdumodel
= kdur
["juju-bundle"]
3292 k8sclustertype
= "juju-bundle"
3295 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3296 "juju-bundle. Maybe an old NBI version is running".format(
3297 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3300 # check if kdumodel is a file and exists
3302 vnfd_with_id
= find_in_list(
3303 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3305 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3306 if storage
and storage
.get(
3308 ): # may be not present if vnfd has not artifacts
3309 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3310 filename
= "{}/{}/{}s/{}".format(
3316 if self
.fs
.file_exists(
3317 filename
, mode
="file"
3318 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3319 kdumodel
= self
.fs
.path
+ filename
3320 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3322 except Exception: # it is not a file
3325 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3326 step
= "Synchronize repos for k8s cluster '{}'".format(
3329 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3333 k8sclustertype
== "helm-chart"
3334 and cluster_uuid
not in updated_cluster_list
3336 k8sclustertype
== "helm-chart-v3"
3337 and cluster_uuid
not in updated_v3_cluster_list
3339 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3340 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3341 cluster_uuid
=cluster_uuid
3344 if del_repo_list
or added_repo_dict
:
3345 if k8sclustertype
== "helm-chart":
3347 "_admin.helm_charts_added." + item
: None
3348 for item
in del_repo_list
3351 "_admin.helm_charts_added." + item
: name
3352 for item
, name
in added_repo_dict
.items()
3354 updated_cluster_list
.append(cluster_uuid
)
3355 elif k8sclustertype
== "helm-chart-v3":
3357 "_admin.helm_charts_v3_added." + item
: None
3358 for item
in del_repo_list
3361 "_admin.helm_charts_v3_added." + item
: name
3362 for item
, name
in added_repo_dict
.items()
3364 updated_v3_cluster_list
.append(cluster_uuid
)
3366 logging_text
+ "repos synchronized on k8s cluster "
3367 "'{}' to_delete: {}, to_add: {}".format(
3368 k8s_cluster_id
, del_repo_list
, added_repo_dict
3373 {"_id": k8s_cluster_id
},
3379 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3380 vnfr_data
["member-vnf-index-ref"],
3384 k8s_instance_info
= {
3385 "kdu-instance": None,
3386 "k8scluster-uuid": cluster_uuid
,
3387 "k8scluster-type": k8sclustertype
,
3388 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3389 "kdu-name": kdur
["kdu-name"],
3390 "kdu-model": kdumodel
,
3391 "namespace": namespace
,
3392 "kdu-deployment-name": kdu_deployment_name
,
3394 db_path
= "_admin.deployed.K8s.{}".format(index
)
3395 db_nsr_update
[db_path
] = k8s_instance_info
3396 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3397 vnfd_with_id
= find_in_list(
3398 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3400 task
= asyncio
.ensure_future(
3409 k8params
=desc_params
,
3414 self
.lcm_tasks
.register(
3418 "instantiate_KDU-{}".format(index
),
3421 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3427 except (LcmException
, asyncio
.CancelledError
):
3429 except Exception as e
:
3430 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3431 if isinstance(e
, (N2VCException
, DbException
)):
3432 self
.logger
.error(logging_text
+ msg
)
3434 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3435 raise LcmException(msg
)
3438 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3457 task_instantiation_info
,
3460 # launch instantiate_N2VC in a asyncio task and register task object
3461 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3462 # if not found, create one entry and update database
3463 # fill db_nsr._admin.deployed.VCA.<index>
3466 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3468 if "execution-environment-list" in descriptor_config
:
3469 ee_list
= descriptor_config
.get("execution-environment-list", [])
3470 elif "juju" in descriptor_config
:
3471 ee_list
= [descriptor_config
] # ns charms
3472 else: # other types as script are not supported
3475 for ee_item
in ee_list
:
3478 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3479 ee_item
.get("juju"), ee_item
.get("helm-chart")
3482 ee_descriptor_id
= ee_item
.get("id")
3483 if ee_item
.get("juju"):
3484 vca_name
= ee_item
["juju"].get("charm")
3487 if ee_item
["juju"].get("charm") is not None
3490 if ee_item
["juju"].get("cloud") == "k8s":
3491 vca_type
= "k8s_proxy_charm"
3492 elif ee_item
["juju"].get("proxy") is False:
3493 vca_type
= "native_charm"
3494 elif ee_item
.get("helm-chart"):
3495 vca_name
= ee_item
["helm-chart"]
3496 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3499 vca_type
= "helm-v3"
3502 logging_text
+ "skipping non juju neither charm configuration"
3507 for vca_index
, vca_deployed
in enumerate(
3508 db_nsr
["_admin"]["deployed"]["VCA"]
3510 if not vca_deployed
:
3513 vca_deployed
.get("member-vnf-index") == member_vnf_index
3514 and vca_deployed
.get("vdu_id") == vdu_id
3515 and vca_deployed
.get("kdu_name") == kdu_name
3516 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3517 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3521 # not found, create one.
3523 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3526 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3528 target
+= "/kdu/{}".format(kdu_name
)
3530 "target_element": target
,
3531 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3532 "member-vnf-index": member_vnf_index
,
3534 "kdu_name": kdu_name
,
3535 "vdu_count_index": vdu_index
,
3536 "operational-status": "init", # TODO revise
3537 "detailed-status": "", # TODO revise
3538 "step": "initial-deploy", # TODO revise
3540 "vdu_name": vdu_name
,
3542 "ee_descriptor_id": ee_descriptor_id
,
3546 # create VCA and configurationStatus in db
3548 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3549 "configurationStatus.{}".format(vca_index
): dict(),
3551 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3553 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3555 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3556 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3557 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3560 task_n2vc
= asyncio
.ensure_future(
3561 self
.instantiate_N2VC(
3562 logging_text
=logging_text
,
3563 vca_index
=vca_index
,
3569 vdu_index
=vdu_index
,
3570 deploy_params
=deploy_params
,
3571 config_descriptor
=descriptor_config
,
3572 base_folder
=base_folder
,
3573 nslcmop_id
=nslcmop_id
,
3577 ee_config_descriptor
=ee_item
,
3580 self
.lcm_tasks
.register(
3584 "instantiate_N2VC-{}".format(vca_index
),
3587 task_instantiation_info
[
3589 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3590 member_vnf_index
or "", vdu_id
or ""
3594 def _create_nslcmop(nsr_id
, operation
, params
):
3596 Creates a ns-lcm-opp content to be stored at database.
3597 :param nsr_id: internal id of the instance
3598 :param operation: instantiate, terminate, scale, action, ...
3599 :param params: user parameters for the operation
3600 :return: dictionary following SOL005 format
3602 # Raise exception if invalid arguments
3603 if not (nsr_id
and operation
and params
):
3605 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3612 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3613 "operationState": "PROCESSING",
3614 "statusEnteredTime": now
,
3615 "nsInstanceId": nsr_id
,
3616 "lcmOperationType": operation
,
3618 "isAutomaticInvocation": False,
3619 "operationParams": params
,
3620 "isCancelPending": False,
3622 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3623 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3628 def _format_additional_params(self
, params
):
3629 params
= params
or {}
3630 for key
, value
in params
.items():
3631 if str(value
).startswith("!!yaml "):
3632 params
[key
] = yaml
.safe_load(value
[7:])
3635 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3636 primitive
= seq
.get("name")
3637 primitive_params
= {}
3639 "member_vnf_index": vnf_index
,
3640 "primitive": primitive
,
3641 "primitive_params": primitive_params
,
3644 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
3648 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
3649 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
3650 if op
.get("operationState") == "COMPLETED":
3651 # b. Skip sub-operation
3652 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3653 return self
.SUBOPERATION_STATUS_SKIP
3655 # c. retry executing sub-operation
3656 # The sub-operation exists, and operationState != 'COMPLETED'
3657 # Update operationState = 'PROCESSING' to indicate a retry.
3658 operationState
= "PROCESSING"
3659 detailed_status
= "In progress"
3660 self
._update
_suboperation
_status
(
3661 db_nslcmop
, op_index
, operationState
, detailed_status
3663 # Return the sub-operation index
3664 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3665 # with arguments extracted from the sub-operation
3668 # Find a sub-operation where all keys in a matching dictionary must match
3669 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3670 def _find_suboperation(self
, db_nslcmop
, match
):
3671 if db_nslcmop
and match
:
3672 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3673 for i
, op
in enumerate(op_list
):
3674 if all(op
.get(k
) == match
[k
] for k
in match
):
3676 return self
.SUBOPERATION_STATUS_NOT_FOUND
3678 # Update status for a sub-operation given its index
3679 def _update_suboperation_status(
3680 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3682 # Update DB for HA tasks
3683 q_filter
= {"_id": db_nslcmop
["_id"]}
3685 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3686 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3689 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3692 # Add sub-operation, return the index of the added sub-operation
3693 # Optionally, set operationState, detailed-status, and operationType
3694 # Status and type are currently set for 'scale' sub-operations:
3695 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3696 # 'detailed-status' : status message
3697 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3698 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3699 def _add_suboperation(
3707 mapped_primitive_params
,
3708 operationState
=None,
3709 detailed_status
=None,
3712 RO_scaling_info
=None,
3715 return self
.SUBOPERATION_STATUS_NOT_FOUND
3716 # Get the "_admin.operations" list, if it exists
3717 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3718 op_list
= db_nslcmop_admin
.get("operations")
3719 # Create or append to the "_admin.operations" list
3721 "member_vnf_index": vnf_index
,
3723 "vdu_count_index": vdu_count_index
,
3724 "primitive": primitive
,
3725 "primitive_params": mapped_primitive_params
,
3728 new_op
["operationState"] = operationState
3730 new_op
["detailed-status"] = detailed_status
3732 new_op
["lcmOperationType"] = operationType
3734 new_op
["RO_nsr_id"] = RO_nsr_id
3736 new_op
["RO_scaling_info"] = RO_scaling_info
3738 # No existing operations, create key 'operations' with current operation as first list element
3739 db_nslcmop_admin
.update({"operations": [new_op
]})
3740 op_list
= db_nslcmop_admin
.get("operations")
3742 # Existing operations, append operation to list
3743 op_list
.append(new_op
)
3745 db_nslcmop_update
= {"_admin.operations": op_list
}
3746 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
3747 op_index
= len(op_list
) - 1
3750 # Helper methods for scale() sub-operations
3752 # pre-scale/post-scale:
3753 # Check for 3 different cases:
3754 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3755 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3756 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3757 def _check_or_add_scale_suboperation(
3761 vnf_config_primitive
,
3765 RO_scaling_info
=None,
3767 # Find this sub-operation
3768 if RO_nsr_id
and RO_scaling_info
:
3769 operationType
= "SCALE-RO"
3771 "member_vnf_index": vnf_index
,
3772 "RO_nsr_id": RO_nsr_id
,
3773 "RO_scaling_info": RO_scaling_info
,
3777 "member_vnf_index": vnf_index
,
3778 "primitive": vnf_config_primitive
,
3779 "primitive_params": primitive_params
,
3780 "lcmOperationType": operationType
,
3782 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
3783 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
3784 # a. New sub-operation
3785 # The sub-operation does not exist, add it.
3786 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3787 # The following parameters are set to None for all kind of scaling:
3789 vdu_count_index
= None
3791 if RO_nsr_id
and RO_scaling_info
:
3792 vnf_config_primitive
= None
3793 primitive_params
= None
3796 RO_scaling_info
= None
3797 # Initial status for sub-operation
3798 operationState
= "PROCESSING"
3799 detailed_status
= "In progress"
3800 # Add sub-operation for pre/post-scaling (zero or more operations)
3801 self
._add
_suboperation
(
3807 vnf_config_primitive
,
3815 return self
.SUBOPERATION_STATUS_NEW
3817 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3818 # or op_index (operationState != 'COMPLETED')
3819 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
3821 # Function to return execution_environment id
3823 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
3824 # TODO vdu_index_count
3825 for vca
in vca_deployed_list
:
3826 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
3829 async def destroy_N2VC(
3837 exec_primitives
=True,
3842 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
3843 :param logging_text:
3845 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
3846 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3847 :param vca_index: index in the database _admin.deployed.VCA
3848 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
3849 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
3850 not executed properly
3851 :param scaling_in: True destroys the application, False destroys the model
3852 :return: None or exception
3857 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3858 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
3862 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
3864 # execute terminate_primitives
3866 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
3867 config_descriptor
.get("terminate-config-primitive"),
3868 vca_deployed
.get("ee_descriptor_id"),
3870 vdu_id
= vca_deployed
.get("vdu_id")
3871 vdu_count_index
= vca_deployed
.get("vdu_count_index")
3872 vdu_name
= vca_deployed
.get("vdu_name")
3873 vnf_index
= vca_deployed
.get("member-vnf-index")
3874 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
3875 for seq
in terminate_primitives
:
3876 # For each sequence in list, get primitive and call _ns_execute_primitive()
3877 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
3878 vnf_index
, seq
.get("name")
3880 self
.logger
.debug(logging_text
+ step
)
3881 # Create the primitive for each sequence, i.e. "primitive": "touch"
3882 primitive
= seq
.get("name")
3883 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
3888 self
._add
_suboperation
(
3895 mapped_primitive_params
,
3897 # Sub-operations: Call _ns_execute_primitive() instead of action()
3899 result
, result_detail
= await self
._ns
_execute
_primitive
(
3900 vca_deployed
["ee_id"],
3902 mapped_primitive_params
,
3906 except LcmException
:
3907 # this happens when VCA is not deployed. In this case it is not needed to terminate
3909 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
3910 if result
not in result_ok
:
3912 "terminate_primitive {} for vnf_member_index={} fails with "
3913 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
3915 # set that this VCA do not need terminated
3916 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
3920 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
3923 if vca_deployed
.get("prometheus_jobs") and self
.prometheus
:
3924 await self
.prometheus
.update(remove_jobs
=vca_deployed
["prometheus_jobs"])
3927 await self
.vca_map
[vca_type
].delete_execution_environment(
3928 vca_deployed
["ee_id"],
3929 scaling_in
=scaling_in
,
3934 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
3935 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
3936 namespace
= "." + db_nsr
["_id"]
3938 await self
.n2vc
.delete_namespace(
3939 namespace
=namespace
,
3940 total_timeout
=self
.timeout_charm_delete
,
3943 except N2VCNotFound
: # already deleted. Skip
3945 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
3947 async def _terminate_RO(
3948 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
3951 Terminates a deployment from RO
3952 :param logging_text:
3953 :param nsr_deployed: db_nsr._admin.deployed
3956 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
3957 this method will update only the index 2, but it will write on database the concatenated content of the list
3962 ro_nsr_id
= ro_delete_action
= None
3963 if nsr_deployed
and nsr_deployed
.get("RO"):
3964 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
3965 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
3968 stage
[2] = "Deleting ns from VIM."
3969 db_nsr_update
["detailed-status"] = " ".join(stage
)
3970 self
._write
_op
_status
(nslcmop_id
, stage
)
3971 self
.logger
.debug(logging_text
+ stage
[2])
3972 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3973 self
._write
_op
_status
(nslcmop_id
, stage
)
3974 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
3975 ro_delete_action
= desc
["action_id"]
3977 "_admin.deployed.RO.nsr_delete_action_id"
3978 ] = ro_delete_action
3979 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
3980 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
3981 if ro_delete_action
:
3982 # wait until NS is deleted from VIM
3983 stage
[2] = "Waiting ns deleted from VIM."
3984 detailed_status_old
= None
3988 + " RO_id={} ro_delete_action={}".format(
3989 ro_nsr_id
, ro_delete_action
3992 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3993 self
._write
_op
_status
(nslcmop_id
, stage
)
3995 delete_timeout
= 20 * 60 # 20 minutes
3996 while delete_timeout
> 0:
3997 desc
= await self
.RO
.show(
3999 item_id_name
=ro_nsr_id
,
4000 extra_item
="action",
4001 extra_item_id
=ro_delete_action
,
4005 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4007 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4008 if ns_status
== "ERROR":
4009 raise ROclient
.ROClientException(ns_status_info
)
4010 elif ns_status
== "BUILD":
4011 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4012 elif ns_status
== "ACTIVE":
4013 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4014 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4019 ), "ROclient.check_action_status returns unknown {}".format(
4022 if stage
[2] != detailed_status_old
:
4023 detailed_status_old
= stage
[2]
4024 db_nsr_update
["detailed-status"] = " ".join(stage
)
4025 self
._write
_op
_status
(nslcmop_id
, stage
)
4026 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4027 await asyncio
.sleep(5, loop
=self
.loop
)
4029 else: # delete_timeout <= 0:
4030 raise ROclient
.ROClientException(
4031 "Timeout waiting ns deleted from VIM"
4034 except Exception as e
:
4035 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4037 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4039 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4040 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4041 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4043 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4046 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4048 failed_detail
.append("delete conflict: {}".format(e
))
4051 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4054 failed_detail
.append("delete error: {}".format(e
))
4056 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4060 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4061 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4063 stage
[2] = "Deleting nsd from RO."
4064 db_nsr_update
["detailed-status"] = " ".join(stage
)
4065 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4066 self
._write
_op
_status
(nslcmop_id
, stage
)
4067 await self
.RO
.delete("nsd", ro_nsd_id
)
4069 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4071 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4072 except Exception as e
:
4074 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4076 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4078 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4081 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4083 failed_detail
.append(
4084 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4086 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4088 failed_detail
.append(
4089 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4091 self
.logger
.error(logging_text
+ failed_detail
[-1])
4093 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4094 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4095 if not vnf_deployed
or not vnf_deployed
["id"]:
4098 ro_vnfd_id
= vnf_deployed
["id"]
4101 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4102 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4104 db_nsr_update
["detailed-status"] = " ".join(stage
)
4105 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4106 self
._write
_op
_status
(nslcmop_id
, stage
)
4107 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4109 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4111 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4112 except Exception as e
:
4114 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4117 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4121 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4124 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4126 failed_detail
.append(
4127 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4129 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4131 failed_detail
.append(
4132 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4134 self
.logger
.error(logging_text
+ failed_detail
[-1])
4137 stage
[2] = "Error deleting from VIM"
4139 stage
[2] = "Deleted from VIM"
4140 db_nsr_update
["detailed-status"] = " ".join(stage
)
4141 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4142 self
._write
_op
_status
(nslcmop_id
, stage
)
4145 raise LcmException("; ".join(failed_detail
))
4147 async def terminate(self
, nsr_id
, nslcmop_id
):
4148 # Try to lock HA task here
4149 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4150 if not task_is_locked_by_me
:
4153 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4154 self
.logger
.debug(logging_text
+ "Enter")
4155 timeout_ns_terminate
= self
.timeout_ns_terminate
4158 operation_params
= None
4160 error_list
= [] # annotates all failed error messages
4161 db_nslcmop_update
= {}
4162 autoremove
= False # autoremove after terminated
4163 tasks_dict_info
= {}
4166 "Stage 1/3: Preparing task.",
4167 "Waiting for previous operations to terminate.",
4170 # ^ contains [stage, step, VIM-status]
4172 # wait for any previous tasks in process
4173 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4175 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4176 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4177 operation_params
= db_nslcmop
.get("operationParams") or {}
4178 if operation_params
.get("timeout_ns_terminate"):
4179 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4180 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4181 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4183 db_nsr_update
["operational-status"] = "terminating"
4184 db_nsr_update
["config-status"] = "terminating"
4185 self
._write
_ns
_status
(
4187 ns_state
="TERMINATING",
4188 current_operation
="TERMINATING",
4189 current_operation_id
=nslcmop_id
,
4190 other_update
=db_nsr_update
,
4192 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4193 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4194 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4197 stage
[1] = "Getting vnf descriptors from db."
4198 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4200 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4202 db_vnfds_from_id
= {}
4203 db_vnfds_from_member_index
= {}
4205 for vnfr
in db_vnfrs_list
:
4206 vnfd_id
= vnfr
["vnfd-id"]
4207 if vnfd_id
not in db_vnfds_from_id
:
4208 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4209 db_vnfds_from_id
[vnfd_id
] = vnfd
4210 db_vnfds_from_member_index
[
4211 vnfr
["member-vnf-index-ref"]
4212 ] = db_vnfds_from_id
[vnfd_id
]
4214 # Destroy individual execution environments when there are terminating primitives.
4215 # Rest of EE will be deleted at once
4216 # TODO - check before calling _destroy_N2VC
4217 # if not operation_params.get("skip_terminate_primitives"):#
4218 # or not vca.get("needed_terminate"):
4219 stage
[0] = "Stage 2/3 execute terminating primitives."
4220 self
.logger
.debug(logging_text
+ stage
[0])
4221 stage
[1] = "Looking execution environment that needs terminate."
4222 self
.logger
.debug(logging_text
+ stage
[1])
4224 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4225 config_descriptor
= None
4226 vca_member_vnf_index
= vca
.get("member-vnf-index")
4227 vca_id
= self
.get_vca_id(
4228 db_vnfrs_dict
.get(vca_member_vnf_index
)
4229 if vca_member_vnf_index
4233 if not vca
or not vca
.get("ee_id"):
4235 if not vca
.get("member-vnf-index"):
4237 config_descriptor
= db_nsr
.get("ns-configuration")
4238 elif vca
.get("vdu_id"):
4239 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4240 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4241 elif vca
.get("kdu_name"):
4242 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4243 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4245 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4246 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4247 vca_type
= vca
.get("type")
4248 exec_terminate_primitives
= not operation_params
.get(
4249 "skip_terminate_primitives"
4250 ) and vca
.get("needed_terminate")
4251 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4252 # pending native charms
4254 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4256 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4257 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4258 task
= asyncio
.ensure_future(
4266 exec_terminate_primitives
,
4270 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4272 # wait for pending tasks of terminate primitives
4276 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4278 error_list
= await self
._wait
_for
_tasks
(
4281 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4285 tasks_dict_info
.clear()
4287 return # raise LcmException("; ".join(error_list))
4289 # remove All execution environments at once
4290 stage
[0] = "Stage 3/3 delete all."
4292 if nsr_deployed
.get("VCA"):
4293 stage
[1] = "Deleting all execution environments."
4294 self
.logger
.debug(logging_text
+ stage
[1])
4295 vca_id
= self
.get_vca_id({}, db_nsr
)
4296 task_delete_ee
= asyncio
.ensure_future(
4298 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4299 timeout
=self
.timeout_charm_delete
,
4302 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4303 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4305 # Delete from k8scluster
4306 stage
[1] = "Deleting KDUs."
4307 self
.logger
.debug(logging_text
+ stage
[1])
4308 # print(nsr_deployed)
4309 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4310 if not kdu
or not kdu
.get("kdu-instance"):
4312 kdu_instance
= kdu
.get("kdu-instance")
4313 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4314 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4315 vca_id
= self
.get_vca_id({}, db_nsr
)
4316 task_delete_kdu_instance
= asyncio
.ensure_future(
4317 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4318 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4319 kdu_instance
=kdu_instance
,
4326 + "Unknown k8s deployment type {}".format(
4327 kdu
.get("k8scluster-type")
4332 task_delete_kdu_instance
4333 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4336 stage
[1] = "Deleting ns from VIM."
4338 task_delete_ro
= asyncio
.ensure_future(
4339 self
._terminate
_ng
_ro
(
4340 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4344 task_delete_ro
= asyncio
.ensure_future(
4346 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4349 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4351 # rest of staff will be done at finally
4354 ROclient
.ROClientException
,
4359 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4361 except asyncio
.CancelledError
:
4363 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4365 exc
= "Operation was cancelled"
4366 except Exception as e
:
4367 exc
= traceback
.format_exc()
4368 self
.logger
.critical(
4369 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4374 error_list
.append(str(exc
))
4376 # wait for pending tasks
4378 stage
[1] = "Waiting for terminate pending tasks."
4379 self
.logger
.debug(logging_text
+ stage
[1])
4380 error_list
+= await self
._wait
_for
_tasks
(
4383 timeout_ns_terminate
,
4387 stage
[1] = stage
[2] = ""
4388 except asyncio
.CancelledError
:
4389 error_list
.append("Cancelled")
4390 # TODO cancell all tasks
4391 except Exception as exc
:
4392 error_list
.append(str(exc
))
4393 # update status at database
4395 error_detail
= "; ".join(error_list
)
4396 # self.logger.error(logging_text + error_detail)
4397 error_description_nslcmop
= "{} Detail: {}".format(
4398 stage
[0], error_detail
4400 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4401 nslcmop_id
, stage
[0]
4404 db_nsr_update
["operational-status"] = "failed"
4405 db_nsr_update
["detailed-status"] = (
4406 error_description_nsr
+ " Detail: " + error_detail
4408 db_nslcmop_update
["detailed-status"] = error_detail
4409 nslcmop_operation_state
= "FAILED"
4413 error_description_nsr
= error_description_nslcmop
= None
4414 ns_state
= "NOT_INSTANTIATED"
4415 db_nsr_update
["operational-status"] = "terminated"
4416 db_nsr_update
["detailed-status"] = "Done"
4417 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4418 db_nslcmop_update
["detailed-status"] = "Done"
4419 nslcmop_operation_state
= "COMPLETED"
4422 self
._write
_ns
_status
(
4425 current_operation
="IDLE",
4426 current_operation_id
=None,
4427 error_description
=error_description_nsr
,
4428 error_detail
=error_detail
,
4429 other_update
=db_nsr_update
,
4431 self
._write
_op
_status
(
4434 error_message
=error_description_nslcmop
,
4435 operation_state
=nslcmop_operation_state
,
4436 other_update
=db_nslcmop_update
,
4438 if ns_state
== "NOT_INSTANTIATED":
4442 {"nsr-id-ref": nsr_id
},
4443 {"_admin.nsState": "NOT_INSTANTIATED"},
4445 except DbException
as e
:
4448 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4452 if operation_params
:
4453 autoremove
= operation_params
.get("autoremove", False)
4454 if nslcmop_operation_state
:
4456 await self
.msg
.aiowrite(
4461 "nslcmop_id": nslcmop_id
,
4462 "operationState": nslcmop_operation_state
,
4463 "autoremove": autoremove
,
4467 except Exception as e
:
4469 logging_text
+ "kafka_write notification Exception {}".format(e
)
4472 self
.logger
.debug(logging_text
+ "Exit")
4473 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4475 async def _wait_for_tasks(
4476 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4479 error_detail_list
= []
4481 pending_tasks
= list(created_tasks_info
.keys())
4482 num_tasks
= len(pending_tasks
)
4484 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4485 self
._write
_op
_status
(nslcmop_id
, stage
)
4486 while pending_tasks
:
4488 _timeout
= timeout
+ time_start
- time()
4489 done
, pending_tasks
= await asyncio
.wait(
4490 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4492 num_done
+= len(done
)
4493 if not done
: # Timeout
4494 for task
in pending_tasks
:
4495 new_error
= created_tasks_info
[task
] + ": Timeout"
4496 error_detail_list
.append(new_error
)
4497 error_list
.append(new_error
)
4500 if task
.cancelled():
4503 exc
= task
.exception()
4505 if isinstance(exc
, asyncio
.TimeoutError
):
4507 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4508 error_list
.append(created_tasks_info
[task
])
4509 error_detail_list
.append(new_error
)
4516 ROclient
.ROClientException
,
4522 self
.logger
.error(logging_text
+ new_error
)
4524 exc_traceback
= "".join(
4525 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4529 + created_tasks_info
[task
]
4535 logging_text
+ created_tasks_info
[task
] + ": Done"
4537 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4539 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4540 if nsr_id
: # update also nsr
4545 "errorDescription": "Error at: " + ", ".join(error_list
),
4546 "errorDetail": ". ".join(error_detail_list
),
4549 self
._write
_op
_status
(nslcmop_id
, stage
)
4550 return error_detail_list
4553 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4555 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4556 The default-value is used. If it is between < > it look for a value at instantiation_params
4557 :param primitive_desc: portion of VNFD/NSD that describes primitive
4558 :param params: Params provided by user
4559 :param instantiation_params: Instantiation params provided by user
4560 :return: a dictionary with the calculated params
4562 calculated_params
= {}
4563 for parameter
in primitive_desc
.get("parameter", ()):
4564 param_name
= parameter
["name"]
4565 if param_name
in params
:
4566 calculated_params
[param_name
] = params
[param_name
]
4567 elif "default-value" in parameter
or "value" in parameter
:
4568 if "value" in parameter
:
4569 calculated_params
[param_name
] = parameter
["value"]
4571 calculated_params
[param_name
] = parameter
["default-value"]
4573 isinstance(calculated_params
[param_name
], str)
4574 and calculated_params
[param_name
].startswith("<")
4575 and calculated_params
[param_name
].endswith(">")
4577 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4578 calculated_params
[param_name
] = instantiation_params
[
4579 calculated_params
[param_name
][1:-1]
4583 "Parameter {} needed to execute primitive {} not provided".format(
4584 calculated_params
[param_name
], primitive_desc
["name"]
4589 "Parameter {} needed to execute primitive {} not provided".format(
4590 param_name
, primitive_desc
["name"]
4594 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4595 calculated_params
[param_name
] = yaml
.safe_dump(
4596 calculated_params
[param_name
], default_flow_style
=True, width
=256
4598 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4600 ].startswith("!!yaml "):
4601 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4602 if parameter
.get("data-type") == "INTEGER":
4604 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4605 except ValueError: # error converting string to int
4607 "Parameter {} of primitive {} must be integer".format(
4608 param_name
, primitive_desc
["name"]
4611 elif parameter
.get("data-type") == "BOOLEAN":
4612 calculated_params
[param_name
] = not (
4613 (str(calculated_params
[param_name
])).lower() == "false"
4616 # add always ns_config_info if primitive name is config
4617 if primitive_desc
["name"] == "config":
4618 if "ns_config_info" in instantiation_params
:
4619 calculated_params
["ns_config_info"] = instantiation_params
[
4622 return calculated_params
4624 def _look_for_deployed_vca(
4631 ee_descriptor_id
=None,
4633 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4634 for vca
in deployed_vca
:
4637 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4640 vdu_count_index
is not None
4641 and vdu_count_index
!= vca
["vdu_count_index"]
4644 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4646 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4650 # vca_deployed not found
4652 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4653 " is not deployed".format(
4662 ee_id
= vca
.get("ee_id")
4664 "type", "lxc_proxy_charm"
4665 ) # default value for backward compatibility - proxy charm
4668 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4669 "execution environment".format(
4670 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4673 return ee_id
, vca_type
4675 async def _ns_execute_primitive(
4681 retries_interval
=30,
4688 if primitive
== "config":
4689 primitive_params
= {"params": primitive_params
}
4691 vca_type
= vca_type
or "lxc_proxy_charm"
4695 output
= await asyncio
.wait_for(
4696 self
.vca_map
[vca_type
].exec_primitive(
4698 primitive_name
=primitive
,
4699 params_dict
=primitive_params
,
4700 progress_timeout
=self
.timeout_progress_primitive
,
4701 total_timeout
=self
.timeout_primitive
,
4706 timeout
=timeout
or self
.timeout_primitive
,
4710 except asyncio
.CancelledError
:
4712 except Exception as e
: # asyncio.TimeoutError
4713 if isinstance(e
, asyncio
.TimeoutError
):
4718 "Error executing action {} on {} -> {}".format(
4723 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
4725 return "FAILED", str(e
)
4727 return "COMPLETED", output
4729 except (LcmException
, asyncio
.CancelledError
):
4731 except Exception as e
:
4732 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
4734 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
4736 Updating the vca_status with latest juju information in nsrs record
4737 :param: nsr_id: Id of the nsr
4738 :param: nslcmop_id: Id of the nslcmop
4742 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
4743 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4744 vca_id
= self
.get_vca_id({}, db_nsr
)
4745 if db_nsr
["_admin"]["deployed"]["K8s"]:
4746 for k8s_index
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
4747 cluster_uuid
, kdu_instance
= k8s
["k8scluster-uuid"], k8s
["kdu-instance"]
4748 await self
._on
_update
_k
8s
_db
(
4749 cluster_uuid
, kdu_instance
, filter={"_id": nsr_id
}, vca_id
=vca_id
4752 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
4753 table
, filter = "nsrs", {"_id": nsr_id
}
4754 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
4755 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
4757 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
4758 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
4760 async def action(self
, nsr_id
, nslcmop_id
):
4761 # Try to lock HA task here
4762 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4763 if not task_is_locked_by_me
:
4766 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
4767 self
.logger
.debug(logging_text
+ "Enter")
4768 # get all needed from database
4772 db_nslcmop_update
= {}
4773 nslcmop_operation_state
= None
4774 error_description_nslcmop
= None
4777 # wait for any previous tasks in process
4778 step
= "Waiting for previous operations to terminate"
4779 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4781 self
._write
_ns
_status
(
4784 current_operation
="RUNNING ACTION",
4785 current_operation_id
=nslcmop_id
,
4788 step
= "Getting information from database"
4789 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4790 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4792 nsr_deployed
= db_nsr
["_admin"].get("deployed")
4793 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
4794 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
4795 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
4796 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
4797 primitive
= db_nslcmop
["operationParams"]["primitive"]
4798 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
4799 timeout_ns_action
= db_nslcmop
["operationParams"].get(
4800 "timeout_ns_action", self
.timeout_primitive
4804 step
= "Getting vnfr from database"
4805 db_vnfr
= self
.db
.get_one(
4806 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
4808 step
= "Getting vnfd from database"
4809 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
4811 step
= "Getting nsd from database"
4812 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
4814 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
4815 # for backward compatibility
4816 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
4817 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
4818 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
4819 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4821 # look for primitive
4822 config_primitive_desc
= descriptor_configuration
= None
4824 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
4826 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
4828 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
4830 descriptor_configuration
= db_nsd
.get("ns-configuration")
4832 if descriptor_configuration
and descriptor_configuration
.get(
4835 for config_primitive
in descriptor_configuration
["config-primitive"]:
4836 if config_primitive
["name"] == primitive
:
4837 config_primitive_desc
= config_primitive
4840 if not config_primitive_desc
:
4841 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
4843 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4847 primitive_name
= primitive
4848 ee_descriptor_id
= None
4850 primitive_name
= config_primitive_desc
.get(
4851 "execution-environment-primitive", primitive
4853 ee_descriptor_id
= config_primitive_desc
.get(
4854 "execution-environment-ref"
4860 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
4862 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
4865 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
4867 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
4869 desc_params
= parse_yaml_strings(
4870 db_vnfr
.get("additionalParamsForVnf")
4873 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
4874 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
4875 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
4877 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
4878 actions
.add(primitive
["name"])
4879 for primitive
in kdu_configuration
.get("config-primitive", []):
4880 actions
.add(primitive
["name"])
4881 kdu_action
= True if primitive_name
in actions
else False
4883 # TODO check if ns is in a proper status
4885 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
4887 # kdur and desc_params already set from before
4888 if primitive_params
:
4889 desc_params
.update(primitive_params
)
4890 # TODO Check if we will need something at vnf level
4891 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
4893 kdu_name
== kdu
["kdu-name"]
4894 and kdu
["member-vnf-index"] == vnf_index
4899 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
4902 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
4903 msg
= "unknown k8scluster-type '{}'".format(
4904 kdu
.get("k8scluster-type")
4906 raise LcmException(msg
)
4909 "collection": "nsrs",
4910 "filter": {"_id": nsr_id
},
4911 "path": "_admin.deployed.K8s.{}".format(index
),
4915 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
4917 step
= "Executing kdu {}".format(primitive_name
)
4918 if primitive_name
== "upgrade":
4919 if desc_params
.get("kdu_model"):
4920 kdu_model
= desc_params
.get("kdu_model")
4921 del desc_params
["kdu_model"]
4923 kdu_model
= kdu
.get("kdu-model")
4924 parts
= kdu_model
.split(sep
=":")
4926 kdu_model
= parts
[0]
4928 detailed_status
= await asyncio
.wait_for(
4929 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
4930 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4931 kdu_instance
=kdu
.get("kdu-instance"),
4933 kdu_model
=kdu_model
,
4936 timeout
=timeout_ns_action
,
4938 timeout
=timeout_ns_action
+ 10,
4941 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
4943 elif primitive_name
== "rollback":
4944 detailed_status
= await asyncio
.wait_for(
4945 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
4946 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4947 kdu_instance
=kdu
.get("kdu-instance"),
4950 timeout
=timeout_ns_action
,
4952 elif primitive_name
== "status":
4953 detailed_status
= await asyncio
.wait_for(
4954 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
4955 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4956 kdu_instance
=kdu
.get("kdu-instance"),
4959 timeout
=timeout_ns_action
,
4962 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
4963 kdu
["kdu-name"], nsr_id
4965 params
= self
._map
_primitive
_params
(
4966 config_primitive_desc
, primitive_params
, desc_params
4969 detailed_status
= await asyncio
.wait_for(
4970 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
4971 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4972 kdu_instance
=kdu_instance
,
4973 primitive_name
=primitive_name
,
4976 timeout
=timeout_ns_action
,
4979 timeout
=timeout_ns_action
,
4983 nslcmop_operation_state
= "COMPLETED"
4985 detailed_status
= ""
4986 nslcmop_operation_state
= "FAILED"
4988 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
4989 nsr_deployed
["VCA"],
4990 member_vnf_index
=vnf_index
,
4992 vdu_count_index
=vdu_count_index
,
4993 ee_descriptor_id
=ee_descriptor_id
,
4995 for vca_index
, vca_deployed
in enumerate(
4996 db_nsr
["_admin"]["deployed"]["VCA"]
4998 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5000 "collection": "nsrs",
5001 "filter": {"_id": nsr_id
},
5002 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5006 nslcmop_operation_state
,
5008 ) = await self
._ns
_execute
_primitive
(
5010 primitive
=primitive_name
,
5011 primitive_params
=self
._map
_primitive
_params
(
5012 config_primitive_desc
, primitive_params
, desc_params
5014 timeout
=timeout_ns_action
,
5020 db_nslcmop_update
["detailed-status"] = detailed_status
5021 error_description_nslcmop
= (
5022 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5026 + " task Done with result {} {}".format(
5027 nslcmop_operation_state
, detailed_status
5030 return # database update is called inside finally
5032 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5033 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5035 except asyncio
.CancelledError
:
5037 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5039 exc
= "Operation was cancelled"
5040 except asyncio
.TimeoutError
:
5041 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5043 except Exception as e
:
5044 exc
= traceback
.format_exc()
5045 self
.logger
.critical(
5046 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5055 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5056 nslcmop_operation_state
= "FAILED"
5058 self
._write
_ns
_status
(
5062 ], # TODO check if degraded. For the moment use previous status
5063 current_operation
="IDLE",
5064 current_operation_id
=None,
5065 # error_description=error_description_nsr,
5066 # error_detail=error_detail,
5067 other_update
=db_nsr_update
,
5070 self
._write
_op
_status
(
5073 error_message
=error_description_nslcmop
,
5074 operation_state
=nslcmop_operation_state
,
5075 other_update
=db_nslcmop_update
,
5078 if nslcmop_operation_state
:
5080 await self
.msg
.aiowrite(
5085 "nslcmop_id": nslcmop_id
,
5086 "operationState": nslcmop_operation_state
,
5090 except Exception as e
:
5092 logging_text
+ "kafka_write notification Exception {}".format(e
)
5094 self
.logger
.debug(logging_text
+ "Exit")
5095 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5096 return nslcmop_operation_state
, detailed_status
5098 async def scale(self
, nsr_id
, nslcmop_id
):
5099 # Try to lock HA task here
5100 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5101 if not task_is_locked_by_me
:
5104 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
5105 stage
= ["", "", ""]
5106 tasks_dict_info
= {}
5107 # ^ stage, step, VIM progress
5108 self
.logger
.debug(logging_text
+ "Enter")
5109 # get all needed from database
5111 db_nslcmop_update
= {}
5114 # in case of error, indicates what part of scale was failed to put nsr at error status
5115 scale_process
= None
5116 old_operational_status
= ""
5117 old_config_status
= ""
5120 # wait for any previous tasks in process
5121 step
= "Waiting for previous operations to terminate"
5122 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5123 self
._write
_ns
_status
(
5126 current_operation
="SCALING",
5127 current_operation_id
=nslcmop_id
,
5130 step
= "Getting nslcmop from database"
5132 step
+ " after having waited for previous tasks to be completed"
5134 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5136 step
= "Getting nsr from database"
5137 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5138 old_operational_status
= db_nsr
["operational-status"]
5139 old_config_status
= db_nsr
["config-status"]
5141 step
= "Parsing scaling parameters"
5142 db_nsr_update
["operational-status"] = "scaling"
5143 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5144 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5146 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
5148 ]["member-vnf-index"]
5149 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
5151 ]["scaling-group-descriptor"]
5152 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
5153 # for backward compatibility
5154 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5155 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5156 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5157 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5159 step
= "Getting vnfr from database"
5160 db_vnfr
= self
.db
.get_one(
5161 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5164 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5166 step
= "Getting vnfd from database"
5167 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5169 base_folder
= db_vnfd
["_admin"]["storage"]
5171 step
= "Getting scaling-group-descriptor"
5172 scaling_descriptor
= find_in_list(
5173 get_scaling_aspect(db_vnfd
),
5174 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
5176 if not scaling_descriptor
:
5178 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5179 "at vnfd:scaling-group-descriptor".format(scaling_group
)
5182 step
= "Sending scale order to VIM"
5183 # TODO check if ns is in a proper status
5185 if not db_nsr
["_admin"].get("scaling-group"):
5190 "_admin.scaling-group": [
5191 {"name": scaling_group
, "nb-scale-op": 0}
5195 admin_scale_index
= 0
5197 for admin_scale_index
, admin_scale_info
in enumerate(
5198 db_nsr
["_admin"]["scaling-group"]
5200 if admin_scale_info
["name"] == scaling_group
:
5201 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
5203 else: # not found, set index one plus last element and add new entry with the name
5204 admin_scale_index
+= 1
5206 "_admin.scaling-group.{}.name".format(admin_scale_index
)
5209 vca_scaling_info
= []
5210 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
5211 if scaling_type
== "SCALE_OUT":
5212 if "aspect-delta-details" not in scaling_descriptor
:
5214 "Aspect delta details not fount in scaling descriptor {}".format(
5215 scaling_descriptor
["name"]
5218 # count if max-instance-count is reached
5219 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5221 scaling_info
["scaling_direction"] = "OUT"
5222 scaling_info
["vdu-create"] = {}
5223 scaling_info
["kdu-create"] = {}
5224 for delta
in deltas
:
5225 for vdu_delta
in delta
.get("vdu-delta", {}):
5226 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
5227 # vdu_index also provides the number of instance of the targeted vdu
5228 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5229 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5233 additional_params
= (
5234 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
5237 cloud_init_list
= []
5239 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5240 max_instance_count
= 10
5241 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
5242 max_instance_count
= vdu_profile
.get(
5243 "max-number-of-instances", 10
5246 default_instance_num
= get_number_of_instances(
5249 instances_number
= vdu_delta
.get("number-of-instances", 1)
5250 nb_scale_op
+= instances_number
5252 new_instance_count
= nb_scale_op
+ default_instance_num
5253 # Control if new count is over max and vdu count is less than max.
5254 # Then assign new instance count
5255 if new_instance_count
> max_instance_count
> vdu_count
:
5256 instances_number
= new_instance_count
- max_instance_count
5258 instances_number
= instances_number
5260 if new_instance_count
> max_instance_count
:
5262 "reached the limit of {} (max-instance-count) "
5263 "scaling-out operations for the "
5264 "scaling-group-descriptor '{}'".format(
5265 nb_scale_op
, scaling_group
5268 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5270 # TODO Information of its own ip is not available because db_vnfr is not updated.
5271 additional_params
["OSM"] = get_osm_params(
5272 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
5274 cloud_init_list
.append(
5275 self
._parse
_cloud
_init
(
5282 vca_scaling_info
.append(
5284 "osm_vdu_id": vdu_delta
["id"],
5285 "member-vnf-index": vnf_index
,
5287 "vdu_index": vdu_index
+ x
,
5290 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
5291 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5292 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
5293 kdu_name
= kdu_profile
["kdu-name"]
5294 resource_name
= kdu_profile
["resource-name"]
5296 # Might have different kdus in the same delta
5297 # Should have list for each kdu
5298 if not scaling_info
["kdu-create"].get(kdu_name
, None):
5299 scaling_info
["kdu-create"][kdu_name
] = []
5301 kdur
= get_kdur(db_vnfr
, kdu_name
)
5302 if kdur
.get("helm-chart"):
5303 k8s_cluster_type
= "helm-chart-v3"
5304 self
.logger
.debug("kdur: {}".format(kdur
))
5306 kdur
.get("helm-version")
5307 and kdur
.get("helm-version") == "v2"
5309 k8s_cluster_type
= "helm-chart"
5310 raise NotImplementedError
5311 elif kdur
.get("juju-bundle"):
5312 k8s_cluster_type
= "juju-bundle"
5315 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5316 "juju-bundle. Maybe an old NBI version is running".format(
5317 db_vnfr
["member-vnf-index-ref"], kdu_name
5321 max_instance_count
= 10
5322 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
5323 max_instance_count
= kdu_profile
.get(
5324 "max-number-of-instances", 10
5327 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
5328 deployed_kdu
, _
= get_deployed_kdu(
5329 nsr_deployed
, kdu_name
, vnf_index
5331 if deployed_kdu
is None:
5333 "KDU '{}' for vnf '{}' not deployed".format(
5337 kdu_instance
= deployed_kdu
.get("kdu-instance")
5338 instance_num
= await self
.k8scluster_map
[
5340 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5341 kdu_replica_count
= instance_num
+ kdu_delta
.get(
5342 "number-of-instances", 1
5345 # Control if new count is over max and instance_num is less than max.
5346 # Then assign max instance number to kdu replica count
5347 if kdu_replica_count
> max_instance_count
> instance_num
:
5348 kdu_replica_count
= max_instance_count
5349 if kdu_replica_count
> max_instance_count
:
5351 "reached the limit of {} (max-instance-count) "
5352 "scaling-out operations for the "
5353 "scaling-group-descriptor '{}'".format(
5354 instance_num
, scaling_group
5358 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5359 vca_scaling_info
.append(
5361 "osm_kdu_id": kdu_name
,
5362 "member-vnf-index": vnf_index
,
5364 "kdu_index": instance_num
+ x
- 1,
5367 scaling_info
["kdu-create"][kdu_name
].append(
5369 "member-vnf-index": vnf_index
,
5371 "k8s-cluster-type": k8s_cluster_type
,
5372 "resource-name": resource_name
,
5373 "scale": kdu_replica_count
,
5376 elif scaling_type
== "SCALE_IN":
5377 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5379 scaling_info
["scaling_direction"] = "IN"
5380 scaling_info
["vdu-delete"] = {}
5381 scaling_info
["kdu-delete"] = {}
5383 for delta
in deltas
:
5384 for vdu_delta
in delta
.get("vdu-delta", {}):
5385 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5386 min_instance_count
= 0
5387 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5388 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
5389 min_instance_count
= vdu_profile
["min-number-of-instances"]
5391 default_instance_num
= get_number_of_instances(
5392 db_vnfd
, vdu_delta
["id"]
5394 instance_num
= vdu_delta
.get("number-of-instances", 1)
5395 nb_scale_op
-= instance_num
5397 new_instance_count
= nb_scale_op
+ default_instance_num
5399 if new_instance_count
< min_instance_count
< vdu_count
:
5400 instances_number
= min_instance_count
- new_instance_count
5402 instances_number
= instance_num
5404 if new_instance_count
< min_instance_count
:
5406 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5407 "scaling-group-descriptor '{}'".format(
5408 nb_scale_op
, scaling_group
5411 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5412 vca_scaling_info
.append(
5414 "osm_vdu_id": vdu_delta
["id"],
5415 "member-vnf-index": vnf_index
,
5417 "vdu_index": vdu_index
- 1 - x
,
5420 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
5421 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5422 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
5423 kdu_name
= kdu_profile
["kdu-name"]
5424 resource_name
= kdu_profile
["resource-name"]
5426 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
5427 scaling_info
["kdu-delete"][kdu_name
] = []
5429 kdur
= get_kdur(db_vnfr
, kdu_name
)
5430 if kdur
.get("helm-chart"):
5431 k8s_cluster_type
= "helm-chart-v3"
5432 self
.logger
.debug("kdur: {}".format(kdur
))
5434 kdur
.get("helm-version")
5435 and kdur
.get("helm-version") == "v2"
5437 k8s_cluster_type
= "helm-chart"
5438 raise NotImplementedError
5439 elif kdur
.get("juju-bundle"):
5440 k8s_cluster_type
= "juju-bundle"
5443 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5444 "juju-bundle. Maybe an old NBI version is running".format(
5445 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
5449 min_instance_count
= 0
5450 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
5451 min_instance_count
= kdu_profile
["min-number-of-instances"]
5453 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
5454 deployed_kdu
, _
= get_deployed_kdu(
5455 nsr_deployed
, kdu_name
, vnf_index
5457 if deployed_kdu
is None:
5459 "KDU '{}' for vnf '{}' not deployed".format(
5463 kdu_instance
= deployed_kdu
.get("kdu-instance")
5464 instance_num
= await self
.k8scluster_map
[
5466 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5467 kdu_replica_count
= instance_num
- kdu_delta
.get(
5468 "number-of-instances", 1
5471 if kdu_replica_count
< min_instance_count
< instance_num
:
5472 kdu_replica_count
= min_instance_count
5473 if kdu_replica_count
< min_instance_count
:
5475 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5476 "scaling-group-descriptor '{}'".format(
5477 instance_num
, scaling_group
5481 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5482 vca_scaling_info
.append(
5484 "osm_kdu_id": kdu_name
,
5485 "member-vnf-index": vnf_index
,
5487 "kdu_index": instance_num
- x
- 1,
5490 scaling_info
["kdu-delete"][kdu_name
].append(
5492 "member-vnf-index": vnf_index
,
5494 "k8s-cluster-type": k8s_cluster_type
,
5495 "resource-name": resource_name
,
5496 "scale": kdu_replica_count
,
5500 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5501 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
5502 if scaling_info
["scaling_direction"] == "IN":
5503 for vdur
in reversed(db_vnfr
["vdur"]):
5504 if vdu_delete
.get(vdur
["vdu-id-ref"]):
5505 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
5506 scaling_info
["vdu"].append(
5508 "name": vdur
.get("name") or vdur
.get("vdu-name"),
5509 "vdu_id": vdur
["vdu-id-ref"],
5513 for interface
in vdur
["interfaces"]:
5514 scaling_info
["vdu"][-1]["interface"].append(
5516 "name": interface
["name"],
5517 "ip_address": interface
["ip-address"],
5518 "mac_address": interface
.get("mac-address"),
5521 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5524 step
= "Executing pre-scale vnf-config-primitive"
5525 if scaling_descriptor
.get("scaling-config-action"):
5526 for scaling_config_action
in scaling_descriptor
[
5527 "scaling-config-action"
5530 scaling_config_action
.get("trigger") == "pre-scale-in"
5531 and scaling_type
== "SCALE_IN"
5533 scaling_config_action
.get("trigger") == "pre-scale-out"
5534 and scaling_type
== "SCALE_OUT"
5536 vnf_config_primitive
= scaling_config_action
[
5537 "vnf-config-primitive-name-ref"
5539 step
= db_nslcmop_update
[
5541 ] = "executing pre-scale scaling-config-action '{}'".format(
5542 vnf_config_primitive
5545 # look for primitive
5546 for config_primitive
in (
5547 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
5548 ).get("config-primitive", ()):
5549 if config_primitive
["name"] == vnf_config_primitive
:
5553 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5554 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5555 "primitive".format(scaling_group
, vnf_config_primitive
)
5558 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
5559 if db_vnfr
.get("additionalParamsForVnf"):
5560 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
5562 scale_process
= "VCA"
5563 db_nsr_update
["config-status"] = "configuring pre-scaling"
5564 primitive_params
= self
._map
_primitive
_params
(
5565 config_primitive
, {}, vnfr_params
5568 # Pre-scale retry check: Check if this sub-operation has been executed before
5569 op_index
= self
._check
_or
_add
_scale
_suboperation
(
5572 vnf_config_primitive
,
5576 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
5577 # Skip sub-operation
5578 result
= "COMPLETED"
5579 result_detail
= "Done"
5582 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5583 vnf_config_primitive
, result
, result_detail
5587 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
5588 # New sub-operation: Get index of this sub-operation
5590 len(db_nslcmop
.get("_admin", {}).get("operations"))
5595 + "vnf_config_primitive={} New sub-operation".format(
5596 vnf_config_primitive
5600 # retry: Get registered params for this existing sub-operation
5601 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
5604 vnf_index
= op
.get("member_vnf_index")
5605 vnf_config_primitive
= op
.get("primitive")
5606 primitive_params
= op
.get("primitive_params")
5609 + "vnf_config_primitive={} Sub-operation retry".format(
5610 vnf_config_primitive
5613 # Execute the primitive, either with new (first-time) or registered (reintent) args
5614 ee_descriptor_id
= config_primitive
.get(
5615 "execution-environment-ref"
5617 primitive_name
= config_primitive
.get(
5618 "execution-environment-primitive", vnf_config_primitive
5620 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5621 nsr_deployed
["VCA"],
5622 member_vnf_index
=vnf_index
,
5624 vdu_count_index
=None,
5625 ee_descriptor_id
=ee_descriptor_id
,
5627 result
, result_detail
= await self
._ns
_execute
_primitive
(
5636 + "vnf_config_primitive={} Done with result {} {}".format(
5637 vnf_config_primitive
, result
, result_detail
5640 # Update operationState = COMPLETED | FAILED
5641 self
._update
_suboperation
_status
(
5642 db_nslcmop
, op_index
, result
, result_detail
5645 if result
== "FAILED":
5646 raise LcmException(result_detail
)
5647 db_nsr_update
["config-status"] = old_config_status
5648 scale_process
= None
5652 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
5655 "_admin.scaling-group.{}.time".format(admin_scale_index
)
5658 # SCALE-IN VCA - BEGIN
5659 if vca_scaling_info
:
5660 step
= db_nslcmop_update
[
5662 ] = "Deleting the execution environments"
5663 scale_process
= "VCA"
5664 for vca_info
in vca_scaling_info
:
5665 if vca_info
["type"] == "delete":
5666 member_vnf_index
= str(vca_info
["member-vnf-index"])
5668 logging_text
+ "vdu info: {}".format(vca_info
)
5670 if vca_info
.get("osm_vdu_id"):
5671 vdu_id
= vca_info
["osm_vdu_id"]
5672 vdu_index
= int(vca_info
["vdu_index"])
5675 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5676 member_vnf_index
, vdu_id
, vdu_index
5680 kdu_id
= vca_info
["osm_kdu_id"]
5683 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5684 member_vnf_index
, kdu_id
, vdu_index
5686 stage
[2] = step
= "Scaling in VCA"
5687 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
5688 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
5689 config_update
= db_nsr
["configurationStatus"]
5690 for vca_index
, vca
in enumerate(vca_update
):
5692 (vca
or vca
.get("ee_id"))
5693 and vca
["member-vnf-index"] == member_vnf_index
5694 and vca
["vdu_count_index"] == vdu_index
5696 if vca
.get("vdu_id"):
5697 config_descriptor
= get_configuration(
5698 db_vnfd
, vca
.get("vdu_id")
5700 elif vca
.get("kdu_name"):
5701 config_descriptor
= get_configuration(
5702 db_vnfd
, vca
.get("kdu_name")
5705 config_descriptor
= get_configuration(
5706 db_vnfd
, db_vnfd
["id"]
5708 operation_params
= (
5709 db_nslcmop
.get("operationParams") or {}
5711 exec_terminate_primitives
= not operation_params
.get(
5712 "skip_terminate_primitives"
5713 ) and vca
.get("needed_terminate")
5714 task
= asyncio
.ensure_future(
5723 exec_primitives
=exec_terminate_primitives
,
5727 timeout
=self
.timeout_charm_delete
,
5730 tasks_dict_info
[task
] = "Terminating VCA {}".format(
5733 del vca_update
[vca_index
]
5734 del config_update
[vca_index
]
5735 # wait for pending tasks of terminate primitives
5739 + "Waiting for tasks {}".format(
5740 list(tasks_dict_info
.keys())
5743 error_list
= await self
._wait
_for
_tasks
(
5747 self
.timeout_charm_delete
, self
.timeout_ns_terminate
5752 tasks_dict_info
.clear()
5754 raise LcmException("; ".join(error_list
))
5756 db_vca_and_config_update
= {
5757 "_admin.deployed.VCA": vca_update
,
5758 "configurationStatus": config_update
,
5761 "nsrs", db_nsr
["_id"], db_vca_and_config_update
5763 scale_process
= None
5764 # SCALE-IN VCA - END
5767 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
5768 scale_process
= "RO"
5769 if self
.ro_config
.get("ng"):
5770 await self
._scale
_ng
_ro
(
5771 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
5773 scaling_info
.pop("vdu-create", None)
5774 scaling_info
.pop("vdu-delete", None)
5776 scale_process
= None
5780 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
5781 scale_process
= "KDU"
5782 await self
._scale
_kdu
(
5783 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
5785 scaling_info
.pop("kdu-create", None)
5786 scaling_info
.pop("kdu-delete", None)
5788 scale_process
= None
5792 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5794 # SCALE-UP VCA - BEGIN
5795 if vca_scaling_info
:
5796 step
= db_nslcmop_update
[
5798 ] = "Creating new execution environments"
5799 scale_process
= "VCA"
5800 for vca_info
in vca_scaling_info
:
5801 if vca_info
["type"] == "create":
5802 member_vnf_index
= str(vca_info
["member-vnf-index"])
5804 logging_text
+ "vdu info: {}".format(vca_info
)
5806 vnfd_id
= db_vnfr
["vnfd-ref"]
5807 if vca_info
.get("osm_vdu_id"):
5808 vdu_index
= int(vca_info
["vdu_index"])
5809 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
5810 if db_vnfr
.get("additionalParamsForVnf"):
5811 deploy_params
.update(
5813 db_vnfr
["additionalParamsForVnf"].copy()
5816 descriptor_config
= get_configuration(
5817 db_vnfd
, db_vnfd
["id"]
5819 if descriptor_config
:
5824 logging_text
=logging_text
5825 + "member_vnf_index={} ".format(member_vnf_index
),
5828 nslcmop_id
=nslcmop_id
,
5834 member_vnf_index
=member_vnf_index
,
5835 vdu_index
=vdu_index
,
5837 deploy_params
=deploy_params
,
5838 descriptor_config
=descriptor_config
,
5839 base_folder
=base_folder
,
5840 task_instantiation_info
=tasks_dict_info
,
5843 vdu_id
= vca_info
["osm_vdu_id"]
5844 vdur
= find_in_list(
5845 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
5847 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
5848 if vdur
.get("additionalParams"):
5849 deploy_params_vdu
= parse_yaml_strings(
5850 vdur
["additionalParams"]
5853 deploy_params_vdu
= deploy_params
5854 deploy_params_vdu
["OSM"] = get_osm_params(
5855 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
5857 if descriptor_config
:
5862 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5863 member_vnf_index
, vdu_id
, vdu_index
5865 stage
[2] = step
= "Scaling out VCA"
5866 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
5868 logging_text
=logging_text
5869 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5870 member_vnf_index
, vdu_id
, vdu_index
5874 nslcmop_id
=nslcmop_id
,
5880 member_vnf_index
=member_vnf_index
,
5881 vdu_index
=vdu_index
,
5883 deploy_params
=deploy_params_vdu
,
5884 descriptor_config
=descriptor_config
,
5885 base_folder
=base_folder
,
5886 task_instantiation_info
=tasks_dict_info
,
5890 kdu_name
= vca_info
["osm_kdu_id"]
5891 descriptor_config
= get_configuration(db_vnfd
, kdu_name
)
5892 if descriptor_config
:
5894 kdu_index
= int(vca_info
["kdu_index"])
5898 for x
in db_vnfr
["kdur"]
5899 if x
["kdu-name"] == kdu_name
5901 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
5902 if kdur
.get("additionalParams"):
5903 deploy_params_kdu
= parse_yaml_strings(
5904 kdur
["additionalParams"]
5908 logging_text
=logging_text
,
5911 nslcmop_id
=nslcmop_id
,
5917 member_vnf_index
=member_vnf_index
,
5918 vdu_index
=kdu_index
,
5920 deploy_params
=deploy_params_kdu
,
5921 descriptor_config
=descriptor_config
,
5922 base_folder
=base_folder
,
5923 task_instantiation_info
=tasks_dict_info
,
5926 # SCALE-UP VCA - END
5927 scale_process
= None
5930 # execute primitive service POST-SCALING
5931 step
= "Executing post-scale vnf-config-primitive"
5932 if scaling_descriptor
.get("scaling-config-action"):
5933 for scaling_config_action
in scaling_descriptor
[
5934 "scaling-config-action"
5937 scaling_config_action
.get("trigger") == "post-scale-in"
5938 and scaling_type
== "SCALE_IN"
5940 scaling_config_action
.get("trigger") == "post-scale-out"
5941 and scaling_type
== "SCALE_OUT"
5943 vnf_config_primitive
= scaling_config_action
[
5944 "vnf-config-primitive-name-ref"
5946 step
= db_nslcmop_update
[
5948 ] = "executing post-scale scaling-config-action '{}'".format(
5949 vnf_config_primitive
5952 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
5953 if db_vnfr
.get("additionalParamsForVnf"):
5954 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
5956 # look for primitive
5957 for config_primitive
in (
5958 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
5959 ).get("config-primitive", ()):
5960 if config_primitive
["name"] == vnf_config_primitive
:
5964 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
5965 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
5966 "config-primitive".format(
5967 scaling_group
, vnf_config_primitive
5970 scale_process
= "VCA"
5971 db_nsr_update
["config-status"] = "configuring post-scaling"
5972 primitive_params
= self
._map
_primitive
_params
(
5973 config_primitive
, {}, vnfr_params
5976 # Post-scale retry check: Check if this sub-operation has been executed before
5977 op_index
= self
._check
_or
_add
_scale
_suboperation
(
5980 vnf_config_primitive
,
5984 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
5985 # Skip sub-operation
5986 result
= "COMPLETED"
5987 result_detail
= "Done"
5990 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5991 vnf_config_primitive
, result
, result_detail
5995 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
5996 # New sub-operation: Get index of this sub-operation
5998 len(db_nslcmop
.get("_admin", {}).get("operations"))
6003 + "vnf_config_primitive={} New sub-operation".format(
6004 vnf_config_primitive
6008 # retry: Get registered params for this existing sub-operation
6009 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6012 vnf_index
= op
.get("member_vnf_index")
6013 vnf_config_primitive
= op
.get("primitive")
6014 primitive_params
= op
.get("primitive_params")
6017 + "vnf_config_primitive={} Sub-operation retry".format(
6018 vnf_config_primitive
6021 # Execute the primitive, either with new (first-time) or registered (reintent) args
6022 ee_descriptor_id
= config_primitive
.get(
6023 "execution-environment-ref"
6025 primitive_name
= config_primitive
.get(
6026 "execution-environment-primitive", vnf_config_primitive
6028 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6029 nsr_deployed
["VCA"],
6030 member_vnf_index
=vnf_index
,
6032 vdu_count_index
=None,
6033 ee_descriptor_id
=ee_descriptor_id
,
6035 result
, result_detail
= await self
._ns
_execute
_primitive
(
6044 + "vnf_config_primitive={} Done with result {} {}".format(
6045 vnf_config_primitive
, result
, result_detail
6048 # Update operationState = COMPLETED | FAILED
6049 self
._update
_suboperation
_status
(
6050 db_nslcmop
, op_index
, result
, result_detail
6053 if result
== "FAILED":
6054 raise LcmException(result_detail
)
6055 db_nsr_update
["config-status"] = old_config_status
6056 scale_process
= None
6061 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6062 db_nsr_update
["operational-status"] = (
6064 if old_operational_status
== "failed"
6065 else old_operational_status
6067 db_nsr_update
["config-status"] = old_config_status
6070 ROclient
.ROClientException
,
6075 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6077 except asyncio
.CancelledError
:
6079 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6081 exc
= "Operation was cancelled"
6082 except Exception as e
:
6083 exc
= traceback
.format_exc()
6084 self
.logger
.critical(
6085 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6089 self
._write
_ns
_status
(
6092 current_operation
="IDLE",
6093 current_operation_id
=None,
6096 stage
[1] = "Waiting for instantiate pending tasks."
6097 self
.logger
.debug(logging_text
+ stage
[1])
6098 exc
= await self
._wait
_for
_tasks
(
6101 self
.timeout_ns_deploy
,
6109 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6110 nslcmop_operation_state
= "FAILED"
6112 db_nsr_update
["operational-status"] = old_operational_status
6113 db_nsr_update
["config-status"] = old_config_status
6114 db_nsr_update
["detailed-status"] = ""
6116 if "VCA" in scale_process
:
6117 db_nsr_update
["config-status"] = "failed"
6118 if "RO" in scale_process
:
6119 db_nsr_update
["operational-status"] = "failed"
6122 ] = "FAILED scaling nslcmop={} {}: {}".format(
6123 nslcmop_id
, step
, exc
6126 error_description_nslcmop
= None
6127 nslcmop_operation_state
= "COMPLETED"
6128 db_nslcmop_update
["detailed-status"] = "Done"
6130 self
._write
_op
_status
(
6133 error_message
=error_description_nslcmop
,
6134 operation_state
=nslcmop_operation_state
,
6135 other_update
=db_nslcmop_update
,
6138 self
._write
_ns
_status
(
6141 current_operation
="IDLE",
6142 current_operation_id
=None,
6143 other_update
=db_nsr_update
,
6146 if nslcmop_operation_state
:
6150 "nslcmop_id": nslcmop_id
,
6151 "operationState": nslcmop_operation_state
,
6153 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
6154 except Exception as e
:
6156 logging_text
+ "kafka_write notification Exception {}".format(e
)
6158 self
.logger
.debug(logging_text
+ "Exit")
6159 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
6161 async def _scale_kdu(
6162 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6164 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
6165 for kdu_name
in _scaling_info
:
6166 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
6167 deployed_kdu
, index
= get_deployed_kdu(
6168 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
6170 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
6171 kdu_instance
= deployed_kdu
["kdu-instance"]
6172 scale
= int(kdu_scaling_info
["scale"])
6173 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
6176 "collection": "nsrs",
6177 "filter": {"_id": nsr_id
},
6178 "path": "_admin.deployed.K8s.{}".format(index
),
6181 step
= "scaling application {}".format(
6182 kdu_scaling_info
["resource-name"]
6184 self
.logger
.debug(logging_text
+ step
)
6186 if kdu_scaling_info
["type"] == "delete":
6187 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
6190 and kdu_config
.get("terminate-config-primitive")
6191 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
6193 terminate_config_primitive_list
= kdu_config
.get(
6194 "terminate-config-primitive"
6196 terminate_config_primitive_list
.sort(
6197 key
=lambda val
: int(val
["seq"])
6201 terminate_config_primitive
6202 ) in terminate_config_primitive_list
:
6203 primitive_params_
= self
._map
_primitive
_params
(
6204 terminate_config_primitive
, {}, {}
6206 step
= "execute terminate config primitive"
6207 self
.logger
.debug(logging_text
+ step
)
6208 await asyncio
.wait_for(
6209 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
6210 cluster_uuid
=cluster_uuid
,
6211 kdu_instance
=kdu_instance
,
6212 primitive_name
=terminate_config_primitive
["name"],
6213 params
=primitive_params_
,
6220 await asyncio
.wait_for(
6221 self
.k8scluster_map
[k8s_cluster_type
].scale(
6224 kdu_scaling_info
["resource-name"],
6227 timeout
=self
.timeout_vca_on_error
,
6230 if kdu_scaling_info
["type"] == "create":
6231 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
6234 and kdu_config
.get("initial-config-primitive")
6235 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
6237 initial_config_primitive_list
= kdu_config
.get(
6238 "initial-config-primitive"
6240 initial_config_primitive_list
.sort(
6241 key
=lambda val
: int(val
["seq"])
6244 for initial_config_primitive
in initial_config_primitive_list
:
6245 primitive_params_
= self
._map
_primitive
_params
(
6246 initial_config_primitive
, {}, {}
6248 step
= "execute initial config primitive"
6249 self
.logger
.debug(logging_text
+ step
)
6250 await asyncio
.wait_for(
6251 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
6252 cluster_uuid
=cluster_uuid
,
6253 kdu_instance
=kdu_instance
,
6254 primitive_name
=initial_config_primitive
["name"],
6255 params
=primitive_params_
,
6262 async def _scale_ng_ro(
6263 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
6265 nsr_id
= db_nslcmop
["nsInstanceId"]
6266 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
6269 # read from db: vnfd's for every vnf
6272 # for each vnf in ns, read vnfd
6273 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
6274 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
6275 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
6276 # if we haven't this vnfd, read it from db
6277 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
6279 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
6280 db_vnfds
.append(vnfd
)
6281 n2vc_key
= self
.n2vc
.get_public_key()
6282 n2vc_key_list
= [n2vc_key
]
6285 vdu_scaling_info
.get("vdu-create"),
6286 vdu_scaling_info
.get("vdu-delete"),
6289 # db_vnfr has been updated, update db_vnfrs to use it
6290 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
6291 await self
._instantiate
_ng
_ro
(
6301 start_deploy
=time(),
6302 timeout_ns_deploy
=self
.timeout_ns_deploy
,
6304 if vdu_scaling_info
.get("vdu-delete"):
6306 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
6309 async def add_prometheus_metrics(
6310 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
6312 if not self
.prometheus
:
6314 # look if exist a file called 'prometheus*.j2' and
6315 artifact_content
= self
.fs
.dir_ls(artifact_path
)
6319 for f
in artifact_content
6320 if f
.startswith("prometheus") and f
.endswith(".j2")
6326 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
6330 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
6331 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
6333 vnfr_id
= vnfr_id
.replace("-", "")
6335 "JOB_NAME": vnfr_id
,
6336 "TARGET_IP": target_ip
,
6337 "EXPORTER_POD_IP": host_name
,
6338 "EXPORTER_POD_PORT": host_port
,
6340 job_list
= self
.prometheus
.parse_job(job_data
, variables
)
6341 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
6342 for job
in job_list
:
6344 not isinstance(job
.get("job_name"), str)
6345 or vnfr_id
not in job
["job_name"]
6347 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
6348 job
["nsr_id"] = nsr_id
6349 job_dict
= {jl
["job_name"]: jl
for jl
in job_list
}
6350 if await self
.prometheus
.update(job_dict
):
6351 return list(job_dict
.keys())
6353 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
6355 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6357 :param: vim_account_id: VIM Account ID
6359 :return: (cloud_name, cloud_credential)
6361 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
6362 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
6364 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
6366 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6368 :param: vim_account_id: VIM Account ID
6370 :return: (cloud_name, cloud_credential)
6372 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
6373 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")