1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
22 import logging
.handlers
33 from osm_lcm
import ROclient
34 from osm_lcm
.data_utils
.nsr
import get_deployed_kdu
35 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
36 from osm_lcm
.lcm_utils
import (
44 from osm_lcm
.data_utils
.nsd
import get_vnf_profiles
45 from osm_lcm
.data_utils
.vnfd
import (
48 get_ee_sorted_initial_config_primitive_list
,
49 get_ee_sorted_terminate_config_primitive_list
,
51 get_virtual_link_profiles
,
56 get_number_of_instances
,
60 from osm_lcm
.data_utils
.list_utils
import find_in_list
61 from osm_lcm
.data_utils
.vnfr
import get_osm_params
, get_vdur_index
, get_kdur
62 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
63 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
64 from n2vc
.k8s_helm_conn
import K8sHelmConnector
65 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
66 from n2vc
.k8s_juju_conn
import K8sJujuConnector
68 from osm_common
.dbbase
import DbException
69 from osm_common
.fsbase
import FsException
71 from osm_lcm
.data_utils
.database
.database
import Database
72 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
74 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
75 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
77 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
79 from copy
import copy
, deepcopy
81 from uuid
import uuid4
83 from random
import randint
85 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
89 timeout_vca_on_error
= (
91 ) # Time for charm from first time at blocked,error status to mark as failed
92 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
93 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
94 timeout_charm_delete
= 10 * 60
95 timeout_primitive
= 30 * 60 # timeout for primitive execution
96 timeout_progress_primitive
= (
98 ) # timeout for some progress in a primitive execution
100 SUBOPERATION_STATUS_NOT_FOUND
= -1
101 SUBOPERATION_STATUS_NEW
= -2
102 SUBOPERATION_STATUS_SKIP
= -3
103 task_name_deploy_vca
= "Deploying VCA"
105 def __init__(self
, msg
, lcm_tasks
, config
, loop
, prometheus
=None):
107 Init, Connect to database, filesystem storage, and messaging
108 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
111 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
113 self
.db
= Database().instance
.db
114 self
.fs
= Filesystem().instance
.fs
116 self
.lcm_tasks
= lcm_tasks
117 self
.timeout
= config
["timeout"]
118 self
.ro_config
= config
["ro_config"]
119 self
.ng_ro
= config
["ro_config"].get("ng")
120 self
.vca_config
= config
["VCA"].copy()
122 # create N2VC connector
123 self
.n2vc
= N2VCJujuConnector(
126 on_update_db
=self
._on
_update
_n
2vc
_db
,
131 self
.conn_helm_ee
= LCMHelmConn(
134 vca_config
=self
.vca_config
,
135 on_update_db
=self
._on
_update
_n
2vc
_db
,
138 self
.k8sclusterhelm2
= K8sHelmConnector(
139 kubectl_command
=self
.vca_config
.get("kubectlpath"),
140 helm_command
=self
.vca_config
.get("helmpath"),
147 self
.k8sclusterhelm3
= K8sHelm3Connector(
148 kubectl_command
=self
.vca_config
.get("kubectlpath"),
149 helm_command
=self
.vca_config
.get("helm3path"),
156 self
.k8sclusterjuju
= K8sJujuConnector(
157 kubectl_command
=self
.vca_config
.get("kubectlpath"),
158 juju_command
=self
.vca_config
.get("jujupath"),
161 on_update_db
=self
._on
_update
_k
8s
_db
,
166 self
.k8scluster_map
= {
167 "helm-chart": self
.k8sclusterhelm2
,
168 "helm-chart-v3": self
.k8sclusterhelm3
,
169 "chart": self
.k8sclusterhelm3
,
170 "juju-bundle": self
.k8sclusterjuju
,
171 "juju": self
.k8sclusterjuju
,
175 "lxc_proxy_charm": self
.n2vc
,
176 "native_charm": self
.n2vc
,
177 "k8s_proxy_charm": self
.n2vc
,
178 "helm": self
.conn_helm_ee
,
179 "helm-v3": self
.conn_helm_ee
,
182 self
.prometheus
= prometheus
185 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
188 def increment_ip_mac(ip_mac
, vm_index
=1):
189 if not isinstance(ip_mac
, str):
192 # try with ipv4 look for last dot
193 i
= ip_mac
.rfind(".")
196 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
197 # try with ipv6 or mac look for last colon. Operate in hex
198 i
= ip_mac
.rfind(":")
201 # format in hex, len can be 2 for mac or 4 for ipv6
202 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
203 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
209 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
211 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
214 # TODO filter RO descriptor fields...
218 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
219 db_dict
["deploymentStatus"] = ro_descriptor
220 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
222 except Exception as e
:
224 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
227 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
229 # remove last dot from path (if exists)
230 if path
.endswith("."):
233 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
234 # .format(table, filter, path, updated_data))
237 nsr_id
= filter.get("_id")
239 # read ns record from database
240 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
241 current_ns_status
= nsr
.get("nsState")
243 # get vca status for NS
244 status_dict
= await self
.n2vc
.get_status(
245 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
250 db_dict
["vcaStatus"] = status_dict
251 await self
.n2vc
.update_vca_status(db_dict
["vcaStatus"], vca_id
=vca_id
)
253 # update configurationStatus for this VCA
255 vca_index
= int(path
[path
.rfind(".") + 1 :])
258 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
260 vca_status
= vca_list
[vca_index
].get("status")
262 configuration_status_list
= nsr
.get("configurationStatus")
263 config_status
= configuration_status_list
[vca_index
].get("status")
265 if config_status
== "BROKEN" and vca_status
!= "failed":
266 db_dict
["configurationStatus"][vca_index
] = "READY"
267 elif config_status
!= "BROKEN" and vca_status
== "failed":
268 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
269 except Exception as e
:
270 # not update configurationStatus
271 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
273 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
274 # if nsState = 'DEGRADED' check if all is OK
276 if current_ns_status
in ("READY", "DEGRADED"):
277 error_description
= ""
279 if status_dict
.get("machines"):
280 for machine_id
in status_dict
.get("machines"):
281 machine
= status_dict
.get("machines").get(machine_id
)
282 # check machine agent-status
283 if machine
.get("agent-status"):
284 s
= machine
.get("agent-status").get("status")
287 error_description
+= (
288 "machine {} agent-status={} ; ".format(
292 # check machine instance status
293 if machine
.get("instance-status"):
294 s
= machine
.get("instance-status").get("status")
297 error_description
+= (
298 "machine {} instance-status={} ; ".format(
303 if status_dict
.get("applications"):
304 for app_id
in status_dict
.get("applications"):
305 app
= status_dict
.get("applications").get(app_id
)
306 # check application status
307 if app
.get("status"):
308 s
= app
.get("status").get("status")
311 error_description
+= (
312 "application {} status={} ; ".format(app_id
, s
)
315 if error_description
:
316 db_dict
["errorDescription"] = error_description
317 if current_ns_status
== "READY" and is_degraded
:
318 db_dict
["nsState"] = "DEGRADED"
319 if current_ns_status
== "DEGRADED" and not is_degraded
:
320 db_dict
["nsState"] = "READY"
323 self
.update_db_2("nsrs", nsr_id
, db_dict
)
325 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
327 except Exception as e
:
328 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
330 async def _on_update_k8s_db(
331 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
334 Updating vca status in NSR record
335 :param cluster_uuid: UUID of a k8s cluster
336 :param kdu_instance: The unique name of the KDU instance
337 :param filter: To get nsr_id
338 :cluster_type: The cluster type (juju, k8s)
342 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
343 # .format(cluster_uuid, kdu_instance, filter))
345 nsr_id
= filter.get("_id")
347 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
348 cluster_uuid
=cluster_uuid
,
349 kdu_instance
=kdu_instance
,
351 complete_status
=True,
357 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
359 if cluster_type
in ("juju-bundle", "juju"):
360 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
361 # status in a similar way between Juju Bundles and Helm Charts on this side
362 await self
.k8sclusterjuju
.update_vca_status(
363 db_dict
["vcaStatus"],
369 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
373 self
.update_db_2("nsrs", nsr_id
, db_dict
)
374 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
376 except Exception as e
:
377 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
380 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
382 env
= Environment(undefined
=StrictUndefined
)
383 template
= env
.from_string(cloud_init_text
)
384 return template
.render(additional_params
or {})
385 except UndefinedError
as e
:
387 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
388 "file, must be provided in the instantiation parameters inside the "
389 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
391 except (TemplateError
, TemplateNotFound
) as e
:
393 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
398 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
399 cloud_init_content
= cloud_init_file
= None
401 if vdu
.get("cloud-init-file"):
402 base_folder
= vnfd
["_admin"]["storage"]
403 cloud_init_file
= "{}/{}/cloud_init/{}".format(
404 base_folder
["folder"],
405 base_folder
["pkg-dir"],
406 vdu
["cloud-init-file"],
408 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
409 cloud_init_content
= ci_file
.read()
410 elif vdu
.get("cloud-init"):
411 cloud_init_content
= vdu
["cloud-init"]
413 return cloud_init_content
414 except FsException
as e
:
416 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
417 vnfd
["id"], vdu
["id"], cloud_init_file
, e
421 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
423 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]),
426 additional_params
= vdur
.get("additionalParams")
427 return parse_yaml_strings(additional_params
)
429 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
431 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
432 :param vnfd: input vnfd
433 :param new_id: overrides vnf id if provided
434 :param additionalParams: Instantiation params for VNFs provided
435 :param nsrId: Id of the NSR
436 :return: copy of vnfd
438 vnfd_RO
= deepcopy(vnfd
)
439 # remove unused by RO configuration, monitoring, scaling and internal keys
440 vnfd_RO
.pop("_id", None)
441 vnfd_RO
.pop("_admin", None)
442 vnfd_RO
.pop("monitoring-param", None)
443 vnfd_RO
.pop("scaling-group-descriptor", None)
444 vnfd_RO
.pop("kdu", None)
445 vnfd_RO
.pop("k8s-cluster", None)
447 vnfd_RO
["id"] = new_id
449 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
450 for vdu
in get_iterable(vnfd_RO
, "vdu"):
451 vdu
.pop("cloud-init-file", None)
452 vdu
.pop("cloud-init", None)
456 def ip_profile_2_RO(ip_profile
):
457 RO_ip_profile
= deepcopy(ip_profile
)
458 if "dns-server" in RO_ip_profile
:
459 if isinstance(RO_ip_profile
["dns-server"], list):
460 RO_ip_profile
["dns-address"] = []
461 for ds
in RO_ip_profile
.pop("dns-server"):
462 RO_ip_profile
["dns-address"].append(ds
["address"])
464 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
465 if RO_ip_profile
.get("ip-version") == "ipv4":
466 RO_ip_profile
["ip-version"] = "IPv4"
467 if RO_ip_profile
.get("ip-version") == "ipv6":
468 RO_ip_profile
["ip-version"] = "IPv6"
469 if "dhcp-params" in RO_ip_profile
:
470 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
473 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
474 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
475 if db_vim
["_admin"]["operationalState"] != "ENABLED":
477 "VIM={} is not available. operationalState={}".format(
478 vim_account
, db_vim
["_admin"]["operationalState"]
481 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
484 def get_ro_wim_id_for_wim_account(self
, wim_account
):
485 if isinstance(wim_account
, str):
486 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
487 if db_wim
["_admin"]["operationalState"] != "ENABLED":
489 "WIM={} is not available. operationalState={}".format(
490 wim_account
, db_wim
["_admin"]["operationalState"]
493 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
498 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
500 db_vdu_push_list
= []
502 db_update
= {"_admin.modified": time()}
504 for vdu_id
, vdu_count
in vdu_create
.items():
508 for vdur
in reversed(db_vnfr
["vdur"])
509 if vdur
["vdu-id-ref"] == vdu_id
514 # Read the template saved in the db:
515 self
.logger
.debug(f
"No vdur in the database. Using the vdur-template to scale")
516 vdur_template
= db_vnfr
.get("vdur-template")
517 if not vdur_template
:
519 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
523 vdur
= vdur_template
[0]
524 #Delete a template from the database after using it
525 self
.db
.set_one("vnfrs",
526 {"_id": db_vnfr
["_id"]},
528 pull
={"vdur-template": {"_id": vdur
['_id']}}
530 for count
in range(vdu_count
):
531 vdur_copy
= deepcopy(vdur
)
532 vdur_copy
["status"] = "BUILD"
533 vdur_copy
["status-detailed"] = None
534 vdur_copy
["ip-address"] = None
535 vdur_copy
["_id"] = str(uuid4())
536 vdur_copy
["count-index"] += count
+ 1
537 vdur_copy
["id"] = "{}-{}".format(
538 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
540 vdur_copy
.pop("vim_info", None)
541 for iface
in vdur_copy
["interfaces"]:
542 if iface
.get("fixed-ip"):
543 iface
["ip-address"] = self
.increment_ip_mac(
544 iface
["ip-address"], count
+ 1
547 iface
.pop("ip-address", None)
548 if iface
.get("fixed-mac"):
549 iface
["mac-address"] = self
.increment_ip_mac(
550 iface
["mac-address"], count
+ 1
553 iface
.pop("mac-address", None)
557 ) # only first vdu can be managment of vnf
558 db_vdu_push_list
.append(vdur_copy
)
559 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
561 if len(db_vnfr
["vdur"]) == 1:
562 # The scale will move to 0 instances
563 self
.logger
.debug(f
"Scaling to 0 !, creating the template with the last vdur")
564 template_vdur
= [db_vnfr
["vdur"][0]]
565 for vdu_id
, vdu_count
in vdu_delete
.items():
567 indexes_to_delete
= [
569 for iv
in enumerate(db_vnfr
["vdur"])
570 if iv
[1]["vdu-id-ref"] == vdu_id
574 "vdur.{}.status".format(i
): "DELETING"
575 for i
in indexes_to_delete
[-vdu_count
:]
579 # it must be deleted one by one because common.db does not allow otherwise
582 for v
in reversed(db_vnfr
["vdur"])
583 if v
["vdu-id-ref"] == vdu_id
585 for vdu
in vdus_to_delete
[:vdu_count
]:
588 {"_id": db_vnfr
["_id"]},
590 pull
={"vdur": {"_id": vdu
["_id"]}},
594 db_push
["vdur"] = db_vdu_push_list
596 db_push
["vdur-template"] = template_vdur
599 db_vnfr
["vdur-template"] = template_vdur
600 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
601 # modify passed dictionary db_vnfr
602 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
603 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
605 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
607 Updates database nsr with the RO info for the created vld
608 :param ns_update_nsr: dictionary to be filled with the updated info
609 :param db_nsr: content of db_nsr. This is also modified
610 :param nsr_desc_RO: nsr descriptor from RO
611 :return: Nothing, LcmException is raised on errors
614 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
615 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
616 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
618 vld
["vim-id"] = net_RO
.get("vim_net_id")
619 vld
["name"] = net_RO
.get("vim_name")
620 vld
["status"] = net_RO
.get("status")
621 vld
["status-detailed"] = net_RO
.get("error_msg")
622 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
626 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
629 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
631 for db_vnfr
in db_vnfrs
.values():
632 vnfr_update
= {"status": "ERROR"}
633 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
634 if "status" not in vdur
:
635 vdur
["status"] = "ERROR"
636 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
638 vdur
["status-detailed"] = str(error_text
)
640 "vdur.{}.status-detailed".format(vdu_index
)
642 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
643 except DbException
as e
:
644 self
.logger
.error("Cannot update vnf. {}".format(e
))
646 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
648 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
649 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
650 :param nsr_desc_RO: nsr descriptor from RO
651 :return: Nothing, LcmException is raised on errors
653 for vnf_index
, db_vnfr
in db_vnfrs
.items():
654 for vnf_RO
in nsr_desc_RO
["vnfs"]:
655 if vnf_RO
["member_vnf_index"] != vnf_index
:
658 if vnf_RO
.get("ip_address"):
659 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
662 elif not db_vnfr
.get("ip-address"):
663 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
664 raise LcmExceptionNoMgmtIP(
665 "ns member_vnf_index '{}' has no IP address".format(
670 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
671 vdur_RO_count_index
= 0
672 if vdur
.get("pdu-type"):
674 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
675 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
677 if vdur
["count-index"] != vdur_RO_count_index
:
678 vdur_RO_count_index
+= 1
680 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
681 if vdur_RO
.get("ip_address"):
682 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
684 vdur
["ip-address"] = None
685 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
686 vdur
["name"] = vdur_RO
.get("vim_name")
687 vdur
["status"] = vdur_RO
.get("status")
688 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
689 for ifacer
in get_iterable(vdur
, "interfaces"):
690 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
691 if ifacer
["name"] == interface_RO
.get("internal_name"):
692 ifacer
["ip-address"] = interface_RO
.get(
695 ifacer
["mac-address"] = interface_RO
.get(
701 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
702 "from VIM info".format(
703 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
706 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
710 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
712 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
716 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
717 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
718 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
720 vld
["vim-id"] = net_RO
.get("vim_net_id")
721 vld
["name"] = net_RO
.get("vim_name")
722 vld
["status"] = net_RO
.get("status")
723 vld
["status-detailed"] = net_RO
.get("error_msg")
724 vnfr_update
["vld.{}".format(vld_index
)] = vld
728 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
733 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
738 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
743 def _get_ns_config_info(self
, nsr_id
):
745 Generates a mapping between vnf,vdu elements and the N2VC id
746 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
747 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
748 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
749 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
751 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
752 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
754 ns_config_info
= {"osm-config-mapping": mapping
}
755 for vca
in vca_deployed_list
:
756 if not vca
["member-vnf-index"]:
758 if not vca
["vdu_id"]:
759 mapping
[vca
["member-vnf-index"]] = vca
["application"]
763 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
765 ] = vca
["application"]
766 return ns_config_info
768 async def _instantiate_ng_ro(
785 def get_vim_account(vim_account_id
):
787 if vim_account_id
in db_vims
:
788 return db_vims
[vim_account_id
]
789 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
790 db_vims
[vim_account_id
] = db_vim
793 # modify target_vld info with instantiation parameters
794 def parse_vld_instantiation_params(
795 target_vim
, target_vld
, vld_params
, target_sdn
797 if vld_params
.get("ip-profile"):
798 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
801 if vld_params
.get("provider-network"):
802 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
805 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
806 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
809 if vld_params
.get("wimAccountId"):
810 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
811 target_vld
["vim_info"][target_wim
] = {}
812 for param
in ("vim-network-name", "vim-network-id"):
813 if vld_params
.get(param
):
814 if isinstance(vld_params
[param
], dict):
815 for vim
, vim_net
in vld_params
[param
].items():
816 other_target_vim
= "vim:" + vim
818 target_vld
["vim_info"],
819 (other_target_vim
, param
.replace("-", "_")),
822 else: # isinstance str
823 target_vld
["vim_info"][target_vim
][
824 param
.replace("-", "_")
825 ] = vld_params
[param
]
826 if vld_params
.get("common_id"):
827 target_vld
["common_id"] = vld_params
.get("common_id")
829 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
830 def update_ns_vld_target(target
, ns_params
):
831 for vnf_params
in ns_params
.get("vnf", ()):
832 if vnf_params
.get("vimAccountId"):
836 for vnfr
in db_vnfrs
.values()
837 if vnf_params
["member-vnf-index"]
838 == vnfr
["member-vnf-index-ref"]
842 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
843 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
844 target_vld
= find_in_list(
845 get_iterable(vdur
, "interfaces"),
846 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
849 if vnf_params
.get("vimAccountId") not in a_vld
.get(
852 target
["ns"]["vld"][a_index
].get("vim_info").update(
854 "vim:{}".format(vnf_params
["vimAccountId"]): {
855 "vim_network_name": ""
860 nslcmop_id
= db_nslcmop
["_id"]
862 "name": db_nsr
["name"],
865 "image": deepcopy(db_nsr
["image"]),
866 "flavor": deepcopy(db_nsr
["flavor"]),
867 "action_id": nslcmop_id
,
868 "cloud_init_content": {},
870 for image
in target
["image"]:
871 image
["vim_info"] = {}
872 for flavor
in target
["flavor"]:
873 flavor
["vim_info"] = {}
874 if db_nsr
.get("affinity-or-anti-affinity-group"):
875 target
["affinity-or-anti-affinity-group"] = deepcopy(db_nsr
["affinity-or-anti-affinity-group"])
876 for affinity_or_anti_affinity_group
in target
["affinity-or-anti-affinity-group"]:
877 affinity_or_anti_affinity_group
["vim_info"] = {}
879 if db_nslcmop
.get("lcmOperationType") != "instantiate":
880 # get parameters of instantiation:
881 db_nslcmop_instantiate
= self
.db
.get_list(
884 "nsInstanceId": db_nslcmop
["nsInstanceId"],
885 "lcmOperationType": "instantiate",
888 ns_params
= db_nslcmop_instantiate
.get("operationParams")
890 ns_params
= db_nslcmop
.get("operationParams")
891 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
892 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
895 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
896 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
900 "mgmt-network": vld
.get("mgmt-network", False),
901 "type": vld
.get("type"),
904 "vim_network_name": vld
.get("vim-network-name"),
905 "vim_account_id": ns_params
["vimAccountId"],
909 # check if this network needs SDN assist
910 if vld
.get("pci-interfaces"):
911 db_vim
= get_vim_account(ns_params
["vimAccountId"])
912 sdnc_id
= db_vim
["config"].get("sdn-controller")
914 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
915 target_sdn
= "sdn:{}".format(sdnc_id
)
916 target_vld
["vim_info"][target_sdn
] = {
918 "target_vim": target_vim
,
920 "type": vld
.get("type"),
923 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
924 for nsd_vnf_profile
in nsd_vnf_profiles
:
925 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
926 if cp
["virtual-link-profile-id"] == vld
["id"]:
928 "member_vnf:{}.{}".format(
929 cp
["constituent-cpd-id"][0][
930 "constituent-base-element-id"
932 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
934 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
936 # check at nsd descriptor, if there is an ip-profile
938 nsd_vlp
= find_in_list(
939 get_virtual_link_profiles(nsd
),
940 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
945 and nsd_vlp
.get("virtual-link-protocol-data")
946 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
948 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
951 ip_profile_dest_data
= {}
952 if "ip-version" in ip_profile_source_data
:
953 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
956 if "cidr" in ip_profile_source_data
:
957 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
960 if "gateway-ip" in ip_profile_source_data
:
961 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
964 if "dhcp-enabled" in ip_profile_source_data
:
965 ip_profile_dest_data
["dhcp-params"] = {
966 "enabled": ip_profile_source_data
["dhcp-enabled"]
968 vld_params
["ip-profile"] = ip_profile_dest_data
970 # update vld_params with instantiation params
971 vld_instantiation_params
= find_in_list(
972 get_iterable(ns_params
, "vld"),
973 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
975 if vld_instantiation_params
:
976 vld_params
.update(vld_instantiation_params
)
977 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
978 target
["ns"]["vld"].append(target_vld
)
979 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
980 update_ns_vld_target(target
, ns_params
)
982 for vnfr
in db_vnfrs
.values():
984 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
986 vnf_params
= find_in_list(
987 get_iterable(ns_params
, "vnf"),
988 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
990 target_vnf
= deepcopy(vnfr
)
991 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
992 for vld
in target_vnf
.get("vld", ()):
993 # check if connected to a ns.vld, to fill target'
994 vnf_cp
= find_in_list(
995 vnfd
.get("int-virtual-link-desc", ()),
996 lambda cpd
: cpd
.get("id") == vld
["id"],
999 ns_cp
= "member_vnf:{}.{}".format(
1000 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1002 if cp2target
.get(ns_cp
):
1003 vld
["target"] = cp2target
[ns_cp
]
1006 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1008 # check if this network needs SDN assist
1010 if vld
.get("pci-interfaces"):
1011 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1012 sdnc_id
= db_vim
["config"].get("sdn-controller")
1014 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1015 target_sdn
= "sdn:{}".format(sdnc_id
)
1016 vld
["vim_info"][target_sdn
] = {
1018 "target_vim": target_vim
,
1020 "type": vld
.get("type"),
1023 # check at vnfd descriptor, if there is an ip-profile
1025 vnfd_vlp
= find_in_list(
1026 get_virtual_link_profiles(vnfd
),
1027 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1031 and vnfd_vlp
.get("virtual-link-protocol-data")
1032 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1034 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1037 ip_profile_dest_data
= {}
1038 if "ip-version" in ip_profile_source_data
:
1039 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1042 if "cidr" in ip_profile_source_data
:
1043 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1046 if "gateway-ip" in ip_profile_source_data
:
1047 ip_profile_dest_data
[
1049 ] = ip_profile_source_data
["gateway-ip"]
1050 if "dhcp-enabled" in ip_profile_source_data
:
1051 ip_profile_dest_data
["dhcp-params"] = {
1052 "enabled": ip_profile_source_data
["dhcp-enabled"]
1055 vld_params
["ip-profile"] = ip_profile_dest_data
1056 # update vld_params with instantiation params
1058 vld_instantiation_params
= find_in_list(
1059 get_iterable(vnf_params
, "internal-vld"),
1060 lambda i_vld
: i_vld
["name"] == vld
["id"],
1062 if vld_instantiation_params
:
1063 vld_params
.update(vld_instantiation_params
)
1064 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1067 for vdur
in target_vnf
.get("vdur", ()):
1068 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1069 continue # This vdu must not be created
1070 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1072 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1075 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1076 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1079 and vdu_configuration
.get("config-access")
1080 and vdu_configuration
.get("config-access").get("ssh-access")
1082 vdur
["ssh-keys"] = ssh_keys_all
1083 vdur
["ssh-access-required"] = vdu_configuration
[
1085 ]["ssh-access"]["required"]
1088 and vnf_configuration
.get("config-access")
1089 and vnf_configuration
.get("config-access").get("ssh-access")
1090 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1092 vdur
["ssh-keys"] = ssh_keys_all
1093 vdur
["ssh-access-required"] = vnf_configuration
[
1095 ]["ssh-access"]["required"]
1096 elif ssh_keys_instantiation
and find_in_list(
1097 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1099 vdur
["ssh-keys"] = ssh_keys_instantiation
1101 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1103 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1105 if vdud
.get("cloud-init-file"):
1106 vdur
["cloud-init"] = "{}:file:{}".format(
1107 vnfd
["_id"], vdud
.get("cloud-init-file")
1109 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1110 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1111 base_folder
= vnfd
["_admin"]["storage"]
1112 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1113 base_folder
["folder"],
1114 base_folder
["pkg-dir"],
1115 vdud
.get("cloud-init-file"),
1117 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1118 target
["cloud_init_content"][
1121 elif vdud
.get("cloud-init"):
1122 vdur
["cloud-init"] = "{}:vdu:{}".format(
1123 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1125 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1126 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1129 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1130 deploy_params_vdu
= self
._format
_additional
_params
(
1131 vdur
.get("additionalParams") or {}
1133 deploy_params_vdu
["OSM"] = get_osm_params(
1134 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1136 vdur
["additionalParams"] = deploy_params_vdu
1139 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1140 if target_vim
not in ns_flavor
["vim_info"]:
1141 ns_flavor
["vim_info"][target_vim
] = {}
1144 # in case alternative images are provided we must check if they should be applied
1145 # for the vim_type, modify the vim_type taking into account
1146 ns_image_id
= int(vdur
["ns-image-id"])
1147 if vdur
.get("alt-image-ids"):
1148 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1149 vim_type
= db_vim
["vim_type"]
1150 for alt_image_id
in vdur
.get("alt-image-ids"):
1151 ns_alt_image
= target
["image"][int(alt_image_id
)]
1152 if vim_type
== ns_alt_image
.get("vim-type"):
1153 # must use alternative image
1155 "use alternative image id: {}".format(alt_image_id
)
1157 ns_image_id
= alt_image_id
1158 vdur
["ns-image-id"] = ns_image_id
1160 ns_image
= target
["image"][int(ns_image_id
)]
1161 if target_vim
not in ns_image
["vim_info"]:
1162 ns_image
["vim_info"][target_vim
] = {}
1165 if vdur
.get("affinity-or-anti-affinity-group-id"):
1166 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1167 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1168 if target_vim
not in ns_ags
["vim_info"]:
1169 ns_ags
["vim_info"][target_vim
] = {}
1171 vdur
["vim_info"] = {target_vim
: {}}
1172 # instantiation parameters
1174 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1175 # vdud["id"]), None)
1176 vdur_list
.append(vdur
)
1177 target_vnf
["vdur"] = vdur_list
1178 target
["vnf"].append(target_vnf
)
1180 desc
= await self
.RO
.deploy(nsr_id
, target
)
1181 self
.logger
.debug("RO return > {}".format(desc
))
1182 action_id
= desc
["action_id"]
1183 await self
._wait
_ng
_ro
(
1184 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
1189 "_admin.deployed.RO.operational-status": "running",
1190 "detailed-status": " ".join(stage
),
1192 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1193 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1194 self
._write
_op
_status
(nslcmop_id
, stage
)
1196 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1200 async def _wait_ng_ro(
1209 detailed_status_old
= None
1211 start_time
= start_time
or time()
1212 while time() <= start_time
+ timeout
:
1213 desc_status
= await self
.RO
.status(nsr_id
, action_id
)
1214 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1215 if desc_status
["status"] == "FAILED":
1216 raise NgRoException(desc_status
["details"])
1217 elif desc_status
["status"] == "BUILD":
1219 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1220 elif desc_status
["status"] == "DONE":
1222 stage
[2] = "Deployed at VIM"
1225 assert False, "ROclient.check_ns_status returns unknown {}".format(
1226 desc_status
["status"]
1228 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1229 detailed_status_old
= stage
[2]
1230 db_nsr_update
["detailed-status"] = " ".join(stage
)
1231 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1232 self
._write
_op
_status
(nslcmop_id
, stage
)
1233 await asyncio
.sleep(15, loop
=self
.loop
)
1234 else: # timeout_ns_deploy
1235 raise NgRoException("Timeout waiting ns to deploy")
1237 async def _terminate_ng_ro(
1238 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1243 start_deploy
= time()
1250 "action_id": nslcmop_id
,
1252 desc
= await self
.RO
.deploy(nsr_id
, target
)
1253 action_id
= desc
["action_id"]
1254 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1255 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1258 + "ns terminate action at RO. action_id={}".format(action_id
)
1262 delete_timeout
= 20 * 60 # 20 minutes
1263 await self
._wait
_ng
_ro
(
1264 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
1267 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1268 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1270 await self
.RO
.delete(nsr_id
)
1271 except Exception as e
:
1272 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1273 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1274 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1275 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1277 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1279 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1280 failed_detail
.append("delete conflict: {}".format(e
))
1283 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1286 failed_detail
.append("delete error: {}".format(e
))
1289 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1293 stage
[2] = "Error deleting from VIM"
1295 stage
[2] = "Deleted from VIM"
1296 db_nsr_update
["detailed-status"] = " ".join(stage
)
1297 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1298 self
._write
_op
_status
(nslcmop_id
, stage
)
1301 raise LcmException("; ".join(failed_detail
))
1304 async def instantiate_RO(
1318 :param logging_text: preffix text to use at logging
1319 :param nsr_id: nsr identity
1320 :param nsd: database content of ns descriptor
1321 :param db_nsr: database content of ns record
1322 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1324 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1325 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1326 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1327 :return: None or exception
1330 start_deploy
= time()
1331 ns_params
= db_nslcmop
.get("operationParams")
1332 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1333 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1335 timeout_ns_deploy
= self
.timeout
.get(
1336 "ns_deploy", self
.timeout_ns_deploy
1339 # Check for and optionally request placement optimization. Database will be updated if placement activated
1340 stage
[2] = "Waiting for Placement."
1341 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1342 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1343 for vnfr
in db_vnfrs
.values():
1344 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1347 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1349 return await self
._instantiate
_ng
_ro
(
1362 except Exception as e
:
1363 stage
[2] = "ERROR deploying at VIM"
1364 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1366 "Error deploying at VIM {}".format(e
),
1367 exc_info
=not isinstance(
1370 ROclient
.ROClientException
,
1379 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1381 Wait for kdu to be up, get ip address
1382 :param logging_text: prefix use for logging
1389 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1392 while nb_tries
< 360:
1393 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1397 for x
in get_iterable(db_vnfr
, "kdur")
1398 if x
.get("kdu-name") == kdu_name
1404 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1406 if kdur
.get("status"):
1407 if kdur
["status"] in ("READY", "ENABLED"):
1408 return kdur
.get("ip-address")
1411 "target KDU={} is in error state".format(kdu_name
)
1414 await asyncio
.sleep(10, loop
=self
.loop
)
1416 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1418 async def wait_vm_up_insert_key_ro(
1419 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1422 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1423 :param logging_text: prefix use for logging
1428 :param pub_key: public ssh key to inject, None to skip
1429 :param user: user to apply the public ssh key
1433 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1437 target_vdu_id
= None
1443 if ro_retries
>= 360: # 1 hour
1445 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1448 await asyncio
.sleep(10, loop
=self
.loop
)
1451 if not target_vdu_id
:
1452 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1454 if not vdu_id
: # for the VNF case
1455 if db_vnfr
.get("status") == "ERROR":
1457 "Cannot inject ssh-key because target VNF is in error state"
1459 ip_address
= db_vnfr
.get("ip-address")
1465 for x
in get_iterable(db_vnfr
, "vdur")
1466 if x
.get("ip-address") == ip_address
1474 for x
in get_iterable(db_vnfr
, "vdur")
1475 if x
.get("vdu-id-ref") == vdu_id
1476 and x
.get("count-index") == vdu_index
1482 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1483 ): # If only one, this should be the target vdu
1484 vdur
= db_vnfr
["vdur"][0]
1487 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1488 vnfr_id
, vdu_id
, vdu_index
1491 # New generation RO stores information at "vim_info"
1494 if vdur
.get("vim_info"):
1496 t
for t
in vdur
["vim_info"]
1497 ) # there should be only one key
1498 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1500 vdur
.get("pdu-type")
1501 or vdur
.get("status") == "ACTIVE"
1502 or ng_ro_status
== "ACTIVE"
1504 ip_address
= vdur
.get("ip-address")
1507 target_vdu_id
= vdur
["vdu-id-ref"]
1508 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1510 "Cannot inject ssh-key because target VM is in error state"
1513 if not target_vdu_id
:
1516 # inject public key into machine
1517 if pub_key
and user
:
1518 self
.logger
.debug(logging_text
+ "Inserting RO key")
1519 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1520 if vdur
.get("pdu-type"):
1521 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1524 ro_vm_id
= "{}-{}".format(
1525 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1526 ) # TODO add vdu_index
1530 "action": "inject_ssh_key",
1534 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1536 desc
= await self
.RO
.deploy(nsr_id
, target
)
1537 action_id
= desc
["action_id"]
1538 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600)
1541 # wait until NS is deployed at RO
1543 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1544 ro_nsr_id
= deep_get(
1545 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1549 result_dict
= await self
.RO
.create_action(
1551 item_id_name
=ro_nsr_id
,
1553 "add_public_key": pub_key
,
1558 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1559 if not result_dict
or not isinstance(result_dict
, dict):
1561 "Unknown response from RO when injecting key"
1563 for result
in result_dict
.values():
1564 if result
.get("vim_result") == 200:
1567 raise ROclient
.ROClientException(
1568 "error injecting key: {}".format(
1569 result
.get("description")
1573 except NgRoException
as e
:
1575 "Reaching max tries injecting key. Error: {}".format(e
)
1577 except ROclient
.ROClientException
as e
:
1581 + "error injecting key: {}. Retrying until {} seconds".format(
1588 "Reaching max tries injecting key. Error: {}".format(e
)
1595 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1597 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1599 my_vca
= vca_deployed_list
[vca_index
]
1600 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1601 # vdu or kdu: no dependencies
1605 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1606 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1607 configuration_status_list
= db_nsr
["configurationStatus"]
1608 for index
, vca_deployed
in enumerate(configuration_status_list
):
1609 if index
== vca_index
:
1612 if not my_vca
.get("member-vnf-index") or (
1613 vca_deployed
.get("member-vnf-index")
1614 == my_vca
.get("member-vnf-index")
1616 internal_status
= configuration_status_list
[index
].get("status")
1617 if internal_status
== "READY":
1619 elif internal_status
== "BROKEN":
1621 "Configuration aborted because dependent charm/s has failed"
1626 # no dependencies, return
1628 await asyncio
.sleep(10)
1631 raise LcmException("Configuration aborted because dependent charm/s timeout")
1633 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1636 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1638 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1639 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1642 async def instantiate_N2VC(
1659 ee_config_descriptor
,
1661 nsr_id
= db_nsr
["_id"]
1662 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1663 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1664 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1665 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1667 "collection": "nsrs",
1668 "filter": {"_id": nsr_id
},
1669 "path": db_update_entry
,
1675 element_under_configuration
= nsr_id
1679 vnfr_id
= db_vnfr
["_id"]
1680 osm_config
["osm"]["vnf_id"] = vnfr_id
1682 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1684 if vca_type
== "native_charm":
1687 index_number
= vdu_index
or 0
1690 element_type
= "VNF"
1691 element_under_configuration
= vnfr_id
1692 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1694 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1695 element_type
= "VDU"
1696 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1697 osm_config
["osm"]["vdu_id"] = vdu_id
1699 namespace
+= ".{}".format(kdu_name
)
1700 element_type
= "KDU"
1701 element_under_configuration
= kdu_name
1702 osm_config
["osm"]["kdu_name"] = kdu_name
1705 artifact_path
= "{}/{}/{}/{}".format(
1706 base_folder
["folder"],
1707 base_folder
["pkg-dir"],
1709 if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1714 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1716 # get initial_config_primitive_list that applies to this element
1717 initial_config_primitive_list
= config_descriptor
.get(
1718 "initial-config-primitive"
1722 "Initial config primitive list > {}".format(
1723 initial_config_primitive_list
1727 # add config if not present for NS charm
1728 ee_descriptor_id
= ee_config_descriptor
.get("id")
1729 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1730 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1731 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1735 "Initial config primitive list #2 > {}".format(
1736 initial_config_primitive_list
1739 # n2vc_redesign STEP 3.1
1740 # find old ee_id if exists
1741 ee_id
= vca_deployed
.get("ee_id")
1743 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1744 # create or register execution environment in VCA
1745 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1747 self
._write
_configuration
_status
(
1749 vca_index
=vca_index
,
1751 element_under_configuration
=element_under_configuration
,
1752 element_type
=element_type
,
1755 step
= "create execution environment"
1756 self
.logger
.debug(logging_text
+ step
)
1760 if vca_type
== "k8s_proxy_charm":
1761 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1762 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1763 namespace
=namespace
,
1764 artifact_path
=artifact_path
,
1768 elif vca_type
== "helm" or vca_type
== "helm-v3":
1769 ee_id
, credentials
= await self
.vca_map
[
1771 ].create_execution_environment(
1772 namespace
=namespace
,
1776 artifact_path
=artifact_path
,
1780 ee_id
, credentials
= await self
.vca_map
[
1782 ].create_execution_environment(
1783 namespace
=namespace
,
1789 elif vca_type
== "native_charm":
1790 step
= "Waiting to VM being up and getting IP address"
1791 self
.logger
.debug(logging_text
+ step
)
1792 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1801 credentials
= {"hostname": rw_mgmt_ip
}
1803 username
= deep_get(
1804 config_descriptor
, ("config-access", "ssh-access", "default-user")
1806 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1807 # merged. Meanwhile let's get username from initial-config-primitive
1808 if not username
and initial_config_primitive_list
:
1809 for config_primitive
in initial_config_primitive_list
:
1810 for param
in config_primitive
.get("parameter", ()):
1811 if param
["name"] == "ssh-username":
1812 username
= param
["value"]
1816 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1817 "'config-access.ssh-access.default-user'"
1819 credentials
["username"] = username
1820 # n2vc_redesign STEP 3.2
1822 self
._write
_configuration
_status
(
1824 vca_index
=vca_index
,
1825 status
="REGISTERING",
1826 element_under_configuration
=element_under_configuration
,
1827 element_type
=element_type
,
1830 step
= "register execution environment {}".format(credentials
)
1831 self
.logger
.debug(logging_text
+ step
)
1832 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1833 credentials
=credentials
,
1834 namespace
=namespace
,
1839 # for compatibility with MON/POL modules, the need model and application name at database
1840 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1841 ee_id_parts
= ee_id
.split(".")
1842 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1843 if len(ee_id_parts
) >= 2:
1844 model_name
= ee_id_parts
[0]
1845 application_name
= ee_id_parts
[1]
1846 db_nsr_update
[db_update_entry
+ "model"] = model_name
1847 db_nsr_update
[db_update_entry
+ "application"] = application_name
1849 # n2vc_redesign STEP 3.3
1850 step
= "Install configuration Software"
1852 self
._write
_configuration
_status
(
1854 vca_index
=vca_index
,
1855 status
="INSTALLING SW",
1856 element_under_configuration
=element_under_configuration
,
1857 element_type
=element_type
,
1858 other_update
=db_nsr_update
,
1861 # TODO check if already done
1862 self
.logger
.debug(logging_text
+ step
)
1864 if vca_type
== "native_charm":
1865 config_primitive
= next(
1866 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1869 if config_primitive
:
1870 config
= self
._map
_primitive
_params
(
1871 config_primitive
, {}, deploy_params
1874 if vca_type
== "lxc_proxy_charm":
1875 if element_type
== "NS":
1876 num_units
= db_nsr
.get("config-units") or 1
1877 elif element_type
== "VNF":
1878 num_units
= db_vnfr
.get("config-units") or 1
1879 elif element_type
== "VDU":
1880 for v
in db_vnfr
["vdur"]:
1881 if vdu_id
== v
["vdu-id-ref"]:
1882 num_units
= v
.get("config-units") or 1
1884 if vca_type
!= "k8s_proxy_charm":
1885 await self
.vca_map
[vca_type
].install_configuration_sw(
1887 artifact_path
=artifact_path
,
1890 num_units
=num_units
,
1895 # write in db flag of configuration_sw already installed
1897 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1900 # add relations for this VCA (wait for other peers related with this VCA)
1901 await self
._add
_vca
_relations
(
1902 logging_text
=logging_text
,
1904 vca_index
=vca_index
,
1909 # if SSH access is required, then get execution environment SSH public
1910 # if native charm we have waited already to VM be UP
1911 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1914 # self.logger.debug("get ssh key block")
1916 config_descriptor
, ("config-access", "ssh-access", "required")
1918 # self.logger.debug("ssh key needed")
1919 # Needed to inject a ssh key
1922 ("config-access", "ssh-access", "default-user"),
1924 step
= "Install configuration Software, getting public ssh key"
1925 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
1926 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
1929 step
= "Insert public key into VM user={} ssh_key={}".format(
1933 # self.logger.debug("no need to get ssh key")
1934 step
= "Waiting to VM being up and getting IP address"
1935 self
.logger
.debug(logging_text
+ step
)
1937 # default rw_mgmt_ip to None, avoiding the non definition of the variable
1940 # n2vc_redesign STEP 5.1
1941 # wait for RO (ip-address) Insert pub_key into VM
1944 rw_mgmt_ip
= await self
.wait_kdu_up(
1945 logging_text
, nsr_id
, vnfr_id
, kdu_name
1948 # This verification is needed in order to avoid trying to add a public key
1949 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
1950 # for a KNF and not for its KDUs, the previous verification gives False, and the code
1951 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
1953 elif db_vnfr
.get('vdur'):
1954 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1964 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
1966 # store rw_mgmt_ip in deploy params for later replacement
1967 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
1969 # n2vc_redesign STEP 6 Execute initial config primitive
1970 step
= "execute initial config primitive"
1972 # wait for dependent primitives execution (NS -> VNF -> VDU)
1973 if initial_config_primitive_list
:
1974 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
1976 # stage, in function of element type: vdu, kdu, vnf or ns
1977 my_vca
= vca_deployed_list
[vca_index
]
1978 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1980 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
1981 elif my_vca
.get("member-vnf-index"):
1983 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
1986 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
1988 self
._write
_configuration
_status
(
1989 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
1992 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
1994 check_if_terminated_needed
= True
1995 for initial_config_primitive
in initial_config_primitive_list
:
1996 # adding information on the vca_deployed if it is a NS execution environment
1997 if not vca_deployed
["member-vnf-index"]:
1998 deploy_params
["ns_config_info"] = json
.dumps(
1999 self
._get
_ns
_config
_info
(nsr_id
)
2001 # TODO check if already done
2002 primitive_params_
= self
._map
_primitive
_params
(
2003 initial_config_primitive
, {}, deploy_params
2006 step
= "execute primitive '{}' params '{}'".format(
2007 initial_config_primitive
["name"], primitive_params_
2009 self
.logger
.debug(logging_text
+ step
)
2010 await self
.vca_map
[vca_type
].exec_primitive(
2012 primitive_name
=initial_config_primitive
["name"],
2013 params_dict
=primitive_params_
,
2018 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2019 if check_if_terminated_needed
:
2020 if config_descriptor
.get("terminate-config-primitive"):
2022 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2024 check_if_terminated_needed
= False
2026 # TODO register in database that primitive is done
2028 # STEP 7 Configure metrics
2029 if vca_type
== "helm" or vca_type
== "helm-v3":
2030 prometheus_jobs
= await self
.add_prometheus_metrics(
2032 artifact_path
=artifact_path
,
2033 ee_config_descriptor
=ee_config_descriptor
,
2036 target_ip
=rw_mgmt_ip
,
2042 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2045 step
= "instantiated at VCA"
2046 self
.logger
.debug(logging_text
+ step
)
2048 self
._write
_configuration
_status
(
2049 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2052 except Exception as e
: # TODO not use Exception but N2VC exception
2053 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2055 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2058 "Exception while {} : {}".format(step
, e
), exc_info
=True
2060 self
._write
_configuration
_status
(
2061 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2063 raise LcmException("{} {}".format(step
, e
)) from e
2065 def _write_ns_status(
2069 current_operation
: str,
2070 current_operation_id
: str,
2071 error_description
: str = None,
2072 error_detail
: str = None,
2073 other_update
: dict = None,
2076 Update db_nsr fields.
2079 :param current_operation:
2080 :param current_operation_id:
2081 :param error_description:
2082 :param error_detail:
2083 :param other_update: Other required changes at database if provided, will be cleared
2087 db_dict
= other_update
or {}
2090 ] = current_operation_id
# for backward compatibility
2091 db_dict
["_admin.current-operation"] = current_operation_id
2092 db_dict
["_admin.operation-type"] = (
2093 current_operation
if current_operation
!= "IDLE" else None
2095 db_dict
["currentOperation"] = current_operation
2096 db_dict
["currentOperationID"] = current_operation_id
2097 db_dict
["errorDescription"] = error_description
2098 db_dict
["errorDetail"] = error_detail
2101 db_dict
["nsState"] = ns_state
2102 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2103 except DbException
as e
:
2104 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2106 def _write_op_status(
2110 error_message
: str = None,
2111 queuePosition
: int = 0,
2112 operation_state
: str = None,
2113 other_update
: dict = None,
2116 db_dict
= other_update
or {}
2117 db_dict
["queuePosition"] = queuePosition
2118 if isinstance(stage
, list):
2119 db_dict
["stage"] = stage
[0]
2120 db_dict
["detailed-status"] = " ".join(stage
)
2121 elif stage
is not None:
2122 db_dict
["stage"] = str(stage
)
2124 if error_message
is not None:
2125 db_dict
["errorMessage"] = error_message
2126 if operation_state
is not None:
2127 db_dict
["operationState"] = operation_state
2128 db_dict
["statusEnteredTime"] = time()
2129 self
.update_db_2("nslcmops", op_id
, db_dict
)
2130 except DbException
as e
:
2132 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2135 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2137 nsr_id
= db_nsr
["_id"]
2138 # configurationStatus
2139 config_status
= db_nsr
.get("configurationStatus")
2142 "configurationStatus.{}.status".format(index
): status
2143 for index
, v
in enumerate(config_status
)
2147 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2149 except DbException
as e
:
2151 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2154 def _write_configuration_status(
2159 element_under_configuration
: str = None,
2160 element_type
: str = None,
2161 other_update
: dict = None,
2164 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2165 # .format(vca_index, status))
2168 db_path
= "configurationStatus.{}.".format(vca_index
)
2169 db_dict
= other_update
or {}
2171 db_dict
[db_path
+ "status"] = status
2172 if element_under_configuration
:
2174 db_path
+ "elementUnderConfiguration"
2175 ] = element_under_configuration
2177 db_dict
[db_path
+ "elementType"] = element_type
2178 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2179 except DbException
as e
:
2181 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2182 status
, nsr_id
, vca_index
, e
2186 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2188 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2189 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2190 Database is used because the result can be obtained from a different LCM worker in case of HA.
2191 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2192 :param db_nslcmop: database content of nslcmop
2193 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2194 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2195 computed 'vim-account-id'
2198 nslcmop_id
= db_nslcmop
["_id"]
2199 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2200 if placement_engine
== "PLA":
2202 logging_text
+ "Invoke and wait for placement optimization"
2204 await self
.msg
.aiowrite(
2205 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2207 db_poll_interval
= 5
2208 wait
= db_poll_interval
* 10
2210 while not pla_result
and wait
>= 0:
2211 await asyncio
.sleep(db_poll_interval
)
2212 wait
-= db_poll_interval
2213 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2214 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2218 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2221 for pla_vnf
in pla_result
["vnf"]:
2222 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2223 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2228 {"_id": vnfr
["_id"]},
2229 {"vim-account-id": pla_vnf
["vimAccountId"]},
2232 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2235 def update_nsrs_with_pla_result(self
, params
):
2237 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2239 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2241 except Exception as e
:
2242 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2244 async def instantiate(self
, nsr_id
, nslcmop_id
):
2247 :param nsr_id: ns instance to deploy
2248 :param nslcmop_id: operation to run
2252 # Try to lock HA task here
2253 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2254 if not task_is_locked_by_me
:
2256 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2260 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2261 self
.logger
.debug(logging_text
+ "Enter")
2263 # get all needed from database
2265 # database nsrs record
2268 # database nslcmops record
2271 # update operation on nsrs
2273 # update operation on nslcmops
2274 db_nslcmop_update
= {}
2276 nslcmop_operation_state
= None
2277 db_vnfrs
= {} # vnf's info indexed by member-index
2279 tasks_dict_info
= {} # from task to info text
2283 "Stage 1/5: preparation of the environment.",
2284 "Waiting for previous operations to terminate.",
2287 # ^ stage, step, VIM progress
2289 # wait for any previous tasks in process
2290 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2292 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2293 stage
[1] = "Reading from database."
2294 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2295 db_nsr_update
["detailed-status"] = "creating"
2296 db_nsr_update
["operational-status"] = "init"
2297 self
._write
_ns
_status
(
2299 ns_state
="BUILDING",
2300 current_operation
="INSTANTIATING",
2301 current_operation_id
=nslcmop_id
,
2302 other_update
=db_nsr_update
,
2304 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2306 # read from db: operation
2307 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2308 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2309 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2310 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2311 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2313 ns_params
= db_nslcmop
.get("operationParams")
2314 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2315 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2317 timeout_ns_deploy
= self
.timeout
.get(
2318 "ns_deploy", self
.timeout_ns_deploy
2322 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2323 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2324 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2325 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2326 self
.fs
.sync(db_nsr
["nsd-id"])
2328 # nsr_name = db_nsr["name"] # TODO short-name??
2330 # read from db: vnf's of this ns
2331 stage
[1] = "Getting vnfrs from db."
2332 self
.logger
.debug(logging_text
+ stage
[1])
2333 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2335 # read from db: vnfd's for every vnf
2336 db_vnfds
= [] # every vnfd data
2338 # for each vnf in ns, read vnfd
2339 for vnfr
in db_vnfrs_list
:
2340 if vnfr
.get("kdur"):
2342 for kdur
in vnfr
["kdur"]:
2343 if kdur
.get("additionalParams"):
2344 kdur
["additionalParams"] = json
.loads(
2345 kdur
["additionalParams"]
2347 kdur_list
.append(kdur
)
2348 vnfr
["kdur"] = kdur_list
2350 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2351 vnfd_id
= vnfr
["vnfd-id"]
2352 vnfd_ref
= vnfr
["vnfd-ref"]
2353 self
.fs
.sync(vnfd_id
)
2355 # if we haven't this vnfd, read it from db
2356 if vnfd_id
not in db_vnfds
:
2358 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2361 self
.logger
.debug(logging_text
+ stage
[1])
2362 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2365 db_vnfds
.append(vnfd
)
2367 # Get or generates the _admin.deployed.VCA list
2368 vca_deployed_list
= None
2369 if db_nsr
["_admin"].get("deployed"):
2370 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2371 if vca_deployed_list
is None:
2372 vca_deployed_list
= []
2373 configuration_status_list
= []
2374 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2375 db_nsr_update
["configurationStatus"] = configuration_status_list
2376 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2377 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2378 elif isinstance(vca_deployed_list
, dict):
2379 # maintain backward compatibility. Change a dict to list at database
2380 vca_deployed_list
= list(vca_deployed_list
.values())
2381 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2382 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2385 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2387 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2388 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2390 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2391 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2392 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2394 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2397 # n2vc_redesign STEP 2 Deploy Network Scenario
2398 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2399 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2401 stage
[1] = "Deploying KDUs."
2402 # self.logger.debug(logging_text + "Before deploy_kdus")
2403 # Call to deploy_kdus in case exists the "vdu:kdu" param
2404 await self
.deploy_kdus(
2405 logging_text
=logging_text
,
2407 nslcmop_id
=nslcmop_id
,
2410 task_instantiation_info
=tasks_dict_info
,
2413 stage
[1] = "Getting VCA public key."
2414 # n2vc_redesign STEP 1 Get VCA public ssh-key
2415 # feature 1429. Add n2vc public key to needed VMs
2416 n2vc_key
= self
.n2vc
.get_public_key()
2417 n2vc_key_list
= [n2vc_key
]
2418 if self
.vca_config
.get("public_key"):
2419 n2vc_key_list
.append(self
.vca_config
["public_key"])
2421 stage
[1] = "Deploying NS at VIM."
2422 task_ro
= asyncio
.ensure_future(
2423 self
.instantiate_RO(
2424 logging_text
=logging_text
,
2428 db_nslcmop
=db_nslcmop
,
2431 n2vc_key_list
=n2vc_key_list
,
2435 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2436 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2438 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2439 stage
[1] = "Deploying Execution Environments."
2440 self
.logger
.debug(logging_text
+ stage
[1])
2442 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2443 for vnf_profile
in get_vnf_profiles(nsd
):
2444 vnfd_id
= vnf_profile
["vnfd-id"]
2445 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2446 member_vnf_index
= str(vnf_profile
["id"])
2447 db_vnfr
= db_vnfrs
[member_vnf_index
]
2448 base_folder
= vnfd
["_admin"]["storage"]
2454 # Get additional parameters
2455 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2456 if db_vnfr
.get("additionalParamsForVnf"):
2457 deploy_params
.update(
2458 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2461 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2462 if descriptor_config
:
2464 logging_text
=logging_text
2465 + "member_vnf_index={} ".format(member_vnf_index
),
2468 nslcmop_id
=nslcmop_id
,
2474 member_vnf_index
=member_vnf_index
,
2475 vdu_index
=vdu_index
,
2477 deploy_params
=deploy_params
,
2478 descriptor_config
=descriptor_config
,
2479 base_folder
=base_folder
,
2480 task_instantiation_info
=tasks_dict_info
,
2484 # Deploy charms for each VDU that supports one.
2485 for vdud
in get_vdu_list(vnfd
):
2487 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2488 vdur
= find_in_list(
2489 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2492 if vdur
.get("additionalParams"):
2493 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2495 deploy_params_vdu
= deploy_params
2496 deploy_params_vdu
["OSM"] = get_osm_params(
2497 db_vnfr
, vdu_id
, vdu_count_index
=0
2499 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2501 self
.logger
.debug("VDUD > {}".format(vdud
))
2503 "Descriptor config > {}".format(descriptor_config
)
2505 if descriptor_config
:
2508 for vdu_index
in range(vdud_count
):
2509 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2511 logging_text
=logging_text
2512 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2513 member_vnf_index
, vdu_id
, vdu_index
2517 nslcmop_id
=nslcmop_id
,
2523 member_vnf_index
=member_vnf_index
,
2524 vdu_index
=vdu_index
,
2526 deploy_params
=deploy_params_vdu
,
2527 descriptor_config
=descriptor_config
,
2528 base_folder
=base_folder
,
2529 task_instantiation_info
=tasks_dict_info
,
2532 for kdud
in get_kdu_list(vnfd
):
2533 kdu_name
= kdud
["name"]
2534 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2535 if descriptor_config
:
2540 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2542 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2543 if kdur
.get("additionalParams"):
2544 deploy_params_kdu
.update(
2545 parse_yaml_strings(kdur
["additionalParams"].copy())
2549 logging_text
=logging_text
,
2552 nslcmop_id
=nslcmop_id
,
2558 member_vnf_index
=member_vnf_index
,
2559 vdu_index
=vdu_index
,
2561 deploy_params
=deploy_params_kdu
,
2562 descriptor_config
=descriptor_config
,
2563 base_folder
=base_folder
,
2564 task_instantiation_info
=tasks_dict_info
,
2568 # Check if this NS has a charm configuration
2569 descriptor_config
= nsd
.get("ns-configuration")
2570 if descriptor_config
and descriptor_config
.get("juju"):
2573 member_vnf_index
= None
2579 # Get additional parameters
2580 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2581 if db_nsr
.get("additionalParamsForNs"):
2582 deploy_params
.update(
2583 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2585 base_folder
= nsd
["_admin"]["storage"]
2587 logging_text
=logging_text
,
2590 nslcmop_id
=nslcmop_id
,
2596 member_vnf_index
=member_vnf_index
,
2597 vdu_index
=vdu_index
,
2599 deploy_params
=deploy_params
,
2600 descriptor_config
=descriptor_config
,
2601 base_folder
=base_folder
,
2602 task_instantiation_info
=tasks_dict_info
,
2606 # rest of staff will be done at finally
2609 ROclient
.ROClientException
,
2615 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2618 except asyncio
.CancelledError
:
2620 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2622 exc
= "Operation was cancelled"
2623 except Exception as e
:
2624 exc
= traceback
.format_exc()
2625 self
.logger
.critical(
2626 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2631 error_list
.append(str(exc
))
2633 # wait for pending tasks
2635 stage
[1] = "Waiting for instantiate pending tasks."
2636 self
.logger
.debug(logging_text
+ stage
[1])
2637 error_list
+= await self
._wait
_for
_tasks
(
2645 stage
[1] = stage
[2] = ""
2646 except asyncio
.CancelledError
:
2647 error_list
.append("Cancelled")
2648 # TODO cancel all tasks
2649 except Exception as exc
:
2650 error_list
.append(str(exc
))
2652 # update operation-status
2653 db_nsr_update
["operational-status"] = "running"
2654 # let's begin with VCA 'configured' status (later we can change it)
2655 db_nsr_update
["config-status"] = "configured"
2656 for task
, task_name
in tasks_dict_info
.items():
2657 if not task
.done() or task
.cancelled() or task
.exception():
2658 if task_name
.startswith(self
.task_name_deploy_vca
):
2659 # A N2VC task is pending
2660 db_nsr_update
["config-status"] = "failed"
2662 # RO or KDU task is pending
2663 db_nsr_update
["operational-status"] = "failed"
2665 # update status at database
2667 error_detail
= ". ".join(error_list
)
2668 self
.logger
.error(logging_text
+ error_detail
)
2669 error_description_nslcmop
= "{} Detail: {}".format(
2670 stage
[0], error_detail
2672 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2673 nslcmop_id
, stage
[0]
2676 db_nsr_update
["detailed-status"] = (
2677 error_description_nsr
+ " Detail: " + error_detail
2679 db_nslcmop_update
["detailed-status"] = error_detail
2680 nslcmop_operation_state
= "FAILED"
2684 error_description_nsr
= error_description_nslcmop
= None
2686 db_nsr_update
["detailed-status"] = "Done"
2687 db_nslcmop_update
["detailed-status"] = "Done"
2688 nslcmop_operation_state
= "COMPLETED"
2691 self
._write
_ns
_status
(
2694 current_operation
="IDLE",
2695 current_operation_id
=None,
2696 error_description
=error_description_nsr
,
2697 error_detail
=error_detail
,
2698 other_update
=db_nsr_update
,
2700 self
._write
_op
_status
(
2703 error_message
=error_description_nslcmop
,
2704 operation_state
=nslcmop_operation_state
,
2705 other_update
=db_nslcmop_update
,
2708 if nslcmop_operation_state
:
2710 await self
.msg
.aiowrite(
2715 "nslcmop_id": nslcmop_id
,
2716 "operationState": nslcmop_operation_state
,
2720 except Exception as e
:
2722 logging_text
+ "kafka_write notification Exception {}".format(e
)
2725 self
.logger
.debug(logging_text
+ "Exit")
2726 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2728 async def _add_vca_relations(
2733 timeout
: int = 3600,
2734 vca_type
: str = None,
2739 # 1. find all relations for this VCA
2740 # 2. wait for other peers related
2744 vca_type
= vca_type
or "lxc_proxy_charm"
2746 # STEP 1: find all relations for this VCA
2749 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2750 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2753 my_vca
= deep_get(db_nsr
, ("_admin", "deployed", "VCA"))[vca_index
]
2755 # read all ns-configuration relations
2756 ns_relations
= list()
2757 db_ns_relations
= deep_get(nsd
, ("ns-configuration", "relation"))
2759 for r
in db_ns_relations
:
2760 # check if this VCA is in the relation
2761 if my_vca
.get("member-vnf-index") in (
2762 r
.get("entities")[0].get("id"),
2763 r
.get("entities")[1].get("id"),
2765 ns_relations
.append(r
)
2767 # read all vnf-configuration relations
2768 vnf_relations
= list()
2769 db_vnfd_list
= db_nsr
.get("vnfd-id")
2771 for vnfd
in db_vnfd_list
:
2772 db_vnf_relations
= None
2773 db_vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd
})
2774 db_vnf_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
2775 if db_vnf_configuration
:
2776 db_vnf_relations
= db_vnf_configuration
.get("relation", [])
2777 if db_vnf_relations
:
2778 for r
in db_vnf_relations
:
2779 # check if this VCA is in the relation
2780 if my_vca
.get("vdu_id") in (
2781 r
.get("entities")[0].get("id"),
2782 r
.get("entities")[1].get("id"),
2784 vnf_relations
.append(r
)
2786 # if no relations, terminate
2787 if not ns_relations
and not vnf_relations
:
2788 self
.logger
.debug(logging_text
+ " No relations")
2793 + " adding relations\n {}\n {}".format(
2794 ns_relations
, vnf_relations
2803 if now
- start
>= timeout
:
2804 self
.logger
.error(logging_text
+ " : timeout adding relations")
2807 # reload nsr from database (we need to update record: _admin.deloyed.VCA)
2808 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2810 # for each defined NS relation, find the VCA's related
2811 for r
in ns_relations
.copy():
2812 from_vca_ee_id
= None
2814 from_vca_endpoint
= None
2815 to_vca_endpoint
= None
2816 vca_list
= deep_get(db_nsr
, ("_admin", "deployed", "VCA"))
2817 for vca
in vca_list
:
2818 if vca
.get("member-vnf-index") == r
.get("entities")[0].get(
2820 ) and vca
.get("config_sw_installed"):
2821 from_vca_ee_id
= vca
.get("ee_id")
2822 from_vca_endpoint
= r
.get("entities")[0].get("endpoint")
2823 if vca
.get("member-vnf-index") == r
.get("entities")[1].get(
2825 ) and vca
.get("config_sw_installed"):
2826 to_vca_ee_id
= vca
.get("ee_id")
2827 to_vca_endpoint
= r
.get("entities")[1].get("endpoint")
2828 if from_vca_ee_id
and to_vca_ee_id
:
2830 await self
.vca_map
[vca_type
].add_relation(
2831 ee_id_1
=from_vca_ee_id
,
2832 ee_id_2
=to_vca_ee_id
,
2833 endpoint_1
=from_vca_endpoint
,
2834 endpoint_2
=to_vca_endpoint
,
2837 # remove entry from relations list
2838 ns_relations
.remove(r
)
2840 # check failed peers
2842 vca_status_list
= db_nsr
.get("configurationStatus")
2844 for i
in range(len(vca_list
)):
2846 vca_status
= vca_status_list
[i
]
2847 if vca
.get("member-vnf-index") == r
.get("entities")[
2850 if vca_status
.get("status") == "BROKEN":
2851 # peer broken: remove relation from list
2852 ns_relations
.remove(r
)
2853 if vca
.get("member-vnf-index") == r
.get("entities")[
2856 if vca_status
.get("status") == "BROKEN":
2857 # peer broken: remove relation from list
2858 ns_relations
.remove(r
)
2863 # for each defined VNF relation, find the VCA's related
2864 for r
in vnf_relations
.copy():
2865 from_vca_ee_id
= None
2867 from_vca_endpoint
= None
2868 to_vca_endpoint
= None
2869 vca_list
= deep_get(db_nsr
, ("_admin", "deployed", "VCA"))
2870 for vca
in vca_list
:
2871 key_to_check
= "vdu_id"
2872 if vca
.get("vdu_id") is None:
2873 key_to_check
= "vnfd_id"
2874 if vca
.get(key_to_check
) == r
.get("entities")[0].get(
2876 ) and vca
.get("config_sw_installed"):
2877 from_vca_ee_id
= vca
.get("ee_id")
2878 from_vca_endpoint
= r
.get("entities")[0].get("endpoint")
2879 if vca
.get(key_to_check
) == r
.get("entities")[1].get(
2881 ) and vca
.get("config_sw_installed"):
2882 to_vca_ee_id
= vca
.get("ee_id")
2883 to_vca_endpoint
= r
.get("entities")[1].get("endpoint")
2884 if from_vca_ee_id
and to_vca_ee_id
:
2886 await self
.vca_map
[vca_type
].add_relation(
2887 ee_id_1
=from_vca_ee_id
,
2888 ee_id_2
=to_vca_ee_id
,
2889 endpoint_1
=from_vca_endpoint
,
2890 endpoint_2
=to_vca_endpoint
,
2893 # remove entry from relations list
2894 vnf_relations
.remove(r
)
2896 # check failed peers
2898 vca_status_list
= db_nsr
.get("configurationStatus")
2900 for i
in range(len(vca_list
)):
2902 vca_status
= vca_status_list
[i
]
2903 if vca
.get("vdu_id") == r
.get("entities")[0].get(
2906 if vca_status
.get("status") == "BROKEN":
2907 # peer broken: remove relation from list
2908 vnf_relations
.remove(r
)
2909 if vca
.get("vdu_id") == r
.get("entities")[1].get(
2912 if vca_status
.get("status") == "BROKEN":
2913 # peer broken: remove relation from list
2914 vnf_relations
.remove(r
)
2920 await asyncio
.sleep(5.0)
2922 if not ns_relations
and not vnf_relations
:
2923 self
.logger
.debug("Relations added")
2928 except Exception as e
:
2929 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
2932 async def _install_kdu(
2940 k8s_instance_info
: dict,
2941 k8params
: dict = None,
2947 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
2950 "collection": "nsrs",
2951 "filter": {"_id": nsr_id
},
2952 "path": nsr_db_path
,
2955 if k8s_instance_info
.get("kdu-deployment-name"):
2956 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
2958 kdu_instance
= self
.k8scluster_map
[
2960 ].generate_kdu_instance_name(
2961 db_dict
=db_dict_install
,
2962 kdu_model
=k8s_instance_info
["kdu-model"],
2963 kdu_name
=k8s_instance_info
["kdu-name"],
2966 # Update the nsrs table with the kdu-instance value
2970 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
2973 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
2974 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
2975 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
2976 # namespace, this first verification could be removed, and the next step would be done for any kind
2978 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
2979 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
2980 if k8sclustertype
in ("juju", "juju-bundle"):
2981 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
2982 # that the user passed a namespace which he wants its KDU to be deployed in)
2988 "_admin.projects_write": k8s_instance_info
["namespace"],
2989 "_admin.projects_read": k8s_instance_info
["namespace"],
2995 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3000 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3002 k8s_instance_info
["namespace"] = kdu_instance
3004 await self
.k8scluster_map
[k8sclustertype
].install(
3005 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3006 kdu_model
=k8s_instance_info
["kdu-model"],
3009 db_dict
=db_dict_install
,
3011 kdu_name
=k8s_instance_info
["kdu-name"],
3012 namespace
=k8s_instance_info
["namespace"],
3013 kdu_instance
=kdu_instance
,
3017 # Obtain services to obtain management service ip
3018 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3019 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3020 kdu_instance
=kdu_instance
,
3021 namespace
=k8s_instance_info
["namespace"],
3024 # Obtain management service info (if exists)
3025 vnfr_update_dict
= {}
3026 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3028 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3033 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3036 for service
in kdud
.get("service", [])
3037 if service
.get("mgmt-service")
3039 for mgmt_service
in mgmt_services
:
3040 for service
in services
:
3041 if service
["name"].startswith(mgmt_service
["name"]):
3042 # Mgmt service found, Obtain service ip
3043 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3044 if isinstance(ip
, list) and len(ip
) == 1:
3048 "kdur.{}.ip-address".format(kdu_index
)
3051 # Check if must update also mgmt ip at the vnf
3052 service_external_cp
= mgmt_service
.get(
3053 "external-connection-point-ref"
3055 if service_external_cp
:
3057 deep_get(vnfd
, ("mgmt-interface", "cp"))
3058 == service_external_cp
3060 vnfr_update_dict
["ip-address"] = ip
3065 "external-connection-point-ref", ""
3067 == service_external_cp
,
3070 "kdur.{}.ip-address".format(kdu_index
)
3075 "Mgmt service name: {} not found".format(
3076 mgmt_service
["name"]
3080 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3081 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3083 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3086 and kdu_config
.get("initial-config-primitive")
3087 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3089 initial_config_primitive_list
= kdu_config
.get(
3090 "initial-config-primitive"
3092 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3094 for initial_config_primitive
in initial_config_primitive_list
:
3095 primitive_params_
= self
._map
_primitive
_params
(
3096 initial_config_primitive
, {}, {}
3099 await asyncio
.wait_for(
3100 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3101 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3102 kdu_instance
=kdu_instance
,
3103 primitive_name
=initial_config_primitive
["name"],
3104 params
=primitive_params_
,
3105 db_dict
=db_dict_install
,
3111 except Exception as e
:
3112 # Prepare update db with error and raise exception
3115 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3119 vnfr_data
.get("_id"),
3120 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3123 # ignore to keep original exception
3125 # reraise original error
3130 async def deploy_kdus(
3137 task_instantiation_info
,
3139 # Launch kdus if present in the descriptor
3141 k8scluster_id_2_uuic
= {
3142 "helm-chart-v3": {},
3147 async def _get_cluster_id(cluster_id
, cluster_type
):
3148 nonlocal k8scluster_id_2_uuic
3149 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3150 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3152 # check if K8scluster is creating and wait look if previous tasks in process
3153 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3154 "k8scluster", cluster_id
3157 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3158 task_name
, cluster_id
3160 self
.logger
.debug(logging_text
+ text
)
3161 await asyncio
.wait(task_dependency
, timeout
=3600)
3163 db_k8scluster
= self
.db
.get_one(
3164 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3166 if not db_k8scluster
:
3167 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3169 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3171 if cluster_type
== "helm-chart-v3":
3173 # backward compatibility for existing clusters that have not been initialized for helm v3
3174 k8s_credentials
= yaml
.safe_dump(
3175 db_k8scluster
.get("credentials")
3177 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3178 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3180 db_k8scluster_update
= {}
3181 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3182 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3183 db_k8scluster_update
[
3184 "_admin.helm-chart-v3.created"
3186 db_k8scluster_update
[
3187 "_admin.helm-chart-v3.operationalState"
3190 "k8sclusters", cluster_id
, db_k8scluster_update
3192 except Exception as e
:
3195 + "error initializing helm-v3 cluster: {}".format(str(e
))
3198 "K8s cluster '{}' has not been initialized for '{}'".format(
3199 cluster_id
, cluster_type
3204 "K8s cluster '{}' has not been initialized for '{}'".format(
3205 cluster_id
, cluster_type
3208 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3211 logging_text
+= "Deploy kdus: "
3214 db_nsr_update
= {"_admin.deployed.K8s": []}
3215 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3218 updated_cluster_list
= []
3219 updated_v3_cluster_list
= []
3221 for vnfr_data
in db_vnfrs
.values():
3222 vca_id
= self
.get_vca_id(vnfr_data
, {})
3223 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3224 # Step 0: Prepare and set parameters
3225 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3226 vnfd_id
= vnfr_data
.get("vnfd-id")
3227 vnfd_with_id
= find_in_list(
3228 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3232 for kdud
in vnfd_with_id
["kdu"]
3233 if kdud
["name"] == kdur
["kdu-name"]
3235 namespace
= kdur
.get("k8s-namespace")
3236 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3237 if kdur
.get("helm-chart"):
3238 kdumodel
= kdur
["helm-chart"]
3239 # Default version: helm3, if helm-version is v2 assign v2
3240 k8sclustertype
= "helm-chart-v3"
3241 self
.logger
.debug("kdur: {}".format(kdur
))
3243 kdur
.get("helm-version")
3244 and kdur
.get("helm-version") == "v2"
3246 k8sclustertype
= "helm-chart"
3247 elif kdur
.get("juju-bundle"):
3248 kdumodel
= kdur
["juju-bundle"]
3249 k8sclustertype
= "juju-bundle"
3252 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3253 "juju-bundle. Maybe an old NBI version is running".format(
3254 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3257 # check if kdumodel is a file and exists
3259 vnfd_with_id
= find_in_list(
3260 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3262 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3263 if storage
and storage
.get(
3265 ): # may be not present if vnfd has not artifacts
3266 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3267 filename
= "{}/{}/{}s/{}".format(
3273 if self
.fs
.file_exists(
3274 filename
, mode
="file"
3275 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3276 kdumodel
= self
.fs
.path
+ filename
3277 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3279 except Exception: # it is not a file
3282 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3283 step
= "Synchronize repos for k8s cluster '{}'".format(
3286 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3290 k8sclustertype
== "helm-chart"
3291 and cluster_uuid
not in updated_cluster_list
3293 k8sclustertype
== "helm-chart-v3"
3294 and cluster_uuid
not in updated_v3_cluster_list
3296 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3297 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3298 cluster_uuid
=cluster_uuid
3301 if del_repo_list
or added_repo_dict
:
3302 if k8sclustertype
== "helm-chart":
3304 "_admin.helm_charts_added." + item
: None
3305 for item
in del_repo_list
3308 "_admin.helm_charts_added." + item
: name
3309 for item
, name
in added_repo_dict
.items()
3311 updated_cluster_list
.append(cluster_uuid
)
3312 elif k8sclustertype
== "helm-chart-v3":
3314 "_admin.helm_charts_v3_added." + item
: None
3315 for item
in del_repo_list
3318 "_admin.helm_charts_v3_added." + item
: name
3319 for item
, name
in added_repo_dict
.items()
3321 updated_v3_cluster_list
.append(cluster_uuid
)
3323 logging_text
+ "repos synchronized on k8s cluster "
3324 "'{}' to_delete: {}, to_add: {}".format(
3325 k8s_cluster_id
, del_repo_list
, added_repo_dict
3330 {"_id": k8s_cluster_id
},
3336 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3337 vnfr_data
["member-vnf-index-ref"],
3341 k8s_instance_info
= {
3342 "kdu-instance": None,
3343 "k8scluster-uuid": cluster_uuid
,
3344 "k8scluster-type": k8sclustertype
,
3345 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3346 "kdu-name": kdur
["kdu-name"],
3347 "kdu-model": kdumodel
,
3348 "namespace": namespace
,
3349 "kdu-deployment-name": kdu_deployment_name
,
3351 db_path
= "_admin.deployed.K8s.{}".format(index
)
3352 db_nsr_update
[db_path
] = k8s_instance_info
3353 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3354 vnfd_with_id
= find_in_list(
3355 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3357 task
= asyncio
.ensure_future(
3366 k8params
=desc_params
,
3371 self
.lcm_tasks
.register(
3375 "instantiate_KDU-{}".format(index
),
3378 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3384 except (LcmException
, asyncio
.CancelledError
):
3386 except Exception as e
:
3387 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3388 if isinstance(e
, (N2VCException
, DbException
)):
3389 self
.logger
.error(logging_text
+ msg
)
3391 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3392 raise LcmException(msg
)
3395 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3414 task_instantiation_info
,
3417 # launch instantiate_N2VC in a asyncio task and register task object
3418 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3419 # if not found, create one entry and update database
3420 # fill db_nsr._admin.deployed.VCA.<index>
3423 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3425 if "execution-environment-list" in descriptor_config
:
3426 ee_list
= descriptor_config
.get("execution-environment-list", [])
3427 elif "juju" in descriptor_config
:
3428 ee_list
= [descriptor_config
] # ns charms
3429 else: # other types as script are not supported
3432 for ee_item
in ee_list
:
3435 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3436 ee_item
.get("juju"), ee_item
.get("helm-chart")
3439 ee_descriptor_id
= ee_item
.get("id")
3440 if ee_item
.get("juju"):
3441 vca_name
= ee_item
["juju"].get("charm")
3444 if ee_item
["juju"].get("charm") is not None
3447 if ee_item
["juju"].get("cloud") == "k8s":
3448 vca_type
= "k8s_proxy_charm"
3449 elif ee_item
["juju"].get("proxy") is False:
3450 vca_type
= "native_charm"
3451 elif ee_item
.get("helm-chart"):
3452 vca_name
= ee_item
["helm-chart"]
3453 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3456 vca_type
= "helm-v3"
3459 logging_text
+ "skipping non juju neither charm configuration"
3464 for vca_index
, vca_deployed
in enumerate(
3465 db_nsr
["_admin"]["deployed"]["VCA"]
3467 if not vca_deployed
:
3470 vca_deployed
.get("member-vnf-index") == member_vnf_index
3471 and vca_deployed
.get("vdu_id") == vdu_id
3472 and vca_deployed
.get("kdu_name") == kdu_name
3473 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3474 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3478 # not found, create one.
3480 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3483 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3485 target
+= "/kdu/{}".format(kdu_name
)
3487 "target_element": target
,
3488 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3489 "member-vnf-index": member_vnf_index
,
3491 "kdu_name": kdu_name
,
3492 "vdu_count_index": vdu_index
,
3493 "operational-status": "init", # TODO revise
3494 "detailed-status": "", # TODO revise
3495 "step": "initial-deploy", # TODO revise
3497 "vdu_name": vdu_name
,
3499 "ee_descriptor_id": ee_descriptor_id
,
3503 # create VCA and configurationStatus in db
3505 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3506 "configurationStatus.{}".format(vca_index
): dict(),
3508 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3510 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3512 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3513 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3514 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3517 task_n2vc
= asyncio
.ensure_future(
3518 self
.instantiate_N2VC(
3519 logging_text
=logging_text
,
3520 vca_index
=vca_index
,
3526 vdu_index
=vdu_index
,
3527 deploy_params
=deploy_params
,
3528 config_descriptor
=descriptor_config
,
3529 base_folder
=base_folder
,
3530 nslcmop_id
=nslcmop_id
,
3534 ee_config_descriptor
=ee_item
,
3537 self
.lcm_tasks
.register(
3541 "instantiate_N2VC-{}".format(vca_index
),
3544 task_instantiation_info
[
3546 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3547 member_vnf_index
or "", vdu_id
or ""
3551 def _create_nslcmop(nsr_id
, operation
, params
):
3553 Creates a ns-lcm-opp content to be stored at database.
3554 :param nsr_id: internal id of the instance
3555 :param operation: instantiate, terminate, scale, action, ...
3556 :param params: user parameters for the operation
3557 :return: dictionary following SOL005 format
3559 # Raise exception if invalid arguments
3560 if not (nsr_id
and operation
and params
):
3562 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3569 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3570 "operationState": "PROCESSING",
3571 "statusEnteredTime": now
,
3572 "nsInstanceId": nsr_id
,
3573 "lcmOperationType": operation
,
3575 "isAutomaticInvocation": False,
3576 "operationParams": params
,
3577 "isCancelPending": False,
3579 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3580 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3585 def _format_additional_params(self
, params
):
3586 params
= params
or {}
3587 for key
, value
in params
.items():
3588 if str(value
).startswith("!!yaml "):
3589 params
[key
] = yaml
.safe_load(value
[7:])
3592 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3593 primitive
= seq
.get("name")
3594 primitive_params
= {}
3596 "member_vnf_index": vnf_index
,
3597 "primitive": primitive
,
3598 "primitive_params": primitive_params
,
3601 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
3605 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
3606 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
3607 if op
.get("operationState") == "COMPLETED":
3608 # b. Skip sub-operation
3609 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3610 return self
.SUBOPERATION_STATUS_SKIP
3612 # c. retry executing sub-operation
3613 # The sub-operation exists, and operationState != 'COMPLETED'
3614 # Update operationState = 'PROCESSING' to indicate a retry.
3615 operationState
= "PROCESSING"
3616 detailed_status
= "In progress"
3617 self
._update
_suboperation
_status
(
3618 db_nslcmop
, op_index
, operationState
, detailed_status
3620 # Return the sub-operation index
3621 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3622 # with arguments extracted from the sub-operation
3625 # Find a sub-operation where all keys in a matching dictionary must match
3626 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3627 def _find_suboperation(self
, db_nslcmop
, match
):
3628 if db_nslcmop
and match
:
3629 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3630 for i
, op
in enumerate(op_list
):
3631 if all(op
.get(k
) == match
[k
] for k
in match
):
3633 return self
.SUBOPERATION_STATUS_NOT_FOUND
3635 # Update status for a sub-operation given its index
3636 def _update_suboperation_status(
3637 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3639 # Update DB for HA tasks
3640 q_filter
= {"_id": db_nslcmop
["_id"]}
3642 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3643 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3646 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3649 # Add sub-operation, return the index of the added sub-operation
3650 # Optionally, set operationState, detailed-status, and operationType
3651 # Status and type are currently set for 'scale' sub-operations:
3652 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3653 # 'detailed-status' : status message
3654 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3655 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3656 def _add_suboperation(
3664 mapped_primitive_params
,
3665 operationState
=None,
3666 detailed_status
=None,
3669 RO_scaling_info
=None,
3672 return self
.SUBOPERATION_STATUS_NOT_FOUND
3673 # Get the "_admin.operations" list, if it exists
3674 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3675 op_list
= db_nslcmop_admin
.get("operations")
3676 # Create or append to the "_admin.operations" list
3678 "member_vnf_index": vnf_index
,
3680 "vdu_count_index": vdu_count_index
,
3681 "primitive": primitive
,
3682 "primitive_params": mapped_primitive_params
,
3685 new_op
["operationState"] = operationState
3687 new_op
["detailed-status"] = detailed_status
3689 new_op
["lcmOperationType"] = operationType
3691 new_op
["RO_nsr_id"] = RO_nsr_id
3693 new_op
["RO_scaling_info"] = RO_scaling_info
3695 # No existing operations, create key 'operations' with current operation as first list element
3696 db_nslcmop_admin
.update({"operations": [new_op
]})
3697 op_list
= db_nslcmop_admin
.get("operations")
3699 # Existing operations, append operation to list
3700 op_list
.append(new_op
)
3702 db_nslcmop_update
= {"_admin.operations": op_list
}
3703 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
3704 op_index
= len(op_list
) - 1
3707 # Helper methods for scale() sub-operations
3709 # pre-scale/post-scale:
3710 # Check for 3 different cases:
3711 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3712 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3713 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3714 def _check_or_add_scale_suboperation(
3718 vnf_config_primitive
,
3722 RO_scaling_info
=None,
3724 # Find this sub-operation
3725 if RO_nsr_id
and RO_scaling_info
:
3726 operationType
= "SCALE-RO"
3728 "member_vnf_index": vnf_index
,
3729 "RO_nsr_id": RO_nsr_id
,
3730 "RO_scaling_info": RO_scaling_info
,
3734 "member_vnf_index": vnf_index
,
3735 "primitive": vnf_config_primitive
,
3736 "primitive_params": primitive_params
,
3737 "lcmOperationType": operationType
,
3739 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
3740 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
3741 # a. New sub-operation
3742 # The sub-operation does not exist, add it.
3743 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3744 # The following parameters are set to None for all kind of scaling:
3746 vdu_count_index
= None
3748 if RO_nsr_id
and RO_scaling_info
:
3749 vnf_config_primitive
= None
3750 primitive_params
= None
3753 RO_scaling_info
= None
3754 # Initial status for sub-operation
3755 operationState
= "PROCESSING"
3756 detailed_status
= "In progress"
3757 # Add sub-operation for pre/post-scaling (zero or more operations)
3758 self
._add
_suboperation
(
3764 vnf_config_primitive
,
3772 return self
.SUBOPERATION_STATUS_NEW
3774 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3775 # or op_index (operationState != 'COMPLETED')
3776 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
3778 # Function to return execution_environment id
3780 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
3781 # TODO vdu_index_count
3782 for vca
in vca_deployed_list
:
3783 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
3786 async def destroy_N2VC(
3794 exec_primitives
=True,
3799 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
3800 :param logging_text:
3802 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
3803 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3804 :param vca_index: index in the database _admin.deployed.VCA
3805 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
3806 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
3807 not executed properly
3808 :param scaling_in: True destroys the application, False destroys the model
3809 :return: None or exception
3814 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3815 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
3819 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
3821 # execute terminate_primitives
3823 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
3824 config_descriptor
.get("terminate-config-primitive"),
3825 vca_deployed
.get("ee_descriptor_id"),
3827 vdu_id
= vca_deployed
.get("vdu_id")
3828 vdu_count_index
= vca_deployed
.get("vdu_count_index")
3829 vdu_name
= vca_deployed
.get("vdu_name")
3830 vnf_index
= vca_deployed
.get("member-vnf-index")
3831 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
3832 for seq
in terminate_primitives
:
3833 # For each sequence in list, get primitive and call _ns_execute_primitive()
3834 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
3835 vnf_index
, seq
.get("name")
3837 self
.logger
.debug(logging_text
+ step
)
3838 # Create the primitive for each sequence, i.e. "primitive": "touch"
3839 primitive
= seq
.get("name")
3840 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
3845 self
._add
_suboperation
(
3852 mapped_primitive_params
,
3854 # Sub-operations: Call _ns_execute_primitive() instead of action()
3856 result
, result_detail
= await self
._ns
_execute
_primitive
(
3857 vca_deployed
["ee_id"],
3859 mapped_primitive_params
,
3863 except LcmException
:
3864 # this happens when VCA is not deployed. In this case it is not needed to terminate
3866 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
3867 if result
not in result_ok
:
3869 "terminate_primitive {} for vnf_member_index={} fails with "
3870 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
3872 # set that this VCA do not need terminated
3873 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
3877 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
3880 if vca_deployed
.get("prometheus_jobs") and self
.prometheus
:
3881 await self
.prometheus
.update(remove_jobs
=vca_deployed
["prometheus_jobs"])
3884 await self
.vca_map
[vca_type
].delete_execution_environment(
3885 vca_deployed
["ee_id"],
3886 scaling_in
=scaling_in
,
3891 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
3892 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
3893 namespace
= "." + db_nsr
["_id"]
3895 await self
.n2vc
.delete_namespace(
3896 namespace
=namespace
,
3897 total_timeout
=self
.timeout_charm_delete
,
3900 except N2VCNotFound
: # already deleted. Skip
3902 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
3904 async def _terminate_RO(
3905 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
3908 Terminates a deployment from RO
3909 :param logging_text:
3910 :param nsr_deployed: db_nsr._admin.deployed
3913 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
3914 this method will update only the index 2, but it will write on database the concatenated content of the list
3919 ro_nsr_id
= ro_delete_action
= None
3920 if nsr_deployed
and nsr_deployed
.get("RO"):
3921 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
3922 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
3925 stage
[2] = "Deleting ns from VIM."
3926 db_nsr_update
["detailed-status"] = " ".join(stage
)
3927 self
._write
_op
_status
(nslcmop_id
, stage
)
3928 self
.logger
.debug(logging_text
+ stage
[2])
3929 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3930 self
._write
_op
_status
(nslcmop_id
, stage
)
3931 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
3932 ro_delete_action
= desc
["action_id"]
3934 "_admin.deployed.RO.nsr_delete_action_id"
3935 ] = ro_delete_action
3936 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
3937 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
3938 if ro_delete_action
:
3939 # wait until NS is deleted from VIM
3940 stage
[2] = "Waiting ns deleted from VIM."
3941 detailed_status_old
= None
3945 + " RO_id={} ro_delete_action={}".format(
3946 ro_nsr_id
, ro_delete_action
3949 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3950 self
._write
_op
_status
(nslcmop_id
, stage
)
3952 delete_timeout
= 20 * 60 # 20 minutes
3953 while delete_timeout
> 0:
3954 desc
= await self
.RO
.show(
3956 item_id_name
=ro_nsr_id
,
3957 extra_item
="action",
3958 extra_item_id
=ro_delete_action
,
3962 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
3964 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
3965 if ns_status
== "ERROR":
3966 raise ROclient
.ROClientException(ns_status_info
)
3967 elif ns_status
== "BUILD":
3968 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
3969 elif ns_status
== "ACTIVE":
3970 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
3971 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
3976 ), "ROclient.check_action_status returns unknown {}".format(
3979 if stage
[2] != detailed_status_old
:
3980 detailed_status_old
= stage
[2]
3981 db_nsr_update
["detailed-status"] = " ".join(stage
)
3982 self
._write
_op
_status
(nslcmop_id
, stage
)
3983 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3984 await asyncio
.sleep(5, loop
=self
.loop
)
3986 else: # delete_timeout <= 0:
3987 raise ROclient
.ROClientException(
3988 "Timeout waiting ns deleted from VIM"
3991 except Exception as e
:
3992 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3994 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
3996 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
3997 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
3998 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4000 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4003 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4005 failed_detail
.append("delete conflict: {}".format(e
))
4008 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4011 failed_detail
.append("delete error: {}".format(e
))
4013 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4017 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4018 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4020 stage
[2] = "Deleting nsd from RO."
4021 db_nsr_update
["detailed-status"] = " ".join(stage
)
4022 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4023 self
._write
_op
_status
(nslcmop_id
, stage
)
4024 await self
.RO
.delete("nsd", ro_nsd_id
)
4026 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4028 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4029 except Exception as e
:
4031 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4033 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4035 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4038 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4040 failed_detail
.append(
4041 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4043 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4045 failed_detail
.append(
4046 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4048 self
.logger
.error(logging_text
+ failed_detail
[-1])
4050 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4051 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4052 if not vnf_deployed
or not vnf_deployed
["id"]:
4055 ro_vnfd_id
= vnf_deployed
["id"]
4058 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4059 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4061 db_nsr_update
["detailed-status"] = " ".join(stage
)
4062 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4063 self
._write
_op
_status
(nslcmop_id
, stage
)
4064 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4066 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4068 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4069 except Exception as e
:
4071 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4074 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4078 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4081 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4083 failed_detail
.append(
4084 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4086 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4088 failed_detail
.append(
4089 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4091 self
.logger
.error(logging_text
+ failed_detail
[-1])
4094 stage
[2] = "Error deleting from VIM"
4096 stage
[2] = "Deleted from VIM"
4097 db_nsr_update
["detailed-status"] = " ".join(stage
)
4098 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4099 self
._write
_op
_status
(nslcmop_id
, stage
)
4102 raise LcmException("; ".join(failed_detail
))
4104 async def terminate(self
, nsr_id
, nslcmop_id
):
4105 # Try to lock HA task here
4106 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4107 if not task_is_locked_by_me
:
4110 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4111 self
.logger
.debug(logging_text
+ "Enter")
4112 timeout_ns_terminate
= self
.timeout_ns_terminate
4115 operation_params
= None
4117 error_list
= [] # annotates all failed error messages
4118 db_nslcmop_update
= {}
4119 autoremove
= False # autoremove after terminated
4120 tasks_dict_info
= {}
4123 "Stage 1/3: Preparing task.",
4124 "Waiting for previous operations to terminate.",
4127 # ^ contains [stage, step, VIM-status]
4129 # wait for any previous tasks in process
4130 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4132 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4133 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4134 operation_params
= db_nslcmop
.get("operationParams") or {}
4135 if operation_params
.get("timeout_ns_terminate"):
4136 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4137 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4138 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4140 db_nsr_update
["operational-status"] = "terminating"
4141 db_nsr_update
["config-status"] = "terminating"
4142 self
._write
_ns
_status
(
4144 ns_state
="TERMINATING",
4145 current_operation
="TERMINATING",
4146 current_operation_id
=nslcmop_id
,
4147 other_update
=db_nsr_update
,
4149 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4150 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4151 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4154 stage
[1] = "Getting vnf descriptors from db."
4155 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4157 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4159 db_vnfds_from_id
= {}
4160 db_vnfds_from_member_index
= {}
4162 for vnfr
in db_vnfrs_list
:
4163 vnfd_id
= vnfr
["vnfd-id"]
4164 if vnfd_id
not in db_vnfds_from_id
:
4165 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4166 db_vnfds_from_id
[vnfd_id
] = vnfd
4167 db_vnfds_from_member_index
[
4168 vnfr
["member-vnf-index-ref"]
4169 ] = db_vnfds_from_id
[vnfd_id
]
4171 # Destroy individual execution environments when there are terminating primitives.
4172 # Rest of EE will be deleted at once
4173 # TODO - check before calling _destroy_N2VC
4174 # if not operation_params.get("skip_terminate_primitives"):#
4175 # or not vca.get("needed_terminate"):
4176 stage
[0] = "Stage 2/3 execute terminating primitives."
4177 self
.logger
.debug(logging_text
+ stage
[0])
4178 stage
[1] = "Looking execution environment that needs terminate."
4179 self
.logger
.debug(logging_text
+ stage
[1])
4181 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4182 config_descriptor
= None
4183 vca_member_vnf_index
= vca
.get("member-vnf-index")
4184 vca_id
= self
.get_vca_id(
4185 db_vnfrs_dict
.get(vca_member_vnf_index
)
4186 if vca_member_vnf_index
4190 if not vca
or not vca
.get("ee_id"):
4192 if not vca
.get("member-vnf-index"):
4194 config_descriptor
= db_nsr
.get("ns-configuration")
4195 elif vca
.get("vdu_id"):
4196 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4197 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4198 elif vca
.get("kdu_name"):
4199 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4200 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4202 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4203 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4204 vca_type
= vca
.get("type")
4205 exec_terminate_primitives
= not operation_params
.get(
4206 "skip_terminate_primitives"
4207 ) and vca
.get("needed_terminate")
4208 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4209 # pending native charms
4211 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4213 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4214 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4215 task
= asyncio
.ensure_future(
4223 exec_terminate_primitives
,
4227 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4229 # wait for pending tasks of terminate primitives
4233 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4235 error_list
= await self
._wait
_for
_tasks
(
4238 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4242 tasks_dict_info
.clear()
4244 return # raise LcmException("; ".join(error_list))
4246 # remove All execution environments at once
4247 stage
[0] = "Stage 3/3 delete all."
4249 if nsr_deployed
.get("VCA"):
4250 stage
[1] = "Deleting all execution environments."
4251 self
.logger
.debug(logging_text
+ stage
[1])
4252 vca_id
= self
.get_vca_id({}, db_nsr
)
4253 task_delete_ee
= asyncio
.ensure_future(
4255 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4256 timeout
=self
.timeout_charm_delete
,
4259 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4260 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4262 # Delete from k8scluster
4263 stage
[1] = "Deleting KDUs."
4264 self
.logger
.debug(logging_text
+ stage
[1])
4265 # print(nsr_deployed)
4266 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4267 if not kdu
or not kdu
.get("kdu-instance"):
4269 kdu_instance
= kdu
.get("kdu-instance")
4270 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4271 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4272 vca_id
= self
.get_vca_id({}, db_nsr
)
4273 task_delete_kdu_instance
= asyncio
.ensure_future(
4274 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4275 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4276 kdu_instance
=kdu_instance
,
4283 + "Unknown k8s deployment type {}".format(
4284 kdu
.get("k8scluster-type")
4289 task_delete_kdu_instance
4290 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4293 stage
[1] = "Deleting ns from VIM."
4295 task_delete_ro
= asyncio
.ensure_future(
4296 self
._terminate
_ng
_ro
(
4297 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4301 task_delete_ro
= asyncio
.ensure_future(
4303 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4306 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4308 # rest of staff will be done at finally
4311 ROclient
.ROClientException
,
4316 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4318 except asyncio
.CancelledError
:
4320 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4322 exc
= "Operation was cancelled"
4323 except Exception as e
:
4324 exc
= traceback
.format_exc()
4325 self
.logger
.critical(
4326 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4331 error_list
.append(str(exc
))
4333 # wait for pending tasks
4335 stage
[1] = "Waiting for terminate pending tasks."
4336 self
.logger
.debug(logging_text
+ stage
[1])
4337 error_list
+= await self
._wait
_for
_tasks
(
4340 timeout_ns_terminate
,
4344 stage
[1] = stage
[2] = ""
4345 except asyncio
.CancelledError
:
4346 error_list
.append("Cancelled")
4347 # TODO cancell all tasks
4348 except Exception as exc
:
4349 error_list
.append(str(exc
))
4350 # update status at database
4352 error_detail
= "; ".join(error_list
)
4353 # self.logger.error(logging_text + error_detail)
4354 error_description_nslcmop
= "{} Detail: {}".format(
4355 stage
[0], error_detail
4357 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4358 nslcmop_id
, stage
[0]
4361 db_nsr_update
["operational-status"] = "failed"
4362 db_nsr_update
["detailed-status"] = (
4363 error_description_nsr
+ " Detail: " + error_detail
4365 db_nslcmop_update
["detailed-status"] = error_detail
4366 nslcmop_operation_state
= "FAILED"
4370 error_description_nsr
= error_description_nslcmop
= None
4371 ns_state
= "NOT_INSTANTIATED"
4372 db_nsr_update
["operational-status"] = "terminated"
4373 db_nsr_update
["detailed-status"] = "Done"
4374 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4375 db_nslcmop_update
["detailed-status"] = "Done"
4376 nslcmop_operation_state
= "COMPLETED"
4379 self
._write
_ns
_status
(
4382 current_operation
="IDLE",
4383 current_operation_id
=None,
4384 error_description
=error_description_nsr
,
4385 error_detail
=error_detail
,
4386 other_update
=db_nsr_update
,
4388 self
._write
_op
_status
(
4391 error_message
=error_description_nslcmop
,
4392 operation_state
=nslcmop_operation_state
,
4393 other_update
=db_nslcmop_update
,
4395 if ns_state
== "NOT_INSTANTIATED":
4399 {"nsr-id-ref": nsr_id
},
4400 {"_admin.nsState": "NOT_INSTANTIATED"},
4402 except DbException
as e
:
4405 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4409 if operation_params
:
4410 autoremove
= operation_params
.get("autoremove", False)
4411 if nslcmop_operation_state
:
4413 await self
.msg
.aiowrite(
4418 "nslcmop_id": nslcmop_id
,
4419 "operationState": nslcmop_operation_state
,
4420 "autoremove": autoremove
,
4424 except Exception as e
:
4426 logging_text
+ "kafka_write notification Exception {}".format(e
)
4429 self
.logger
.debug(logging_text
+ "Exit")
4430 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4432 async def _wait_for_tasks(
4433 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4436 error_detail_list
= []
4438 pending_tasks
= list(created_tasks_info
.keys())
4439 num_tasks
= len(pending_tasks
)
4441 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4442 self
._write
_op
_status
(nslcmop_id
, stage
)
4443 while pending_tasks
:
4445 _timeout
= timeout
+ time_start
- time()
4446 done
, pending_tasks
= await asyncio
.wait(
4447 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4449 num_done
+= len(done
)
4450 if not done
: # Timeout
4451 for task
in pending_tasks
:
4452 new_error
= created_tasks_info
[task
] + ": Timeout"
4453 error_detail_list
.append(new_error
)
4454 error_list
.append(new_error
)
4457 if task
.cancelled():
4460 exc
= task
.exception()
4462 if isinstance(exc
, asyncio
.TimeoutError
):
4464 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4465 error_list
.append(created_tasks_info
[task
])
4466 error_detail_list
.append(new_error
)
4473 ROclient
.ROClientException
,
4479 self
.logger
.error(logging_text
+ new_error
)
4481 exc_traceback
= "".join(
4482 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4486 + created_tasks_info
[task
]
4492 logging_text
+ created_tasks_info
[task
] + ": Done"
4494 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4496 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4497 if nsr_id
: # update also nsr
4502 "errorDescription": "Error at: " + ", ".join(error_list
),
4503 "errorDetail": ". ".join(error_detail_list
),
4506 self
._write
_op
_status
(nslcmop_id
, stage
)
4507 return error_detail_list
4510 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4512 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4513 The default-value is used. If it is between < > it look for a value at instantiation_params
4514 :param primitive_desc: portion of VNFD/NSD that describes primitive
4515 :param params: Params provided by user
4516 :param instantiation_params: Instantiation params provided by user
4517 :return: a dictionary with the calculated params
4519 calculated_params
= {}
4520 for parameter
in primitive_desc
.get("parameter", ()):
4521 param_name
= parameter
["name"]
4522 if param_name
in params
:
4523 calculated_params
[param_name
] = params
[param_name
]
4524 elif "default-value" in parameter
or "value" in parameter
:
4525 if "value" in parameter
:
4526 calculated_params
[param_name
] = parameter
["value"]
4528 calculated_params
[param_name
] = parameter
["default-value"]
4530 isinstance(calculated_params
[param_name
], str)
4531 and calculated_params
[param_name
].startswith("<")
4532 and calculated_params
[param_name
].endswith(">")
4534 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4535 calculated_params
[param_name
] = instantiation_params
[
4536 calculated_params
[param_name
][1:-1]
4540 "Parameter {} needed to execute primitive {} not provided".format(
4541 calculated_params
[param_name
], primitive_desc
["name"]
4546 "Parameter {} needed to execute primitive {} not provided".format(
4547 param_name
, primitive_desc
["name"]
4551 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4552 calculated_params
[param_name
] = yaml
.safe_dump(
4553 calculated_params
[param_name
], default_flow_style
=True, width
=256
4555 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4557 ].startswith("!!yaml "):
4558 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4559 if parameter
.get("data-type") == "INTEGER":
4561 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4562 except ValueError: # error converting string to int
4564 "Parameter {} of primitive {} must be integer".format(
4565 param_name
, primitive_desc
["name"]
4568 elif parameter
.get("data-type") == "BOOLEAN":
4569 calculated_params
[param_name
] = not (
4570 (str(calculated_params
[param_name
])).lower() == "false"
4573 # add always ns_config_info if primitive name is config
4574 if primitive_desc
["name"] == "config":
4575 if "ns_config_info" in instantiation_params
:
4576 calculated_params
["ns_config_info"] = instantiation_params
[
4579 return calculated_params
4581 def _look_for_deployed_vca(
4588 ee_descriptor_id
=None,
4590 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4591 for vca
in deployed_vca
:
4594 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4597 vdu_count_index
is not None
4598 and vdu_count_index
!= vca
["vdu_count_index"]
4601 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4603 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4607 # vca_deployed not found
4609 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4610 " is not deployed".format(
4619 ee_id
= vca
.get("ee_id")
4621 "type", "lxc_proxy_charm"
4622 ) # default value for backward compatibility - proxy charm
4625 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4626 "execution environment".format(
4627 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4630 return ee_id
, vca_type
4632 async def _ns_execute_primitive(
4638 retries_interval
=30,
4645 if primitive
== "config":
4646 primitive_params
= {"params": primitive_params
}
4648 vca_type
= vca_type
or "lxc_proxy_charm"
4652 output
= await asyncio
.wait_for(
4653 self
.vca_map
[vca_type
].exec_primitive(
4655 primitive_name
=primitive
,
4656 params_dict
=primitive_params
,
4657 progress_timeout
=self
.timeout_progress_primitive
,
4658 total_timeout
=self
.timeout_primitive
,
4663 timeout
=timeout
or self
.timeout_primitive
,
4667 except asyncio
.CancelledError
:
4669 except Exception as e
: # asyncio.TimeoutError
4670 if isinstance(e
, asyncio
.TimeoutError
):
4675 "Error executing action {} on {} -> {}".format(
4680 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
4682 return "FAILED", str(e
)
4684 return "COMPLETED", output
4686 except (LcmException
, asyncio
.CancelledError
):
4688 except Exception as e
:
4689 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
4691 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
4693 Updating the vca_status with latest juju information in nsrs record
4694 :param: nsr_id: Id of the nsr
4695 :param: nslcmop_id: Id of the nslcmop
4699 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
4700 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4701 vca_id
= self
.get_vca_id({}, db_nsr
)
4702 if db_nsr
["_admin"]["deployed"]["K8s"]:
4703 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
4704 cluster_uuid
, kdu_instance
, cluster_type
= (
4705 k8s
["k8scluster-uuid"],
4706 k8s
["kdu-instance"],
4707 k8s
["k8scluster-type"],
4709 await self
._on
_update
_k
8s
_db
(
4710 cluster_uuid
=cluster_uuid
,
4711 kdu_instance
=kdu_instance
,
4712 filter={"_id": nsr_id
},
4714 cluster_type
=cluster_type
,
4717 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
4718 table
, filter = "nsrs", {"_id": nsr_id
}
4719 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
4720 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
4722 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
4723 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
4725 async def action(self
, nsr_id
, nslcmop_id
):
4726 # Try to lock HA task here
4727 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4728 if not task_is_locked_by_me
:
4731 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
4732 self
.logger
.debug(logging_text
+ "Enter")
4733 # get all needed from database
4737 db_nslcmop_update
= {}
4738 nslcmop_operation_state
= None
4739 error_description_nslcmop
= None
4742 # wait for any previous tasks in process
4743 step
= "Waiting for previous operations to terminate"
4744 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4746 self
._write
_ns
_status
(
4749 current_operation
="RUNNING ACTION",
4750 current_operation_id
=nslcmop_id
,
4753 step
= "Getting information from database"
4754 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4755 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4756 if db_nslcmop
["operationParams"].get("primitive_params"):
4757 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
4758 db_nslcmop
["operationParams"]["primitive_params"]
4761 nsr_deployed
= db_nsr
["_admin"].get("deployed")
4762 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
4763 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
4764 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
4765 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
4766 primitive
= db_nslcmop
["operationParams"]["primitive"]
4767 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
4768 timeout_ns_action
= db_nslcmop
["operationParams"].get(
4769 "timeout_ns_action", self
.timeout_primitive
4773 step
= "Getting vnfr from database"
4774 db_vnfr
= self
.db
.get_one(
4775 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
4777 if db_vnfr
.get("kdur"):
4779 for kdur
in db_vnfr
["kdur"]:
4780 if kdur
.get("additionalParams"):
4781 kdur
["additionalParams"] = json
.loads(
4782 kdur
["additionalParams"]
4784 kdur_list
.append(kdur
)
4785 db_vnfr
["kdur"] = kdur_list
4786 step
= "Getting vnfd from database"
4787 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
4789 step
= "Getting nsd from database"
4790 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
4792 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
4793 # for backward compatibility
4794 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
4795 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
4796 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
4797 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4799 # look for primitive
4800 config_primitive_desc
= descriptor_configuration
= None
4802 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
4804 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
4806 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
4808 descriptor_configuration
= db_nsd
.get("ns-configuration")
4810 if descriptor_configuration
and descriptor_configuration
.get(
4813 for config_primitive
in descriptor_configuration
["config-primitive"]:
4814 if config_primitive
["name"] == primitive
:
4815 config_primitive_desc
= config_primitive
4818 if not config_primitive_desc
:
4819 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
4821 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4825 primitive_name
= primitive
4826 ee_descriptor_id
= None
4828 primitive_name
= config_primitive_desc
.get(
4829 "execution-environment-primitive", primitive
4831 ee_descriptor_id
= config_primitive_desc
.get(
4832 "execution-environment-ref"
4838 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
4840 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
4843 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
4845 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
4847 desc_params
= parse_yaml_strings(
4848 db_vnfr
.get("additionalParamsForVnf")
4851 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
4852 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
4853 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
4855 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
4856 actions
.add(primitive
["name"])
4857 for primitive
in kdu_configuration
.get("config-primitive", []):
4858 actions
.add(primitive
["name"])
4859 kdu_action
= True if primitive_name
in actions
else False
4861 # TODO check if ns is in a proper status
4863 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
4865 # kdur and desc_params already set from before
4866 if primitive_params
:
4867 desc_params
.update(primitive_params
)
4868 # TODO Check if we will need something at vnf level
4869 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
4871 kdu_name
== kdu
["kdu-name"]
4872 and kdu
["member-vnf-index"] == vnf_index
4877 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
4880 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
4881 msg
= "unknown k8scluster-type '{}'".format(
4882 kdu
.get("k8scluster-type")
4884 raise LcmException(msg
)
4887 "collection": "nsrs",
4888 "filter": {"_id": nsr_id
},
4889 "path": "_admin.deployed.K8s.{}".format(index
),
4893 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
4895 step
= "Executing kdu {}".format(primitive_name
)
4896 if primitive_name
== "upgrade":
4897 if desc_params
.get("kdu_model"):
4898 kdu_model
= desc_params
.get("kdu_model")
4899 del desc_params
["kdu_model"]
4901 kdu_model
= kdu
.get("kdu-model")
4902 parts
= kdu_model
.split(sep
=":")
4904 kdu_model
= parts
[0]
4906 detailed_status
= await asyncio
.wait_for(
4907 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
4908 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4909 kdu_instance
=kdu
.get("kdu-instance"),
4911 kdu_model
=kdu_model
,
4914 timeout
=timeout_ns_action
,
4916 timeout
=timeout_ns_action
+ 10,
4919 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
4921 elif primitive_name
== "rollback":
4922 detailed_status
= await asyncio
.wait_for(
4923 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
4924 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4925 kdu_instance
=kdu
.get("kdu-instance"),
4928 timeout
=timeout_ns_action
,
4930 elif primitive_name
== "status":
4931 detailed_status
= await asyncio
.wait_for(
4932 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
4933 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4934 kdu_instance
=kdu
.get("kdu-instance"),
4937 timeout
=timeout_ns_action
,
4940 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
4941 kdu
["kdu-name"], nsr_id
4943 params
= self
._map
_primitive
_params
(
4944 config_primitive_desc
, primitive_params
, desc_params
4947 detailed_status
= await asyncio
.wait_for(
4948 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
4949 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4950 kdu_instance
=kdu_instance
,
4951 primitive_name
=primitive_name
,
4954 timeout
=timeout_ns_action
,
4957 timeout
=timeout_ns_action
,
4961 nslcmop_operation_state
= "COMPLETED"
4963 detailed_status
= ""
4964 nslcmop_operation_state
= "FAILED"
4966 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
4967 nsr_deployed
["VCA"],
4968 member_vnf_index
=vnf_index
,
4970 vdu_count_index
=vdu_count_index
,
4971 ee_descriptor_id
=ee_descriptor_id
,
4973 for vca_index
, vca_deployed
in enumerate(
4974 db_nsr
["_admin"]["deployed"]["VCA"]
4976 if vca_deployed
.get("member-vnf-index") == vnf_index
:
4978 "collection": "nsrs",
4979 "filter": {"_id": nsr_id
},
4980 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
4984 nslcmop_operation_state
,
4986 ) = await self
._ns
_execute
_primitive
(
4988 primitive
=primitive_name
,
4989 primitive_params
=self
._map
_primitive
_params
(
4990 config_primitive_desc
, primitive_params
, desc_params
4992 timeout
=timeout_ns_action
,
4998 db_nslcmop_update
["detailed-status"] = detailed_status
4999 error_description_nslcmop
= (
5000 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5004 + " task Done with result {} {}".format(
5005 nslcmop_operation_state
, detailed_status
5008 return # database update is called inside finally
5010 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5011 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5013 except asyncio
.CancelledError
:
5015 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5017 exc
= "Operation was cancelled"
5018 except asyncio
.TimeoutError
:
5019 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5021 except Exception as e
:
5022 exc
= traceback
.format_exc()
5023 self
.logger
.critical(
5024 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5033 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5034 nslcmop_operation_state
= "FAILED"
5036 self
._write
_ns
_status
(
5040 ], # TODO check if degraded. For the moment use previous status
5041 current_operation
="IDLE",
5042 current_operation_id
=None,
5043 # error_description=error_description_nsr,
5044 # error_detail=error_detail,
5045 other_update
=db_nsr_update
,
5048 self
._write
_op
_status
(
5051 error_message
=error_description_nslcmop
,
5052 operation_state
=nslcmop_operation_state
,
5053 other_update
=db_nslcmop_update
,
5056 if nslcmop_operation_state
:
5058 await self
.msg
.aiowrite(
5063 "nslcmop_id": nslcmop_id
,
5064 "operationState": nslcmop_operation_state
,
5068 except Exception as e
:
5070 logging_text
+ "kafka_write notification Exception {}".format(e
)
5072 self
.logger
.debug(logging_text
+ "Exit")
5073 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5074 return nslcmop_operation_state
, detailed_status
5076 async def scale(self
, nsr_id
, nslcmop_id
):
5077 # Try to lock HA task here
5078 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5079 if not task_is_locked_by_me
:
5082 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
5083 stage
= ["", "", ""]
5084 tasks_dict_info
= {}
5085 # ^ stage, step, VIM progress
5086 self
.logger
.debug(logging_text
+ "Enter")
5087 # get all needed from database
5089 db_nslcmop_update
= {}
5092 # in case of error, indicates what part of scale was failed to put nsr at error status
5093 scale_process
= None
5094 old_operational_status
= ""
5095 old_config_status
= ""
5098 # wait for any previous tasks in process
5099 step
= "Waiting for previous operations to terminate"
5100 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5101 self
._write
_ns
_status
(
5104 current_operation
="SCALING",
5105 current_operation_id
=nslcmop_id
,
5108 step
= "Getting nslcmop from database"
5110 step
+ " after having waited for previous tasks to be completed"
5112 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5114 step
= "Getting nsr from database"
5115 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5116 old_operational_status
= db_nsr
["operational-status"]
5117 old_config_status
= db_nsr
["config-status"]
5119 step
= "Parsing scaling parameters"
5120 db_nsr_update
["operational-status"] = "scaling"
5121 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5122 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5124 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
5126 ]["member-vnf-index"]
5127 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
5129 ]["scaling-group-descriptor"]
5130 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
5131 # for backward compatibility
5132 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5133 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5134 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5135 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5137 step
= "Getting vnfr from database"
5138 db_vnfr
= self
.db
.get_one(
5139 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5142 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5144 step
= "Getting vnfd from database"
5145 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5147 base_folder
= db_vnfd
["_admin"]["storage"]
5149 step
= "Getting scaling-group-descriptor"
5150 scaling_descriptor
= find_in_list(
5151 get_scaling_aspect(db_vnfd
),
5152 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
5154 if not scaling_descriptor
:
5156 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5157 "at vnfd:scaling-group-descriptor".format(scaling_group
)
5160 step
= "Sending scale order to VIM"
5161 # TODO check if ns is in a proper status
5163 if not db_nsr
["_admin"].get("scaling-group"):
5168 "_admin.scaling-group": [
5169 {"name": scaling_group
, "nb-scale-op": 0}
5173 admin_scale_index
= 0
5175 for admin_scale_index
, admin_scale_info
in enumerate(
5176 db_nsr
["_admin"]["scaling-group"]
5178 if admin_scale_info
["name"] == scaling_group
:
5179 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
5181 else: # not found, set index one plus last element and add new entry with the name
5182 admin_scale_index
+= 1
5184 "_admin.scaling-group.{}.name".format(admin_scale_index
)
5187 vca_scaling_info
= []
5188 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
5189 if scaling_type
== "SCALE_OUT":
5190 if "aspect-delta-details" not in scaling_descriptor
:
5192 "Aspect delta details not fount in scaling descriptor {}".format(
5193 scaling_descriptor
["name"]
5196 # count if max-instance-count is reached
5197 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5199 scaling_info
["scaling_direction"] = "OUT"
5200 scaling_info
["vdu-create"] = {}
5201 scaling_info
["kdu-create"] = {}
5202 for delta
in deltas
:
5203 for vdu_delta
in delta
.get("vdu-delta", {}):
5204 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
5205 # vdu_index also provides the number of instance of the targeted vdu
5206 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5207 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5211 additional_params
= (
5212 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
5215 cloud_init_list
= []
5217 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5218 max_instance_count
= 10
5219 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
5220 max_instance_count
= vdu_profile
.get(
5221 "max-number-of-instances", 10
5224 default_instance_num
= get_number_of_instances(
5227 instances_number
= vdu_delta
.get("number-of-instances", 1)
5228 nb_scale_op
+= instances_number
5230 new_instance_count
= nb_scale_op
+ default_instance_num
5231 # Control if new count is over max and vdu count is less than max.
5232 # Then assign new instance count
5233 if new_instance_count
> max_instance_count
> vdu_count
:
5234 instances_number
= new_instance_count
- max_instance_count
5236 instances_number
= instances_number
5238 if new_instance_count
> max_instance_count
:
5240 "reached the limit of {} (max-instance-count) "
5241 "scaling-out operations for the "
5242 "scaling-group-descriptor '{}'".format(
5243 nb_scale_op
, scaling_group
5246 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5248 # TODO Information of its own ip is not available because db_vnfr is not updated.
5249 additional_params
["OSM"] = get_osm_params(
5250 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
5252 cloud_init_list
.append(
5253 self
._parse
_cloud
_init
(
5260 vca_scaling_info
.append(
5262 "osm_vdu_id": vdu_delta
["id"],
5263 "member-vnf-index": vnf_index
,
5265 "vdu_index": vdu_index
+ x
,
5268 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
5269 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5270 kdu_profile
= get_kdu_profile(db_vnfd
, kdu_delta
["id"])
5271 kdu_name
= kdu_profile
["kdu-name"]
5272 resource_name
= kdu_profile
["resource-name"]
5274 # Might have different kdus in the same delta
5275 # Should have list for each kdu
5276 if not scaling_info
["kdu-create"].get(kdu_name
, None):
5277 scaling_info
["kdu-create"][kdu_name
] = []
5279 kdur
= get_kdur(db_vnfr
, kdu_name
)
5280 if kdur
.get("helm-chart"):
5281 k8s_cluster_type
= "helm-chart-v3"
5282 self
.logger
.debug("kdur: {}".format(kdur
))
5284 kdur
.get("helm-version")
5285 and kdur
.get("helm-version") == "v2"
5287 k8s_cluster_type
= "helm-chart"
5288 raise NotImplementedError
5289 elif kdur
.get("juju-bundle"):
5290 k8s_cluster_type
= "juju-bundle"
5293 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5294 "juju-bundle. Maybe an old NBI version is running".format(
5295 db_vnfr
["member-vnf-index-ref"], kdu_name
5299 max_instance_count
= 10
5300 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
5301 max_instance_count
= kdu_profile
.get(
5302 "max-number-of-instances", 10
5305 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
5306 deployed_kdu
, _
= get_deployed_kdu(
5307 nsr_deployed
, kdu_name
, vnf_index
5309 if deployed_kdu
is None:
5311 "KDU '{}' for vnf '{}' not deployed".format(
5315 kdu_instance
= deployed_kdu
.get("kdu-instance")
5316 instance_num
= await self
.k8scluster_map
[
5318 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5319 kdu_replica_count
= instance_num
+ kdu_delta
.get(
5320 "number-of-instances", 1
5323 # Control if new count is over max and instance_num is less than max.
5324 # Then assign max instance number to kdu replica count
5325 if kdu_replica_count
> max_instance_count
> instance_num
:
5326 kdu_replica_count
= max_instance_count
5327 if kdu_replica_count
> max_instance_count
:
5329 "reached the limit of {} (max-instance-count) "
5330 "scaling-out operations for the "
5331 "scaling-group-descriptor '{}'".format(
5332 instance_num
, scaling_group
5336 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5337 vca_scaling_info
.append(
5339 "osm_kdu_id": kdu_name
,
5340 "member-vnf-index": vnf_index
,
5342 "kdu_index": instance_num
+ x
- 1,
5345 scaling_info
["kdu-create"][kdu_name
].append(
5347 "member-vnf-index": vnf_index
,
5349 "k8s-cluster-type": k8s_cluster_type
,
5350 "resource-name": resource_name
,
5351 "scale": kdu_replica_count
,
5354 elif scaling_type
== "SCALE_IN":
5355 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5357 scaling_info
["scaling_direction"] = "IN"
5358 scaling_info
["vdu-delete"] = {}
5359 scaling_info
["kdu-delete"] = {}
5361 for delta
in deltas
:
5362 for vdu_delta
in delta
.get("vdu-delta", {}):
5363 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5364 min_instance_count
= 0
5365 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5366 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
5367 min_instance_count
= vdu_profile
["min-number-of-instances"]
5369 default_instance_num
= get_number_of_instances(
5370 db_vnfd
, vdu_delta
["id"]
5372 instance_num
= vdu_delta
.get("number-of-instances", 1)
5373 nb_scale_op
-= instance_num
5375 new_instance_count
= nb_scale_op
+ default_instance_num
5377 if new_instance_count
< min_instance_count
< vdu_count
:
5378 instances_number
= min_instance_count
- new_instance_count
5380 instances_number
= instance_num
5382 if new_instance_count
< min_instance_count
:
5384 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5385 "scaling-group-descriptor '{}'".format(
5386 nb_scale_op
, scaling_group
5389 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5390 vca_scaling_info
.append(
5392 "osm_vdu_id": vdu_delta
["id"],
5393 "member-vnf-index": vnf_index
,
5395 "vdu_index": vdu_index
- 1 - x
,
5398 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
5399 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5400 kdu_profile
= get_kdu_profile(db_vnfd
, kdu_delta
["id"])
5401 kdu_name
= kdu_profile
["kdu-name"]
5402 resource_name
= kdu_profile
["resource-name"]
5404 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
5405 scaling_info
["kdu-delete"][kdu_name
] = []
5407 kdur
= get_kdur(db_vnfr
, kdu_name
)
5408 if kdur
.get("helm-chart"):
5409 k8s_cluster_type
= "helm-chart-v3"
5410 self
.logger
.debug("kdur: {}".format(kdur
))
5412 kdur
.get("helm-version")
5413 and kdur
.get("helm-version") == "v2"
5415 k8s_cluster_type
= "helm-chart"
5416 raise NotImplementedError
5417 elif kdur
.get("juju-bundle"):
5418 k8s_cluster_type
= "juju-bundle"
5421 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5422 "juju-bundle. Maybe an old NBI version is running".format(
5423 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
5427 min_instance_count
= 0
5428 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
5429 min_instance_count
= kdu_profile
["min-number-of-instances"]
5431 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
5432 deployed_kdu
, _
= get_deployed_kdu(
5433 nsr_deployed
, kdu_name
, vnf_index
5435 if deployed_kdu
is None:
5437 "KDU '{}' for vnf '{}' not deployed".format(
5441 kdu_instance
= deployed_kdu
.get("kdu-instance")
5442 instance_num
= await self
.k8scluster_map
[
5444 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5445 kdu_replica_count
= instance_num
- kdu_delta
.get(
5446 "number-of-instances", 1
5449 if kdu_replica_count
< min_instance_count
< instance_num
:
5450 kdu_replica_count
= min_instance_count
5451 if kdu_replica_count
< min_instance_count
:
5453 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5454 "scaling-group-descriptor '{}'".format(
5455 instance_num
, scaling_group
5459 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5460 vca_scaling_info
.append(
5462 "osm_kdu_id": kdu_name
,
5463 "member-vnf-index": vnf_index
,
5465 "kdu_index": instance_num
- x
- 1,
5468 scaling_info
["kdu-delete"][kdu_name
].append(
5470 "member-vnf-index": vnf_index
,
5472 "k8s-cluster-type": k8s_cluster_type
,
5473 "resource-name": resource_name
,
5474 "scale": kdu_replica_count
,
5478 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5479 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
5480 if scaling_info
["scaling_direction"] == "IN":
5481 for vdur
in reversed(db_vnfr
["vdur"]):
5482 if vdu_delete
.get(vdur
["vdu-id-ref"]):
5483 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
5484 scaling_info
["vdu"].append(
5486 "name": vdur
.get("name") or vdur
.get("vdu-name"),
5487 "vdu_id": vdur
["vdu-id-ref"],
5491 for interface
in vdur
["interfaces"]:
5492 scaling_info
["vdu"][-1]["interface"].append(
5494 "name": interface
["name"],
5495 "ip_address": interface
["ip-address"],
5496 "mac_address": interface
.get("mac-address"),
5499 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5502 step
= "Executing pre-scale vnf-config-primitive"
5503 if scaling_descriptor
.get("scaling-config-action"):
5504 for scaling_config_action
in scaling_descriptor
[
5505 "scaling-config-action"
5508 scaling_config_action
.get("trigger") == "pre-scale-in"
5509 and scaling_type
== "SCALE_IN"
5511 scaling_config_action
.get("trigger") == "pre-scale-out"
5512 and scaling_type
== "SCALE_OUT"
5514 vnf_config_primitive
= scaling_config_action
[
5515 "vnf-config-primitive-name-ref"
5517 step
= db_nslcmop_update
[
5519 ] = "executing pre-scale scaling-config-action '{}'".format(
5520 vnf_config_primitive
5523 # look for primitive
5524 for config_primitive
in (
5525 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
5526 ).get("config-primitive", ()):
5527 if config_primitive
["name"] == vnf_config_primitive
:
5531 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5532 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5533 "primitive".format(scaling_group
, vnf_config_primitive
)
5536 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
5537 if db_vnfr
.get("additionalParamsForVnf"):
5538 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
5540 scale_process
= "VCA"
5541 db_nsr_update
["config-status"] = "configuring pre-scaling"
5542 primitive_params
= self
._map
_primitive
_params
(
5543 config_primitive
, {}, vnfr_params
5546 # Pre-scale retry check: Check if this sub-operation has been executed before
5547 op_index
= self
._check
_or
_add
_scale
_suboperation
(
5550 vnf_config_primitive
,
5554 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
5555 # Skip sub-operation
5556 result
= "COMPLETED"
5557 result_detail
= "Done"
5560 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5561 vnf_config_primitive
, result
, result_detail
5565 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
5566 # New sub-operation: Get index of this sub-operation
5568 len(db_nslcmop
.get("_admin", {}).get("operations"))
5573 + "vnf_config_primitive={} New sub-operation".format(
5574 vnf_config_primitive
5578 # retry: Get registered params for this existing sub-operation
5579 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
5582 vnf_index
= op
.get("member_vnf_index")
5583 vnf_config_primitive
= op
.get("primitive")
5584 primitive_params
= op
.get("primitive_params")
5587 + "vnf_config_primitive={} Sub-operation retry".format(
5588 vnf_config_primitive
5591 # Execute the primitive, either with new (first-time) or registered (reintent) args
5592 ee_descriptor_id
= config_primitive
.get(
5593 "execution-environment-ref"
5595 primitive_name
= config_primitive
.get(
5596 "execution-environment-primitive", vnf_config_primitive
5598 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5599 nsr_deployed
["VCA"],
5600 member_vnf_index
=vnf_index
,
5602 vdu_count_index
=None,
5603 ee_descriptor_id
=ee_descriptor_id
,
5605 result
, result_detail
= await self
._ns
_execute
_primitive
(
5614 + "vnf_config_primitive={} Done with result {} {}".format(
5615 vnf_config_primitive
, result
, result_detail
5618 # Update operationState = COMPLETED | FAILED
5619 self
._update
_suboperation
_status
(
5620 db_nslcmop
, op_index
, result
, result_detail
5623 if result
== "FAILED":
5624 raise LcmException(result_detail
)
5625 db_nsr_update
["config-status"] = old_config_status
5626 scale_process
= None
5630 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
5633 "_admin.scaling-group.{}.time".format(admin_scale_index
)
5636 # SCALE-IN VCA - BEGIN
5637 if vca_scaling_info
:
5638 step
= db_nslcmop_update
[
5640 ] = "Deleting the execution environments"
5641 scale_process
= "VCA"
5642 for vca_info
in vca_scaling_info
:
5643 if vca_info
["type"] == "delete":
5644 member_vnf_index
= str(vca_info
["member-vnf-index"])
5646 logging_text
+ "vdu info: {}".format(vca_info
)
5648 if vca_info
.get("osm_vdu_id"):
5649 vdu_id
= vca_info
["osm_vdu_id"]
5650 vdu_index
= int(vca_info
["vdu_index"])
5653 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5654 member_vnf_index
, vdu_id
, vdu_index
5658 kdu_id
= vca_info
["osm_kdu_id"]
5661 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5662 member_vnf_index
, kdu_id
, vdu_index
5664 stage
[2] = step
= "Scaling in VCA"
5665 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
5666 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
5667 config_update
= db_nsr
["configurationStatus"]
5668 for vca_index
, vca
in enumerate(vca_update
):
5670 (vca
or vca
.get("ee_id"))
5671 and vca
["member-vnf-index"] == member_vnf_index
5672 and vca
["vdu_count_index"] == vdu_index
5674 if vca
.get("vdu_id"):
5675 config_descriptor
= get_configuration(
5676 db_vnfd
, vca
.get("vdu_id")
5678 elif vca
.get("kdu_name"):
5679 config_descriptor
= get_configuration(
5680 db_vnfd
, vca
.get("kdu_name")
5683 config_descriptor
= get_configuration(
5684 db_vnfd
, db_vnfd
["id"]
5686 operation_params
= (
5687 db_nslcmop
.get("operationParams") or {}
5689 exec_terminate_primitives
= not operation_params
.get(
5690 "skip_terminate_primitives"
5691 ) and vca
.get("needed_terminate")
5692 task
= asyncio
.ensure_future(
5701 exec_primitives
=exec_terminate_primitives
,
5705 timeout
=self
.timeout_charm_delete
,
5708 tasks_dict_info
[task
] = "Terminating VCA {}".format(
5711 del vca_update
[vca_index
]
5712 del config_update
[vca_index
]
5713 # wait for pending tasks of terminate primitives
5717 + "Waiting for tasks {}".format(
5718 list(tasks_dict_info
.keys())
5721 error_list
= await self
._wait
_for
_tasks
(
5725 self
.timeout_charm_delete
, self
.timeout_ns_terminate
5730 tasks_dict_info
.clear()
5732 raise LcmException("; ".join(error_list
))
5734 db_vca_and_config_update
= {
5735 "_admin.deployed.VCA": vca_update
,
5736 "configurationStatus": config_update
,
5739 "nsrs", db_nsr
["_id"], db_vca_and_config_update
5741 scale_process
= None
5742 # SCALE-IN VCA - END
5745 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
5746 scale_process
= "RO"
5747 if self
.ro_config
.get("ng"):
5748 await self
._scale
_ng
_ro
(
5749 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
5751 scaling_info
.pop("vdu-create", None)
5752 scaling_info
.pop("vdu-delete", None)
5754 scale_process
= None
5758 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
5759 scale_process
= "KDU"
5760 await self
._scale
_kdu
(
5761 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
5763 scaling_info
.pop("kdu-create", None)
5764 scaling_info
.pop("kdu-delete", None)
5766 scale_process
= None
5770 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5772 # SCALE-UP VCA - BEGIN
5773 if vca_scaling_info
:
5774 step
= db_nslcmop_update
[
5776 ] = "Creating new execution environments"
5777 scale_process
= "VCA"
5778 for vca_info
in vca_scaling_info
:
5779 if vca_info
["type"] == "create":
5780 member_vnf_index
= str(vca_info
["member-vnf-index"])
5782 logging_text
+ "vdu info: {}".format(vca_info
)
5784 vnfd_id
= db_vnfr
["vnfd-ref"]
5785 if vca_info
.get("osm_vdu_id"):
5786 vdu_index
= int(vca_info
["vdu_index"])
5787 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
5788 if db_vnfr
.get("additionalParamsForVnf"):
5789 deploy_params
.update(
5791 db_vnfr
["additionalParamsForVnf"].copy()
5794 descriptor_config
= get_configuration(
5795 db_vnfd
, db_vnfd
["id"]
5797 if descriptor_config
:
5802 logging_text
=logging_text
5803 + "member_vnf_index={} ".format(member_vnf_index
),
5806 nslcmop_id
=nslcmop_id
,
5812 member_vnf_index
=member_vnf_index
,
5813 vdu_index
=vdu_index
,
5815 deploy_params
=deploy_params
,
5816 descriptor_config
=descriptor_config
,
5817 base_folder
=base_folder
,
5818 task_instantiation_info
=tasks_dict_info
,
5821 vdu_id
= vca_info
["osm_vdu_id"]
5822 vdur
= find_in_list(
5823 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
5825 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
5826 if vdur
.get("additionalParams"):
5827 deploy_params_vdu
= parse_yaml_strings(
5828 vdur
["additionalParams"]
5831 deploy_params_vdu
= deploy_params
5832 deploy_params_vdu
["OSM"] = get_osm_params(
5833 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
5835 if descriptor_config
:
5840 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5841 member_vnf_index
, vdu_id
, vdu_index
5843 stage
[2] = step
= "Scaling out VCA"
5844 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
5846 logging_text
=logging_text
5847 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5848 member_vnf_index
, vdu_id
, vdu_index
5852 nslcmop_id
=nslcmop_id
,
5858 member_vnf_index
=member_vnf_index
,
5859 vdu_index
=vdu_index
,
5861 deploy_params
=deploy_params_vdu
,
5862 descriptor_config
=descriptor_config
,
5863 base_folder
=base_folder
,
5864 task_instantiation_info
=tasks_dict_info
,
5868 kdu_name
= vca_info
["osm_kdu_id"]
5869 descriptor_config
= get_configuration(db_vnfd
, kdu_name
)
5870 if descriptor_config
:
5872 kdu_index
= int(vca_info
["kdu_index"])
5876 for x
in db_vnfr
["kdur"]
5877 if x
["kdu-name"] == kdu_name
5879 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
5880 if kdur
.get("additionalParams"):
5881 deploy_params_kdu
= parse_yaml_strings(
5882 kdur
["additionalParams"]
5886 logging_text
=logging_text
,
5889 nslcmop_id
=nslcmop_id
,
5895 member_vnf_index
=member_vnf_index
,
5896 vdu_index
=kdu_index
,
5898 deploy_params
=deploy_params_kdu
,
5899 descriptor_config
=descriptor_config
,
5900 base_folder
=base_folder
,
5901 task_instantiation_info
=tasks_dict_info
,
5904 # SCALE-UP VCA - END
5905 scale_process
= None
5908 # execute primitive service POST-SCALING
5909 step
= "Executing post-scale vnf-config-primitive"
5910 if scaling_descriptor
.get("scaling-config-action"):
5911 for scaling_config_action
in scaling_descriptor
[
5912 "scaling-config-action"
5915 scaling_config_action
.get("trigger") == "post-scale-in"
5916 and scaling_type
== "SCALE_IN"
5918 scaling_config_action
.get("trigger") == "post-scale-out"
5919 and scaling_type
== "SCALE_OUT"
5921 vnf_config_primitive
= scaling_config_action
[
5922 "vnf-config-primitive-name-ref"
5924 step
= db_nslcmop_update
[
5926 ] = "executing post-scale scaling-config-action '{}'".format(
5927 vnf_config_primitive
5930 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
5931 if db_vnfr
.get("additionalParamsForVnf"):
5932 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
5934 # look for primitive
5935 for config_primitive
in (
5936 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
5937 ).get("config-primitive", ()):
5938 if config_primitive
["name"] == vnf_config_primitive
:
5942 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
5943 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
5944 "config-primitive".format(
5945 scaling_group
, vnf_config_primitive
5948 scale_process
= "VCA"
5949 db_nsr_update
["config-status"] = "configuring post-scaling"
5950 primitive_params
= self
._map
_primitive
_params
(
5951 config_primitive
, {}, vnfr_params
5954 # Post-scale retry check: Check if this sub-operation has been executed before
5955 op_index
= self
._check
_or
_add
_scale
_suboperation
(
5958 vnf_config_primitive
,
5962 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
5963 # Skip sub-operation
5964 result
= "COMPLETED"
5965 result_detail
= "Done"
5968 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5969 vnf_config_primitive
, result
, result_detail
5973 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
5974 # New sub-operation: Get index of this sub-operation
5976 len(db_nslcmop
.get("_admin", {}).get("operations"))
5981 + "vnf_config_primitive={} New sub-operation".format(
5982 vnf_config_primitive
5986 # retry: Get registered params for this existing sub-operation
5987 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
5990 vnf_index
= op
.get("member_vnf_index")
5991 vnf_config_primitive
= op
.get("primitive")
5992 primitive_params
= op
.get("primitive_params")
5995 + "vnf_config_primitive={} Sub-operation retry".format(
5996 vnf_config_primitive
5999 # Execute the primitive, either with new (first-time) or registered (reintent) args
6000 ee_descriptor_id
= config_primitive
.get(
6001 "execution-environment-ref"
6003 primitive_name
= config_primitive
.get(
6004 "execution-environment-primitive", vnf_config_primitive
6006 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6007 nsr_deployed
["VCA"],
6008 member_vnf_index
=vnf_index
,
6010 vdu_count_index
=None,
6011 ee_descriptor_id
=ee_descriptor_id
,
6013 result
, result_detail
= await self
._ns
_execute
_primitive
(
6022 + "vnf_config_primitive={} Done with result {} {}".format(
6023 vnf_config_primitive
, result
, result_detail
6026 # Update operationState = COMPLETED | FAILED
6027 self
._update
_suboperation
_status
(
6028 db_nslcmop
, op_index
, result
, result_detail
6031 if result
== "FAILED":
6032 raise LcmException(result_detail
)
6033 db_nsr_update
["config-status"] = old_config_status
6034 scale_process
= None
6039 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6040 db_nsr_update
["operational-status"] = (
6042 if old_operational_status
== "failed"
6043 else old_operational_status
6045 db_nsr_update
["config-status"] = old_config_status
6048 ROclient
.ROClientException
,
6053 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6055 except asyncio
.CancelledError
:
6057 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6059 exc
= "Operation was cancelled"
6060 except Exception as e
:
6061 exc
= traceback
.format_exc()
6062 self
.logger
.critical(
6063 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6067 self
._write
_ns
_status
(
6070 current_operation
="IDLE",
6071 current_operation_id
=None,
6074 stage
[1] = "Waiting for instantiate pending tasks."
6075 self
.logger
.debug(logging_text
+ stage
[1])
6076 exc
= await self
._wait
_for
_tasks
(
6079 self
.timeout_ns_deploy
,
6087 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6088 nslcmop_operation_state
= "FAILED"
6090 db_nsr_update
["operational-status"] = old_operational_status
6091 db_nsr_update
["config-status"] = old_config_status
6092 db_nsr_update
["detailed-status"] = ""
6094 if "VCA" in scale_process
:
6095 db_nsr_update
["config-status"] = "failed"
6096 if "RO" in scale_process
:
6097 db_nsr_update
["operational-status"] = "failed"
6100 ] = "FAILED scaling nslcmop={} {}: {}".format(
6101 nslcmop_id
, step
, exc
6104 error_description_nslcmop
= None
6105 nslcmop_operation_state
= "COMPLETED"
6106 db_nslcmop_update
["detailed-status"] = "Done"
6108 self
._write
_op
_status
(
6111 error_message
=error_description_nslcmop
,
6112 operation_state
=nslcmop_operation_state
,
6113 other_update
=db_nslcmop_update
,
6116 self
._write
_ns
_status
(
6119 current_operation
="IDLE",
6120 current_operation_id
=None,
6121 other_update
=db_nsr_update
,
6124 if nslcmop_operation_state
:
6128 "nslcmop_id": nslcmop_id
,
6129 "operationState": nslcmop_operation_state
,
6131 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
6132 except Exception as e
:
6134 logging_text
+ "kafka_write notification Exception {}".format(e
)
6136 self
.logger
.debug(logging_text
+ "Exit")
6137 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
6139 async def _scale_kdu(
6140 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6142 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
6143 for kdu_name
in _scaling_info
:
6144 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
6145 deployed_kdu
, index
= get_deployed_kdu(
6146 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
6148 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
6149 kdu_instance
= deployed_kdu
["kdu-instance"]
6150 scale
= int(kdu_scaling_info
["scale"])
6151 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
6154 "collection": "nsrs",
6155 "filter": {"_id": nsr_id
},
6156 "path": "_admin.deployed.K8s.{}".format(index
),
6159 step
= "scaling application {}".format(
6160 kdu_scaling_info
["resource-name"]
6162 self
.logger
.debug(logging_text
+ step
)
6164 if kdu_scaling_info
["type"] == "delete":
6165 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
6168 and kdu_config
.get("terminate-config-primitive")
6169 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
6171 terminate_config_primitive_list
= kdu_config
.get(
6172 "terminate-config-primitive"
6174 terminate_config_primitive_list
.sort(
6175 key
=lambda val
: int(val
["seq"])
6179 terminate_config_primitive
6180 ) in terminate_config_primitive_list
:
6181 primitive_params_
= self
._map
_primitive
_params
(
6182 terminate_config_primitive
, {}, {}
6184 step
= "execute terminate config primitive"
6185 self
.logger
.debug(logging_text
+ step
)
6186 await asyncio
.wait_for(
6187 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
6188 cluster_uuid
=cluster_uuid
,
6189 kdu_instance
=kdu_instance
,
6190 primitive_name
=terminate_config_primitive
["name"],
6191 params
=primitive_params_
,
6198 await asyncio
.wait_for(
6199 self
.k8scluster_map
[k8s_cluster_type
].scale(
6202 kdu_scaling_info
["resource-name"],
6205 timeout
=self
.timeout_vca_on_error
,
6208 if kdu_scaling_info
["type"] == "create":
6209 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
6212 and kdu_config
.get("initial-config-primitive")
6213 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
6215 initial_config_primitive_list
= kdu_config
.get(
6216 "initial-config-primitive"
6218 initial_config_primitive_list
.sort(
6219 key
=lambda val
: int(val
["seq"])
6222 for initial_config_primitive
in initial_config_primitive_list
:
6223 primitive_params_
= self
._map
_primitive
_params
(
6224 initial_config_primitive
, {}, {}
6226 step
= "execute initial config primitive"
6227 self
.logger
.debug(logging_text
+ step
)
6228 await asyncio
.wait_for(
6229 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
6230 cluster_uuid
=cluster_uuid
,
6231 kdu_instance
=kdu_instance
,
6232 primitive_name
=initial_config_primitive
["name"],
6233 params
=primitive_params_
,
6240 async def _scale_ng_ro(
6241 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
6243 nsr_id
= db_nslcmop
["nsInstanceId"]
6244 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
6247 # read from db: vnfd's for every vnf
6250 # for each vnf in ns, read vnfd
6251 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
6252 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
6253 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
6254 # if we haven't this vnfd, read it from db
6255 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
6257 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
6258 db_vnfds
.append(vnfd
)
6259 n2vc_key
= self
.n2vc
.get_public_key()
6260 n2vc_key_list
= [n2vc_key
]
6263 vdu_scaling_info
.get("vdu-create"),
6264 vdu_scaling_info
.get("vdu-delete"),
6267 # db_vnfr has been updated, update db_vnfrs to use it
6268 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
6269 await self
._instantiate
_ng
_ro
(
6279 start_deploy
=time(),
6280 timeout_ns_deploy
=self
.timeout_ns_deploy
,
6282 if vdu_scaling_info
.get("vdu-delete"):
6284 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
6287 async def add_prometheus_metrics(
6288 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
6290 if not self
.prometheus
:
6292 # look if exist a file called 'prometheus*.j2' and
6293 artifact_content
= self
.fs
.dir_ls(artifact_path
)
6297 for f
in artifact_content
6298 if f
.startswith("prometheus") and f
.endswith(".j2")
6304 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
6308 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
6309 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
6311 vnfr_id
= vnfr_id
.replace("-", "")
6313 "JOB_NAME": vnfr_id
,
6314 "TARGET_IP": target_ip
,
6315 "EXPORTER_POD_IP": host_name
,
6316 "EXPORTER_POD_PORT": host_port
,
6318 job_list
= self
.prometheus
.parse_job(job_data
, variables
)
6319 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
6320 for job
in job_list
:
6322 not isinstance(job
.get("job_name"), str)
6323 or vnfr_id
not in job
["job_name"]
6325 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
6326 job
["nsr_id"] = nsr_id
6327 job_dict
= {jl
["job_name"]: jl
for jl
in job_list
}
6328 if await self
.prometheus
.update(job_dict
):
6329 return list(job_dict
.keys())
6331 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
6333 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6335 :param: vim_account_id: VIM Account ID
6337 :return: (cloud_name, cloud_credential)
6339 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
6340 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
6342 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
6344 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6346 :param: vim_account_id: VIM Account ID
6348 :return: (cloud_name, cloud_credential)
6350 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
6351 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")