1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
22 import logging
.handlers
33 from osm_lcm
import ROclient
34 from osm_lcm
.data_utils
.nsr
import get_deployed_kdu
35 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
36 from osm_lcm
.lcm_utils
import (
44 from osm_lcm
.data_utils
.nsd
import get_vnf_profiles
45 from osm_lcm
.data_utils
.vnfd
import (
48 get_ee_sorted_initial_config_primitive_list
,
49 get_ee_sorted_terminate_config_primitive_list
,
51 get_virtual_link_profiles
,
56 get_number_of_instances
,
60 from osm_lcm
.data_utils
.list_utils
import find_in_list
61 from osm_lcm
.data_utils
.vnfr
import get_osm_params
, get_vdur_index
, get_kdur
62 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
63 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
64 from n2vc
.k8s_helm_conn
import K8sHelmConnector
65 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
66 from n2vc
.k8s_juju_conn
import K8sJujuConnector
68 from osm_common
.dbbase
import DbException
69 from osm_common
.fsbase
import FsException
71 from osm_lcm
.data_utils
.database
.database
import Database
72 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
74 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
75 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
77 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
79 from copy
import copy
, deepcopy
81 from uuid
import uuid4
83 from random
import randint
85 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
89 timeout_vca_on_error
= (
91 ) # Time for charm from first time at blocked,error status to mark as failed
92 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
93 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
94 timeout_charm_delete
= 10 * 60
95 timeout_primitive
= 30 * 60 # timeout for primitive execution
96 timeout_progress_primitive
= (
98 ) # timeout for some progress in a primitive execution
100 SUBOPERATION_STATUS_NOT_FOUND
= -1
101 SUBOPERATION_STATUS_NEW
= -2
102 SUBOPERATION_STATUS_SKIP
= -3
103 task_name_deploy_vca
= "Deploying VCA"
105 def __init__(self
, msg
, lcm_tasks
, config
, loop
, prometheus
=None):
107 Init, Connect to database, filesystem storage, and messaging
108 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
111 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
113 self
.db
= Database().instance
.db
114 self
.fs
= Filesystem().instance
.fs
116 self
.lcm_tasks
= lcm_tasks
117 self
.timeout
= config
["timeout"]
118 self
.ro_config
= config
["ro_config"]
119 self
.ng_ro
= config
["ro_config"].get("ng")
120 self
.vca_config
= config
["VCA"].copy()
122 # create N2VC connector
123 self
.n2vc
= N2VCJujuConnector(
126 on_update_db
=self
._on
_update
_n
2vc
_db
,
131 self
.conn_helm_ee
= LCMHelmConn(
134 vca_config
=self
.vca_config
,
135 on_update_db
=self
._on
_update
_n
2vc
_db
,
138 self
.k8sclusterhelm2
= K8sHelmConnector(
139 kubectl_command
=self
.vca_config
.get("kubectlpath"),
140 helm_command
=self
.vca_config
.get("helmpath"),
147 self
.k8sclusterhelm3
= K8sHelm3Connector(
148 kubectl_command
=self
.vca_config
.get("kubectlpath"),
149 helm_command
=self
.vca_config
.get("helm3path"),
156 self
.k8sclusterjuju
= K8sJujuConnector(
157 kubectl_command
=self
.vca_config
.get("kubectlpath"),
158 juju_command
=self
.vca_config
.get("jujupath"),
161 on_update_db
=self
._on
_update
_k
8s
_db
,
166 self
.k8scluster_map
= {
167 "helm-chart": self
.k8sclusterhelm2
,
168 "helm-chart-v3": self
.k8sclusterhelm3
,
169 "chart": self
.k8sclusterhelm3
,
170 "juju-bundle": self
.k8sclusterjuju
,
171 "juju": self
.k8sclusterjuju
,
175 "lxc_proxy_charm": self
.n2vc
,
176 "native_charm": self
.n2vc
,
177 "k8s_proxy_charm": self
.n2vc
,
178 "helm": self
.conn_helm_ee
,
179 "helm-v3": self
.conn_helm_ee
,
182 self
.prometheus
= prometheus
185 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
188 def increment_ip_mac(ip_mac
, vm_index
=1):
189 if not isinstance(ip_mac
, str):
192 # try with ipv4 look for last dot
193 i
= ip_mac
.rfind(".")
196 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
197 # try with ipv6 or mac look for last colon. Operate in hex
198 i
= ip_mac
.rfind(":")
201 # format in hex, len can be 2 for mac or 4 for ipv6
202 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
203 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
209 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
211 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
214 # TODO filter RO descriptor fields...
218 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
219 db_dict
["deploymentStatus"] = ro_descriptor
220 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
222 except Exception as e
:
224 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
227 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
229 # remove last dot from path (if exists)
230 if path
.endswith("."):
233 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
234 # .format(table, filter, path, updated_data))
237 nsr_id
= filter.get("_id")
239 # read ns record from database
240 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
241 current_ns_status
= nsr
.get("nsState")
243 # get vca status for NS
244 status_dict
= await self
.n2vc
.get_status(
245 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
250 db_dict
["vcaStatus"] = status_dict
251 await self
.n2vc
.update_vca_status(db_dict
["vcaStatus"], vca_id
=vca_id
)
253 # update configurationStatus for this VCA
255 vca_index
= int(path
[path
.rfind(".") + 1 :])
258 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
260 vca_status
= vca_list
[vca_index
].get("status")
262 configuration_status_list
= nsr
.get("configurationStatus")
263 config_status
= configuration_status_list
[vca_index
].get("status")
265 if config_status
== "BROKEN" and vca_status
!= "failed":
266 db_dict
["configurationStatus"][vca_index
] = "READY"
267 elif config_status
!= "BROKEN" and vca_status
== "failed":
268 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
269 except Exception as e
:
270 # not update configurationStatus
271 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
273 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
274 # if nsState = 'DEGRADED' check if all is OK
276 if current_ns_status
in ("READY", "DEGRADED"):
277 error_description
= ""
279 if status_dict
.get("machines"):
280 for machine_id
in status_dict
.get("machines"):
281 machine
= status_dict
.get("machines").get(machine_id
)
282 # check machine agent-status
283 if machine
.get("agent-status"):
284 s
= machine
.get("agent-status").get("status")
287 error_description
+= (
288 "machine {} agent-status={} ; ".format(
292 # check machine instance status
293 if machine
.get("instance-status"):
294 s
= machine
.get("instance-status").get("status")
297 error_description
+= (
298 "machine {} instance-status={} ; ".format(
303 if status_dict
.get("applications"):
304 for app_id
in status_dict
.get("applications"):
305 app
= status_dict
.get("applications").get(app_id
)
306 # check application status
307 if app
.get("status"):
308 s
= app
.get("status").get("status")
311 error_description
+= (
312 "application {} status={} ; ".format(app_id
, s
)
315 if error_description
:
316 db_dict
["errorDescription"] = error_description
317 if current_ns_status
== "READY" and is_degraded
:
318 db_dict
["nsState"] = "DEGRADED"
319 if current_ns_status
== "DEGRADED" and not is_degraded
:
320 db_dict
["nsState"] = "READY"
323 self
.update_db_2("nsrs", nsr_id
, db_dict
)
325 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
327 except Exception as e
:
328 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
330 async def _on_update_k8s_db(
331 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
334 Updating vca status in NSR record
335 :param cluster_uuid: UUID of a k8s cluster
336 :param kdu_instance: The unique name of the KDU instance
337 :param filter: To get nsr_id
338 :cluster_type: The cluster type (juju, k8s)
342 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
343 # .format(cluster_uuid, kdu_instance, filter))
345 nsr_id
= filter.get("_id")
347 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
348 cluster_uuid
=cluster_uuid
,
349 kdu_instance
=kdu_instance
,
351 complete_status
=True,
357 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
359 if cluster_type
in ("juju-bundle", "juju"):
360 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
361 # status in a similar way between Juju Bundles and Helm Charts on this side
362 await self
.k8sclusterjuju
.update_vca_status(
363 db_dict
["vcaStatus"],
369 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
373 self
.update_db_2("nsrs", nsr_id
, db_dict
)
374 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
376 except Exception as e
:
377 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
380 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
382 env
= Environment(undefined
=StrictUndefined
)
383 template
= env
.from_string(cloud_init_text
)
384 return template
.render(additional_params
or {})
385 except UndefinedError
as e
:
387 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
388 "file, must be provided in the instantiation parameters inside the "
389 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
391 except (TemplateError
, TemplateNotFound
) as e
:
393 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
398 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
399 cloud_init_content
= cloud_init_file
= None
401 if vdu
.get("cloud-init-file"):
402 base_folder
= vnfd
["_admin"]["storage"]
403 cloud_init_file
= "{}/{}/cloud_init/{}".format(
404 base_folder
["folder"],
405 base_folder
["pkg-dir"],
406 vdu
["cloud-init-file"],
408 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
409 cloud_init_content
= ci_file
.read()
410 elif vdu
.get("cloud-init"):
411 cloud_init_content
= vdu
["cloud-init"]
413 return cloud_init_content
414 except FsException
as e
:
416 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
417 vnfd
["id"], vdu
["id"], cloud_init_file
, e
421 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
423 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]),
426 additional_params
= vdur
.get("additionalParams")
427 return parse_yaml_strings(additional_params
)
429 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
431 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
432 :param vnfd: input vnfd
433 :param new_id: overrides vnf id if provided
434 :param additionalParams: Instantiation params for VNFs provided
435 :param nsrId: Id of the NSR
436 :return: copy of vnfd
438 vnfd_RO
= deepcopy(vnfd
)
439 # remove unused by RO configuration, monitoring, scaling and internal keys
440 vnfd_RO
.pop("_id", None)
441 vnfd_RO
.pop("_admin", None)
442 vnfd_RO
.pop("monitoring-param", None)
443 vnfd_RO
.pop("scaling-group-descriptor", None)
444 vnfd_RO
.pop("kdu", None)
445 vnfd_RO
.pop("k8s-cluster", None)
447 vnfd_RO
["id"] = new_id
449 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
450 for vdu
in get_iterable(vnfd_RO
, "vdu"):
451 vdu
.pop("cloud-init-file", None)
452 vdu
.pop("cloud-init", None)
456 def ip_profile_2_RO(ip_profile
):
457 RO_ip_profile
= deepcopy(ip_profile
)
458 if "dns-server" in RO_ip_profile
:
459 if isinstance(RO_ip_profile
["dns-server"], list):
460 RO_ip_profile
["dns-address"] = []
461 for ds
in RO_ip_profile
.pop("dns-server"):
462 RO_ip_profile
["dns-address"].append(ds
["address"])
464 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
465 if RO_ip_profile
.get("ip-version") == "ipv4":
466 RO_ip_profile
["ip-version"] = "IPv4"
467 if RO_ip_profile
.get("ip-version") == "ipv6":
468 RO_ip_profile
["ip-version"] = "IPv6"
469 if "dhcp-params" in RO_ip_profile
:
470 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
473 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
474 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
475 if db_vim
["_admin"]["operationalState"] != "ENABLED":
477 "VIM={} is not available. operationalState={}".format(
478 vim_account
, db_vim
["_admin"]["operationalState"]
481 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
484 def get_ro_wim_id_for_wim_account(self
, wim_account
):
485 if isinstance(wim_account
, str):
486 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
487 if db_wim
["_admin"]["operationalState"] != "ENABLED":
489 "WIM={} is not available. operationalState={}".format(
490 wim_account
, db_wim
["_admin"]["operationalState"]
493 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
498 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
500 db_vdu_push_list
= []
502 db_update
= {"_admin.modified": time()}
504 for vdu_id
, vdu_count
in vdu_create
.items():
508 for vdur
in reversed(db_vnfr
["vdur"])
509 if vdur
["vdu-id-ref"] == vdu_id
514 # Read the template saved in the db:
515 self
.logger
.debug(f
"No vdur in the database. Using the vdur-template to scale")
516 vdur_template
= db_vnfr
.get("vdur-template")
517 if not vdur_template
:
519 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
523 vdur
= vdur_template
[0]
524 #Delete a template from the database after using it
525 self
.db
.set_one("vnfrs",
526 {"_id": db_vnfr
["_id"]},
528 pull
={"vdur-template": {"_id": vdur
['_id']}}
530 for count
in range(vdu_count
):
531 vdur_copy
= deepcopy(vdur
)
532 vdur_copy
["status"] = "BUILD"
533 vdur_copy
["status-detailed"] = None
534 vdur_copy
["ip-address"] = None
535 vdur_copy
["_id"] = str(uuid4())
536 vdur_copy
["count-index"] += count
+ 1
537 vdur_copy
["id"] = "{}-{}".format(
538 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
540 vdur_copy
.pop("vim_info", None)
541 for iface
in vdur_copy
["interfaces"]:
542 if iface
.get("fixed-ip"):
543 iface
["ip-address"] = self
.increment_ip_mac(
544 iface
["ip-address"], count
+ 1
547 iface
.pop("ip-address", None)
548 if iface
.get("fixed-mac"):
549 iface
["mac-address"] = self
.increment_ip_mac(
550 iface
["mac-address"], count
+ 1
553 iface
.pop("mac-address", None)
557 ) # only first vdu can be managment of vnf
558 db_vdu_push_list
.append(vdur_copy
)
559 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
561 if len(db_vnfr
["vdur"]) == 1:
562 # The scale will move to 0 instances
563 self
.logger
.debug(f
"Scaling to 0 !, creating the template with the last vdur")
564 template_vdur
= [db_vnfr
["vdur"][0]]
565 for vdu_id
, vdu_count
in vdu_delete
.items():
567 indexes_to_delete
= [
569 for iv
in enumerate(db_vnfr
["vdur"])
570 if iv
[1]["vdu-id-ref"] == vdu_id
574 "vdur.{}.status".format(i
): "DELETING"
575 for i
in indexes_to_delete
[-vdu_count
:]
579 # it must be deleted one by one because common.db does not allow otherwise
582 for v
in reversed(db_vnfr
["vdur"])
583 if v
["vdu-id-ref"] == vdu_id
585 for vdu
in vdus_to_delete
[:vdu_count
]:
588 {"_id": db_vnfr
["_id"]},
590 pull
={"vdur": {"_id": vdu
["_id"]}},
594 db_push
["vdur"] = db_vdu_push_list
596 db_push
["vdur-template"] = template_vdur
599 db_vnfr
["vdur-template"] = template_vdur
600 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
601 # modify passed dictionary db_vnfr
602 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
603 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
605 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
607 Updates database nsr with the RO info for the created vld
608 :param ns_update_nsr: dictionary to be filled with the updated info
609 :param db_nsr: content of db_nsr. This is also modified
610 :param nsr_desc_RO: nsr descriptor from RO
611 :return: Nothing, LcmException is raised on errors
614 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
615 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
616 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
618 vld
["vim-id"] = net_RO
.get("vim_net_id")
619 vld
["name"] = net_RO
.get("vim_name")
620 vld
["status"] = net_RO
.get("status")
621 vld
["status-detailed"] = net_RO
.get("error_msg")
622 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
626 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
629 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
631 for db_vnfr
in db_vnfrs
.values():
632 vnfr_update
= {"status": "ERROR"}
633 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
634 if "status" not in vdur
:
635 vdur
["status"] = "ERROR"
636 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
638 vdur
["status-detailed"] = str(error_text
)
640 "vdur.{}.status-detailed".format(vdu_index
)
642 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
643 except DbException
as e
:
644 self
.logger
.error("Cannot update vnf. {}".format(e
))
646 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
648 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
649 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
650 :param nsr_desc_RO: nsr descriptor from RO
651 :return: Nothing, LcmException is raised on errors
653 for vnf_index
, db_vnfr
in db_vnfrs
.items():
654 for vnf_RO
in nsr_desc_RO
["vnfs"]:
655 if vnf_RO
["member_vnf_index"] != vnf_index
:
658 if vnf_RO
.get("ip_address"):
659 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
662 elif not db_vnfr
.get("ip-address"):
663 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
664 raise LcmExceptionNoMgmtIP(
665 "ns member_vnf_index '{}' has no IP address".format(
670 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
671 vdur_RO_count_index
= 0
672 if vdur
.get("pdu-type"):
674 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
675 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
677 if vdur
["count-index"] != vdur_RO_count_index
:
678 vdur_RO_count_index
+= 1
680 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
681 if vdur_RO
.get("ip_address"):
682 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
684 vdur
["ip-address"] = None
685 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
686 vdur
["name"] = vdur_RO
.get("vim_name")
687 vdur
["status"] = vdur_RO
.get("status")
688 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
689 for ifacer
in get_iterable(vdur
, "interfaces"):
690 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
691 if ifacer
["name"] == interface_RO
.get("internal_name"):
692 ifacer
["ip-address"] = interface_RO
.get(
695 ifacer
["mac-address"] = interface_RO
.get(
701 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
702 "from VIM info".format(
703 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
706 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
710 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
712 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
716 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
717 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
718 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
720 vld
["vim-id"] = net_RO
.get("vim_net_id")
721 vld
["name"] = net_RO
.get("vim_name")
722 vld
["status"] = net_RO
.get("status")
723 vld
["status-detailed"] = net_RO
.get("error_msg")
724 vnfr_update
["vld.{}".format(vld_index
)] = vld
728 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
733 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
738 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
743 def _get_ns_config_info(self
, nsr_id
):
745 Generates a mapping between vnf,vdu elements and the N2VC id
746 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
747 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
748 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
749 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
751 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
752 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
754 ns_config_info
= {"osm-config-mapping": mapping
}
755 for vca
in vca_deployed_list
:
756 if not vca
["member-vnf-index"]:
758 if not vca
["vdu_id"]:
759 mapping
[vca
["member-vnf-index"]] = vca
["application"]
763 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
765 ] = vca
["application"]
766 return ns_config_info
768 async def _instantiate_ng_ro(
785 def get_vim_account(vim_account_id
):
787 if vim_account_id
in db_vims
:
788 return db_vims
[vim_account_id
]
789 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
790 db_vims
[vim_account_id
] = db_vim
793 # modify target_vld info with instantiation parameters
794 def parse_vld_instantiation_params(
795 target_vim
, target_vld
, vld_params
, target_sdn
797 if vld_params
.get("ip-profile"):
798 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
801 if vld_params
.get("provider-network"):
802 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
805 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
806 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
809 if vld_params
.get("wimAccountId"):
810 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
811 target_vld
["vim_info"][target_wim
] = {}
812 for param
in ("vim-network-name", "vim-network-id"):
813 if vld_params
.get(param
):
814 if isinstance(vld_params
[param
], dict):
815 for vim
, vim_net
in vld_params
[param
].items():
816 other_target_vim
= "vim:" + vim
818 target_vld
["vim_info"],
819 (other_target_vim
, param
.replace("-", "_")),
822 else: # isinstance str
823 target_vld
["vim_info"][target_vim
][
824 param
.replace("-", "_")
825 ] = vld_params
[param
]
826 if vld_params
.get("common_id"):
827 target_vld
["common_id"] = vld_params
.get("common_id")
829 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
830 def update_ns_vld_target(target
, ns_params
):
831 for vnf_params
in ns_params
.get("vnf", ()):
832 if vnf_params
.get("vimAccountId"):
836 for vnfr
in db_vnfrs
.values()
837 if vnf_params
["member-vnf-index"]
838 == vnfr
["member-vnf-index-ref"]
842 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
843 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
844 target_vld
= find_in_list(
845 get_iterable(vdur
, "interfaces"),
846 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
849 if vnf_params
.get("vimAccountId") not in a_vld
.get(
852 target
["ns"]["vld"][a_index
].get("vim_info").update(
854 "vim:{}".format(vnf_params
["vimAccountId"]): {
855 "vim_network_name": ""
860 nslcmop_id
= db_nslcmop
["_id"]
862 "name": db_nsr
["name"],
865 "image": deepcopy(db_nsr
["image"]),
866 "flavor": deepcopy(db_nsr
["flavor"]),
867 "action_id": nslcmop_id
,
868 "cloud_init_content": {},
870 for image
in target
["image"]:
871 image
["vim_info"] = {}
872 for flavor
in target
["flavor"]:
873 flavor
["vim_info"] = {}
874 if db_nsr
.get("affinity-or-anti-affinity-group"):
875 target
["affinity-or-anti-affinity-group"] = deepcopy(db_nsr
["affinity-or-anti-affinity-group"])
876 for affinity_or_anti_affinity_group
in target
["affinity-or-anti-affinity-group"]:
877 affinity_or_anti_affinity_group
["vim_info"] = {}
879 if db_nslcmop
.get("lcmOperationType") != "instantiate":
880 # get parameters of instantiation:
881 db_nslcmop_instantiate
= self
.db
.get_list(
884 "nsInstanceId": db_nslcmop
["nsInstanceId"],
885 "lcmOperationType": "instantiate",
888 ns_params
= db_nslcmop_instantiate
.get("operationParams")
890 ns_params
= db_nslcmop
.get("operationParams")
891 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
892 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
895 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
896 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
900 "mgmt-network": vld
.get("mgmt-network", False),
901 "type": vld
.get("type"),
904 "vim_network_name": vld
.get("vim-network-name"),
905 "vim_account_id": ns_params
["vimAccountId"],
909 # check if this network needs SDN assist
910 if vld
.get("pci-interfaces"):
911 db_vim
= get_vim_account(ns_params
["vimAccountId"])
912 sdnc_id
= db_vim
["config"].get("sdn-controller")
914 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
915 target_sdn
= "sdn:{}".format(sdnc_id
)
916 target_vld
["vim_info"][target_sdn
] = {
918 "target_vim": target_vim
,
920 "type": vld
.get("type"),
923 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
924 for nsd_vnf_profile
in nsd_vnf_profiles
:
925 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
926 if cp
["virtual-link-profile-id"] == vld
["id"]:
928 "member_vnf:{}.{}".format(
929 cp
["constituent-cpd-id"][0][
930 "constituent-base-element-id"
932 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
934 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
936 # check at nsd descriptor, if there is an ip-profile
938 nsd_vlp
= find_in_list(
939 get_virtual_link_profiles(nsd
),
940 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
945 and nsd_vlp
.get("virtual-link-protocol-data")
946 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
948 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
951 ip_profile_dest_data
= {}
952 if "ip-version" in ip_profile_source_data
:
953 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
956 if "cidr" in ip_profile_source_data
:
957 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
960 if "gateway-ip" in ip_profile_source_data
:
961 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
964 if "dhcp-enabled" in ip_profile_source_data
:
965 ip_profile_dest_data
["dhcp-params"] = {
966 "enabled": ip_profile_source_data
["dhcp-enabled"]
968 vld_params
["ip-profile"] = ip_profile_dest_data
970 # update vld_params with instantiation params
971 vld_instantiation_params
= find_in_list(
972 get_iterable(ns_params
, "vld"),
973 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
975 if vld_instantiation_params
:
976 vld_params
.update(vld_instantiation_params
)
977 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
978 target
["ns"]["vld"].append(target_vld
)
979 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
980 update_ns_vld_target(target
, ns_params
)
982 for vnfr
in db_vnfrs
.values():
984 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
986 vnf_params
= find_in_list(
987 get_iterable(ns_params
, "vnf"),
988 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
990 target_vnf
= deepcopy(vnfr
)
991 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
992 for vld
in target_vnf
.get("vld", ()):
993 # check if connected to a ns.vld, to fill target'
994 vnf_cp
= find_in_list(
995 vnfd
.get("int-virtual-link-desc", ()),
996 lambda cpd
: cpd
.get("id") == vld
["id"],
999 ns_cp
= "member_vnf:{}.{}".format(
1000 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1002 if cp2target
.get(ns_cp
):
1003 vld
["target"] = cp2target
[ns_cp
]
1006 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1008 # check if this network needs SDN assist
1010 if vld
.get("pci-interfaces"):
1011 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1012 sdnc_id
= db_vim
["config"].get("sdn-controller")
1014 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1015 target_sdn
= "sdn:{}".format(sdnc_id
)
1016 vld
["vim_info"][target_sdn
] = {
1018 "target_vim": target_vim
,
1020 "type": vld
.get("type"),
1023 # check at vnfd descriptor, if there is an ip-profile
1025 vnfd_vlp
= find_in_list(
1026 get_virtual_link_profiles(vnfd
),
1027 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1031 and vnfd_vlp
.get("virtual-link-protocol-data")
1032 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1034 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1037 ip_profile_dest_data
= {}
1038 if "ip-version" in ip_profile_source_data
:
1039 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1042 if "cidr" in ip_profile_source_data
:
1043 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1046 if "gateway-ip" in ip_profile_source_data
:
1047 ip_profile_dest_data
[
1049 ] = ip_profile_source_data
["gateway-ip"]
1050 if "dhcp-enabled" in ip_profile_source_data
:
1051 ip_profile_dest_data
["dhcp-params"] = {
1052 "enabled": ip_profile_source_data
["dhcp-enabled"]
1055 vld_params
["ip-profile"] = ip_profile_dest_data
1056 # update vld_params with instantiation params
1058 vld_instantiation_params
= find_in_list(
1059 get_iterable(vnf_params
, "internal-vld"),
1060 lambda i_vld
: i_vld
["name"] == vld
["id"],
1062 if vld_instantiation_params
:
1063 vld_params
.update(vld_instantiation_params
)
1064 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1067 for vdur
in target_vnf
.get("vdur", ()):
1068 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1069 continue # This vdu must not be created
1070 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1072 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1075 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1076 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1079 and vdu_configuration
.get("config-access")
1080 and vdu_configuration
.get("config-access").get("ssh-access")
1082 vdur
["ssh-keys"] = ssh_keys_all
1083 vdur
["ssh-access-required"] = vdu_configuration
[
1085 ]["ssh-access"]["required"]
1088 and vnf_configuration
.get("config-access")
1089 and vnf_configuration
.get("config-access").get("ssh-access")
1090 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1092 vdur
["ssh-keys"] = ssh_keys_all
1093 vdur
["ssh-access-required"] = vnf_configuration
[
1095 ]["ssh-access"]["required"]
1096 elif ssh_keys_instantiation
and find_in_list(
1097 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1099 vdur
["ssh-keys"] = ssh_keys_instantiation
1101 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1103 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1105 if vdud
.get("cloud-init-file"):
1106 vdur
["cloud-init"] = "{}:file:{}".format(
1107 vnfd
["_id"], vdud
.get("cloud-init-file")
1109 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1110 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1111 base_folder
= vnfd
["_admin"]["storage"]
1112 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1113 base_folder
["folder"],
1114 base_folder
["pkg-dir"],
1115 vdud
.get("cloud-init-file"),
1117 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1118 target
["cloud_init_content"][
1121 elif vdud
.get("cloud-init"):
1122 vdur
["cloud-init"] = "{}:vdu:{}".format(
1123 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1125 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1126 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1129 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1130 deploy_params_vdu
= self
._format
_additional
_params
(
1131 vdur
.get("additionalParams") or {}
1133 deploy_params_vdu
["OSM"] = get_osm_params(
1134 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1136 vdur
["additionalParams"] = deploy_params_vdu
1139 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1140 if target_vim
not in ns_flavor
["vim_info"]:
1141 ns_flavor
["vim_info"][target_vim
] = {}
1144 # in case alternative images are provided we must check if they should be applied
1145 # for the vim_type, modify the vim_type taking into account
1146 ns_image_id
= int(vdur
["ns-image-id"])
1147 if vdur
.get("alt-image-ids"):
1148 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1149 vim_type
= db_vim
["vim_type"]
1150 for alt_image_id
in vdur
.get("alt-image-ids"):
1151 ns_alt_image
= target
["image"][int(alt_image_id
)]
1152 if vim_type
== ns_alt_image
.get("vim-type"):
1153 # must use alternative image
1155 "use alternative image id: {}".format(alt_image_id
)
1157 ns_image_id
= alt_image_id
1158 vdur
["ns-image-id"] = ns_image_id
1160 ns_image
= target
["image"][int(ns_image_id
)]
1161 if target_vim
not in ns_image
["vim_info"]:
1162 ns_image
["vim_info"][target_vim
] = {}
1165 if vdur
.get("affinity-or-anti-affinity-group-id"):
1166 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1167 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1168 if target_vim
not in ns_ags
["vim_info"]:
1169 ns_ags
["vim_info"][target_vim
] = {}
1171 vdur
["vim_info"] = {target_vim
: {}}
1172 # instantiation parameters
1174 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1175 # vdud["id"]), None)
1176 vdur_list
.append(vdur
)
1177 target_vnf
["vdur"] = vdur_list
1178 target
["vnf"].append(target_vnf
)
1180 desc
= await self
.RO
.deploy(nsr_id
, target
)
1181 self
.logger
.debug("RO return > {}".format(desc
))
1182 action_id
= desc
["action_id"]
1183 await self
._wait
_ng
_ro
(
1184 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
1189 "_admin.deployed.RO.operational-status": "running",
1190 "detailed-status": " ".join(stage
),
1192 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1193 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1194 self
._write
_op
_status
(nslcmop_id
, stage
)
1196 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1200 async def _wait_ng_ro(
1209 detailed_status_old
= None
1211 start_time
= start_time
or time()
1212 while time() <= start_time
+ timeout
:
1213 desc_status
= await self
.RO
.status(nsr_id
, action_id
)
1214 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1215 if desc_status
["status"] == "FAILED":
1216 raise NgRoException(desc_status
["details"])
1217 elif desc_status
["status"] == "BUILD":
1219 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1220 elif desc_status
["status"] == "DONE":
1222 stage
[2] = "Deployed at VIM"
1225 assert False, "ROclient.check_ns_status returns unknown {}".format(
1226 desc_status
["status"]
1228 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1229 detailed_status_old
= stage
[2]
1230 db_nsr_update
["detailed-status"] = " ".join(stage
)
1231 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1232 self
._write
_op
_status
(nslcmop_id
, stage
)
1233 await asyncio
.sleep(15, loop
=self
.loop
)
1234 else: # timeout_ns_deploy
1235 raise NgRoException("Timeout waiting ns to deploy")
1237 async def _terminate_ng_ro(
1238 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1243 start_deploy
= time()
1250 "action_id": nslcmop_id
,
1252 desc
= await self
.RO
.deploy(nsr_id
, target
)
1253 action_id
= desc
["action_id"]
1254 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1255 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1258 + "ns terminate action at RO. action_id={}".format(action_id
)
1262 delete_timeout
= 20 * 60 # 20 minutes
1263 await self
._wait
_ng
_ro
(
1264 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
1267 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1268 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1270 await self
.RO
.delete(nsr_id
)
1271 except Exception as e
:
1272 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1273 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1274 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1275 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1277 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1279 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1280 failed_detail
.append("delete conflict: {}".format(e
))
1283 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1286 failed_detail
.append("delete error: {}".format(e
))
1289 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1293 stage
[2] = "Error deleting from VIM"
1295 stage
[2] = "Deleted from VIM"
1296 db_nsr_update
["detailed-status"] = " ".join(stage
)
1297 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1298 self
._write
_op
_status
(nslcmop_id
, stage
)
1301 raise LcmException("; ".join(failed_detail
))
1304 async def instantiate_RO(
1318 :param logging_text: preffix text to use at logging
1319 :param nsr_id: nsr identity
1320 :param nsd: database content of ns descriptor
1321 :param db_nsr: database content of ns record
1322 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1324 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1325 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1326 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1327 :return: None or exception
1330 start_deploy
= time()
1331 ns_params
= db_nslcmop
.get("operationParams")
1332 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1333 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1335 timeout_ns_deploy
= self
.timeout
.get(
1336 "ns_deploy", self
.timeout_ns_deploy
1339 # Check for and optionally request placement optimization. Database will be updated if placement activated
1340 stage
[2] = "Waiting for Placement."
1341 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1342 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1343 for vnfr
in db_vnfrs
.values():
1344 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1347 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1349 return await self
._instantiate
_ng
_ro
(
1362 except Exception as e
:
1363 stage
[2] = "ERROR deploying at VIM"
1364 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1366 "Error deploying at VIM {}".format(e
),
1367 exc_info
=not isinstance(
1370 ROclient
.ROClientException
,
1379 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1381 Wait for kdu to be up, get ip address
1382 :param logging_text: prefix use for logging
1389 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1392 while nb_tries
< 360:
1393 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1397 for x
in get_iterable(db_vnfr
, "kdur")
1398 if x
.get("kdu-name") == kdu_name
1404 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1406 if kdur
.get("status"):
1407 if kdur
["status"] in ("READY", "ENABLED"):
1408 return kdur
.get("ip-address")
1411 "target KDU={} is in error state".format(kdu_name
)
1414 await asyncio
.sleep(10, loop
=self
.loop
)
1416 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1418 async def wait_vm_up_insert_key_ro(
1419 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1422 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1423 :param logging_text: prefix use for logging
1428 :param pub_key: public ssh key to inject, None to skip
1429 :param user: user to apply the public ssh key
1433 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1437 target_vdu_id
= None
1443 if ro_retries
>= 360: # 1 hour
1445 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1448 await asyncio
.sleep(10, loop
=self
.loop
)
1451 if not target_vdu_id
:
1452 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1454 if not vdu_id
: # for the VNF case
1455 if db_vnfr
.get("status") == "ERROR":
1457 "Cannot inject ssh-key because target VNF is in error state"
1459 ip_address
= db_vnfr
.get("ip-address")
1465 for x
in get_iterable(db_vnfr
, "vdur")
1466 if x
.get("ip-address") == ip_address
1474 for x
in get_iterable(db_vnfr
, "vdur")
1475 if x
.get("vdu-id-ref") == vdu_id
1476 and x
.get("count-index") == vdu_index
1482 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1483 ): # If only one, this should be the target vdu
1484 vdur
= db_vnfr
["vdur"][0]
1487 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1488 vnfr_id
, vdu_id
, vdu_index
1491 # New generation RO stores information at "vim_info"
1494 if vdur
.get("vim_info"):
1496 t
for t
in vdur
["vim_info"]
1497 ) # there should be only one key
1498 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1500 vdur
.get("pdu-type")
1501 or vdur
.get("status") == "ACTIVE"
1502 or ng_ro_status
== "ACTIVE"
1504 ip_address
= vdur
.get("ip-address")
1507 target_vdu_id
= vdur
["vdu-id-ref"]
1508 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1510 "Cannot inject ssh-key because target VM is in error state"
1513 if not target_vdu_id
:
1516 # inject public key into machine
1517 if pub_key
and user
:
1518 self
.logger
.debug(logging_text
+ "Inserting RO key")
1519 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1520 if vdur
.get("pdu-type"):
1521 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1524 ro_vm_id
= "{}-{}".format(
1525 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1526 ) # TODO add vdu_index
1530 "action": "inject_ssh_key",
1534 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1536 desc
= await self
.RO
.deploy(nsr_id
, target
)
1537 action_id
= desc
["action_id"]
1538 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600)
1541 # wait until NS is deployed at RO
1543 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1544 ro_nsr_id
= deep_get(
1545 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1549 result_dict
= await self
.RO
.create_action(
1551 item_id_name
=ro_nsr_id
,
1553 "add_public_key": pub_key
,
1558 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1559 if not result_dict
or not isinstance(result_dict
, dict):
1561 "Unknown response from RO when injecting key"
1563 for result
in result_dict
.values():
1564 if result
.get("vim_result") == 200:
1567 raise ROclient
.ROClientException(
1568 "error injecting key: {}".format(
1569 result
.get("description")
1573 except NgRoException
as e
:
1575 "Reaching max tries injecting key. Error: {}".format(e
)
1577 except ROclient
.ROClientException
as e
:
1581 + "error injecting key: {}. Retrying until {} seconds".format(
1588 "Reaching max tries injecting key. Error: {}".format(e
)
1595 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1597 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1599 my_vca
= vca_deployed_list
[vca_index
]
1600 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1601 # vdu or kdu: no dependencies
1605 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1606 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1607 configuration_status_list
= db_nsr
["configurationStatus"]
1608 for index
, vca_deployed
in enumerate(configuration_status_list
):
1609 if index
== vca_index
:
1612 if not my_vca
.get("member-vnf-index") or (
1613 vca_deployed
.get("member-vnf-index")
1614 == my_vca
.get("member-vnf-index")
1616 internal_status
= configuration_status_list
[index
].get("status")
1617 if internal_status
== "READY":
1619 elif internal_status
== "BROKEN":
1621 "Configuration aborted because dependent charm/s has failed"
1626 # no dependencies, return
1628 await asyncio
.sleep(10)
1631 raise LcmException("Configuration aborted because dependent charm/s timeout")
1633 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1636 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1638 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1639 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1642 async def instantiate_N2VC(
1659 ee_config_descriptor
,
1661 nsr_id
= db_nsr
["_id"]
1662 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1663 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1664 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1665 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1667 "collection": "nsrs",
1668 "filter": {"_id": nsr_id
},
1669 "path": db_update_entry
,
1675 element_under_configuration
= nsr_id
1679 vnfr_id
= db_vnfr
["_id"]
1680 osm_config
["osm"]["vnf_id"] = vnfr_id
1682 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1684 if vca_type
== "native_charm":
1687 index_number
= vdu_index
or 0
1690 element_type
= "VNF"
1691 element_under_configuration
= vnfr_id
1692 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1694 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1695 element_type
= "VDU"
1696 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1697 osm_config
["osm"]["vdu_id"] = vdu_id
1699 namespace
+= ".{}".format(kdu_name
)
1700 element_type
= "KDU"
1701 element_under_configuration
= kdu_name
1702 osm_config
["osm"]["kdu_name"] = kdu_name
1705 artifact_path
= "{}/{}/{}/{}".format(
1706 base_folder
["folder"],
1707 base_folder
["pkg-dir"],
1709 if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1714 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1716 # get initial_config_primitive_list that applies to this element
1717 initial_config_primitive_list
= config_descriptor
.get(
1718 "initial-config-primitive"
1722 "Initial config primitive list > {}".format(
1723 initial_config_primitive_list
1727 # add config if not present for NS charm
1728 ee_descriptor_id
= ee_config_descriptor
.get("id")
1729 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1730 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1731 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1735 "Initial config primitive list #2 > {}".format(
1736 initial_config_primitive_list
1739 # n2vc_redesign STEP 3.1
1740 # find old ee_id if exists
1741 ee_id
= vca_deployed
.get("ee_id")
1743 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1744 # create or register execution environment in VCA
1745 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1747 self
._write
_configuration
_status
(
1749 vca_index
=vca_index
,
1751 element_under_configuration
=element_under_configuration
,
1752 element_type
=element_type
,
1755 step
= "create execution environment"
1756 self
.logger
.debug(logging_text
+ step
)
1760 if vca_type
== "k8s_proxy_charm":
1761 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1762 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1763 namespace
=namespace
,
1764 artifact_path
=artifact_path
,
1768 elif vca_type
== "helm" or vca_type
== "helm-v3":
1769 ee_id
, credentials
= await self
.vca_map
[
1771 ].create_execution_environment(
1772 namespace
=namespace
,
1776 artifact_path
=artifact_path
,
1780 ee_id
, credentials
= await self
.vca_map
[
1782 ].create_execution_environment(
1783 namespace
=namespace
,
1789 elif vca_type
== "native_charm":
1790 step
= "Waiting to VM being up and getting IP address"
1791 self
.logger
.debug(logging_text
+ step
)
1792 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1801 credentials
= {"hostname": rw_mgmt_ip
}
1803 username
= deep_get(
1804 config_descriptor
, ("config-access", "ssh-access", "default-user")
1806 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1807 # merged. Meanwhile let's get username from initial-config-primitive
1808 if not username
and initial_config_primitive_list
:
1809 for config_primitive
in initial_config_primitive_list
:
1810 for param
in config_primitive
.get("parameter", ()):
1811 if param
["name"] == "ssh-username":
1812 username
= param
["value"]
1816 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1817 "'config-access.ssh-access.default-user'"
1819 credentials
["username"] = username
1820 # n2vc_redesign STEP 3.2
1822 self
._write
_configuration
_status
(
1824 vca_index
=vca_index
,
1825 status
="REGISTERING",
1826 element_under_configuration
=element_under_configuration
,
1827 element_type
=element_type
,
1830 step
= "register execution environment {}".format(credentials
)
1831 self
.logger
.debug(logging_text
+ step
)
1832 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1833 credentials
=credentials
,
1834 namespace
=namespace
,
1839 # for compatibility with MON/POL modules, the need model and application name at database
1840 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1841 ee_id_parts
= ee_id
.split(".")
1842 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1843 if len(ee_id_parts
) >= 2:
1844 model_name
= ee_id_parts
[0]
1845 application_name
= ee_id_parts
[1]
1846 db_nsr_update
[db_update_entry
+ "model"] = model_name
1847 db_nsr_update
[db_update_entry
+ "application"] = application_name
1849 # n2vc_redesign STEP 3.3
1850 step
= "Install configuration Software"
1852 self
._write
_configuration
_status
(
1854 vca_index
=vca_index
,
1855 status
="INSTALLING SW",
1856 element_under_configuration
=element_under_configuration
,
1857 element_type
=element_type
,
1858 other_update
=db_nsr_update
,
1861 # TODO check if already done
1862 self
.logger
.debug(logging_text
+ step
)
1864 if vca_type
== "native_charm":
1865 config_primitive
= next(
1866 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1869 if config_primitive
:
1870 config
= self
._map
_primitive
_params
(
1871 config_primitive
, {}, deploy_params
1874 if vca_type
== "lxc_proxy_charm":
1875 if element_type
== "NS":
1876 num_units
= db_nsr
.get("config-units") or 1
1877 elif element_type
== "VNF":
1878 num_units
= db_vnfr
.get("config-units") or 1
1879 elif element_type
== "VDU":
1880 for v
in db_vnfr
["vdur"]:
1881 if vdu_id
== v
["vdu-id-ref"]:
1882 num_units
= v
.get("config-units") or 1
1884 if vca_type
!= "k8s_proxy_charm":
1885 await self
.vca_map
[vca_type
].install_configuration_sw(
1887 artifact_path
=artifact_path
,
1890 num_units
=num_units
,
1895 # write in db flag of configuration_sw already installed
1897 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1900 # add relations for this VCA (wait for other peers related with this VCA)
1901 await self
._add
_vca
_relations
(
1902 logging_text
=logging_text
,
1904 vca_index
=vca_index
,
1909 # if SSH access is required, then get execution environment SSH public
1910 # if native charm we have waited already to VM be UP
1911 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1914 # self.logger.debug("get ssh key block")
1916 config_descriptor
, ("config-access", "ssh-access", "required")
1918 # self.logger.debug("ssh key needed")
1919 # Needed to inject a ssh key
1922 ("config-access", "ssh-access", "default-user"),
1924 step
= "Install configuration Software, getting public ssh key"
1925 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
1926 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
1929 step
= "Insert public key into VM user={} ssh_key={}".format(
1933 # self.logger.debug("no need to get ssh key")
1934 step
= "Waiting to VM being up and getting IP address"
1935 self
.logger
.debug(logging_text
+ step
)
1937 # n2vc_redesign STEP 5.1
1938 # wait for RO (ip-address) Insert pub_key into VM
1941 rw_mgmt_ip
= await self
.wait_kdu_up(
1942 logging_text
, nsr_id
, vnfr_id
, kdu_name
1945 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1955 rw_mgmt_ip
= None # This is for a NS configuration
1957 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
1959 # store rw_mgmt_ip in deploy params for later replacement
1960 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
1962 # n2vc_redesign STEP 6 Execute initial config primitive
1963 step
= "execute initial config primitive"
1965 # wait for dependent primitives execution (NS -> VNF -> VDU)
1966 if initial_config_primitive_list
:
1967 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
1969 # stage, in function of element type: vdu, kdu, vnf or ns
1970 my_vca
= vca_deployed_list
[vca_index
]
1971 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1973 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
1974 elif my_vca
.get("member-vnf-index"):
1976 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
1979 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
1981 self
._write
_configuration
_status
(
1982 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
1985 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
1987 check_if_terminated_needed
= True
1988 for initial_config_primitive
in initial_config_primitive_list
:
1989 # adding information on the vca_deployed if it is a NS execution environment
1990 if not vca_deployed
["member-vnf-index"]:
1991 deploy_params
["ns_config_info"] = json
.dumps(
1992 self
._get
_ns
_config
_info
(nsr_id
)
1994 # TODO check if already done
1995 primitive_params_
= self
._map
_primitive
_params
(
1996 initial_config_primitive
, {}, deploy_params
1999 step
= "execute primitive '{}' params '{}'".format(
2000 initial_config_primitive
["name"], primitive_params_
2002 self
.logger
.debug(logging_text
+ step
)
2003 await self
.vca_map
[vca_type
].exec_primitive(
2005 primitive_name
=initial_config_primitive
["name"],
2006 params_dict
=primitive_params_
,
2011 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2012 if check_if_terminated_needed
:
2013 if config_descriptor
.get("terminate-config-primitive"):
2015 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2017 check_if_terminated_needed
= False
2019 # TODO register in database that primitive is done
2021 # STEP 7 Configure metrics
2022 if vca_type
== "helm" or vca_type
== "helm-v3":
2023 prometheus_jobs
= await self
.add_prometheus_metrics(
2025 artifact_path
=artifact_path
,
2026 ee_config_descriptor
=ee_config_descriptor
,
2029 target_ip
=rw_mgmt_ip
,
2035 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2038 step
= "instantiated at VCA"
2039 self
.logger
.debug(logging_text
+ step
)
2041 self
._write
_configuration
_status
(
2042 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2045 except Exception as e
: # TODO not use Exception but N2VC exception
2046 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2048 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2051 "Exception while {} : {}".format(step
, e
), exc_info
=True
2053 self
._write
_configuration
_status
(
2054 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2056 raise LcmException("{} {}".format(step
, e
)) from e
2058 def _write_ns_status(
2062 current_operation
: str,
2063 current_operation_id
: str,
2064 error_description
: str = None,
2065 error_detail
: str = None,
2066 other_update
: dict = None,
2069 Update db_nsr fields.
2072 :param current_operation:
2073 :param current_operation_id:
2074 :param error_description:
2075 :param error_detail:
2076 :param other_update: Other required changes at database if provided, will be cleared
2080 db_dict
= other_update
or {}
2083 ] = current_operation_id
# for backward compatibility
2084 db_dict
["_admin.current-operation"] = current_operation_id
2085 db_dict
["_admin.operation-type"] = (
2086 current_operation
if current_operation
!= "IDLE" else None
2088 db_dict
["currentOperation"] = current_operation
2089 db_dict
["currentOperationID"] = current_operation_id
2090 db_dict
["errorDescription"] = error_description
2091 db_dict
["errorDetail"] = error_detail
2094 db_dict
["nsState"] = ns_state
2095 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2096 except DbException
as e
:
2097 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2099 def _write_op_status(
2103 error_message
: str = None,
2104 queuePosition
: int = 0,
2105 operation_state
: str = None,
2106 other_update
: dict = None,
2109 db_dict
= other_update
or {}
2110 db_dict
["queuePosition"] = queuePosition
2111 if isinstance(stage
, list):
2112 db_dict
["stage"] = stage
[0]
2113 db_dict
["detailed-status"] = " ".join(stage
)
2114 elif stage
is not None:
2115 db_dict
["stage"] = str(stage
)
2117 if error_message
is not None:
2118 db_dict
["errorMessage"] = error_message
2119 if operation_state
is not None:
2120 db_dict
["operationState"] = operation_state
2121 db_dict
["statusEnteredTime"] = time()
2122 self
.update_db_2("nslcmops", op_id
, db_dict
)
2123 except DbException
as e
:
2125 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2128 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2130 nsr_id
= db_nsr
["_id"]
2131 # configurationStatus
2132 config_status
= db_nsr
.get("configurationStatus")
2135 "configurationStatus.{}.status".format(index
): status
2136 for index
, v
in enumerate(config_status
)
2140 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2142 except DbException
as e
:
2144 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2147 def _write_configuration_status(
2152 element_under_configuration
: str = None,
2153 element_type
: str = None,
2154 other_update
: dict = None,
2157 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2158 # .format(vca_index, status))
2161 db_path
= "configurationStatus.{}.".format(vca_index
)
2162 db_dict
= other_update
or {}
2164 db_dict
[db_path
+ "status"] = status
2165 if element_under_configuration
:
2167 db_path
+ "elementUnderConfiguration"
2168 ] = element_under_configuration
2170 db_dict
[db_path
+ "elementType"] = element_type
2171 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2172 except DbException
as e
:
2174 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2175 status
, nsr_id
, vca_index
, e
2179 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2181 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2182 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2183 Database is used because the result can be obtained from a different LCM worker in case of HA.
2184 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2185 :param db_nslcmop: database content of nslcmop
2186 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2187 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2188 computed 'vim-account-id'
2191 nslcmop_id
= db_nslcmop
["_id"]
2192 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2193 if placement_engine
== "PLA":
2195 logging_text
+ "Invoke and wait for placement optimization"
2197 await self
.msg
.aiowrite(
2198 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2200 db_poll_interval
= 5
2201 wait
= db_poll_interval
* 10
2203 while not pla_result
and wait
>= 0:
2204 await asyncio
.sleep(db_poll_interval
)
2205 wait
-= db_poll_interval
2206 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2207 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2211 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2214 for pla_vnf
in pla_result
["vnf"]:
2215 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2216 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2221 {"_id": vnfr
["_id"]},
2222 {"vim-account-id": pla_vnf
["vimAccountId"]},
2225 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2228 def update_nsrs_with_pla_result(self
, params
):
2230 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2232 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2234 except Exception as e
:
2235 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2237 async def instantiate(self
, nsr_id
, nslcmop_id
):
2240 :param nsr_id: ns instance to deploy
2241 :param nslcmop_id: operation to run
2245 # Try to lock HA task here
2246 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2247 if not task_is_locked_by_me
:
2249 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2253 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2254 self
.logger
.debug(logging_text
+ "Enter")
2256 # get all needed from database
2258 # database nsrs record
2261 # database nslcmops record
2264 # update operation on nsrs
2266 # update operation on nslcmops
2267 db_nslcmop_update
= {}
2269 nslcmop_operation_state
= None
2270 db_vnfrs
= {} # vnf's info indexed by member-index
2272 tasks_dict_info
= {} # from task to info text
2276 "Stage 1/5: preparation of the environment.",
2277 "Waiting for previous operations to terminate.",
2280 # ^ stage, step, VIM progress
2282 # wait for any previous tasks in process
2283 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2285 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2286 stage
[1] = "Reading from database."
2287 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2288 db_nsr_update
["detailed-status"] = "creating"
2289 db_nsr_update
["operational-status"] = "init"
2290 self
._write
_ns
_status
(
2292 ns_state
="BUILDING",
2293 current_operation
="INSTANTIATING",
2294 current_operation_id
=nslcmop_id
,
2295 other_update
=db_nsr_update
,
2297 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2299 # read from db: operation
2300 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2301 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2302 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2303 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2304 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2306 ns_params
= db_nslcmop
.get("operationParams")
2307 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2308 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2310 timeout_ns_deploy
= self
.timeout
.get(
2311 "ns_deploy", self
.timeout_ns_deploy
2315 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2316 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2317 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2318 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2319 self
.fs
.sync(db_nsr
["nsd-id"])
2321 # nsr_name = db_nsr["name"] # TODO short-name??
2323 # read from db: vnf's of this ns
2324 stage
[1] = "Getting vnfrs from db."
2325 self
.logger
.debug(logging_text
+ stage
[1])
2326 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2328 # read from db: vnfd's for every vnf
2329 db_vnfds
= [] # every vnfd data
2331 # for each vnf in ns, read vnfd
2332 for vnfr
in db_vnfrs_list
:
2333 if vnfr
.get("kdur"):
2335 for kdur
in vnfr
["kdur"]:
2336 if kdur
.get("additionalParams"):
2337 kdur
["additionalParams"] = json
.loads(
2338 kdur
["additionalParams"]
2340 kdur_list
.append(kdur
)
2341 vnfr
["kdur"] = kdur_list
2343 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2344 vnfd_id
= vnfr
["vnfd-id"]
2345 vnfd_ref
= vnfr
["vnfd-ref"]
2346 self
.fs
.sync(vnfd_id
)
2348 # if we haven't this vnfd, read it from db
2349 if vnfd_id
not in db_vnfds
:
2351 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2354 self
.logger
.debug(logging_text
+ stage
[1])
2355 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2358 db_vnfds
.append(vnfd
)
2360 # Get or generates the _admin.deployed.VCA list
2361 vca_deployed_list
= None
2362 if db_nsr
["_admin"].get("deployed"):
2363 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2364 if vca_deployed_list
is None:
2365 vca_deployed_list
= []
2366 configuration_status_list
= []
2367 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2368 db_nsr_update
["configurationStatus"] = configuration_status_list
2369 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2370 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2371 elif isinstance(vca_deployed_list
, dict):
2372 # maintain backward compatibility. Change a dict to list at database
2373 vca_deployed_list
= list(vca_deployed_list
.values())
2374 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2375 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2378 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2380 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2381 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2383 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2384 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2385 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2387 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2390 # n2vc_redesign STEP 2 Deploy Network Scenario
2391 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2392 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2394 stage
[1] = "Deploying KDUs."
2395 # self.logger.debug(logging_text + "Before deploy_kdus")
2396 # Call to deploy_kdus in case exists the "vdu:kdu" param
2397 await self
.deploy_kdus(
2398 logging_text
=logging_text
,
2400 nslcmop_id
=nslcmop_id
,
2403 task_instantiation_info
=tasks_dict_info
,
2406 stage
[1] = "Getting VCA public key."
2407 # n2vc_redesign STEP 1 Get VCA public ssh-key
2408 # feature 1429. Add n2vc public key to needed VMs
2409 n2vc_key
= self
.n2vc
.get_public_key()
2410 n2vc_key_list
= [n2vc_key
]
2411 if self
.vca_config
.get("public_key"):
2412 n2vc_key_list
.append(self
.vca_config
["public_key"])
2414 stage
[1] = "Deploying NS at VIM."
2415 task_ro
= asyncio
.ensure_future(
2416 self
.instantiate_RO(
2417 logging_text
=logging_text
,
2421 db_nslcmop
=db_nslcmop
,
2424 n2vc_key_list
=n2vc_key_list
,
2428 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2429 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2431 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2432 stage
[1] = "Deploying Execution Environments."
2433 self
.logger
.debug(logging_text
+ stage
[1])
2435 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2436 for vnf_profile
in get_vnf_profiles(nsd
):
2437 vnfd_id
= vnf_profile
["vnfd-id"]
2438 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2439 member_vnf_index
= str(vnf_profile
["id"])
2440 db_vnfr
= db_vnfrs
[member_vnf_index
]
2441 base_folder
= vnfd
["_admin"]["storage"]
2447 # Get additional parameters
2448 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2449 if db_vnfr
.get("additionalParamsForVnf"):
2450 deploy_params
.update(
2451 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2454 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2455 if descriptor_config
:
2457 logging_text
=logging_text
2458 + "member_vnf_index={} ".format(member_vnf_index
),
2461 nslcmop_id
=nslcmop_id
,
2467 member_vnf_index
=member_vnf_index
,
2468 vdu_index
=vdu_index
,
2470 deploy_params
=deploy_params
,
2471 descriptor_config
=descriptor_config
,
2472 base_folder
=base_folder
,
2473 task_instantiation_info
=tasks_dict_info
,
2477 # Deploy charms for each VDU that supports one.
2478 for vdud
in get_vdu_list(vnfd
):
2480 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2481 vdur
= find_in_list(
2482 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2485 if vdur
.get("additionalParams"):
2486 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2488 deploy_params_vdu
= deploy_params
2489 deploy_params_vdu
["OSM"] = get_osm_params(
2490 db_vnfr
, vdu_id
, vdu_count_index
=0
2492 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2494 self
.logger
.debug("VDUD > {}".format(vdud
))
2496 "Descriptor config > {}".format(descriptor_config
)
2498 if descriptor_config
:
2501 for vdu_index
in range(vdud_count
):
2502 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2504 logging_text
=logging_text
2505 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2506 member_vnf_index
, vdu_id
, vdu_index
2510 nslcmop_id
=nslcmop_id
,
2516 member_vnf_index
=member_vnf_index
,
2517 vdu_index
=vdu_index
,
2519 deploy_params
=deploy_params_vdu
,
2520 descriptor_config
=descriptor_config
,
2521 base_folder
=base_folder
,
2522 task_instantiation_info
=tasks_dict_info
,
2525 for kdud
in get_kdu_list(vnfd
):
2526 kdu_name
= kdud
["name"]
2527 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2528 if descriptor_config
:
2533 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2535 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2536 if kdur
.get("additionalParams"):
2537 deploy_params_kdu
.update(
2538 parse_yaml_strings(kdur
["additionalParams"].copy())
2542 logging_text
=logging_text
,
2545 nslcmop_id
=nslcmop_id
,
2551 member_vnf_index
=member_vnf_index
,
2552 vdu_index
=vdu_index
,
2554 deploy_params
=deploy_params_kdu
,
2555 descriptor_config
=descriptor_config
,
2556 base_folder
=base_folder
,
2557 task_instantiation_info
=tasks_dict_info
,
2561 # Check if this NS has a charm configuration
2562 descriptor_config
= nsd
.get("ns-configuration")
2563 if descriptor_config
and descriptor_config
.get("juju"):
2566 member_vnf_index
= None
2572 # Get additional parameters
2573 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2574 if db_nsr
.get("additionalParamsForNs"):
2575 deploy_params
.update(
2576 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2578 base_folder
= nsd
["_admin"]["storage"]
2580 logging_text
=logging_text
,
2583 nslcmop_id
=nslcmop_id
,
2589 member_vnf_index
=member_vnf_index
,
2590 vdu_index
=vdu_index
,
2592 deploy_params
=deploy_params
,
2593 descriptor_config
=descriptor_config
,
2594 base_folder
=base_folder
,
2595 task_instantiation_info
=tasks_dict_info
,
2599 # rest of staff will be done at finally
2602 ROclient
.ROClientException
,
2608 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2611 except asyncio
.CancelledError
:
2613 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2615 exc
= "Operation was cancelled"
2616 except Exception as e
:
2617 exc
= traceback
.format_exc()
2618 self
.logger
.critical(
2619 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2624 error_list
.append(str(exc
))
2626 # wait for pending tasks
2628 stage
[1] = "Waiting for instantiate pending tasks."
2629 self
.logger
.debug(logging_text
+ stage
[1])
2630 error_list
+= await self
._wait
_for
_tasks
(
2638 stage
[1] = stage
[2] = ""
2639 except asyncio
.CancelledError
:
2640 error_list
.append("Cancelled")
2641 # TODO cancel all tasks
2642 except Exception as exc
:
2643 error_list
.append(str(exc
))
2645 # update operation-status
2646 db_nsr_update
["operational-status"] = "running"
2647 # let's begin with VCA 'configured' status (later we can change it)
2648 db_nsr_update
["config-status"] = "configured"
2649 for task
, task_name
in tasks_dict_info
.items():
2650 if not task
.done() or task
.cancelled() or task
.exception():
2651 if task_name
.startswith(self
.task_name_deploy_vca
):
2652 # A N2VC task is pending
2653 db_nsr_update
["config-status"] = "failed"
2655 # RO or KDU task is pending
2656 db_nsr_update
["operational-status"] = "failed"
2658 # update status at database
2660 error_detail
= ". ".join(error_list
)
2661 self
.logger
.error(logging_text
+ error_detail
)
2662 error_description_nslcmop
= "{} Detail: {}".format(
2663 stage
[0], error_detail
2665 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2666 nslcmop_id
, stage
[0]
2669 db_nsr_update
["detailed-status"] = (
2670 error_description_nsr
+ " Detail: " + error_detail
2672 db_nslcmop_update
["detailed-status"] = error_detail
2673 nslcmop_operation_state
= "FAILED"
2677 error_description_nsr
= error_description_nslcmop
= None
2679 db_nsr_update
["detailed-status"] = "Done"
2680 db_nslcmop_update
["detailed-status"] = "Done"
2681 nslcmop_operation_state
= "COMPLETED"
2684 self
._write
_ns
_status
(
2687 current_operation
="IDLE",
2688 current_operation_id
=None,
2689 error_description
=error_description_nsr
,
2690 error_detail
=error_detail
,
2691 other_update
=db_nsr_update
,
2693 self
._write
_op
_status
(
2696 error_message
=error_description_nslcmop
,
2697 operation_state
=nslcmop_operation_state
,
2698 other_update
=db_nslcmop_update
,
2701 if nslcmop_operation_state
:
2703 await self
.msg
.aiowrite(
2708 "nslcmop_id": nslcmop_id
,
2709 "operationState": nslcmop_operation_state
,
2713 except Exception as e
:
2715 logging_text
+ "kafka_write notification Exception {}".format(e
)
2718 self
.logger
.debug(logging_text
+ "Exit")
2719 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2721 async def _add_vca_relations(
2726 timeout
: int = 3600,
2727 vca_type
: str = None,
2732 # 1. find all relations for this VCA
2733 # 2. wait for other peers related
2737 vca_type
= vca_type
or "lxc_proxy_charm"
2739 # STEP 1: find all relations for this VCA
2742 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2743 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2746 my_vca
= deep_get(db_nsr
, ("_admin", "deployed", "VCA"))[vca_index
]
2748 # read all ns-configuration relations
2749 ns_relations
= list()
2750 db_ns_relations
= deep_get(nsd
, ("ns-configuration", "relation"))
2752 for r
in db_ns_relations
:
2753 # check if this VCA is in the relation
2754 if my_vca
.get("member-vnf-index") in (
2755 r
.get("entities")[0].get("id"),
2756 r
.get("entities")[1].get("id"),
2758 ns_relations
.append(r
)
2760 # read all vnf-configuration relations
2761 vnf_relations
= list()
2762 db_vnfd_list
= db_nsr
.get("vnfd-id")
2764 for vnfd
in db_vnfd_list
:
2765 db_vnf_relations
= None
2766 db_vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd
})
2767 db_vnf_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
2768 if db_vnf_configuration
:
2769 db_vnf_relations
= db_vnf_configuration
.get("relation", [])
2770 if db_vnf_relations
:
2771 for r
in db_vnf_relations
:
2772 # check if this VCA is in the relation
2773 if my_vca
.get("vdu_id") in (
2774 r
.get("entities")[0].get("id"),
2775 r
.get("entities")[1].get("id"),
2777 vnf_relations
.append(r
)
2779 # if no relations, terminate
2780 if not ns_relations
and not vnf_relations
:
2781 self
.logger
.debug(logging_text
+ " No relations")
2786 + " adding relations\n {}\n {}".format(
2787 ns_relations
, vnf_relations
2796 if now
- start
>= timeout
:
2797 self
.logger
.error(logging_text
+ " : timeout adding relations")
2800 # reload nsr from database (we need to update record: _admin.deloyed.VCA)
2801 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2803 # for each defined NS relation, find the VCA's related
2804 for r
in ns_relations
.copy():
2805 from_vca_ee_id
= None
2807 from_vca_endpoint
= None
2808 to_vca_endpoint
= None
2809 vca_list
= deep_get(db_nsr
, ("_admin", "deployed", "VCA"))
2810 for vca
in vca_list
:
2811 if vca
.get("member-vnf-index") == r
.get("entities")[0].get(
2813 ) and vca
.get("config_sw_installed"):
2814 from_vca_ee_id
= vca
.get("ee_id")
2815 from_vca_endpoint
= r
.get("entities")[0].get("endpoint")
2816 if vca
.get("member-vnf-index") == r
.get("entities")[1].get(
2818 ) and vca
.get("config_sw_installed"):
2819 to_vca_ee_id
= vca
.get("ee_id")
2820 to_vca_endpoint
= r
.get("entities")[1].get("endpoint")
2821 if from_vca_ee_id
and to_vca_ee_id
:
2823 await self
.vca_map
[vca_type
].add_relation(
2824 ee_id_1
=from_vca_ee_id
,
2825 ee_id_2
=to_vca_ee_id
,
2826 endpoint_1
=from_vca_endpoint
,
2827 endpoint_2
=to_vca_endpoint
,
2830 # remove entry from relations list
2831 ns_relations
.remove(r
)
2833 # check failed peers
2835 vca_status_list
= db_nsr
.get("configurationStatus")
2837 for i
in range(len(vca_list
)):
2839 vca_status
= vca_status_list
[i
]
2840 if vca
.get("member-vnf-index") == r
.get("entities")[
2843 if vca_status
.get("status") == "BROKEN":
2844 # peer broken: remove relation from list
2845 ns_relations
.remove(r
)
2846 if vca
.get("member-vnf-index") == r
.get("entities")[
2849 if vca_status
.get("status") == "BROKEN":
2850 # peer broken: remove relation from list
2851 ns_relations
.remove(r
)
2856 # for each defined VNF relation, find the VCA's related
2857 for r
in vnf_relations
.copy():
2858 from_vca_ee_id
= None
2860 from_vca_endpoint
= None
2861 to_vca_endpoint
= None
2862 vca_list
= deep_get(db_nsr
, ("_admin", "deployed", "VCA"))
2863 for vca
in vca_list
:
2864 key_to_check
= "vdu_id"
2865 if vca
.get("vdu_id") is None:
2866 key_to_check
= "vnfd_id"
2867 if vca
.get(key_to_check
) == r
.get("entities")[0].get(
2869 ) and vca
.get("config_sw_installed"):
2870 from_vca_ee_id
= vca
.get("ee_id")
2871 from_vca_endpoint
= r
.get("entities")[0].get("endpoint")
2872 if vca
.get(key_to_check
) == r
.get("entities")[1].get(
2874 ) and vca
.get("config_sw_installed"):
2875 to_vca_ee_id
= vca
.get("ee_id")
2876 to_vca_endpoint
= r
.get("entities")[1].get("endpoint")
2877 if from_vca_ee_id
and to_vca_ee_id
:
2879 await self
.vca_map
[vca_type
].add_relation(
2880 ee_id_1
=from_vca_ee_id
,
2881 ee_id_2
=to_vca_ee_id
,
2882 endpoint_1
=from_vca_endpoint
,
2883 endpoint_2
=to_vca_endpoint
,
2886 # remove entry from relations list
2887 vnf_relations
.remove(r
)
2889 # check failed peers
2891 vca_status_list
= db_nsr
.get("configurationStatus")
2893 for i
in range(len(vca_list
)):
2895 vca_status
= vca_status_list
[i
]
2896 if vca
.get("vdu_id") == r
.get("entities")[0].get(
2899 if vca_status
.get("status") == "BROKEN":
2900 # peer broken: remove relation from list
2901 vnf_relations
.remove(r
)
2902 if vca
.get("vdu_id") == r
.get("entities")[1].get(
2905 if vca_status
.get("status") == "BROKEN":
2906 # peer broken: remove relation from list
2907 vnf_relations
.remove(r
)
2913 await asyncio
.sleep(5.0)
2915 if not ns_relations
and not vnf_relations
:
2916 self
.logger
.debug("Relations added")
2921 except Exception as e
:
2922 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
2925 async def _install_kdu(
2933 k8s_instance_info
: dict,
2934 k8params
: dict = None,
2940 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
2943 "collection": "nsrs",
2944 "filter": {"_id": nsr_id
},
2945 "path": nsr_db_path
,
2948 if k8s_instance_info
.get("kdu-deployment-name"):
2949 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
2951 kdu_instance
= self
.k8scluster_map
[
2953 ].generate_kdu_instance_name(
2954 db_dict
=db_dict_install
,
2955 kdu_model
=k8s_instance_info
["kdu-model"],
2956 kdu_name
=k8s_instance_info
["kdu-name"],
2959 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
2961 await self
.k8scluster_map
[k8sclustertype
].install(
2962 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2963 kdu_model
=k8s_instance_info
["kdu-model"],
2966 db_dict
=db_dict_install
,
2968 kdu_name
=k8s_instance_info
["kdu-name"],
2969 namespace
=k8s_instance_info
["namespace"],
2970 kdu_instance
=kdu_instance
,
2974 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
2977 # Obtain services to obtain management service ip
2978 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
2979 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2980 kdu_instance
=kdu_instance
,
2981 namespace
=k8s_instance_info
["namespace"],
2984 # Obtain management service info (if exists)
2985 vnfr_update_dict
= {}
2986 kdu_config
= get_configuration(vnfd
, kdud
["name"])
2988 target_ee_list
= kdu_config
.get("execution-environment-list", [])
2993 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
2996 for service
in kdud
.get("service", [])
2997 if service
.get("mgmt-service")
2999 for mgmt_service
in mgmt_services
:
3000 for service
in services
:
3001 if service
["name"].startswith(mgmt_service
["name"]):
3002 # Mgmt service found, Obtain service ip
3003 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3004 if isinstance(ip
, list) and len(ip
) == 1:
3008 "kdur.{}.ip-address".format(kdu_index
)
3011 # Check if must update also mgmt ip at the vnf
3012 service_external_cp
= mgmt_service
.get(
3013 "external-connection-point-ref"
3015 if service_external_cp
:
3017 deep_get(vnfd
, ("mgmt-interface", "cp"))
3018 == service_external_cp
3020 vnfr_update_dict
["ip-address"] = ip
3025 "external-connection-point-ref", ""
3027 == service_external_cp
,
3030 "kdur.{}.ip-address".format(kdu_index
)
3035 "Mgmt service name: {} not found".format(
3036 mgmt_service
["name"]
3040 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3041 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3043 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3046 and kdu_config
.get("initial-config-primitive")
3047 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3049 initial_config_primitive_list
= kdu_config
.get(
3050 "initial-config-primitive"
3052 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3054 for initial_config_primitive
in initial_config_primitive_list
:
3055 primitive_params_
= self
._map
_primitive
_params
(
3056 initial_config_primitive
, {}, {}
3059 await asyncio
.wait_for(
3060 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3061 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3062 kdu_instance
=kdu_instance
,
3063 primitive_name
=initial_config_primitive
["name"],
3064 params
=primitive_params_
,
3065 db_dict
=db_dict_install
,
3071 except Exception as e
:
3072 # Prepare update db with error and raise exception
3075 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3079 vnfr_data
.get("_id"),
3080 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3083 # ignore to keep original exception
3085 # reraise original error
3090 async def deploy_kdus(
3097 task_instantiation_info
,
3099 # Launch kdus if present in the descriptor
3101 k8scluster_id_2_uuic
= {
3102 "helm-chart-v3": {},
3107 async def _get_cluster_id(cluster_id
, cluster_type
):
3108 nonlocal k8scluster_id_2_uuic
3109 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3110 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3112 # check if K8scluster is creating and wait look if previous tasks in process
3113 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3114 "k8scluster", cluster_id
3117 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3118 task_name
, cluster_id
3120 self
.logger
.debug(logging_text
+ text
)
3121 await asyncio
.wait(task_dependency
, timeout
=3600)
3123 db_k8scluster
= self
.db
.get_one(
3124 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3126 if not db_k8scluster
:
3127 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3129 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3131 if cluster_type
== "helm-chart-v3":
3133 # backward compatibility for existing clusters that have not been initialized for helm v3
3134 k8s_credentials
= yaml
.safe_dump(
3135 db_k8scluster
.get("credentials")
3137 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3138 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3140 db_k8scluster_update
= {}
3141 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3142 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3143 db_k8scluster_update
[
3144 "_admin.helm-chart-v3.created"
3146 db_k8scluster_update
[
3147 "_admin.helm-chart-v3.operationalState"
3150 "k8sclusters", cluster_id
, db_k8scluster_update
3152 except Exception as e
:
3155 + "error initializing helm-v3 cluster: {}".format(str(e
))
3158 "K8s cluster '{}' has not been initialized for '{}'".format(
3159 cluster_id
, cluster_type
3164 "K8s cluster '{}' has not been initialized for '{}'".format(
3165 cluster_id
, cluster_type
3168 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3171 logging_text
+= "Deploy kdus: "
3174 db_nsr_update
= {"_admin.deployed.K8s": []}
3175 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3178 updated_cluster_list
= []
3179 updated_v3_cluster_list
= []
3181 for vnfr_data
in db_vnfrs
.values():
3182 vca_id
= self
.get_vca_id(vnfr_data
, {})
3183 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3184 # Step 0: Prepare and set parameters
3185 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3186 vnfd_id
= vnfr_data
.get("vnfd-id")
3187 vnfd_with_id
= find_in_list(
3188 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3192 for kdud
in vnfd_with_id
["kdu"]
3193 if kdud
["name"] == kdur
["kdu-name"]
3195 namespace
= kdur
.get("k8s-namespace")
3196 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3197 if kdur
.get("helm-chart"):
3198 kdumodel
= kdur
["helm-chart"]
3199 # Default version: helm3, if helm-version is v2 assign v2
3200 k8sclustertype
= "helm-chart-v3"
3201 self
.logger
.debug("kdur: {}".format(kdur
))
3203 kdur
.get("helm-version")
3204 and kdur
.get("helm-version") == "v2"
3206 k8sclustertype
= "helm-chart"
3207 elif kdur
.get("juju-bundle"):
3208 kdumodel
= kdur
["juju-bundle"]
3209 k8sclustertype
= "juju-bundle"
3212 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3213 "juju-bundle. Maybe an old NBI version is running".format(
3214 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3217 # check if kdumodel is a file and exists
3219 vnfd_with_id
= find_in_list(
3220 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3222 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3223 if storage
and storage
.get(
3225 ): # may be not present if vnfd has not artifacts
3226 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3227 filename
= "{}/{}/{}s/{}".format(
3233 if self
.fs
.file_exists(
3234 filename
, mode
="file"
3235 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3236 kdumodel
= self
.fs
.path
+ filename
3237 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3239 except Exception: # it is not a file
3242 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3243 step
= "Synchronize repos for k8s cluster '{}'".format(
3246 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3250 k8sclustertype
== "helm-chart"
3251 and cluster_uuid
not in updated_cluster_list
3253 k8sclustertype
== "helm-chart-v3"
3254 and cluster_uuid
not in updated_v3_cluster_list
3256 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3257 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3258 cluster_uuid
=cluster_uuid
3261 if del_repo_list
or added_repo_dict
:
3262 if k8sclustertype
== "helm-chart":
3264 "_admin.helm_charts_added." + item
: None
3265 for item
in del_repo_list
3268 "_admin.helm_charts_added." + item
: name
3269 for item
, name
in added_repo_dict
.items()
3271 updated_cluster_list
.append(cluster_uuid
)
3272 elif k8sclustertype
== "helm-chart-v3":
3274 "_admin.helm_charts_v3_added." + item
: None
3275 for item
in del_repo_list
3278 "_admin.helm_charts_v3_added." + item
: name
3279 for item
, name
in added_repo_dict
.items()
3281 updated_v3_cluster_list
.append(cluster_uuid
)
3283 logging_text
+ "repos synchronized on k8s cluster "
3284 "'{}' to_delete: {}, to_add: {}".format(
3285 k8s_cluster_id
, del_repo_list
, added_repo_dict
3290 {"_id": k8s_cluster_id
},
3296 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3297 vnfr_data
["member-vnf-index-ref"],
3301 k8s_instance_info
= {
3302 "kdu-instance": None,
3303 "k8scluster-uuid": cluster_uuid
,
3304 "k8scluster-type": k8sclustertype
,
3305 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3306 "kdu-name": kdur
["kdu-name"],
3307 "kdu-model": kdumodel
,
3308 "namespace": namespace
,
3309 "kdu-deployment-name": kdu_deployment_name
,
3311 db_path
= "_admin.deployed.K8s.{}".format(index
)
3312 db_nsr_update
[db_path
] = k8s_instance_info
3313 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3314 vnfd_with_id
= find_in_list(
3315 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3317 task
= asyncio
.ensure_future(
3326 k8params
=desc_params
,
3331 self
.lcm_tasks
.register(
3335 "instantiate_KDU-{}".format(index
),
3338 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3344 except (LcmException
, asyncio
.CancelledError
):
3346 except Exception as e
:
3347 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3348 if isinstance(e
, (N2VCException
, DbException
)):
3349 self
.logger
.error(logging_text
+ msg
)
3351 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3352 raise LcmException(msg
)
3355 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3374 task_instantiation_info
,
3377 # launch instantiate_N2VC in a asyncio task and register task object
3378 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3379 # if not found, create one entry and update database
3380 # fill db_nsr._admin.deployed.VCA.<index>
3383 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3385 if "execution-environment-list" in descriptor_config
:
3386 ee_list
= descriptor_config
.get("execution-environment-list", [])
3387 elif "juju" in descriptor_config
:
3388 ee_list
= [descriptor_config
] # ns charms
3389 else: # other types as script are not supported
3392 for ee_item
in ee_list
:
3395 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3396 ee_item
.get("juju"), ee_item
.get("helm-chart")
3399 ee_descriptor_id
= ee_item
.get("id")
3400 if ee_item
.get("juju"):
3401 vca_name
= ee_item
["juju"].get("charm")
3404 if ee_item
["juju"].get("charm") is not None
3407 if ee_item
["juju"].get("cloud") == "k8s":
3408 vca_type
= "k8s_proxy_charm"
3409 elif ee_item
["juju"].get("proxy") is False:
3410 vca_type
= "native_charm"
3411 elif ee_item
.get("helm-chart"):
3412 vca_name
= ee_item
["helm-chart"]
3413 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3416 vca_type
= "helm-v3"
3419 logging_text
+ "skipping non juju neither charm configuration"
3424 for vca_index
, vca_deployed
in enumerate(
3425 db_nsr
["_admin"]["deployed"]["VCA"]
3427 if not vca_deployed
:
3430 vca_deployed
.get("member-vnf-index") == member_vnf_index
3431 and vca_deployed
.get("vdu_id") == vdu_id
3432 and vca_deployed
.get("kdu_name") == kdu_name
3433 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3434 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3438 # not found, create one.
3440 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3443 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3445 target
+= "/kdu/{}".format(kdu_name
)
3447 "target_element": target
,
3448 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3449 "member-vnf-index": member_vnf_index
,
3451 "kdu_name": kdu_name
,
3452 "vdu_count_index": vdu_index
,
3453 "operational-status": "init", # TODO revise
3454 "detailed-status": "", # TODO revise
3455 "step": "initial-deploy", # TODO revise
3457 "vdu_name": vdu_name
,
3459 "ee_descriptor_id": ee_descriptor_id
,
3463 # create VCA and configurationStatus in db
3465 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3466 "configurationStatus.{}".format(vca_index
): dict(),
3468 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3470 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3472 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3473 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3474 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3477 task_n2vc
= asyncio
.ensure_future(
3478 self
.instantiate_N2VC(
3479 logging_text
=logging_text
,
3480 vca_index
=vca_index
,
3486 vdu_index
=vdu_index
,
3487 deploy_params
=deploy_params
,
3488 config_descriptor
=descriptor_config
,
3489 base_folder
=base_folder
,
3490 nslcmop_id
=nslcmop_id
,
3494 ee_config_descriptor
=ee_item
,
3497 self
.lcm_tasks
.register(
3501 "instantiate_N2VC-{}".format(vca_index
),
3504 task_instantiation_info
[
3506 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3507 member_vnf_index
or "", vdu_id
or ""
3511 def _create_nslcmop(nsr_id
, operation
, params
):
3513 Creates a ns-lcm-opp content to be stored at database.
3514 :param nsr_id: internal id of the instance
3515 :param operation: instantiate, terminate, scale, action, ...
3516 :param params: user parameters for the operation
3517 :return: dictionary following SOL005 format
3519 # Raise exception if invalid arguments
3520 if not (nsr_id
and operation
and params
):
3522 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3529 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3530 "operationState": "PROCESSING",
3531 "statusEnteredTime": now
,
3532 "nsInstanceId": nsr_id
,
3533 "lcmOperationType": operation
,
3535 "isAutomaticInvocation": False,
3536 "operationParams": params
,
3537 "isCancelPending": False,
3539 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3540 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3545 def _format_additional_params(self
, params
):
3546 params
= params
or {}
3547 for key
, value
in params
.items():
3548 if str(value
).startswith("!!yaml "):
3549 params
[key
] = yaml
.safe_load(value
[7:])
3552 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3553 primitive
= seq
.get("name")
3554 primitive_params
= {}
3556 "member_vnf_index": vnf_index
,
3557 "primitive": primitive
,
3558 "primitive_params": primitive_params
,
3561 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
3565 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
3566 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
3567 if op
.get("operationState") == "COMPLETED":
3568 # b. Skip sub-operation
3569 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3570 return self
.SUBOPERATION_STATUS_SKIP
3572 # c. retry executing sub-operation
3573 # The sub-operation exists, and operationState != 'COMPLETED'
3574 # Update operationState = 'PROCESSING' to indicate a retry.
3575 operationState
= "PROCESSING"
3576 detailed_status
= "In progress"
3577 self
._update
_suboperation
_status
(
3578 db_nslcmop
, op_index
, operationState
, detailed_status
3580 # Return the sub-operation index
3581 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3582 # with arguments extracted from the sub-operation
3585 # Find a sub-operation where all keys in a matching dictionary must match
3586 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3587 def _find_suboperation(self
, db_nslcmop
, match
):
3588 if db_nslcmop
and match
:
3589 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3590 for i
, op
in enumerate(op_list
):
3591 if all(op
.get(k
) == match
[k
] for k
in match
):
3593 return self
.SUBOPERATION_STATUS_NOT_FOUND
3595 # Update status for a sub-operation given its index
3596 def _update_suboperation_status(
3597 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3599 # Update DB for HA tasks
3600 q_filter
= {"_id": db_nslcmop
["_id"]}
3602 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3603 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3606 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3609 # Add sub-operation, return the index of the added sub-operation
3610 # Optionally, set operationState, detailed-status, and operationType
3611 # Status and type are currently set for 'scale' sub-operations:
3612 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3613 # 'detailed-status' : status message
3614 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3615 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3616 def _add_suboperation(
3624 mapped_primitive_params
,
3625 operationState
=None,
3626 detailed_status
=None,
3629 RO_scaling_info
=None,
3632 return self
.SUBOPERATION_STATUS_NOT_FOUND
3633 # Get the "_admin.operations" list, if it exists
3634 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3635 op_list
= db_nslcmop_admin
.get("operations")
3636 # Create or append to the "_admin.operations" list
3638 "member_vnf_index": vnf_index
,
3640 "vdu_count_index": vdu_count_index
,
3641 "primitive": primitive
,
3642 "primitive_params": mapped_primitive_params
,
3645 new_op
["operationState"] = operationState
3647 new_op
["detailed-status"] = detailed_status
3649 new_op
["lcmOperationType"] = operationType
3651 new_op
["RO_nsr_id"] = RO_nsr_id
3653 new_op
["RO_scaling_info"] = RO_scaling_info
3655 # No existing operations, create key 'operations' with current operation as first list element
3656 db_nslcmop_admin
.update({"operations": [new_op
]})
3657 op_list
= db_nslcmop_admin
.get("operations")
3659 # Existing operations, append operation to list
3660 op_list
.append(new_op
)
3662 db_nslcmop_update
= {"_admin.operations": op_list
}
3663 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
3664 op_index
= len(op_list
) - 1
3667 # Helper methods for scale() sub-operations
3669 # pre-scale/post-scale:
3670 # Check for 3 different cases:
3671 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3672 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3673 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3674 def _check_or_add_scale_suboperation(
3678 vnf_config_primitive
,
3682 RO_scaling_info
=None,
3684 # Find this sub-operation
3685 if RO_nsr_id
and RO_scaling_info
:
3686 operationType
= "SCALE-RO"
3688 "member_vnf_index": vnf_index
,
3689 "RO_nsr_id": RO_nsr_id
,
3690 "RO_scaling_info": RO_scaling_info
,
3694 "member_vnf_index": vnf_index
,
3695 "primitive": vnf_config_primitive
,
3696 "primitive_params": primitive_params
,
3697 "lcmOperationType": operationType
,
3699 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
3700 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
3701 # a. New sub-operation
3702 # The sub-operation does not exist, add it.
3703 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3704 # The following parameters are set to None for all kind of scaling:
3706 vdu_count_index
= None
3708 if RO_nsr_id
and RO_scaling_info
:
3709 vnf_config_primitive
= None
3710 primitive_params
= None
3713 RO_scaling_info
= None
3714 # Initial status for sub-operation
3715 operationState
= "PROCESSING"
3716 detailed_status
= "In progress"
3717 # Add sub-operation for pre/post-scaling (zero or more operations)
3718 self
._add
_suboperation
(
3724 vnf_config_primitive
,
3732 return self
.SUBOPERATION_STATUS_NEW
3734 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3735 # or op_index (operationState != 'COMPLETED')
3736 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
3738 # Function to return execution_environment id
3740 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
3741 # TODO vdu_index_count
3742 for vca
in vca_deployed_list
:
3743 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
3746 async def destroy_N2VC(
3754 exec_primitives
=True,
3759 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
3760 :param logging_text:
3762 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
3763 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3764 :param vca_index: index in the database _admin.deployed.VCA
3765 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
3766 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
3767 not executed properly
3768 :param scaling_in: True destroys the application, False destroys the model
3769 :return: None or exception
3774 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3775 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
3779 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
3781 # execute terminate_primitives
3783 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
3784 config_descriptor
.get("terminate-config-primitive"),
3785 vca_deployed
.get("ee_descriptor_id"),
3787 vdu_id
= vca_deployed
.get("vdu_id")
3788 vdu_count_index
= vca_deployed
.get("vdu_count_index")
3789 vdu_name
= vca_deployed
.get("vdu_name")
3790 vnf_index
= vca_deployed
.get("member-vnf-index")
3791 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
3792 for seq
in terminate_primitives
:
3793 # For each sequence in list, get primitive and call _ns_execute_primitive()
3794 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
3795 vnf_index
, seq
.get("name")
3797 self
.logger
.debug(logging_text
+ step
)
3798 # Create the primitive for each sequence, i.e. "primitive": "touch"
3799 primitive
= seq
.get("name")
3800 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
3805 self
._add
_suboperation
(
3812 mapped_primitive_params
,
3814 # Sub-operations: Call _ns_execute_primitive() instead of action()
3816 result
, result_detail
= await self
._ns
_execute
_primitive
(
3817 vca_deployed
["ee_id"],
3819 mapped_primitive_params
,
3823 except LcmException
:
3824 # this happens when VCA is not deployed. In this case it is not needed to terminate
3826 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
3827 if result
not in result_ok
:
3829 "terminate_primitive {} for vnf_member_index={} fails with "
3830 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
3832 # set that this VCA do not need terminated
3833 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
3837 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
3840 if vca_deployed
.get("prometheus_jobs") and self
.prometheus
:
3841 await self
.prometheus
.update(remove_jobs
=vca_deployed
["prometheus_jobs"])
3844 await self
.vca_map
[vca_type
].delete_execution_environment(
3845 vca_deployed
["ee_id"],
3846 scaling_in
=scaling_in
,
3851 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
3852 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
3853 namespace
= "." + db_nsr
["_id"]
3855 await self
.n2vc
.delete_namespace(
3856 namespace
=namespace
,
3857 total_timeout
=self
.timeout_charm_delete
,
3860 except N2VCNotFound
: # already deleted. Skip
3862 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
3864 async def _terminate_RO(
3865 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
3868 Terminates a deployment from RO
3869 :param logging_text:
3870 :param nsr_deployed: db_nsr._admin.deployed
3873 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
3874 this method will update only the index 2, but it will write on database the concatenated content of the list
3879 ro_nsr_id
= ro_delete_action
= None
3880 if nsr_deployed
and nsr_deployed
.get("RO"):
3881 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
3882 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
3885 stage
[2] = "Deleting ns from VIM."
3886 db_nsr_update
["detailed-status"] = " ".join(stage
)
3887 self
._write
_op
_status
(nslcmop_id
, stage
)
3888 self
.logger
.debug(logging_text
+ stage
[2])
3889 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3890 self
._write
_op
_status
(nslcmop_id
, stage
)
3891 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
3892 ro_delete_action
= desc
["action_id"]
3894 "_admin.deployed.RO.nsr_delete_action_id"
3895 ] = ro_delete_action
3896 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
3897 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
3898 if ro_delete_action
:
3899 # wait until NS is deleted from VIM
3900 stage
[2] = "Waiting ns deleted from VIM."
3901 detailed_status_old
= None
3905 + " RO_id={} ro_delete_action={}".format(
3906 ro_nsr_id
, ro_delete_action
3909 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3910 self
._write
_op
_status
(nslcmop_id
, stage
)
3912 delete_timeout
= 20 * 60 # 20 minutes
3913 while delete_timeout
> 0:
3914 desc
= await self
.RO
.show(
3916 item_id_name
=ro_nsr_id
,
3917 extra_item
="action",
3918 extra_item_id
=ro_delete_action
,
3922 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
3924 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
3925 if ns_status
== "ERROR":
3926 raise ROclient
.ROClientException(ns_status_info
)
3927 elif ns_status
== "BUILD":
3928 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
3929 elif ns_status
== "ACTIVE":
3930 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
3931 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
3936 ), "ROclient.check_action_status returns unknown {}".format(
3939 if stage
[2] != detailed_status_old
:
3940 detailed_status_old
= stage
[2]
3941 db_nsr_update
["detailed-status"] = " ".join(stage
)
3942 self
._write
_op
_status
(nslcmop_id
, stage
)
3943 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3944 await asyncio
.sleep(5, loop
=self
.loop
)
3946 else: # delete_timeout <= 0:
3947 raise ROclient
.ROClientException(
3948 "Timeout waiting ns deleted from VIM"
3951 except Exception as e
:
3952 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3954 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
3956 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
3957 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
3958 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
3960 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
3963 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
3965 failed_detail
.append("delete conflict: {}".format(e
))
3968 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
3971 failed_detail
.append("delete error: {}".format(e
))
3973 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
3977 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
3978 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
3980 stage
[2] = "Deleting nsd from RO."
3981 db_nsr_update
["detailed-status"] = " ".join(stage
)
3982 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3983 self
._write
_op
_status
(nslcmop_id
, stage
)
3984 await self
.RO
.delete("nsd", ro_nsd_id
)
3986 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
3988 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
3989 except Exception as e
:
3991 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
3993 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
3995 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
3998 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4000 failed_detail
.append(
4001 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4003 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4005 failed_detail
.append(
4006 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4008 self
.logger
.error(logging_text
+ failed_detail
[-1])
4010 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4011 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4012 if not vnf_deployed
or not vnf_deployed
["id"]:
4015 ro_vnfd_id
= vnf_deployed
["id"]
4018 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4019 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4021 db_nsr_update
["detailed-status"] = " ".join(stage
)
4022 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4023 self
._write
_op
_status
(nslcmop_id
, stage
)
4024 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4026 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4028 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4029 except Exception as e
:
4031 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4034 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4038 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4041 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4043 failed_detail
.append(
4044 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4046 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4048 failed_detail
.append(
4049 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4051 self
.logger
.error(logging_text
+ failed_detail
[-1])
4054 stage
[2] = "Error deleting from VIM"
4056 stage
[2] = "Deleted from VIM"
4057 db_nsr_update
["detailed-status"] = " ".join(stage
)
4058 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4059 self
._write
_op
_status
(nslcmop_id
, stage
)
4062 raise LcmException("; ".join(failed_detail
))
4064 async def terminate(self
, nsr_id
, nslcmop_id
):
4065 # Try to lock HA task here
4066 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4067 if not task_is_locked_by_me
:
4070 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4071 self
.logger
.debug(logging_text
+ "Enter")
4072 timeout_ns_terminate
= self
.timeout_ns_terminate
4075 operation_params
= None
4077 error_list
= [] # annotates all failed error messages
4078 db_nslcmop_update
= {}
4079 autoremove
= False # autoremove after terminated
4080 tasks_dict_info
= {}
4083 "Stage 1/3: Preparing task.",
4084 "Waiting for previous operations to terminate.",
4087 # ^ contains [stage, step, VIM-status]
4089 # wait for any previous tasks in process
4090 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4092 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4093 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4094 operation_params
= db_nslcmop
.get("operationParams") or {}
4095 if operation_params
.get("timeout_ns_terminate"):
4096 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4097 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4098 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4100 db_nsr_update
["operational-status"] = "terminating"
4101 db_nsr_update
["config-status"] = "terminating"
4102 self
._write
_ns
_status
(
4104 ns_state
="TERMINATING",
4105 current_operation
="TERMINATING",
4106 current_operation_id
=nslcmop_id
,
4107 other_update
=db_nsr_update
,
4109 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4110 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4111 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4114 stage
[1] = "Getting vnf descriptors from db."
4115 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4117 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4119 db_vnfds_from_id
= {}
4120 db_vnfds_from_member_index
= {}
4122 for vnfr
in db_vnfrs_list
:
4123 vnfd_id
= vnfr
["vnfd-id"]
4124 if vnfd_id
not in db_vnfds_from_id
:
4125 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4126 db_vnfds_from_id
[vnfd_id
] = vnfd
4127 db_vnfds_from_member_index
[
4128 vnfr
["member-vnf-index-ref"]
4129 ] = db_vnfds_from_id
[vnfd_id
]
4131 # Destroy individual execution environments when there are terminating primitives.
4132 # Rest of EE will be deleted at once
4133 # TODO - check before calling _destroy_N2VC
4134 # if not operation_params.get("skip_terminate_primitives"):#
4135 # or not vca.get("needed_terminate"):
4136 stage
[0] = "Stage 2/3 execute terminating primitives."
4137 self
.logger
.debug(logging_text
+ stage
[0])
4138 stage
[1] = "Looking execution environment that needs terminate."
4139 self
.logger
.debug(logging_text
+ stage
[1])
4141 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4142 config_descriptor
= None
4143 vca_member_vnf_index
= vca
.get("member-vnf-index")
4144 vca_id
= self
.get_vca_id(
4145 db_vnfrs_dict
.get(vca_member_vnf_index
)
4146 if vca_member_vnf_index
4150 if not vca
or not vca
.get("ee_id"):
4152 if not vca
.get("member-vnf-index"):
4154 config_descriptor
= db_nsr
.get("ns-configuration")
4155 elif vca
.get("vdu_id"):
4156 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4157 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4158 elif vca
.get("kdu_name"):
4159 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4160 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4162 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4163 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4164 vca_type
= vca
.get("type")
4165 exec_terminate_primitives
= not operation_params
.get(
4166 "skip_terminate_primitives"
4167 ) and vca
.get("needed_terminate")
4168 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4169 # pending native charms
4171 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4173 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4174 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4175 task
= asyncio
.ensure_future(
4183 exec_terminate_primitives
,
4187 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4189 # wait for pending tasks of terminate primitives
4193 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4195 error_list
= await self
._wait
_for
_tasks
(
4198 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4202 tasks_dict_info
.clear()
4204 return # raise LcmException("; ".join(error_list))
4206 # remove All execution environments at once
4207 stage
[0] = "Stage 3/3 delete all."
4209 if nsr_deployed
.get("VCA"):
4210 stage
[1] = "Deleting all execution environments."
4211 self
.logger
.debug(logging_text
+ stage
[1])
4212 vca_id
= self
.get_vca_id({}, db_nsr
)
4213 task_delete_ee
= asyncio
.ensure_future(
4215 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4216 timeout
=self
.timeout_charm_delete
,
4219 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4220 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4222 # Delete from k8scluster
4223 stage
[1] = "Deleting KDUs."
4224 self
.logger
.debug(logging_text
+ stage
[1])
4225 # print(nsr_deployed)
4226 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4227 if not kdu
or not kdu
.get("kdu-instance"):
4229 kdu_instance
= kdu
.get("kdu-instance")
4230 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4231 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4232 vca_id
= self
.get_vca_id({}, db_nsr
)
4233 task_delete_kdu_instance
= asyncio
.ensure_future(
4234 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4235 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4236 kdu_instance
=kdu_instance
,
4243 + "Unknown k8s deployment type {}".format(
4244 kdu
.get("k8scluster-type")
4249 task_delete_kdu_instance
4250 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4253 stage
[1] = "Deleting ns from VIM."
4255 task_delete_ro
= asyncio
.ensure_future(
4256 self
._terminate
_ng
_ro
(
4257 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4261 task_delete_ro
= asyncio
.ensure_future(
4263 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4266 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4268 # rest of staff will be done at finally
4271 ROclient
.ROClientException
,
4276 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4278 except asyncio
.CancelledError
:
4280 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4282 exc
= "Operation was cancelled"
4283 except Exception as e
:
4284 exc
= traceback
.format_exc()
4285 self
.logger
.critical(
4286 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4291 error_list
.append(str(exc
))
4293 # wait for pending tasks
4295 stage
[1] = "Waiting for terminate pending tasks."
4296 self
.logger
.debug(logging_text
+ stage
[1])
4297 error_list
+= await self
._wait
_for
_tasks
(
4300 timeout_ns_terminate
,
4304 stage
[1] = stage
[2] = ""
4305 except asyncio
.CancelledError
:
4306 error_list
.append("Cancelled")
4307 # TODO cancell all tasks
4308 except Exception as exc
:
4309 error_list
.append(str(exc
))
4310 # update status at database
4312 error_detail
= "; ".join(error_list
)
4313 # self.logger.error(logging_text + error_detail)
4314 error_description_nslcmop
= "{} Detail: {}".format(
4315 stage
[0], error_detail
4317 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4318 nslcmop_id
, stage
[0]
4321 db_nsr_update
["operational-status"] = "failed"
4322 db_nsr_update
["detailed-status"] = (
4323 error_description_nsr
+ " Detail: " + error_detail
4325 db_nslcmop_update
["detailed-status"] = error_detail
4326 nslcmop_operation_state
= "FAILED"
4330 error_description_nsr
= error_description_nslcmop
= None
4331 ns_state
= "NOT_INSTANTIATED"
4332 db_nsr_update
["operational-status"] = "terminated"
4333 db_nsr_update
["detailed-status"] = "Done"
4334 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4335 db_nslcmop_update
["detailed-status"] = "Done"
4336 nslcmop_operation_state
= "COMPLETED"
4339 self
._write
_ns
_status
(
4342 current_operation
="IDLE",
4343 current_operation_id
=None,
4344 error_description
=error_description_nsr
,
4345 error_detail
=error_detail
,
4346 other_update
=db_nsr_update
,
4348 self
._write
_op
_status
(
4351 error_message
=error_description_nslcmop
,
4352 operation_state
=nslcmop_operation_state
,
4353 other_update
=db_nslcmop_update
,
4355 if ns_state
== "NOT_INSTANTIATED":
4359 {"nsr-id-ref": nsr_id
},
4360 {"_admin.nsState": "NOT_INSTANTIATED"},
4362 except DbException
as e
:
4365 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4369 if operation_params
:
4370 autoremove
= operation_params
.get("autoremove", False)
4371 if nslcmop_operation_state
:
4373 await self
.msg
.aiowrite(
4378 "nslcmop_id": nslcmop_id
,
4379 "operationState": nslcmop_operation_state
,
4380 "autoremove": autoremove
,
4384 except Exception as e
:
4386 logging_text
+ "kafka_write notification Exception {}".format(e
)
4389 self
.logger
.debug(logging_text
+ "Exit")
4390 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4392 async def _wait_for_tasks(
4393 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4396 error_detail_list
= []
4398 pending_tasks
= list(created_tasks_info
.keys())
4399 num_tasks
= len(pending_tasks
)
4401 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4402 self
._write
_op
_status
(nslcmop_id
, stage
)
4403 while pending_tasks
:
4405 _timeout
= timeout
+ time_start
- time()
4406 done
, pending_tasks
= await asyncio
.wait(
4407 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4409 num_done
+= len(done
)
4410 if not done
: # Timeout
4411 for task
in pending_tasks
:
4412 new_error
= created_tasks_info
[task
] + ": Timeout"
4413 error_detail_list
.append(new_error
)
4414 error_list
.append(new_error
)
4417 if task
.cancelled():
4420 exc
= task
.exception()
4422 if isinstance(exc
, asyncio
.TimeoutError
):
4424 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4425 error_list
.append(created_tasks_info
[task
])
4426 error_detail_list
.append(new_error
)
4433 ROclient
.ROClientException
,
4439 self
.logger
.error(logging_text
+ new_error
)
4441 exc_traceback
= "".join(
4442 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4446 + created_tasks_info
[task
]
4452 logging_text
+ created_tasks_info
[task
] + ": Done"
4454 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4456 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4457 if nsr_id
: # update also nsr
4462 "errorDescription": "Error at: " + ", ".join(error_list
),
4463 "errorDetail": ". ".join(error_detail_list
),
4466 self
._write
_op
_status
(nslcmop_id
, stage
)
4467 return error_detail_list
4470 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4472 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4473 The default-value is used. If it is between < > it look for a value at instantiation_params
4474 :param primitive_desc: portion of VNFD/NSD that describes primitive
4475 :param params: Params provided by user
4476 :param instantiation_params: Instantiation params provided by user
4477 :return: a dictionary with the calculated params
4479 calculated_params
= {}
4480 for parameter
in primitive_desc
.get("parameter", ()):
4481 param_name
= parameter
["name"]
4482 if param_name
in params
:
4483 calculated_params
[param_name
] = params
[param_name
]
4484 elif "default-value" in parameter
or "value" in parameter
:
4485 if "value" in parameter
:
4486 calculated_params
[param_name
] = parameter
["value"]
4488 calculated_params
[param_name
] = parameter
["default-value"]
4490 isinstance(calculated_params
[param_name
], str)
4491 and calculated_params
[param_name
].startswith("<")
4492 and calculated_params
[param_name
].endswith(">")
4494 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4495 calculated_params
[param_name
] = instantiation_params
[
4496 calculated_params
[param_name
][1:-1]
4500 "Parameter {} needed to execute primitive {} not provided".format(
4501 calculated_params
[param_name
], primitive_desc
["name"]
4506 "Parameter {} needed to execute primitive {} not provided".format(
4507 param_name
, primitive_desc
["name"]
4511 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4512 calculated_params
[param_name
] = yaml
.safe_dump(
4513 calculated_params
[param_name
], default_flow_style
=True, width
=256
4515 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4517 ].startswith("!!yaml "):
4518 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4519 if parameter
.get("data-type") == "INTEGER":
4521 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4522 except ValueError: # error converting string to int
4524 "Parameter {} of primitive {} must be integer".format(
4525 param_name
, primitive_desc
["name"]
4528 elif parameter
.get("data-type") == "BOOLEAN":
4529 calculated_params
[param_name
] = not (
4530 (str(calculated_params
[param_name
])).lower() == "false"
4533 # add always ns_config_info if primitive name is config
4534 if primitive_desc
["name"] == "config":
4535 if "ns_config_info" in instantiation_params
:
4536 calculated_params
["ns_config_info"] = instantiation_params
[
4539 return calculated_params
4541 def _look_for_deployed_vca(
4548 ee_descriptor_id
=None,
4550 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4551 for vca
in deployed_vca
:
4554 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4557 vdu_count_index
is not None
4558 and vdu_count_index
!= vca
["vdu_count_index"]
4561 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4563 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4567 # vca_deployed not found
4569 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4570 " is not deployed".format(
4579 ee_id
= vca
.get("ee_id")
4581 "type", "lxc_proxy_charm"
4582 ) # default value for backward compatibility - proxy charm
4585 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4586 "execution environment".format(
4587 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4590 return ee_id
, vca_type
4592 async def _ns_execute_primitive(
4598 retries_interval
=30,
4605 if primitive
== "config":
4606 primitive_params
= {"params": primitive_params
}
4608 vca_type
= vca_type
or "lxc_proxy_charm"
4612 output
= await asyncio
.wait_for(
4613 self
.vca_map
[vca_type
].exec_primitive(
4615 primitive_name
=primitive
,
4616 params_dict
=primitive_params
,
4617 progress_timeout
=self
.timeout_progress_primitive
,
4618 total_timeout
=self
.timeout_primitive
,
4623 timeout
=timeout
or self
.timeout_primitive
,
4627 except asyncio
.CancelledError
:
4629 except Exception as e
: # asyncio.TimeoutError
4630 if isinstance(e
, asyncio
.TimeoutError
):
4635 "Error executing action {} on {} -> {}".format(
4640 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
4642 return "FAILED", str(e
)
4644 return "COMPLETED", output
4646 except (LcmException
, asyncio
.CancelledError
):
4648 except Exception as e
:
4649 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
4651 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
4653 Updating the vca_status with latest juju information in nsrs record
4654 :param: nsr_id: Id of the nsr
4655 :param: nslcmop_id: Id of the nslcmop
4659 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
4660 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4661 vca_id
= self
.get_vca_id({}, db_nsr
)
4662 if db_nsr
["_admin"]["deployed"]["K8s"]:
4663 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
4664 cluster_uuid
, kdu_instance
, cluster_type
= (
4665 k8s
["k8scluster-uuid"],
4666 k8s
["kdu-instance"],
4667 k8s
["k8scluster-type"],
4669 await self
._on
_update
_k
8s
_db
(
4670 cluster_uuid
=cluster_uuid
,
4671 kdu_instance
=kdu_instance
,
4672 filter={"_id": nsr_id
},
4674 cluster_type
=cluster_type
,
4677 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
4678 table
, filter = "nsrs", {"_id": nsr_id
}
4679 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
4680 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
4682 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
4683 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
4685 async def action(self
, nsr_id
, nslcmop_id
):
4686 # Try to lock HA task here
4687 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4688 if not task_is_locked_by_me
:
4691 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
4692 self
.logger
.debug(logging_text
+ "Enter")
4693 # get all needed from database
4697 db_nslcmop_update
= {}
4698 nslcmop_operation_state
= None
4699 error_description_nslcmop
= None
4702 # wait for any previous tasks in process
4703 step
= "Waiting for previous operations to terminate"
4704 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4706 self
._write
_ns
_status
(
4709 current_operation
="RUNNING ACTION",
4710 current_operation_id
=nslcmop_id
,
4713 step
= "Getting information from database"
4714 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4715 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4716 if db_nslcmop
["operationParams"].get("primitive_params"):
4717 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
4718 db_nslcmop
["operationParams"]["primitive_params"]
4721 nsr_deployed
= db_nsr
["_admin"].get("deployed")
4722 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
4723 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
4724 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
4725 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
4726 primitive
= db_nslcmop
["operationParams"]["primitive"]
4727 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
4728 timeout_ns_action
= db_nslcmop
["operationParams"].get(
4729 "timeout_ns_action", self
.timeout_primitive
4733 step
= "Getting vnfr from database"
4734 db_vnfr
= self
.db
.get_one(
4735 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
4737 if db_vnfr
.get("kdur"):
4739 for kdur
in db_vnfr
["kdur"]:
4740 if kdur
.get("additionalParams"):
4741 kdur
["additionalParams"] = json
.loads(
4742 kdur
["additionalParams"]
4744 kdur_list
.append(kdur
)
4745 db_vnfr
["kdur"] = kdur_list
4746 step
= "Getting vnfd from database"
4747 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
4749 step
= "Getting nsd from database"
4750 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
4752 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
4753 # for backward compatibility
4754 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
4755 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
4756 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
4757 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4759 # look for primitive
4760 config_primitive_desc
= descriptor_configuration
= None
4762 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
4764 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
4766 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
4768 descriptor_configuration
= db_nsd
.get("ns-configuration")
4770 if descriptor_configuration
and descriptor_configuration
.get(
4773 for config_primitive
in descriptor_configuration
["config-primitive"]:
4774 if config_primitive
["name"] == primitive
:
4775 config_primitive_desc
= config_primitive
4778 if not config_primitive_desc
:
4779 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
4781 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4785 primitive_name
= primitive
4786 ee_descriptor_id
= None
4788 primitive_name
= config_primitive_desc
.get(
4789 "execution-environment-primitive", primitive
4791 ee_descriptor_id
= config_primitive_desc
.get(
4792 "execution-environment-ref"
4798 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
4800 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
4803 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
4805 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
4807 desc_params
= parse_yaml_strings(
4808 db_vnfr
.get("additionalParamsForVnf")
4811 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
4812 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
4813 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
4815 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
4816 actions
.add(primitive
["name"])
4817 for primitive
in kdu_configuration
.get("config-primitive", []):
4818 actions
.add(primitive
["name"])
4819 kdu_action
= True if primitive_name
in actions
else False
4821 # TODO check if ns is in a proper status
4823 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
4825 # kdur and desc_params already set from before
4826 if primitive_params
:
4827 desc_params
.update(primitive_params
)
4828 # TODO Check if we will need something at vnf level
4829 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
4831 kdu_name
== kdu
["kdu-name"]
4832 and kdu
["member-vnf-index"] == vnf_index
4837 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
4840 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
4841 msg
= "unknown k8scluster-type '{}'".format(
4842 kdu
.get("k8scluster-type")
4844 raise LcmException(msg
)
4847 "collection": "nsrs",
4848 "filter": {"_id": nsr_id
},
4849 "path": "_admin.deployed.K8s.{}".format(index
),
4853 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
4855 step
= "Executing kdu {}".format(primitive_name
)
4856 if primitive_name
== "upgrade":
4857 if desc_params
.get("kdu_model"):
4858 kdu_model
= desc_params
.get("kdu_model")
4859 del desc_params
["kdu_model"]
4861 kdu_model
= kdu
.get("kdu-model")
4862 parts
= kdu_model
.split(sep
=":")
4864 kdu_model
= parts
[0]
4866 detailed_status
= await asyncio
.wait_for(
4867 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
4868 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4869 kdu_instance
=kdu
.get("kdu-instance"),
4871 kdu_model
=kdu_model
,
4874 timeout
=timeout_ns_action
,
4876 timeout
=timeout_ns_action
+ 10,
4879 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
4881 elif primitive_name
== "rollback":
4882 detailed_status
= await asyncio
.wait_for(
4883 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
4884 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4885 kdu_instance
=kdu
.get("kdu-instance"),
4888 timeout
=timeout_ns_action
,
4890 elif primitive_name
== "status":
4891 detailed_status
= await asyncio
.wait_for(
4892 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
4893 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4894 kdu_instance
=kdu
.get("kdu-instance"),
4897 timeout
=timeout_ns_action
,
4900 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
4901 kdu
["kdu-name"], nsr_id
4903 params
= self
._map
_primitive
_params
(
4904 config_primitive_desc
, primitive_params
, desc_params
4907 detailed_status
= await asyncio
.wait_for(
4908 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
4909 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4910 kdu_instance
=kdu_instance
,
4911 primitive_name
=primitive_name
,
4914 timeout
=timeout_ns_action
,
4917 timeout
=timeout_ns_action
,
4921 nslcmop_operation_state
= "COMPLETED"
4923 detailed_status
= ""
4924 nslcmop_operation_state
= "FAILED"
4926 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
4927 nsr_deployed
["VCA"],
4928 member_vnf_index
=vnf_index
,
4930 vdu_count_index
=vdu_count_index
,
4931 ee_descriptor_id
=ee_descriptor_id
,
4933 for vca_index
, vca_deployed
in enumerate(
4934 db_nsr
["_admin"]["deployed"]["VCA"]
4936 if vca_deployed
.get("member-vnf-index") == vnf_index
:
4938 "collection": "nsrs",
4939 "filter": {"_id": nsr_id
},
4940 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
4944 nslcmop_operation_state
,
4946 ) = await self
._ns
_execute
_primitive
(
4948 primitive
=primitive_name
,
4949 primitive_params
=self
._map
_primitive
_params
(
4950 config_primitive_desc
, primitive_params
, desc_params
4952 timeout
=timeout_ns_action
,
4958 db_nslcmop_update
["detailed-status"] = detailed_status
4959 error_description_nslcmop
= (
4960 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
4964 + " task Done with result {} {}".format(
4965 nslcmop_operation_state
, detailed_status
4968 return # database update is called inside finally
4970 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
4971 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4973 except asyncio
.CancelledError
:
4975 logging_text
+ "Cancelled Exception while '{}'".format(step
)
4977 exc
= "Operation was cancelled"
4978 except asyncio
.TimeoutError
:
4979 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
4981 except Exception as e
:
4982 exc
= traceback
.format_exc()
4983 self
.logger
.critical(
4984 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
4993 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
4994 nslcmop_operation_state
= "FAILED"
4996 self
._write
_ns
_status
(
5000 ], # TODO check if degraded. For the moment use previous status
5001 current_operation
="IDLE",
5002 current_operation_id
=None,
5003 # error_description=error_description_nsr,
5004 # error_detail=error_detail,
5005 other_update
=db_nsr_update
,
5008 self
._write
_op
_status
(
5011 error_message
=error_description_nslcmop
,
5012 operation_state
=nslcmop_operation_state
,
5013 other_update
=db_nslcmop_update
,
5016 if nslcmop_operation_state
:
5018 await self
.msg
.aiowrite(
5023 "nslcmop_id": nslcmop_id
,
5024 "operationState": nslcmop_operation_state
,
5028 except Exception as e
:
5030 logging_text
+ "kafka_write notification Exception {}".format(e
)
5032 self
.logger
.debug(logging_text
+ "Exit")
5033 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5034 return nslcmop_operation_state
, detailed_status
5036 async def scale(self
, nsr_id
, nslcmop_id
):
5037 # Try to lock HA task here
5038 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5039 if not task_is_locked_by_me
:
5042 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
5043 stage
= ["", "", ""]
5044 tasks_dict_info
= {}
5045 # ^ stage, step, VIM progress
5046 self
.logger
.debug(logging_text
+ "Enter")
5047 # get all needed from database
5049 db_nslcmop_update
= {}
5052 # in case of error, indicates what part of scale was failed to put nsr at error status
5053 scale_process
= None
5054 old_operational_status
= ""
5055 old_config_status
= ""
5058 # wait for any previous tasks in process
5059 step
= "Waiting for previous operations to terminate"
5060 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5061 self
._write
_ns
_status
(
5064 current_operation
="SCALING",
5065 current_operation_id
=nslcmop_id
,
5068 step
= "Getting nslcmop from database"
5070 step
+ " after having waited for previous tasks to be completed"
5072 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5074 step
= "Getting nsr from database"
5075 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5076 old_operational_status
= db_nsr
["operational-status"]
5077 old_config_status
= db_nsr
["config-status"]
5079 step
= "Parsing scaling parameters"
5080 db_nsr_update
["operational-status"] = "scaling"
5081 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5082 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5084 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
5086 ]["member-vnf-index"]
5087 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
5089 ]["scaling-group-descriptor"]
5090 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
5091 # for backward compatibility
5092 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5093 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5094 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5095 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5097 step
= "Getting vnfr from database"
5098 db_vnfr
= self
.db
.get_one(
5099 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5102 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5104 step
= "Getting vnfd from database"
5105 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5107 base_folder
= db_vnfd
["_admin"]["storage"]
5109 step
= "Getting scaling-group-descriptor"
5110 scaling_descriptor
= find_in_list(
5111 get_scaling_aspect(db_vnfd
),
5112 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
5114 if not scaling_descriptor
:
5116 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5117 "at vnfd:scaling-group-descriptor".format(scaling_group
)
5120 step
= "Sending scale order to VIM"
5121 # TODO check if ns is in a proper status
5123 if not db_nsr
["_admin"].get("scaling-group"):
5128 "_admin.scaling-group": [
5129 {"name": scaling_group
, "nb-scale-op": 0}
5133 admin_scale_index
= 0
5135 for admin_scale_index
, admin_scale_info
in enumerate(
5136 db_nsr
["_admin"]["scaling-group"]
5138 if admin_scale_info
["name"] == scaling_group
:
5139 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
5141 else: # not found, set index one plus last element and add new entry with the name
5142 admin_scale_index
+= 1
5144 "_admin.scaling-group.{}.name".format(admin_scale_index
)
5147 vca_scaling_info
= []
5148 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
5149 if scaling_type
== "SCALE_OUT":
5150 if "aspect-delta-details" not in scaling_descriptor
:
5152 "Aspect delta details not fount in scaling descriptor {}".format(
5153 scaling_descriptor
["name"]
5156 # count if max-instance-count is reached
5157 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5159 scaling_info
["scaling_direction"] = "OUT"
5160 scaling_info
["vdu-create"] = {}
5161 scaling_info
["kdu-create"] = {}
5162 for delta
in deltas
:
5163 for vdu_delta
in delta
.get("vdu-delta", {}):
5164 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
5165 # vdu_index also provides the number of instance of the targeted vdu
5166 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5167 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5171 additional_params
= (
5172 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
5175 cloud_init_list
= []
5177 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5178 max_instance_count
= 10
5179 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
5180 max_instance_count
= vdu_profile
.get(
5181 "max-number-of-instances", 10
5184 default_instance_num
= get_number_of_instances(
5187 instances_number
= vdu_delta
.get("number-of-instances", 1)
5188 nb_scale_op
+= instances_number
5190 new_instance_count
= nb_scale_op
+ default_instance_num
5191 # Control if new count is over max and vdu count is less than max.
5192 # Then assign new instance count
5193 if new_instance_count
> max_instance_count
> vdu_count
:
5194 instances_number
= new_instance_count
- max_instance_count
5196 instances_number
= instances_number
5198 if new_instance_count
> max_instance_count
:
5200 "reached the limit of {} (max-instance-count) "
5201 "scaling-out operations for the "
5202 "scaling-group-descriptor '{}'".format(
5203 nb_scale_op
, scaling_group
5206 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5208 # TODO Information of its own ip is not available because db_vnfr is not updated.
5209 additional_params
["OSM"] = get_osm_params(
5210 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
5212 cloud_init_list
.append(
5213 self
._parse
_cloud
_init
(
5220 vca_scaling_info
.append(
5222 "osm_vdu_id": vdu_delta
["id"],
5223 "member-vnf-index": vnf_index
,
5225 "vdu_index": vdu_index
+ x
,
5228 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
5229 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5230 kdu_profile
= get_kdu_profile(db_vnfd
, kdu_delta
["id"])
5231 kdu_name
= kdu_profile
["kdu-name"]
5232 resource_name
= kdu_profile
["resource-name"]
5234 # Might have different kdus in the same delta
5235 # Should have list for each kdu
5236 if not scaling_info
["kdu-create"].get(kdu_name
, None):
5237 scaling_info
["kdu-create"][kdu_name
] = []
5239 kdur
= get_kdur(db_vnfr
, kdu_name
)
5240 if kdur
.get("helm-chart"):
5241 k8s_cluster_type
= "helm-chart-v3"
5242 self
.logger
.debug("kdur: {}".format(kdur
))
5244 kdur
.get("helm-version")
5245 and kdur
.get("helm-version") == "v2"
5247 k8s_cluster_type
= "helm-chart"
5248 raise NotImplementedError
5249 elif kdur
.get("juju-bundle"):
5250 k8s_cluster_type
= "juju-bundle"
5253 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5254 "juju-bundle. Maybe an old NBI version is running".format(
5255 db_vnfr
["member-vnf-index-ref"], kdu_name
5259 max_instance_count
= 10
5260 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
5261 max_instance_count
= kdu_profile
.get(
5262 "max-number-of-instances", 10
5265 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
5266 deployed_kdu
, _
= get_deployed_kdu(
5267 nsr_deployed
, kdu_name
, vnf_index
5269 if deployed_kdu
is None:
5271 "KDU '{}' for vnf '{}' not deployed".format(
5275 kdu_instance
= deployed_kdu
.get("kdu-instance")
5276 instance_num
= await self
.k8scluster_map
[
5278 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5279 kdu_replica_count
= instance_num
+ kdu_delta
.get(
5280 "number-of-instances", 1
5283 # Control if new count is over max and instance_num is less than max.
5284 # Then assign max instance number to kdu replica count
5285 if kdu_replica_count
> max_instance_count
> instance_num
:
5286 kdu_replica_count
= max_instance_count
5287 if kdu_replica_count
> max_instance_count
:
5289 "reached the limit of {} (max-instance-count) "
5290 "scaling-out operations for the "
5291 "scaling-group-descriptor '{}'".format(
5292 instance_num
, scaling_group
5296 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5297 vca_scaling_info
.append(
5299 "osm_kdu_id": kdu_name
,
5300 "member-vnf-index": vnf_index
,
5302 "kdu_index": instance_num
+ x
- 1,
5305 scaling_info
["kdu-create"][kdu_name
].append(
5307 "member-vnf-index": vnf_index
,
5309 "k8s-cluster-type": k8s_cluster_type
,
5310 "resource-name": resource_name
,
5311 "scale": kdu_replica_count
,
5314 elif scaling_type
== "SCALE_IN":
5315 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5317 scaling_info
["scaling_direction"] = "IN"
5318 scaling_info
["vdu-delete"] = {}
5319 scaling_info
["kdu-delete"] = {}
5321 for delta
in deltas
:
5322 for vdu_delta
in delta
.get("vdu-delta", {}):
5323 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5324 min_instance_count
= 0
5325 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5326 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
5327 min_instance_count
= vdu_profile
["min-number-of-instances"]
5329 default_instance_num
= get_number_of_instances(
5330 db_vnfd
, vdu_delta
["id"]
5332 instance_num
= vdu_delta
.get("number-of-instances", 1)
5333 nb_scale_op
-= instance_num
5335 new_instance_count
= nb_scale_op
+ default_instance_num
5337 if new_instance_count
< min_instance_count
< vdu_count
:
5338 instances_number
= min_instance_count
- new_instance_count
5340 instances_number
= instance_num
5342 if new_instance_count
< min_instance_count
:
5344 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5345 "scaling-group-descriptor '{}'".format(
5346 nb_scale_op
, scaling_group
5349 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5350 vca_scaling_info
.append(
5352 "osm_vdu_id": vdu_delta
["id"],
5353 "member-vnf-index": vnf_index
,
5355 "vdu_index": vdu_index
- 1 - x
,
5358 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
5359 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5360 kdu_profile
= get_kdu_profile(db_vnfd
, kdu_delta
["id"])
5361 kdu_name
= kdu_profile
["kdu-name"]
5362 resource_name
= kdu_profile
["resource-name"]
5364 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
5365 scaling_info
["kdu-delete"][kdu_name
] = []
5367 kdur
= get_kdur(db_vnfr
, kdu_name
)
5368 if kdur
.get("helm-chart"):
5369 k8s_cluster_type
= "helm-chart-v3"
5370 self
.logger
.debug("kdur: {}".format(kdur
))
5372 kdur
.get("helm-version")
5373 and kdur
.get("helm-version") == "v2"
5375 k8s_cluster_type
= "helm-chart"
5376 raise NotImplementedError
5377 elif kdur
.get("juju-bundle"):
5378 k8s_cluster_type
= "juju-bundle"
5381 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5382 "juju-bundle. Maybe an old NBI version is running".format(
5383 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
5387 min_instance_count
= 0
5388 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
5389 min_instance_count
= kdu_profile
["min-number-of-instances"]
5391 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
5392 deployed_kdu
, _
= get_deployed_kdu(
5393 nsr_deployed
, kdu_name
, vnf_index
5395 if deployed_kdu
is None:
5397 "KDU '{}' for vnf '{}' not deployed".format(
5401 kdu_instance
= deployed_kdu
.get("kdu-instance")
5402 instance_num
= await self
.k8scluster_map
[
5404 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5405 kdu_replica_count
= instance_num
- kdu_delta
.get(
5406 "number-of-instances", 1
5409 if kdu_replica_count
< min_instance_count
< instance_num
:
5410 kdu_replica_count
= min_instance_count
5411 if kdu_replica_count
< min_instance_count
:
5413 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5414 "scaling-group-descriptor '{}'".format(
5415 instance_num
, scaling_group
5419 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5420 vca_scaling_info
.append(
5422 "osm_kdu_id": kdu_name
,
5423 "member-vnf-index": vnf_index
,
5425 "kdu_index": instance_num
- x
- 1,
5428 scaling_info
["kdu-delete"][kdu_name
].append(
5430 "member-vnf-index": vnf_index
,
5432 "k8s-cluster-type": k8s_cluster_type
,
5433 "resource-name": resource_name
,
5434 "scale": kdu_replica_count
,
5438 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5439 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
5440 if scaling_info
["scaling_direction"] == "IN":
5441 for vdur
in reversed(db_vnfr
["vdur"]):
5442 if vdu_delete
.get(vdur
["vdu-id-ref"]):
5443 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
5444 scaling_info
["vdu"].append(
5446 "name": vdur
.get("name") or vdur
.get("vdu-name"),
5447 "vdu_id": vdur
["vdu-id-ref"],
5451 for interface
in vdur
["interfaces"]:
5452 scaling_info
["vdu"][-1]["interface"].append(
5454 "name": interface
["name"],
5455 "ip_address": interface
["ip-address"],
5456 "mac_address": interface
.get("mac-address"),
5459 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5462 step
= "Executing pre-scale vnf-config-primitive"
5463 if scaling_descriptor
.get("scaling-config-action"):
5464 for scaling_config_action
in scaling_descriptor
[
5465 "scaling-config-action"
5468 scaling_config_action
.get("trigger") == "pre-scale-in"
5469 and scaling_type
== "SCALE_IN"
5471 scaling_config_action
.get("trigger") == "pre-scale-out"
5472 and scaling_type
== "SCALE_OUT"
5474 vnf_config_primitive
= scaling_config_action
[
5475 "vnf-config-primitive-name-ref"
5477 step
= db_nslcmop_update
[
5479 ] = "executing pre-scale scaling-config-action '{}'".format(
5480 vnf_config_primitive
5483 # look for primitive
5484 for config_primitive
in (
5485 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
5486 ).get("config-primitive", ()):
5487 if config_primitive
["name"] == vnf_config_primitive
:
5491 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5492 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5493 "primitive".format(scaling_group
, vnf_config_primitive
)
5496 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
5497 if db_vnfr
.get("additionalParamsForVnf"):
5498 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
5500 scale_process
= "VCA"
5501 db_nsr_update
["config-status"] = "configuring pre-scaling"
5502 primitive_params
= self
._map
_primitive
_params
(
5503 config_primitive
, {}, vnfr_params
5506 # Pre-scale retry check: Check if this sub-operation has been executed before
5507 op_index
= self
._check
_or
_add
_scale
_suboperation
(
5510 vnf_config_primitive
,
5514 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
5515 # Skip sub-operation
5516 result
= "COMPLETED"
5517 result_detail
= "Done"
5520 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5521 vnf_config_primitive
, result
, result_detail
5525 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
5526 # New sub-operation: Get index of this sub-operation
5528 len(db_nslcmop
.get("_admin", {}).get("operations"))
5533 + "vnf_config_primitive={} New sub-operation".format(
5534 vnf_config_primitive
5538 # retry: Get registered params for this existing sub-operation
5539 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
5542 vnf_index
= op
.get("member_vnf_index")
5543 vnf_config_primitive
= op
.get("primitive")
5544 primitive_params
= op
.get("primitive_params")
5547 + "vnf_config_primitive={} Sub-operation retry".format(
5548 vnf_config_primitive
5551 # Execute the primitive, either with new (first-time) or registered (reintent) args
5552 ee_descriptor_id
= config_primitive
.get(
5553 "execution-environment-ref"
5555 primitive_name
= config_primitive
.get(
5556 "execution-environment-primitive", vnf_config_primitive
5558 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5559 nsr_deployed
["VCA"],
5560 member_vnf_index
=vnf_index
,
5562 vdu_count_index
=None,
5563 ee_descriptor_id
=ee_descriptor_id
,
5565 result
, result_detail
= await self
._ns
_execute
_primitive
(
5574 + "vnf_config_primitive={} Done with result {} {}".format(
5575 vnf_config_primitive
, result
, result_detail
5578 # Update operationState = COMPLETED | FAILED
5579 self
._update
_suboperation
_status
(
5580 db_nslcmop
, op_index
, result
, result_detail
5583 if result
== "FAILED":
5584 raise LcmException(result_detail
)
5585 db_nsr_update
["config-status"] = old_config_status
5586 scale_process
= None
5590 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
5593 "_admin.scaling-group.{}.time".format(admin_scale_index
)
5596 # SCALE-IN VCA - BEGIN
5597 if vca_scaling_info
:
5598 step
= db_nslcmop_update
[
5600 ] = "Deleting the execution environments"
5601 scale_process
= "VCA"
5602 for vca_info
in vca_scaling_info
:
5603 if vca_info
["type"] == "delete":
5604 member_vnf_index
= str(vca_info
["member-vnf-index"])
5606 logging_text
+ "vdu info: {}".format(vca_info
)
5608 if vca_info
.get("osm_vdu_id"):
5609 vdu_id
= vca_info
["osm_vdu_id"]
5610 vdu_index
= int(vca_info
["vdu_index"])
5613 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5614 member_vnf_index
, vdu_id
, vdu_index
5618 kdu_id
= vca_info
["osm_kdu_id"]
5621 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5622 member_vnf_index
, kdu_id
, vdu_index
5624 stage
[2] = step
= "Scaling in VCA"
5625 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
5626 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
5627 config_update
= db_nsr
["configurationStatus"]
5628 for vca_index
, vca
in enumerate(vca_update
):
5630 (vca
or vca
.get("ee_id"))
5631 and vca
["member-vnf-index"] == member_vnf_index
5632 and vca
["vdu_count_index"] == vdu_index
5634 if vca
.get("vdu_id"):
5635 config_descriptor
= get_configuration(
5636 db_vnfd
, vca
.get("vdu_id")
5638 elif vca
.get("kdu_name"):
5639 config_descriptor
= get_configuration(
5640 db_vnfd
, vca
.get("kdu_name")
5643 config_descriptor
= get_configuration(
5644 db_vnfd
, db_vnfd
["id"]
5646 operation_params
= (
5647 db_nslcmop
.get("operationParams") or {}
5649 exec_terminate_primitives
= not operation_params
.get(
5650 "skip_terminate_primitives"
5651 ) and vca
.get("needed_terminate")
5652 task
= asyncio
.ensure_future(
5661 exec_primitives
=exec_terminate_primitives
,
5665 timeout
=self
.timeout_charm_delete
,
5668 tasks_dict_info
[task
] = "Terminating VCA {}".format(
5671 del vca_update
[vca_index
]
5672 del config_update
[vca_index
]
5673 # wait for pending tasks of terminate primitives
5677 + "Waiting for tasks {}".format(
5678 list(tasks_dict_info
.keys())
5681 error_list
= await self
._wait
_for
_tasks
(
5685 self
.timeout_charm_delete
, self
.timeout_ns_terminate
5690 tasks_dict_info
.clear()
5692 raise LcmException("; ".join(error_list
))
5694 db_vca_and_config_update
= {
5695 "_admin.deployed.VCA": vca_update
,
5696 "configurationStatus": config_update
,
5699 "nsrs", db_nsr
["_id"], db_vca_and_config_update
5701 scale_process
= None
5702 # SCALE-IN VCA - END
5705 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
5706 scale_process
= "RO"
5707 if self
.ro_config
.get("ng"):
5708 await self
._scale
_ng
_ro
(
5709 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
5711 scaling_info
.pop("vdu-create", None)
5712 scaling_info
.pop("vdu-delete", None)
5714 scale_process
= None
5718 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
5719 scale_process
= "KDU"
5720 await self
._scale
_kdu
(
5721 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
5723 scaling_info
.pop("kdu-create", None)
5724 scaling_info
.pop("kdu-delete", None)
5726 scale_process
= None
5730 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5732 # SCALE-UP VCA - BEGIN
5733 if vca_scaling_info
:
5734 step
= db_nslcmop_update
[
5736 ] = "Creating new execution environments"
5737 scale_process
= "VCA"
5738 for vca_info
in vca_scaling_info
:
5739 if vca_info
["type"] == "create":
5740 member_vnf_index
= str(vca_info
["member-vnf-index"])
5742 logging_text
+ "vdu info: {}".format(vca_info
)
5744 vnfd_id
= db_vnfr
["vnfd-ref"]
5745 if vca_info
.get("osm_vdu_id"):
5746 vdu_index
= int(vca_info
["vdu_index"])
5747 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
5748 if db_vnfr
.get("additionalParamsForVnf"):
5749 deploy_params
.update(
5751 db_vnfr
["additionalParamsForVnf"].copy()
5754 descriptor_config
= get_configuration(
5755 db_vnfd
, db_vnfd
["id"]
5757 if descriptor_config
:
5762 logging_text
=logging_text
5763 + "member_vnf_index={} ".format(member_vnf_index
),
5766 nslcmop_id
=nslcmop_id
,
5772 member_vnf_index
=member_vnf_index
,
5773 vdu_index
=vdu_index
,
5775 deploy_params
=deploy_params
,
5776 descriptor_config
=descriptor_config
,
5777 base_folder
=base_folder
,
5778 task_instantiation_info
=tasks_dict_info
,
5781 vdu_id
= vca_info
["osm_vdu_id"]
5782 vdur
= find_in_list(
5783 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
5785 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
5786 if vdur
.get("additionalParams"):
5787 deploy_params_vdu
= parse_yaml_strings(
5788 vdur
["additionalParams"]
5791 deploy_params_vdu
= deploy_params
5792 deploy_params_vdu
["OSM"] = get_osm_params(
5793 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
5795 if descriptor_config
:
5800 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5801 member_vnf_index
, vdu_id
, vdu_index
5803 stage
[2] = step
= "Scaling out VCA"
5804 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
5806 logging_text
=logging_text
5807 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5808 member_vnf_index
, vdu_id
, vdu_index
5812 nslcmop_id
=nslcmop_id
,
5818 member_vnf_index
=member_vnf_index
,
5819 vdu_index
=vdu_index
,
5821 deploy_params
=deploy_params_vdu
,
5822 descriptor_config
=descriptor_config
,
5823 base_folder
=base_folder
,
5824 task_instantiation_info
=tasks_dict_info
,
5828 kdu_name
= vca_info
["osm_kdu_id"]
5829 descriptor_config
= get_configuration(db_vnfd
, kdu_name
)
5830 if descriptor_config
:
5832 kdu_index
= int(vca_info
["kdu_index"])
5836 for x
in db_vnfr
["kdur"]
5837 if x
["kdu-name"] == kdu_name
5839 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
5840 if kdur
.get("additionalParams"):
5841 deploy_params_kdu
= parse_yaml_strings(
5842 kdur
["additionalParams"]
5846 logging_text
=logging_text
,
5849 nslcmop_id
=nslcmop_id
,
5855 member_vnf_index
=member_vnf_index
,
5856 vdu_index
=kdu_index
,
5858 deploy_params
=deploy_params_kdu
,
5859 descriptor_config
=descriptor_config
,
5860 base_folder
=base_folder
,
5861 task_instantiation_info
=tasks_dict_info
,
5864 # SCALE-UP VCA - END
5865 scale_process
= None
5868 # execute primitive service POST-SCALING
5869 step
= "Executing post-scale vnf-config-primitive"
5870 if scaling_descriptor
.get("scaling-config-action"):
5871 for scaling_config_action
in scaling_descriptor
[
5872 "scaling-config-action"
5875 scaling_config_action
.get("trigger") == "post-scale-in"
5876 and scaling_type
== "SCALE_IN"
5878 scaling_config_action
.get("trigger") == "post-scale-out"
5879 and scaling_type
== "SCALE_OUT"
5881 vnf_config_primitive
= scaling_config_action
[
5882 "vnf-config-primitive-name-ref"
5884 step
= db_nslcmop_update
[
5886 ] = "executing post-scale scaling-config-action '{}'".format(
5887 vnf_config_primitive
5890 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
5891 if db_vnfr
.get("additionalParamsForVnf"):
5892 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
5894 # look for primitive
5895 for config_primitive
in (
5896 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
5897 ).get("config-primitive", ()):
5898 if config_primitive
["name"] == vnf_config_primitive
:
5902 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
5903 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
5904 "config-primitive".format(
5905 scaling_group
, vnf_config_primitive
5908 scale_process
= "VCA"
5909 db_nsr_update
["config-status"] = "configuring post-scaling"
5910 primitive_params
= self
._map
_primitive
_params
(
5911 config_primitive
, {}, vnfr_params
5914 # Post-scale retry check: Check if this sub-operation has been executed before
5915 op_index
= self
._check
_or
_add
_scale
_suboperation
(
5918 vnf_config_primitive
,
5922 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
5923 # Skip sub-operation
5924 result
= "COMPLETED"
5925 result_detail
= "Done"
5928 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5929 vnf_config_primitive
, result
, result_detail
5933 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
5934 # New sub-operation: Get index of this sub-operation
5936 len(db_nslcmop
.get("_admin", {}).get("operations"))
5941 + "vnf_config_primitive={} New sub-operation".format(
5942 vnf_config_primitive
5946 # retry: Get registered params for this existing sub-operation
5947 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
5950 vnf_index
= op
.get("member_vnf_index")
5951 vnf_config_primitive
= op
.get("primitive")
5952 primitive_params
= op
.get("primitive_params")
5955 + "vnf_config_primitive={} Sub-operation retry".format(
5956 vnf_config_primitive
5959 # Execute the primitive, either with new (first-time) or registered (reintent) args
5960 ee_descriptor_id
= config_primitive
.get(
5961 "execution-environment-ref"
5963 primitive_name
= config_primitive
.get(
5964 "execution-environment-primitive", vnf_config_primitive
5966 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5967 nsr_deployed
["VCA"],
5968 member_vnf_index
=vnf_index
,
5970 vdu_count_index
=None,
5971 ee_descriptor_id
=ee_descriptor_id
,
5973 result
, result_detail
= await self
._ns
_execute
_primitive
(
5982 + "vnf_config_primitive={} Done with result {} {}".format(
5983 vnf_config_primitive
, result
, result_detail
5986 # Update operationState = COMPLETED | FAILED
5987 self
._update
_suboperation
_status
(
5988 db_nslcmop
, op_index
, result
, result_detail
5991 if result
== "FAILED":
5992 raise LcmException(result_detail
)
5993 db_nsr_update
["config-status"] = old_config_status
5994 scale_process
= None
5999 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6000 db_nsr_update
["operational-status"] = (
6002 if old_operational_status
== "failed"
6003 else old_operational_status
6005 db_nsr_update
["config-status"] = old_config_status
6008 ROclient
.ROClientException
,
6013 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6015 except asyncio
.CancelledError
:
6017 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6019 exc
= "Operation was cancelled"
6020 except Exception as e
:
6021 exc
= traceback
.format_exc()
6022 self
.logger
.critical(
6023 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6027 self
._write
_ns
_status
(
6030 current_operation
="IDLE",
6031 current_operation_id
=None,
6034 stage
[1] = "Waiting for instantiate pending tasks."
6035 self
.logger
.debug(logging_text
+ stage
[1])
6036 exc
= await self
._wait
_for
_tasks
(
6039 self
.timeout_ns_deploy
,
6047 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6048 nslcmop_operation_state
= "FAILED"
6050 db_nsr_update
["operational-status"] = old_operational_status
6051 db_nsr_update
["config-status"] = old_config_status
6052 db_nsr_update
["detailed-status"] = ""
6054 if "VCA" in scale_process
:
6055 db_nsr_update
["config-status"] = "failed"
6056 if "RO" in scale_process
:
6057 db_nsr_update
["operational-status"] = "failed"
6060 ] = "FAILED scaling nslcmop={} {}: {}".format(
6061 nslcmop_id
, step
, exc
6064 error_description_nslcmop
= None
6065 nslcmop_operation_state
= "COMPLETED"
6066 db_nslcmop_update
["detailed-status"] = "Done"
6068 self
._write
_op
_status
(
6071 error_message
=error_description_nslcmop
,
6072 operation_state
=nslcmop_operation_state
,
6073 other_update
=db_nslcmop_update
,
6076 self
._write
_ns
_status
(
6079 current_operation
="IDLE",
6080 current_operation_id
=None,
6081 other_update
=db_nsr_update
,
6084 if nslcmop_operation_state
:
6088 "nslcmop_id": nslcmop_id
,
6089 "operationState": nslcmop_operation_state
,
6091 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
6092 except Exception as e
:
6094 logging_text
+ "kafka_write notification Exception {}".format(e
)
6096 self
.logger
.debug(logging_text
+ "Exit")
6097 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
6099 async def _scale_kdu(
6100 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6102 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
6103 for kdu_name
in _scaling_info
:
6104 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
6105 deployed_kdu
, index
= get_deployed_kdu(
6106 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
6108 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
6109 kdu_instance
= deployed_kdu
["kdu-instance"]
6110 scale
= int(kdu_scaling_info
["scale"])
6111 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
6114 "collection": "nsrs",
6115 "filter": {"_id": nsr_id
},
6116 "path": "_admin.deployed.K8s.{}".format(index
),
6119 step
= "scaling application {}".format(
6120 kdu_scaling_info
["resource-name"]
6122 self
.logger
.debug(logging_text
+ step
)
6124 if kdu_scaling_info
["type"] == "delete":
6125 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
6128 and kdu_config
.get("terminate-config-primitive")
6129 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
6131 terminate_config_primitive_list
= kdu_config
.get(
6132 "terminate-config-primitive"
6134 terminate_config_primitive_list
.sort(
6135 key
=lambda val
: int(val
["seq"])
6139 terminate_config_primitive
6140 ) in terminate_config_primitive_list
:
6141 primitive_params_
= self
._map
_primitive
_params
(
6142 terminate_config_primitive
, {}, {}
6144 step
= "execute terminate config primitive"
6145 self
.logger
.debug(logging_text
+ step
)
6146 await asyncio
.wait_for(
6147 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
6148 cluster_uuid
=cluster_uuid
,
6149 kdu_instance
=kdu_instance
,
6150 primitive_name
=terminate_config_primitive
["name"],
6151 params
=primitive_params_
,
6158 await asyncio
.wait_for(
6159 self
.k8scluster_map
[k8s_cluster_type
].scale(
6162 kdu_scaling_info
["resource-name"],
6165 timeout
=self
.timeout_vca_on_error
,
6168 if kdu_scaling_info
["type"] == "create":
6169 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
6172 and kdu_config
.get("initial-config-primitive")
6173 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
6175 initial_config_primitive_list
= kdu_config
.get(
6176 "initial-config-primitive"
6178 initial_config_primitive_list
.sort(
6179 key
=lambda val
: int(val
["seq"])
6182 for initial_config_primitive
in initial_config_primitive_list
:
6183 primitive_params_
= self
._map
_primitive
_params
(
6184 initial_config_primitive
, {}, {}
6186 step
= "execute initial config primitive"
6187 self
.logger
.debug(logging_text
+ step
)
6188 await asyncio
.wait_for(
6189 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
6190 cluster_uuid
=cluster_uuid
,
6191 kdu_instance
=kdu_instance
,
6192 primitive_name
=initial_config_primitive
["name"],
6193 params
=primitive_params_
,
6200 async def _scale_ng_ro(
6201 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
6203 nsr_id
= db_nslcmop
["nsInstanceId"]
6204 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
6207 # read from db: vnfd's for every vnf
6210 # for each vnf in ns, read vnfd
6211 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
6212 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
6213 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
6214 # if we haven't this vnfd, read it from db
6215 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
6217 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
6218 db_vnfds
.append(vnfd
)
6219 n2vc_key
= self
.n2vc
.get_public_key()
6220 n2vc_key_list
= [n2vc_key
]
6223 vdu_scaling_info
.get("vdu-create"),
6224 vdu_scaling_info
.get("vdu-delete"),
6227 # db_vnfr has been updated, update db_vnfrs to use it
6228 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
6229 await self
._instantiate
_ng
_ro
(
6239 start_deploy
=time(),
6240 timeout_ns_deploy
=self
.timeout_ns_deploy
,
6242 if vdu_scaling_info
.get("vdu-delete"):
6244 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
6247 async def add_prometheus_metrics(
6248 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
6250 if not self
.prometheus
:
6252 # look if exist a file called 'prometheus*.j2' and
6253 artifact_content
= self
.fs
.dir_ls(artifact_path
)
6257 for f
in artifact_content
6258 if f
.startswith("prometheus") and f
.endswith(".j2")
6264 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
6268 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
6269 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
6271 vnfr_id
= vnfr_id
.replace("-", "")
6273 "JOB_NAME": vnfr_id
,
6274 "TARGET_IP": target_ip
,
6275 "EXPORTER_POD_IP": host_name
,
6276 "EXPORTER_POD_PORT": host_port
,
6278 job_list
= self
.prometheus
.parse_job(job_data
, variables
)
6279 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
6280 for job
in job_list
:
6282 not isinstance(job
.get("job_name"), str)
6283 or vnfr_id
not in job
["job_name"]
6285 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
6286 job
["nsr_id"] = nsr_id
6287 job_dict
= {jl
["job_name"]: jl
for jl
in job_list
}
6288 if await self
.prometheus
.update(job_dict
):
6289 return list(job_dict
.keys())
6291 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
6293 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6295 :param: vim_account_id: VIM Account ID
6297 :return: (cloud_name, cloud_credential)
6299 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
6300 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
6302 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
6304 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6306 :param: vim_account_id: VIM Account ID
6308 :return: (cloud_name, cloud_credential)
6310 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
6311 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")