1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
20 from typing
import Any
, Dict
, List
23 import logging
.handlers
34 from osm_lcm
import ROclient
35 from osm_lcm
.data_utils
.nsr
import (
38 get_deployed_vca_list
,
41 from osm_lcm
.data_utils
.vca
import (
50 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
51 from osm_lcm
.lcm_utils
import (
59 from osm_lcm
.data_utils
.nsd
import (
60 get_ns_configuration_relation_list
,
64 from osm_lcm
.data_utils
.vnfd
import (
70 get_ee_sorted_initial_config_primitive_list
,
71 get_ee_sorted_terminate_config_primitive_list
,
73 get_virtual_link_profiles
,
78 get_number_of_instances
,
80 get_kdu_resource_profile
,
82 from osm_lcm
.data_utils
.list_utils
import find_in_list
83 from osm_lcm
.data_utils
.vnfr
import get_osm_params
, get_vdur_index
, get_kdur
84 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
85 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
86 from n2vc
.definitions
import RelationEndpoint
87 from n2vc
.k8s_helm_conn
import K8sHelmConnector
88 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
89 from n2vc
.k8s_juju_conn
import K8sJujuConnector
91 from osm_common
.dbbase
import DbException
92 from osm_common
.fsbase
import FsException
94 from osm_lcm
.data_utils
.database
.database
import Database
95 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
97 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
98 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
100 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
101 from osm_lcm
.osm_config
import OsmConfigBuilder
102 from osm_lcm
.prometheus
import parse_job
104 from copy
import copy
, deepcopy
105 from time
import time
106 from uuid
import uuid4
108 from random
import randint
110 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
113 class NsLcm(LcmBase
):
114 timeout_vca_on_error
= (
116 ) # Time for charm from first time at blocked,error status to mark as failed
117 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
118 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
119 timeout_charm_delete
= 10 * 60
120 timeout_primitive
= 30 * 60 # timeout for primitive execution
121 timeout_progress_primitive
= (
123 ) # timeout for some progress in a primitive execution
125 SUBOPERATION_STATUS_NOT_FOUND
= -1
126 SUBOPERATION_STATUS_NEW
= -2
127 SUBOPERATION_STATUS_SKIP
= -3
128 task_name_deploy_vca
= "Deploying VCA"
130 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
132 Init, Connect to database, filesystem storage, and messaging
133 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
136 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
138 self
.db
= Database().instance
.db
139 self
.fs
= Filesystem().instance
.fs
141 self
.lcm_tasks
= lcm_tasks
142 self
.timeout
= config
["timeout"]
143 self
.ro_config
= config
["ro_config"]
144 self
.ng_ro
= config
["ro_config"].get("ng")
145 self
.vca_config
= config
["VCA"].copy()
147 # create N2VC connector
148 self
.n2vc
= N2VCJujuConnector(
151 on_update_db
=self
._on
_update
_n
2vc
_db
,
156 self
.conn_helm_ee
= LCMHelmConn(
159 vca_config
=self
.vca_config
,
160 on_update_db
=self
._on
_update
_n
2vc
_db
,
163 self
.k8sclusterhelm2
= K8sHelmConnector(
164 kubectl_command
=self
.vca_config
.get("kubectlpath"),
165 helm_command
=self
.vca_config
.get("helmpath"),
172 self
.k8sclusterhelm3
= K8sHelm3Connector(
173 kubectl_command
=self
.vca_config
.get("kubectlpath"),
174 helm_command
=self
.vca_config
.get("helm3path"),
181 self
.k8sclusterjuju
= K8sJujuConnector(
182 kubectl_command
=self
.vca_config
.get("kubectlpath"),
183 juju_command
=self
.vca_config
.get("jujupath"),
186 on_update_db
=self
._on
_update
_k
8s
_db
,
191 self
.k8scluster_map
= {
192 "helm-chart": self
.k8sclusterhelm2
,
193 "helm-chart-v3": self
.k8sclusterhelm3
,
194 "chart": self
.k8sclusterhelm3
,
195 "juju-bundle": self
.k8sclusterjuju
,
196 "juju": self
.k8sclusterjuju
,
200 "lxc_proxy_charm": self
.n2vc
,
201 "native_charm": self
.n2vc
,
202 "k8s_proxy_charm": self
.n2vc
,
203 "helm": self
.conn_helm_ee
,
204 "helm-v3": self
.conn_helm_ee
,
208 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
211 def increment_ip_mac(ip_mac
, vm_index
=1):
212 if not isinstance(ip_mac
, str):
215 # try with ipv4 look for last dot
216 i
= ip_mac
.rfind(".")
219 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
220 # try with ipv6 or mac look for last colon. Operate in hex
221 i
= ip_mac
.rfind(":")
224 # format in hex, len can be 2 for mac or 4 for ipv6
225 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
226 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
232 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
234 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
237 # TODO filter RO descriptor fields...
241 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
242 db_dict
["deploymentStatus"] = ro_descriptor
243 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
245 except Exception as e
:
247 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
250 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
252 # remove last dot from path (if exists)
253 if path
.endswith("."):
256 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
257 # .format(table, filter, path, updated_data))
260 nsr_id
= filter.get("_id")
262 # read ns record from database
263 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
264 current_ns_status
= nsr
.get("nsState")
266 # get vca status for NS
267 status_dict
= await self
.n2vc
.get_status(
268 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
273 db_dict
["vcaStatus"] = status_dict
274 await self
.n2vc
.update_vca_status(db_dict
["vcaStatus"], vca_id
=vca_id
)
276 # update configurationStatus for this VCA
278 vca_index
= int(path
[path
.rfind(".") + 1 :])
281 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
283 vca_status
= vca_list
[vca_index
].get("status")
285 configuration_status_list
= nsr
.get("configurationStatus")
286 config_status
= configuration_status_list
[vca_index
].get("status")
288 if config_status
== "BROKEN" and vca_status
!= "failed":
289 db_dict
["configurationStatus"][vca_index
] = "READY"
290 elif config_status
!= "BROKEN" and vca_status
== "failed":
291 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
292 except Exception as e
:
293 # not update configurationStatus
294 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
296 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
297 # if nsState = 'DEGRADED' check if all is OK
299 if current_ns_status
in ("READY", "DEGRADED"):
300 error_description
= ""
302 if status_dict
.get("machines"):
303 for machine_id
in status_dict
.get("machines"):
304 machine
= status_dict
.get("machines").get(machine_id
)
305 # check machine agent-status
306 if machine
.get("agent-status"):
307 s
= machine
.get("agent-status").get("status")
310 error_description
+= (
311 "machine {} agent-status={} ; ".format(
315 # check machine instance status
316 if machine
.get("instance-status"):
317 s
= machine
.get("instance-status").get("status")
320 error_description
+= (
321 "machine {} instance-status={} ; ".format(
326 if status_dict
.get("applications"):
327 for app_id
in status_dict
.get("applications"):
328 app
= status_dict
.get("applications").get(app_id
)
329 # check application status
330 if app
.get("status"):
331 s
= app
.get("status").get("status")
334 error_description
+= (
335 "application {} status={} ; ".format(app_id
, s
)
338 if error_description
:
339 db_dict
["errorDescription"] = error_description
340 if current_ns_status
== "READY" and is_degraded
:
341 db_dict
["nsState"] = "DEGRADED"
342 if current_ns_status
== "DEGRADED" and not is_degraded
:
343 db_dict
["nsState"] = "READY"
346 self
.update_db_2("nsrs", nsr_id
, db_dict
)
348 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
350 except Exception as e
:
351 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
353 async def _on_update_k8s_db(
354 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
357 Updating vca status in NSR record
358 :param cluster_uuid: UUID of a k8s cluster
359 :param kdu_instance: The unique name of the KDU instance
360 :param filter: To get nsr_id
361 :cluster_type: The cluster type (juju, k8s)
365 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
366 # .format(cluster_uuid, kdu_instance, filter))
368 nsr_id
= filter.get("_id")
370 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
371 cluster_uuid
=cluster_uuid
,
372 kdu_instance
=kdu_instance
,
374 complete_status
=True,
380 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
382 if cluster_type
in ("juju-bundle", "juju"):
383 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
384 # status in a similar way between Juju Bundles and Helm Charts on this side
385 await self
.k8sclusterjuju
.update_vca_status(
386 db_dict
["vcaStatus"],
392 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
396 self
.update_db_2("nsrs", nsr_id
, db_dict
)
397 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
399 except Exception as e
:
400 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
403 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
405 env
= Environment(undefined
=StrictUndefined
)
406 template
= env
.from_string(cloud_init_text
)
407 return template
.render(additional_params
or {})
408 except UndefinedError
as e
:
410 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
411 "file, must be provided in the instantiation parameters inside the "
412 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
414 except (TemplateError
, TemplateNotFound
) as e
:
416 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
421 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
422 cloud_init_content
= cloud_init_file
= None
424 if vdu
.get("cloud-init-file"):
425 base_folder
= vnfd
["_admin"]["storage"]
426 if base_folder
["pkg-dir"]:
427 cloud_init_file
= "{}/{}/cloud_init/{}".format(
428 base_folder
["folder"],
429 base_folder
["pkg-dir"],
430 vdu
["cloud-init-file"],
433 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
434 base_folder
["folder"],
435 vdu
["cloud-init-file"],
437 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
438 cloud_init_content
= ci_file
.read()
439 elif vdu
.get("cloud-init"):
440 cloud_init_content
= vdu
["cloud-init"]
442 return cloud_init_content
443 except FsException
as e
:
445 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
446 vnfd
["id"], vdu
["id"], cloud_init_file
, e
450 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
452 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]),
455 additional_params
= vdur
.get("additionalParams")
456 return parse_yaml_strings(additional_params
)
458 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
460 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
461 :param vnfd: input vnfd
462 :param new_id: overrides vnf id if provided
463 :param additionalParams: Instantiation params for VNFs provided
464 :param nsrId: Id of the NSR
465 :return: copy of vnfd
467 vnfd_RO
= deepcopy(vnfd
)
468 # remove unused by RO configuration, monitoring, scaling and internal keys
469 vnfd_RO
.pop("_id", None)
470 vnfd_RO
.pop("_admin", None)
471 vnfd_RO
.pop("monitoring-param", None)
472 vnfd_RO
.pop("scaling-group-descriptor", None)
473 vnfd_RO
.pop("kdu", None)
474 vnfd_RO
.pop("k8s-cluster", None)
476 vnfd_RO
["id"] = new_id
478 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
479 for vdu
in get_iterable(vnfd_RO
, "vdu"):
480 vdu
.pop("cloud-init-file", None)
481 vdu
.pop("cloud-init", None)
485 def ip_profile_2_RO(ip_profile
):
486 RO_ip_profile
= deepcopy(ip_profile
)
487 if "dns-server" in RO_ip_profile
:
488 if isinstance(RO_ip_profile
["dns-server"], list):
489 RO_ip_profile
["dns-address"] = []
490 for ds
in RO_ip_profile
.pop("dns-server"):
491 RO_ip_profile
["dns-address"].append(ds
["address"])
493 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
494 if RO_ip_profile
.get("ip-version") == "ipv4":
495 RO_ip_profile
["ip-version"] = "IPv4"
496 if RO_ip_profile
.get("ip-version") == "ipv6":
497 RO_ip_profile
["ip-version"] = "IPv6"
498 if "dhcp-params" in RO_ip_profile
:
499 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
502 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
503 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
504 if db_vim
["_admin"]["operationalState"] != "ENABLED":
506 "VIM={} is not available. operationalState={}".format(
507 vim_account
, db_vim
["_admin"]["operationalState"]
510 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
513 def get_ro_wim_id_for_wim_account(self
, wim_account
):
514 if isinstance(wim_account
, str):
515 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
516 if db_wim
["_admin"]["operationalState"] != "ENABLED":
518 "WIM={} is not available. operationalState={}".format(
519 wim_account
, db_wim
["_admin"]["operationalState"]
522 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
527 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
529 db_vdu_push_list
= []
531 db_update
= {"_admin.modified": time()}
533 for vdu_id
, vdu_count
in vdu_create
.items():
537 for vdur
in reversed(db_vnfr
["vdur"])
538 if vdur
["vdu-id-ref"] == vdu_id
543 # Read the template saved in the db:
544 self
.logger
.debug(f
"No vdur in the database. Using the vdur-template to scale")
545 vdur_template
= db_vnfr
.get("vdur-template")
546 if not vdur_template
:
548 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
552 vdur
= vdur_template
[0]
553 #Delete a template from the database after using it
554 self
.db
.set_one("vnfrs",
555 {"_id": db_vnfr
["_id"]},
557 pull
={"vdur-template": {"_id": vdur
['_id']}}
559 for count
in range(vdu_count
):
560 vdur_copy
= deepcopy(vdur
)
561 vdur_copy
["status"] = "BUILD"
562 vdur_copy
["status-detailed"] = None
563 vdur_copy
["ip-address"] = None
564 vdur_copy
["_id"] = str(uuid4())
565 vdur_copy
["count-index"] += count
+ 1
566 vdur_copy
["id"] = "{}-{}".format(
567 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
569 vdur_copy
.pop("vim_info", None)
570 for iface
in vdur_copy
["interfaces"]:
571 if iface
.get("fixed-ip"):
572 iface
["ip-address"] = self
.increment_ip_mac(
573 iface
["ip-address"], count
+ 1
576 iface
.pop("ip-address", None)
577 if iface
.get("fixed-mac"):
578 iface
["mac-address"] = self
.increment_ip_mac(
579 iface
["mac-address"], count
+ 1
582 iface
.pop("mac-address", None)
586 ) # only first vdu can be managment of vnf
587 db_vdu_push_list
.append(vdur_copy
)
588 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
590 if len(db_vnfr
["vdur"]) == 1:
591 # The scale will move to 0 instances
592 self
.logger
.debug(f
"Scaling to 0 !, creating the template with the last vdur")
593 template_vdur
= [db_vnfr
["vdur"][0]]
594 for vdu_id
, vdu_count
in vdu_delete
.items():
596 indexes_to_delete
= [
598 for iv
in enumerate(db_vnfr
["vdur"])
599 if iv
[1]["vdu-id-ref"] == vdu_id
603 "vdur.{}.status".format(i
): "DELETING"
604 for i
in indexes_to_delete
[-vdu_count
:]
608 # it must be deleted one by one because common.db does not allow otherwise
611 for v
in reversed(db_vnfr
["vdur"])
612 if v
["vdu-id-ref"] == vdu_id
614 for vdu
in vdus_to_delete
[:vdu_count
]:
617 {"_id": db_vnfr
["_id"]},
619 pull
={"vdur": {"_id": vdu
["_id"]}},
623 db_push
["vdur"] = db_vdu_push_list
625 db_push
["vdur-template"] = template_vdur
628 db_vnfr
["vdur-template"] = template_vdur
629 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
630 # modify passed dictionary db_vnfr
631 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
632 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
634 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
636 Updates database nsr with the RO info for the created vld
637 :param ns_update_nsr: dictionary to be filled with the updated info
638 :param db_nsr: content of db_nsr. This is also modified
639 :param nsr_desc_RO: nsr descriptor from RO
640 :return: Nothing, LcmException is raised on errors
643 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
644 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
645 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
647 vld
["vim-id"] = net_RO
.get("vim_net_id")
648 vld
["name"] = net_RO
.get("vim_name")
649 vld
["status"] = net_RO
.get("status")
650 vld
["status-detailed"] = net_RO
.get("error_msg")
651 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
655 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
658 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
660 for db_vnfr
in db_vnfrs
.values():
661 vnfr_update
= {"status": "ERROR"}
662 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
663 if "status" not in vdur
:
664 vdur
["status"] = "ERROR"
665 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
667 vdur
["status-detailed"] = str(error_text
)
669 "vdur.{}.status-detailed".format(vdu_index
)
671 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
672 except DbException
as e
:
673 self
.logger
.error("Cannot update vnf. {}".format(e
))
675 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
677 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
678 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
679 :param nsr_desc_RO: nsr descriptor from RO
680 :return: Nothing, LcmException is raised on errors
682 for vnf_index
, db_vnfr
in db_vnfrs
.items():
683 for vnf_RO
in nsr_desc_RO
["vnfs"]:
684 if vnf_RO
["member_vnf_index"] != vnf_index
:
687 if vnf_RO
.get("ip_address"):
688 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
691 elif not db_vnfr
.get("ip-address"):
692 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
693 raise LcmExceptionNoMgmtIP(
694 "ns member_vnf_index '{}' has no IP address".format(
699 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
700 vdur_RO_count_index
= 0
701 if vdur
.get("pdu-type"):
703 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
704 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
706 if vdur
["count-index"] != vdur_RO_count_index
:
707 vdur_RO_count_index
+= 1
709 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
710 if vdur_RO
.get("ip_address"):
711 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
713 vdur
["ip-address"] = None
714 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
715 vdur
["name"] = vdur_RO
.get("vim_name")
716 vdur
["status"] = vdur_RO
.get("status")
717 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
718 for ifacer
in get_iterable(vdur
, "interfaces"):
719 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
720 if ifacer
["name"] == interface_RO
.get("internal_name"):
721 ifacer
["ip-address"] = interface_RO
.get(
724 ifacer
["mac-address"] = interface_RO
.get(
730 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
731 "from VIM info".format(
732 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
735 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
739 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
741 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
745 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
746 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
747 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
749 vld
["vim-id"] = net_RO
.get("vim_net_id")
750 vld
["name"] = net_RO
.get("vim_name")
751 vld
["status"] = net_RO
.get("status")
752 vld
["status-detailed"] = net_RO
.get("error_msg")
753 vnfr_update
["vld.{}".format(vld_index
)] = vld
757 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
762 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
767 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
772 def _get_ns_config_info(self
, nsr_id
):
774 Generates a mapping between vnf,vdu elements and the N2VC id
775 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
776 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
777 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
778 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
780 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
781 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
783 ns_config_info
= {"osm-config-mapping": mapping
}
784 for vca
in vca_deployed_list
:
785 if not vca
["member-vnf-index"]:
787 if not vca
["vdu_id"]:
788 mapping
[vca
["member-vnf-index"]] = vca
["application"]
792 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
794 ] = vca
["application"]
795 return ns_config_info
797 async def _instantiate_ng_ro(
814 def get_vim_account(vim_account_id
):
816 if vim_account_id
in db_vims
:
817 return db_vims
[vim_account_id
]
818 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
819 db_vims
[vim_account_id
] = db_vim
822 # modify target_vld info with instantiation parameters
823 def parse_vld_instantiation_params(
824 target_vim
, target_vld
, vld_params
, target_sdn
826 if vld_params
.get("ip-profile"):
827 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
830 if vld_params
.get("provider-network"):
831 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
834 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
835 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
838 if vld_params
.get("wimAccountId"):
839 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
840 target_vld
["vim_info"][target_wim
] = {}
841 for param
in ("vim-network-name", "vim-network-id"):
842 if vld_params
.get(param
):
843 if isinstance(vld_params
[param
], dict):
844 for vim
, vim_net
in vld_params
[param
].items():
845 other_target_vim
= "vim:" + vim
847 target_vld
["vim_info"],
848 (other_target_vim
, param
.replace("-", "_")),
851 else: # isinstance str
852 target_vld
["vim_info"][target_vim
][
853 param
.replace("-", "_")
854 ] = vld_params
[param
]
855 if vld_params
.get("common_id"):
856 target_vld
["common_id"] = vld_params
.get("common_id")
858 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
859 def update_ns_vld_target(target
, ns_params
):
860 for vnf_params
in ns_params
.get("vnf", ()):
861 if vnf_params
.get("vimAccountId"):
865 for vnfr
in db_vnfrs
.values()
866 if vnf_params
["member-vnf-index"]
867 == vnfr
["member-vnf-index-ref"]
871 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
872 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
873 target_vld
= find_in_list(
874 get_iterable(vdur
, "interfaces"),
875 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
878 if vnf_params
.get("vimAccountId") not in a_vld
.get(
881 target
["ns"]["vld"][a_index
].get("vim_info").update(
883 "vim:{}".format(vnf_params
["vimAccountId"]): {
884 "vim_network_name": ""
889 nslcmop_id
= db_nslcmop
["_id"]
891 "name": db_nsr
["name"],
894 "image": deepcopy(db_nsr
["image"]),
895 "flavor": deepcopy(db_nsr
["flavor"]),
896 "action_id": nslcmop_id
,
897 "cloud_init_content": {},
899 for image
in target
["image"]:
900 image
["vim_info"] = {}
901 for flavor
in target
["flavor"]:
902 flavor
["vim_info"] = {}
903 if db_nsr
.get("affinity-or-anti-affinity-group"):
904 target
["affinity-or-anti-affinity-group"] = deepcopy(
905 db_nsr
["affinity-or-anti-affinity-group"]
907 for affinity_or_anti_affinity_group
in target
[
908 "affinity-or-anti-affinity-group"
910 affinity_or_anti_affinity_group
["vim_info"] = {}
912 if db_nslcmop
.get("lcmOperationType") != "instantiate":
913 # get parameters of instantiation:
914 db_nslcmop_instantiate
= self
.db
.get_list(
917 "nsInstanceId": db_nslcmop
["nsInstanceId"],
918 "lcmOperationType": "instantiate",
921 ns_params
= db_nslcmop_instantiate
.get("operationParams")
923 ns_params
= db_nslcmop
.get("operationParams")
924 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
925 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
928 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
929 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
933 "mgmt-network": vld
.get("mgmt-network", False),
934 "type": vld
.get("type"),
937 "vim_network_name": vld
.get("vim-network-name"),
938 "vim_account_id": ns_params
["vimAccountId"],
942 # check if this network needs SDN assist
943 if vld
.get("pci-interfaces"):
944 db_vim
= get_vim_account(ns_params
["vimAccountId"])
945 sdnc_id
= db_vim
["config"].get("sdn-controller")
947 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
948 target_sdn
= "sdn:{}".format(sdnc_id
)
949 target_vld
["vim_info"][target_sdn
] = {
951 "target_vim": target_vim
,
953 "type": vld
.get("type"),
956 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
957 for nsd_vnf_profile
in nsd_vnf_profiles
:
958 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
959 if cp
["virtual-link-profile-id"] == vld
["id"]:
961 "member_vnf:{}.{}".format(
962 cp
["constituent-cpd-id"][0][
963 "constituent-base-element-id"
965 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
967 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
969 # check at nsd descriptor, if there is an ip-profile
971 nsd_vlp
= find_in_list(
972 get_virtual_link_profiles(nsd
),
973 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
978 and nsd_vlp
.get("virtual-link-protocol-data")
979 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
981 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
984 ip_profile_dest_data
= {}
985 if "ip-version" in ip_profile_source_data
:
986 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
989 if "cidr" in ip_profile_source_data
:
990 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
993 if "gateway-ip" in ip_profile_source_data
:
994 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
997 if "dhcp-enabled" in ip_profile_source_data
:
998 ip_profile_dest_data
["dhcp-params"] = {
999 "enabled": ip_profile_source_data
["dhcp-enabled"]
1001 vld_params
["ip-profile"] = ip_profile_dest_data
1003 # update vld_params with instantiation params
1004 vld_instantiation_params
= find_in_list(
1005 get_iterable(ns_params
, "vld"),
1006 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1008 if vld_instantiation_params
:
1009 vld_params
.update(vld_instantiation_params
)
1010 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1011 target
["ns"]["vld"].append(target_vld
)
1012 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1013 update_ns_vld_target(target
, ns_params
)
1015 for vnfr
in db_vnfrs
.values():
1016 vnfd
= find_in_list(
1017 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1019 vnf_params
= find_in_list(
1020 get_iterable(ns_params
, "vnf"),
1021 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1023 target_vnf
= deepcopy(vnfr
)
1024 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1025 for vld
in target_vnf
.get("vld", ()):
1026 # check if connected to a ns.vld, to fill target'
1027 vnf_cp
= find_in_list(
1028 vnfd
.get("int-virtual-link-desc", ()),
1029 lambda cpd
: cpd
.get("id") == vld
["id"],
1032 ns_cp
= "member_vnf:{}.{}".format(
1033 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1035 if cp2target
.get(ns_cp
):
1036 vld
["target"] = cp2target
[ns_cp
]
1039 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1041 # check if this network needs SDN assist
1043 if vld
.get("pci-interfaces"):
1044 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1045 sdnc_id
= db_vim
["config"].get("sdn-controller")
1047 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1048 target_sdn
= "sdn:{}".format(sdnc_id
)
1049 vld
["vim_info"][target_sdn
] = {
1051 "target_vim": target_vim
,
1053 "type": vld
.get("type"),
1056 # check at vnfd descriptor, if there is an ip-profile
1058 vnfd_vlp
= find_in_list(
1059 get_virtual_link_profiles(vnfd
),
1060 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1064 and vnfd_vlp
.get("virtual-link-protocol-data")
1065 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1067 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1070 ip_profile_dest_data
= {}
1071 if "ip-version" in ip_profile_source_data
:
1072 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1075 if "cidr" in ip_profile_source_data
:
1076 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1079 if "gateway-ip" in ip_profile_source_data
:
1080 ip_profile_dest_data
[
1082 ] = ip_profile_source_data
["gateway-ip"]
1083 if "dhcp-enabled" in ip_profile_source_data
:
1084 ip_profile_dest_data
["dhcp-params"] = {
1085 "enabled": ip_profile_source_data
["dhcp-enabled"]
1088 vld_params
["ip-profile"] = ip_profile_dest_data
1089 # update vld_params with instantiation params
1091 vld_instantiation_params
= find_in_list(
1092 get_iterable(vnf_params
, "internal-vld"),
1093 lambda i_vld
: i_vld
["name"] == vld
["id"],
1095 if vld_instantiation_params
:
1096 vld_params
.update(vld_instantiation_params
)
1097 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1100 for vdur
in target_vnf
.get("vdur", ()):
1101 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1102 continue # This vdu must not be created
1103 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1105 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1108 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1109 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1112 and vdu_configuration
.get("config-access")
1113 and vdu_configuration
.get("config-access").get("ssh-access")
1115 vdur
["ssh-keys"] = ssh_keys_all
1116 vdur
["ssh-access-required"] = vdu_configuration
[
1118 ]["ssh-access"]["required"]
1121 and vnf_configuration
.get("config-access")
1122 and vnf_configuration
.get("config-access").get("ssh-access")
1123 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1125 vdur
["ssh-keys"] = ssh_keys_all
1126 vdur
["ssh-access-required"] = vnf_configuration
[
1128 ]["ssh-access"]["required"]
1129 elif ssh_keys_instantiation
and find_in_list(
1130 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1132 vdur
["ssh-keys"] = ssh_keys_instantiation
1134 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1136 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1138 if vdud
.get("cloud-init-file"):
1139 vdur
["cloud-init"] = "{}:file:{}".format(
1140 vnfd
["_id"], vdud
.get("cloud-init-file")
1142 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1143 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1144 base_folder
= vnfd
["_admin"]["storage"]
1145 if base_folder
["pkg-dir"]:
1146 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1147 base_folder
["folder"],
1148 base_folder
["pkg-dir"],
1149 vdud
.get("cloud-init-file"),
1152 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1153 base_folder
["folder"],
1154 vdud
.get("cloud-init-file"),
1156 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1157 target
["cloud_init_content"][
1160 elif vdud
.get("cloud-init"):
1161 vdur
["cloud-init"] = "{}:vdu:{}".format(
1162 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1164 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1165 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1168 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1169 deploy_params_vdu
= self
._format
_additional
_params
(
1170 vdur
.get("additionalParams") or {}
1172 deploy_params_vdu
["OSM"] = get_osm_params(
1173 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1175 vdur
["additionalParams"] = deploy_params_vdu
1178 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1179 if target_vim
not in ns_flavor
["vim_info"]:
1180 ns_flavor
["vim_info"][target_vim
] = {}
1183 # in case alternative images are provided we must check if they should be applied
1184 # for the vim_type, modify the vim_type taking into account
1185 ns_image_id
= int(vdur
["ns-image-id"])
1186 if vdur
.get("alt-image-ids"):
1187 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1188 vim_type
= db_vim
["vim_type"]
1189 for alt_image_id
in vdur
.get("alt-image-ids"):
1190 ns_alt_image
= target
["image"][int(alt_image_id
)]
1191 if vim_type
== ns_alt_image
.get("vim-type"):
1192 # must use alternative image
1194 "use alternative image id: {}".format(alt_image_id
)
1196 ns_image_id
= alt_image_id
1197 vdur
["ns-image-id"] = ns_image_id
1199 ns_image
= target
["image"][int(ns_image_id
)]
1200 if target_vim
not in ns_image
["vim_info"]:
1201 ns_image
["vim_info"][target_vim
] = {}
1204 if vdur
.get("affinity-or-anti-affinity-group-id"):
1205 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1206 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1207 if target_vim
not in ns_ags
["vim_info"]:
1208 ns_ags
["vim_info"][target_vim
] = {}
1210 vdur
["vim_info"] = {target_vim
: {}}
1211 # instantiation parameters
1213 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1214 # vdud["id"]), None)
1215 vdur_list
.append(vdur
)
1216 target_vnf
["vdur"] = vdur_list
1217 target
["vnf"].append(target_vnf
)
1219 desc
= await self
.RO
.deploy(nsr_id
, target
)
1220 self
.logger
.debug("RO return > {}".format(desc
))
1221 action_id
= desc
["action_id"]
1222 await self
._wait
_ng
_ro
(
1223 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
1228 "_admin.deployed.RO.operational-status": "running",
1229 "detailed-status": " ".join(stage
),
1231 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1232 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1233 self
._write
_op
_status
(nslcmop_id
, stage
)
1235 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1239 async def _wait_ng_ro(
1248 detailed_status_old
= None
1250 start_time
= start_time
or time()
1251 while time() <= start_time
+ timeout
:
1252 desc_status
= await self
.RO
.status(nsr_id
, action_id
)
1253 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1254 if desc_status
["status"] == "FAILED":
1255 raise NgRoException(desc_status
["details"])
1256 elif desc_status
["status"] == "BUILD":
1258 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1259 elif desc_status
["status"] == "DONE":
1261 stage
[2] = "Deployed at VIM"
1264 assert False, "ROclient.check_ns_status returns unknown {}".format(
1265 desc_status
["status"]
1267 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1268 detailed_status_old
= stage
[2]
1269 db_nsr_update
["detailed-status"] = " ".join(stage
)
1270 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1271 self
._write
_op
_status
(nslcmop_id
, stage
)
1272 await asyncio
.sleep(15, loop
=self
.loop
)
1273 else: # timeout_ns_deploy
1274 raise NgRoException("Timeout waiting ns to deploy")
1276 async def _terminate_ng_ro(
1277 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1282 start_deploy
= time()
1289 "action_id": nslcmop_id
,
1291 desc
= await self
.RO
.deploy(nsr_id
, target
)
1292 action_id
= desc
["action_id"]
1293 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1294 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1297 + "ns terminate action at RO. action_id={}".format(action_id
)
1301 delete_timeout
= 20 * 60 # 20 minutes
1302 await self
._wait
_ng
_ro
(
1303 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
1306 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1307 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1309 await self
.RO
.delete(nsr_id
)
1310 except Exception as e
:
1311 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1312 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1313 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1314 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1316 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1318 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1319 failed_detail
.append("delete conflict: {}".format(e
))
1322 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1325 failed_detail
.append("delete error: {}".format(e
))
1328 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1332 stage
[2] = "Error deleting from VIM"
1334 stage
[2] = "Deleted from VIM"
1335 db_nsr_update
["detailed-status"] = " ".join(stage
)
1336 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1337 self
._write
_op
_status
(nslcmop_id
, stage
)
1340 raise LcmException("; ".join(failed_detail
))
1343 async def instantiate_RO(
1357 :param logging_text: preffix text to use at logging
1358 :param nsr_id: nsr identity
1359 :param nsd: database content of ns descriptor
1360 :param db_nsr: database content of ns record
1361 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1363 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1364 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1365 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1366 :return: None or exception
1369 start_deploy
= time()
1370 ns_params
= db_nslcmop
.get("operationParams")
1371 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1372 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1374 timeout_ns_deploy
= self
.timeout
.get(
1375 "ns_deploy", self
.timeout_ns_deploy
1378 # Check for and optionally request placement optimization. Database will be updated if placement activated
1379 stage
[2] = "Waiting for Placement."
1380 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1381 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1382 for vnfr
in db_vnfrs
.values():
1383 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1386 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1388 return await self
._instantiate
_ng
_ro
(
1401 except Exception as e
:
1402 stage
[2] = "ERROR deploying at VIM"
1403 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1405 "Error deploying at VIM {}".format(e
),
1406 exc_info
=not isinstance(
1409 ROclient
.ROClientException
,
1418 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1420 Wait for kdu to be up, get ip address
1421 :param logging_text: prefix use for logging
1425 :return: IP address, K8s services
1428 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1431 while nb_tries
< 360:
1432 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1436 for x
in get_iterable(db_vnfr
, "kdur")
1437 if x
.get("kdu-name") == kdu_name
1443 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1445 if kdur
.get("status"):
1446 if kdur
["status"] in ("READY", "ENABLED"):
1447 return kdur
.get("ip-address"), kdur
.get("services")
1450 "target KDU={} is in error state".format(kdu_name
)
1453 await asyncio
.sleep(10, loop
=self
.loop
)
1455 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1457 async def wait_vm_up_insert_key_ro(
1458 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1461 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1462 :param logging_text: prefix use for logging
1467 :param pub_key: public ssh key to inject, None to skip
1468 :param user: user to apply the public ssh key
1472 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1476 target_vdu_id
= None
1482 if ro_retries
>= 360: # 1 hour
1484 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1487 await asyncio
.sleep(10, loop
=self
.loop
)
1490 if not target_vdu_id
:
1491 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1493 if not vdu_id
: # for the VNF case
1494 if db_vnfr
.get("status") == "ERROR":
1496 "Cannot inject ssh-key because target VNF is in error state"
1498 ip_address
= db_vnfr
.get("ip-address")
1504 for x
in get_iterable(db_vnfr
, "vdur")
1505 if x
.get("ip-address") == ip_address
1513 for x
in get_iterable(db_vnfr
, "vdur")
1514 if x
.get("vdu-id-ref") == vdu_id
1515 and x
.get("count-index") == vdu_index
1521 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1522 ): # If only one, this should be the target vdu
1523 vdur
= db_vnfr
["vdur"][0]
1526 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1527 vnfr_id
, vdu_id
, vdu_index
1530 # New generation RO stores information at "vim_info"
1533 if vdur
.get("vim_info"):
1535 t
for t
in vdur
["vim_info"]
1536 ) # there should be only one key
1537 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1539 vdur
.get("pdu-type")
1540 or vdur
.get("status") == "ACTIVE"
1541 or ng_ro_status
== "ACTIVE"
1543 ip_address
= vdur
.get("ip-address")
1546 target_vdu_id
= vdur
["vdu-id-ref"]
1547 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1549 "Cannot inject ssh-key because target VM is in error state"
1552 if not target_vdu_id
:
1555 # inject public key into machine
1556 if pub_key
and user
:
1557 self
.logger
.debug(logging_text
+ "Inserting RO key")
1558 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1559 if vdur
.get("pdu-type"):
1560 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1563 ro_vm_id
= "{}-{}".format(
1564 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1565 ) # TODO add vdu_index
1569 "action": "inject_ssh_key",
1573 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1575 desc
= await self
.RO
.deploy(nsr_id
, target
)
1576 action_id
= desc
["action_id"]
1577 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600)
1580 # wait until NS is deployed at RO
1582 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1583 ro_nsr_id
= deep_get(
1584 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1588 result_dict
= await self
.RO
.create_action(
1590 item_id_name
=ro_nsr_id
,
1592 "add_public_key": pub_key
,
1597 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1598 if not result_dict
or not isinstance(result_dict
, dict):
1600 "Unknown response from RO when injecting key"
1602 for result
in result_dict
.values():
1603 if result
.get("vim_result") == 200:
1606 raise ROclient
.ROClientException(
1607 "error injecting key: {}".format(
1608 result
.get("description")
1612 except NgRoException
as e
:
1614 "Reaching max tries injecting key. Error: {}".format(e
)
1616 except ROclient
.ROClientException
as e
:
1620 + "error injecting key: {}. Retrying until {} seconds".format(
1627 "Reaching max tries injecting key. Error: {}".format(e
)
1634 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1636 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1638 my_vca
= vca_deployed_list
[vca_index
]
1639 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1640 # vdu or kdu: no dependencies
1644 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1645 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1646 configuration_status_list
= db_nsr
["configurationStatus"]
1647 for index
, vca_deployed
in enumerate(configuration_status_list
):
1648 if index
== vca_index
:
1651 if not my_vca
.get("member-vnf-index") or (
1652 vca_deployed
.get("member-vnf-index")
1653 == my_vca
.get("member-vnf-index")
1655 internal_status
= configuration_status_list
[index
].get("status")
1656 if internal_status
== "READY":
1658 elif internal_status
== "BROKEN":
1660 "Configuration aborted because dependent charm/s has failed"
1665 # no dependencies, return
1667 await asyncio
.sleep(10)
1670 raise LcmException("Configuration aborted because dependent charm/s timeout")
1672 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1675 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1677 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1678 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1681 async def instantiate_N2VC(
1698 ee_config_descriptor
,
1700 nsr_id
= db_nsr
["_id"]
1701 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1702 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1703 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1704 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1706 "collection": "nsrs",
1707 "filter": {"_id": nsr_id
},
1708 "path": db_update_entry
,
1714 element_under_configuration
= nsr_id
1718 vnfr_id
= db_vnfr
["_id"]
1719 osm_config
["osm"]["vnf_id"] = vnfr_id
1721 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1723 if vca_type
== "native_charm":
1726 index_number
= vdu_index
or 0
1729 element_type
= "VNF"
1730 element_under_configuration
= vnfr_id
1731 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1733 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1734 element_type
= "VDU"
1735 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1736 osm_config
["osm"]["vdu_id"] = vdu_id
1738 namespace
+= ".{}".format(kdu_name
)
1739 element_type
= "KDU"
1740 element_under_configuration
= kdu_name
1741 osm_config
["osm"]["kdu_name"] = kdu_name
1744 if base_folder
["pkg-dir"]:
1745 artifact_path
= "{}/{}/{}/{}".format(
1746 base_folder
["folder"],
1747 base_folder
["pkg-dir"],
1750 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1755 artifact_path
= "{}/Scripts/{}/{}/".format(
1756 base_folder
["folder"],
1759 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1764 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1766 # get initial_config_primitive_list that applies to this element
1767 initial_config_primitive_list
= config_descriptor
.get(
1768 "initial-config-primitive"
1772 "Initial config primitive list > {}".format(
1773 initial_config_primitive_list
1777 # add config if not present for NS charm
1778 ee_descriptor_id
= ee_config_descriptor
.get("id")
1779 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1780 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1781 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1785 "Initial config primitive list #2 > {}".format(
1786 initial_config_primitive_list
1789 # n2vc_redesign STEP 3.1
1790 # find old ee_id if exists
1791 ee_id
= vca_deployed
.get("ee_id")
1793 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1794 # create or register execution environment in VCA
1795 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1797 self
._write
_configuration
_status
(
1799 vca_index
=vca_index
,
1801 element_under_configuration
=element_under_configuration
,
1802 element_type
=element_type
,
1805 step
= "create execution environment"
1806 self
.logger
.debug(logging_text
+ step
)
1810 if vca_type
== "k8s_proxy_charm":
1811 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1812 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1813 namespace
=namespace
,
1814 artifact_path
=artifact_path
,
1818 elif vca_type
== "helm" or vca_type
== "helm-v3":
1819 ee_id
, credentials
= await self
.vca_map
[
1821 ].create_execution_environment(
1822 namespace
=namespace
,
1826 artifact_path
=artifact_path
,
1830 ee_id
, credentials
= await self
.vca_map
[
1832 ].create_execution_environment(
1833 namespace
=namespace
,
1839 elif vca_type
== "native_charm":
1840 step
= "Waiting to VM being up and getting IP address"
1841 self
.logger
.debug(logging_text
+ step
)
1842 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1851 credentials
= {"hostname": rw_mgmt_ip
}
1853 username
= deep_get(
1854 config_descriptor
, ("config-access", "ssh-access", "default-user")
1856 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1857 # merged. Meanwhile let's get username from initial-config-primitive
1858 if not username
and initial_config_primitive_list
:
1859 for config_primitive
in initial_config_primitive_list
:
1860 for param
in config_primitive
.get("parameter", ()):
1861 if param
["name"] == "ssh-username":
1862 username
= param
["value"]
1866 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1867 "'config-access.ssh-access.default-user'"
1869 credentials
["username"] = username
1870 # n2vc_redesign STEP 3.2
1872 self
._write
_configuration
_status
(
1874 vca_index
=vca_index
,
1875 status
="REGISTERING",
1876 element_under_configuration
=element_under_configuration
,
1877 element_type
=element_type
,
1880 step
= "register execution environment {}".format(credentials
)
1881 self
.logger
.debug(logging_text
+ step
)
1882 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1883 credentials
=credentials
,
1884 namespace
=namespace
,
1889 # for compatibility with MON/POL modules, the need model and application name at database
1890 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1891 ee_id_parts
= ee_id
.split(".")
1892 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1893 if len(ee_id_parts
) >= 2:
1894 model_name
= ee_id_parts
[0]
1895 application_name
= ee_id_parts
[1]
1896 db_nsr_update
[db_update_entry
+ "model"] = model_name
1897 db_nsr_update
[db_update_entry
+ "application"] = application_name
1899 # n2vc_redesign STEP 3.3
1900 step
= "Install configuration Software"
1902 self
._write
_configuration
_status
(
1904 vca_index
=vca_index
,
1905 status
="INSTALLING SW",
1906 element_under_configuration
=element_under_configuration
,
1907 element_type
=element_type
,
1908 other_update
=db_nsr_update
,
1911 # TODO check if already done
1912 self
.logger
.debug(logging_text
+ step
)
1914 if vca_type
== "native_charm":
1915 config_primitive
= next(
1916 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1919 if config_primitive
:
1920 config
= self
._map
_primitive
_params
(
1921 config_primitive
, {}, deploy_params
1924 if vca_type
== "lxc_proxy_charm":
1925 if element_type
== "NS":
1926 num_units
= db_nsr
.get("config-units") or 1
1927 elif element_type
== "VNF":
1928 num_units
= db_vnfr
.get("config-units") or 1
1929 elif element_type
== "VDU":
1930 for v
in db_vnfr
["vdur"]:
1931 if vdu_id
== v
["vdu-id-ref"]:
1932 num_units
= v
.get("config-units") or 1
1934 if vca_type
!= "k8s_proxy_charm":
1935 await self
.vca_map
[vca_type
].install_configuration_sw(
1937 artifact_path
=artifact_path
,
1940 num_units
=num_units
,
1945 # write in db flag of configuration_sw already installed
1947 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1950 # add relations for this VCA (wait for other peers related with this VCA)
1951 await self
._add
_vca
_relations
(
1952 logging_text
=logging_text
,
1955 vca_index
=vca_index
,
1958 # if SSH access is required, then get execution environment SSH public
1959 # if native charm we have waited already to VM be UP
1960 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1963 # self.logger.debug("get ssh key block")
1965 config_descriptor
, ("config-access", "ssh-access", "required")
1967 # self.logger.debug("ssh key needed")
1968 # Needed to inject a ssh key
1971 ("config-access", "ssh-access", "default-user"),
1973 step
= "Install configuration Software, getting public ssh key"
1974 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
1975 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
1978 step
= "Insert public key into VM user={} ssh_key={}".format(
1982 # self.logger.debug("no need to get ssh key")
1983 step
= "Waiting to VM being up and getting IP address"
1984 self
.logger
.debug(logging_text
+ step
)
1986 # n2vc_redesign STEP 5.1
1987 # wait for RO (ip-address) Insert pub_key into VM
1990 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
1991 logging_text
, nsr_id
, vnfr_id
, kdu_name
1993 vnfd
= self
.db
.get_one(
1995 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
1997 kdu
= get_kdu(vnfd
, kdu_name
)
1999 service
["name"] for service
in get_kdu_services(kdu
)
2001 exposed_services
= []
2002 for service
in services
:
2003 if any(s
in service
["name"] for s
in kdu_services
):
2004 exposed_services
.append(service
)
2005 await self
.vca_map
[vca_type
].exec_primitive(
2007 primitive_name
="config",
2009 "osm-config": json
.dumps(
2011 k8s
={"services": exposed_services
}
2018 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2029 rw_mgmt_ip
= None # This is for a NS configuration
2031 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2033 # store rw_mgmt_ip in deploy params for later replacement
2034 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2036 # n2vc_redesign STEP 6 Execute initial config primitive
2037 step
= "execute initial config primitive"
2039 # wait for dependent primitives execution (NS -> VNF -> VDU)
2040 if initial_config_primitive_list
:
2041 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2043 # stage, in function of element type: vdu, kdu, vnf or ns
2044 my_vca
= vca_deployed_list
[vca_index
]
2045 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2047 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2048 elif my_vca
.get("member-vnf-index"):
2050 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2053 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2055 self
._write
_configuration
_status
(
2056 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2059 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2061 check_if_terminated_needed
= True
2062 for initial_config_primitive
in initial_config_primitive_list
:
2063 # adding information on the vca_deployed if it is a NS execution environment
2064 if not vca_deployed
["member-vnf-index"]:
2065 deploy_params
["ns_config_info"] = json
.dumps(
2066 self
._get
_ns
_config
_info
(nsr_id
)
2068 # TODO check if already done
2069 primitive_params_
= self
._map
_primitive
_params
(
2070 initial_config_primitive
, {}, deploy_params
2073 step
= "execute primitive '{}' params '{}'".format(
2074 initial_config_primitive
["name"], primitive_params_
2076 self
.logger
.debug(logging_text
+ step
)
2077 await self
.vca_map
[vca_type
].exec_primitive(
2079 primitive_name
=initial_config_primitive
["name"],
2080 params_dict
=primitive_params_
,
2085 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2086 if check_if_terminated_needed
:
2087 if config_descriptor
.get("terminate-config-primitive"):
2089 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2091 check_if_terminated_needed
= False
2093 # TODO register in database that primitive is done
2095 # STEP 7 Configure metrics
2096 if vca_type
== "helm" or vca_type
== "helm-v3":
2097 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2099 artifact_path
=artifact_path
,
2100 ee_config_descriptor
=ee_config_descriptor
,
2103 target_ip
=rw_mgmt_ip
,
2109 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2112 for job
in prometheus_jobs
:
2115 {"job_name": job
["job_name"]},
2118 fail_on_empty
=False,
2121 step
= "instantiated at VCA"
2122 self
.logger
.debug(logging_text
+ step
)
2124 self
._write
_configuration
_status
(
2125 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2128 except Exception as e
: # TODO not use Exception but N2VC exception
2129 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2131 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2134 "Exception while {} : {}".format(step
, e
), exc_info
=True
2136 self
._write
_configuration
_status
(
2137 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2139 raise LcmException("{} {}".format(step
, e
)) from e
2141 def _write_ns_status(
2145 current_operation
: str,
2146 current_operation_id
: str,
2147 error_description
: str = None,
2148 error_detail
: str = None,
2149 other_update
: dict = None,
2152 Update db_nsr fields.
2155 :param current_operation:
2156 :param current_operation_id:
2157 :param error_description:
2158 :param error_detail:
2159 :param other_update: Other required changes at database if provided, will be cleared
2163 db_dict
= other_update
or {}
2166 ] = current_operation_id
# for backward compatibility
2167 db_dict
["_admin.current-operation"] = current_operation_id
2168 db_dict
["_admin.operation-type"] = (
2169 current_operation
if current_operation
!= "IDLE" else None
2171 db_dict
["currentOperation"] = current_operation
2172 db_dict
["currentOperationID"] = current_operation_id
2173 db_dict
["errorDescription"] = error_description
2174 db_dict
["errorDetail"] = error_detail
2177 db_dict
["nsState"] = ns_state
2178 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2179 except DbException
as e
:
2180 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2182 def _write_op_status(
2186 error_message
: str = None,
2187 queuePosition
: int = 0,
2188 operation_state
: str = None,
2189 other_update
: dict = None,
2192 db_dict
= other_update
or {}
2193 db_dict
["queuePosition"] = queuePosition
2194 if isinstance(stage
, list):
2195 db_dict
["stage"] = stage
[0]
2196 db_dict
["detailed-status"] = " ".join(stage
)
2197 elif stage
is not None:
2198 db_dict
["stage"] = str(stage
)
2200 if error_message
is not None:
2201 db_dict
["errorMessage"] = error_message
2202 if operation_state
is not None:
2203 db_dict
["operationState"] = operation_state
2204 db_dict
["statusEnteredTime"] = time()
2205 self
.update_db_2("nslcmops", op_id
, db_dict
)
2206 except DbException
as e
:
2208 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2211 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2213 nsr_id
= db_nsr
["_id"]
2214 # configurationStatus
2215 config_status
= db_nsr
.get("configurationStatus")
2218 "configurationStatus.{}.status".format(index
): status
2219 for index
, v
in enumerate(config_status
)
2223 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2225 except DbException
as e
:
2227 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2230 def _write_configuration_status(
2235 element_under_configuration
: str = None,
2236 element_type
: str = None,
2237 other_update
: dict = None,
2240 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2241 # .format(vca_index, status))
2244 db_path
= "configurationStatus.{}.".format(vca_index
)
2245 db_dict
= other_update
or {}
2247 db_dict
[db_path
+ "status"] = status
2248 if element_under_configuration
:
2250 db_path
+ "elementUnderConfiguration"
2251 ] = element_under_configuration
2253 db_dict
[db_path
+ "elementType"] = element_type
2254 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2255 except DbException
as e
:
2257 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2258 status
, nsr_id
, vca_index
, e
2262 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2264 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2265 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2266 Database is used because the result can be obtained from a different LCM worker in case of HA.
2267 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2268 :param db_nslcmop: database content of nslcmop
2269 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2270 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2271 computed 'vim-account-id'
2274 nslcmop_id
= db_nslcmop
["_id"]
2275 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2276 if placement_engine
== "PLA":
2278 logging_text
+ "Invoke and wait for placement optimization"
2280 await self
.msg
.aiowrite(
2281 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2283 db_poll_interval
= 5
2284 wait
= db_poll_interval
* 10
2286 while not pla_result
and wait
>= 0:
2287 await asyncio
.sleep(db_poll_interval
)
2288 wait
-= db_poll_interval
2289 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2290 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2294 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2297 for pla_vnf
in pla_result
["vnf"]:
2298 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2299 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2304 {"_id": vnfr
["_id"]},
2305 {"vim-account-id": pla_vnf
["vimAccountId"]},
2308 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2311 def update_nsrs_with_pla_result(self
, params
):
2313 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2315 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2317 except Exception as e
:
2318 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2320 async def instantiate(self
, nsr_id
, nslcmop_id
):
2323 :param nsr_id: ns instance to deploy
2324 :param nslcmop_id: operation to run
2328 # Try to lock HA task here
2329 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2330 if not task_is_locked_by_me
:
2332 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2336 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2337 self
.logger
.debug(logging_text
+ "Enter")
2339 # get all needed from database
2341 # database nsrs record
2344 # database nslcmops record
2347 # update operation on nsrs
2349 # update operation on nslcmops
2350 db_nslcmop_update
= {}
2352 nslcmop_operation_state
= None
2353 db_vnfrs
= {} # vnf's info indexed by member-index
2355 tasks_dict_info
= {} # from task to info text
2359 "Stage 1/5: preparation of the environment.",
2360 "Waiting for previous operations to terminate.",
2363 # ^ stage, step, VIM progress
2365 # wait for any previous tasks in process
2366 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2368 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2369 stage
[1] = "Reading from database."
2370 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2371 db_nsr_update
["detailed-status"] = "creating"
2372 db_nsr_update
["operational-status"] = "init"
2373 self
._write
_ns
_status
(
2375 ns_state
="BUILDING",
2376 current_operation
="INSTANTIATING",
2377 current_operation_id
=nslcmop_id
,
2378 other_update
=db_nsr_update
,
2380 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2382 # read from db: operation
2383 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2384 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2385 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2386 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2387 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2389 ns_params
= db_nslcmop
.get("operationParams")
2390 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2391 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2393 timeout_ns_deploy
= self
.timeout
.get(
2394 "ns_deploy", self
.timeout_ns_deploy
2398 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2399 self
.logger
.debug(logging_text
+ stage
[1])
2400 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2401 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2402 self
.logger
.debug(logging_text
+ stage
[1])
2403 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2404 self
.fs
.sync(db_nsr
["nsd-id"])
2406 # nsr_name = db_nsr["name"] # TODO short-name??
2408 # read from db: vnf's of this ns
2409 stage
[1] = "Getting vnfrs from db."
2410 self
.logger
.debug(logging_text
+ stage
[1])
2411 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2413 # read from db: vnfd's for every vnf
2414 db_vnfds
= [] # every vnfd data
2416 # for each vnf in ns, read vnfd
2417 for vnfr
in db_vnfrs_list
:
2418 if vnfr
.get("kdur"):
2420 for kdur
in vnfr
["kdur"]:
2421 if kdur
.get("additionalParams"):
2422 kdur
["additionalParams"] = json
.loads(
2423 kdur
["additionalParams"]
2425 kdur_list
.append(kdur
)
2426 vnfr
["kdur"] = kdur_list
2428 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2429 vnfd_id
= vnfr
["vnfd-id"]
2430 vnfd_ref
= vnfr
["vnfd-ref"]
2431 self
.fs
.sync(vnfd_id
)
2433 # if we haven't this vnfd, read it from db
2434 if vnfd_id
not in db_vnfds
:
2436 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2439 self
.logger
.debug(logging_text
+ stage
[1])
2440 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2443 db_vnfds
.append(vnfd
)
2445 # Get or generates the _admin.deployed.VCA list
2446 vca_deployed_list
= None
2447 if db_nsr
["_admin"].get("deployed"):
2448 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2449 if vca_deployed_list
is None:
2450 vca_deployed_list
= []
2451 configuration_status_list
= []
2452 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2453 db_nsr_update
["configurationStatus"] = configuration_status_list
2454 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2455 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2456 elif isinstance(vca_deployed_list
, dict):
2457 # maintain backward compatibility. Change a dict to list at database
2458 vca_deployed_list
= list(vca_deployed_list
.values())
2459 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2460 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2463 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2465 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2466 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2468 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2469 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2470 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2472 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2475 # n2vc_redesign STEP 2 Deploy Network Scenario
2476 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2477 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2479 stage
[1] = "Deploying KDUs."
2480 # self.logger.debug(logging_text + "Before deploy_kdus")
2481 # Call to deploy_kdus in case exists the "vdu:kdu" param
2482 await self
.deploy_kdus(
2483 logging_text
=logging_text
,
2485 nslcmop_id
=nslcmop_id
,
2488 task_instantiation_info
=tasks_dict_info
,
2491 stage
[1] = "Getting VCA public key."
2492 # n2vc_redesign STEP 1 Get VCA public ssh-key
2493 # feature 1429. Add n2vc public key to needed VMs
2494 n2vc_key
= self
.n2vc
.get_public_key()
2495 n2vc_key_list
= [n2vc_key
]
2496 if self
.vca_config
.get("public_key"):
2497 n2vc_key_list
.append(self
.vca_config
["public_key"])
2499 stage
[1] = "Deploying NS at VIM."
2500 task_ro
= asyncio
.ensure_future(
2501 self
.instantiate_RO(
2502 logging_text
=logging_text
,
2506 db_nslcmop
=db_nslcmop
,
2509 n2vc_key_list
=n2vc_key_list
,
2513 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2514 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2516 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2517 stage
[1] = "Deploying Execution Environments."
2518 self
.logger
.debug(logging_text
+ stage
[1])
2520 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2521 for vnf_profile
in get_vnf_profiles(nsd
):
2522 vnfd_id
= vnf_profile
["vnfd-id"]
2523 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2524 member_vnf_index
= str(vnf_profile
["id"])
2525 db_vnfr
= db_vnfrs
[member_vnf_index
]
2526 base_folder
= vnfd
["_admin"]["storage"]
2532 # Get additional parameters
2533 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2534 if db_vnfr
.get("additionalParamsForVnf"):
2535 deploy_params
.update(
2536 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2539 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2540 if descriptor_config
:
2542 logging_text
=logging_text
2543 + "member_vnf_index={} ".format(member_vnf_index
),
2546 nslcmop_id
=nslcmop_id
,
2552 member_vnf_index
=member_vnf_index
,
2553 vdu_index
=vdu_index
,
2555 deploy_params
=deploy_params
,
2556 descriptor_config
=descriptor_config
,
2557 base_folder
=base_folder
,
2558 task_instantiation_info
=tasks_dict_info
,
2562 # Deploy charms for each VDU that supports one.
2563 for vdud
in get_vdu_list(vnfd
):
2565 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2566 vdur
= find_in_list(
2567 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2570 if vdur
.get("additionalParams"):
2571 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2573 deploy_params_vdu
= deploy_params
2574 deploy_params_vdu
["OSM"] = get_osm_params(
2575 db_vnfr
, vdu_id
, vdu_count_index
=0
2577 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2579 self
.logger
.debug("VDUD > {}".format(vdud
))
2581 "Descriptor config > {}".format(descriptor_config
)
2583 if descriptor_config
:
2586 for vdu_index
in range(vdud_count
):
2587 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2589 logging_text
=logging_text
2590 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2591 member_vnf_index
, vdu_id
, vdu_index
2595 nslcmop_id
=nslcmop_id
,
2601 member_vnf_index
=member_vnf_index
,
2602 vdu_index
=vdu_index
,
2604 deploy_params
=deploy_params_vdu
,
2605 descriptor_config
=descriptor_config
,
2606 base_folder
=base_folder
,
2607 task_instantiation_info
=tasks_dict_info
,
2610 for kdud
in get_kdu_list(vnfd
):
2611 kdu_name
= kdud
["name"]
2612 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2613 if descriptor_config
:
2618 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2620 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2621 if kdur
.get("additionalParams"):
2622 deploy_params_kdu
.update(
2623 parse_yaml_strings(kdur
["additionalParams"].copy())
2627 logging_text
=logging_text
,
2630 nslcmop_id
=nslcmop_id
,
2636 member_vnf_index
=member_vnf_index
,
2637 vdu_index
=vdu_index
,
2639 deploy_params
=deploy_params_kdu
,
2640 descriptor_config
=descriptor_config
,
2641 base_folder
=base_folder
,
2642 task_instantiation_info
=tasks_dict_info
,
2646 # Check if this NS has a charm configuration
2647 descriptor_config
= nsd
.get("ns-configuration")
2648 if descriptor_config
and descriptor_config
.get("juju"):
2651 member_vnf_index
= None
2657 # Get additional parameters
2658 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2659 if db_nsr
.get("additionalParamsForNs"):
2660 deploy_params
.update(
2661 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2663 base_folder
= nsd
["_admin"]["storage"]
2665 logging_text
=logging_text
,
2668 nslcmop_id
=nslcmop_id
,
2674 member_vnf_index
=member_vnf_index
,
2675 vdu_index
=vdu_index
,
2677 deploy_params
=deploy_params
,
2678 descriptor_config
=descriptor_config
,
2679 base_folder
=base_folder
,
2680 task_instantiation_info
=tasks_dict_info
,
2684 # rest of staff will be done at finally
2687 ROclient
.ROClientException
,
2693 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2696 except asyncio
.CancelledError
:
2698 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2700 exc
= "Operation was cancelled"
2701 except Exception as e
:
2702 exc
= traceback
.format_exc()
2703 self
.logger
.critical(
2704 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2709 error_list
.append(str(exc
))
2711 # wait for pending tasks
2713 stage
[1] = "Waiting for instantiate pending tasks."
2714 self
.logger
.debug(logging_text
+ stage
[1])
2715 error_list
+= await self
._wait
_for
_tasks
(
2723 stage
[1] = stage
[2] = ""
2724 except asyncio
.CancelledError
:
2725 error_list
.append("Cancelled")
2726 # TODO cancel all tasks
2727 except Exception as exc
:
2728 error_list
.append(str(exc
))
2730 # update operation-status
2731 db_nsr_update
["operational-status"] = "running"
2732 # let's begin with VCA 'configured' status (later we can change it)
2733 db_nsr_update
["config-status"] = "configured"
2734 for task
, task_name
in tasks_dict_info
.items():
2735 if not task
.done() or task
.cancelled() or task
.exception():
2736 if task_name
.startswith(self
.task_name_deploy_vca
):
2737 # A N2VC task is pending
2738 db_nsr_update
["config-status"] = "failed"
2740 # RO or KDU task is pending
2741 db_nsr_update
["operational-status"] = "failed"
2743 # update status at database
2745 error_detail
= ". ".join(error_list
)
2746 self
.logger
.error(logging_text
+ error_detail
)
2747 error_description_nslcmop
= "{} Detail: {}".format(
2748 stage
[0], error_detail
2750 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2751 nslcmop_id
, stage
[0]
2754 db_nsr_update
["detailed-status"] = (
2755 error_description_nsr
+ " Detail: " + error_detail
2757 db_nslcmop_update
["detailed-status"] = error_detail
2758 nslcmop_operation_state
= "FAILED"
2762 error_description_nsr
= error_description_nslcmop
= None
2764 db_nsr_update
["detailed-status"] = "Done"
2765 db_nslcmop_update
["detailed-status"] = "Done"
2766 nslcmop_operation_state
= "COMPLETED"
2769 self
._write
_ns
_status
(
2772 current_operation
="IDLE",
2773 current_operation_id
=None,
2774 error_description
=error_description_nsr
,
2775 error_detail
=error_detail
,
2776 other_update
=db_nsr_update
,
2778 self
._write
_op
_status
(
2781 error_message
=error_description_nslcmop
,
2782 operation_state
=nslcmop_operation_state
,
2783 other_update
=db_nslcmop_update
,
2786 if nslcmop_operation_state
:
2788 await self
.msg
.aiowrite(
2793 "nslcmop_id": nslcmop_id
,
2794 "operationState": nslcmop_operation_state
,
2798 except Exception as e
:
2800 logging_text
+ "kafka_write notification Exception {}".format(e
)
2803 self
.logger
.debug(logging_text
+ "Exit")
2804 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2806 def _get_vnfd(self
, vnfd_id
: str, cached_vnfds
: Dict
[str, Any
]):
2807 if vnfd_id
not in cached_vnfds
:
2808 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
2809 return cached_vnfds
[vnfd_id
]
2811 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2812 if vnf_profile_id
not in cached_vnfrs
:
2813 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2816 "member-vnf-index-ref": vnf_profile_id
,
2817 "nsr-id-ref": nsr_id
,
2820 return cached_vnfrs
[vnf_profile_id
]
2822 def _is_deployed_vca_in_relation(
2823 self
, vca
: DeployedVCA
, relation
: Relation
2826 for endpoint
in (relation
.provider
, relation
.requirer
):
2827 if endpoint
["kdu-resource-profile-id"]:
2830 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2831 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2832 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2838 def _update_ee_relation_data_with_implicit_data(
2839 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2841 ee_relation_data
= safe_get_ee_relation(
2842 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2844 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2845 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2846 "execution-environment-ref"
2848 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2849 vnfd_id
= vnf_profile
["vnfd-id"]
2850 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2853 if ee_relation_level
== EELevel
.VNF
2854 else ee_relation_data
["vdu-profile-id"]
2856 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2859 f
"not execution environments found for ee_relation {ee_relation_data}"
2861 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2862 return ee_relation_data
2864 def _get_ns_relations(
2867 nsd
: Dict
[str, Any
],
2869 cached_vnfds
: Dict
[str, Any
],
2870 ) -> List
[Relation
]:
2872 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2873 for r
in db_ns_relations
:
2874 provider_dict
= None
2875 requirer_dict
= None
2876 if all(key
in r
for key
in ("provider", "requirer")):
2877 provider_dict
= r
["provider"]
2878 requirer_dict
= r
["requirer"]
2879 elif "entities" in r
:
2880 provider_id
= r
["entities"][0]["id"]
2883 "endpoint": r
["entities"][0]["endpoint"],
2885 if provider_id
!= nsd
["id"]:
2886 provider_dict
["vnf-profile-id"] = provider_id
2887 requirer_id
= r
["entities"][1]["id"]
2890 "endpoint": r
["entities"][1]["endpoint"],
2892 if requirer_id
!= nsd
["id"]:
2893 requirer_dict
["vnf-profile-id"] = requirer_id
2896 "provider/requirer or entities must be included in the relation."
2898 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2899 nsr_id
, nsd
, provider_dict
, cached_vnfds
2901 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2902 nsr_id
, nsd
, requirer_dict
, cached_vnfds
2904 provider
= EERelation(relation_provider
)
2905 requirer
= EERelation(relation_requirer
)
2906 relation
= Relation(r
["name"], provider
, requirer
)
2907 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2909 relations
.append(relation
)
2912 def _get_vnf_relations(
2915 nsd
: Dict
[str, Any
],
2917 cached_vnfds
: Dict
[str, Any
],
2918 ) -> List
[Relation
]:
2920 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
2921 vnf_profile_id
= vnf_profile
["id"]
2922 vnfd_id
= vnf_profile
["vnfd-id"]
2923 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2924 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
2925 for r
in db_vnf_relations
:
2926 provider_dict
= None
2927 requirer_dict
= None
2928 if all(key
in r
for key
in ("provider", "requirer")):
2929 provider_dict
= r
["provider"]
2930 requirer_dict
= r
["requirer"]
2931 elif "entities" in r
:
2932 provider_id
= r
["entities"][0]["id"]
2935 "vnf-profile-id": vnf_profile_id
,
2936 "endpoint": r
["entities"][0]["endpoint"],
2938 if provider_id
!= vnfd_id
:
2939 provider_dict
["vdu-profile-id"] = provider_id
2940 requirer_id
= r
["entities"][1]["id"]
2943 "vnf-profile-id": vnf_profile_id
,
2944 "endpoint": r
["entities"][1]["endpoint"],
2946 if requirer_id
!= vnfd_id
:
2947 requirer_dict
["vdu-profile-id"] = requirer_id
2950 "provider/requirer or entities must be included in the relation."
2952 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2953 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
2955 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2956 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
2958 provider
= EERelation(relation_provider
)
2959 requirer
= EERelation(relation_requirer
)
2960 relation
= Relation(r
["name"], provider
, requirer
)
2961 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2963 relations
.append(relation
)
2966 def _get_kdu_resource_data(
2968 ee_relation
: EERelation
,
2969 db_nsr
: Dict
[str, Any
],
2970 cached_vnfds
: Dict
[str, Any
],
2971 ) -> DeployedK8sResource
:
2972 nsd
= get_nsd(db_nsr
)
2973 vnf_profiles
= get_vnf_profiles(nsd
)
2974 vnfd_id
= find_in_list(
2976 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
2978 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2979 kdu_resource_profile
= get_kdu_resource_profile(
2980 db_vnfd
, ee_relation
.kdu_resource_profile_id
2982 kdu_name
= kdu_resource_profile
["kdu-name"]
2983 deployed_kdu
, _
= get_deployed_kdu(
2984 db_nsr
.get("_admin", ()).get("deployed", ()),
2986 ee_relation
.vnf_profile_id
,
2988 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
2991 def _get_deployed_component(
2993 ee_relation
: EERelation
,
2994 db_nsr
: Dict
[str, Any
],
2995 cached_vnfds
: Dict
[str, Any
],
2996 ) -> DeployedComponent
:
2997 nsr_id
= db_nsr
["_id"]
2998 deployed_component
= None
2999 ee_level
= EELevel
.get_level(ee_relation
)
3000 if ee_level
== EELevel
.NS
:
3001 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
3003 deployed_component
= DeployedVCA(nsr_id
, vca
)
3004 elif ee_level
== EELevel
.VNF
:
3005 vca
= get_deployed_vca(
3009 "member-vnf-index": ee_relation
.vnf_profile_id
,
3010 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3014 deployed_component
= DeployedVCA(nsr_id
, vca
)
3015 elif ee_level
== EELevel
.VDU
:
3016 vca
= get_deployed_vca(
3019 "vdu_id": ee_relation
.vdu_profile_id
,
3020 "member-vnf-index": ee_relation
.vnf_profile_id
,
3021 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3025 deployed_component
= DeployedVCA(nsr_id
, vca
)
3026 elif ee_level
== EELevel
.KDU
:
3027 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
3028 ee_relation
, db_nsr
, cached_vnfds
3030 if kdu_resource_data
:
3031 deployed_component
= DeployedK8sResource(kdu_resource_data
)
3032 return deployed_component
3034 async def _add_relation(
3038 db_nsr
: Dict
[str, Any
],
3039 cached_vnfds
: Dict
[str, Any
],
3040 cached_vnfrs
: Dict
[str, Any
],
3042 deployed_provider
= self
._get
_deployed
_component
(
3043 relation
.provider
, db_nsr
, cached_vnfds
3045 deployed_requirer
= self
._get
_deployed
_component
(
3046 relation
.requirer
, db_nsr
, cached_vnfds
3050 and deployed_requirer
3051 and deployed_provider
.config_sw_installed
3052 and deployed_requirer
.config_sw_installed
3054 provider_db_vnfr
= (
3056 relation
.provider
.nsr_id
,
3057 relation
.provider
.vnf_profile_id
,
3060 if relation
.provider
.vnf_profile_id
3063 requirer_db_vnfr
= (
3065 relation
.requirer
.nsr_id
,
3066 relation
.requirer
.vnf_profile_id
,
3069 if relation
.requirer
.vnf_profile_id
3072 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3073 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3074 provider_relation_endpoint
= RelationEndpoint(
3075 deployed_provider
.ee_id
,
3077 relation
.provider
.endpoint
,
3079 requirer_relation_endpoint
= RelationEndpoint(
3080 deployed_requirer
.ee_id
,
3082 relation
.requirer
.endpoint
,
3084 await self
.vca_map
[vca_type
].add_relation(
3085 provider
=provider_relation_endpoint
,
3086 requirer
=requirer_relation_endpoint
,
3088 # remove entry from relations list
3092 async def _add_vca_relations(
3098 timeout
: int = 3600,
3102 # 1. find all relations for this VCA
3103 # 2. wait for other peers related
3107 # STEP 1: find all relations for this VCA
3110 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3111 nsd
= get_nsd(db_nsr
)
3114 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3115 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3120 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3121 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3123 # if no relations, terminate
3125 self
.logger
.debug(logging_text
+ " No relations")
3128 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3135 if now
- start
>= timeout
:
3136 self
.logger
.error(logging_text
+ " : timeout adding relations")
3139 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3140 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3142 # for each relation, find the VCA's related
3143 for relation
in relations
.copy():
3144 added
= await self
._add
_relation
(
3152 relations
.remove(relation
)
3155 self
.logger
.debug("Relations added")
3157 await asyncio
.sleep(5.0)
3161 except Exception as e
:
3162 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3165 async def _install_kdu(
3173 k8s_instance_info
: dict,
3174 k8params
: dict = None,
3180 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3183 "collection": "nsrs",
3184 "filter": {"_id": nsr_id
},
3185 "path": nsr_db_path
,
3188 if k8s_instance_info
.get("kdu-deployment-name"):
3189 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3191 kdu_instance
= self
.k8scluster_map
[
3193 ].generate_kdu_instance_name(
3194 db_dict
=db_dict_install
,
3195 kdu_model
=k8s_instance_info
["kdu-model"],
3196 kdu_name
=k8s_instance_info
["kdu-name"],
3199 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
3201 await self
.k8scluster_map
[k8sclustertype
].install(
3202 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3203 kdu_model
=k8s_instance_info
["kdu-model"],
3206 db_dict
=db_dict_install
,
3208 kdu_name
=k8s_instance_info
["kdu-name"],
3209 namespace
=k8s_instance_info
["namespace"],
3210 kdu_instance
=kdu_instance
,
3214 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
3217 # Obtain services to obtain management service ip
3218 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3219 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3220 kdu_instance
=kdu_instance
,
3221 namespace
=k8s_instance_info
["namespace"],
3224 # Obtain management service info (if exists)
3225 vnfr_update_dict
= {}
3226 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3228 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3233 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3236 for service
in kdud
.get("service", [])
3237 if service
.get("mgmt-service")
3239 for mgmt_service
in mgmt_services
:
3240 for service
in services
:
3241 if service
["name"].startswith(mgmt_service
["name"]):
3242 # Mgmt service found, Obtain service ip
3243 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3244 if isinstance(ip
, list) and len(ip
) == 1:
3248 "kdur.{}.ip-address".format(kdu_index
)
3251 # Check if must update also mgmt ip at the vnf
3252 service_external_cp
= mgmt_service
.get(
3253 "external-connection-point-ref"
3255 if service_external_cp
:
3257 deep_get(vnfd
, ("mgmt-interface", "cp"))
3258 == service_external_cp
3260 vnfr_update_dict
["ip-address"] = ip
3265 "external-connection-point-ref", ""
3267 == service_external_cp
,
3270 "kdur.{}.ip-address".format(kdu_index
)
3275 "Mgmt service name: {} not found".format(
3276 mgmt_service
["name"]
3280 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3281 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3283 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3286 and kdu_config
.get("initial-config-primitive")
3287 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3289 initial_config_primitive_list
= kdu_config
.get(
3290 "initial-config-primitive"
3292 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3294 for initial_config_primitive
in initial_config_primitive_list
:
3295 primitive_params_
= self
._map
_primitive
_params
(
3296 initial_config_primitive
, {}, {}
3299 await asyncio
.wait_for(
3300 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3301 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3302 kdu_instance
=kdu_instance
,
3303 primitive_name
=initial_config_primitive
["name"],
3304 params
=primitive_params_
,
3305 db_dict
=db_dict_install
,
3311 except Exception as e
:
3312 # Prepare update db with error and raise exception
3315 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3319 vnfr_data
.get("_id"),
3320 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3323 # ignore to keep original exception
3325 # reraise original error
3330 async def deploy_kdus(
3337 task_instantiation_info
,
3339 # Launch kdus if present in the descriptor
3341 k8scluster_id_2_uuic
= {
3342 "helm-chart-v3": {},
3347 async def _get_cluster_id(cluster_id
, cluster_type
):
3348 nonlocal k8scluster_id_2_uuic
3349 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3350 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3352 # check if K8scluster is creating and wait look if previous tasks in process
3353 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3354 "k8scluster", cluster_id
3357 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3358 task_name
, cluster_id
3360 self
.logger
.debug(logging_text
+ text
)
3361 await asyncio
.wait(task_dependency
, timeout
=3600)
3363 db_k8scluster
= self
.db
.get_one(
3364 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3366 if not db_k8scluster
:
3367 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3369 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3371 if cluster_type
== "helm-chart-v3":
3373 # backward compatibility for existing clusters that have not been initialized for helm v3
3374 k8s_credentials
= yaml
.safe_dump(
3375 db_k8scluster
.get("credentials")
3377 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3378 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3380 db_k8scluster_update
= {}
3381 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3382 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3383 db_k8scluster_update
[
3384 "_admin.helm-chart-v3.created"
3386 db_k8scluster_update
[
3387 "_admin.helm-chart-v3.operationalState"
3390 "k8sclusters", cluster_id
, db_k8scluster_update
3392 except Exception as e
:
3395 + "error initializing helm-v3 cluster: {}".format(str(e
))
3398 "K8s cluster '{}' has not been initialized for '{}'".format(
3399 cluster_id
, cluster_type
3404 "K8s cluster '{}' has not been initialized for '{}'".format(
3405 cluster_id
, cluster_type
3408 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3411 logging_text
+= "Deploy kdus: "
3414 db_nsr_update
= {"_admin.deployed.K8s": []}
3415 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3418 updated_cluster_list
= []
3419 updated_v3_cluster_list
= []
3421 for vnfr_data
in db_vnfrs
.values():
3422 vca_id
= self
.get_vca_id(vnfr_data
, {})
3423 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3424 # Step 0: Prepare and set parameters
3425 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3426 vnfd_id
= vnfr_data
.get("vnfd-id")
3427 vnfd_with_id
= find_in_list(
3428 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3432 for kdud
in vnfd_with_id
["kdu"]
3433 if kdud
["name"] == kdur
["kdu-name"]
3435 namespace
= kdur
.get("k8s-namespace")
3436 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3437 if kdur
.get("helm-chart"):
3438 kdumodel
= kdur
["helm-chart"]
3439 # Default version: helm3, if helm-version is v2 assign v2
3440 k8sclustertype
= "helm-chart-v3"
3441 self
.logger
.debug("kdur: {}".format(kdur
))
3443 kdur
.get("helm-version")
3444 and kdur
.get("helm-version") == "v2"
3446 k8sclustertype
= "helm-chart"
3447 elif kdur
.get("juju-bundle"):
3448 kdumodel
= kdur
["juju-bundle"]
3449 k8sclustertype
= "juju-bundle"
3452 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3453 "juju-bundle. Maybe an old NBI version is running".format(
3454 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3457 # check if kdumodel is a file and exists
3459 vnfd_with_id
= find_in_list(
3460 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3462 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3463 if storage
: # may be not present if vnfd has not artifacts
3464 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3465 if storage
["pkg-dir"]:
3466 filename
= "{}/{}/{}s/{}".format(
3473 filename
= "{}/Scripts/{}s/{}".format(
3478 if self
.fs
.file_exists(
3479 filename
, mode
="file"
3480 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3481 kdumodel
= self
.fs
.path
+ filename
3482 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3484 except Exception: # it is not a file
3487 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3488 step
= "Synchronize repos for k8s cluster '{}'".format(
3491 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3495 k8sclustertype
== "helm-chart"
3496 and cluster_uuid
not in updated_cluster_list
3498 k8sclustertype
== "helm-chart-v3"
3499 and cluster_uuid
not in updated_v3_cluster_list
3501 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3502 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3503 cluster_uuid
=cluster_uuid
3506 if del_repo_list
or added_repo_dict
:
3507 if k8sclustertype
== "helm-chart":
3509 "_admin.helm_charts_added." + item
: None
3510 for item
in del_repo_list
3513 "_admin.helm_charts_added." + item
: name
3514 for item
, name
in added_repo_dict
.items()
3516 updated_cluster_list
.append(cluster_uuid
)
3517 elif k8sclustertype
== "helm-chart-v3":
3519 "_admin.helm_charts_v3_added." + item
: None
3520 for item
in del_repo_list
3523 "_admin.helm_charts_v3_added." + item
: name
3524 for item
, name
in added_repo_dict
.items()
3526 updated_v3_cluster_list
.append(cluster_uuid
)
3528 logging_text
+ "repos synchronized on k8s cluster "
3529 "'{}' to_delete: {}, to_add: {}".format(
3530 k8s_cluster_id
, del_repo_list
, added_repo_dict
3535 {"_id": k8s_cluster_id
},
3541 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3542 vnfr_data
["member-vnf-index-ref"],
3546 k8s_instance_info
= {
3547 "kdu-instance": None,
3548 "k8scluster-uuid": cluster_uuid
,
3549 "k8scluster-type": k8sclustertype
,
3550 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3551 "kdu-name": kdur
["kdu-name"],
3552 "kdu-model": kdumodel
,
3553 "namespace": namespace
,
3554 "kdu-deployment-name": kdu_deployment_name
,
3556 db_path
= "_admin.deployed.K8s.{}".format(index
)
3557 db_nsr_update
[db_path
] = k8s_instance_info
3558 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3559 vnfd_with_id
= find_in_list(
3560 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3562 task
= asyncio
.ensure_future(
3571 k8params
=desc_params
,
3576 self
.lcm_tasks
.register(
3580 "instantiate_KDU-{}".format(index
),
3583 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3589 except (LcmException
, asyncio
.CancelledError
):
3591 except Exception as e
:
3592 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3593 if isinstance(e
, (N2VCException
, DbException
)):
3594 self
.logger
.error(logging_text
+ msg
)
3596 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3597 raise LcmException(msg
)
3600 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3619 task_instantiation_info
,
3622 # launch instantiate_N2VC in a asyncio task and register task object
3623 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3624 # if not found, create one entry and update database
3625 # fill db_nsr._admin.deployed.VCA.<index>
3628 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3630 if "execution-environment-list" in descriptor_config
:
3631 ee_list
= descriptor_config
.get("execution-environment-list", [])
3632 elif "juju" in descriptor_config
:
3633 ee_list
= [descriptor_config
] # ns charms
3634 else: # other types as script are not supported
3637 for ee_item
in ee_list
:
3640 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3641 ee_item
.get("juju"), ee_item
.get("helm-chart")
3644 ee_descriptor_id
= ee_item
.get("id")
3645 if ee_item
.get("juju"):
3646 vca_name
= ee_item
["juju"].get("charm")
3649 if ee_item
["juju"].get("charm") is not None
3652 if ee_item
["juju"].get("cloud") == "k8s":
3653 vca_type
= "k8s_proxy_charm"
3654 elif ee_item
["juju"].get("proxy") is False:
3655 vca_type
= "native_charm"
3656 elif ee_item
.get("helm-chart"):
3657 vca_name
= ee_item
["helm-chart"]
3658 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3661 vca_type
= "helm-v3"
3664 logging_text
+ "skipping non juju neither charm configuration"
3669 for vca_index
, vca_deployed
in enumerate(
3670 db_nsr
["_admin"]["deployed"]["VCA"]
3672 if not vca_deployed
:
3675 vca_deployed
.get("member-vnf-index") == member_vnf_index
3676 and vca_deployed
.get("vdu_id") == vdu_id
3677 and vca_deployed
.get("kdu_name") == kdu_name
3678 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3679 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3683 # not found, create one.
3685 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3688 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3690 target
+= "/kdu/{}".format(kdu_name
)
3692 "target_element": target
,
3693 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3694 "member-vnf-index": member_vnf_index
,
3696 "kdu_name": kdu_name
,
3697 "vdu_count_index": vdu_index
,
3698 "operational-status": "init", # TODO revise
3699 "detailed-status": "", # TODO revise
3700 "step": "initial-deploy", # TODO revise
3702 "vdu_name": vdu_name
,
3704 "ee_descriptor_id": ee_descriptor_id
,
3708 # create VCA and configurationStatus in db
3710 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3711 "configurationStatus.{}".format(vca_index
): dict(),
3713 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3715 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3717 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3718 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3719 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3722 task_n2vc
= asyncio
.ensure_future(
3723 self
.instantiate_N2VC(
3724 logging_text
=logging_text
,
3725 vca_index
=vca_index
,
3731 vdu_index
=vdu_index
,
3732 deploy_params
=deploy_params
,
3733 config_descriptor
=descriptor_config
,
3734 base_folder
=base_folder
,
3735 nslcmop_id
=nslcmop_id
,
3739 ee_config_descriptor
=ee_item
,
3742 self
.lcm_tasks
.register(
3746 "instantiate_N2VC-{}".format(vca_index
),
3749 task_instantiation_info
[
3751 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3752 member_vnf_index
or "", vdu_id
or ""
3756 def _create_nslcmop(nsr_id
, operation
, params
):
3758 Creates a ns-lcm-opp content to be stored at database.
3759 :param nsr_id: internal id of the instance
3760 :param operation: instantiate, terminate, scale, action, ...
3761 :param params: user parameters for the operation
3762 :return: dictionary following SOL005 format
3764 # Raise exception if invalid arguments
3765 if not (nsr_id
and operation
and params
):
3767 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3774 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3775 "operationState": "PROCESSING",
3776 "statusEnteredTime": now
,
3777 "nsInstanceId": nsr_id
,
3778 "lcmOperationType": operation
,
3780 "isAutomaticInvocation": False,
3781 "operationParams": params
,
3782 "isCancelPending": False,
3784 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3785 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3790 def _format_additional_params(self
, params
):
3791 params
= params
or {}
3792 for key
, value
in params
.items():
3793 if str(value
).startswith("!!yaml "):
3794 params
[key
] = yaml
.safe_load(value
[7:])
3797 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3798 primitive
= seq
.get("name")
3799 primitive_params
= {}
3801 "member_vnf_index": vnf_index
,
3802 "primitive": primitive
,
3803 "primitive_params": primitive_params
,
3806 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide whether an already registered sub-operation is skipped or retried.

    :param db_nslcmop: ns-lcm-op record holding the "_admin.operations" list
    :param op_index: position of the sub-operation inside that list
    :return: SUBOPERATION_STATUS_SKIP when the sub-operation is COMPLETED,
        otherwise op_index (after marking the sub-operation as PROCESSING)
    """
    sub_operations = deep_get(db_nslcmop, ("_admin", "operations"), [])
    sub_op = sub_operations[op_index]
    if sub_op.get("operationState") != "COMPLETED":
        # c. retry executing sub-operation
        # The sub-operation exists, and operationState != 'COMPLETED'
        # Update operationState = 'PROCESSING' to indicate a retry.
        self._update_suboperation_status(
            db_nslcmop, op_index, "PROCESSING", "In progress"
        )
        # Return the sub-operation index
        # _ns_execute_primitive() or RO.create_action() will be called from scale()
        # with arguments extracted from the sub-operation
        return op_index
    # b. Skip sub-operation
    # _ns_execute_primitive() or RO.create_action() will NOT be executed
    return self.SUBOPERATION_STATUS_SKIP
3830 # Find a sub-operation where all keys in a matching dictionary must match
3831 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3832 def _find_suboperation(self
, db_nslcmop
, match
):
3833 if db_nslcmop
and match
:
3834 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3835 for i
, op
in enumerate(op_list
):
3836 if all(op
.get(k
) == match
[k
] for k
in match
):
3838 return self
.SUBOPERATION_STATUS_NOT_FOUND
3840 # Update status for a sub-operation given its index
3841 def _update_suboperation_status(
3842 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3844 # Update DB for HA tasks
3845 q_filter
= {"_id": db_nslcmop
["_id"]}
3847 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3848 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3851 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3854 # Add sub-operation, return the index of the added sub-operation
3855 # Optionally, set operationState, detailed-status, and operationType
3856 # Status and type are currently set for 'scale' sub-operations:
3857 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3858 # 'detailed-status' : status message
3859 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3860 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3861 def _add_suboperation(
3869 mapped_primitive_params
,
3870 operationState
=None,
3871 detailed_status
=None,
3874 RO_scaling_info
=None,
3877 return self
.SUBOPERATION_STATUS_NOT_FOUND
3878 # Get the "_admin.operations" list, if it exists
3879 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3880 op_list
= db_nslcmop_admin
.get("operations")
3881 # Create or append to the "_admin.operations" list
3883 "member_vnf_index": vnf_index
,
3885 "vdu_count_index": vdu_count_index
,
3886 "primitive": primitive
,
3887 "primitive_params": mapped_primitive_params
,
3890 new_op
["operationState"] = operationState
3892 new_op
["detailed-status"] = detailed_status
3894 new_op
["lcmOperationType"] = operationType
3896 new_op
["RO_nsr_id"] = RO_nsr_id
3898 new_op
["RO_scaling_info"] = RO_scaling_info
3900 # No existing operations, create key 'operations' with current operation as first list element
3901 db_nslcmop_admin
.update({"operations": [new_op
]})
3902 op_list
= db_nslcmop_admin
.get("operations")
3904 # Existing operations, append operation to list
3905 op_list
.append(new_op
)
3907 db_nslcmop_update
= {"_admin.operations": op_list
}
3908 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
3909 op_index
= len(op_list
) - 1
3912 # Helper methods for scale() sub-operations
3914 # pre-scale/post-scale:
3915 # Check for 3 different cases:
3916 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3917 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3918 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3919 def _check_or_add_scale_suboperation(
3923 vnf_config_primitive
,
3927 RO_scaling_info
=None,
3929 # Find this sub-operation
3930 if RO_nsr_id
and RO_scaling_info
:
3931 operationType
= "SCALE-RO"
3933 "member_vnf_index": vnf_index
,
3934 "RO_nsr_id": RO_nsr_id
,
3935 "RO_scaling_info": RO_scaling_info
,
3939 "member_vnf_index": vnf_index
,
3940 "primitive": vnf_config_primitive
,
3941 "primitive_params": primitive_params
,
3942 "lcmOperationType": operationType
,
3944 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
3945 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
3946 # a. New sub-operation
3947 # The sub-operation does not exist, add it.
3948 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3949 # The following parameters are set to None for all kind of scaling:
3951 vdu_count_index
= None
3953 if RO_nsr_id
and RO_scaling_info
:
3954 vnf_config_primitive
= None
3955 primitive_params
= None
3958 RO_scaling_info
= None
3959 # Initial status for sub-operation
3960 operationState
= "PROCESSING"
3961 detailed_status
= "In progress"
3962 # Add sub-operation for pre/post-scaling (zero or more operations)
3963 self
._add
_suboperation
(
3969 vnf_config_primitive
,
3977 return self
.SUBOPERATION_STATUS_NEW
3979 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3980 # or op_index (operationState != 'COMPLETED')
3981 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
3983 # Function to return execution_environment id
3985 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
3986 # TODO vdu_index_count
3987 for vca
in vca_deployed_list
:
3988 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
async def destroy_N2VC(
    self,
    logging_text,
    db_nslcmop,
    vca_deployed,
    config_descriptor,
    vca_index,
    destroy_ee=True,
    exec_primitives=True,
    scaling_in=False,
    vca_id: str = None,
):
    """
    Execute the terminate primitives and destroy the execution environment (unless destroy_ee=False)
    :param logging_text: prefix used at every log message
    :param db_nslcmop: ns-lcm-op record; its "nsInstanceId" is used to update the nsr and to delete prometheus jobs
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to not destroy it, because all of them will be destroyed at once
    :param exec_primitives: False to not execute terminate primitives, because the config is not completed or has
                        not executed properly
    :param scaling_in: True destroys the application, False destroys the model
    :param vca_id: passed through to the primitive execution and to the execution environment deletion
    :return: None or exception
    """

    self.logger.debug(
        logging_text
        + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
            vca_index, vca_deployed, config_descriptor, destroy_ee
        )
    )

    # entries without "type" are handled as lxc proxy charms
    vca_type = vca_deployed.get("type", "lxc_proxy_charm")

    # execute terminate_primitives
    if exec_primitives:
        terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
            config_descriptor.get("terminate-config-primitive"),
            vca_deployed.get("ee_descriptor_id"),
        )
        vdu_id = vca_deployed.get("vdu_id")
        vdu_count_index = vca_deployed.get("vdu_count_index")
        vdu_name = vca_deployed.get("vdu_name")
        vnf_index = vca_deployed.get("member-vnf-index")
        if terminate_primitives and vca_deployed.get("needed_terminate"):
            for seq in terminate_primitives:
                # For each sequence in list, get primitive and call _ns_execute_primitive()
                step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                    vnf_index, seq.get("name")
                )
                self.logger.debug(logging_text + step)
                # Create the primitive for each sequence, i.e. "primitive": "touch"
                primitive = seq.get("name")
                mapped_primitive_params = self._get_terminate_primitive_params(
                    seq, vnf_index
                )

                # Add sub-operation
                self._add_suboperation(
                    db_nslcmop,
                    vnf_index,
                    vdu_id,
                    vdu_count_index,
                    vdu_name,
                    primitive,
                    mapped_primitive_params,
                )
                # Sub-operations: Call _ns_execute_primitive() instead of action()
                try:
                    result, result_detail = await self._ns_execute_primitive(
                        vca_deployed["ee_id"],
                        primitive,
                        mapped_primitive_params,
                        vca_type=vca_type,
                        vca_id=vca_id,
                    )
                except LcmException:
                    # this happens when VCA is not deployed. In this case it is not needed to terminate
                    continue
                result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
                if result not in result_ok:
                    # a failed primitive aborts here: the execution environment is not deleted below
                    raise LcmException(
                        "terminate_primitive {} for vnf_member_index={} fails with "
                        "error {}".format(seq.get("name"), vnf_index, result_detail)
                    )
            # mark that this VCA does not need to be terminated again
            db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
                vca_index
            )
            self.update_db_2(
                "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
            )

    # Delete Prometheus Jobs if any
    # This uses NSR_ID, so it will destroy any jobs under this index
    self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})

    if destroy_ee:
        await self.vca_map[vca_type].delete_execution_environment(
            vca_deployed["ee_id"],
            scaling_in=scaling_in,
            vca_type=vca_type,
            vca_id=vca_id,
        )
4097 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4098 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4099 namespace
= "." + db_nsr
["_id"]
4101 await self
.n2vc
.delete_namespace(
4102 namespace
=namespace
,
4103 total_timeout
=self
.timeout_charm_delete
,
4106 except N2VCNotFound
: # already deleted. Skip
4108 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
async def _terminate_RO(
    self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
):
    """
    Terminates a deployment from RO: deletes the ns, waits until it is removed from VIM and then
    deletes the nsd and the vnfds. Each step is skipped if a previous one has failed.
    :param logging_text: prefix used at every log message
    :param nsr_deployed: db_nsr._admin.deployed
    :param nsr_id: id of the nsr record to update
    :param nslcmop_id: id of the ns-lcm-op whose status is updated
    :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
    :return: None
    :raises LcmException: with the concatenation of all the errors found, if any
    """
    db_nsr_update = {}
    failed_detail = []
    ro_nsr_id = ro_delete_action = None
    if nsr_deployed and nsr_deployed.get("RO"):
        ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
        ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
    try:
        if ro_nsr_id:
            stage[2] = "Deleting ns from VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self._write_op_status(nslcmop_id, stage)
            self.logger.debug(logging_text + stage[2])
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            desc = await self.RO.delete("ns", ro_nsr_id)
            ro_delete_action = desc["action_id"]
            db_nsr_update[
                "_admin.deployed.RO.nsr_delete_action_id"
            ] = ro_delete_action
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        if ro_delete_action:
            # wait until NS is deleted from VIM
            stage[2] = "Waiting ns deleted from VIM."
            detailed_status_old = None
            self.logger.debug(
                logging_text
                + stage[2]
                + " RO_id={} ro_delete_action={}".format(
                    ro_nsr_id, ro_delete_action
                )
            )
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            delete_timeout = 20 * 60  # 20 minutes
            while delete_timeout > 0:
                desc = await self.RO.show(
                    "ns",
                    item_id_name=ro_nsr_id,
                    extra_item="action",
                    extra_item_id=ro_delete_action,
                )

                # deploymentStatus
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                ns_status, ns_status_info = self.RO.check_action_status(desc)
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "Deleting from VIM {}".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                    db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                    break
                else:
                    # Explicit raise instead of "assert False": asserts are removed when running
                    # with -O, which would leave this loop polling until the timeout expires.
                    # It is annotated as a "delete error" by the generic handler below
                    raise LcmException(
                        "ROclient.check_action_status returns unknown {}".format(
                            ns_status
                        )
                    )
                if stage[2] != detailed_status_old:
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self._write_op_status(nslcmop_id, stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # No "loop" argument: asyncio.sleep does not accept it since python 3.10,
                # and the running loop is used anyway
                await asyncio.sleep(5)
                delete_timeout -= 5
            else:  # delete_timeout <= 0:
                raise ROclient.ROClientException(
                    "Timeout waiting ns deleted from VIM"
                )

    except Exception as e:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if (
            isinstance(e, ROclient.ROClientException) and e.http_code == 404
        ):  # not found
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(
                logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
            )
        elif (
            isinstance(e, ROclient.ROClientException) and e.http_code == 409
        ):  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(
                logging_text
                + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
            )
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(
                logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
            )

    # Delete nsd
    if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
        ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
        try:
            stage[2] = "Deleting nsd from RO."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            await self.RO.delete("nsd", ro_nsd_id)
            self.logger.debug(
                logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
            )
            db_nsr_update["_admin.deployed.RO.nsd_id"] = None
        except Exception as e:
            if (
                isinstance(e, ROclient.ROClientException) and e.http_code == 404
            ):  # not found
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                self.logger.debug(
                    logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
                )
            elif (
                isinstance(e, ROclient.ROClientException) and e.http_code == 409
            ):  # conflict
                failed_detail.append(
                    "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
                )
                self.logger.debug(logging_text + failed_detail[-1])
            else:
                failed_detail.append(
                    "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
                )
                self.logger.error(logging_text + failed_detail[-1])

    # Delete vnfds
    if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
        for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
            if not vnf_deployed or not vnf_deployed["id"]:
                continue
            try:
                ro_vnfd_id = vnf_deployed["id"]
                stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                    vnf_deployed["member-vnf-index"], ro_vnfd_id
                )
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("vnfd", ro_vnfd_id)
                self.logger.debug(
                    logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
                )
                db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
            except Exception as e:
                if (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 404
                ):  # not found
                    db_nsr_update[
                        "_admin.deployed.RO.vnfd.{}.id".format(index)
                    ] = None
                    self.logger.debug(
                        logging_text
                        + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
                    )
                elif (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 409
                ):  # conflict
                    failed_detail.append(
                        "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
                    )
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append(
                        "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
                    )
                    self.logger.error(logging_text + failed_detail[-1])

    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a NS in three stages: prepare, execute terminating primitives, delete all
    (execution environments, KDUs and the deployment at VIM).

    Errors are not raised to the caller: they are annotated at the nsr and nslcmop database
    records, and the final operation state is notified with a "terminated" message on topic "ns".

    :param nsr_id: id of the NS record to terminate
    :param nslcmop_id: id of the ns-lcm-op that requests the termination
    :return: None
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    timeout_ns_terminate = self.timeout_ns_terminate
    db_nsr = None
    db_nslcmop = None
    operation_params = None
    exc = None
    error_list = []  # annotates all failed error messages
    db_nslcmop_update = {}
    autoremove = False  # autoremove after terminated
    tasks_dict_info = {}
    db_nsr_update = {}
    stage = [
        "Stage 1/3: Preparing task.",
        "Waiting for previous operations to terminate.",
        "",
    ]
    # ^ contains [stage, step, VIM-status]
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operation_params = db_nslcmop.get("operationParams") or {}
        if operation_params.get("timeout_ns_terminate"):
            timeout_ns_terminate = operation_params["timeout_ns_terminate"]
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update,
        )
        self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            # nothing deployed; the final status is still written at the finally block
            return

        stage[1] = "Getting vnf descriptors from db."
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        db_vnfrs_dict = {
            db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
        }
        db_vnfds_from_id = {}
        db_vnfds_from_member_index = {}
        # Loop over VNFRs
        for vnfr in db_vnfrs_list:
            vnfd_id = vnfr["vnfd-id"]
            if vnfd_id not in db_vnfds_from_id:
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                db_vnfds_from_id[vnfd_id] = vnfd
            db_vnfds_from_member_index[
                vnfr["member-vnf-index-ref"]
            ] = db_vnfds_from_id[vnfd_id]

        # Destroy individual execution environments when there are terminating primitives.
        # Rest of EE will be deleted at once
        # TODO - check before calling _destroy_N2VC
        # if not operation_params.get("skip_terminate_primitives"):#
        # or not vca.get("needed_terminate"):
        stage[0] = "Stage 2/3 execute terminating primitives."
        self.logger.debug(logging_text + stage[0])
        stage[1] = "Looking execution environment that needs terminate."
        self.logger.debug(logging_text + stage[1])

        for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
            # Guard first: the list may contain empty (None) entries, so nothing of
            # "vca" can be read before checking it
            if not vca or not vca.get("ee_id"):
                continue
            config_descriptor = None
            vca_member_vnf_index = vca.get("member-vnf-index")
            vca_id = self.get_vca_id(
                db_vnfrs_dict.get(vca_member_vnf_index)
                if vca_member_vnf_index
                else None,
                db_nsr,
            )
            if not vca.get("member-vnf-index"):
                # ns
                config_descriptor = db_nsr.get("ns-configuration")
            elif vca.get("vdu_id"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
            elif vca.get("kdu_name"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
            else:
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
            vca_type = vca.get("type")
            exec_terminate_primitives = not operation_params.get(
                "skip_terminate_primitives"
            ) and vca.get("needed_terminate")
            # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
            # pending native charms
            destroy_ee = (
                True if vca_type in ("helm", "helm-v3", "native_charm") else False
            )
            # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
            #     vca_index, vca.get("ee_id"), vca_type, destroy_ee))
            task = asyncio.ensure_future(
                self.destroy_N2VC(
                    logging_text,
                    db_nslcmop,
                    vca,
                    config_descriptor,
                    vca_index,
                    destroy_ee,
                    exec_terminate_primitives,
                    vca_id=vca_id,
                )
            )
            tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

        # wait for pending tasks of terminate primitives
        if tasks_dict_info:
            self.logger.debug(
                logging_text
                + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
            )
            error_list = await self._wait_for_tasks(
                logging_text,
                tasks_dict_info,
                min(self.timeout_charm_delete, timeout_ns_terminate),
                stage,
                nslcmop_id,
            )
            tasks_dict_info.clear()
            if error_list:
                return  # raise LcmException("; ".join(error_list))

        # remove All execution environments at once
        stage[0] = "Stage 3/3 delete all."

        if nsr_deployed.get("VCA"):
            stage[1] = "Deleting all execution environments."
            self.logger.debug(logging_text + stage[1])
            vca_id = self.get_vca_id({}, db_nsr)
            task_delete_ee = asyncio.ensure_future(
                asyncio.wait_for(
                    self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
                    timeout=self.timeout_charm_delete,
                )
            )
            # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
            tasks_dict_info[task_delete_ee] = "Terminating all VCA"

        # Delete from k8scluster
        stage[1] = "Deleting KDUs."
        self.logger.debug(logging_text + stage[1])
        # print(nsr_deployed)
        for kdu in get_iterable(nsr_deployed, "K8s"):
            if not kdu or not kdu.get("kdu-instance"):
                continue
            kdu_instance = kdu.get("kdu-instance")
            if kdu.get("k8scluster-type") in self.k8scluster_map:
                # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
                vca_id = self.get_vca_id({}, db_nsr)
                task_delete_kdu_instance = asyncio.ensure_future(
                    self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        vca_id=vca_id,
                    )
                )
            else:
                self.logger.error(
                    logging_text
                    + "Unknown k8s deployment type {}".format(
                        kdu.get("k8scluster-type")
                    )
                )
                continue
            tasks_dict_info[
                task_delete_kdu_instance
            ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

        # remove from RO
        stage[1] = "Deleting ns from VIM."
        if self.ng_ro:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_ng_ro(
                    logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                )
            )
        else:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_RO(
                    logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                )
            )
        tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

        # rest of stuff will be done at finally

    except (
        ROclient.ROClientException,
        DbException,
        LcmException,
        N2VCException,
    ) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(stage[1])
        )
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
            exc_info=True,
        )
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for terminate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(
                    logging_text,
                    tasks_dict_info,
                    timeout_ns_terminate,
                    stage,
                    nslcmop_id,
                )
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancel all tasks
        except Exception as exc:
            error_list.append(str(exc))
        # update status at database
        if error_list:
            error_detail = "; ".join(error_list)
            # self.logger.error(logging_text + error_detail)
            error_description_nslcmop = "{} Detail: {}".format(
                stage[0], error_detail
            )
            error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
                nslcmop_id, stage[0]
            )

            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = (
                error_description_nsr + " Detail: " + error_detail
            )
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "NOT_INSTANTIATED"
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update,
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if ns_state == "NOT_INSTANTIATED":
            try:
                self.db.set_list(
                    "vnfrs",
                    {"nsr-id-ref": nsr_id},
                    {"_admin.nsState": "NOT_INSTANTIATED"},
                )
            except DbException as e:
                self.logger.warn(
                    logging_text
                    + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
                        nsr_id, e
                    )
                )
        if operation_params:
            autoremove = operation_params.get("autoremove", False)
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite(
                    "ns",
                    "terminated",
                    {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                        "autoremove": autoremove,
                    },
                    loop=self.loop,
                )
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
async def _wait_for_tasks(
    self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
):
    """
    Wait until all the given tasks finish or the timeout expires, reporting the progress.

    :param logging_text: prefix used at every log message
    :param created_tasks_info: dictionary with the tasks to wait for as keys, and a text describing
        each task as value. This text is used to compose log and error messages
    :param timeout: maximum time to wait for all the tasks together, counted from the call to this method
    :param stage: list of strings with the operation status. Index 1 is overwritten with
        "<done>/<total>." (plus the errors found) and written with _write_op_status
    :param nslcmop_id: id of the ns-lcm-op whose status is updated
    :param nsr_id: if provided, errorDescription and errorDetail are also written at this nsr
    :return: list with one detailed error text per failed, cancelled or timed-out task. Empty if all succeed
    """
    time_start = time()
    error_detail_list = []
    error_list = []
    pending_tasks = list(created_tasks_info.keys())
    num_tasks = len(pending_tasks)
    num_done = 0
    stage[1] = "{}/{}.".format(num_done, num_tasks)
    self._write_op_status(nslcmop_id, stage)
    while pending_tasks:
        new_error = None
        # remaining part of the global timeout
        _timeout = timeout + time_start - time()
        done, pending_tasks = await asyncio.wait(
            pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
        )
        num_done += len(done)
        if not done:  # Timeout
            for task in pending_tasks:
                new_error = created_tasks_info[task] + ": Timeout"
                error_detail_list.append(new_error)
                error_list.append(new_error)
            # leaves the loop without writing the status: the caller gets the errors in the returned list
            break
        for task in done:
            if task.cancelled():
                exc = "Cancelled"
            else:
                exc = task.exception()
            if exc:
                if isinstance(exc, asyncio.TimeoutError):
                    exc = "Timeout"
                new_error = created_tasks_info[task] + ": {}".format(exc)
                error_list.append(created_tasks_info[task])
                error_detail_list.append(new_error)
                # these types are logged as a single line; any other exception is logged with its traceback
                if isinstance(
                    exc,
                    (
                        str,
                        DbException,
                        N2VCException,
                        ROclient.ROClientException,
                        LcmException,
                        K8sException,
                        NgRoException,
                    ),
                ):
                    self.logger.error(logging_text + new_error)
                else:
                    exc_traceback = "".join(
                        traceback.format_exception(None, exc, exc.__traceback__)
                    )
                    self.logger.error(
                        logging_text
                        + created_tasks_info[task]
                        + " "
                        + exc_traceback
                    )
            else:
                self.logger.debug(
                    logging_text + created_tasks_info[task] + ": Done"
                )
        stage[1] = "{}/{}.".format(num_done, num_tasks)
        if new_error:
            # all the errors accumulated so far are reported, not only the new ones
            stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
            if nsr_id:  # update also nsr
                self.update_db_2(
                    "nsrs",
                    nsr_id,
                    {
                        "errorDescription": "Error at: " + ", ".join(error_list),
                        "errorDetail": ". ".join(error_detail_list),
                    },
                )
        self._write_op_status(nslcmop_id, stage)
    return error_detail_list
4716 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4718 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4719 The default-value is used. If it is between < > it look for a value at instantiation_params
4720 :param primitive_desc: portion of VNFD/NSD that describes primitive
4721 :param params: Params provided by user
4722 :param instantiation_params: Instantiation params provided by user
4723 :return: a dictionary with the calculated params
4725 calculated_params
= {}
4726 for parameter
in primitive_desc
.get("parameter", ()):
4727 param_name
= parameter
["name"]
4728 if param_name
in params
:
4729 calculated_params
[param_name
] = params
[param_name
]
4730 elif "default-value" in parameter
or "value" in parameter
:
4731 if "value" in parameter
:
4732 calculated_params
[param_name
] = parameter
["value"]
4734 calculated_params
[param_name
] = parameter
["default-value"]
4736 isinstance(calculated_params
[param_name
], str)
4737 and calculated_params
[param_name
].startswith("<")
4738 and calculated_params
[param_name
].endswith(">")
4740 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4741 calculated_params
[param_name
] = instantiation_params
[
4742 calculated_params
[param_name
][1:-1]
4746 "Parameter {} needed to execute primitive {} not provided".format(
4747 calculated_params
[param_name
], primitive_desc
["name"]
4752 "Parameter {} needed to execute primitive {} not provided".format(
4753 param_name
, primitive_desc
["name"]
4757 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4758 calculated_params
[param_name
] = yaml
.safe_dump(
4759 calculated_params
[param_name
], default_flow_style
=True, width
=256
4761 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4763 ].startswith("!!yaml "):
4764 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4765 if parameter
.get("data-type") == "INTEGER":
4767 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4768 except ValueError: # error converting string to int
4770 "Parameter {} of primitive {} must be integer".format(
4771 param_name
, primitive_desc
["name"]
4774 elif parameter
.get("data-type") == "BOOLEAN":
4775 calculated_params
[param_name
] = not (
4776 (str(calculated_params
[param_name
])).lower() == "false"
4779 # add always ns_config_info if primitive name is config
4780 if primitive_desc
["name"] == "config":
4781 if "ns_config_info" in instantiation_params
:
4782 calculated_params
["ns_config_info"] = instantiation_params
[
4785 return calculated_params
4787 def _look_for_deployed_vca(
4794 ee_descriptor_id
=None,
4796 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4797 for vca
in deployed_vca
:
4800 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4803 vdu_count_index
is not None
4804 and vdu_count_index
!= vca
["vdu_count_index"]
4807 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4809 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4813 # vca_deployed not found
4815 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4816 " is not deployed".format(
4825 ee_id
= vca
.get("ee_id")
4827 "type", "lxc_proxy_charm"
4828 ) # default value for backward compatibility - proxy charm
4831 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4832 "execution environment".format(
4833 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4836 return ee_id
, vca_type
4838 async def _ns_execute_primitive(
4844 retries_interval
=30,
4851 if primitive
== "config":
4852 primitive_params
= {"params": primitive_params
}
4854 vca_type
= vca_type
or "lxc_proxy_charm"
4858 output
= await asyncio
.wait_for(
4859 self
.vca_map
[vca_type
].exec_primitive(
4861 primitive_name
=primitive
,
4862 params_dict
=primitive_params
,
4863 progress_timeout
=self
.timeout_progress_primitive
,
4864 total_timeout
=self
.timeout_primitive
,
4869 timeout
=timeout
or self
.timeout_primitive
,
4873 except asyncio
.CancelledError
:
4875 except Exception as e
: # asyncio.TimeoutError
4876 if isinstance(e
, asyncio
.TimeoutError
):
4881 "Error executing action {} on {} -> {}".format(
4886 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
4888 return "FAILED", str(e
)
4890 return "COMPLETED", output
4892 except (LcmException
, asyncio
.CancelledError
):
4894 except Exception as e
:
4895 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
4897 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
4899 Updating the vca_status with latest juju information in nsrs record
4900 :param: nsr_id: Id of the nsr
4901 :param: nslcmop_id: Id of the nslcmop
4905 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
4906 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4907 vca_id
= self
.get_vca_id({}, db_nsr
)
4908 if db_nsr
["_admin"]["deployed"]["K8s"]:
4909 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
4910 cluster_uuid
, kdu_instance
, cluster_type
= (
4911 k8s
["k8scluster-uuid"],
4912 k8s
["kdu-instance"],
4913 k8s
["k8scluster-type"],
4915 await self
._on
_update
_k
8s
_db
(
4916 cluster_uuid
=cluster_uuid
,
4917 kdu_instance
=kdu_instance
,
4918 filter={"_id": nsr_id
},
4920 cluster_type
=cluster_type
,
4923 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
4924 table
, filter = "nsrs", {"_id": nsr_id
}
4925 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
4926 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
4928 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
4929 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
4931 async def action(self
, nsr_id
, nslcmop_id
):
4932 # Try to lock HA task here
4933 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4934 if not task_is_locked_by_me
:
4937 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
4938 self
.logger
.debug(logging_text
+ "Enter")
4939 # get all needed from database
4943 db_nslcmop_update
= {}
4944 nslcmop_operation_state
= None
4945 error_description_nslcmop
= None
4948 # wait for any previous tasks in process
4949 step
= "Waiting for previous operations to terminate"
4950 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4952 self
._write
_ns
_status
(
4955 current_operation
="RUNNING ACTION",
4956 current_operation_id
=nslcmop_id
,
4959 step
= "Getting information from database"
4960 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4961 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4962 if db_nslcmop
["operationParams"].get("primitive_params"):
4963 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
4964 db_nslcmop
["operationParams"]["primitive_params"]
4967 nsr_deployed
= db_nsr
["_admin"].get("deployed")
4968 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
4969 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
4970 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
4971 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
4972 primitive
= db_nslcmop
["operationParams"]["primitive"]
4973 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
4974 timeout_ns_action
= db_nslcmop
["operationParams"].get(
4975 "timeout_ns_action", self
.timeout_primitive
4979 step
= "Getting vnfr from database"
4980 db_vnfr
= self
.db
.get_one(
4981 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
4983 if db_vnfr
.get("kdur"):
4985 for kdur
in db_vnfr
["kdur"]:
4986 if kdur
.get("additionalParams"):
4987 kdur
["additionalParams"] = json
.loads(
4988 kdur
["additionalParams"]
4990 kdur_list
.append(kdur
)
4991 db_vnfr
["kdur"] = kdur_list
4992 step
= "Getting vnfd from database"
4993 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
4995 # Sync filesystem before running a primitive
4996 self
.fs
.sync(db_vnfr
["vnfd-id"])
4998 step
= "Getting nsd from database"
4999 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5001 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5002 # for backward compatibility
5003 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5004 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5005 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5006 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5008 # look for primitive
5009 config_primitive_desc
= descriptor_configuration
= None
5011 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5013 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5015 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5017 descriptor_configuration
= db_nsd
.get("ns-configuration")
5019 if descriptor_configuration
and descriptor_configuration
.get(
5022 for config_primitive
in descriptor_configuration
["config-primitive"]:
5023 if config_primitive
["name"] == primitive
:
5024 config_primitive_desc
= config_primitive
5027 if not config_primitive_desc
:
5028 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5030 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5034 primitive_name
= primitive
5035 ee_descriptor_id
= None
5037 primitive_name
= config_primitive_desc
.get(
5038 "execution-environment-primitive", primitive
5040 ee_descriptor_id
= config_primitive_desc
.get(
5041 "execution-environment-ref"
5047 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5049 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5052 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5054 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5056 desc_params
= parse_yaml_strings(
5057 db_vnfr
.get("additionalParamsForVnf")
5060 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5061 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5062 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5064 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5065 actions
.add(primitive
["name"])
5066 for primitive
in kdu_configuration
.get("config-primitive", []):
5067 actions
.add(primitive
["name"])
5069 nsr_deployed
["K8s"],
5070 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5071 and kdu
["member-vnf-index"] == vnf_index
,
5075 if primitive_name
in actions
5076 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5080 # TODO check if ns is in a proper status
5082 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5084 # kdur and desc_params already set from before
5085 if primitive_params
:
5086 desc_params
.update(primitive_params
)
5087 # TODO Check if we will need something at vnf level
5088 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5090 kdu_name
== kdu
["kdu-name"]
5091 and kdu
["member-vnf-index"] == vnf_index
5096 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5099 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5100 msg
= "unknown k8scluster-type '{}'".format(
5101 kdu
.get("k8scluster-type")
5103 raise LcmException(msg
)
5106 "collection": "nsrs",
5107 "filter": {"_id": nsr_id
},
5108 "path": "_admin.deployed.K8s.{}".format(index
),
5112 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5114 step
= "Executing kdu {}".format(primitive_name
)
5115 if primitive_name
== "upgrade":
5116 if desc_params
.get("kdu_model"):
5117 kdu_model
= desc_params
.get("kdu_model")
5118 del desc_params
["kdu_model"]
5120 kdu_model
= kdu
.get("kdu-model")
5121 parts
= kdu_model
.split(sep
=":")
5123 kdu_model
= parts
[0]
5125 detailed_status
= await asyncio
.wait_for(
5126 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5127 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5128 kdu_instance
=kdu
.get("kdu-instance"),
5130 kdu_model
=kdu_model
,
5133 timeout
=timeout_ns_action
,
5135 timeout
=timeout_ns_action
+ 10,
5138 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5140 elif primitive_name
== "rollback":
5141 detailed_status
= await asyncio
.wait_for(
5142 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5143 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5144 kdu_instance
=kdu
.get("kdu-instance"),
5147 timeout
=timeout_ns_action
,
5149 elif primitive_name
== "status":
5150 detailed_status
= await asyncio
.wait_for(
5151 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5152 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5153 kdu_instance
=kdu
.get("kdu-instance"),
5156 timeout
=timeout_ns_action
,
5159 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5160 kdu
["kdu-name"], nsr_id
5162 params
= self
._map
_primitive
_params
(
5163 config_primitive_desc
, primitive_params
, desc_params
5166 detailed_status
= await asyncio
.wait_for(
5167 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5168 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5169 kdu_instance
=kdu_instance
,
5170 primitive_name
=primitive_name
,
5173 timeout
=timeout_ns_action
,
5176 timeout
=timeout_ns_action
,
5180 nslcmop_operation_state
= "COMPLETED"
5182 detailed_status
= ""
5183 nslcmop_operation_state
= "FAILED"
5185 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5186 nsr_deployed
["VCA"],
5187 member_vnf_index
=vnf_index
,
5189 vdu_count_index
=vdu_count_index
,
5190 ee_descriptor_id
=ee_descriptor_id
,
5192 for vca_index
, vca_deployed
in enumerate(
5193 db_nsr
["_admin"]["deployed"]["VCA"]
5195 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5197 "collection": "nsrs",
5198 "filter": {"_id": nsr_id
},
5199 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5203 nslcmop_operation_state
,
5205 ) = await self
._ns
_execute
_primitive
(
5207 primitive
=primitive_name
,
5208 primitive_params
=self
._map
_primitive
_params
(
5209 config_primitive_desc
, primitive_params
, desc_params
5211 timeout
=timeout_ns_action
,
5217 db_nslcmop_update
["detailed-status"] = detailed_status
5218 error_description_nslcmop
= (
5219 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5223 + " task Done with result {} {}".format(
5224 nslcmop_operation_state
, detailed_status
5227 return # database update is called inside finally
5229 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5230 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5232 except asyncio
.CancelledError
:
5234 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5236 exc
= "Operation was cancelled"
5237 except asyncio
.TimeoutError
:
5238 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5240 except Exception as e
:
5241 exc
= traceback
.format_exc()
5242 self
.logger
.critical(
5243 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5252 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5253 nslcmop_operation_state
= "FAILED"
5255 self
._write
_ns
_status
(
5259 ], # TODO check if degraded. For the moment use previous status
5260 current_operation
="IDLE",
5261 current_operation_id
=None,
5262 # error_description=error_description_nsr,
5263 # error_detail=error_detail,
5264 other_update
=db_nsr_update
,
5267 self
._write
_op
_status
(
5270 error_message
=error_description_nslcmop
,
5271 operation_state
=nslcmop_operation_state
,
5272 other_update
=db_nslcmop_update
,
5275 if nslcmop_operation_state
:
5277 await self
.msg
.aiowrite(
5282 "nslcmop_id": nslcmop_id
,
5283 "operationState": nslcmop_operation_state
,
5287 except Exception as e
:
5289 logging_text
+ "kafka_write notification Exception {}".format(e
)
5291 self
.logger
.debug(logging_text
+ "Exit")
5292 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5293 return nslcmop_operation_state
, detailed_status
5295 async def scale(self
, nsr_id
, nslcmop_id
):
5296 # Try to lock HA task here
5297 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5298 if not task_is_locked_by_me
:
5301 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
5302 stage
= ["", "", ""]
5303 tasks_dict_info
= {}
5304 # ^ stage, step, VIM progress
5305 self
.logger
.debug(logging_text
+ "Enter")
5306 # get all needed from database
5308 db_nslcmop_update
= {}
5311 # in case of error, indicates what part of scale was failed to put nsr at error status
5312 scale_process
= None
5313 old_operational_status
= ""
5314 old_config_status
= ""
5317 # wait for any previous tasks in process
5318 step
= "Waiting for previous operations to terminate"
5319 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5320 self
._write
_ns
_status
(
5323 current_operation
="SCALING",
5324 current_operation_id
=nslcmop_id
,
5327 step
= "Getting nslcmop from database"
5329 step
+ " after having waited for previous tasks to be completed"
5331 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5333 step
= "Getting nsr from database"
5334 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5335 old_operational_status
= db_nsr
["operational-status"]
5336 old_config_status
= db_nsr
["config-status"]
5338 step
= "Parsing scaling parameters"
5339 db_nsr_update
["operational-status"] = "scaling"
5340 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5341 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5343 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
5345 ]["member-vnf-index"]
5346 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
5348 ]["scaling-group-descriptor"]
5349 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
5350 # for backward compatibility
5351 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5352 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5353 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5354 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5356 step
= "Getting vnfr from database"
5357 db_vnfr
= self
.db
.get_one(
5358 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5361 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5363 step
= "Getting vnfd from database"
5364 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5366 base_folder
= db_vnfd
["_admin"]["storage"]
5368 step
= "Getting scaling-group-descriptor"
5369 scaling_descriptor
= find_in_list(
5370 get_scaling_aspect(db_vnfd
),
5371 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
5373 if not scaling_descriptor
:
5375 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5376 "at vnfd:scaling-group-descriptor".format(scaling_group
)
5379 step
= "Sending scale order to VIM"
5380 # TODO check if ns is in a proper status
5382 if not db_nsr
["_admin"].get("scaling-group"):
5387 "_admin.scaling-group": [
5388 {"name": scaling_group
, "nb-scale-op": 0}
5392 admin_scale_index
= 0
5394 for admin_scale_index
, admin_scale_info
in enumerate(
5395 db_nsr
["_admin"]["scaling-group"]
5397 if admin_scale_info
["name"] == scaling_group
:
5398 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
5400 else: # not found, set index one plus last element and add new entry with the name
5401 admin_scale_index
+= 1
5403 "_admin.scaling-group.{}.name".format(admin_scale_index
)
5406 vca_scaling_info
= []
5407 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
5408 if scaling_type
== "SCALE_OUT":
5409 if "aspect-delta-details" not in scaling_descriptor
:
5411 "Aspect delta details not fount in scaling descriptor {}".format(
5412 scaling_descriptor
["name"]
5415 # count if max-instance-count is reached
5416 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5418 scaling_info
["scaling_direction"] = "OUT"
5419 scaling_info
["vdu-create"] = {}
5420 scaling_info
["kdu-create"] = {}
5421 for delta
in deltas
:
5422 for vdu_delta
in delta
.get("vdu-delta", {}):
5423 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
5424 # vdu_index also provides the number of instance of the targeted vdu
5425 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5426 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5430 additional_params
= (
5431 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
5434 cloud_init_list
= []
5436 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5437 max_instance_count
= 10
5438 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
5439 max_instance_count
= vdu_profile
.get(
5440 "max-number-of-instances", 10
5443 default_instance_num
= get_number_of_instances(
5446 instances_number
= vdu_delta
.get("number-of-instances", 1)
5447 nb_scale_op
+= instances_number
5449 new_instance_count
= nb_scale_op
+ default_instance_num
5450 # Control if new count is over max and vdu count is less than max.
5451 # Then assign new instance count
5452 if new_instance_count
> max_instance_count
> vdu_count
:
5453 instances_number
= new_instance_count
- max_instance_count
5455 instances_number
= instances_number
5457 if new_instance_count
> max_instance_count
:
5459 "reached the limit of {} (max-instance-count) "
5460 "scaling-out operations for the "
5461 "scaling-group-descriptor '{}'".format(
5462 nb_scale_op
, scaling_group
5465 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5467 # TODO Information of its own ip is not available because db_vnfr is not updated.
5468 additional_params
["OSM"] = get_osm_params(
5469 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
5471 cloud_init_list
.append(
5472 self
._parse
_cloud
_init
(
5479 vca_scaling_info
.append(
5481 "osm_vdu_id": vdu_delta
["id"],
5482 "member-vnf-index": vnf_index
,
5484 "vdu_index": vdu_index
+ x
,
5487 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
5488 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5489 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
5490 kdu_name
= kdu_profile
["kdu-name"]
5491 resource_name
= kdu_profile
.get("resource-name", "")
5493 # Might have different kdus in the same delta
5494 # Should have list for each kdu
5495 if not scaling_info
["kdu-create"].get(kdu_name
, None):
5496 scaling_info
["kdu-create"][kdu_name
] = []
5498 kdur
= get_kdur(db_vnfr
, kdu_name
)
5499 if kdur
.get("helm-chart"):
5500 k8s_cluster_type
= "helm-chart-v3"
5501 self
.logger
.debug("kdur: {}".format(kdur
))
5503 kdur
.get("helm-version")
5504 and kdur
.get("helm-version") == "v2"
5506 k8s_cluster_type
= "helm-chart"
5507 elif kdur
.get("juju-bundle"):
5508 k8s_cluster_type
= "juju-bundle"
5511 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5512 "juju-bundle. Maybe an old NBI version is running".format(
5513 db_vnfr
["member-vnf-index-ref"], kdu_name
5517 max_instance_count
= 10
5518 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
5519 max_instance_count
= kdu_profile
.get(
5520 "max-number-of-instances", 10
5523 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
5524 deployed_kdu
, _
= get_deployed_kdu(
5525 nsr_deployed
, kdu_name
, vnf_index
5527 if deployed_kdu
is None:
5529 "KDU '{}' for vnf '{}' not deployed".format(
5533 kdu_instance
= deployed_kdu
.get("kdu-instance")
5534 instance_num
= await self
.k8scluster_map
[
5540 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
5541 kdu_model
=deployed_kdu
.get("kdu-model"),
5543 kdu_replica_count
= instance_num
+ kdu_delta
.get(
5544 "number-of-instances", 1
5547 # Control if new count is over max and instance_num is less than max.
5548 # Then assign max instance number to kdu replica count
5549 if kdu_replica_count
> max_instance_count
> instance_num
:
5550 kdu_replica_count
= max_instance_count
5551 if kdu_replica_count
> max_instance_count
:
5553 "reached the limit of {} (max-instance-count) "
5554 "scaling-out operations for the "
5555 "scaling-group-descriptor '{}'".format(
5556 instance_num
, scaling_group
5560 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5561 vca_scaling_info
.append(
5563 "osm_kdu_id": kdu_name
,
5564 "member-vnf-index": vnf_index
,
5566 "kdu_index": instance_num
+ x
- 1,
5569 scaling_info
["kdu-create"][kdu_name
].append(
5571 "member-vnf-index": vnf_index
,
5573 "k8s-cluster-type": k8s_cluster_type
,
5574 "resource-name": resource_name
,
5575 "scale": kdu_replica_count
,
5578 elif scaling_type
== "SCALE_IN":
5579 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5581 scaling_info
["scaling_direction"] = "IN"
5582 scaling_info
["vdu-delete"] = {}
5583 scaling_info
["kdu-delete"] = {}
5585 for delta
in deltas
:
5586 for vdu_delta
in delta
.get("vdu-delta", {}):
5587 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5588 min_instance_count
= 0
5589 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5590 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
5591 min_instance_count
= vdu_profile
["min-number-of-instances"]
5593 default_instance_num
= get_number_of_instances(
5594 db_vnfd
, vdu_delta
["id"]
5596 instance_num
= vdu_delta
.get("number-of-instances", 1)
5597 nb_scale_op
-= instance_num
5599 new_instance_count
= nb_scale_op
+ default_instance_num
5601 if new_instance_count
< min_instance_count
< vdu_count
:
5602 instances_number
= min_instance_count
- new_instance_count
5604 instances_number
= instance_num
5606 if new_instance_count
< min_instance_count
:
5608 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5609 "scaling-group-descriptor '{}'".format(
5610 nb_scale_op
, scaling_group
5613 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5614 vca_scaling_info
.append(
5616 "osm_vdu_id": vdu_delta
["id"],
5617 "member-vnf-index": vnf_index
,
5619 "vdu_index": vdu_index
- 1 - x
,
5622 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
5623 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5624 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
5625 kdu_name
= kdu_profile
["kdu-name"]
5626 resource_name
= kdu_profile
.get("resource-name", "")
5628 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
5629 scaling_info
["kdu-delete"][kdu_name
] = []
5631 kdur
= get_kdur(db_vnfr
, kdu_name
)
5632 if kdur
.get("helm-chart"):
5633 k8s_cluster_type
= "helm-chart-v3"
5634 self
.logger
.debug("kdur: {}".format(kdur
))
5636 kdur
.get("helm-version")
5637 and kdur
.get("helm-version") == "v2"
5639 k8s_cluster_type
= "helm-chart"
5640 elif kdur
.get("juju-bundle"):
5641 k8s_cluster_type
= "juju-bundle"
5644 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5645 "juju-bundle. Maybe an old NBI version is running".format(
5646 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
5650 min_instance_count
= 0
5651 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
5652 min_instance_count
= kdu_profile
["min-number-of-instances"]
5654 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
5655 deployed_kdu
, _
= get_deployed_kdu(
5656 nsr_deployed
, kdu_name
, vnf_index
5658 if deployed_kdu
is None:
5660 "KDU '{}' for vnf '{}' not deployed".format(
5664 kdu_instance
= deployed_kdu
.get("kdu-instance")
5665 instance_num
= await self
.k8scluster_map
[
5671 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
5672 kdu_model
=deployed_kdu
.get("kdu-model"),
5674 kdu_replica_count
= instance_num
- kdu_delta
.get(
5675 "number-of-instances", 1
5678 if kdu_replica_count
< min_instance_count
< instance_num
:
5679 kdu_replica_count
= min_instance_count
5680 if kdu_replica_count
< min_instance_count
:
5682 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5683 "scaling-group-descriptor '{}'".format(
5684 instance_num
, scaling_group
5688 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5689 vca_scaling_info
.append(
5691 "osm_kdu_id": kdu_name
,
5692 "member-vnf-index": vnf_index
,
5694 "kdu_index": instance_num
- x
- 1,
5697 scaling_info
["kdu-delete"][kdu_name
].append(
5699 "member-vnf-index": vnf_index
,
5701 "k8s-cluster-type": k8s_cluster_type
,
5702 "resource-name": resource_name
,
5703 "scale": kdu_replica_count
,
5707 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5708 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
5709 if scaling_info
["scaling_direction"] == "IN":
5710 for vdur
in reversed(db_vnfr
["vdur"]):
5711 if vdu_delete
.get(vdur
["vdu-id-ref"]):
5712 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
5713 scaling_info
["vdu"].append(
5715 "name": vdur
.get("name") or vdur
.get("vdu-name"),
5716 "vdu_id": vdur
["vdu-id-ref"],
5720 for interface
in vdur
["interfaces"]:
5721 scaling_info
["vdu"][-1]["interface"].append(
5723 "name": interface
["name"],
5724 "ip_address": interface
["ip-address"],
5725 "mac_address": interface
.get("mac-address"),
5728 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5731 step
= "Executing pre-scale vnf-config-primitive"
5732 if scaling_descriptor
.get("scaling-config-action"):
5733 for scaling_config_action
in scaling_descriptor
[
5734 "scaling-config-action"
5737 scaling_config_action
.get("trigger") == "pre-scale-in"
5738 and scaling_type
== "SCALE_IN"
5740 scaling_config_action
.get("trigger") == "pre-scale-out"
5741 and scaling_type
== "SCALE_OUT"
5743 vnf_config_primitive
= scaling_config_action
[
5744 "vnf-config-primitive-name-ref"
5746 step
= db_nslcmop_update
[
5748 ] = "executing pre-scale scaling-config-action '{}'".format(
5749 vnf_config_primitive
5752 # look for primitive
5753 for config_primitive
in (
5754 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
5755 ).get("config-primitive", ()):
5756 if config_primitive
["name"] == vnf_config_primitive
:
5760 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5761 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5762 "primitive".format(scaling_group
, vnf_config_primitive
)
5765 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
5766 if db_vnfr
.get("additionalParamsForVnf"):
5767 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
5769 scale_process
= "VCA"
5770 db_nsr_update
["config-status"] = "configuring pre-scaling"
5771 primitive_params
= self
._map
_primitive
_params
(
5772 config_primitive
, {}, vnfr_params
5775 # Pre-scale retry check: Check if this sub-operation has been executed before
5776 op_index
= self
._check
_or
_add
_scale
_suboperation
(
5779 vnf_config_primitive
,
5783 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
5784 # Skip sub-operation
5785 result
= "COMPLETED"
5786 result_detail
= "Done"
5789 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5790 vnf_config_primitive
, result
, result_detail
5794 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
5795 # New sub-operation: Get index of this sub-operation
5797 len(db_nslcmop
.get("_admin", {}).get("operations"))
5802 + "vnf_config_primitive={} New sub-operation".format(
5803 vnf_config_primitive
5807 # retry: Get registered params for this existing sub-operation
5808 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
5811 vnf_index
= op
.get("member_vnf_index")
5812 vnf_config_primitive
= op
.get("primitive")
5813 primitive_params
= op
.get("primitive_params")
5816 + "vnf_config_primitive={} Sub-operation retry".format(
5817 vnf_config_primitive
5820 # Execute the primitive, either with new (first-time) or registered (reintent) args
5821 ee_descriptor_id
= config_primitive
.get(
5822 "execution-environment-ref"
5824 primitive_name
= config_primitive
.get(
5825 "execution-environment-primitive", vnf_config_primitive
5827 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5828 nsr_deployed
["VCA"],
5829 member_vnf_index
=vnf_index
,
5831 vdu_count_index
=None,
5832 ee_descriptor_id
=ee_descriptor_id
,
5834 result
, result_detail
= await self
._ns
_execute
_primitive
(
5843 + "vnf_config_primitive={} Done with result {} {}".format(
5844 vnf_config_primitive
, result
, result_detail
5847 # Update operationState = COMPLETED | FAILED
5848 self
._update
_suboperation
_status
(
5849 db_nslcmop
, op_index
, result
, result_detail
5852 if result
== "FAILED":
5853 raise LcmException(result_detail
)
5854 db_nsr_update
["config-status"] = old_config_status
5855 scale_process
= None
5859 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
5862 "_admin.scaling-group.{}.time".format(admin_scale_index
)
5865 # SCALE-IN VCA - BEGIN
5866 if vca_scaling_info
:
5867 step
= db_nslcmop_update
[
5869 ] = "Deleting the execution environments"
5870 scale_process
= "VCA"
5871 for vca_info
in vca_scaling_info
:
5872 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
5873 member_vnf_index
= str(vca_info
["member-vnf-index"])
5875 logging_text
+ "vdu info: {}".format(vca_info
)
5877 if vca_info
.get("osm_vdu_id"):
5878 vdu_id
= vca_info
["osm_vdu_id"]
5879 vdu_index
= int(vca_info
["vdu_index"])
5882 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5883 member_vnf_index
, vdu_id
, vdu_index
5885 stage
[2] = step
= "Scaling in VCA"
5886 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
5887 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
5888 config_update
= db_nsr
["configurationStatus"]
5889 for vca_index
, vca
in enumerate(vca_update
):
5891 (vca
or vca
.get("ee_id"))
5892 and vca
["member-vnf-index"] == member_vnf_index
5893 and vca
["vdu_count_index"] == vdu_index
5895 if vca
.get("vdu_id"):
5896 config_descriptor
= get_configuration(
5897 db_vnfd
, vca
.get("vdu_id")
5899 elif vca
.get("kdu_name"):
5900 config_descriptor
= get_configuration(
5901 db_vnfd
, vca
.get("kdu_name")
5904 config_descriptor
= get_configuration(
5905 db_vnfd
, db_vnfd
["id"]
5907 operation_params
= (
5908 db_nslcmop
.get("operationParams") or {}
5910 exec_terminate_primitives
= not operation_params
.get(
5911 "skip_terminate_primitives"
5912 ) and vca
.get("needed_terminate")
5913 task
= asyncio
.ensure_future(
5922 exec_primitives
=exec_terminate_primitives
,
5926 timeout
=self
.timeout_charm_delete
,
5929 tasks_dict_info
[task
] = "Terminating VCA {}".format(
5932 del vca_update
[vca_index
]
5933 del config_update
[vca_index
]
5934 # wait for pending tasks of terminate primitives
5938 + "Waiting for tasks {}".format(
5939 list(tasks_dict_info
.keys())
5942 error_list
= await self
._wait
_for
_tasks
(
5946 self
.timeout_charm_delete
, self
.timeout_ns_terminate
5951 tasks_dict_info
.clear()
5953 raise LcmException("; ".join(error_list
))
5955 db_vca_and_config_update
= {
5956 "_admin.deployed.VCA": vca_update
,
5957 "configurationStatus": config_update
,
5960 "nsrs", db_nsr
["_id"], db_vca_and_config_update
5962 scale_process
= None
5963 # SCALE-IN VCA - END
5966 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
5967 scale_process
= "RO"
5968 if self
.ro_config
.get("ng"):
5969 await self
._scale
_ng
_ro
(
5970 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
5972 scaling_info
.pop("vdu-create", None)
5973 scaling_info
.pop("vdu-delete", None)
5975 scale_process
= None
5979 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
5980 scale_process
= "KDU"
5981 await self
._scale
_kdu
(
5982 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
5984 scaling_info
.pop("kdu-create", None)
5985 scaling_info
.pop("kdu-delete", None)
5987 scale_process
= None
5991 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5993 # SCALE-UP VCA - BEGIN
5994 if vca_scaling_info
:
5995 step
= db_nslcmop_update
[
5997 ] = "Creating new execution environments"
5998 scale_process
= "VCA"
5999 for vca_info
in vca_scaling_info
:
6000 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6001 member_vnf_index
= str(vca_info
["member-vnf-index"])
6003 logging_text
+ "vdu info: {}".format(vca_info
)
6005 vnfd_id
= db_vnfr
["vnfd-ref"]
6006 if vca_info
.get("osm_vdu_id"):
6007 vdu_index
= int(vca_info
["vdu_index"])
6008 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6009 if db_vnfr
.get("additionalParamsForVnf"):
6010 deploy_params
.update(
6012 db_vnfr
["additionalParamsForVnf"].copy()
6015 descriptor_config
= get_configuration(
6016 db_vnfd
, db_vnfd
["id"]
6018 if descriptor_config
:
6023 logging_text
=logging_text
6024 + "member_vnf_index={} ".format(member_vnf_index
),
6027 nslcmop_id
=nslcmop_id
,
6033 member_vnf_index
=member_vnf_index
,
6034 vdu_index
=vdu_index
,
6036 deploy_params
=deploy_params
,
6037 descriptor_config
=descriptor_config
,
6038 base_folder
=base_folder
,
6039 task_instantiation_info
=tasks_dict_info
,
6042 vdu_id
= vca_info
["osm_vdu_id"]
6043 vdur
= find_in_list(
6044 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6046 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6047 if vdur
.get("additionalParams"):
6048 deploy_params_vdu
= parse_yaml_strings(
6049 vdur
["additionalParams"]
6052 deploy_params_vdu
= deploy_params
6053 deploy_params_vdu
["OSM"] = get_osm_params(
6054 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6056 if descriptor_config
:
6061 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6062 member_vnf_index
, vdu_id
, vdu_index
6064 stage
[2] = step
= "Scaling out VCA"
6065 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6067 logging_text
=logging_text
6068 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6069 member_vnf_index
, vdu_id
, vdu_index
6073 nslcmop_id
=nslcmop_id
,
6079 member_vnf_index
=member_vnf_index
,
6080 vdu_index
=vdu_index
,
6082 deploy_params
=deploy_params_vdu
,
6083 descriptor_config
=descriptor_config
,
6084 base_folder
=base_folder
,
6085 task_instantiation_info
=tasks_dict_info
,
6088 # SCALE-UP VCA - END
6089 scale_process
= None
6092 # execute primitive service POST-SCALING
6093 step
= "Executing post-scale vnf-config-primitive"
6094 if scaling_descriptor
.get("scaling-config-action"):
6095 for scaling_config_action
in scaling_descriptor
[
6096 "scaling-config-action"
6099 scaling_config_action
.get("trigger") == "post-scale-in"
6100 and scaling_type
== "SCALE_IN"
6102 scaling_config_action
.get("trigger") == "post-scale-out"
6103 and scaling_type
== "SCALE_OUT"
6105 vnf_config_primitive
= scaling_config_action
[
6106 "vnf-config-primitive-name-ref"
6108 step
= db_nslcmop_update
[
6110 ] = "executing post-scale scaling-config-action '{}'".format(
6111 vnf_config_primitive
6114 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6115 if db_vnfr
.get("additionalParamsForVnf"):
6116 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6118 # look for primitive
6119 for config_primitive
in (
6120 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6121 ).get("config-primitive", ()):
6122 if config_primitive
["name"] == vnf_config_primitive
:
6126 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6127 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6128 "config-primitive".format(
6129 scaling_group
, vnf_config_primitive
6132 scale_process
= "VCA"
6133 db_nsr_update
["config-status"] = "configuring post-scaling"
6134 primitive_params
= self
._map
_primitive
_params
(
6135 config_primitive
, {}, vnfr_params
6138 # Post-scale retry check: Check if this sub-operation has been executed before
6139 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6142 vnf_config_primitive
,
6146 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6147 # Skip sub-operation
6148 result
= "COMPLETED"
6149 result_detail
= "Done"
6152 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6153 vnf_config_primitive
, result
, result_detail
6157 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6158 # New sub-operation: Get index of this sub-operation
6160 len(db_nslcmop
.get("_admin", {}).get("operations"))
6165 + "vnf_config_primitive={} New sub-operation".format(
6166 vnf_config_primitive
6170 # retry: Get registered params for this existing sub-operation
6171 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6174 vnf_index
= op
.get("member_vnf_index")
6175 vnf_config_primitive
= op
.get("primitive")
6176 primitive_params
= op
.get("primitive_params")
6179 + "vnf_config_primitive={} Sub-operation retry".format(
6180 vnf_config_primitive
6183 # Execute the primitive, either with new (first-time) or registered (reintent) args
6184 ee_descriptor_id
= config_primitive
.get(
6185 "execution-environment-ref"
6187 primitive_name
= config_primitive
.get(
6188 "execution-environment-primitive", vnf_config_primitive
6190 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6191 nsr_deployed
["VCA"],
6192 member_vnf_index
=vnf_index
,
6194 vdu_count_index
=None,
6195 ee_descriptor_id
=ee_descriptor_id
,
6197 result
, result_detail
= await self
._ns
_execute
_primitive
(
6206 + "vnf_config_primitive={} Done with result {} {}".format(
6207 vnf_config_primitive
, result
, result_detail
6210 # Update operationState = COMPLETED | FAILED
6211 self
._update
_suboperation
_status
(
6212 db_nslcmop
, op_index
, result
, result_detail
6215 if result
== "FAILED":
6216 raise LcmException(result_detail
)
6217 db_nsr_update
["config-status"] = old_config_status
6218 scale_process
= None
6223 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6224 db_nsr_update
["operational-status"] = (
6226 if old_operational_status
== "failed"
6227 else old_operational_status
6229 db_nsr_update
["config-status"] = old_config_status
6232 ROclient
.ROClientException
,
6237 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6239 except asyncio
.CancelledError
:
6241 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6243 exc
= "Operation was cancelled"
6244 except Exception as e
:
6245 exc
= traceback
.format_exc()
6246 self
.logger
.critical(
6247 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6251 self
._write
_ns
_status
(
6254 current_operation
="IDLE",
6255 current_operation_id
=None,
6258 stage
[1] = "Waiting for instantiate pending tasks."
6259 self
.logger
.debug(logging_text
+ stage
[1])
6260 exc
= await self
._wait
_for
_tasks
(
6263 self
.timeout_ns_deploy
,
6271 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6272 nslcmop_operation_state
= "FAILED"
6274 db_nsr_update
["operational-status"] = old_operational_status
6275 db_nsr_update
["config-status"] = old_config_status
6276 db_nsr_update
["detailed-status"] = ""
6278 if "VCA" in scale_process
:
6279 db_nsr_update
["config-status"] = "failed"
6280 if "RO" in scale_process
:
6281 db_nsr_update
["operational-status"] = "failed"
6284 ] = "FAILED scaling nslcmop={} {}: {}".format(
6285 nslcmop_id
, step
, exc
6288 error_description_nslcmop
= None
6289 nslcmop_operation_state
= "COMPLETED"
6290 db_nslcmop_update
["detailed-status"] = "Done"
6292 self
._write
_op
_status
(
6295 error_message
=error_description_nslcmop
,
6296 operation_state
=nslcmop_operation_state
,
6297 other_update
=db_nslcmop_update
,
6300 self
._write
_ns
_status
(
6303 current_operation
="IDLE",
6304 current_operation_id
=None,
6305 other_update
=db_nsr_update
,
6308 if nslcmop_operation_state
:
6312 "nslcmop_id": nslcmop_id
,
6313 "operationState": nslcmop_operation_state
,
6315 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
6316 except Exception as e
:
6318 logging_text
+ "kafka_write notification Exception {}".format(e
)
6320 self
.logger
.debug(logging_text
+ "Exit")
6321 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
async def _scale_kdu(
    self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
):
    """
    Scale the KDU resources described in scaling_info through the K8s connectors

    For every scaling entry found under "kdu-create" (or, when that one is
    empty, under "kdu-delete"):
      - type "delete": the KDU terminate-config-primitives are executed first
      - the resource is scaled with the connector selected by "k8s-cluster-type"
      - type "create": the KDU initial-config-primitives are executed afterwards

    NOTE(review): the arguments of the connector calls that are not named in
    the reviewed excerpt (positional kdu_instance/scale, atomic, db_dict,
    vca_id and the 600 s primitive timeout) were restored from the
    surrounding usage - confirm against the reference implementation.

    :param logging_text: prefix added to every debug message
    :param nsr_id: NS record id, used as filter of the db_dict given to the connector
    :param nsr_deployed: structure handed to get_deployed_kdu to locate the KDU
    :param db_vnfd: VNF descriptor from which the KDU configuration is read
    :param vca_id: VCA id forwarded to the connector calls
    :param scaling_info: dict whose "kdu-create"/"kdu-delete" values map a
        kdu name to a list of scaling entries
    :raises asyncio.TimeoutError: when a primitive or the scale call times out
    """
    # Callers guard on one of the two keys being populated; falling back to an
    # empty mapping turns a missing key into a no-op instead of a TypeError.
    _scaling_info = (
        scaling_info.get("kdu-create") or scaling_info.get("kdu-delete") or {}
    )
    for kdu_name in _scaling_info:
        for kdu_scaling_info in _scaling_info[kdu_name]:
            deployed_kdu, index = get_deployed_kdu(
                nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
            )
            cluster_uuid = deployed_kdu["k8scluster-uuid"]
            kdu_instance = deployed_kdu["kdu-instance"]
            kdu_model = deployed_kdu.get("kdu-model")
            scale = int(kdu_scaling_info["scale"])
            k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]

            db_dict = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": "_admin.deployed.K8s.{}".format(index),
            }

            step = "scaling application {}".format(
                kdu_scaling_info["resource-name"]
            )
            self.logger.debug(logging_text + step)

            # Terminate primitives must run while the resource still exists.
            if kdu_scaling_info["type"] == "delete":
                await self._exec_kdu_config_primitives(
                    logging_text,
                    db_vnfd,
                    kdu_name,
                    "terminate-config-primitive",
                    "execute terminate config primitive",
                    k8s_cluster_type,
                    cluster_uuid,
                    kdu_instance,
                    db_dict,
                    vca_id,
                )

            await asyncio.wait_for(
                self.k8scluster_map[k8s_cluster_type].scale(
                    kdu_instance,
                    scale,
                    kdu_scaling_info["resource-name"],
                    vca_id=vca_id,
                    cluster_uuid=cluster_uuid,
                    kdu_model=kdu_model,
                    atomic=True,
                    db_dict=db_dict,
                ),
                timeout=self.timeout_vca_on_error,
            )

            # Initial primitives run once the scaled resource is in place.
            if kdu_scaling_info["type"] == "create":
                await self._exec_kdu_config_primitives(
                    logging_text,
                    db_vnfd,
                    kdu_name,
                    "initial-config-primitive",
                    "execute initial config primitive",
                    k8s_cluster_type,
                    cluster_uuid,
                    kdu_instance,
                    db_dict,
                    vca_id,
                )


async def _exec_kdu_config_primitives(
    self,
    logging_text,
    db_vnfd,
    kdu_name,
    primitive_key,
    step,
    k8s_cluster_type,
    cluster_uuid,
    kdu_instance,
    db_dict,
    vca_id,
):
    """
    Execute, ordered by "seq", the primitives stored under primitive_key in the KDU configuration

    Nothing is executed when the KDU has no configuration, when the
    configuration has no primitives under primitive_key, or when
    get_juju_ee_ref(db_vnfd, kdu_name) returns something other than None.

    :param primitive_key: "terminate-config-primitive" or "initial-config-primitive"
    :param step: text logged (after logging_text) before each primitive
    """
    kdu_config = get_configuration(db_vnfd, kdu_name)
    if not (
        kdu_config
        and kdu_config.get(primitive_key)
        and get_juju_ee_ref(db_vnfd, kdu_name) is None
    ):
        return

    # sorted() instead of list.sort(): the descriptor list is left untouched
    config_primitive_list = sorted(
        kdu_config.get(primitive_key), key=lambda val: int(val["seq"])
    )
    for config_primitive in config_primitive_list:
        primitive_params_ = self._map_primitive_params(config_primitive, {}, {})
        self.logger.debug(logging_text + step)
        await asyncio.wait_for(
            self.k8scluster_map[k8s_cluster_type].exec_primitive(
                cluster_uuid=cluster_uuid,
                kdu_instance=kdu_instance,
                primitive_name=config_primitive["name"],
                params=primitive_params_,
                db_dict=db_dict,
                vca_id=vca_id,
            ),
            timeout=600,
        )
async def _scale_ng_ro(
    self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
):
    """
    Apply a VDU scaling request by re-running the NG-RO instantiation

    The NSD, every VNFR of the NS and the VNFD of each of them are read from
    the database, db_vnfr is updated with the VDUs to create/delete, and
    _instantiate_ng_ro is awaited with the collected records. When there are
    VDUs to delete, the VNFR is updated a second time with mark_delete=False
    once the instantiation call has returned.

    NOTE(review): the name of the VNFR update method (scale_vnfr), its first
    mark_delete=True argument and the positional arguments given to
    _instantiate_ng_ro are not fully visible in the reviewed excerpt and were
    restored from the surrounding usage - confirm against the reference
    implementation.

    :param logging_text: prefix for log messages, forwarded to _instantiate_ng_ro
    :param db_nsr: NS record; its "nsd-id" selects the NSD to read
    :param db_nslcmop: operation record; its "nsInstanceId" is the NS id
    :param db_vnfr: record of the VNF being scaled, updated by this method
    :param vdu_scaling_info: dict with optional "vdu-create" / "vdu-delete" entries
    :param stage: operation stage list, forwarded to _instantiate_ng_ro
    """
    nsr_id = db_nslcmop["nsInstanceId"]
    db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
    db_vnfrs = {}

    # read from db: vnfd's for every vnf
    db_vnfds = []
    # vnfd_id below is the descriptor "_id" (it is the value the query filters
    # on), so the "already loaded" check has to use that same value. Comparing
    # it with the descriptor "id" field never matched, and the same vnfd was
    # read and appended once per vnfr.
    loaded_vnfd_ids = set()

    # for each vnf in ns, read vnfd
    for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
        db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
        vnfd_id = vnfr["vnfd-id"]  # vnfd uuid for this vnf
        # if we haven't this vnfd, read it from db
        if vnfd_id not in loaded_vnfd_ids:
            vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
            db_vnfds.append(vnfd)
            loaded_vnfd_ids.add(vnfd_id)
    n2vc_key = self.n2vc.get_public_key()
    n2vc_key_list = [n2vc_key]
    self.scale_vnfr(
        db_vnfr,
        vdu_scaling_info.get("vdu-create"),
        vdu_scaling_info.get("vdu-delete"),
        mark_delete=True,
    )
    # db_vnfr has been updated, update db_vnfrs to use it
    db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
    await self._instantiate_ng_ro(
        logging_text,
        nsr_id,
        db_nsd,
        db_nsr,
        db_nslcmop,
        db_vnfrs,
        db_vnfds,
        n2vc_key_list,
        stage=stage,
        start_deploy=time(),
        timeout_ns_deploy=self.timeout_ns_deploy,
    )
    if vdu_scaling_info.get("vdu-delete"):
        self.scale_vnfr(
            db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
        )
async def extract_prometheus_scrape_jobs(
    self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
):
    """
    Build the prometheus scrape jobs from the 'prometheus*.j2' template of an artifact

    The first entry of artifact_path whose name starts with "prometheus" and
    ends with ".j2" is read and handed to parse_job together with the
    JOB_NAME / TARGET_IP / EXPORTER_POD_IP / EXPORTER_POD_PORT variables.
    Every resulting job gets "nsr_id" and "vnfr_id" keys, and a job whose
    "job_name" is not a string containing the dash-less vnfr_id is renamed to
    "<vnfr_id>_<random number between 1 and 10000>".

    NOTE(review): the exporter port value ("80") is not visible in the
    reviewed excerpt and was restored from the surrounding usage - confirm.

    :param ee_id: execution environment id; the text after the first "." is used as service name
    :param artifact_path: folder listed through self.fs to find the template
    :param ee_config_descriptor: descriptor providing the "metric-service" name
    :param vnfr_id: VNF record id; dashes are removed before it is used
    :param nsr_id: NS record id stored in every job
    :param target_ip: value of the TARGET_IP template variable
    :return: the list of jobs, or None when the artifact has no template
    """
    # look if exist a file called 'prometheus*.j2' and stop at the first match
    template_name = None
    for entry in self.fs.dir_ls(artifact_path):
        if entry.startswith("prometheus") and entry.endswith(".j2"):
            template_name = entry
            break
    if not template_name:
        return None

    with self.fs.file_open((artifact_path, template_name), "r") as template:
        job_data = template.read()

    # TODO get_service
    service = ee_id.partition(".")[2]  # remove prefix "namespace."
    host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
    host_port = "80"
    vnfr_id = vnfr_id.replace("-", "")
    variables = {
        "JOB_NAME": vnfr_id,
        "TARGET_IP": target_ip,
        "EXPORTER_POD_IP": host_name,
        "EXPORTER_POD_PORT": host_port,
    }
    job_list = parse_job(job_data, variables)

    # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
    for job in job_list:
        current_name = job.get("job_name")
        name_is_valid = isinstance(current_name, str) and vnfr_id in current_name
        if not name_is_valid:
            job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
        job["nsr_id"] = nsr_id
        job["vnfr_id"] = vnfr_id
    return job_list
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA cloud and the VCA cloud credential configured in a VIM account

    Both values come from the "config" section of the VIM account; a value is
    None when its key ("vca_cloud" / "vca_cloud_credential") is not present.

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential)
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vca_settings = vim_account.get("config", {})
    cloud_name = vca_settings.get("vca_cloud")
    cloud_credential = vca_settings.get("vca_cloud_credential")
    return cloud_name, cloud_credential
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA K8s cloud and the VCA K8s cloud credential configured in a VIM account

    Both values come from the "config" section of the VIM account; a value is
    None when its key ("vca_k8s_cloud" / "vca_k8s_cloud_credential") is not
    present.

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential)
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vca_settings = vim_account.get("config", {})
    cloud_name = vca_settings.get("vca_k8s_cloud")
    cloud_credential = vca_settings.get("vca_k8s_cloud_credential")
    return cloud_name, cloud_credential