1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
35 from osm_lcm
import ROclient
36 from osm_lcm
.data_utils
.nsr
import (
39 get_deployed_vca_list
,
42 from osm_lcm
.data_utils
.vca
import (
51 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
52 from osm_lcm
.lcm_utils
import (
59 check_juju_bundle_existence
,
60 get_charm_artifact_path
,
62 from osm_lcm
.data_utils
.nsd
import (
63 get_ns_configuration_relation_list
,
67 from osm_lcm
.data_utils
.vnfd
import (
73 get_ee_sorted_initial_config_primitive_list
,
74 get_ee_sorted_terminate_config_primitive_list
,
76 get_virtual_link_profiles
,
81 get_number_of_instances
,
83 get_kdu_resource_profile
,
84 find_software_version
,
86 from osm_lcm
.data_utils
.list_utils
import find_in_list
87 from osm_lcm
.data_utils
.vnfr
import get_osm_params
, get_vdur_index
, get_kdur
88 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
89 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
90 from n2vc
.definitions
import RelationEndpoint
91 from n2vc
.k8s_helm_conn
import K8sHelmConnector
92 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
93 from n2vc
.k8s_juju_conn
import K8sJujuConnector
95 from osm_common
.dbbase
import DbException
96 from osm_common
.fsbase
import FsException
98 from osm_lcm
.data_utils
.database
.database
import Database
99 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
101 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
102 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
104 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
105 from osm_lcm
.osm_config
import OsmConfigBuilder
106 from osm_lcm
.prometheus
import parse_job
108 from copy
import copy
, deepcopy
109 from time
import time
110 from uuid
import uuid4
112 from random
import randint
114 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
# Class NsLcm, derived from LcmBase.
# The class-level attributes below are default timeouts (plain numbers built
# from 60/3600 multiples, so presumably seconds -- TODO confirm), the
# SUBOPERATION_STATUS_* sentinel values and the task label used for VCA
# deployment.
# NOTE(review): the values of timeout_vca_on_error and
# timeout_progress_primitive sit on lines that are not visible in this listing.
117 class NsLcm(LcmBase
):
118 timeout_vca_on_error
= (
120 ) # Time for a charm, from the first time at blocked/error status, to be marked as failed
121 timeout_ns_deploy
= 2 * 3600 # default global timeout for deploying an ns
122 timeout_ns_terminate
= 1800 # default global timeout for undeploying an ns
123 timeout_charm_delete
= 10 * 60
124 timeout_primitive
= 30 * 60 # timeout for primitive execution
125 timeout_ns_update
= 30 * 60 # timeout for ns update
126 timeout_progress_primitive
= (
128 ) # timeout for some progress in a primitive execution
# negative sentinel values; their consumers are not visible in this listing
130 SUBOPERATION_STATUS_NOT_FOUND
= -1
131 SUBOPERATION_STATUS_NEW
= -2
132 SUBOPERATION_STATUS_SKIP
= -3
133 task_name_deploy_vca
= "Deploying VCA"
# Constructor. Calls LcmBase.__init__ with the "lcm.ns" logger, keeps handles
# to the database and filesystem, copies sections of config onto the instance
# and creates the VCA, K8s and RO connector objects.
#   msg:       forwarded unchanged to LcmBase.__init__.
#   lcm_tasks: stored as self.lcm_tasks.
#   config:    two-level dict; the keys "timeout", "ro_config" and "VCA" are
#              read on the visible lines.
#   loop:      not referenced on the visible lines; self.loop is read when
#              NgRoClient is built -- presumably assigned on an elided line or
#              by LcmBase (TODO confirm).
# NOTE(review): several constructor-argument lines of the connectors are not
# visible in this listing; only the visible keyword arguments are documented.
135 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
137 Init, Connect to database, filesystem storage, and messaging
138 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
141 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
# db / fs handles are taken from the Database() / Filesystem() wrapper objects
143 self
.db
= Database().instance
.db
144 self
.fs
= Filesystem().instance
.fs
146 self
.lcm_tasks
= lcm_tasks
147 self
.timeout
= config
["timeout"]
148 self
.ro_config
= config
["ro_config"]
149 self
.ng_ro
= config
["ro_config"].get("ng")
# .copy() is shallow: nested values are still shared with config["VCA"]
150 self
.vca_config
= config
["VCA"].copy()
152 # create N2VC connector
153 self
.n2vc
= N2VCJujuConnector(
156 on_update_db
=self
._on
_update
_n
2vc
_db
,
# Helm execution-environment connector; reports through the same db callback as N2VC
161 self
.conn_helm_ee
= LCMHelmConn(
164 vca_config
=self
.vca_config
,
165 on_update_db
=self
._on
_update
_n
2vc
_db
,
# K8s connector configured with the "helmpath" binary
168 self
.k8sclusterhelm2
= K8sHelmConnector(
169 kubectl_command
=self
.vca_config
.get("kubectlpath"),
170 helm_command
=self
.vca_config
.get("helmpath"),
# K8s connector configured with the "helm3path" binary
177 self
.k8sclusterhelm3
= K8sHelm3Connector(
178 kubectl_command
=self
.vca_config
.get("kubectlpath"),
179 helm_command
=self
.vca_config
.get("helm3path"),
# K8s juju connector; the only K8s connector given an on_update_db callback on the visible lines
186 self
.k8sclusterjuju
= K8sJujuConnector(
187 kubectl_command
=self
.vca_config
.get("kubectlpath"),
188 juju_command
=self
.vca_config
.get("jujupath"),
191 on_update_db
=self
._on
_update
_k
8s
_db
,
# lookup table: K8s deployment kind -> connector ("chart" shares the helm v3 connector)
196 self
.k8scluster_map
= {
197 "helm-chart": self
.k8sclusterhelm2
,
198 "helm-chart-v3": self
.k8sclusterhelm3
,
199 "chart": self
.k8sclusterhelm3
,
200 "juju-bundle": self
.k8sclusterjuju
,
201 "juju": self
.k8sclusterjuju
,
# NOTE(review): the entries below apparently belong to a second dict
# (execution-environment type -> connector) whose opening line is not visible here.
205 "lxc_proxy_charm": self
.n2vc
,
206 "native_charm": self
.n2vc
,
207 "k8s_proxy_charm": self
.n2vc
,
208 "helm": self
.conn_helm_ee
,
209 "helm-v3": self
.conn_helm_ee
,
# RO client; the whole ro_config section is expanded into keyword arguments
213 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
# Build a new IP or MAC address string from ip_mac by adding vm_index to its
# trailing field.
#   ip_mac:   address text; non-str values are special-cased by the isinstance
#             check (that branch body is not visible in this listing).
#   vm_index: amount added to the trailing field (default 1).
# A dotted value has the text after the last "." parsed as decimal; otherwise
# the text after the last ":" is parsed as hexadecimal and written back
# zero-padded to its original width.
# Declared without self, so presumably a @staticmethod (decorator line not
# visible -- TODO confirm); scale_vnfr calls it as self.increment_ip_mac.
# NOTE(review): the statements between each rfind() and the matching return
# are elided here, so the exact index handling cannot be confirmed.
216 def increment_ip_mac(ip_mac
, vm_index
=1):
217 if not isinstance(ip_mac
, str):
220 # try with ipv4 look for last dot
221 i
= ip_mac
.rfind(".")
224 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
225 # try with ipv6 or mac look for last colon. Operate in hex
226 i
= ip_mac
.rfind(":")
229 # format in hex, len can be 2 for mac or 4 for ipv6
230 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
231 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
# Store an RO descriptor as the "deploymentStatus" of an nsrs record.
#   nsrs_id:       id of the nsrs record passed to update_db_2.
#   ro_descriptor: value written as-is (no filtering yet, see the TODO below).
# Any Exception is caught and reported with the "Cannot write database RO
# deployment" message.
# NOTE(review): the creation of db_dict and the call that emits the message
# are on lines not visible in this listing.
237 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
239 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
242 # TODO filter RO descriptor fields...
246 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
247 db_dict
["deploymentStatus"] = ro_descriptor
248 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
250 except Exception as e
:
252 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
# Async callback handed to the N2VC and Helm EE connectors as on_update_db
# (see __init__). It refreshes the VCA-related fields of the nsrs record
# selected by filter["_id"]:
#   * "vcaStatus" from self.n2vc.get_status(), post-processed by update_vca_status();
#   * the "configurationStatus" entry of the VCA whose index is the last
#     dotted component of path (READY <-> BROKEN, driven by VCA status "failed");
#   * "nsState" / "errorDescription" when the current nsState is READY or
#     DEGRADED, based on machine agent-status / instance-status and
#     application status in the juju status dict.
# Parameters:
#   table, updated_data: not referenced on the visible lines.
#   filter: dict with "_id"; also used as the q_filter of the nsrs read.
#           The name shadows the builtin filter(); renaming it would change
#           the callback interface.
#   path:   dotted path; a trailing "." is handled first.
#   vca_id: forwarded to the n2vc calls.
# NOTE(review): the initialisation of db_dict, vca_list and is_degraded, the
# status comparisons on s and the try: lines are not visible in this listing.
255 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
257 # remove last dot from path (if exists)
258 if path
.endswith("."):
261 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
262 # .format(table, filter, path, updated_data))
265 nsr_id
= filter.get("_id")
267 # read ns record from database
268 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
269 current_ns_status
= nsr
.get("nsState")
271 # get vca status for NS
272 status_dict
= await self
.n2vc
.get_status(
273 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
278 db_dict
["vcaStatus"] = status_dict
279 await self
.n2vc
.update_vca_status(db_dict
["vcaStatus"], vca_id
=vca_id
)
281 # update configurationStatus for this VCA
# the VCA index is the text after the last "." of path
283 vca_index
= int(path
[path
.rfind(".") + 1 :])
286 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
288 vca_status
= vca_list
[vca_index
].get("status")
290 configuration_status_list
= nsr
.get("configurationStatus")
291 config_status
= configuration_status_list
[vca_index
].get("status")
# NOTE(review): db_dict["configurationStatus"] is indexed below; where that
# key is created is not visible here -- if it is never created, the KeyError
# is absorbed by the "ignore" handler that follows.
293 if config_status
== "BROKEN" and vca_status
!= "failed":
294 db_dict
["configurationStatus"][vca_index
] = "READY"
295 elif config_status
!= "BROKEN" and vca_status
== "failed":
296 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
297 except Exception as e
:
298 # not update configurationStatus
299 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
301 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
302 # if nsState = 'DEGRADED' check if all is OK
304 if current_ns_status
in ("READY", "DEGRADED"):
305 error_description
= ""
307 if status_dict
.get("machines"):
308 for machine_id
in status_dict
.get("machines"):
309 machine
= status_dict
.get("machines").get(machine_id
)
310 # check machine agent-status
311 if machine
.get("agent-status"):
312 s
= machine
.get("agent-status").get("status")
315 error_description
+= (
316 "machine {} agent-status={} ; ".format(
320 # check machine instance status
321 if machine
.get("instance-status"):
322 s
= machine
.get("instance-status").get("status")
325 error_description
+= (
326 "machine {} instance-status={} ; ".format(
331 if status_dict
.get("applications"):
332 for app_id
in status_dict
.get("applications"):
333 app
= status_dict
.get("applications").get(app_id
)
334 # check application status
335 if app
.get("status"):
336 s
= app
.get("status").get("status")
339 error_description
+= (
340 "application {} status={} ; ".format(app_id
, s
)
343 if error_description
:
344 db_dict
["errorDescription"] = error_description
345 if current_ns_status
== "READY" and is_degraded
:
346 db_dict
["nsState"] = "DEGRADED"
347 if current_ns_status
== "DEGRADED" and not is_degraded
:
348 db_dict
["nsState"] = "READY"
# single write of everything collected in db_dict
351 self
.update_db_2("nsrs", nsr_id
, db_dict
)
353 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
355 except Exception as e
:
# NOTE(review): Logger.warn is a deprecated alias of Logger.warning.
356 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
# Async callback handed to K8sJujuConnector as on_update_db (see __init__).
# Queries the status of one KDU instance through the connector selected by
# cluster_type and writes it to the nsrs record as "vcaStatus", keyed by the
# ns id.
#   cluster_uuid, kdu_instance: forwarded to status_kdu().
#   filter:       dict carrying "_id" (the nsr id). The name shadows the
#                 builtin filter(); renaming it would change the interface.
#   vca_id:       not referenced on the visible lines.
#   cluster_type: key into self.k8scluster_map (default "juju").
# NOTE(review): filter defaults to None but is dereferenced with .get(); a
# call that omits it would raise AttributeError unless guarded on a line not
# visible in this listing.
358 async def _on_update_k8s_db(
359 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
362 Updating vca status in NSR record
363 :param cluster_uuid: UUID of a k8s cluster
364 :param kdu_instance: The unique name of the KDU instance
365 :param filter: To get nsr_id
366 :cluster_type: The cluster type (juju, k8s)
370 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
371 # .format(cluster_uuid, kdu_instance, filter))
373 nsr_id
= filter.get("_id")
# an unknown cluster_type raises KeyError on this lookup
375 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
376 cluster_uuid
=cluster_uuid
,
377 kdu_instance
=kdu_instance
,
379 complete_status
=True,
385 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
# only the juju flavours post-process the status through update_vca_status
387 if cluster_type
in ("juju-bundle", "juju"):
388 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
389 # status in a similar way between Juju Bundles and Helm Charts on this side
390 await self
.k8sclusterjuju
.update_vca_status(
391 db_dict
["vcaStatus"],
397 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
401 self
.update_db_2("nsrs", nsr_id
, db_dict
)
402 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
404 except Exception as e
:
# NOTE(review): Logger.warn is a deprecated alias of Logger.warning.
405 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
# Render cloud-init text as a Jinja2 template.
#   cloud_init_text:   template source.
#   additional_params: mapping of template variables; None/empty is treated as {}.
#   vnfd_id, vdu_id:   used only in the error messages.
# The Environment uses StrictUndefined, so a variable missing from
# additional_params surfaces as UndefinedError; that handler and the
# TemplateError / TemplateNotFound handler each build a descriptive message.
# Declared without self, so presumably a @staticmethod (decorator line not
# visible -- TODO confirm).
# NOTE(review): the try: line and the statements that raise with these
# messages are not visible in this listing.
408 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
410 env
= Environment(undefined
=StrictUndefined
)
411 template
= env
.from_string(cloud_init_text
)
412 return template
.render(additional_params
or {})
413 except UndefinedError
as e
:
415 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
416 "file, must be provided in the instantiation parameters inside the "
417 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
419 except (TemplateError
, TemplateNotFound
) as e
:
421 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
# Return the cloud-init text configured for a VDU, or None when the VDU has
# neither "cloud-init-file" nor "cloud-init".
#   vdu:  VDU descriptor dict.
#   vnfd: VNF descriptor dict; vnfd["_admin"]["storage"] supplies "folder"
#         and "pkg-dir", and vnfd["id"] is used in the error message.
# "cloud-init-file" takes precedence: the file is read through self.fs from
# "<folder>/<pkg-dir>/cloud_init/<file>" when pkg-dir is set; the second path
# form "<folder>/Scripts/cloud_init/<file>" is apparently the alternative
# branch (its else: line is not visible in this listing).
# FsException is caught and turned into the message at the end (the raising
# statement is not visible here).
426 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
427 cloud_init_content
= cloud_init_file
= None
429 if vdu
.get("cloud-init-file"):
430 base_folder
= vnfd
["_admin"]["storage"]
431 if base_folder
["pkg-dir"]:
432 cloud_init_file
= "{}/{}/cloud_init/{}".format(
433 base_folder
["folder"],
434 base_folder
["pkg-dir"],
435 vdu
["cloud-init-file"],
438 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
439 base_folder
["folder"],
440 vdu
["cloud-init-file"],
442 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
443 cloud_init_content
= ci_file
.read()
# inline cloud-init text is used only when no file is referenced
444 elif vdu
.get("cloud-init"):
445 cloud_init_content
= vdu
["cloud-init"]
447 return cloud_init_content
448 except FsException
as e
:
450 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
451 vnfd
["id"], vdu
["id"], cloud_init_file
, e
# Return the "additionalParams" of the vdur entry in db_vnfr whose
# "vdu-id-ref" equals vdu_id, passed through parse_yaml_strings().
#   db_vnfr: vnfr record; its "vdur" list is scanned.
#   vdu_id:  VDU identifier to match.
# NOTE(review): the statement that wraps the generator below (and any default
# used when nothing matches) is not visible in this listing.
455 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
457 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]),
460 additional_params
= vdur
.get("additionalParams")
461 return parse_yaml_strings(additional_params
)
# Build the RO flavour of a VNF descriptor: a deep copy of vnfd without the
# keys RO does not use ("_id", "_admin", "monitoring-param",
# "scaling-group-descriptor", "kdu", "k8s-cluster"), with "id" set from
# new_id, and with "cloud-init-file" / "cloud-init" removed from every vdu.
# The input vnfd is not modified.
# additionalParams and nsrId are not referenced on the visible lines.
# NOTE(review): the condition guarding the "id" override and the return
# statement are not visible in this listing. The "parse cloud-init" comment
# below does not match the visible code, which only removes those keys.
463 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
465 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
466 :param vnfd: input vnfd
467 :param new_id: overrides vnf id if provided
468 :param additionalParams: Instantiation params for VNFs provided
469 :param nsrId: Id of the NSR
470 :return: copy of vnfd
472 vnfd_RO
= deepcopy(vnfd
)
473 # remove unused by RO configuration, monitoring, scaling and internal keys
474 vnfd_RO
.pop("_id", None)
475 vnfd_RO
.pop("_admin", None)
476 vnfd_RO
.pop("monitoring-param", None)
477 vnfd_RO
.pop("scaling-group-descriptor", None)
478 vnfd_RO
.pop("kdu", None)
479 vnfd_RO
.pop("k8s-cluster", None)
481 vnfd_RO
["id"] = new_id
483 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
484 for vdu
in get_iterable(vnfd_RO
, "vdu"):
485 vdu
.pop("cloud-init-file", None)
486 vdu
.pop("cloud-init", None)
# Translate an ip-profile dict into the key names used on the RO side. Works
# on a deep copy, so ip_profile itself is left untouched:
#   "dns-server"  -> "dns-address" (for a list, the "address" of each entry;
#                    the second assignment copies the value as-is and is
#                    apparently the non-list branch -- its else: line is not
#                    visible in this listing)
#   "ip-version"  -> "ipv4" becomes "IPv4", "ipv6" becomes "IPv6"
#   "dhcp-params" -> "dhcp"
# Declared without self, so presumably a @staticmethod (decorator line not
# visible -- TODO confirm). The return statement is not visible here either.
490 def ip_profile_2_RO(ip_profile
):
491 RO_ip_profile
= deepcopy(ip_profile
)
492 if "dns-server" in RO_ip_profile
:
493 if isinstance(RO_ip_profile
["dns-server"], list):
494 RO_ip_profile
["dns-address"] = []
495 for ds
in RO_ip_profile
.pop("dns-server"):
496 RO_ip_profile
["dns-address"].append(ds
["address"])
498 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
499 if RO_ip_profile
.get("ip-version") == "ipv4":
500 RO_ip_profile
["ip-version"] = "IPv4"
501 if RO_ip_profile
.get("ip-version") == "ipv6":
502 RO_ip_profile
["ip-version"] = "IPv6"
503 if "dhcp-params" in RO_ip_profile
:
504 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
# Resolve the RO identifier stored for a VIM account.
#   vim_account: "_id" of the record in the "vim_accounts" collection.
# Reads the record, checks _admin.operationalState against "ENABLED" (the
# message below is built when it differs) and takes the id from
# _admin.deployed.RO.
# NOTE(review): the statement that raises with the message and the return
# statement are not visible in this listing.
507 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
508 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
509 if db_vim
["_admin"]["operationalState"] != "ENABLED":
511 "VIM={} is not available. operationalState={}".format(
512 vim_account
, db_vim
["_admin"]["operationalState"]
515 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
# Resolve the RO identifier stored for a WIM account.
#   wim_account: when it is a str, the "_id" of the record in "wim_accounts".
# For a str argument the record is read, _admin.operationalState is checked
# against "ENABLED" (the message below is built when it differs) and the id
# is taken from _admin.deployed["RO-account"].
# NOTE(review): the handling of non-str values, the statement that raises
# with the message and the return statements are not visible in this listing.
518 def get_ro_wim_id_for_wim_account(self
, wim_account
):
519 if isinstance(wim_account
, str):
520 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
521 if db_wim
["_admin"]["operationalState"] != "ENABLED":
523 "WIM={} is not available. operationalState={}".format(
524 wim_account
, db_wim
["_admin"]["operationalState"]
527 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
# Add or remove vdur entries of a vnfr record in the "vnfrs" collection and
# refresh db_vnfr["vdur"] from the database afterwards.
#   db_vnfr:     vnfr record; its "vdur" (and possibly "vdur-template")
#                entries are modified in place.
#   vdu_create:  mapping vdu-id -> number of instances to add.
#   vdu_delete:  mapping vdu-id -> number of instances to remove.
#   mark_delete: not referenced on the visible lines; presumably selects
#                between flagging entries "DELETING" and pulling them from
#                the database (TODO confirm).
# Scale out: the base entry is the last vdur with a matching "vdu-id-ref"
# (search over reversed(vdur)) or, when none exists, the first element of
# "vdur-template", which is then pulled from the database. Each clone gets
# status "BUILD", a fresh uuid "_id", a shifted "count-index" and a derived
# "id"; fixed IP / MAC addresses are shifted with increment_ip_mac and
# non-fixed ones are removed.
# Scale in: when only one vdur is left it is kept as "vdur-template"; the
# affected entries are either flagged "DELETING" or pulled one by one.
# NOTE(review): many control-flow lines (if / else / raise) of this method
# are not visible in this listing.
532 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
534 db_vdu_push_list
= []
536 db_update
= {"_admin.modified": time()}
538 for vdu_id
, vdu_count
in vdu_create
.items():
542 for vdur
in reversed(db_vnfr
["vdur"])
543 if vdur
["vdu-id-ref"] == vdu_id
548 # Read the template saved in the db:
# NOTE(review): the f prefix on this literal is unnecessary (no placeholders).
549 self
.logger
.debug(f
"No vdur in the database. Using the vdur-template to scale")
550 vdur_template
= db_vnfr
.get("vdur-template")
551 if not vdur_template
:
553 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
557 vdur
= vdur_template
[0]
558 # Delete a template from the database after using it
559 self
.db
.set_one("vnfrs",
560 {"_id": db_vnfr
["_id"]},
562 pull
={"vdur-template": {"_id": vdur
['_id']}}
564 for count
in range(vdu_count
):
565 vdur_copy
= deepcopy(vdur
)
566 vdur_copy
["status"] = "BUILD"
567 vdur_copy
["status-detailed"] = None
568 vdur_copy
["ip-address"] = None
569 vdur_copy
["_id"] = str(uuid4())
570 vdur_copy
["count-index"] += count
+ 1
571 vdur_copy
["id"] = "{}-{}".format(
572 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
574 vdur_copy
.pop("vim_info", None)
575 for iface
in vdur_copy
["interfaces"]:
576 if iface
.get("fixed-ip"):
577 iface
["ip-address"] = self
.increment_ip_mac(
578 iface
["ip-address"], count
+ 1
581 iface
.pop("ip-address", None)
582 if iface
.get("fixed-mac"):
583 iface
["mac-address"] = self
.increment_ip_mac(
584 iface
["mac-address"], count
+ 1
587 iface
.pop("mac-address", None)
591 ) # only first vdu can be management of vnf
592 db_vdu_push_list
.append(vdur_copy
)
593 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
595 if len(db_vnfr
["vdur"]) == 1:
596 # The scale will move to 0 instances
# NOTE(review): the f prefix on this literal is unnecessary (no placeholders).
597 self
.logger
.debug(f
"Scaling to 0 !, creating the template with the last vdur")
598 template_vdur
= [db_vnfr
["vdur"][0]]
599 for vdu_id
, vdu_count
in vdu_delete
.items():
601 indexes_to_delete
= [
603 for iv
in enumerate(db_vnfr
["vdur"])
604 if iv
[1]["vdu-id-ref"] == vdu_id
# only the last vdu_count matching positions are flagged
608 "vdur.{}.status".format(i
): "DELETING"
609 for i
in indexes_to_delete
[-vdu_count
:]
613 # it must be deleted one by one because common.db does not allow otherwise
616 for v
in reversed(db_vnfr
["vdur"])
617 if v
["vdu-id-ref"] == vdu_id
619 for vdu
in vdus_to_delete
[:vdu_count
]:
622 {"_id": db_vnfr
["_id"]},
624 pull
={"vdur": {"_id": vdu
["_id"]}},
628 db_push
["vdur"] = db_vdu_push_list
630 db_push
["vdur-template"] = template_vdur
633 db_vnfr
["vdur-template"] = template_vdur
634 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
635 # modify passed dictionary db_vnfr
636 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
637 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
# Copy network information reported by RO into the vld entries of an nsr.
#   ns_update_nsr: dict filled with "vld.<index>" -> updated vld, for a later
#                  database update.
#   db_nsr:        nsr record; its "vld" entries are modified in place.
#   nsr_desc_RO:   RO descriptor; its "nets" entries are matched to a vld by
#                  comparing "ns_net_osm_id" with the vld "id".
# For a match, "vim-id", "name", "status" and "status-detailed" are taken from
# the net's "vim_net_id", "vim_name", "status" and "error_msg".
# NOTE(review): the loop-control statements and the statement that raises
# with the "Not found vld" message are not visible in this listing.
639 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
641 Updates database nsr with the RO info for the created vld
642 :param ns_update_nsr: dictionary to be filled with the updated info
643 :param db_nsr: content of db_nsr. This is also modified
644 :param nsr_desc_RO: nsr descriptor from RO
645 :return: Nothing, LcmException is raised on errors
648 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
649 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
650 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
652 vld
["vim-id"] = net_RO
.get("vim_net_id")
653 vld
["name"] = net_RO
.get("vim_name")
654 vld
["status"] = net_RO
.get("status")
655 vld
["status-detailed"] = net_RO
.get("error_msg")
656 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
660 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
# Flag every vnfr of db_vnfrs as failed, both in memory and in the database.
#   db_vnfrs:   mapping whose values are vnfr records (the keys are not used).
#   error_text: converted with str() and stored as "status-detailed" of vdurs.
# Each vnfr gets {"status": "ERROR"}; a vdur that has no "status" key yet is
# set to "ERROR" as well. The collected changes are written with update_db_2.
# A DbException during the update is logged and not propagated.
# NOTE(review): the condition guarding the "status-detailed" assignment and
# the try: line are not visible in this listing.
663 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
665 for db_vnfr
in db_vnfrs
.values():
666 vnfr_update
= {"status": "ERROR"}
667 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
668 if "status" not in vdur
:
669 vdur
["status"] = "ERROR"
670 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
672 vdur
["status-detailed"] = str(error_text
)
674 "vdur.{}.status-detailed".format(vdu_index
)
676 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
677 except DbException
as e
:
678 self
.logger
.error("Cannot update vnf. {}".format(e
))
# Copy the information reported by RO (addresses, VIM ids, names, status)
# into the vnfr records and persist it with update_db_2.
#   db_vnfrs:    mapping member-vnf-index -> vnfr record; records are modified
#                in place.
#   nsr_desc_RO: RO descriptor; "vnfs" entries are matched by
#                "member_vnf_index", their "vms" by "vdu_osm_id" plus a running
#                count index, VM "interfaces" by "internal_name", and "nets"
#                by "vnf_net_osm_id".
# LcmExceptionNoMgmtIP is raised when RO gives no ip_address for a vnf, the
# vnfr has none stored and the vnfr has vdur entries.
# A vdur that has "pdu-type" gets separate handling on a line not visible here.
# NOTE(review): the loop-control statements, the initialisation of vnfr_update
# and the statements raising with the "Not found ..." messages are not
# visible in this listing.
680 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
682 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
683 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
684 :param nsr_desc_RO: nsr descriptor from RO
685 :return: Nothing, LcmException is raised on errors
687 for vnf_index
, db_vnfr
in db_vnfrs
.items():
688 for vnf_RO
in nsr_desc_RO
["vnfs"]:
689 if vnf_RO
["member_vnf_index"] != vnf_index
:
692 if vnf_RO
.get("ip_address"):
693 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
696 elif not db_vnfr
.get("ip-address"):
697 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
698 raise LcmExceptionNoMgmtIP(
699 "ns member_vnf_index '{}' has no IP address".format(
704 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
# running counter used to match the vdur "count-index" against the RO VMs of the same vdu
705 vdur_RO_count_index
= 0
706 if vdur
.get("pdu-type"):
708 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
709 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
711 if vdur
["count-index"] != vdur_RO_count_index
:
712 vdur_RO_count_index
+= 1
714 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
715 if vdur_RO
.get("ip_address"):
# only the first entry of the ";"-separated ip_address value is kept
716 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
718 vdur
["ip-address"] = None
719 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
720 vdur
["name"] = vdur_RO
.get("vim_name")
721 vdur
["status"] = vdur_RO
.get("status")
722 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
723 for ifacer
in get_iterable(vdur
, "interfaces"):
724 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
725 if ifacer
["name"] == interface_RO
.get("internal_name"):
726 ifacer
["ip-address"] = interface_RO
.get(
729 ifacer
["mac-address"] = interface_RO
.get(
735 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
736 "from VIM info".format(
737 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
740 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
744 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
746 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
750 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
751 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
752 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
754 vld
["vim-id"] = net_RO
.get("vim_net_id")
755 vld
["name"] = net_RO
.get("vim_name")
756 vld
["status"] = net_RO
.get("status")
757 vld
["status-detailed"] = net_RO
.get("error_msg")
758 vnfr_update
["vld.{}".format(vld_index
)] = vld
762 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
767 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
772 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
# Build the mapping between vnf / vdu elements and their N2VC application.
#   nsr_id: "_id" of the nsrs record; its _admin.deployed.VCA list is read.
# Returns {"osm-config-mapping": mapping}. A VCA entry with a
# "member-vnf-index" but no "vdu_id" adds member-vnf-index -> application;
# an entry with a "vdu_id" adds a key built from member-vnf-index, vdu_id and
# vdu_count_index -> application.
# NOTE(review): the creation of mapping, the handling of entries without a
# "member-vnf-index" and the exact format of the vdu key are on lines not
# visible in this listing. A VCA entry lacking one of these keys raises
# KeyError, since they are read with [] rather than .get().
777 def _get_ns_config_info(self
, nsr_id
):
779 Generates a mapping between vnf,vdu elements and the N2VC id
780 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
781 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
782 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
783 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
785 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
786 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
788 ns_config_info
= {"osm-config-mapping": mapping
}
789 for vca
in vca_deployed_list
:
790 if not vca
["member-vnf-index"]:
792 if not vca
["vdu_id"]:
793 mapping
[vca
["member-vnf-index"]] = vca
["application"]
797 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
799 ] = vca
["application"]
800 return ns_config_info
802 async def _instantiate_ng_ro(
819 def get_vim_account(vim_account_id
):
821 if vim_account_id
in db_vims
:
822 return db_vims
[vim_account_id
]
823 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
824 db_vims
[vim_account_id
] = db_vim
827 # modify target_vld info with instantiation parameters
828 def parse_vld_instantiation_params(
829 target_vim
, target_vld
, vld_params
, target_sdn
831 if vld_params
.get("ip-profile"):
832 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
835 if vld_params
.get("provider-network"):
836 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
839 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
840 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
843 if vld_params
.get("wimAccountId"):
844 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
845 target_vld
["vim_info"][target_wim
] = {}
846 for param
in ("vim-network-name", "vim-network-id"):
847 if vld_params
.get(param
):
848 if isinstance(vld_params
[param
], dict):
849 for vim
, vim_net
in vld_params
[param
].items():
850 other_target_vim
= "vim:" + vim
852 target_vld
["vim_info"],
853 (other_target_vim
, param
.replace("-", "_")),
856 else: # isinstance str
857 target_vld
["vim_info"][target_vim
][
858 param
.replace("-", "_")
859 ] = vld_params
[param
]
860 if vld_params
.get("common_id"):
861 target_vld
["common_id"] = vld_params
.get("common_id")
863 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
864 def update_ns_vld_target(target
, ns_params
):
865 for vnf_params
in ns_params
.get("vnf", ()):
866 if vnf_params
.get("vimAccountId"):
870 for vnfr
in db_vnfrs
.values()
871 if vnf_params
["member-vnf-index"]
872 == vnfr
["member-vnf-index-ref"]
876 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
877 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
878 target_vld
= find_in_list(
879 get_iterable(vdur
, "interfaces"),
880 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
883 if vnf_params
.get("vimAccountId") not in a_vld
.get(
886 target
["ns"]["vld"][a_index
].get("vim_info").update(
888 "vim:{}".format(vnf_params
["vimAccountId"]): {
889 "vim_network_name": ""
894 nslcmop_id
= db_nslcmop
["_id"]
896 "name": db_nsr
["name"],
899 "image": deepcopy(db_nsr
["image"]),
900 "flavor": deepcopy(db_nsr
["flavor"]),
901 "action_id": nslcmop_id
,
902 "cloud_init_content": {},
904 for image
in target
["image"]:
905 image
["vim_info"] = {}
906 for flavor
in target
["flavor"]:
907 flavor
["vim_info"] = {}
908 if db_nsr
.get("affinity-or-anti-affinity-group"):
909 target
["affinity-or-anti-affinity-group"] = deepcopy(
910 db_nsr
["affinity-or-anti-affinity-group"]
912 for affinity_or_anti_affinity_group
in target
[
913 "affinity-or-anti-affinity-group"
915 affinity_or_anti_affinity_group
["vim_info"] = {}
917 if db_nslcmop
.get("lcmOperationType") != "instantiate":
918 # get parameters of instantiation:
919 db_nslcmop_instantiate
= self
.db
.get_list(
922 "nsInstanceId": db_nslcmop
["nsInstanceId"],
923 "lcmOperationType": "instantiate",
926 ns_params
= db_nslcmop_instantiate
.get("operationParams")
928 ns_params
= db_nslcmop
.get("operationParams")
929 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
930 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
933 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
934 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
938 "mgmt-network": vld
.get("mgmt-network", False),
939 "type": vld
.get("type"),
942 "vim_network_name": vld
.get("vim-network-name"),
943 "vim_account_id": ns_params
["vimAccountId"],
947 # check if this network needs SDN assist
948 if vld
.get("pci-interfaces"):
949 db_vim
= get_vim_account(ns_params
["vimAccountId"])
950 sdnc_id
= db_vim
["config"].get("sdn-controller")
952 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
953 target_sdn
= "sdn:{}".format(sdnc_id
)
954 target_vld
["vim_info"][target_sdn
] = {
956 "target_vim": target_vim
,
958 "type": vld
.get("type"),
961 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
962 for nsd_vnf_profile
in nsd_vnf_profiles
:
963 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
964 if cp
["virtual-link-profile-id"] == vld
["id"]:
966 "member_vnf:{}.{}".format(
967 cp
["constituent-cpd-id"][0][
968 "constituent-base-element-id"
970 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
972 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
974 # check at nsd descriptor, if there is an ip-profile
976 nsd_vlp
= find_in_list(
977 get_virtual_link_profiles(nsd
),
978 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
983 and nsd_vlp
.get("virtual-link-protocol-data")
984 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
986 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
989 ip_profile_dest_data
= {}
990 if "ip-version" in ip_profile_source_data
:
991 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
994 if "cidr" in ip_profile_source_data
:
995 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
998 if "gateway-ip" in ip_profile_source_data
:
999 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1002 if "dhcp-enabled" in ip_profile_source_data
:
1003 ip_profile_dest_data
["dhcp-params"] = {
1004 "enabled": ip_profile_source_data
["dhcp-enabled"]
1006 vld_params
["ip-profile"] = ip_profile_dest_data
1008 # update vld_params with instantiation params
1009 vld_instantiation_params
= find_in_list(
1010 get_iterable(ns_params
, "vld"),
1011 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1013 if vld_instantiation_params
:
1014 vld_params
.update(vld_instantiation_params
)
1015 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1016 target
["ns"]["vld"].append(target_vld
)
1017 # Update the target ns_vld if vnf vim_account is overridden by instantiation params
1018 update_ns_vld_target(target
, ns_params
)
1020 for vnfr
in db_vnfrs
.values():
1021 vnfd
= find_in_list(
1022 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1024 vnf_params
= find_in_list(
1025 get_iterable(ns_params
, "vnf"),
1026 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1028 target_vnf
= deepcopy(vnfr
)
1029 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1030 for vld
in target_vnf
.get("vld", ()):
1031 # check if connected to a ns.vld, to fill target'
1032 vnf_cp
= find_in_list(
1033 vnfd
.get("int-virtual-link-desc", ()),
1034 lambda cpd
: cpd
.get("id") == vld
["id"],
1037 ns_cp
= "member_vnf:{}.{}".format(
1038 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1040 if cp2target
.get(ns_cp
):
1041 vld
["target"] = cp2target
[ns_cp
]
1044 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1046 # check if this network needs SDN assist
1048 if vld
.get("pci-interfaces"):
1049 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1050 sdnc_id
= db_vim
["config"].get("sdn-controller")
1052 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1053 target_sdn
= "sdn:{}".format(sdnc_id
)
1054 vld
["vim_info"][target_sdn
] = {
1056 "target_vim": target_vim
,
1058 "type": vld
.get("type"),
1061 # check at vnfd descriptor, if there is an ip-profile
1063 vnfd_vlp
= find_in_list(
1064 get_virtual_link_profiles(vnfd
),
1065 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1069 and vnfd_vlp
.get("virtual-link-protocol-data")
1070 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1072 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1075 ip_profile_dest_data
= {}
1076 if "ip-version" in ip_profile_source_data
:
1077 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1080 if "cidr" in ip_profile_source_data
:
1081 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1084 if "gateway-ip" in ip_profile_source_data
:
1085 ip_profile_dest_data
[
1087 ] = ip_profile_source_data
["gateway-ip"]
1088 if "dhcp-enabled" in ip_profile_source_data
:
1089 ip_profile_dest_data
["dhcp-params"] = {
1090 "enabled": ip_profile_source_data
["dhcp-enabled"]
1093 vld_params
["ip-profile"] = ip_profile_dest_data
1094 # update vld_params with instantiation params
1096 vld_instantiation_params
= find_in_list(
1097 get_iterable(vnf_params
, "internal-vld"),
1098 lambda i_vld
: i_vld
["name"] == vld
["id"],
1100 if vld_instantiation_params
:
1101 vld_params
.update(vld_instantiation_params
)
1102 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1105 for vdur
in target_vnf
.get("vdur", ()):
1106 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1107 continue # This vdu must not be created
1108 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1110 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1113 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1114 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1117 and vdu_configuration
.get("config-access")
1118 and vdu_configuration
.get("config-access").get("ssh-access")
1120 vdur
["ssh-keys"] = ssh_keys_all
1121 vdur
["ssh-access-required"] = vdu_configuration
[
1123 ]["ssh-access"]["required"]
1126 and vnf_configuration
.get("config-access")
1127 and vnf_configuration
.get("config-access").get("ssh-access")
1128 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1130 vdur
["ssh-keys"] = ssh_keys_all
1131 vdur
["ssh-access-required"] = vnf_configuration
[
1133 ]["ssh-access"]["required"]
1134 elif ssh_keys_instantiation
and find_in_list(
1135 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1137 vdur
["ssh-keys"] = ssh_keys_instantiation
1139 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1141 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1143 if vdud
.get("cloud-init-file"):
1144 vdur
["cloud-init"] = "{}:file:{}".format(
1145 vnfd
["_id"], vdud
.get("cloud-init-file")
1147 # read file and put content at target.cloud_init_content. This avoids ng_ro having to use the shared package system
1148 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1149 base_folder
= vnfd
["_admin"]["storage"]
1150 if base_folder
["pkg-dir"]:
1151 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1152 base_folder
["folder"],
1153 base_folder
["pkg-dir"],
1154 vdud
.get("cloud-init-file"),
1157 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1158 base_folder
["folder"],
1159 vdud
.get("cloud-init-file"),
1161 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1162 target
["cloud_init_content"][
1165 elif vdud
.get("cloud-init"):
1166 vdur
["cloud-init"] = "{}:vdu:{}".format(
1167 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1169 # put content at target.cloud_init_content. This avoids ng_ro having to read the vnfd descriptor
1170 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1173 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1174 deploy_params_vdu
= self
._format
_additional
_params
(
1175 vdur
.get("additionalParams") or {}
1177 deploy_params_vdu
["OSM"] = get_osm_params(
1178 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1180 vdur
["additionalParams"] = deploy_params_vdu
1183 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1184 if target_vim
not in ns_flavor
["vim_info"]:
1185 ns_flavor
["vim_info"][target_vim
] = {}
1188 # in case alternative images are provided we must check if they should be applied
1189 # for the vim_type, modify the vim_type taking into account
1190 ns_image_id
= int(vdur
["ns-image-id"])
1191 if vdur
.get("alt-image-ids"):
1192 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1193 vim_type
= db_vim
["vim_type"]
1194 for alt_image_id
in vdur
.get("alt-image-ids"):
1195 ns_alt_image
= target
["image"][int(alt_image_id
)]
1196 if vim_type
== ns_alt_image
.get("vim-type"):
1197 # must use alternative image
1199 "use alternative image id: {}".format(alt_image_id
)
1201 ns_image_id
= alt_image_id
1202 vdur
["ns-image-id"] = ns_image_id
1204 ns_image
= target
["image"][int(ns_image_id
)]
1205 if target_vim
not in ns_image
["vim_info"]:
1206 ns_image
["vim_info"][target_vim
] = {}
1209 if vdur
.get("affinity-or-anti-affinity-group-id"):
1210 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1211 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1212 if target_vim
not in ns_ags
["vim_info"]:
1213 ns_ags
["vim_info"][target_vim
] = {}
1215 vdur
["vim_info"] = {target_vim
: {}}
1216 # instantiation parameters
1218 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1219 # vdud["id"]), None)
1220 vdur_list
.append(vdur
)
1221 target_vnf
["vdur"] = vdur_list
1222 target
["vnf"].append(target_vnf
)
1224 desc
= await self
.RO
.deploy(nsr_id
, target
)
1225 self
.logger
.debug("RO return > {}".format(desc
))
1226 action_id
= desc
["action_id"]
1227 await self
._wait
_ng
_ro
(
1228 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
1233 "_admin.deployed.RO.operational-status": "running",
1234 "detailed-status": " ".join(stage
),
1236 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1237 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1238 self
._write
_op
_status
(nslcmop_id
, stage
)
1240 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1244 async def _wait_ng_ro(
1253 detailed_status_old
= None
1255 start_time
= start_time
or time()
1256 while time() <= start_time
+ timeout
:
1257 desc_status
= await self
.RO
.status(nsr_id
, action_id
)
1258 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1259 if desc_status
["status"] == "FAILED":
1260 raise NgRoException(desc_status
["details"])
1261 elif desc_status
["status"] == "BUILD":
1263 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1264 elif desc_status
["status"] == "DONE":
1266 stage
[2] = "Deployed at VIM"
1269 assert False, "ROclient.check_ns_status returns unknown {}".format(
1270 desc_status
["status"]
1272 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1273 detailed_status_old
= stage
[2]
1274 db_nsr_update
["detailed-status"] = " ".join(stage
)
1275 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1276 self
._write
_op
_status
(nslcmop_id
, stage
)
1277 await asyncio
.sleep(15, loop
=self
.loop
)
1278 else: # timeout_ns_deploy
1279 raise NgRoException("Timeout waiting ns to deploy")
1281 async def _terminate_ng_ro(
1282 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1287 start_deploy
= time()
1294 "action_id": nslcmop_id
,
1296 desc
= await self
.RO
.deploy(nsr_id
, target
)
1297 action_id
= desc
["action_id"]
1298 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1299 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1302 + "ns terminate action at RO. action_id={}".format(action_id
)
1306 delete_timeout
= 20 * 60 # 20 minutes
1307 await self
._wait
_ng
_ro
(
1308 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
1311 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1312 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1314 await self
.RO
.delete(nsr_id
)
1315 except Exception as e
:
1316 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1317 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1318 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1319 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1321 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1323 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1324 failed_detail
.append("delete conflict: {}".format(e
))
1327 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1330 failed_detail
.append("delete error: {}".format(e
))
1333 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1337 stage
[2] = "Error deleting from VIM"
1339 stage
[2] = "Deleted from VIM"
1340 db_nsr_update
["detailed-status"] = " ".join(stage
)
1341 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1342 self
._write
_op
_status
(nslcmop_id
, stage
)
1345 raise LcmException("; ".join(failed_detail
))
1348 async def instantiate_RO(
1362 :param logging_text: prefix text to use for logging
1363 :param nsr_id: nsr identity
1364 :param nsd: database content of ns descriptor
1365 :param db_nsr: database content of ns record
1366 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1368 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1369 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1370 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1371 :return: None or exception
1374 start_deploy
= time()
1375 ns_params
= db_nslcmop
.get("operationParams")
1376 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1377 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1379 timeout_ns_deploy
= self
.timeout
.get(
1380 "ns_deploy", self
.timeout_ns_deploy
1383 # Check for and optionally request placement optimization. Database will be updated if placement activated
1384 stage
[2] = "Waiting for Placement."
1385 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1386 # in case of placement, change ns_params["vimAccountId"] if it is not present at any vnfr
1387 for vnfr
in db_vnfrs
.values():
1388 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1391 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1393 return await self
._instantiate
_ng
_ro
(
1406 except Exception as e
:
1407 stage
[2] = "ERROR deploying at VIM"
1408 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1410 "Error deploying at VIM {}".format(e
),
1411 exc_info
=not isinstance(
1414 ROclient
.ROClientException
,
1423 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1425 Wait for kdu to be up, get ip address
1426 :param logging_text: prefix use for logging
1430 :return: IP address, K8s services
1433 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1436 while nb_tries
< 360:
1437 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1441 for x
in get_iterable(db_vnfr
, "kdur")
1442 if x
.get("kdu-name") == kdu_name
1448 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1450 if kdur
.get("status"):
1451 if kdur
["status"] in ("READY", "ENABLED"):
1452 return kdur
.get("ip-address"), kdur
.get("services")
1455 "target KDU={} is in error state".format(kdu_name
)
1458 await asyncio
.sleep(10, loop
=self
.loop
)
1460 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1462 async def wait_vm_up_insert_key_ro(
1463 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1466 Wait for ip address at RO, and optionally, insert public key in virtual machine
1467 :param logging_text: prefix use for logging
1472 :param pub_key: public ssh key to inject, None to skip
1473 :param user: user to apply the public ssh key
1477 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1481 target_vdu_id
= None
1487 if ro_retries
>= 360: # 1 hour
1489 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1492 await asyncio
.sleep(10, loop
=self
.loop
)
1495 if not target_vdu_id
:
1496 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1498 if not vdu_id
: # for the VNF case
1499 if db_vnfr
.get("status") == "ERROR":
1501 "Cannot inject ssh-key because target VNF is in error state"
1503 ip_address
= db_vnfr
.get("ip-address")
1509 for x
in get_iterable(db_vnfr
, "vdur")
1510 if x
.get("ip-address") == ip_address
1518 for x
in get_iterable(db_vnfr
, "vdur")
1519 if x
.get("vdu-id-ref") == vdu_id
1520 and x
.get("count-index") == vdu_index
1526 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1527 ): # If only one, this should be the target vdu
1528 vdur
= db_vnfr
["vdur"][0]
1531 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1532 vnfr_id
, vdu_id
, vdu_index
1535 # New generation RO stores information at "vim_info"
1538 if vdur
.get("vim_info"):
1540 t
for t
in vdur
["vim_info"]
1541 ) # there should be only one key
1542 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1544 vdur
.get("pdu-type")
1545 or vdur
.get("status") == "ACTIVE"
1546 or ng_ro_status
== "ACTIVE"
1548 ip_address
= vdur
.get("ip-address")
1551 target_vdu_id
= vdur
["vdu-id-ref"]
1552 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1554 "Cannot inject ssh-key because target VM is in error state"
1557 if not target_vdu_id
:
1560 # inject public key into machine
1561 if pub_key
and user
:
1562 self
.logger
.debug(logging_text
+ "Inserting RO key")
1563 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1564 if vdur
.get("pdu-type"):
1565 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1568 ro_vm_id
= "{}-{}".format(
1569 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1570 ) # TODO add vdu_index
1574 "action": "inject_ssh_key",
1578 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1580 desc
= await self
.RO
.deploy(nsr_id
, target
)
1581 action_id
= desc
["action_id"]
1582 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600)
1585 # wait until NS is deployed at RO
1587 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1588 ro_nsr_id
= deep_get(
1589 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1593 result_dict
= await self
.RO
.create_action(
1595 item_id_name
=ro_nsr_id
,
1597 "add_public_key": pub_key
,
1602 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1603 if not result_dict
or not isinstance(result_dict
, dict):
1605 "Unknown response from RO when injecting key"
1607 for result
in result_dict
.values():
1608 if result
.get("vim_result") == 200:
1611 raise ROclient
.ROClientException(
1612 "error injecting key: {}".format(
1613 result
.get("description")
1617 except NgRoException
as e
:
1619 "Reaching max tries injecting key. Error: {}".format(e
)
1621 except ROclient
.ROClientException
as e
:
1625 + "error injecting key: {}. Retrying until {} seconds".format(
1632 "Reaching max tries injecting key. Error: {}".format(e
)
1639 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1641 Wait until dependent VCA deployments have finished. NS waits for VNFs and VDUs; VNFs wait for VDUs
1643 my_vca
= vca_deployed_list
[vca_index
]
1644 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1645 # vdu or kdu: no dependencies
1649 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1650 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1651 configuration_status_list
= db_nsr
["configurationStatus"]
1652 for index
, vca_deployed
in enumerate(configuration_status_list
):
1653 if index
== vca_index
:
1656 if not my_vca
.get("member-vnf-index") or (
1657 vca_deployed
.get("member-vnf-index")
1658 == my_vca
.get("member-vnf-index")
1660 internal_status
= configuration_status_list
[index
].get("status")
1661 if internal_status
== "READY":
1663 elif internal_status
== "BROKEN":
1665 "Configuration aborted because dependent charm/s has failed"
1670 # no dependencies, return
1672 await asyncio
.sleep(10)
1675 raise LcmException("Configuration aborted because dependent charm/s timeout")
1677 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1680 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1682 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1683 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1686 async def instantiate_N2VC(
1703 ee_config_descriptor
,
1705 nsr_id
= db_nsr
["_id"]
1706 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1707 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1708 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1709 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1711 "collection": "nsrs",
1712 "filter": {"_id": nsr_id
},
1713 "path": db_update_entry
,
1719 element_under_configuration
= nsr_id
1723 vnfr_id
= db_vnfr
["_id"]
1724 osm_config
["osm"]["vnf_id"] = vnfr_id
1726 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1728 if vca_type
== "native_charm":
1731 index_number
= vdu_index
or 0
1734 element_type
= "VNF"
1735 element_under_configuration
= vnfr_id
1736 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1738 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1739 element_type
= "VDU"
1740 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1741 osm_config
["osm"]["vdu_id"] = vdu_id
1743 namespace
+= ".{}".format(kdu_name
)
1744 element_type
= "KDU"
1745 element_under_configuration
= kdu_name
1746 osm_config
["osm"]["kdu_name"] = kdu_name
1749 if base_folder
["pkg-dir"]:
1750 artifact_path
= "{}/{}/{}/{}".format(
1751 base_folder
["folder"],
1752 base_folder
["pkg-dir"],
1755 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1760 artifact_path
= "{}/Scripts/{}/{}/".format(
1761 base_folder
["folder"],
1764 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1769 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1771 # get initial_config_primitive_list that applies to this element
1772 initial_config_primitive_list
= config_descriptor
.get(
1773 "initial-config-primitive"
1777 "Initial config primitive list > {}".format(
1778 initial_config_primitive_list
1782 # add config if not present for NS charm
1783 ee_descriptor_id
= ee_config_descriptor
.get("id")
1784 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1785 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1786 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1790 "Initial config primitive list #2 > {}".format(
1791 initial_config_primitive_list
1794 # n2vc_redesign STEP 3.1
1795 # find old ee_id if exists
1796 ee_id
= vca_deployed
.get("ee_id")
1798 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1799 # create or register execution environment in VCA
1800 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1802 self
._write
_configuration
_status
(
1804 vca_index
=vca_index
,
1806 element_under_configuration
=element_under_configuration
,
1807 element_type
=element_type
,
1810 step
= "create execution environment"
1811 self
.logger
.debug(logging_text
+ step
)
1815 if vca_type
== "k8s_proxy_charm":
1816 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1817 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1818 namespace
=namespace
,
1819 artifact_path
=artifact_path
,
1823 elif vca_type
== "helm" or vca_type
== "helm-v3":
1824 ee_id
, credentials
= await self
.vca_map
[
1826 ].create_execution_environment(
1827 namespace
=namespace
,
1831 artifact_path
=artifact_path
,
1835 ee_id
, credentials
= await self
.vca_map
[
1837 ].create_execution_environment(
1838 namespace
=namespace
,
1844 elif vca_type
== "native_charm":
1845 step
= "Waiting to VM being up and getting IP address"
1846 self
.logger
.debug(logging_text
+ step
)
1847 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1856 credentials
= {"hostname": rw_mgmt_ip
}
1858 username
= deep_get(
1859 config_descriptor
, ("config-access", "ssh-access", "default-user")
1861 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1862 # merged. Meanwhile let's get username from initial-config-primitive
1863 if not username
and initial_config_primitive_list
:
1864 for config_primitive
in initial_config_primitive_list
:
1865 for param
in config_primitive
.get("parameter", ()):
1866 if param
["name"] == "ssh-username":
1867 username
= param
["value"]
1871 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1872 "'config-access.ssh-access.default-user'"
1874 credentials
["username"] = username
1875 # n2vc_redesign STEP 3.2
1877 self
._write
_configuration
_status
(
1879 vca_index
=vca_index
,
1880 status
="REGISTERING",
1881 element_under_configuration
=element_under_configuration
,
1882 element_type
=element_type
,
1885 step
= "register execution environment {}".format(credentials
)
1886 self
.logger
.debug(logging_text
+ step
)
1887 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1888 credentials
=credentials
,
1889 namespace
=namespace
,
1894 # for compatibility with MON/POL modules, they need the model and application name at database
1895 # TODO ask MON/POL whether it is still necessary to assume the format "model_name.application_name"
1896 ee_id_parts
= ee_id
.split(".")
1897 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1898 if len(ee_id_parts
) >= 2:
1899 model_name
= ee_id_parts
[0]
1900 application_name
= ee_id_parts
[1]
1901 db_nsr_update
[db_update_entry
+ "model"] = model_name
1902 db_nsr_update
[db_update_entry
+ "application"] = application_name
1904 # n2vc_redesign STEP 3.3
1905 step
= "Install configuration Software"
1907 self
._write
_configuration
_status
(
1909 vca_index
=vca_index
,
1910 status
="INSTALLING SW",
1911 element_under_configuration
=element_under_configuration
,
1912 element_type
=element_type
,
1913 other_update
=db_nsr_update
,
1916 # TODO check if already done
1917 self
.logger
.debug(logging_text
+ step
)
1919 if vca_type
== "native_charm":
1920 config_primitive
= next(
1921 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1924 if config_primitive
:
1925 config
= self
._map
_primitive
_params
(
1926 config_primitive
, {}, deploy_params
1929 if vca_type
== "lxc_proxy_charm":
1930 if element_type
== "NS":
1931 num_units
= db_nsr
.get("config-units") or 1
1932 elif element_type
== "VNF":
1933 num_units
= db_vnfr
.get("config-units") or 1
1934 elif element_type
== "VDU":
1935 for v
in db_vnfr
["vdur"]:
1936 if vdu_id
== v
["vdu-id-ref"]:
1937 num_units
= v
.get("config-units") or 1
1939 if vca_type
!= "k8s_proxy_charm":
1940 await self
.vca_map
[vca_type
].install_configuration_sw(
1942 artifact_path
=artifact_path
,
1945 num_units
=num_units
,
1950 # write in db flag of configuration_sw already installed
1952 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1955 # add relations for this VCA (wait for other peers related with this VCA)
1956 await self
._add
_vca
_relations
(
1957 logging_text
=logging_text
,
1960 vca_index
=vca_index
,
1963 # if SSH access is required, then get execution environment SSH public key
1964 # if native charm we have already waited for the VM to be UP
1965 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1968 # self.logger.debug("get ssh key block")
1970 config_descriptor
, ("config-access", "ssh-access", "required")
1972 # self.logger.debug("ssh key needed")
1973 # Needed to inject a ssh key
1976 ("config-access", "ssh-access", "default-user"),
1978 step
= "Install configuration Software, getting public ssh key"
1979 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
1980 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
1983 step
= "Insert public key into VM user={} ssh_key={}".format(
1987 # self.logger.debug("no need to get ssh key")
1988 step
= "Waiting to VM being up and getting IP address"
1989 self
.logger
.debug(logging_text
+ step
)
1991 # n2vc_redesign STEP 5.1
1992 # wait for RO (ip-address) Insert pub_key into VM
1995 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
1996 logging_text
, nsr_id
, vnfr_id
, kdu_name
1998 vnfd
= self
.db
.get_one(
2000 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2002 kdu
= get_kdu(vnfd
, kdu_name
)
2004 service
["name"] for service
in get_kdu_services(kdu
)
2006 exposed_services
= []
2007 for service
in services
:
2008 if any(s
in service
["name"] for s
in kdu_services
):
2009 exposed_services
.append(service
)
2010 await self
.vca_map
[vca_type
].exec_primitive(
2012 primitive_name
="config",
2014 "osm-config": json
.dumps(
2016 k8s
={"services": exposed_services
}
2023 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2034 rw_mgmt_ip
= None # This is for a NS configuration
2036 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2038 # store rw_mgmt_ip in deploy params for later replacement
2039 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2041 # n2vc_redesign STEP 6 Execute initial config primitive
2042 step
= "execute initial config primitive"
2044 # wait for dependent primitives execution (NS -> VNF -> VDU)
2045 if initial_config_primitive_list
:
2046 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2048 # stage, in function of element type: vdu, kdu, vnf or ns
2049 my_vca
= vca_deployed_list
[vca_index
]
2050 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2052 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2053 elif my_vca
.get("member-vnf-index"):
2055 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2058 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2060 self
._write
_configuration
_status
(
2061 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2064 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2066 check_if_terminated_needed
= True
2067 for initial_config_primitive
in initial_config_primitive_list
:
2068 # adding information on the vca_deployed if it is a NS execution environment
2069 if not vca_deployed
["member-vnf-index"]:
2070 deploy_params
["ns_config_info"] = json
.dumps(
2071 self
._get
_ns
_config
_info
(nsr_id
)
2073 # TODO check if already done
2074 primitive_params_
= self
._map
_primitive
_params
(
2075 initial_config_primitive
, {}, deploy_params
2078 step
= "execute primitive '{}' params '{}'".format(
2079 initial_config_primitive
["name"], primitive_params_
2081 self
.logger
.debug(logging_text
+ step
)
2082 await self
.vca_map
[vca_type
].exec_primitive(
2084 primitive_name
=initial_config_primitive
["name"],
2085 params_dict
=primitive_params_
,
2090 # Once some primitive has been executed, check and write at db if it needs to execute terminate primitives
2091 if check_if_terminated_needed
:
2092 if config_descriptor
.get("terminate-config-primitive"):
2094 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2096 check_if_terminated_needed
= False
2098 # TODO register in database that primitive is done
2100 # STEP 7 Configure metrics
2101 if vca_type
== "helm" or vca_type
== "helm-v3":
2102 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2104 artifact_path
=artifact_path
,
2105 ee_config_descriptor
=ee_config_descriptor
,
2108 target_ip
=rw_mgmt_ip
,
2114 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2117 for job
in prometheus_jobs
:
2120 {"job_name": job
["job_name"]},
2123 fail_on_empty
=False,
2126 step
= "instantiated at VCA"
2127 self
.logger
.debug(logging_text
+ step
)
2129 self
._write
_configuration
_status
(
2130 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2133 except Exception as e
: # TODO not use Exception but N2VC exception
2134 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2136 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2139 "Exception while {} : {}".format(step
, e
), exc_info
=True
2141 self
._write
_configuration
_status
(
2142 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2144 raise LcmException("{} {}".format(step
, e
)) from e
2146 def _write_ns_status(
2150 current_operation
: str,
2151 current_operation_id
: str,
2152 error_description
: str = None,
2153 error_detail
: str = None,
2154 other_update
: dict = None,
2157 Update db_nsr fields.
2160 :param current_operation:
2161 :param current_operation_id:
2162 :param error_description:
2163 :param error_detail:
2164 :param other_update: Other required changes at database if provided, will be cleared
2168 db_dict
= other_update
or {}
2171 ] = current_operation_id
# for backward compatibility
2172 db_dict
["_admin.current-operation"] = current_operation_id
2173 db_dict
["_admin.operation-type"] = (
2174 current_operation
if current_operation
!= "IDLE" else None
2176 db_dict
["currentOperation"] = current_operation
2177 db_dict
["currentOperationID"] = current_operation_id
2178 db_dict
["errorDescription"] = error_description
2179 db_dict
["errorDetail"] = error_detail
2182 db_dict
["nsState"] = ns_state
2183 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2184 except DbException
as e
:
2185 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2187 def _write_op_status(
2191 error_message
: str = None,
2192 queuePosition
: int = 0,
2193 operation_state
: str = None,
2194 other_update
: dict = None,
2197 db_dict
= other_update
or {}
2198 db_dict
["queuePosition"] = queuePosition
2199 if isinstance(stage
, list):
2200 db_dict
["stage"] = stage
[0]
2201 db_dict
["detailed-status"] = " ".join(stage
)
2202 elif stage
is not None:
2203 db_dict
["stage"] = str(stage
)
2205 if error_message
is not None:
2206 db_dict
["errorMessage"] = error_message
2207 if operation_state
is not None:
2208 db_dict
["operationState"] = operation_state
2209 db_dict
["statusEnteredTime"] = time()
2210 self
.update_db_2("nslcmops", op_id
, db_dict
)
2211 except DbException
as e
:
2213 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2216 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2218 nsr_id
= db_nsr
["_id"]
2219 # configurationStatus
2220 config_status
= db_nsr
.get("configurationStatus")
2223 "configurationStatus.{}.status".format(index
): status
2224 for index
, v
in enumerate(config_status
)
2228 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2230 except DbException
as e
:
2232 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2235 def _write_configuration_status(
2240 element_under_configuration
: str = None,
2241 element_type
: str = None,
2242 other_update
: dict = None,
2245 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2246 # .format(vca_index, status))
2249 db_path
= "configurationStatus.{}.".format(vca_index
)
2250 db_dict
= other_update
or {}
2252 db_dict
[db_path
+ "status"] = status
2253 if element_under_configuration
:
2255 db_path
+ "elementUnderConfiguration"
2256 ] = element_under_configuration
2258 db_dict
[db_path
+ "elementType"] = element_type
2259 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2260 except DbException
as e
:
2262 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2263 status
, nsr_id
, vca_index
, e
2267 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2269 Checks and computes the placement (vim account where to deploy). If it is decided by an external tool, it
2270 sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
2271 Database is used because the result can be obtained from a different LCM worker in case of HA.
2272 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2273 :param db_nslcmop: database content of nslcmop
2274 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2275 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2276 computed 'vim-account-id'
2279 nslcmop_id
= db_nslcmop
["_id"]
2280 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2281 if placement_engine
== "PLA":
2283 logging_text
+ "Invoke and wait for placement optimization"
2285 await self
.msg
.aiowrite(
2286 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2288 db_poll_interval
= 5
2289 wait
= db_poll_interval
* 10
2291 while not pla_result
and wait
>= 0:
2292 await asyncio
.sleep(db_poll_interval
)
2293 wait
-= db_poll_interval
2294 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2295 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2299 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2302 for pla_vnf
in pla_result
["vnf"]:
2303 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2304 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2309 {"_id": vnfr
["_id"]},
2310 {"vim-account-id": pla_vnf
["vimAccountId"]},
2313 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2316 def update_nsrs_with_pla_result(self
, params
):
2318 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2320 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2322 except Exception as e
:
2323 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2325 async def instantiate(self
, nsr_id
, nslcmop_id
):
2328 :param nsr_id: ns instance to deploy
2329 :param nslcmop_id: operation to run
2333 # Try to lock HA task here
2334 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2335 if not task_is_locked_by_me
:
2337 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2341 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2342 self
.logger
.debug(logging_text
+ "Enter")
2344 # get all needed from database
2346 # database nsrs record
2349 # database nslcmops record
2352 # update operation on nsrs
2354 # update operation on nslcmops
2355 db_nslcmop_update
= {}
2357 nslcmop_operation_state
= None
2358 db_vnfrs
= {} # vnf's info indexed by member-index
2360 tasks_dict_info
= {} # from task to info text
2364 "Stage 1/5: preparation of the environment.",
2365 "Waiting for previous operations to terminate.",
2368 # ^ stage, step, VIM progress
2370 # wait for any previous tasks in process
2371 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2373 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2374 stage
[1] = "Reading from database."
2375 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2376 db_nsr_update
["detailed-status"] = "creating"
2377 db_nsr_update
["operational-status"] = "init"
2378 self
._write
_ns
_status
(
2380 ns_state
="BUILDING",
2381 current_operation
="INSTANTIATING",
2382 current_operation_id
=nslcmop_id
,
2383 other_update
=db_nsr_update
,
2385 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2387 # read from db: operation
2388 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2389 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2390 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2391 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2392 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2394 ns_params
= db_nslcmop
.get("operationParams")
2395 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2396 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2398 timeout_ns_deploy
= self
.timeout
.get(
2399 "ns_deploy", self
.timeout_ns_deploy
2403 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2404 self
.logger
.debug(logging_text
+ stage
[1])
2405 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2406 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2407 self
.logger
.debug(logging_text
+ stage
[1])
2408 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2409 self
.fs
.sync(db_nsr
["nsd-id"])
2411 # nsr_name = db_nsr["name"] # TODO short-name??
2413 # read from db: vnf's of this ns
2414 stage
[1] = "Getting vnfrs from db."
2415 self
.logger
.debug(logging_text
+ stage
[1])
2416 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2418 # read from db: vnfd's for every vnf
2419 db_vnfds
= [] # every vnfd data
2421 # for each vnf in ns, read vnfd
2422 for vnfr
in db_vnfrs_list
:
2423 if vnfr
.get("kdur"):
2425 for kdur
in vnfr
["kdur"]:
2426 if kdur
.get("additionalParams"):
2427 kdur
["additionalParams"] = json
.loads(
2428 kdur
["additionalParams"]
2430 kdur_list
.append(kdur
)
2431 vnfr
["kdur"] = kdur_list
2433 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2434 vnfd_id
= vnfr
["vnfd-id"]
2435 vnfd_ref
= vnfr
["vnfd-ref"]
2436 self
.fs
.sync(vnfd_id
)
2438 # if we haven't this vnfd, read it from db
2439 if vnfd_id
not in db_vnfds
:
2441 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2444 self
.logger
.debug(logging_text
+ stage
[1])
2445 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2448 db_vnfds
.append(vnfd
)
2450 # Get or generates the _admin.deployed.VCA list
2451 vca_deployed_list
= None
2452 if db_nsr
["_admin"].get("deployed"):
2453 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2454 if vca_deployed_list
is None:
2455 vca_deployed_list
= []
2456 configuration_status_list
= []
2457 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2458 db_nsr_update
["configurationStatus"] = configuration_status_list
2459 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2460 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2461 elif isinstance(vca_deployed_list
, dict):
2462 # maintain backward compatibility. Change a dict to list at database
2463 vca_deployed_list
= list(vca_deployed_list
.values())
2464 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2465 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2468 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2470 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2471 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2473 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2474 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2475 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2477 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2480 # n2vc_redesign STEP 2 Deploy Network Scenario
2481 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2482 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2484 stage
[1] = "Deploying KDUs."
2485 # self.logger.debug(logging_text + "Before deploy_kdus")
2486 # Call to deploy_kdus in case exists the "vdu:kdu" param
2487 await self
.deploy_kdus(
2488 logging_text
=logging_text
,
2490 nslcmop_id
=nslcmop_id
,
2493 task_instantiation_info
=tasks_dict_info
,
2496 stage
[1] = "Getting VCA public key."
2497 # n2vc_redesign STEP 1 Get VCA public ssh-key
2498 # feature 1429. Add n2vc public key to needed VMs
2499 n2vc_key
= self
.n2vc
.get_public_key()
2500 n2vc_key_list
= [n2vc_key
]
2501 if self
.vca_config
.get("public_key"):
2502 n2vc_key_list
.append(self
.vca_config
["public_key"])
2504 stage
[1] = "Deploying NS at VIM."
2505 task_ro
= asyncio
.ensure_future(
2506 self
.instantiate_RO(
2507 logging_text
=logging_text
,
2511 db_nslcmop
=db_nslcmop
,
2514 n2vc_key_list
=n2vc_key_list
,
2518 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2519 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2521 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2522 stage
[1] = "Deploying Execution Environments."
2523 self
.logger
.debug(logging_text
+ stage
[1])
2525 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2526 for vnf_profile
in get_vnf_profiles(nsd
):
2527 vnfd_id
= vnf_profile
["vnfd-id"]
2528 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2529 member_vnf_index
= str(vnf_profile
["id"])
2530 db_vnfr
= db_vnfrs
[member_vnf_index
]
2531 base_folder
= vnfd
["_admin"]["storage"]
2537 # Get additional parameters
2538 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2539 if db_vnfr
.get("additionalParamsForVnf"):
2540 deploy_params
.update(
2541 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2544 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2545 if descriptor_config
:
2547 logging_text
=logging_text
2548 + "member_vnf_index={} ".format(member_vnf_index
),
2551 nslcmop_id
=nslcmop_id
,
2557 member_vnf_index
=member_vnf_index
,
2558 vdu_index
=vdu_index
,
2560 deploy_params
=deploy_params
,
2561 descriptor_config
=descriptor_config
,
2562 base_folder
=base_folder
,
2563 task_instantiation_info
=tasks_dict_info
,
2567 # Deploy charms for each VDU that supports one.
2568 for vdud
in get_vdu_list(vnfd
):
2570 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2571 vdur
= find_in_list(
2572 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2575 if vdur
.get("additionalParams"):
2576 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2578 deploy_params_vdu
= deploy_params
2579 deploy_params_vdu
["OSM"] = get_osm_params(
2580 db_vnfr
, vdu_id
, vdu_count_index
=0
2582 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2584 self
.logger
.debug("VDUD > {}".format(vdud
))
2586 "Descriptor config > {}".format(descriptor_config
)
2588 if descriptor_config
:
2591 for vdu_index
in range(vdud_count
):
2592 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2594 logging_text
=logging_text
2595 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2596 member_vnf_index
, vdu_id
, vdu_index
2600 nslcmop_id
=nslcmop_id
,
2606 member_vnf_index
=member_vnf_index
,
2607 vdu_index
=vdu_index
,
2609 deploy_params
=deploy_params_vdu
,
2610 descriptor_config
=descriptor_config
,
2611 base_folder
=base_folder
,
2612 task_instantiation_info
=tasks_dict_info
,
2615 for kdud
in get_kdu_list(vnfd
):
2616 kdu_name
= kdud
["name"]
2617 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2618 if descriptor_config
:
2623 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2625 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2626 if kdur
.get("additionalParams"):
2627 deploy_params_kdu
.update(
2628 parse_yaml_strings(kdur
["additionalParams"].copy())
2632 logging_text
=logging_text
,
2635 nslcmop_id
=nslcmop_id
,
2641 member_vnf_index
=member_vnf_index
,
2642 vdu_index
=vdu_index
,
2644 deploy_params
=deploy_params_kdu
,
2645 descriptor_config
=descriptor_config
,
2646 base_folder
=base_folder
,
2647 task_instantiation_info
=tasks_dict_info
,
2651 # Check if this NS has a charm configuration
2652 descriptor_config
= nsd
.get("ns-configuration")
2653 if descriptor_config
and descriptor_config
.get("juju"):
2656 member_vnf_index
= None
2662 # Get additional parameters
2663 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2664 if db_nsr
.get("additionalParamsForNs"):
2665 deploy_params
.update(
2666 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2668 base_folder
= nsd
["_admin"]["storage"]
2670 logging_text
=logging_text
,
2673 nslcmop_id
=nslcmop_id
,
2679 member_vnf_index
=member_vnf_index
,
2680 vdu_index
=vdu_index
,
2682 deploy_params
=deploy_params
,
2683 descriptor_config
=descriptor_config
,
2684 base_folder
=base_folder
,
2685 task_instantiation_info
=tasks_dict_info
,
2689 # rest of staff will be done at finally
2692 ROclient
.ROClientException
,
2698 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2701 except asyncio
.CancelledError
:
2703 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2705 exc
= "Operation was cancelled"
2706 except Exception as e
:
2707 exc
= traceback
.format_exc()
2708 self
.logger
.critical(
2709 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2714 error_list
.append(str(exc
))
2716 # wait for pending tasks
2718 stage
[1] = "Waiting for instantiate pending tasks."
2719 self
.logger
.debug(logging_text
+ stage
[1])
2720 error_list
+= await self
._wait
_for
_tasks
(
2728 stage
[1] = stage
[2] = ""
2729 except asyncio
.CancelledError
:
2730 error_list
.append("Cancelled")
2731 # TODO cancel all tasks
2732 except Exception as exc
:
2733 error_list
.append(str(exc
))
2735 # update operation-status
2736 db_nsr_update
["operational-status"] = "running"
2737 # let's begin with VCA 'configured' status (later we can change it)
2738 db_nsr_update
["config-status"] = "configured"
2739 for task
, task_name
in tasks_dict_info
.items():
2740 if not task
.done() or task
.cancelled() or task
.exception():
2741 if task_name
.startswith(self
.task_name_deploy_vca
):
2742 # A N2VC task is pending
2743 db_nsr_update
["config-status"] = "failed"
2745 # RO or KDU task is pending
2746 db_nsr_update
["operational-status"] = "failed"
2748 # update status at database
2750 error_detail
= ". ".join(error_list
)
2751 self
.logger
.error(logging_text
+ error_detail
)
2752 error_description_nslcmop
= "{} Detail: {}".format(
2753 stage
[0], error_detail
2755 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2756 nslcmop_id
, stage
[0]
2759 db_nsr_update
["detailed-status"] = (
2760 error_description_nsr
+ " Detail: " + error_detail
2762 db_nslcmop_update
["detailed-status"] = error_detail
2763 nslcmop_operation_state
= "FAILED"
2767 error_description_nsr
= error_description_nslcmop
= None
2769 db_nsr_update
["detailed-status"] = "Done"
2770 db_nslcmop_update
["detailed-status"] = "Done"
2771 nslcmop_operation_state
= "COMPLETED"
2774 self
._write
_ns
_status
(
2777 current_operation
="IDLE",
2778 current_operation_id
=None,
2779 error_description
=error_description_nsr
,
2780 error_detail
=error_detail
,
2781 other_update
=db_nsr_update
,
2783 self
._write
_op
_status
(
2786 error_message
=error_description_nslcmop
,
2787 operation_state
=nslcmop_operation_state
,
2788 other_update
=db_nslcmop_update
,
2791 if nslcmop_operation_state
:
2793 await self
.msg
.aiowrite(
2798 "nslcmop_id": nslcmop_id
,
2799 "operationState": nslcmop_operation_state
,
2803 except Exception as e
:
2805 logging_text
+ "kafka_write notification Exception {}".format(e
)
2808 self
.logger
.debug(logging_text
+ "Exit")
2809 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
    """Return the VNF descriptor for *vnfd_id*, reading the db only on a cache miss.

    :param vnfd_id: value matched against the ``id`` field of the ``vnfds`` collection
    :param cached_vnfds: caller-owned cache keyed by vnfd id; filled in place
    :return: the cached or freshly read descriptor
    """
    if vnfd_id in cached_vnfds:
        return cached_vnfds[vnfd_id]

    # First request for this descriptor: fetch it and remember it.
    descriptor = self.db.get_one("vnfds", {"id": vnfd_id})
    cached_vnfds[vnfd_id] = descriptor
    return descriptor
def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
    """Return the VNF record of a vnf profile, reading the db only on a cache miss.

    :param nsr_id: value matched against ``nsr-id-ref`` in the ``vnfrs`` collection
    :param vnf_profile_id: value matched against ``member-vnf-index-ref``; also the cache key
    :param cached_vnfrs: caller-owned cache keyed by vnf profile id; filled in place
    :return: the cached or freshly read record
    """
    if vnf_profile_id in cached_vnfrs:
        return cached_vnfrs[vnf_profile_id]

    query = {
        "member-vnf-index-ref": vnf_profile_id,
        "nsr-id-ref": nsr_id,
    }
    record = self.db.get_one("vnfrs", query)
    cached_vnfrs[vnf_profile_id] = record
    return record
def _is_deployed_vca_in_relation(
    self, vca: DeployedVCA, relation: Relation
) -> bool:
    """Tell whether *vca* is one of the two endpoints of *relation*.

    An endpoint matches when its vnf profile id, vdu profile id and
    execution environment reference all equal those of *vca*. Endpoints
    with a truthy ``kdu-resource-profile-id`` are never compared.

    :param vca: deployed VCA to look for
    :param relation: relation whose provider and requirer are inspected, in that order
    :return: result of the first matching comparison, otherwise a falsy value
    """
    is_related = False
    for endpoint in (relation.provider, relation.requirer):
        if not endpoint["kdu-resource-profile-id"]:
            is_related = (
                vca.vnf_profile_id == endpoint.vnf_profile_id
                and vca.vdu_profile_id == endpoint.vdu_profile_id
                and vca.execution_environment_ref
                == endpoint.execution_environment_ref
            )
            if is_related:
                break
    return is_related
def _update_ee_relation_data_with_implicit_data(
    self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
):
    """Fill in a missing ``execution-environment-ref`` of a relation endpoint.

    The endpoint data is first passed through ``safe_get_ee_relation``. For
    VNF and VDU level endpoints without an execution environment reference,
    the reference is taken from the juju execution environment that
    ``get_juju_ee_ref`` finds in the vnfd, for the vnfd itself (VNF level)
    or for the endpoint's ``vdu-profile-id`` (VDU level).

    :param nsr_id: ns record id, forwarded to ``safe_get_ee_relation``
    :param nsd: ns descriptor used to resolve the vnf profile
    :param ee_relation_data: raw endpoint data
    :param cached_vnfds: vnfd cache shared with ``_get_vnfd``
    :param vnf_profile_id: optional vnf profile id, forwarded to ``safe_get_ee_relation``
    :return: the endpoint data, completed when needed
    :raises Exception: if no execution environment is found for the endpoint
    """
    ee_relation_data = safe_get_ee_relation(
        nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
    )
    level = EELevel.get_level(ee_relation_data)

    # Only VNF/VDU endpoints can have the reference inferred from the vnfd.
    if level not in (EELevel.VNF, EELevel.VDU):
        return ee_relation_data
    # An explicit reference is kept untouched.
    if ee_relation_data["execution-environment-ref"]:
        return ee_relation_data

    profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
    vnfd_id = profile["vnfd-id"]
    db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
    if level == EELevel.VNF:
        entity_id = vnfd_id
    else:
        entity_id = ee_relation_data["vdu-profile-id"]

    ee = get_juju_ee_ref(db_vnfd, entity_id)
    if not ee:
        raise Exception(
            f"not execution environments found for ee_relation {ee_relation_data}"
        )
    ee_relation_data["execution-environment-ref"] = ee["id"]
    return ee_relation_data
def _get_ns_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """Collect the NS-level relations in which *vca* takes part.

    Every relation returned by ``get_ns_configuration_relation_list`` is
    accepted in one of two forms: explicit ``provider``/``requirer``
    entries, or an ``entities`` list whose first item is the provider and
    second item the requirer. An entity whose id differs from the nsd id is
    treated as a vnf profile.

    :param nsr_id: ns record id
    :param nsd: ns descriptor
    :param vca: deployed VCA the relations are filtered by
    :param cached_vnfds: vnfd cache shared with ``_get_vnfd``
    :return: relations for which ``_is_deployed_vca_in_relation`` is truthy
    :raises Exception: if a relation has neither of the two accepted forms
    """
    matching = []
    for rel in get_ns_configuration_relation_list(nsd):
        if "provider" in rel and "requirer" in rel:
            provider_dict = rel["provider"]
            requirer_dict = rel["requirer"]
        elif "entities" in rel:
            # entities[0] is the provider, entities[1] the requirer
            endpoints = []
            for position in (0, 1):
                entity = rel["entities"][position]
                entity_id = entity["id"]
                endpoint = {
                    "nsr-id": nsr_id,
                    "endpoint": entity["endpoint"],
                }
                if entity_id != nsd["id"]:
                    endpoint["vnf-profile-id"] = entity_id
                endpoints.append(endpoint)
            provider_dict, requirer_dict = endpoints
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )

        provider_data = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, provider_dict, cached_vnfds
        )
        requirer_data = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, requirer_dict, cached_vnfds
        )
        candidate = Relation(
            rel["name"], EERelation(provider_data), EERelation(requirer_data)
        )
        if self._is_deployed_vca_in_relation(vca, candidate):
            matching.append(candidate)
    return matching
def _get_vnf_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """Collect the VNF-level relations in which *vca* takes part.

    The relations come from the vnfd of the vnf profile *vca* belongs to
    (``get_relation_list``). Each one is accepted in one of two forms:
    explicit ``provider``/``requirer`` entries, or an ``entities`` list
    whose first item is the provider and second item the requirer. An
    entity whose id differs from the vnfd id is treated as a vdu profile.

    :param nsr_id: ns record id
    :param nsd: ns descriptor
    :param vca: deployed VCA the relations are filtered by
    :param cached_vnfds: vnfd cache shared with ``_get_vnfd``
    :return: relations for which ``_is_deployed_vca_in_relation`` is truthy
    :raises Exception: if a relation has neither of the two accepted forms
    """
    matching = []
    profile = get_vnf_profile(nsd, vca.vnf_profile_id)
    vnf_profile_id = profile["id"]
    vnfd_id = profile["vnfd-id"]
    db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)

    for rel in get_relation_list(db_vnfd, vnfd_id):
        if "provider" in rel and "requirer" in rel:
            provider_dict = rel["provider"]
            requirer_dict = rel["requirer"]
        elif "entities" in rel:
            # entities[0] is the provider, entities[1] the requirer
            endpoints = []
            for position in (0, 1):
                entity = rel["entities"][position]
                entity_id = entity["id"]
                endpoint = {
                    "nsr-id": nsr_id,
                    "vnf-profile-id": vnf_profile_id,
                    "endpoint": entity["endpoint"],
                }
                if entity_id != vnfd_id:
                    endpoint["vdu-profile-id"] = entity_id
                endpoints.append(endpoint)
            provider_dict, requirer_dict = endpoints
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )

        provider_data = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        requirer_data = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        candidate = Relation(
            rel["name"], EERelation(provider_data), EERelation(requirer_data)
        )
        if self._is_deployed_vca_in_relation(vca, candidate):
            matching.append(candidate)
    return matching
def _get_kdu_resource_data(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedK8sResource:
    """Return the deployed KDU data behind a KDU-level relation endpoint.

    The endpoint's vnf profile is resolved to its vnfd, the kdu resource
    profile named by the endpoint is looked up in that vnfd, and the
    deployed KDU entry with the profile's ``kdu-name`` is fetched from
    ``db_nsr["_admin"]["deployed"]``. The entry is updated in place with the
    profile's ``resource-name``.

    :param ee_relation: KDU-level relation endpoint
    :param db_nsr: ns record
    :param cached_vnfds: vnfd cache shared with ``_get_vnfd``
    :return: the deployed KDU entry, extended with ``resource-name``
    """
    nsd = get_nsd(db_nsr)
    vnf_profiles = get_vnf_profiles(nsd)
    vnfd_id = find_in_list(
        vnf_profiles,
        lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
    )["vnfd-id"]
    db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
    kdu_resource_profile = get_kdu_resource_profile(
        db_vnfd, ee_relation.kdu_resource_profile_id
    )
    kdu_name = kdu_resource_profile["kdu-name"]
    # The fallbacks must be dicts: a tuple has no ``.get``, so a record
    # without "_admin" used to fail with AttributeError right here.
    deployed_kdu, _ = get_deployed_kdu(
        db_nsr.get("_admin", {}).get("deployed", {}),
        kdu_name,
        ee_relation.vnf_profile_id,
    )
    # NOTE(review): assumes get_deployed_kdu always yields a dict for this
    # kdu; confirm what it returns when the kdu is not deployed yet.
    deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
    return deployed_kdu
def _get_deployed_component(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedComponent:
    """Resolve a relation endpoint to the component deployed for it.

    NS, VNF and VDU level endpoints are looked up with ``get_deployed_vca``
    and wrapped in ``DeployedVCA``. KDU level endpoints are resolved through
    ``_get_kdu_resource_data`` and wrapped in ``DeployedK8sResource``.

    :param ee_relation: relation endpoint
    :param db_nsr: ns record
    :param cached_vnfds: vnfd cache shared with ``_get_vnfd``
    :return: the deployed component, or None when the lookup is falsy or
        the endpoint level is none of the four handled ones
    """
    nsr_id = db_nsr["_id"]
    level = EELevel.get_level(ee_relation)

    if level == EELevel.NS:
        vca_filter = {"vdu_id": None, "member-vnf-index": None}
    elif level == EELevel.VNF:
        vca_filter = {
            "vdu_id": None,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif level == EELevel.VDU:
        vca_filter = {
            "vdu_id": ee_relation.vdu_profile_id,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif level == EELevel.KDU:
        kdu_data = self._get_kdu_resource_data(ee_relation, db_nsr, cached_vnfds)
        return DeployedK8sResource(kdu_data) if kdu_data else None
    else:
        return None

    # The three VCA-backed levels differ only in the filter used.
    vca = get_deployed_vca(db_nsr, vca_filter)
    return DeployedVCA(nsr_id, vca) if vca else None
async def _add_relation(
    self,
    relation: Relation,
    vca_type: str,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
    cached_vnfrs: Dict[str, Any],
) -> bool:
    """Add *relation* once both of its endpoints are deployed and ready.

    Nothing is done unless a deployed component exists for the provider
    and the requirer and both report ``config_sw_installed``.

    :param relation: relation to add
    :param vca_type: key into ``self.vca_map`` selecting the connector
    :param db_nsr: ns record
    :param cached_vnfds: vnfd cache shared with ``_get_vnfd``
    :param cached_vnfrs: vnfr cache shared with ``_get_vnfr``
    :return: True if the relation was added, False if an endpoint is not ready
    """
    provider_component = self._get_deployed_component(
        relation.provider, db_nsr, cached_vnfds
    )
    requirer_component = self._get_deployed_component(
        relation.requirer, db_nsr, cached_vnfds
    )
    both_ready = (
        provider_component
        and requirer_component
        and provider_component.config_sw_installed
        and requirer_component.config_sw_installed
    )
    if not both_ready:
        return False

    # An endpoint without a vnf profile id has no vnfr to read.
    provider_db_vnfr = None
    if relation.provider.vnf_profile_id:
        provider_db_vnfr = self._get_vnfr(
            relation.provider.nsr_id,
            relation.provider.vnf_profile_id,
            cached_vnfrs,
        )
    requirer_db_vnfr = None
    if relation.requirer.vnf_profile_id:
        requirer_db_vnfr = self._get_vnfr(
            relation.requirer.nsr_id,
            relation.requirer.vnf_profile_id,
            cached_vnfrs,
        )

    provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
    requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
    provider_endpoint = RelationEndpoint(
        provider_component.ee_id,
        provider_vca_id,
        relation.provider.endpoint,
    )
    requirer_endpoint = RelationEndpoint(
        requirer_component.ee_id,
        requirer_vca_id,
        relation.requirer.endpoint,
    )
    await self.vca_map[vca_type].add_relation(
        provider=provider_endpoint,
        requirer=requirer_endpoint,
    )
    # True tells the caller to drop this entry from its pending list.
    return True
async def _add_vca_relations(
    self,
    logging_text,
    nsr_id,
    vca_type: str,
    vca_index: int,
    timeout: int = 3600,
) -> bool:
    """Add every NS and VNF level relation that involves one deployed VCA.

    Steps:
      1. find all relations for this VCA
      2. wait for the related peers
      3. add the relations

    The ns record is re-read and the pending relations are retried every
    5 seconds until all of them are added or *timeout* is reached.

    :param logging_text: prefix for the log messages
    :param nsr_id: ns record id
    :param vca_type: key into ``self.vca_map``, forwarded to ``_add_relation``
    :param vca_index: position of the VCA in the deployed VCA list of the nsr
    :param timeout: maximum number of seconds to keep retrying
    :return: True when there is nothing to add or everything was added;
        False on timeout or on any exception (which is logged, not raised)
    """
    try:
        # STEP 1: find all relations for this VCA
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = get_nsd(db_nsr)

        # this VCA data
        deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
        my_vca = DeployedVCA(nsr_id, deployed_vca_dict)

        cached_vnfds = {}
        cached_vnfrs = {}
        relations = []
        relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
        relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))

        # if no relations, terminate
        if not relations:
            self.logger.debug(logging_text + " No relations")
            return True

        self.logger.debug(logging_text + " adding relations {}".format(relations))

        # STEPS 2 and 3: retry until every relation has been added
        start = time()
        while True:
            now = time()
            if now - start >= timeout:
                self.logger.error(logging_text + " : timeout adding relations")
                return False

            # reload nsr from database (we need to update record: _admin.deployed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            # iterate over a copy because added relations are removed in place
            for relation in relations.copy():
                added = await self._add_relation(
                    relation,
                    vca_type,
                    db_nsr,
                    cached_vnfds,
                    cached_vnfrs,
                )
                if added:
                    relations.remove(relation)

            if not relations:
                self.logger.debug("Relations added")
                break
            await asyncio.sleep(5.0)

        return True

    except Exception as e:
        # Logger.warn is a deprecated alias; Logger.warning is the supported name.
        self.logger.warning(logging_text + " ERROR adding relations: {}".format(e))
        return False
3170 async def _install_kdu(
3178 k8s_instance_info
: dict,
3179 k8params
: dict = None,
3185 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3188 "collection": "nsrs",
3189 "filter": {"_id": nsr_id
},
3190 "path": nsr_db_path
,
3193 if k8s_instance_info
.get("kdu-deployment-name"):
3194 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3196 kdu_instance
= self
.k8scluster_map
[
3198 ].generate_kdu_instance_name(
3199 db_dict
=db_dict_install
,
3200 kdu_model
=k8s_instance_info
["kdu-model"],
3201 kdu_name
=k8s_instance_info
["kdu-name"],
3204 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
3206 await self
.k8scluster_map
[k8sclustertype
].install(
3207 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3208 kdu_model
=k8s_instance_info
["kdu-model"],
3211 db_dict
=db_dict_install
,
3213 kdu_name
=k8s_instance_info
["kdu-name"],
3214 namespace
=k8s_instance_info
["namespace"],
3215 kdu_instance
=kdu_instance
,
3219 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
3222 # Obtain services to obtain management service ip
3223 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3224 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3225 kdu_instance
=kdu_instance
,
3226 namespace
=k8s_instance_info
["namespace"],
3229 # Obtain management service info (if exists)
3230 vnfr_update_dict
= {}
3231 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3233 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3238 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3241 for service
in kdud
.get("service", [])
3242 if service
.get("mgmt-service")
3244 for mgmt_service
in mgmt_services
:
3245 for service
in services
:
3246 if service
["name"].startswith(mgmt_service
["name"]):
3247 # Mgmt service found, Obtain service ip
3248 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3249 if isinstance(ip
, list) and len(ip
) == 1:
3253 "kdur.{}.ip-address".format(kdu_index
)
3256 # Check if must update also mgmt ip at the vnf
3257 service_external_cp
= mgmt_service
.get(
3258 "external-connection-point-ref"
3260 if service_external_cp
:
3262 deep_get(vnfd
, ("mgmt-interface", "cp"))
3263 == service_external_cp
3265 vnfr_update_dict
["ip-address"] = ip
3270 "external-connection-point-ref", ""
3272 == service_external_cp
,
3275 "kdur.{}.ip-address".format(kdu_index
)
3280 "Mgmt service name: {} not found".format(
3281 mgmt_service
["name"]
3285 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3286 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3288 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3291 and kdu_config
.get("initial-config-primitive")
3292 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3294 initial_config_primitive_list
= kdu_config
.get(
3295 "initial-config-primitive"
3297 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3299 for initial_config_primitive
in initial_config_primitive_list
:
3300 primitive_params_
= self
._map
_primitive
_params
(
3301 initial_config_primitive
, {}, {}
3304 await asyncio
.wait_for(
3305 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3306 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3307 kdu_instance
=kdu_instance
,
3308 primitive_name
=initial_config_primitive
["name"],
3309 params
=primitive_params_
,
3310 db_dict
=db_dict_install
,
3316 except Exception as e
:
3317 # Prepare update db with error and raise exception
3320 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3324 vnfr_data
.get("_id"),
3325 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3328 # ignore to keep original exception
3330 # reraise original error
3335 async def deploy_kdus(
3342 task_instantiation_info
,
3344 # Launch kdus if present in the descriptor
3346 k8scluster_id_2_uuic
= {
3347 "helm-chart-v3": {},
3352 async def _get_cluster_id(cluster_id
, cluster_type
):
3353 nonlocal k8scluster_id_2_uuic
3354 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3355 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3357 # check if K8scluster is creating and wait look if previous tasks in process
3358 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3359 "k8scluster", cluster_id
3362 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3363 task_name
, cluster_id
3365 self
.logger
.debug(logging_text
+ text
)
3366 await asyncio
.wait(task_dependency
, timeout
=3600)
3368 db_k8scluster
= self
.db
.get_one(
3369 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3371 if not db_k8scluster
:
3372 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3374 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3376 if cluster_type
== "helm-chart-v3":
3378 # backward compatibility for existing clusters that have not been initialized for helm v3
3379 k8s_credentials
= yaml
.safe_dump(
3380 db_k8scluster
.get("credentials")
3382 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3383 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3385 db_k8scluster_update
= {}
3386 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3387 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3388 db_k8scluster_update
[
3389 "_admin.helm-chart-v3.created"
3391 db_k8scluster_update
[
3392 "_admin.helm-chart-v3.operationalState"
3395 "k8sclusters", cluster_id
, db_k8scluster_update
3397 except Exception as e
:
3400 + "error initializing helm-v3 cluster: {}".format(str(e
))
3403 "K8s cluster '{}' has not been initialized for '{}'".format(
3404 cluster_id
, cluster_type
3409 "K8s cluster '{}' has not been initialized for '{}'".format(
3410 cluster_id
, cluster_type
3413 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3416 logging_text
+= "Deploy kdus: "
3419 db_nsr_update
= {"_admin.deployed.K8s": []}
3420 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3423 updated_cluster_list
= []
3424 updated_v3_cluster_list
= []
3426 for vnfr_data
in db_vnfrs
.values():
3427 vca_id
= self
.get_vca_id(vnfr_data
, {})
3428 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3429 # Step 0: Prepare and set parameters
3430 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3431 vnfd_id
= vnfr_data
.get("vnfd-id")
3432 vnfd_with_id
= find_in_list(
3433 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3437 for kdud
in vnfd_with_id
["kdu"]
3438 if kdud
["name"] == kdur
["kdu-name"]
3440 namespace
= kdur
.get("k8s-namespace")
3441 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3442 if kdur
.get("helm-chart"):
3443 kdumodel
= kdur
["helm-chart"]
3444 # Default version: helm3, if helm-version is v2 assign v2
3445 k8sclustertype
= "helm-chart-v3"
3446 self
.logger
.debug("kdur: {}".format(kdur
))
3448 kdur
.get("helm-version")
3449 and kdur
.get("helm-version") == "v2"
3451 k8sclustertype
= "helm-chart"
3452 elif kdur
.get("juju-bundle"):
3453 kdumodel
= kdur
["juju-bundle"]
3454 k8sclustertype
= "juju-bundle"
3457 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3458 "juju-bundle. Maybe an old NBI version is running".format(
3459 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3462 # check if kdumodel is a file and exists
3464 vnfd_with_id
= find_in_list(
3465 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3467 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3468 if storage
: # may be not present if vnfd has not artifacts
3469 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3470 if storage
["pkg-dir"]:
3471 filename
= "{}/{}/{}s/{}".format(
3478 filename
= "{}/Scripts/{}s/{}".format(
3483 if self
.fs
.file_exists(
3484 filename
, mode
="file"
3485 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3486 kdumodel
= self
.fs
.path
+ filename
3487 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3489 except Exception: # it is not a file
3492 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3493 step
= "Synchronize repos for k8s cluster '{}'".format(
3496 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3500 k8sclustertype
== "helm-chart"
3501 and cluster_uuid
not in updated_cluster_list
3503 k8sclustertype
== "helm-chart-v3"
3504 and cluster_uuid
not in updated_v3_cluster_list
3506 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3507 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3508 cluster_uuid
=cluster_uuid
3511 if del_repo_list
or added_repo_dict
:
3512 if k8sclustertype
== "helm-chart":
3514 "_admin.helm_charts_added." + item
: None
3515 for item
in del_repo_list
3518 "_admin.helm_charts_added." + item
: name
3519 for item
, name
in added_repo_dict
.items()
3521 updated_cluster_list
.append(cluster_uuid
)
3522 elif k8sclustertype
== "helm-chart-v3":
3524 "_admin.helm_charts_v3_added." + item
: None
3525 for item
in del_repo_list
3528 "_admin.helm_charts_v3_added." + item
: name
3529 for item
, name
in added_repo_dict
.items()
3531 updated_v3_cluster_list
.append(cluster_uuid
)
3533 logging_text
+ "repos synchronized on k8s cluster "
3534 "'{}' to_delete: {}, to_add: {}".format(
3535 k8s_cluster_id
, del_repo_list
, added_repo_dict
3540 {"_id": k8s_cluster_id
},
3546 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3547 vnfr_data
["member-vnf-index-ref"],
3551 k8s_instance_info
= {
3552 "kdu-instance": None,
3553 "k8scluster-uuid": cluster_uuid
,
3554 "k8scluster-type": k8sclustertype
,
3555 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3556 "kdu-name": kdur
["kdu-name"],
3557 "kdu-model": kdumodel
,
3558 "namespace": namespace
,
3559 "kdu-deployment-name": kdu_deployment_name
,
3561 db_path
= "_admin.deployed.K8s.{}".format(index
)
3562 db_nsr_update
[db_path
] = k8s_instance_info
3563 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3564 vnfd_with_id
= find_in_list(
3565 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3567 task
= asyncio
.ensure_future(
3576 k8params
=desc_params
,
3581 self
.lcm_tasks
.register(
3585 "instantiate_KDU-{}".format(index
),
3588 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3594 except (LcmException
, asyncio
.CancelledError
):
3596 except Exception as e
:
3597 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3598 if isinstance(e
, (N2VCException
, DbException
)):
3599 self
.logger
.error(logging_text
+ msg
)
3601 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3602 raise LcmException(msg
)
3605 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3624 task_instantiation_info
,
3627 # launch instantiate_N2VC in a asyncio task and register task object
3628 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3629 # if not found, create one entry and update database
3630 # fill db_nsr._admin.deployed.VCA.<index>
3633 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3635 if "execution-environment-list" in descriptor_config
:
3636 ee_list
= descriptor_config
.get("execution-environment-list", [])
3637 elif "juju" in descriptor_config
:
3638 ee_list
= [descriptor_config
] # ns charms
3639 else: # other types as script are not supported
3642 for ee_item
in ee_list
:
3645 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3646 ee_item
.get("juju"), ee_item
.get("helm-chart")
3649 ee_descriptor_id
= ee_item
.get("id")
3650 if ee_item
.get("juju"):
3651 vca_name
= ee_item
["juju"].get("charm")
3654 if ee_item
["juju"].get("charm") is not None
3657 if ee_item
["juju"].get("cloud") == "k8s":
3658 vca_type
= "k8s_proxy_charm"
3659 elif ee_item
["juju"].get("proxy") is False:
3660 vca_type
= "native_charm"
3661 elif ee_item
.get("helm-chart"):
3662 vca_name
= ee_item
["helm-chart"]
3663 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3666 vca_type
= "helm-v3"
3669 logging_text
+ "skipping non juju neither charm configuration"
3674 for vca_index
, vca_deployed
in enumerate(
3675 db_nsr
["_admin"]["deployed"]["VCA"]
3677 if not vca_deployed
:
3680 vca_deployed
.get("member-vnf-index") == member_vnf_index
3681 and vca_deployed
.get("vdu_id") == vdu_id
3682 and vca_deployed
.get("kdu_name") == kdu_name
3683 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3684 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3688 # not found, create one.
3690 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3693 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3695 target
+= "/kdu/{}".format(kdu_name
)
3697 "target_element": target
,
3698 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3699 "member-vnf-index": member_vnf_index
,
3701 "kdu_name": kdu_name
,
3702 "vdu_count_index": vdu_index
,
3703 "operational-status": "init", # TODO revise
3704 "detailed-status": "", # TODO revise
3705 "step": "initial-deploy", # TODO revise
3707 "vdu_name": vdu_name
,
3709 "ee_descriptor_id": ee_descriptor_id
,
3713 # create VCA and configurationStatus in db
3715 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3716 "configurationStatus.{}".format(vca_index
): dict(),
3718 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3720 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3722 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3723 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3724 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3727 task_n2vc
= asyncio
.ensure_future(
3728 self
.instantiate_N2VC(
3729 logging_text
=logging_text
,
3730 vca_index
=vca_index
,
3736 vdu_index
=vdu_index
,
3737 deploy_params
=deploy_params
,
3738 config_descriptor
=descriptor_config
,
3739 base_folder
=base_folder
,
3740 nslcmop_id
=nslcmop_id
,
3744 ee_config_descriptor
=ee_item
,
3747 self
.lcm_tasks
.register(
3751 "instantiate_N2VC-{}".format(vca_index
),
3754 task_instantiation_info
[
3756 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3757 member_vnf_index
or "", vdu_id
or ""
3761 def _create_nslcmop(nsr_id
, operation
, params
):
3763 Creates a ns-lcm-opp content to be stored at database.
3764 :param nsr_id: internal id of the instance
3765 :param operation: instantiate, terminate, scale, action, ...
3766 :param params: user parameters for the operation
3767 :return: dictionary following SOL005 format
3769 # Raise exception if invalid arguments
3770 if not (nsr_id
and operation
and params
):
3772 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3779 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3780 "operationState": "PROCESSING",
3781 "statusEnteredTime": now
,
3782 "nsInstanceId": nsr_id
,
3783 "lcmOperationType": operation
,
3785 "isAutomaticInvocation": False,
3786 "operationParams": params
,
3787 "isCancelPending": False,
3789 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3790 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3795 def _format_additional_params(self
, params
):
3796 params
= params
or {}
3797 for key
, value
in params
.items():
3798 if str(value
).startswith("!!yaml "):
3799 params
[key
] = yaml
.safe_load(value
[7:])
3802 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3803 primitive
= seq
.get("name")
3804 primitive_params
= {}
3806 "member_vnf_index": vnf_index
,
3807 "primitive": primitive
,
3808 "primitive_params": primitive_params
,
3811 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decides whether an already registered sub-operation must be skipped or executed again.
    :param db_nslcmop: nslcmop content; the sub-operation is read from _admin.operations[op_index]
    :param op_index: index of the sub-operation inside _admin.operations
    :return: self.SUBOPERATION_STATUS_SKIP if the sub-operation is already "COMPLETED"; otherwise
        op_index, after marking the sub-operation as "PROCESSING"
    """
    op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
    if op.get("operationState") == "COMPLETED":
        # b. Skip sub-operation
        # _ns_execute_primitive() or RO.create_action() will NOT be executed
        return self.SUBOPERATION_STATUS_SKIP

    # c. retry executing sub-operation
    # The sub-operation exists, and operationState != 'COMPLETED'
    # Update operationState = 'PROCESSING' to indicate a retry.
    operationState = "PROCESSING"
    detailed_status = "In progress"
    self._update_suboperation_status(
        db_nslcmop, op_index, operationState, detailed_status
    )
    # Return the sub-operation index
    # _ns_execute_primitive() or RO.create_action() will be called from scale()
    # with arguments extracted from the sub-operation
    return op_index
3835 # Find a sub-operation where all keys in a matching dictionary must match
3836 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3837 def _find_suboperation(self
, db_nslcmop
, match
):
3838 if db_nslcmop
and match
:
3839 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3840 for i
, op
in enumerate(op_list
):
3841 if all(op
.get(k
) == match
[k
] for k
in match
):
3843 return self
.SUBOPERATION_STATUS_NOT_FOUND
3845 # Update status for a sub-operation given its index
3846 def _update_suboperation_status(
3847 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3849 # Update DB for HA tasks
3850 q_filter
= {"_id": db_nslcmop
["_id"]}
3852 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3853 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3856 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3859 # Add sub-operation, return the index of the added sub-operation
3860 # Optionally, set operationState, detailed-status, and operationType
3861 # Status and type are currently set for 'scale' sub-operations:
3862 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3863 # 'detailed-status' : status message
3864 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3865 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3866 def _add_suboperation(
3874 mapped_primitive_params
,
3875 operationState
=None,
3876 detailed_status
=None,
3879 RO_scaling_info
=None,
3882 return self
.SUBOPERATION_STATUS_NOT_FOUND
3883 # Get the "_admin.operations" list, if it exists
3884 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3885 op_list
= db_nslcmop_admin
.get("operations")
3886 # Create or append to the "_admin.operations" list
3888 "member_vnf_index": vnf_index
,
3890 "vdu_count_index": vdu_count_index
,
3891 "primitive": primitive
,
3892 "primitive_params": mapped_primitive_params
,
3895 new_op
["operationState"] = operationState
3897 new_op
["detailed-status"] = detailed_status
3899 new_op
["lcmOperationType"] = operationType
3901 new_op
["RO_nsr_id"] = RO_nsr_id
3903 new_op
["RO_scaling_info"] = RO_scaling_info
3905 # No existing operations, create key 'operations' with current operation as first list element
3906 db_nslcmop_admin
.update({"operations": [new_op
]})
3907 op_list
= db_nslcmop_admin
.get("operations")
3909 # Existing operations, append operation to list
3910 op_list
.append(new_op
)
3912 db_nslcmop_update
= {"_admin.operations": op_list
}
3913 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
3914 op_index
= len(op_list
) - 1
3917 # Helper methods for scale() sub-operations
3919 # pre-scale/post-scale:
3920 # Check for 3 different cases:
3921 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3922 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3923 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3924 def _check_or_add_scale_suboperation(
3928 vnf_config_primitive
,
3932 RO_scaling_info
=None,
3934 # Find this sub-operation
3935 if RO_nsr_id
and RO_scaling_info
:
3936 operationType
= "SCALE-RO"
3938 "member_vnf_index": vnf_index
,
3939 "RO_nsr_id": RO_nsr_id
,
3940 "RO_scaling_info": RO_scaling_info
,
3944 "member_vnf_index": vnf_index
,
3945 "primitive": vnf_config_primitive
,
3946 "primitive_params": primitive_params
,
3947 "lcmOperationType": operationType
,
3949 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
3950 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
3951 # a. New sub-operation
3952 # The sub-operation does not exist, add it.
3953 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3954 # The following parameters are set to None for all kind of scaling:
3956 vdu_count_index
= None
3958 if RO_nsr_id
and RO_scaling_info
:
3959 vnf_config_primitive
= None
3960 primitive_params
= None
3963 RO_scaling_info
= None
3964 # Initial status for sub-operation
3965 operationState
= "PROCESSING"
3966 detailed_status
= "In progress"
3967 # Add sub-operation for pre/post-scaling (zero or more operations)
3968 self
._add
_suboperation
(
3974 vnf_config_primitive
,
3982 return self
.SUBOPERATION_STATUS_NEW
3984 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3985 # or op_index (operationState != 'COMPLETED')
3986 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
3988 # Function to return execution_environment id
3990 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
3991 # TODO vdu_index_count
3992 for vca
in vca_deployed_list
:
3993 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
3996 async def destroy_N2VC(
4004 exec_primitives
=True,
4009 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4010 :param logging_text:
4012 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4013 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4014 :param vca_index: index in the database _admin.deployed.VCA
4015 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4016 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4017 not executed properly
4018 :param scaling_in: True destroys the application, False destroys the model
4019 :return: None or exception
4024 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4025 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4029 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4031 # execute terminate_primitives
4033 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4034 config_descriptor
.get("terminate-config-primitive"),
4035 vca_deployed
.get("ee_descriptor_id"),
4037 vdu_id
= vca_deployed
.get("vdu_id")
4038 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4039 vdu_name
= vca_deployed
.get("vdu_name")
4040 vnf_index
= vca_deployed
.get("member-vnf-index")
4041 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4042 for seq
in terminate_primitives
:
4043 # For each sequence in list, get primitive and call _ns_execute_primitive()
4044 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4045 vnf_index
, seq
.get("name")
4047 self
.logger
.debug(logging_text
+ step
)
4048 # Create the primitive for each sequence, i.e. "primitive": "touch"
4049 primitive
= seq
.get("name")
4050 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4055 self
._add
_suboperation
(
4062 mapped_primitive_params
,
4064 # Sub-operations: Call _ns_execute_primitive() instead of action()
4066 result
, result_detail
= await self
._ns
_execute
_primitive
(
4067 vca_deployed
["ee_id"],
4069 mapped_primitive_params
,
4073 except LcmException
:
4074 # this happens when VCA is not deployed. In this case it is not needed to terminate
4076 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4077 if result
not in result_ok
:
4079 "terminate_primitive {} for vnf_member_index={} fails with "
4080 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4082 # set that this VCA do not need terminated
4083 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4087 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4090 # Delete Prometheus Jobs if any
4091 # This uses NSR_ID, so it will destroy any jobs under this index
4092 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4095 await self
.vca_map
[vca_type
].delete_execution_environment(
4096 vca_deployed
["ee_id"],
4097 scaling_in
=scaling_in
,
4102 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4103 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4104 namespace
= "." + db_nsr
["_id"]
4106 await self
.n2vc
.delete_namespace(
4107 namespace
=namespace
,
4108 total_timeout
=self
.timeout_charm_delete
,
4111 except N2VCNotFound
: # already deleted. Skip
4113 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4115 async def _terminate_RO(
4116 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4119 Terminates a deployment from RO
4120 :param logging_text:
4121 :param nsr_deployed: db_nsr._admin.deployed
4124 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4125 this method will update only the index 2, but it will write on database the concatenated content of the list
4130 ro_nsr_id
= ro_delete_action
= None
4131 if nsr_deployed
and nsr_deployed
.get("RO"):
4132 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4133 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4136 stage
[2] = "Deleting ns from VIM."
4137 db_nsr_update
["detailed-status"] = " ".join(stage
)
4138 self
._write
_op
_status
(nslcmop_id
, stage
)
4139 self
.logger
.debug(logging_text
+ stage
[2])
4140 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4141 self
._write
_op
_status
(nslcmop_id
, stage
)
4142 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4143 ro_delete_action
= desc
["action_id"]
4145 "_admin.deployed.RO.nsr_delete_action_id"
4146 ] = ro_delete_action
4147 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4148 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4149 if ro_delete_action
:
4150 # wait until NS is deleted from VIM
4151 stage
[2] = "Waiting ns deleted from VIM."
4152 detailed_status_old
= None
4156 + " RO_id={} ro_delete_action={}".format(
4157 ro_nsr_id
, ro_delete_action
4160 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4161 self
._write
_op
_status
(nslcmop_id
, stage
)
4163 delete_timeout
= 20 * 60 # 20 minutes
4164 while delete_timeout
> 0:
4165 desc
= await self
.RO
.show(
4167 item_id_name
=ro_nsr_id
,
4168 extra_item
="action",
4169 extra_item_id
=ro_delete_action
,
4173 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4175 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4176 if ns_status
== "ERROR":
4177 raise ROclient
.ROClientException(ns_status_info
)
4178 elif ns_status
== "BUILD":
4179 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4180 elif ns_status
== "ACTIVE":
4181 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4182 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4187 ), "ROclient.check_action_status returns unknown {}".format(
4190 if stage
[2] != detailed_status_old
:
4191 detailed_status_old
= stage
[2]
4192 db_nsr_update
["detailed-status"] = " ".join(stage
)
4193 self
._write
_op
_status
(nslcmop_id
, stage
)
4194 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4195 await asyncio
.sleep(5, loop
=self
.loop
)
4197 else: # delete_timeout <= 0:
4198 raise ROclient
.ROClientException(
4199 "Timeout waiting ns deleted from VIM"
4202 except Exception as e
:
4203 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4205 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4207 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4208 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4209 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4211 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4214 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4216 failed_detail
.append("delete conflict: {}".format(e
))
4219 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4222 failed_detail
.append("delete error: {}".format(e
))
4224 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4228 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4229 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4231 stage
[2] = "Deleting nsd from RO."
4232 db_nsr_update
["detailed-status"] = " ".join(stage
)
4233 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4234 self
._write
_op
_status
(nslcmop_id
, stage
)
4235 await self
.RO
.delete("nsd", ro_nsd_id
)
4237 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4239 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4240 except Exception as e
:
4242 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4244 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4246 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4249 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4251 failed_detail
.append(
4252 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4254 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4256 failed_detail
.append(
4257 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4259 self
.logger
.error(logging_text
+ failed_detail
[-1])
4261 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4262 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4263 if not vnf_deployed
or not vnf_deployed
["id"]:
4266 ro_vnfd_id
= vnf_deployed
["id"]
4269 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4270 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4272 db_nsr_update
["detailed-status"] = " ".join(stage
)
4273 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4274 self
._write
_op
_status
(nslcmop_id
, stage
)
4275 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4277 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4279 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4280 except Exception as e
:
4282 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4285 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4289 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4292 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4294 failed_detail
.append(
4295 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4297 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4299 failed_detail
.append(
4300 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4302 self
.logger
.error(logging_text
+ failed_detail
[-1])
4305 stage
[2] = "Error deleting from VIM"
4307 stage
[2] = "Deleted from VIM"
4308 db_nsr_update
["detailed-status"] = " ".join(stage
)
4309 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4310 self
._write
_op
_status
(nslcmop_id
, stage
)
4313 raise LcmException("; ".join(failed_detail
))
4315 async def terminate(self
, nsr_id
, nslcmop_id
):
4316 # Try to lock HA task here
4317 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4318 if not task_is_locked_by_me
:
4321 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4322 self
.logger
.debug(logging_text
+ "Enter")
4323 timeout_ns_terminate
= self
.timeout_ns_terminate
4326 operation_params
= None
4328 error_list
= [] # annotates all failed error messages
4329 db_nslcmop_update
= {}
4330 autoremove
= False # autoremove after terminated
4331 tasks_dict_info
= {}
4334 "Stage 1/3: Preparing task.",
4335 "Waiting for previous operations to terminate.",
4338 # ^ contains [stage, step, VIM-status]
4340 # wait for any previous tasks in process
4341 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4343 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4344 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4345 operation_params
= db_nslcmop
.get("operationParams") or {}
4346 if operation_params
.get("timeout_ns_terminate"):
4347 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4348 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4349 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4351 db_nsr_update
["operational-status"] = "terminating"
4352 db_nsr_update
["config-status"] = "terminating"
4353 self
._write
_ns
_status
(
4355 ns_state
="TERMINATING",
4356 current_operation
="TERMINATING",
4357 current_operation_id
=nslcmop_id
,
4358 other_update
=db_nsr_update
,
4360 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4361 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4362 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4365 stage
[1] = "Getting vnf descriptors from db."
4366 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4368 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4370 db_vnfds_from_id
= {}
4371 db_vnfds_from_member_index
= {}
4373 for vnfr
in db_vnfrs_list
:
4374 vnfd_id
= vnfr
["vnfd-id"]
4375 if vnfd_id
not in db_vnfds_from_id
:
4376 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4377 db_vnfds_from_id
[vnfd_id
] = vnfd
4378 db_vnfds_from_member_index
[
4379 vnfr
["member-vnf-index-ref"]
4380 ] = db_vnfds_from_id
[vnfd_id
]
4382 # Destroy individual execution environments when there are terminating primitives.
4383 # Rest of EE will be deleted at once
4384 # TODO - check before calling _destroy_N2VC
4385 # if not operation_params.get("skip_terminate_primitives"):#
4386 # or not vca.get("needed_terminate"):
4387 stage
[0] = "Stage 2/3 execute terminating primitives."
4388 self
.logger
.debug(logging_text
+ stage
[0])
4389 stage
[1] = "Looking execution environment that needs terminate."
4390 self
.logger
.debug(logging_text
+ stage
[1])
4392 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4393 config_descriptor
= None
4394 vca_member_vnf_index
= vca
.get("member-vnf-index")
4395 vca_id
= self
.get_vca_id(
4396 db_vnfrs_dict
.get(vca_member_vnf_index
)
4397 if vca_member_vnf_index
4401 if not vca
or not vca
.get("ee_id"):
4403 if not vca
.get("member-vnf-index"):
4405 config_descriptor
= db_nsr
.get("ns-configuration")
4406 elif vca
.get("vdu_id"):
4407 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4408 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4409 elif vca
.get("kdu_name"):
4410 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4411 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4413 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4414 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4415 vca_type
= vca
.get("type")
4416 exec_terminate_primitives
= not operation_params
.get(
4417 "skip_terminate_primitives"
4418 ) and vca
.get("needed_terminate")
4419 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4420 # pending native charms
4422 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4424 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4425 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4426 task
= asyncio
.ensure_future(
4434 exec_terminate_primitives
,
4438 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4440 # wait for pending tasks of terminate primitives
4444 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4446 error_list
= await self
._wait
_for
_tasks
(
4449 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4453 tasks_dict_info
.clear()
4455 return # raise LcmException("; ".join(error_list))
4457 # remove All execution environments at once
4458 stage
[0] = "Stage 3/3 delete all."
4460 if nsr_deployed
.get("VCA"):
4461 stage
[1] = "Deleting all execution environments."
4462 self
.logger
.debug(logging_text
+ stage
[1])
4463 vca_id
= self
.get_vca_id({}, db_nsr
)
4464 task_delete_ee
= asyncio
.ensure_future(
4466 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4467 timeout
=self
.timeout_charm_delete
,
4470 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4471 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4473 # Delete from k8scluster
4474 stage
[1] = "Deleting KDUs."
4475 self
.logger
.debug(logging_text
+ stage
[1])
4476 # print(nsr_deployed)
4477 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4478 if not kdu
or not kdu
.get("kdu-instance"):
4480 kdu_instance
= kdu
.get("kdu-instance")
4481 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4482 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4483 vca_id
= self
.get_vca_id({}, db_nsr
)
4484 task_delete_kdu_instance
= asyncio
.ensure_future(
4485 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4486 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4487 kdu_instance
=kdu_instance
,
4494 + "Unknown k8s deployment type {}".format(
4495 kdu
.get("k8scluster-type")
4500 task_delete_kdu_instance
4501 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4504 stage
[1] = "Deleting ns from VIM."
4506 task_delete_ro
= asyncio
.ensure_future(
4507 self
._terminate
_ng
_ro
(
4508 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4512 task_delete_ro
= asyncio
.ensure_future(
4514 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4517 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4519 # rest of staff will be done at finally
4522 ROclient
.ROClientException
,
4527 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4529 except asyncio
.CancelledError
:
4531 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4533 exc
= "Operation was cancelled"
4534 except Exception as e
:
4535 exc
= traceback
.format_exc()
4536 self
.logger
.critical(
4537 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4542 error_list
.append(str(exc
))
4544 # wait for pending tasks
4546 stage
[1] = "Waiting for terminate pending tasks."
4547 self
.logger
.debug(logging_text
+ stage
[1])
4548 error_list
+= await self
._wait
_for
_tasks
(
4551 timeout_ns_terminate
,
4555 stage
[1] = stage
[2] = ""
4556 except asyncio
.CancelledError
:
4557 error_list
.append("Cancelled")
4558 # TODO cancell all tasks
4559 except Exception as exc
:
4560 error_list
.append(str(exc
))
4561 # update status at database
4563 error_detail
= "; ".join(error_list
)
4564 # self.logger.error(logging_text + error_detail)
4565 error_description_nslcmop
= "{} Detail: {}".format(
4566 stage
[0], error_detail
4568 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4569 nslcmop_id
, stage
[0]
4572 db_nsr_update
["operational-status"] = "failed"
4573 db_nsr_update
["detailed-status"] = (
4574 error_description_nsr
+ " Detail: " + error_detail
4576 db_nslcmop_update
["detailed-status"] = error_detail
4577 nslcmop_operation_state
= "FAILED"
4581 error_description_nsr
= error_description_nslcmop
= None
4582 ns_state
= "NOT_INSTANTIATED"
4583 db_nsr_update
["operational-status"] = "terminated"
4584 db_nsr_update
["detailed-status"] = "Done"
4585 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4586 db_nslcmop_update
["detailed-status"] = "Done"
4587 nslcmop_operation_state
= "COMPLETED"
4590 self
._write
_ns
_status
(
4593 current_operation
="IDLE",
4594 current_operation_id
=None,
4595 error_description
=error_description_nsr
,
4596 error_detail
=error_detail
,
4597 other_update
=db_nsr_update
,
4599 self
._write
_op
_status
(
4602 error_message
=error_description_nslcmop
,
4603 operation_state
=nslcmop_operation_state
,
4604 other_update
=db_nslcmop_update
,
4606 if ns_state
== "NOT_INSTANTIATED":
4610 {"nsr-id-ref": nsr_id
},
4611 {"_admin.nsState": "NOT_INSTANTIATED"},
4613 except DbException
as e
:
4616 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4620 if operation_params
:
4621 autoremove
= operation_params
.get("autoremove", False)
4622 if nslcmop_operation_state
:
4624 await self
.msg
.aiowrite(
4629 "nslcmop_id": nslcmop_id
,
4630 "operationState": nslcmop_operation_state
,
4631 "autoremove": autoremove
,
4635 except Exception as e
:
4637 logging_text
+ "kafka_write notification Exception {}".format(e
)
4640 self
.logger
.debug(logging_text
+ "Exit")
4641 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4643 async def _wait_for_tasks(
4644 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4647 error_detail_list
= []
4649 pending_tasks
= list(created_tasks_info
.keys())
4650 num_tasks
= len(pending_tasks
)
4652 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4653 self
._write
_op
_status
(nslcmop_id
, stage
)
4654 while pending_tasks
:
4656 _timeout
= timeout
+ time_start
- time()
4657 done
, pending_tasks
= await asyncio
.wait(
4658 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4660 num_done
+= len(done
)
4661 if not done
: # Timeout
4662 for task
in pending_tasks
:
4663 new_error
= created_tasks_info
[task
] + ": Timeout"
4664 error_detail_list
.append(new_error
)
4665 error_list
.append(new_error
)
4668 if task
.cancelled():
4671 exc
= task
.exception()
4673 if isinstance(exc
, asyncio
.TimeoutError
):
4675 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4676 error_list
.append(created_tasks_info
[task
])
4677 error_detail_list
.append(new_error
)
4684 ROclient
.ROClientException
,
4690 self
.logger
.error(logging_text
+ new_error
)
4692 exc_traceback
= "".join(
4693 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4697 + created_tasks_info
[task
]
4703 logging_text
+ created_tasks_info
[task
] + ": Done"
4705 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4707 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4708 if nsr_id
: # update also nsr
4713 "errorDescription": "Error at: " + ", ".join(error_list
),
4714 "errorDetail": ". ".join(error_detail_list
),
4717 self
._write
_op
_status
(nslcmop_id
, stage
)
4718 return error_detail_list
4721 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4723 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4724 The default-value is used. If it is between < > it look for a value at instantiation_params
4725 :param primitive_desc: portion of VNFD/NSD that describes primitive
4726 :param params: Params provided by user
4727 :param instantiation_params: Instantiation params provided by user
4728 :return: a dictionary with the calculated params
4730 calculated_params
= {}
4731 for parameter
in primitive_desc
.get("parameter", ()):
4732 param_name
= parameter
["name"]
4733 if param_name
in params
:
4734 calculated_params
[param_name
] = params
[param_name
]
4735 elif "default-value" in parameter
or "value" in parameter
:
4736 if "value" in parameter
:
4737 calculated_params
[param_name
] = parameter
["value"]
4739 calculated_params
[param_name
] = parameter
["default-value"]
4741 isinstance(calculated_params
[param_name
], str)
4742 and calculated_params
[param_name
].startswith("<")
4743 and calculated_params
[param_name
].endswith(">")
4745 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4746 calculated_params
[param_name
] = instantiation_params
[
4747 calculated_params
[param_name
][1:-1]
4751 "Parameter {} needed to execute primitive {} not provided".format(
4752 calculated_params
[param_name
], primitive_desc
["name"]
4757 "Parameter {} needed to execute primitive {} not provided".format(
4758 param_name
, primitive_desc
["name"]
4762 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4763 calculated_params
[param_name
] = yaml
.safe_dump(
4764 calculated_params
[param_name
], default_flow_style
=True, width
=256
4766 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4768 ].startswith("!!yaml "):
4769 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4770 if parameter
.get("data-type") == "INTEGER":
4772 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4773 except ValueError: # error converting string to int
4775 "Parameter {} of primitive {} must be integer".format(
4776 param_name
, primitive_desc
["name"]
4779 elif parameter
.get("data-type") == "BOOLEAN":
4780 calculated_params
[param_name
] = not (
4781 (str(calculated_params
[param_name
])).lower() == "false"
4784 # add always ns_config_info if primitive name is config
4785 if primitive_desc
["name"] == "config":
4786 if "ns_config_info" in instantiation_params
:
4787 calculated_params
["ns_config_info"] = instantiation_params
[
4790 return calculated_params
4792 def _look_for_deployed_vca(
4799 ee_descriptor_id
=None,
4801 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4802 for vca
in deployed_vca
:
4805 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4808 vdu_count_index
is not None
4809 and vdu_count_index
!= vca
["vdu_count_index"]
4812 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4814 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4818 # vca_deployed not found
4820 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4821 " is not deployed".format(
4830 ee_id
= vca
.get("ee_id")
4832 "type", "lxc_proxy_charm"
4833 ) # default value for backward compatibility - proxy charm
4836 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4837 "execution environment".format(
4838 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4841 return ee_id
, vca_type
4843 async def _ns_execute_primitive(
4849 retries_interval
=30,
4856 if primitive
== "config":
4857 primitive_params
= {"params": primitive_params
}
4859 vca_type
= vca_type
or "lxc_proxy_charm"
4863 output
= await asyncio
.wait_for(
4864 self
.vca_map
[vca_type
].exec_primitive(
4866 primitive_name
=primitive
,
4867 params_dict
=primitive_params
,
4868 progress_timeout
=self
.timeout_progress_primitive
,
4869 total_timeout
=self
.timeout_primitive
,
4874 timeout
=timeout
or self
.timeout_primitive
,
4878 except asyncio
.CancelledError
:
4880 except Exception as e
: # asyncio.TimeoutError
4881 if isinstance(e
, asyncio
.TimeoutError
):
4886 "Error executing action {} on {} -> {}".format(
4891 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
4893 return "FAILED", str(e
)
4895 return "COMPLETED", output
4897 except (LcmException
, asyncio
.CancelledError
):
4899 except Exception as e
:
4900 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
async def vca_status_refresh(self, nsr_id, nslcmop_id):
    """
    Updating the vca_status with latest juju information in nsrs record
    :param: nsr_id: Id of the nsr
    :param: nslcmop_id: Id of the nslcmop
    :return: None
    """

    self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
    try:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        vca_id = self.get_vca_id({}, db_nsr)
        deployed = db_nsr["_admin"]["deployed"]
        if deployed["K8s"]:
            # one refresh per deployed KDU
            for k8s in deployed["K8s"]:
                await self._on_update_k8s_db(
                    cluster_uuid=k8s["k8scluster-uuid"],
                    kdu_instance=k8s["kdu-instance"],
                    filter={"_id": nsr_id},
                    vca_id=vca_id,
                    cluster_type=k8s["k8scluster-type"],
                )
        else:
            # only the position of each VCA record is needed to build its path in the nsr
            for vca_index, _ in enumerate(deployed["VCA"]):
                path = "_admin.deployed.VCA.{}.".format(vca_index)
                await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, path, {})

        self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
    finally:
        # deregister the task also when the refresh fails, as the other ns operations do
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4936 async def action(self
, nsr_id
, nslcmop_id
):
4937 # Try to lock HA task here
4938 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4939 if not task_is_locked_by_me
:
4942 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
4943 self
.logger
.debug(logging_text
+ "Enter")
4944 # get all needed from database
4948 db_nslcmop_update
= {}
4949 nslcmop_operation_state
= None
4950 error_description_nslcmop
= None
4953 # wait for any previous tasks in process
4954 step
= "Waiting for previous operations to terminate"
4955 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4957 self
._write
_ns
_status
(
4960 current_operation
="RUNNING ACTION",
4961 current_operation_id
=nslcmop_id
,
4964 step
= "Getting information from database"
4965 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4966 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4967 if db_nslcmop
["operationParams"].get("primitive_params"):
4968 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
4969 db_nslcmop
["operationParams"]["primitive_params"]
4972 nsr_deployed
= db_nsr
["_admin"].get("deployed")
4973 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
4974 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
4975 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
4976 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
4977 primitive
= db_nslcmop
["operationParams"]["primitive"]
4978 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
4979 timeout_ns_action
= db_nslcmop
["operationParams"].get(
4980 "timeout_ns_action", self
.timeout_primitive
4984 step
= "Getting vnfr from database"
4985 db_vnfr
= self
.db
.get_one(
4986 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
4988 if db_vnfr
.get("kdur"):
4990 for kdur
in db_vnfr
["kdur"]:
4991 if kdur
.get("additionalParams"):
4992 kdur
["additionalParams"] = json
.loads(
4993 kdur
["additionalParams"]
4995 kdur_list
.append(kdur
)
4996 db_vnfr
["kdur"] = kdur_list
4997 step
= "Getting vnfd from database"
4998 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5000 # Sync filesystem before running a primitive
5001 self
.fs
.sync(db_vnfr
["vnfd-id"])
5003 step
= "Getting nsd from database"
5004 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5006 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5007 # for backward compatibility
5008 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5009 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5010 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5011 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5013 # look for primitive
5014 config_primitive_desc
= descriptor_configuration
= None
5016 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5018 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5020 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5022 descriptor_configuration
= db_nsd
.get("ns-configuration")
5024 if descriptor_configuration
and descriptor_configuration
.get(
5027 for config_primitive
in descriptor_configuration
["config-primitive"]:
5028 if config_primitive
["name"] == primitive
:
5029 config_primitive_desc
= config_primitive
5032 if not config_primitive_desc
:
5033 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5035 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5039 primitive_name
= primitive
5040 ee_descriptor_id
= None
5042 primitive_name
= config_primitive_desc
.get(
5043 "execution-environment-primitive", primitive
5045 ee_descriptor_id
= config_primitive_desc
.get(
5046 "execution-environment-ref"
5052 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5054 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5057 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5059 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5061 desc_params
= parse_yaml_strings(
5062 db_vnfr
.get("additionalParamsForVnf")
5065 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5066 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5067 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5069 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5070 actions
.add(primitive
["name"])
5071 for primitive
in kdu_configuration
.get("config-primitive", []):
5072 actions
.add(primitive
["name"])
5074 nsr_deployed
["K8s"],
5075 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5076 and kdu
["member-vnf-index"] == vnf_index
,
5080 if primitive_name
in actions
5081 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5085 # TODO check if ns is in a proper status
5087 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5089 # kdur and desc_params already set from before
5090 if primitive_params
:
5091 desc_params
.update(primitive_params
)
5092 # TODO Check if we will need something at vnf level
5093 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5095 kdu_name
== kdu
["kdu-name"]
5096 and kdu
["member-vnf-index"] == vnf_index
5101 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5104 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5105 msg
= "unknown k8scluster-type '{}'".format(
5106 kdu
.get("k8scluster-type")
5108 raise LcmException(msg
)
5111 "collection": "nsrs",
5112 "filter": {"_id": nsr_id
},
5113 "path": "_admin.deployed.K8s.{}".format(index
),
5117 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5119 step
= "Executing kdu {}".format(primitive_name
)
5120 if primitive_name
== "upgrade":
5121 if desc_params
.get("kdu_model"):
5122 kdu_model
= desc_params
.get("kdu_model")
5123 del desc_params
["kdu_model"]
5125 kdu_model
= kdu
.get("kdu-model")
5126 parts
= kdu_model
.split(sep
=":")
5128 kdu_model
= parts
[0]
5130 detailed_status
= await asyncio
.wait_for(
5131 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5132 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5133 kdu_instance
=kdu
.get("kdu-instance"),
5135 kdu_model
=kdu_model
,
5138 timeout
=timeout_ns_action
,
5140 timeout
=timeout_ns_action
+ 10,
5143 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5145 elif primitive_name
== "rollback":
5146 detailed_status
= await asyncio
.wait_for(
5147 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5148 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5149 kdu_instance
=kdu
.get("kdu-instance"),
5152 timeout
=timeout_ns_action
,
5154 elif primitive_name
== "status":
5155 detailed_status
= await asyncio
.wait_for(
5156 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5157 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5158 kdu_instance
=kdu
.get("kdu-instance"),
5161 timeout
=timeout_ns_action
,
5164 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5165 kdu
["kdu-name"], nsr_id
5167 params
= self
._map
_primitive
_params
(
5168 config_primitive_desc
, primitive_params
, desc_params
5171 detailed_status
= await asyncio
.wait_for(
5172 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5173 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5174 kdu_instance
=kdu_instance
,
5175 primitive_name
=primitive_name
,
5178 timeout
=timeout_ns_action
,
5181 timeout
=timeout_ns_action
,
5185 nslcmop_operation_state
= "COMPLETED"
5187 detailed_status
= ""
5188 nslcmop_operation_state
= "FAILED"
5190 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5191 nsr_deployed
["VCA"],
5192 member_vnf_index
=vnf_index
,
5194 vdu_count_index
=vdu_count_index
,
5195 ee_descriptor_id
=ee_descriptor_id
,
5197 for vca_index
, vca_deployed
in enumerate(
5198 db_nsr
["_admin"]["deployed"]["VCA"]
5200 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5202 "collection": "nsrs",
5203 "filter": {"_id": nsr_id
},
5204 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5208 nslcmop_operation_state
,
5210 ) = await self
._ns
_execute
_primitive
(
5212 primitive
=primitive_name
,
5213 primitive_params
=self
._map
_primitive
_params
(
5214 config_primitive_desc
, primitive_params
, desc_params
5216 timeout
=timeout_ns_action
,
5222 db_nslcmop_update
["detailed-status"] = detailed_status
5223 error_description_nslcmop
= (
5224 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5228 + " task Done with result {} {}".format(
5229 nslcmop_operation_state
, detailed_status
5232 return # database update is called inside finally
5234 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5235 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5237 except asyncio
.CancelledError
:
5239 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5241 exc
= "Operation was cancelled"
5242 except asyncio
.TimeoutError
:
5243 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5245 except Exception as e
:
5246 exc
= traceback
.format_exc()
5247 self
.logger
.critical(
5248 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5257 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5258 nslcmop_operation_state
= "FAILED"
5260 self
._write
_ns
_status
(
5264 ], # TODO check if degraded. For the moment use previous status
5265 current_operation
="IDLE",
5266 current_operation_id
=None,
5267 # error_description=error_description_nsr,
5268 # error_detail=error_detail,
5269 other_update
=db_nsr_update
,
5272 self
._write
_op
_status
(
5275 error_message
=error_description_nslcmop
,
5276 operation_state
=nslcmop_operation_state
,
5277 other_update
=db_nslcmop_update
,
5280 if nslcmop_operation_state
:
5282 await self
.msg
.aiowrite(
5287 "nslcmop_id": nslcmop_id
,
5288 "operationState": nslcmop_operation_state
,
5292 except Exception as e
:
5294 logging_text
+ "kafka_write notification Exception {}".format(e
)
5296 self
.logger
.debug(logging_text
+ "Exit")
5297 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5298 return nslcmop_operation_state
, detailed_status
5300 async def _ns_charm_upgrade(
5306 timeout
: float = None,
5308 """This method upgrade charms in VNF instances
5311 ee_id: Execution environment id
5312 path: Local path to the charm
5314 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5315 timeout: (Float) Timeout for the ns update operation
5318 result: (str, str) COMPLETED/FAILED, details
5321 charm_type
= charm_type
or "lxc_proxy_charm"
5322 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5326 charm_type
=charm_type
,
5327 timeout
=timeout
or self
.timeout_ns_update
,
5331 return "COMPLETED", output
5333 except (LcmException
, asyncio
.CancelledError
):
5336 except Exception as e
:
5338 self
.logger
.debug("Error upgrading charm {}".format(path
))
5340 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5342 async def update(self
, nsr_id
, nslcmop_id
):
5343 """Update NS according to different update types
5345 This method performs upgrade of VNF instances then updates the revision
5346 number in VNF record
5349 nsr_id: Network service will be updated
5350 nslcmop_id: ns lcm operation id
5353 It may raise DbException, LcmException, N2VCException, K8sException
5356 # Try to lock HA task here
5357 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5358 if not task_is_locked_by_me
:
5361 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5362 self
.logger
.debug(logging_text
+ "Enter")
5364 # Set the required variables to be filled up later
5366 db_nslcmop_update
= {}
5368 nslcmop_operation_state
= None
5370 error_description_nslcmop
= ""
5373 detailed_status
= ""
5376 # wait for any previous tasks in process
5377 step
= "Waiting for previous operations to terminate"
5378 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5379 self
._write
_ns
_status
(
5382 current_operation
="UPDATING",
5383 current_operation_id
=nslcmop_id
,
5386 step
= "Getting nslcmop from database"
5387 db_nslcmop
= self
.db
.get_one(
5388 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5390 update_type
= db_nslcmop
["operationParams"]["updateType"]
5392 step
= "Getting nsr from database"
5393 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5394 old_operational_status
= db_nsr
["operational-status"]
5395 db_nsr_update
["operational-status"] = "updating"
5396 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5397 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5399 if update_type
== "CHANGE_VNFPKG":
5401 # Get the input parameters given through update request
5402 vnf_instance_id
= db_nslcmop
["operationParams"][
5403 "changeVnfPackageData"
5404 ].get("vnfInstanceId")
5406 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5409 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5411 step
= "Getting vnfr from database"
5412 db_vnfr
= self
.db
.get_one(
5413 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5416 step
= "Getting vnfds from database"
5418 latest_vnfd
= self
.db
.get_one(
5419 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5421 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5424 current_vnf_revision
= db_vnfr
.get("revision", 1)
5425 current_vnfd
= self
.db
.get_one(
5427 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5428 fail_on_empty
=False,
5430 # Charm artifact paths will be filled up later
5432 current_charm_artifact_path
,
5433 target_charm_artifact_path
,
5434 charm_artifact_paths
,
5437 step
= "Checking if revision has changed in VNFD"
5438 if current_vnf_revision
!= latest_vnfd_revision
:
5440 # There is new revision of VNFD, update operation is required
5441 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5442 latest_vnfd_path
= vnfd_id
5444 step
= "Removing the VNFD packages if they exist in the local path"
5445 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5446 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5448 step
= "Get the VNFD packages from FSMongo"
5449 self
.fs
.sync(from_path
=latest_vnfd_path
)
5450 self
.fs
.sync(from_path
=current_vnfd_path
)
5453 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5455 base_folder
= latest_vnfd
["_admin"]["storage"]
5457 for charm_index
, charm_deployed
in enumerate(
5458 get_iterable(nsr_deployed
, "VCA")
5460 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5462 # Getting charm-id and charm-type
5463 if charm_deployed
.get("member-vnf-index") == vnf_index
:
5464 charm_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5465 charm_type
= charm_deployed
.get("type")
5468 ee_id
= charm_deployed
.get("ee_id")
5470 step
= "Getting descriptor config"
5471 descriptor_config
= get_configuration(
5472 current_vnfd
, current_vnfd
["id"]
5475 if "execution-environment-list" in descriptor_config
:
5476 ee_list
= descriptor_config
.get(
5477 "execution-environment-list", []
5482 # There could be several charm used in the same VNF
5483 for ee_item
in ee_list
:
5484 if ee_item
.get("juju"):
5486 step
= "Getting charm name"
5487 charm_name
= ee_item
["juju"].get("charm")
5489 step
= "Setting Charm artifact paths"
5490 current_charm_artifact_path
.append(
5491 get_charm_artifact_path(
5495 current_vnf_revision
,
5498 target_charm_artifact_path
.append(
5499 get_charm_artifact_path(
5506 charm_artifact_paths
= zip(
5507 current_charm_artifact_path
, target_charm_artifact_path
5510 step
= "Checking if software version has changed in VNFD"
5511 if find_software_version(current_vnfd
) != find_software_version(
5515 step
= "Checking if existing VNF has charm"
5516 for current_charm_path
, target_charm_path
in list(
5517 charm_artifact_paths
5519 if current_charm_path
:
5521 "Software version change is not supported as VNF instance {} has charm.".format(
5526 # There is no change in the charm package, then redeploy the VNF
5527 # based on new descriptor
5528 step
= "Redeploying VNF"
5529 # This part is in https://osm.etsi.org/gerrit/11943
5532 step
= "Checking if any charm package has changed or not"
5533 for current_charm_path
, target_charm_path
in list(
5534 charm_artifact_paths
5538 and target_charm_path
5539 and self
.check_charm_hash_changed(
5540 current_charm_path
, target_charm_path
5544 step
= "Checking whether VNF uses juju bundle"
5545 if check_juju_bundle_existence(current_vnfd
):
5548 "Charm upgrade is not supported for the instance which"
5549 " uses juju-bundle: {}".format(
5550 check_juju_bundle_existence(current_vnfd
)
5554 step
= "Upgrading Charm"
5558 ) = await self
._ns
_charm
_upgrade
(
5561 charm_type
=charm_type
,
5562 path
=self
.fs
.path
+ target_charm_path
,
5563 timeout
=timeout_seconds
,
5566 if result
== "FAILED":
5567 nslcmop_operation_state
= result
5568 error_description_nslcmop
= detailed_status
5570 db_nslcmop_update
["detailed-status"] = detailed_status
5573 + " step {} Done with result {} {}".format(
5574 step
, nslcmop_operation_state
, detailed_status
5578 step
= "Updating policies"
5579 # This part is in https://osm.etsi.org/gerrit/11943
5581 # If nslcmop_operation_state is None, so any operation is not failed.
5582 if not nslcmop_operation_state
:
5583 nslcmop_operation_state
= "COMPLETED"
5585 # If update CHANGE_VNFPKG nslcmop_operation is successful
5586 # vnf revision need to be updated
5587 vnfr_update
["revision"] = latest_vnfd_revision
5588 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
5592 + " task Done with result {} {}".format(
5593 nslcmop_operation_state
, detailed_status
5596 elif update_type
== "REMOVE_VNF":
5597 # This part is included in https://osm.etsi.org/gerrit/11876
5600 # If nslcmop_operation_state is None, so any operation is not failed.
5601 # All operations are executed in overall.
5602 if not nslcmop_operation_state
:
5603 nslcmop_operation_state
= "COMPLETED"
5604 db_nsr_update
["operational-status"] = old_operational_status
5606 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5607 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5609 except asyncio
.CancelledError
:
5611 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5613 exc
= "Operation was cancelled"
5614 except asyncio
.TimeoutError
:
5615 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5617 except Exception as e
:
5618 exc
= traceback
.format_exc()
5619 self
.logger
.critical(
5620 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5629 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5630 nslcmop_operation_state
= "FAILED"
5631 db_nsr_update
["operational-status"] = old_operational_status
5633 self
._write
_ns
_status
(
5635 ns_state
=db_nsr
["nsState"],
5636 current_operation
="IDLE",
5637 current_operation_id
=None,
5638 other_update
=db_nsr_update
,
5641 self
._write
_op
_status
(
5644 error_message
=error_description_nslcmop
,
5645 operation_state
=nslcmop_operation_state
,
5646 other_update
=db_nslcmop_update
,
5649 if nslcmop_operation_state
:
5651 await self
.msg
.aiowrite(
5656 "nslcmop_id": nslcmop_id
,
5657 "operationState": nslcmop_operation_state
,
5661 except Exception as e
:
5663 logging_text
+ "kafka_write notification Exception {}".format(e
)
5665 self
.logger
.debug(logging_text
+ "Exit")
5666 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
5667 return nslcmop_operation_state
, detailed_status
5669 async def scale(self
, nsr_id
, nslcmop_id
):
5670 # Try to lock HA task here
5671 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5672 if not task_is_locked_by_me
:
5675 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
5676 stage
= ["", "", ""]
5677 tasks_dict_info
= {}
5678 # ^ stage, step, VIM progress
5679 self
.logger
.debug(logging_text
+ "Enter")
5680 # get all needed from database
5682 db_nslcmop_update
= {}
5685 # in case of error, indicates what part of scale was failed to put nsr at error status
5686 scale_process
= None
5687 old_operational_status
= ""
5688 old_config_status
= ""
5691 # wait for any previous tasks in process
5692 step
= "Waiting for previous operations to terminate"
5693 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5694 self
._write
_ns
_status
(
5697 current_operation
="SCALING",
5698 current_operation_id
=nslcmop_id
,
5701 step
= "Getting nslcmop from database"
5703 step
+ " after having waited for previous tasks to be completed"
5705 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5707 step
= "Getting nsr from database"
5708 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5709 old_operational_status
= db_nsr
["operational-status"]
5710 old_config_status
= db_nsr
["config-status"]
5712 step
= "Parsing scaling parameters"
5713 db_nsr_update
["operational-status"] = "scaling"
5714 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5715 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5717 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
5719 ]["member-vnf-index"]
5720 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
5722 ]["scaling-group-descriptor"]
5723 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
5724 # for backward compatibility
5725 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5726 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5727 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5728 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5730 step
= "Getting vnfr from database"
5731 db_vnfr
= self
.db
.get_one(
5732 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5735 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5737 step
= "Getting vnfd from database"
5738 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5740 base_folder
= db_vnfd
["_admin"]["storage"]
5742 step
= "Getting scaling-group-descriptor"
5743 scaling_descriptor
= find_in_list(
5744 get_scaling_aspect(db_vnfd
),
5745 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
5747 if not scaling_descriptor
:
5749 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5750 "at vnfd:scaling-group-descriptor".format(scaling_group
)
5753 step
= "Sending scale order to VIM"
5754 # TODO check if ns is in a proper status
5756 if not db_nsr
["_admin"].get("scaling-group"):
5761 "_admin.scaling-group": [
5762 {"name": scaling_group
, "nb-scale-op": 0}
5766 admin_scale_index
= 0
5768 for admin_scale_index
, admin_scale_info
in enumerate(
5769 db_nsr
["_admin"]["scaling-group"]
5771 if admin_scale_info
["name"] == scaling_group
:
5772 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
5774 else: # not found, set index one plus last element and add new entry with the name
5775 admin_scale_index
+= 1
5777 "_admin.scaling-group.{}.name".format(admin_scale_index
)
5780 vca_scaling_info
= []
5781 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
5782 if scaling_type
== "SCALE_OUT":
5783 if "aspect-delta-details" not in scaling_descriptor
:
5785 "Aspect delta details not fount in scaling descriptor {}".format(
5786 scaling_descriptor
["name"]
5789 # count if max-instance-count is reached
5790 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5792 scaling_info
["scaling_direction"] = "OUT"
5793 scaling_info
["vdu-create"] = {}
5794 scaling_info
["kdu-create"] = {}
5795 for delta
in deltas
:
5796 for vdu_delta
in delta
.get("vdu-delta", {}):
5797 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
5798 # vdu_index also provides the number of instance of the targeted vdu
5799 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5800 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5804 additional_params
= (
5805 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
5808 cloud_init_list
= []
5810 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5811 max_instance_count
= 10
5812 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
5813 max_instance_count
= vdu_profile
.get(
5814 "max-number-of-instances", 10
5817 default_instance_num
= get_number_of_instances(
5820 instances_number
= vdu_delta
.get("number-of-instances", 1)
5821 nb_scale_op
+= instances_number
5823 new_instance_count
= nb_scale_op
+ default_instance_num
5824 # Control if new count is over max and vdu count is less than max.
5825 # Then assign new instance count
5826 if new_instance_count
> max_instance_count
> vdu_count
:
5827 instances_number
= new_instance_count
- max_instance_count
5829 instances_number
= instances_number
5831 if new_instance_count
> max_instance_count
:
5833 "reached the limit of {} (max-instance-count) "
5834 "scaling-out operations for the "
5835 "scaling-group-descriptor '{}'".format(
5836 nb_scale_op
, scaling_group
5839 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5841 # TODO Information of its own ip is not available because db_vnfr is not updated.
5842 additional_params
["OSM"] = get_osm_params(
5843 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
5845 cloud_init_list
.append(
5846 self
._parse
_cloud
_init
(
5853 vca_scaling_info
.append(
5855 "osm_vdu_id": vdu_delta
["id"],
5856 "member-vnf-index": vnf_index
,
5858 "vdu_index": vdu_index
+ x
,
5861 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
5862 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5863 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
5864 kdu_name
= kdu_profile
["kdu-name"]
5865 resource_name
= kdu_profile
.get("resource-name", "")
5867 # Might have different kdus in the same delta
5868 # Should have list for each kdu
5869 if not scaling_info
["kdu-create"].get(kdu_name
, None):
5870 scaling_info
["kdu-create"][kdu_name
] = []
5872 kdur
= get_kdur(db_vnfr
, kdu_name
)
5873 if kdur
.get("helm-chart"):
5874 k8s_cluster_type
= "helm-chart-v3"
5875 self
.logger
.debug("kdur: {}".format(kdur
))
5877 kdur
.get("helm-version")
5878 and kdur
.get("helm-version") == "v2"
5880 k8s_cluster_type
= "helm-chart"
5881 elif kdur
.get("juju-bundle"):
5882 k8s_cluster_type
= "juju-bundle"
5885 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5886 "juju-bundle. Maybe an old NBI version is running".format(
5887 db_vnfr
["member-vnf-index-ref"], kdu_name
5891 max_instance_count
= 10
5892 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
5893 max_instance_count
= kdu_profile
.get(
5894 "max-number-of-instances", 10
5897 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
5898 deployed_kdu
, _
= get_deployed_kdu(
5899 nsr_deployed
, kdu_name
, vnf_index
5901 if deployed_kdu
is None:
5903 "KDU '{}' for vnf '{}' not deployed".format(
5907 kdu_instance
= deployed_kdu
.get("kdu-instance")
5908 instance_num
= await self
.k8scluster_map
[
5914 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
5915 kdu_model
=deployed_kdu
.get("kdu-model"),
5917 kdu_replica_count
= instance_num
+ kdu_delta
.get(
5918 "number-of-instances", 1
5921 # Control if new count is over max and instance_num is less than max.
5922 # Then assign max instance number to kdu replica count
5923 if kdu_replica_count
> max_instance_count
> instance_num
:
5924 kdu_replica_count
= max_instance_count
5925 if kdu_replica_count
> max_instance_count
:
5927 "reached the limit of {} (max-instance-count) "
5928 "scaling-out operations for the "
5929 "scaling-group-descriptor '{}'".format(
5930 instance_num
, scaling_group
5934 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5935 vca_scaling_info
.append(
5937 "osm_kdu_id": kdu_name
,
5938 "member-vnf-index": vnf_index
,
5940 "kdu_index": instance_num
+ x
- 1,
5943 scaling_info
["kdu-create"][kdu_name
].append(
5945 "member-vnf-index": vnf_index
,
5947 "k8s-cluster-type": k8s_cluster_type
,
5948 "resource-name": resource_name
,
5949 "scale": kdu_replica_count
,
5952 elif scaling_type
== "SCALE_IN":
5953 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5955 scaling_info
["scaling_direction"] = "IN"
5956 scaling_info
["vdu-delete"] = {}
5957 scaling_info
["kdu-delete"] = {}
5959 for delta
in deltas
:
5960 for vdu_delta
in delta
.get("vdu-delta", {}):
5961 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5962 min_instance_count
= 0
5963 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5964 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
5965 min_instance_count
= vdu_profile
["min-number-of-instances"]
5967 default_instance_num
= get_number_of_instances(
5968 db_vnfd
, vdu_delta
["id"]
5970 instance_num
= vdu_delta
.get("number-of-instances", 1)
5971 nb_scale_op
-= instance_num
5973 new_instance_count
= nb_scale_op
+ default_instance_num
5975 if new_instance_count
< min_instance_count
< vdu_count
:
5976 instances_number
= min_instance_count
- new_instance_count
5978 instances_number
= instance_num
5980 if new_instance_count
< min_instance_count
:
5982 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5983 "scaling-group-descriptor '{}'".format(
5984 nb_scale_op
, scaling_group
5987 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5988 vca_scaling_info
.append(
5990 "osm_vdu_id": vdu_delta
["id"],
5991 "member-vnf-index": vnf_index
,
5993 "vdu_index": vdu_index
- 1 - x
,
5996 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
5997 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5998 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
5999 kdu_name
= kdu_profile
["kdu-name"]
6000 resource_name
= kdu_profile
.get("resource-name", "")
6002 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6003 scaling_info
["kdu-delete"][kdu_name
] = []
6005 kdur
= get_kdur(db_vnfr
, kdu_name
)
6006 if kdur
.get("helm-chart"):
6007 k8s_cluster_type
= "helm-chart-v3"
6008 self
.logger
.debug("kdur: {}".format(kdur
))
6010 kdur
.get("helm-version")
6011 and kdur
.get("helm-version") == "v2"
6013 k8s_cluster_type
= "helm-chart"
6014 elif kdur
.get("juju-bundle"):
6015 k8s_cluster_type
= "juju-bundle"
6018 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6019 "juju-bundle. Maybe an old NBI version is running".format(
6020 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6024 min_instance_count
= 0
6025 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6026 min_instance_count
= kdu_profile
["min-number-of-instances"]
6028 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6029 deployed_kdu
, _
= get_deployed_kdu(
6030 nsr_deployed
, kdu_name
, vnf_index
6032 if deployed_kdu
is None:
6034 "KDU '{}' for vnf '{}' not deployed".format(
6038 kdu_instance
= deployed_kdu
.get("kdu-instance")
6039 instance_num
= await self
.k8scluster_map
[
6045 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6046 kdu_model
=deployed_kdu
.get("kdu-model"),
6048 kdu_replica_count
= instance_num
- kdu_delta
.get(
6049 "number-of-instances", 1
6052 if kdu_replica_count
< min_instance_count
< instance_num
:
6053 kdu_replica_count
= min_instance_count
6054 if kdu_replica_count
< min_instance_count
:
6056 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6057 "scaling-group-descriptor '{}'".format(
6058 instance_num
, scaling_group
6062 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6063 vca_scaling_info
.append(
6065 "osm_kdu_id": kdu_name
,
6066 "member-vnf-index": vnf_index
,
6068 "kdu_index": instance_num
- x
- 1,
6071 scaling_info
["kdu-delete"][kdu_name
].append(
6073 "member-vnf-index": vnf_index
,
6075 "k8s-cluster-type": k8s_cluster_type
,
6076 "resource-name": resource_name
,
6077 "scale": kdu_replica_count
,
6081 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6082 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6083 if scaling_info
["scaling_direction"] == "IN":
6084 for vdur
in reversed(db_vnfr
["vdur"]):
6085 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6086 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6087 scaling_info
["vdu"].append(
6089 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6090 "vdu_id": vdur
["vdu-id-ref"],
6094 for interface
in vdur
["interfaces"]:
6095 scaling_info
["vdu"][-1]["interface"].append(
6097 "name": interface
["name"],
6098 "ip_address": interface
["ip-address"],
6099 "mac_address": interface
.get("mac-address"),
6102 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6105 step
= "Executing pre-scale vnf-config-primitive"
6106 if scaling_descriptor
.get("scaling-config-action"):
6107 for scaling_config_action
in scaling_descriptor
[
6108 "scaling-config-action"
6111 scaling_config_action
.get("trigger") == "pre-scale-in"
6112 and scaling_type
== "SCALE_IN"
6114 scaling_config_action
.get("trigger") == "pre-scale-out"
6115 and scaling_type
== "SCALE_OUT"
6117 vnf_config_primitive
= scaling_config_action
[
6118 "vnf-config-primitive-name-ref"
6120 step
= db_nslcmop_update
[
6122 ] = "executing pre-scale scaling-config-action '{}'".format(
6123 vnf_config_primitive
6126 # look for primitive
6127 for config_primitive
in (
6128 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6129 ).get("config-primitive", ()):
6130 if config_primitive
["name"] == vnf_config_primitive
:
6134 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6135 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6136 "primitive".format(scaling_group
, vnf_config_primitive
)
6139 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6140 if db_vnfr
.get("additionalParamsForVnf"):
6141 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6143 scale_process
= "VCA"
6144 db_nsr_update
["config-status"] = "configuring pre-scaling"
6145 primitive_params
= self
._map
_primitive
_params
(
6146 config_primitive
, {}, vnfr_params
6149 # Pre-scale retry check: Check if this sub-operation has been executed before
6150 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6153 vnf_config_primitive
,
6157 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6158 # Skip sub-operation
6159 result
= "COMPLETED"
6160 result_detail
= "Done"
6163 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6164 vnf_config_primitive
, result
, result_detail
6168 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6169 # New sub-operation: Get index of this sub-operation
6171 len(db_nslcmop
.get("_admin", {}).get("operations"))
6176 + "vnf_config_primitive={} New sub-operation".format(
6177 vnf_config_primitive
6181 # retry: Get registered params for this existing sub-operation
6182 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6185 vnf_index
= op
.get("member_vnf_index")
6186 vnf_config_primitive
= op
.get("primitive")
6187 primitive_params
= op
.get("primitive_params")
6190 + "vnf_config_primitive={} Sub-operation retry".format(
6191 vnf_config_primitive
6194 # Execute the primitive, either with new (first-time) or registered (reintent) args
6195 ee_descriptor_id
= config_primitive
.get(
6196 "execution-environment-ref"
6198 primitive_name
= config_primitive
.get(
6199 "execution-environment-primitive", vnf_config_primitive
6201 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6202 nsr_deployed
["VCA"],
6203 member_vnf_index
=vnf_index
,
6205 vdu_count_index
=None,
6206 ee_descriptor_id
=ee_descriptor_id
,
6208 result
, result_detail
= await self
._ns
_execute
_primitive
(
6217 + "vnf_config_primitive={} Done with result {} {}".format(
6218 vnf_config_primitive
, result
, result_detail
6221 # Update operationState = COMPLETED | FAILED
6222 self
._update
_suboperation
_status
(
6223 db_nslcmop
, op_index
, result
, result_detail
6226 if result
== "FAILED":
6227 raise LcmException(result_detail
)
6228 db_nsr_update
["config-status"] = old_config_status
6229 scale_process
= None
6233 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6236 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6239 # SCALE-IN VCA - BEGIN
6240 if vca_scaling_info
:
6241 step
= db_nslcmop_update
[
6243 ] = "Deleting the execution environments"
6244 scale_process
= "VCA"
6245 for vca_info
in vca_scaling_info
:
6246 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6247 member_vnf_index
= str(vca_info
["member-vnf-index"])
6249 logging_text
+ "vdu info: {}".format(vca_info
)
6251 if vca_info
.get("osm_vdu_id"):
6252 vdu_id
= vca_info
["osm_vdu_id"]
6253 vdu_index
= int(vca_info
["vdu_index"])
6256 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6257 member_vnf_index
, vdu_id
, vdu_index
6259 stage
[2] = step
= "Scaling in VCA"
6260 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6261 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6262 config_update
= db_nsr
["configurationStatus"]
6263 for vca_index
, vca
in enumerate(vca_update
):
6265 (vca
or vca
.get("ee_id"))
6266 and vca
["member-vnf-index"] == member_vnf_index
6267 and vca
["vdu_count_index"] == vdu_index
6269 if vca
.get("vdu_id"):
6270 config_descriptor
= get_configuration(
6271 db_vnfd
, vca
.get("vdu_id")
6273 elif vca
.get("kdu_name"):
6274 config_descriptor
= get_configuration(
6275 db_vnfd
, vca
.get("kdu_name")
6278 config_descriptor
= get_configuration(
6279 db_vnfd
, db_vnfd
["id"]
6281 operation_params
= (
6282 db_nslcmop
.get("operationParams") or {}
6284 exec_terminate_primitives
= not operation_params
.get(
6285 "skip_terminate_primitives"
6286 ) and vca
.get("needed_terminate")
6287 task
= asyncio
.ensure_future(
6296 exec_primitives
=exec_terminate_primitives
,
6300 timeout
=self
.timeout_charm_delete
,
6303 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6306 del vca_update
[vca_index
]
6307 del config_update
[vca_index
]
6308 # wait for pending tasks of terminate primitives
6312 + "Waiting for tasks {}".format(
6313 list(tasks_dict_info
.keys())
6316 error_list
= await self
._wait
_for
_tasks
(
6320 self
.timeout_charm_delete
, self
.timeout_ns_terminate
6325 tasks_dict_info
.clear()
6327 raise LcmException("; ".join(error_list
))
6329 db_vca_and_config_update
= {
6330 "_admin.deployed.VCA": vca_update
,
6331 "configurationStatus": config_update
,
6334 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6336 scale_process
= None
6337 # SCALE-IN VCA - END
6340 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6341 scale_process
= "RO"
6342 if self
.ro_config
.get("ng"):
6343 await self
._scale
_ng
_ro
(
6344 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6346 scaling_info
.pop("vdu-create", None)
6347 scaling_info
.pop("vdu-delete", None)
6349 scale_process
= None
6353 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6354 scale_process
= "KDU"
6355 await self
._scale
_kdu
(
6356 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6358 scaling_info
.pop("kdu-create", None)
6359 scaling_info
.pop("kdu-delete", None)
6361 scale_process
= None
6365 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6367 # SCALE-UP VCA - BEGIN
6368 if vca_scaling_info
:
6369 step
= db_nslcmop_update
[
6371 ] = "Creating new execution environments"
6372 scale_process
= "VCA"
6373 for vca_info
in vca_scaling_info
:
6374 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6375 member_vnf_index
= str(vca_info
["member-vnf-index"])
6377 logging_text
+ "vdu info: {}".format(vca_info
)
6379 vnfd_id
= db_vnfr
["vnfd-ref"]
6380 if vca_info
.get("osm_vdu_id"):
6381 vdu_index
= int(vca_info
["vdu_index"])
6382 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6383 if db_vnfr
.get("additionalParamsForVnf"):
6384 deploy_params
.update(
6386 db_vnfr
["additionalParamsForVnf"].copy()
6389 descriptor_config
= get_configuration(
6390 db_vnfd
, db_vnfd
["id"]
6392 if descriptor_config
:
6397 logging_text
=logging_text
6398 + "member_vnf_index={} ".format(member_vnf_index
),
6401 nslcmop_id
=nslcmop_id
,
6407 member_vnf_index
=member_vnf_index
,
6408 vdu_index
=vdu_index
,
6410 deploy_params
=deploy_params
,
6411 descriptor_config
=descriptor_config
,
6412 base_folder
=base_folder
,
6413 task_instantiation_info
=tasks_dict_info
,
6416 vdu_id
= vca_info
["osm_vdu_id"]
6417 vdur
= find_in_list(
6418 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6420 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6421 if vdur
.get("additionalParams"):
6422 deploy_params_vdu
= parse_yaml_strings(
6423 vdur
["additionalParams"]
6426 deploy_params_vdu
= deploy_params
6427 deploy_params_vdu
["OSM"] = get_osm_params(
6428 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6430 if descriptor_config
:
6435 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6436 member_vnf_index
, vdu_id
, vdu_index
6438 stage
[2] = step
= "Scaling out VCA"
6439 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6441 logging_text
=logging_text
6442 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6443 member_vnf_index
, vdu_id
, vdu_index
6447 nslcmop_id
=nslcmop_id
,
6453 member_vnf_index
=member_vnf_index
,
6454 vdu_index
=vdu_index
,
6456 deploy_params
=deploy_params_vdu
,
6457 descriptor_config
=descriptor_config
,
6458 base_folder
=base_folder
,
6459 task_instantiation_info
=tasks_dict_info
,
6462 # SCALE-UP VCA - END
6463 scale_process
= None
6466 # execute primitive service POST-SCALING
6467 step
= "Executing post-scale vnf-config-primitive"
6468 if scaling_descriptor
.get("scaling-config-action"):
6469 for scaling_config_action
in scaling_descriptor
[
6470 "scaling-config-action"
6473 scaling_config_action
.get("trigger") == "post-scale-in"
6474 and scaling_type
== "SCALE_IN"
6476 scaling_config_action
.get("trigger") == "post-scale-out"
6477 and scaling_type
== "SCALE_OUT"
6479 vnf_config_primitive
= scaling_config_action
[
6480 "vnf-config-primitive-name-ref"
6482 step
= db_nslcmop_update
[
6484 ] = "executing post-scale scaling-config-action '{}'".format(
6485 vnf_config_primitive
6488 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6489 if db_vnfr
.get("additionalParamsForVnf"):
6490 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6492 # look for primitive
6493 for config_primitive
in (
6494 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6495 ).get("config-primitive", ()):
6496 if config_primitive
["name"] == vnf_config_primitive
:
6500 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6501 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6502 "config-primitive".format(
6503 scaling_group
, vnf_config_primitive
6506 scale_process
= "VCA"
6507 db_nsr_update
["config-status"] = "configuring post-scaling"
6508 primitive_params
= self
._map
_primitive
_params
(
6509 config_primitive
, {}, vnfr_params
6512 # Post-scale retry check: Check if this sub-operation has been executed before
6513 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6516 vnf_config_primitive
,
6520 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6521 # Skip sub-operation
6522 result
= "COMPLETED"
6523 result_detail
= "Done"
6526 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6527 vnf_config_primitive
, result
, result_detail
6531 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6532 # New sub-operation: Get index of this sub-operation
6534 len(db_nslcmop
.get("_admin", {}).get("operations"))
6539 + "vnf_config_primitive={} New sub-operation".format(
6540 vnf_config_primitive
6544 # retry: Get registered params for this existing sub-operation
6545 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6548 vnf_index
= op
.get("member_vnf_index")
6549 vnf_config_primitive
= op
.get("primitive")
6550 primitive_params
= op
.get("primitive_params")
6553 + "vnf_config_primitive={} Sub-operation retry".format(
6554 vnf_config_primitive
6557 # Execute the primitive, either with new (first-time) or registered (reintent) args
6558 ee_descriptor_id
= config_primitive
.get(
6559 "execution-environment-ref"
6561 primitive_name
= config_primitive
.get(
6562 "execution-environment-primitive", vnf_config_primitive
6564 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6565 nsr_deployed
["VCA"],
6566 member_vnf_index
=vnf_index
,
6568 vdu_count_index
=None,
6569 ee_descriptor_id
=ee_descriptor_id
,
6571 result
, result_detail
= await self
._ns
_execute
_primitive
(
6580 + "vnf_config_primitive={} Done with result {} {}".format(
6581 vnf_config_primitive
, result
, result_detail
6584 # Update operationState = COMPLETED | FAILED
6585 self
._update
_suboperation
_status
(
6586 db_nslcmop
, op_index
, result
, result_detail
6589 if result
== "FAILED":
6590 raise LcmException(result_detail
)
6591 db_nsr_update
["config-status"] = old_config_status
6592 scale_process
= None
6597 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6598 db_nsr_update
["operational-status"] = (
6600 if old_operational_status
== "failed"
6601 else old_operational_status
6603 db_nsr_update
["config-status"] = old_config_status
6606 ROclient
.ROClientException
,
6611 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6613 except asyncio
.CancelledError
:
6615 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6617 exc
= "Operation was cancelled"
6618 except Exception as e
:
6619 exc
= traceback
.format_exc()
6620 self
.logger
.critical(
6621 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6625 self
._write
_ns
_status
(
6628 current_operation
="IDLE",
6629 current_operation_id
=None,
6632 stage
[1] = "Waiting for instantiate pending tasks."
6633 self
.logger
.debug(logging_text
+ stage
[1])
6634 exc
= await self
._wait
_for
_tasks
(
6637 self
.timeout_ns_deploy
,
6645 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6646 nslcmop_operation_state
= "FAILED"
6648 db_nsr_update
["operational-status"] = old_operational_status
6649 db_nsr_update
["config-status"] = old_config_status
6650 db_nsr_update
["detailed-status"] = ""
6652 if "VCA" in scale_process
:
6653 db_nsr_update
["config-status"] = "failed"
6654 if "RO" in scale_process
:
6655 db_nsr_update
["operational-status"] = "failed"
6658 ] = "FAILED scaling nslcmop={} {}: {}".format(
6659 nslcmop_id
, step
, exc
6662 error_description_nslcmop
= None
6663 nslcmop_operation_state
= "COMPLETED"
6664 db_nslcmop_update
["detailed-status"] = "Done"
6666 self
._write
_op
_status
(
6669 error_message
=error_description_nslcmop
,
6670 operation_state
=nslcmop_operation_state
,
6671 other_update
=db_nslcmop_update
,
6674 self
._write
_ns
_status
(
6677 current_operation
="IDLE",
6678 current_operation_id
=None,
6679 other_update
=db_nsr_update
,
6682 if nslcmop_operation_state
:
6686 "nslcmop_id": nslcmop_id
,
6687 "operationState": nslcmop_operation_state
,
6689 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
6690 except Exception as e
:
6692 logging_text
+ "kafka_write notification Exception {}".format(e
)
6694 self
.logger
.debug(logging_text
+ "Exit")
6695 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
6697 async def _scale_kdu(
6698 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6700 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
6701 for kdu_name
in _scaling_info
:
6702 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
6703 deployed_kdu
, index
= get_deployed_kdu(
6704 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
6706 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
6707 kdu_instance
= deployed_kdu
["kdu-instance"]
6708 kdu_model
= deployed_kdu
.get("kdu-model")
6709 scale
= int(kdu_scaling_info
["scale"])
6710 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
6713 "collection": "nsrs",
6714 "filter": {"_id": nsr_id
},
6715 "path": "_admin.deployed.K8s.{}".format(index
),
6718 step
= "scaling application {}".format(
6719 kdu_scaling_info
["resource-name"]
6721 self
.logger
.debug(logging_text
+ step
)
6723 if kdu_scaling_info
["type"] == "delete":
6724 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
6727 and kdu_config
.get("terminate-config-primitive")
6728 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
6730 terminate_config_primitive_list
= kdu_config
.get(
6731 "terminate-config-primitive"
6733 terminate_config_primitive_list
.sort(
6734 key
=lambda val
: int(val
["seq"])
6738 terminate_config_primitive
6739 ) in terminate_config_primitive_list
:
6740 primitive_params_
= self
._map
_primitive
_params
(
6741 terminate_config_primitive
, {}, {}
6743 step
= "execute terminate config primitive"
6744 self
.logger
.debug(logging_text
+ step
)
6745 await asyncio
.wait_for(
6746 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
6747 cluster_uuid
=cluster_uuid
,
6748 kdu_instance
=kdu_instance
,
6749 primitive_name
=terminate_config_primitive
["name"],
6750 params
=primitive_params_
,
6757 await asyncio
.wait_for(
6758 self
.k8scluster_map
[k8s_cluster_type
].scale(
6761 kdu_scaling_info
["resource-name"],
6763 cluster_uuid
=cluster_uuid
,
6764 kdu_model
=kdu_model
,
6768 timeout
=self
.timeout_vca_on_error
,
6771 if kdu_scaling_info
["type"] == "create":
6772 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
6775 and kdu_config
.get("initial-config-primitive")
6776 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
6778 initial_config_primitive_list
= kdu_config
.get(
6779 "initial-config-primitive"
6781 initial_config_primitive_list
.sort(
6782 key
=lambda val
: int(val
["seq"])
6785 for initial_config_primitive
in initial_config_primitive_list
:
6786 primitive_params_
= self
._map
_primitive
_params
(
6787 initial_config_primitive
, {}, {}
6789 step
= "execute initial config primitive"
6790 self
.logger
.debug(logging_text
+ step
)
6791 await asyncio
.wait_for(
6792 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
6793 cluster_uuid
=cluster_uuid
,
6794 kdu_instance
=kdu_instance
,
6795 primitive_name
=initial_config_primitive
["name"],
6796 params
=primitive_params_
,
6803 async def _scale_ng_ro(
6804 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
6806 nsr_id
= db_nslcmop
["nsInstanceId"]
6807 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
6810 # read from db: vnfd's for every vnf
6813 # for each vnf in ns, read vnfd
6814 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
6815 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
6816 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
6817 # if we haven't this vnfd, read it from db
6818 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
6820 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
6821 db_vnfds
.append(vnfd
)
6822 n2vc_key
= self
.n2vc
.get_public_key()
6823 n2vc_key_list
= [n2vc_key
]
6826 vdu_scaling_info
.get("vdu-create"),
6827 vdu_scaling_info
.get("vdu-delete"),
6830 # db_vnfr has been updated, update db_vnfrs to use it
6831 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
6832 await self
._instantiate
_ng
_ro
(
6842 start_deploy
=time(),
6843 timeout_ns_deploy
=self
.timeout_ns_deploy
,
6845 if vdu_scaling_info
.get("vdu-delete"):
6847 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
6850 async def extract_prometheus_scrape_jobs(
6851 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
6853 # look if exist a file called 'prometheus*.j2' and
6854 artifact_content
= self
.fs
.dir_ls(artifact_path
)
6858 for f
in artifact_content
6859 if f
.startswith("prometheus") and f
.endswith(".j2")
6865 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
6869 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
6870 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
6872 vnfr_id
= vnfr_id
.replace("-", "")
6874 "JOB_NAME": vnfr_id
,
6875 "TARGET_IP": target_ip
,
6876 "EXPORTER_POD_IP": host_name
,
6877 "EXPORTER_POD_PORT": host_port
,
6879 job_list
= parse_job(job_data
, variables
)
6880 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
6881 for job
in job_list
:
6883 not isinstance(job
.get("job_name"), str)
6884 or vnfr_id
not in job
["job_name"]
6886 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
6887 job
["nsr_id"] = nsr_id
6888 job
["vnfr_id"] = vnfr_id
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA cloud name and cloud credential kept in a VIM account.

    The VIM account is fetched through VimAccountDB and the two values are
    taken from its "config" section; an account without a "config" entry is
    treated as having an empty one.

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential), read from the "vca_cloud" and
             "vca_cloud_credential" config keys
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_cloud")
    cloud_credential = vim_config.get("vca_cloud_credential")
    return cloud_name, cloud_credential
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA K8s cloud name and K8s cloud credential kept in a VIM account.

    The VIM account is fetched through VimAccountDB and the two values are
    taken from its "config" section; an account without a "config" entry is
    treated as having an empty one.

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential), read from the "vca_k8s_cloud" and
             "vca_k8s_cloud_credential" config keys
    """
    account_config = VimAccountDB.get_vim_account_with_id(vim_account_id).get(
        "config", {}
    )
    # Same lookup order as the tuple that is returned: cloud first, then credential.
    k8s_cloud, k8s_credential = (
        account_config.get(config_key)
        for config_key in ("vca_k8s_cloud", "vca_k8s_cloud_credential")
    )
    return k8s_cloud, k8s_credential