1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
36 from osm_lcm
import ROclient
37 from osm_lcm
.data_utils
.lcm_config
import LcmCfg
38 from osm_lcm
.data_utils
.nsr
import (
41 get_deployed_vca_list
,
44 from osm_lcm
.data_utils
.vca
import (
53 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
54 from osm_lcm
.lcm_utils
import (
61 check_juju_bundle_existence
,
62 get_charm_artifact_path
,
65 from osm_lcm
.data_utils
.nsd
import (
66 get_ns_configuration_relation_list
,
70 from osm_lcm
.data_utils
.vnfd
import (
76 get_ee_sorted_initial_config_primitive_list
,
77 get_ee_sorted_terminate_config_primitive_list
,
79 get_virtual_link_profiles
,
84 get_number_of_instances
,
86 get_kdu_resource_profile
,
87 find_software_version
,
90 from osm_lcm
.data_utils
.list_utils
import find_in_list
91 from osm_lcm
.data_utils
.vnfr
import (
95 get_volumes_from_instantiation_params
,
97 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
98 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
99 from n2vc
.definitions
import RelationEndpoint
100 from n2vc
.k8s_helm_conn
import K8sHelmConnector
101 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
102 from n2vc
.k8s_juju_conn
import K8sJujuConnector
104 from osm_common
.dbbase
import DbException
105 from osm_common
.fsbase
import FsException
107 from osm_lcm
.data_utils
.database
.database
import Database
108 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
109 from osm_lcm
.data_utils
.wim
import (
111 get_target_wim_attrs
,
112 select_feasible_wim_account
,
115 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
116 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
118 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
119 from osm_lcm
.osm_config
import OsmConfigBuilder
120 from osm_lcm
.prometheus
import parse_job
122 from copy
import copy
, deepcopy
123 from time
import time
124 from uuid
import uuid4
126 from random
import randint
128 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
131 class NsLcm(LcmBase
):
# Negative sentinel codes for sub-operation lookups
# (presumably returned in place of a list index; confirm against the callers).
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
SUBOPERATION_STATUS_SKIP = -3
# Human-readable label for the VCA deployment task.
task_name_deploy_vca = "Deploying VCA"
def __init__(self, msg, lcm_tasks, config: LcmCfg, loop):
    """
    Init, Connect to database, filesystem storage, and messaging

    :param msg: forwarded unchanged to the base-class constructor
    :param lcm_tasks: stored as self.lcm_tasks
    :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        Only its 'timeout', 'RO' and 'VCA' attributes are read here.
    :param loop: stored as self.loop and handed to the connectors and the RO client
    :return: None
    """
    super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))

    # shared database / filesystem handles
    self.db = Database().instance.db
    self.fs = Filesystem().instance.fs
    self.loop = loop
    self.lcm_tasks = lcm_tasks

    # configuration sections used by this class
    self.timeout = config.timeout
    self.ro_config = config.RO
    self.vca_config = config.VCA

    # create N2VC connector
    self.n2vc = N2VCJujuConnector(
        log=self.logger,
        loop=self.loop,
        on_update_db=self._on_update_n2vc_db,
        fs=self.fs,
        db=self.db,
    )

    # helm based execution environments
    self.conn_helm_ee = LCMHelmConn(
        log=self.logger,
        loop=self.loop,
        vca_config=self.vca_config,
        on_update_db=self._on_update_n2vc_db,
    )

    # K8s connectors: helm v2, helm v3 and juju
    self.k8sclusterhelm2 = K8sHelmConnector(
        kubectl_command=self.vca_config.kubectlpath,
        helm_command=self.vca_config.helmpath,
        log=self.logger,
        on_update_db=None,
        fs=self.fs,
        db=self.db,
    )

    self.k8sclusterhelm3 = K8sHelm3Connector(
        kubectl_command=self.vca_config.kubectlpath,
        helm_command=self.vca_config.helm3path,
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )

    self.k8sclusterjuju = K8sJujuConnector(
        kubectl_command=self.vca_config.kubectlpath,
        juju_command=self.vca_config.jujupath,
        log=self.logger,
        loop=self.loop,
        on_update_db=self._on_update_k8s_db,
        fs=self.fs,
        db=self.db,
    )

    # KDU deployment type -> K8s connector
    self.k8scluster_map = {
        "helm-chart": self.k8sclusterhelm2,
        "helm-chart-v3": self.k8sclusterhelm3,
        "chart": self.k8sclusterhelm3,
        "juju-bundle": self.k8sclusterjuju,
        "juju": self.k8sclusterjuju,
    }

    # execution environment type -> VCA connector
    self.vca_map = {
        "lxc_proxy_charm": self.n2vc,
        "native_charm": self.n2vc,
        "k8s_proxy_charm": self.n2vc,
        "helm": self.conn_helm_ee,
        "helm-v3": self.conn_helm_ee,
    }

    # create RO client
    self.RO = NgRoClient(self.loop, **self.ro_config.to_dict())

    # operation type -> RO call used to poll its status;
    # only "healing" uses recreate_status
    self.op_status_map = {
        "instantiation": self.RO.status,
        "termination": self.RO.status,
        "migrate": self.RO.status,
        "healing": self.RO.recreate_status,
        "verticalscale": self.RO.status,
        "start_stop_rebuild": self.RO.status,
    }
226 def increment_ip_mac(ip_mac
, vm_index
=1):
227 if not isinstance(ip_mac
, str):
230 # try with ipv4 look for last dot
231 i
= ip_mac
.rfind(".")
234 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
235 # try with ipv6 or mac look for last colon. Operate in hex
236 i
= ip_mac
.rfind(":")
239 # format in hex, len can be 2 for mac or 4 for ipv6
240 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
241 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
247 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
249 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
252 # TODO filter RO descriptor fields...
256 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
257 db_dict
["deploymentStatus"] = ro_descriptor
258 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
260 except Exception as e
:
262 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
265 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
267 # remove last dot from path (if exists)
268 if path
.endswith("."):
271 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
272 # .format(table, filter, path, updated_data))
275 nsr_id
= filter.get("_id")
277 # read ns record from database
278 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
279 current_ns_status
= nsr
.get("nsState")
281 # get vca status for NS
282 status_dict
= await self
.n2vc
.get_status(
283 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
288 db_dict
["vcaStatus"] = status_dict
290 # update configurationStatus for this VCA
292 vca_index
= int(path
[path
.rfind(".") + 1 :])
295 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
297 vca_status
= vca_list
[vca_index
].get("status")
299 configuration_status_list
= nsr
.get("configurationStatus")
300 config_status
= configuration_status_list
[vca_index
].get("status")
302 if config_status
== "BROKEN" and vca_status
!= "failed":
303 db_dict
["configurationStatus"][vca_index
] = "READY"
304 elif config_status
!= "BROKEN" and vca_status
== "failed":
305 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
306 except Exception as e
:
307 # not update configurationStatus
308 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
310 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
311 # if nsState = 'DEGRADED' check if all is OK
313 if current_ns_status
in ("READY", "DEGRADED"):
314 error_description
= ""
316 if status_dict
.get("machines"):
317 for machine_id
in status_dict
.get("machines"):
318 machine
= status_dict
.get("machines").get(machine_id
)
319 # check machine agent-status
320 if machine
.get("agent-status"):
321 s
= machine
.get("agent-status").get("status")
324 error_description
+= (
325 "machine {} agent-status={} ; ".format(
329 # check machine instance status
330 if machine
.get("instance-status"):
331 s
= machine
.get("instance-status").get("status")
334 error_description
+= (
335 "machine {} instance-status={} ; ".format(
340 if status_dict
.get("applications"):
341 for app_id
in status_dict
.get("applications"):
342 app
= status_dict
.get("applications").get(app_id
)
343 # check application status
344 if app
.get("status"):
345 s
= app
.get("status").get("status")
348 error_description
+= (
349 "application {} status={} ; ".format(app_id
, s
)
352 if error_description
:
353 db_dict
["errorDescription"] = error_description
354 if current_ns_status
== "READY" and is_degraded
:
355 db_dict
["nsState"] = "DEGRADED"
356 if current_ns_status
== "DEGRADED" and not is_degraded
:
357 db_dict
["nsState"] = "READY"
360 self
.update_db_2("nsrs", nsr_id
, db_dict
)
362 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
364 except Exception as e
:
365 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
367 async def _on_update_k8s_db(
368 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
371 Updating vca status in NSR record
372 :param cluster_uuid: UUID of a k8s cluster
373 :param kdu_instance: The unique name of the KDU instance
374 :param filter: To get nsr_id
375 :cluster_type: The cluster type (juju, k8s)
379 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
380 # .format(cluster_uuid, kdu_instance, filter))
382 nsr_id
= filter.get("_id")
384 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
385 cluster_uuid
=cluster_uuid
,
386 kdu_instance
=kdu_instance
,
388 complete_status
=True,
394 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
397 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
401 self
.update_db_2("nsrs", nsr_id
, db_dict
)
402 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
404 except Exception as e
:
405 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
@staticmethod
def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
    """
    Render a cloud-init text as a Jinja2 template.

    :param cloud_init_text: template source
    :param additional_params: variables for the template; None is treated as {}
    :param vnfd_id: only used in error messages
    :param vdu_id: only used in error messages
    :return: the rendered text
    :raises LcmException: when a variable is undefined or the template is
        invalid; the Jinja2 error is kept as the exception cause
    """
    try:
        env = Environment(
            # StrictUndefined turns a missing variable into an error
            undefined=StrictUndefined,
            autoescape=select_autoescape(default_for_string=True, default=True),
        )
        template = env.from_string(cloud_init_text)
        return template.render(additional_params or {})
    # UndefinedError must be handled before TemplateError, which it derives from
    except UndefinedError as e:
        raise LcmException(
            "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
            "file, must be provided in the instantiation parameters inside the "
            "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
        ) from e
    except (TemplateError, TemplateNotFound) as e:
        raise LcmException(
            "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
                vnfd_id, vdu_id, e
            )
        ) from e
429 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
430 cloud_init_content
= cloud_init_file
= None
432 if vdu
.get("cloud-init-file"):
433 base_folder
= vnfd
["_admin"]["storage"]
434 if base_folder
["pkg-dir"]:
435 cloud_init_file
= "{}/{}/cloud_init/{}".format(
436 base_folder
["folder"],
437 base_folder
["pkg-dir"],
438 vdu
["cloud-init-file"],
441 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
442 base_folder
["folder"],
443 vdu
["cloud-init-file"],
445 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
446 cloud_init_content
= ci_file
.read()
447 elif vdu
.get("cloud-init"):
448 cloud_init_content
= vdu
["cloud-init"]
450 return cloud_init_content
451 except FsException
as e
:
453 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
454 vnfd
["id"], vdu
["id"], cloud_init_file
, e
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
    """
    Return the parsed 'additionalParams' of the first vdur that references vdu_id.

    :param db_vnfr: vnfr record; its "vdur" list is searched
    :param vdu_id: value compared against each vdur's "vdu-id-ref"
    :return: result of parse_yaml_strings() on the vdur's 'additionalParams'
        (called with None when no vdur matches or none are present)
    """
    # a vnfr without "vdur" (or with vdur=None) is treated as having no vdur
    # instead of failing while iterating None
    vdur = next(
        (
            candidate
            for candidate in db_vnfr.get("vdur") or ()
            if vdu_id == candidate["vdu-id-ref"]
        ),
        {},
    )
    additional_params = vdur.get("additionalParams")
    return parse_yaml_strings(additional_params)
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided (not read here)
    :param nsrId: Id of the NSR (not read here)
    :return: copy of vnfd
    """
    vnfd_RO = deepcopy(vnfd)

    # remove unused by RO configuration, monitoring, scaling and internal keys
    unused_keys = (
        "_id",
        "_admin",
        "monitoring-param",
        "scaling-group-descriptor",
        "kdu",
        "k8s-cluster",
    )
    for unused_key in unused_keys:
        vnfd_RO.pop(unused_key, None)

    if new_id:
        vnfd_RO["id"] = new_id

    # the cloud-init entries are dropped from every vdu of the copy
    for vdu in get_iterable(vnfd_RO, "vdu"):
        for cloud_init_key in ("cloud-init-file", "cloud-init"):
            vdu.pop(cloud_init_key, None)
    return vnfd_RO
492 def ip_profile_2_RO(ip_profile
):
493 RO_ip_profile
= deepcopy(ip_profile
)
494 if "dns-server" in RO_ip_profile
:
495 if isinstance(RO_ip_profile
["dns-server"], list):
496 RO_ip_profile
["dns-address"] = []
497 for ds
in RO_ip_profile
.pop("dns-server"):
498 RO_ip_profile
["dns-address"].append(ds
["address"])
500 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
501 if RO_ip_profile
.get("ip-version") == "ipv4":
502 RO_ip_profile
["ip-version"] = "IPv4"
503 if RO_ip_profile
.get("ip-version") == "ipv6":
504 RO_ip_profile
["ip-version"] = "IPv6"
505 if "dhcp-params" in RO_ip_profile
:
506 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
509 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
510 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
511 if db_vim
["_admin"]["operationalState"] != "ENABLED":
513 "VIM={} is not available. operationalState={}".format(
514 vim_account
, db_vim
["_admin"]["operationalState"]
517 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
def get_ro_wim_id_for_wim_account(self, wim_account):
    """
    Return the value stored under _admin.deployed.RO-account of a WIM account.

    :param wim_account: _id of the record in "wim_accounts"; any non-string
        value is returned unchanged without touching the database
    :return: db_wim["_admin"]["deployed"]["RO-account"], or wim_account itself
    :raises LcmException: when the account's operationalState is not ENABLED
    """
    if not isinstance(wim_account, str):
        return wim_account

    wim_admin = self.db.get_one("wim_accounts", {"_id": wim_account})["_admin"]
    operational_state = wim_admin["operationalState"]
    if operational_state != "ENABLED":
        raise LcmException(
            "WIM={} is not available. operationalState={}".format(
                wim_account, operational_state
            )
        )
    return wim_admin["deployed"]["RO-account"]
534 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
536 db_vdu_push_list
= []
538 db_update
= {"_admin.modified": time()}
540 for vdu_id
, vdu_count
in vdu_create
.items():
544 for vdur
in reversed(db_vnfr
["vdur"])
545 if vdur
["vdu-id-ref"] == vdu_id
550 # Read the template saved in the db:
552 "No vdur in the database. Using the vdur-template to scale"
554 vdur_template
= db_vnfr
.get("vdur-template")
555 if not vdur_template
:
557 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
561 vdur
= vdur_template
[0]
562 # Delete a template from the database after using it
565 {"_id": db_vnfr
["_id"]},
567 pull
={"vdur-template": {"_id": vdur
["_id"]}},
569 for count
in range(vdu_count
):
570 vdur_copy
= deepcopy(vdur
)
571 vdur_copy
["status"] = "BUILD"
572 vdur_copy
["status-detailed"] = None
573 vdur_copy
["ip-address"] = None
574 vdur_copy
["_id"] = str(uuid4())
575 vdur_copy
["count-index"] += count
+ 1
576 vdur_copy
["id"] = "{}-{}".format(
577 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
579 vdur_copy
.pop("vim_info", None)
580 for iface
in vdur_copy
["interfaces"]:
581 if iface
.get("fixed-ip"):
582 iface
["ip-address"] = self
.increment_ip_mac(
583 iface
["ip-address"], count
+ 1
586 iface
.pop("ip-address", None)
587 if iface
.get("fixed-mac"):
588 iface
["mac-address"] = self
.increment_ip_mac(
589 iface
["mac-address"], count
+ 1
592 iface
.pop("mac-address", None)
596 ) # only first vdu can be managment of vnf
597 db_vdu_push_list
.append(vdur_copy
)
598 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
600 if len(db_vnfr
["vdur"]) == 1:
601 # The scale will move to 0 instances
603 "Scaling to 0 !, creating the template with the last vdur"
605 template_vdur
= [db_vnfr
["vdur"][0]]
606 for vdu_id
, vdu_count
in vdu_delete
.items():
608 indexes_to_delete
= [
610 for iv
in enumerate(db_vnfr
["vdur"])
611 if iv
[1]["vdu-id-ref"] == vdu_id
615 "vdur.{}.status".format(i
): "DELETING"
616 for i
in indexes_to_delete
[-vdu_count
:]
620 # it must be deleted one by one because common.db does not allow otherwise
623 for v
in reversed(db_vnfr
["vdur"])
624 if v
["vdu-id-ref"] == vdu_id
626 for vdu
in vdus_to_delete
[:vdu_count
]:
629 {"_id": db_vnfr
["_id"]},
631 pull
={"vdur": {"_id": vdu
["_id"]}},
635 db_push
["vdur"] = db_vdu_push_list
637 db_push
["vdur-template"] = template_vdur
640 db_vnfr
["vdur-template"] = template_vdur
641 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
642 # modify passed dictionary db_vnfr
643 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
644 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Updates database nsr with the RO info for the created vld
    :param ns_update_nsr: dictionary to be filled with the updated info
    :param db_nsr: content of db_nsr. This is also modified
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised on errors
    """
    for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
        # the RO net whose ns_net_osm_id equals the vld id carries the VIM data
        for net_RO in get_iterable(nsr_desc_RO, "nets"):
            if vld["id"] == net_RO.get("ns_net_osm_id"):
                vld["vim-id"] = net_RO.get("vim_net_id")
                vld["name"] = net_RO.get("vim_name")
                vld["status"] = net_RO.get("status")
                vld["status-detailed"] = net_RO.get("error_msg")
                ns_update_nsr["vld.{}".format(vld_index)] = vld
                break
        else:
            # no RO net matched this vld
            raise LcmException(
                "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
            )
def set_vnfr_at_error(self, db_vnfrs, error_text):
    """
    Mark every vnfr as ERROR, together with each vdur that has no status yet.

    :param db_vnfrs: dictionary of vnfr records; they are modified in place
    :param error_text: when set, stored as 'status-detailed' of the vdur that
        are switched to ERROR
    :return: None. A DbException is logged and not propagated.
    """
    try:
        for db_vnfr in db_vnfrs.values():
            vnfr_update = {"status": "ERROR"}
            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                # vdur that already have a status keep it
                if "status" in vdur:
                    continue
                vdur["status"] = "ERROR"
                vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
                if error_text:
                    # the database gets the same detail text as the in-memory record
                    detail = str(error_text)
                    vdur["status-detailed"] = detail
                    vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = detail
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
    except DbException as e:
        self.logger.error("Cannot update vnf. {}".format(e))
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised on errors
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue
            vnfr_update = {}

            # management address of the vnf: first entry of the ';' separated list
            if vnf_RO.get("ip_address"):
                mgmt_ip = vnf_RO["ip_address"].split(";")[0]
                db_vnfr["ip-address"] = vnfr_update["ip-address"] = mgmt_ip
            elif not db_vnfr.get("ip-address"):
                if db_vnfr.get("vdur"):  # if not VDUs, there is not ip_address
                    raise LcmExceptionNoMgmtIP(
                        "ns member_vnf_index '{}' has no IP address".format(
                            vnf_index
                        )
                    )

            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                vdur_RO_count_index = 0
                if vdur.get("pdu-type"):
                    continue
                for vdur_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                        continue
                    # RO vms of one vdu are matched by position against count-index
                    if vdur["count-index"] != vdur_RO_count_index:
                        vdur_RO_count_index += 1
                        continue
                    vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                    if vdur_RO.get("ip_address"):
                        vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                    else:
                        vdur["ip-address"] = None
                    vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                    vdur["name"] = vdur_RO.get("vim_name")
                    vdur["status"] = vdur_RO.get("status")
                    vdur["status-detailed"] = vdur_RO.get("error_msg")
                    for ifacer in get_iterable(vdur, "interfaces"):
                        for interface_RO in get_iterable(vdur_RO, "interfaces"):
                            if ifacer["name"] == interface_RO.get("internal_name"):
                                ifacer["ip-address"] = interface_RO.get(
                                    "ip_address"
                                )
                                ifacer["mac-address"] = interface_RO.get(
                                    "mac_address"
                                )
                                break
                        else:
                            raise LcmException(
                                "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                "from VIM info".format(
                                    vnf_index, vdur["vdu-id-ref"], ifacer["name"]
                                )
                            )
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    raise LcmException(
                        "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                        "VIM info".format(
                            vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
                        )
                    )

            for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vld["vim-id"] = net_RO.get("vim_net_id")
                    vld["name"] = net_RO.get("vim_name")
                    vld["status"] = net_RO.get("status")
                    vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vld
                    break
                else:
                    raise LcmException(
                        "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                            vnf_index, vld["id"]
                        )
                    )

            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            break

        else:
            # no RO vnf carries this member_vnf_index
            raise LcmException(
                "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
                    vnf_index
                )
            )
784 def _get_ns_config_info(self
, nsr_id
):
786 Generates a mapping between vnf,vdu elements and the N2VC id
787 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
788 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
789 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
790 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
792 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
793 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
795 ns_config_info
= {"osm-config-mapping": mapping
}
796 for vca
in vca_deployed_list
:
797 if not vca
["member-vnf-index"]:
799 if not vca
["vdu_id"]:
800 mapping
[vca
["member-vnf-index"]] = vca
["application"]
804 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
806 ] = vca
["application"]
807 return ns_config_info
809 async def _instantiate_ng_ro(
826 def get_vim_account(vim_account_id
):
828 if vim_account_id
in db_vims
:
829 return db_vims
[vim_account_id
]
830 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
831 db_vims
[vim_account_id
] = db_vim
834 # modify target_vld info with instantiation parameters
835 def parse_vld_instantiation_params(
836 target_vim
, target_vld
, vld_params
, target_sdn
838 if vld_params
.get("ip-profile"):
839 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
842 if vld_params
.get("provider-network"):
843 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
846 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
847 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
851 # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
852 # if wim_account_id is specified in vld_params, validate if it is feasible.
853 wim_account_id
, db_wim
= select_feasible_wim_account(
854 db_nsr
, db_vnfrs
, target_vld
, vld_params
, self
.logger
858 # WIM is needed and a feasible one was found, populate WIM target and SDN ports
859 self
.logger
.info("WIM selected: {:s}".format(str(wim_account_id
)))
860 # update vld_params with correct WIM account Id
861 vld_params
["wimAccountId"] = wim_account_id
863 target_wim
= "wim:{}".format(wim_account_id
)
864 target_wim_attrs
= get_target_wim_attrs(nsr_id
, target_vld
, vld_params
)
865 sdn_ports
= get_sdn_ports(vld_params
, db_wim
)
866 if len(sdn_ports
) > 0:
867 target_vld
["vim_info"][target_wim
] = target_wim_attrs
868 target_vld
["vim_info"][target_wim
]["sdn-ports"] = sdn_ports
871 "Target VLD with WIM data: {:s}".format(str(target_vld
))
874 for param
in ("vim-network-name", "vim-network-id"):
875 if vld_params
.get(param
):
876 if isinstance(vld_params
[param
], dict):
877 for vim
, vim_net
in vld_params
[param
].items():
878 other_target_vim
= "vim:" + vim
880 target_vld
["vim_info"],
881 (other_target_vim
, param
.replace("-", "_")),
884 else: # isinstance str
885 target_vld
["vim_info"][target_vim
][
886 param
.replace("-", "_")
887 ] = vld_params
[param
]
888 if vld_params
.get("common_id"):
889 target_vld
["common_id"] = vld_params
.get("common_id")
891 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
892 def update_ns_vld_target(target
, ns_params
):
893 for vnf_params
in ns_params
.get("vnf", ()):
894 if vnf_params
.get("vimAccountId"):
898 for vnfr
in db_vnfrs
.values()
899 if vnf_params
["member-vnf-index"]
900 == vnfr
["member-vnf-index-ref"]
904 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
907 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
908 target_vld
= find_in_list(
909 get_iterable(vdur
, "interfaces"),
910 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
913 vld_params
= find_in_list(
914 get_iterable(ns_params
, "vld"),
915 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
919 if vnf_params
.get("vimAccountId") not in a_vld
.get(
922 target_vim_network_list
= [
923 v
for _
, v
in a_vld
.get("vim_info").items()
925 target_vim_network_name
= next(
927 item
.get("vim_network_name", "")
928 for item
in target_vim_network_list
933 target
["ns"]["vld"][a_index
].get("vim_info").update(
935 "vim:{}".format(vnf_params
["vimAccountId"]): {
936 "vim_network_name": target_vim_network_name
,
942 for param
in ("vim-network-name", "vim-network-id"):
943 if vld_params
.get(param
) and isinstance(
944 vld_params
[param
], dict
946 for vim
, vim_net
in vld_params
[
949 other_target_vim
= "vim:" + vim
951 target
["ns"]["vld"][a_index
].get(
956 param
.replace("-", "_"),
961 nslcmop_id
= db_nslcmop
["_id"]
963 "name": db_nsr
["name"],
966 "image": deepcopy(db_nsr
["image"]),
967 "flavor": deepcopy(db_nsr
["flavor"]),
968 "action_id": nslcmop_id
,
969 "cloud_init_content": {},
971 for image
in target
["image"]:
972 image
["vim_info"] = {}
973 for flavor
in target
["flavor"]:
974 flavor
["vim_info"] = {}
975 if db_nsr
.get("affinity-or-anti-affinity-group"):
976 target
["affinity-or-anti-affinity-group"] = deepcopy(
977 db_nsr
["affinity-or-anti-affinity-group"]
979 for affinity_or_anti_affinity_group
in target
[
980 "affinity-or-anti-affinity-group"
982 affinity_or_anti_affinity_group
["vim_info"] = {}
984 if db_nslcmop
.get("lcmOperationType") != "instantiate":
985 # get parameters of instantiation:
986 db_nslcmop_instantiate
= self
.db
.get_list(
989 "nsInstanceId": db_nslcmop
["nsInstanceId"],
990 "lcmOperationType": "instantiate",
993 ns_params
= db_nslcmop_instantiate
.get("operationParams")
995 ns_params
= db_nslcmop
.get("operationParams")
996 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
997 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
1000 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
1001 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
1004 "name": vld
["name"],
1005 "mgmt-network": vld
.get("mgmt-network", False),
1006 "type": vld
.get("type"),
1009 "vim_network_name": vld
.get("vim-network-name"),
1010 "vim_account_id": ns_params
["vimAccountId"],
1014 # check if this network needs SDN assist
1015 if vld
.get("pci-interfaces"):
1016 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1017 if vim_config
:= db_vim
.get("config"):
1018 if sdnc_id
:= vim_config
.get("sdn-controller"):
1019 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1020 target_sdn
= "sdn:{}".format(sdnc_id
)
1021 target_vld
["vim_info"][target_sdn
] = {
1023 "target_vim": target_vim
,
1025 "type": vld
.get("type"),
1028 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1029 for nsd_vnf_profile
in nsd_vnf_profiles
:
1030 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1031 if cp
["virtual-link-profile-id"] == vld
["id"]:
1033 "member_vnf:{}.{}".format(
1034 cp
["constituent-cpd-id"][0][
1035 "constituent-base-element-id"
1037 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1039 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1041 # check at nsd descriptor, if there is an ip-profile
1043 nsd_vlp
= find_in_list(
1044 get_virtual_link_profiles(nsd
),
1045 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1050 and nsd_vlp
.get("virtual-link-protocol-data")
1051 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1053 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1056 ip_profile_dest_data
= {}
1057 if "ip-version" in ip_profile_source_data
:
1058 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1061 if "cidr" in ip_profile_source_data
:
1062 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1065 if "gateway-ip" in ip_profile_source_data
:
1066 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1069 if "dhcp-enabled" in ip_profile_source_data
:
1070 ip_profile_dest_data
["dhcp-params"] = {
1071 "enabled": ip_profile_source_data
["dhcp-enabled"]
1073 vld_params
["ip-profile"] = ip_profile_dest_data
1075 # update vld_params with instantiation params
1076 vld_instantiation_params
= find_in_list(
1077 get_iterable(ns_params
, "vld"),
1078 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1080 if vld_instantiation_params
:
1081 vld_params
.update(vld_instantiation_params
)
1082 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1083 target
["ns"]["vld"].append(target_vld
)
1084 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1085 update_ns_vld_target(target
, ns_params
)
1087 for vnfr
in db_vnfrs
.values():
1088 vnfd
= find_in_list(
1089 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1091 vnf_params
= find_in_list(
1092 get_iterable(ns_params
, "vnf"),
1093 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1095 target_vnf
= deepcopy(vnfr
)
1096 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1097 for vld
in target_vnf
.get("vld", ()):
1098 # check if connected to a ns.vld, to fill target'
1099 vnf_cp
= find_in_list(
1100 vnfd
.get("int-virtual-link-desc", ()),
1101 lambda cpd
: cpd
.get("id") == vld
["id"],
1104 ns_cp
= "member_vnf:{}.{}".format(
1105 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1107 if cp2target
.get(ns_cp
):
1108 vld
["target"] = cp2target
[ns_cp
]
1111 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1113 # check if this network needs SDN assist
1115 if vld
.get("pci-interfaces"):
1116 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1117 sdnc_id
= db_vim
["config"].get("sdn-controller")
1119 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1120 target_sdn
= "sdn:{}".format(sdnc_id
)
1121 vld
["vim_info"][target_sdn
] = {
1123 "target_vim": target_vim
,
1125 "type": vld
.get("type"),
1128 # check at vnfd descriptor, if there is an ip-profile
1130 vnfd_vlp
= find_in_list(
1131 get_virtual_link_profiles(vnfd
),
1132 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1136 and vnfd_vlp
.get("virtual-link-protocol-data")
1137 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1139 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1142 ip_profile_dest_data
= {}
1143 if "ip-version" in ip_profile_source_data
:
1144 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1147 if "cidr" in ip_profile_source_data
:
1148 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1151 if "gateway-ip" in ip_profile_source_data
:
1152 ip_profile_dest_data
[
1154 ] = ip_profile_source_data
["gateway-ip"]
1155 if "dhcp-enabled" in ip_profile_source_data
:
1156 ip_profile_dest_data
["dhcp-params"] = {
1157 "enabled": ip_profile_source_data
["dhcp-enabled"]
1160 vld_params
["ip-profile"] = ip_profile_dest_data
1161 # update vld_params with instantiation params
1163 vld_instantiation_params
= find_in_list(
1164 get_iterable(vnf_params
, "internal-vld"),
1165 lambda i_vld
: i_vld
["name"] == vld
["id"],
1167 if vld_instantiation_params
:
1168 vld_params
.update(vld_instantiation_params
)
1169 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1172 for vdur
in target_vnf
.get("vdur", ()):
1173 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1174 continue # This vdu must not be created
1175 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1177 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1180 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1181 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1184 and vdu_configuration
.get("config-access")
1185 and vdu_configuration
.get("config-access").get("ssh-access")
1187 vdur
["ssh-keys"] = ssh_keys_all
1188 vdur
["ssh-access-required"] = vdu_configuration
[
1190 ]["ssh-access"]["required"]
1193 and vnf_configuration
.get("config-access")
1194 and vnf_configuration
.get("config-access").get("ssh-access")
1195 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1197 vdur
["ssh-keys"] = ssh_keys_all
1198 vdur
["ssh-access-required"] = vnf_configuration
[
1200 ]["ssh-access"]["required"]
1201 elif ssh_keys_instantiation
and find_in_list(
1202 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1204 vdur
["ssh-keys"] = ssh_keys_instantiation
1206 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1208 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1210 if vdud
.get("cloud-init-file"):
1211 vdur
["cloud-init"] = "{}:file:{}".format(
1212 vnfd
["_id"], vdud
.get("cloud-init-file")
1214 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1215 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1216 base_folder
= vnfd
["_admin"]["storage"]
1217 if base_folder
["pkg-dir"]:
1218 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1219 base_folder
["folder"],
1220 base_folder
["pkg-dir"],
1221 vdud
.get("cloud-init-file"),
1224 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1225 base_folder
["folder"],
1226 vdud
.get("cloud-init-file"),
1228 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1229 target
["cloud_init_content"][
1232 elif vdud
.get("cloud-init"):
1233 vdur
["cloud-init"] = "{}:vdu:{}".format(
1234 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1236 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1237 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1240 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1241 deploy_params_vdu
= self
._format
_additional
_params
(
1242 vdur
.get("additionalParams") or {}
1244 deploy_params_vdu
["OSM"] = get_osm_params(
1245 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1247 vdur
["additionalParams"] = deploy_params_vdu
1250 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1251 if target_vim
not in ns_flavor
["vim_info"]:
1252 ns_flavor
["vim_info"][target_vim
] = {}
1255 # in case alternative images are provided we must check if they should be applied
1256 # for the vim_type, modify the vim_type taking into account
1257 ns_image_id
= int(vdur
["ns-image-id"])
1258 if vdur
.get("alt-image-ids"):
1259 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1260 vim_type
= db_vim
["vim_type"]
1261 for alt_image_id
in vdur
.get("alt-image-ids"):
1262 ns_alt_image
= target
["image"][int(alt_image_id
)]
1263 if vim_type
== ns_alt_image
.get("vim-type"):
1264 # must use alternative image
1266 "use alternative image id: {}".format(alt_image_id
)
1268 ns_image_id
= alt_image_id
1269 vdur
["ns-image-id"] = ns_image_id
1271 ns_image
= target
["image"][int(ns_image_id
)]
1272 if target_vim
not in ns_image
["vim_info"]:
1273 ns_image
["vim_info"][target_vim
] = {}
1276 if vdur
.get("affinity-or-anti-affinity-group-id"):
1277 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1278 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1279 if target_vim
not in ns_ags
["vim_info"]:
1280 ns_ags
["vim_info"][target_vim
] = {}
1282 vdur
["vim_info"] = {target_vim
: {}}
1283 # instantiation parameters
1285 vdu_instantiation_params
= find_in_list(
1286 get_iterable(vnf_params
, "vdu"),
1287 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1289 if vdu_instantiation_params
:
1290 # Parse the vdu_volumes from the instantiation params
1291 vdu_volumes
= get_volumes_from_instantiation_params(
1292 vdu_instantiation_params
, vdud
1294 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1295 vdur_list
.append(vdur
)
1296 target_vnf
["vdur"] = vdur_list
1297 target
["vnf"].append(target_vnf
)
1299 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1300 desc
= await self
.RO
.deploy(nsr_id
, target
)
1301 self
.logger
.debug("RO return > {}".format(desc
))
1302 action_id
= desc
["action_id"]
1303 await self
._wait
_ng
_ro
(
1310 operation
="instantiation",
1315 "_admin.deployed.RO.operational-status": "running",
1316 "detailed-status": " ".join(stage
),
1318 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1319 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1320 self
._write
_op
_status
(nslcmop_id
, stage
)
1322 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1326 async def _wait_ng_ro(
1336 detailed_status_old
= None
1338 start_time
= start_time
or time()
1339 while time() <= start_time
+ timeout
:
1340 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1341 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1342 if desc_status
["status"] == "FAILED":
1343 raise NgRoException(desc_status
["details"])
1344 elif desc_status
["status"] == "BUILD":
1346 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1347 elif desc_status
["status"] == "DONE":
1349 stage
[2] = "Deployed at VIM"
1352 assert False, "ROclient.check_ns_status returns unknown {}".format(
1353 desc_status
["status"]
1355 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1356 detailed_status_old
= stage
[2]
1357 db_nsr_update
["detailed-status"] = " ".join(stage
)
1358 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1359 self
._write
_op
_status
(nslcmop_id
, stage
)
1360 await asyncio
.sleep(15, loop
=self
.loop
)
1361 else: # timeout_ns_deploy
1362 raise NgRoException("Timeout waiting ns to deploy")
1364 async def _terminate_ng_ro(
1365 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1370 start_deploy
= time()
1377 "action_id": nslcmop_id
,
1379 desc
= await self
.RO
.deploy(nsr_id
, target
)
1380 action_id
= desc
["action_id"]
1381 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1384 + "ns terminate action at RO. action_id={}".format(action_id
)
1388 delete_timeout
= 20 * 60 # 20 minutes
1389 await self
._wait
_ng
_ro
(
1396 operation
="termination",
1398 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1400 await self
.RO
.delete(nsr_id
)
1401 except NgRoException
as e
:
1402 if e
.http_code
== 404: # not found
1403 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1404 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1406 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1408 elif e
.http_code
== 409: # conflict
1409 failed_detail
.append("delete conflict: {}".format(e
))
1412 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1415 failed_detail
.append("delete error: {}".format(e
))
1418 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1420 except Exception as e
:
1421 failed_detail
.append("delete error: {}".format(e
))
1423 logging_text
+ "RO_action_id={} delete error: {}".format(action_id
, e
)
1427 stage
[2] = "Error deleting from VIM"
1429 stage
[2] = "Deleted from VIM"
1430 db_nsr_update
["detailed-status"] = " ".join(stage
)
1431 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1432 self
._write
_op
_status
(nslcmop_id
, stage
)
1435 raise LcmException("; ".join(failed_detail
))
async def instantiate_RO(
    self,
    logging_text,
    nsr_id,
    nsd,
    db_nsr,
    db_nslcmop,
    db_vnfrs,
    db_vnfds,
    n2vc_key_list,
    stage,
):
    """
    Instantiate at RO, after checking (and optionally requesting) the placement
    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: database content of ns descriptor
    :param db_nsr: database content of ns record
    :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
    :param db_vnfrs: database content of vnfrs. Its values are read to look for the "vim-account-id"
    :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
    :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: what self._instantiate_ng_ro returns. Any exception is logged, the vnfrs are set at error
        and the exception is re-raised
    """
    try:
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.ns_deploy

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
            # in case of placement, change ns_params["vimAccountId"] if it is not present at any vnfr
            vnfr = None
            for vnfr in db_vnfrs.values():
                if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
                    break
            else:
                # this was a "==" comparison without effect; the vim account of the last vnfr is
                # assigned. Nothing is done when there are no vnfrs at all
                if vnfr is not None:
                    ns_params["vimAccountId"] = vnfr["vim-account-id"]

        return await self._instantiate_ng_ro(
            logging_text,
            nsr_id,
            nsd,
            db_nsr,
            db_nslcmop,
            db_vnfrs,
            db_vnfds,
            n2vc_key_list,
            stage,
            start_deploy,
            timeout_ns_deploy,
        )
    except Exception as e:
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        # the traceback is logged only for exceptions that are not one of the listed ones
        self.logger.error(
            "Error deploying at VIM {}".format(e),
            exc_info=not isinstance(
                e,
                (
                    ROclient.ROClientException,
                    LcmException,
                    DbException,
                    NgRoException,
                ),
            ),
        )
        raise
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address
    :param logging_text: prefix use for logging
    :param nsr_id: not used by this method
    :param vnfr_id: identity of the vnfr that is read from database at every try
    :param kdu_name: "kdu-name" of the kdur to look for at the vnfr
    :return: IP address, K8s services
    :raises LcmException: if the kdur is not found, if its status is neither READY nor ENABLED,
        or if no status is set after 360 tries
    """

    # self.logger.debug(logging_text + "Starting wait_kdu_up")
    nb_tries = 0

    while nb_tries < 360:
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next(
            (
                x
                for x in get_iterable(db_vnfr, "kdur")
                if x.get("kdu-name") == kdu_name
            ),
            None,
        )
        if not kdur:
            raise LcmException(
                "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
            )
        if kdur.get("status"):
            # any status set other than READY or ENABLED is considered an error
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address"), kdur.get("services")
            else:
                raise LcmException(
                    "target KDU={} is in error state".format(kdu_name)
                )

        # the "loop" argument of asyncio.sleep does not exist since python 3.10
        await asyncio.sleep(10)
        nb_tries += 1
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
async def wait_vm_up_insert_key_ro(
    self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
):
    """
    Wait for ip address at RO, and optionally, insert public key in virtual machine
    :param logging_text: prefix use for logging
    :param nsr_id: ns record identity, used at the RO request that injects the key
    :param vnfr_id: identity of the vnfr that is read from database at every try
    :param vdu_id: "vdu-id-ref" of the target vdur. If empty, the vdur owning the vnfr "ip-address" is used
    :param vdu_index: "count-index" of the target vdur, only used when vdu_id is provided
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    :raises LcmException: if the target is in error state, is not found, the key cannot be injected
        or the number of tries is exceeded
    """

    self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ip_address = None
    target_vdu_id = None
    ro_retries = 0

    while True:
        ro_retries += 1
        if ro_retries >= 360:  # 1 hour
            raise LcmException(
                "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
            )

        # the "loop" argument of asyncio.sleep does not exist since python 3.10
        await asyncio.sleep(10)

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                if db_vnfr.get("status") == "ERROR":
                    raise LcmException(
                        "Cannot inject ssh-key because target VNF is in error state"
                    )
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next(
                    (
                        x
                        for x in get_iterable(db_vnfr, "vdur")
                        if x.get("ip-address") == ip_address
                    ),
                    None,
                )
            else:  # VDU case
                vdur = next(
                    (
                        x
                        for x in get_iterable(db_vnfr, "vdur")
                        if x.get("vdu-id-ref") == vdu_id
                        and x.get("count-index") == vdu_index
                    ),
                    None,
                )

            if (
                not vdur and len(db_vnfr.get("vdur", ())) == 1
            ):  # If only one, this should be the target vdu
                vdur = db_vnfr["vdur"][0]
            if not vdur:
                raise LcmException(
                    "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                        vnfr_id, vdu_id, vdu_index
                    )
                )
            # New generation RO stores information at "vim_info"
            ng_ro_status = None
            target_vim = None
            if vdur.get("vim_info"):
                target_vim = next(
                    t for t in vdur["vim_info"]
                )  # there should be only one key
                ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
            if (
                vdur.get("pdu-type")
                or vdur.get("status") == "ACTIVE"
                or ng_ro_status == "ACTIVE"
            ):
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
                raise LcmException(
                    "Cannot inject ssh-key because target VM is in error state"
                )

        if not target_vdu_id:
            continue

        # inject public key into machine
        if pub_key and user:
            self.logger.debug(logging_text + "Inserting RO key")
            self.logger.debug("SSH > PubKey > {}".format(pub_key))
            if vdur.get("pdu-type"):
                self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                return ip_address
            try:
                target = {
                    "action": {
                        "action": "inject_ssh_key",
                        "key": pub_key,
                        "user": user,
                    },
                    "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
                }
                desc = await self.RO.deploy(nsr_id, target)
                action_id = desc["action_id"]
                await self._wait_ng_ro(
                    nsr_id, action_id, timeout=600, operation="instantiation"
                )
                break
            except NgRoException as e:
                # keep the RO exception as the cause, so that its traceback is not lost
                raise LcmException(
                    "Reaching max tries injecting key. Error: {}".format(e)
                ) from e
        else:
            break

    return ip_address
1677 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1679 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1681 my_vca
= vca_deployed_list
[vca_index
]
1682 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1683 # vdu or kdu: no dependencies
1687 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1688 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1689 configuration_status_list
= db_nsr
["configurationStatus"]
1690 for index
, vca_deployed
in enumerate(configuration_status_list
):
1691 if index
== vca_index
:
1694 if not my_vca
.get("member-vnf-index") or (
1695 vca_deployed
.get("member-vnf-index")
1696 == my_vca
.get("member-vnf-index")
1698 internal_status
= configuration_status_list
[index
].get("status")
1699 if internal_status
== "READY":
1701 elif internal_status
== "BROKEN":
1703 "Configuration aborted because dependent charm/s has failed"
1708 # no dependencies, return
1710 await asyncio
.sleep(10)
1713 raise LcmException("Configuration aborted because dependent charm/s timeout")
def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
    """
    Obtain the VCA identity that applies to a vnf or to a ns
    :param db_vnfr: database content of the vnf record. If not empty, its "vca-id" is returned
    :param db_nsr: database content of the ns record. Only used when db_vnfr is empty: the "vca" of the
        vim account found at instantiate_params.vimAccountId is returned
    :return: the vca id, or None if both records are empty
    """
    if db_vnfr:
        return deep_get(db_vnfr, ("vca-id",))
    if db_nsr:
        account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
        vim_account = VimAccountDB.get_vim_account_with_id(account_id)
        return vim_account.get("vca")
    return None
async def instantiate_N2VC(
    self,
    logging_text,
    vca_index,
    nsi_id,
    db_nsr,
    db_vnfr,
    vdu_id,
    kdu_name,
    vdu_index,
    kdu_index,
    config_descriptor,
    deploy_params,
    base_folder,
    nslcmop_id,
    stage,
    vca_type,
    vca_name,
    ee_config_descriptor,
):
    """
    Create or register the execution environment of one VCA, install its configuration software, add its
    relations and execute its initial config primitives, writing the progress at database
    :param logging_text: prefix text to use at logging
    :param vca_index: index of this VCA at db_nsr _admin.deployed.VCA and at "configurationStatus"
    :param nsi_id: first part of the namespace; an empty string is used if not provided
    :param db_nsr: database content of ns record
    :param db_vnfr: database content of vnf record; it can be empty, and then the element type is "NS"
    :param vdu_id: if provided together with a vnfr, the element under configuration is a VDU
    :param kdu_name: if provided together with a vnfr and without vdu_id, the element is a KDU
    :param vdu_index: index used at the namespace, except for native charms where 0 is used
    :param kdu_index: only forwarded to extract_prometheus_scrape_jobs
    :param config_descriptor: "initial-config-primitive", "terminate-config-primitive" and "config-access"
        are read from here
    :param deploy_params: used to map the primitive parameters. "rw_mgmt_ip" is written here, and also
        "ns_config_info" for a NS execution environment
    :param base_folder: dict with "folder" and "pkg-dir", used to compose the artifact path
    :param nslcmop_id: operation identity, used to write the operation status
    :param stage: list whose first item is overwritten with the Day-1 stage text
    :param vca_type: key of self.vca_map. The values handled here are "native_charm", "lxc_proxy_charm",
        "k8s_proxy_charm", "helm" and "helm-v3"
    :param vca_name: last part of the artifact path; also sent as chart_model for helm
    :param ee_config_descriptor: its "id" is used to select and sort the initial config primitives
    :return: None. Any exception marks the configuration status as BROKEN and is re-raised as LcmException
    """
    nsr_id = db_nsr["_id"]
    db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
    osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
    # passed as db_dict to the vca_map connectors calls
    db_dict = {
        "collection": "nsrs",
        "filter": {"_id": nsr_id},
        "path": db_update_entry,
    }
    # "step" is kept updated because it is part of the exception text at the end
    step = ""
    try:

        element_type = "NS"
        element_under_configuration = nsr_id

        vnfr_id = None
        if db_vnfr:
            vnfr_id = db_vnfr["_id"]
            osm_config["osm"]["vnf_id"] = vnfr_id

        namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)

        if vca_type == "native_charm":
            index_number = 0
        else:
            index_number = vdu_index or 0

        # the namespace grows with the vnf, and then with the vdu or the kdu
        if vnfr_id:
            element_type = "VNF"
            element_under_configuration = vnfr_id
            namespace += ".{}-{}".format(vnfr_id, index_number)
            if vdu_id:
                namespace += ".{}-{}".format(vdu_id, index_number)
                element_type = "VDU"
                element_under_configuration = "{}-{}".format(vdu_id, index_number)
                osm_config["osm"]["vdu_id"] = vdu_id
            elif kdu_name:
                namespace += ".{}".format(kdu_name)
                element_type = "KDU"
                element_under_configuration = kdu_name
                osm_config["osm"]["kdu_name"] = kdu_name

        # Get artifact path
        # charm types are placed under "charms"; any other type under "helm-charts"
        if base_folder["pkg-dir"]:
            artifact_path = "{}/{}/{}/{}".format(
                base_folder["folder"],
                base_folder["pkg-dir"],
                "charms"
                if vca_type
                in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
                else "helm-charts",
                vca_name,
            )
        else:
            artifact_path = "{}/Scripts/{}/{}/".format(
                base_folder["folder"],
                "charms"
                if vca_type
                in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
                else "helm-charts",
                vca_name,
            )

        self.logger.debug("Artifact path > {}".format(artifact_path))

        # get initial_config_primitive_list that applies to this element
        initial_config_primitive_list = config_descriptor.get(
            "initial-config-primitive"
        )

        self.logger.debug(
            "Initial config primitive list > {}".format(
                initial_config_primitive_list
            )
        )

        # add config if not present for NS charm
        ee_descriptor_id = ee_config_descriptor.get("id")
        self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
        initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
            initial_config_primitive_list, vca_deployed, ee_descriptor_id
        )

        self.logger.debug(
            "Initial config primitive list #2 > {}".format(
                initial_config_primitive_list
            )
        )
        # n2vc_redesign STEP 3.1
        # find old ee_id if exists
        ee_id = vca_deployed.get("ee_id")

        vca_id = self.get_vca_id(db_vnfr, db_nsr)
        # create or register execution environment in VCA
        if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status="CREATING",
                element_under_configuration=element_under_configuration,
                element_type=element_type,
            )

            step = "create execution environment"
            self.logger.debug(logging_text + step)

            # NOTE(review): ee_id is reset here, so reuse_ee_id below is always None and the
            # old ee_id found above is not reused at this branch - confirm this is intended
            ee_id = None
            credentials = None
            if vca_type == "k8s_proxy_charm":
                ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
                    charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
                    namespace=namespace,
                    artifact_path=artifact_path,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )
            elif vca_type == "helm" or vca_type == "helm-v3":
                ee_id, credentials = await self.vca_map[
                    vca_type
                ].create_execution_environment(
                    namespace=namespace,
                    reuse_ee_id=ee_id,
                    db_dict=db_dict,
                    config=osm_config,
                    artifact_path=artifact_path,
                    chart_model=vca_name,
                    vca_type=vca_type,
                )
            else:
                ee_id, credentials = await self.vca_map[
                    vca_type
                ].create_execution_environment(
                    namespace=namespace,
                    reuse_ee_id=ee_id,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )

        elif vca_type == "native_charm":
            step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)
            # no key is injected here: only the ip address of the VM is needed
            rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                logging_text,
                nsr_id,
                vnfr_id,
                vdu_id,
                vdu_index,
                user=None,
                pub_key=None,
            )
            credentials = {"hostname": rw_mgmt_ip}
            # get username
            username = deep_get(
                config_descriptor, ("config-access", "ssh-access", "default-user")
            )
            # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
            #  merged. Meanwhile let's get username from initial-config-primitive
            if not username and initial_config_primitive_list:
                for config_primitive in initial_config_primitive_list:
                    for param in config_primitive.get("parameter", ()):
                        if param["name"] == "ssh-username":
                            username = param["value"]
                            break
            if not username:
                raise LcmException(
                    "Cannot determine the username neither with 'initial-config-primitive' nor with "
                    "'config-access.ssh-access.default-user'"
                )
            credentials["username"] = username
            # n2vc_redesign STEP 3.2

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status="REGISTERING",
                element_under_configuration=element_under_configuration,
                element_type=element_type,
            )

            step = "register execution environment {}".format(credentials)
            self.logger.debug(logging_text + step)
            ee_id = await self.vca_map[vca_type].register_execution_environment(
                credentials=credentials,
                namespace=namespace,
                db_dict=db_dict,
                vca_id=vca_id,
            )

        # for compatibility with MON/POL modules, the need model and application name at database
        # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
        ee_id_parts = ee_id.split(".")
        db_nsr_update = {db_update_entry + "ee_id": ee_id}
        if len(ee_id_parts) >= 2:
            model_name = ee_id_parts[0]
            application_name = ee_id_parts[1]
            db_nsr_update[db_update_entry + "model"] = model_name
            db_nsr_update[db_update_entry + "application"] = application_name

        # n2vc_redesign STEP 3.3
        step = "Install configuration Software"

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status="INSTALLING SW",
            element_under_configuration=element_under_configuration,
            element_type=element_type,
            other_update=db_nsr_update,
        )

        # TODO check if already done
        self.logger.debug(logging_text + step)
        config = None
        # for native charms the "config" primitive, if any, is sent when installing the software
        if vca_type == "native_charm":
            config_primitive = next(
                (p for p in initial_config_primitive_list if p["name"] == "config"),
                None,
            )
            if config_primitive:
                config = self._map_primitive_params(
                    config_primitive, {}, deploy_params
                )
        # number of units: "config-units" of the ns, vnf or vdu record; 1 if not set
        num_units = 1
        if vca_type == "lxc_proxy_charm":
            if element_type == "NS":
                num_units = db_nsr.get("config-units") or 1
            elif element_type == "VNF":
                num_units = db_vnfr.get("config-units") or 1
            elif element_type == "VDU":
                for v in db_vnfr["vdur"]:
                    if vdu_id == v["vdu-id-ref"]:
                        num_units = v.get("config-units") or 1
                        break
        if vca_type != "k8s_proxy_charm":
            await self.vca_map[vca_type].install_configuration_sw(
                ee_id=ee_id,
                artifact_path=artifact_path,
                db_dict=db_dict,
                config=config,
                num_units=num_units,
                vca_id=vca_id,
                vca_type=vca_type,
            )

        # write in db flag of configuration_sw already installed
        self.update_db_2(
            "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
        )

        # add relations for this VCA (wait for other peers related with this VCA)
        is_relation_added = await self._add_vca_relations(
            logging_text=logging_text,
            nsr_id=nsr_id,
            vca_type=vca_type,
            vca_index=vca_index,
        )

        if not is_relation_added:
            raise LcmException("Relations could not be added to VCA.")

        # if SSH access is required, then get execution environment SSH public
        # if native charm we have waited already to VM be UP
        if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
            pub_key = None
            user = None
            # self.logger.debug("get ssh key block")
            if deep_get(
                config_descriptor, ("config-access", "ssh-access", "required")
            ):
                # self.logger.debug("ssh key needed")
                # Needed to inject a ssh key
                user = deep_get(
                    config_descriptor,
                    ("config-access", "ssh-access", "default-user"),
                )
                step = "Install configuration Software, getting public ssh key"
                pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
                    ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
                )

                step = "Insert public key into VM user={} ssh_key={}".format(
                    user, pub_key
                )
            else:
                # self.logger.debug("no need to get ssh key")
                step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)

            # default rw_mgmt_ip to None, avoiding the non definition of the variable
            rw_mgmt_ip = None

            # n2vc_redesign STEP 5.1
            # wait for RO (ip-address) Insert pub_key into VM
            if vnfr_id:
                if kdu_name:
                    rw_mgmt_ip, services = await self.wait_kdu_up(
                        logging_text, nsr_id, vnfr_id, kdu_name
                    )
                    vnfd = self.db.get_one(
                        "vnfds_revisions",
                        {"_id": f'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
                    )
                    kdu = get_kdu(vnfd, kdu_name)
                    kdu_services = [
                        service["name"] for service in get_kdu_services(kdu)
                    ]
                    # a service is exposed when its name contains the name of a service
                    # declared at the kdu descriptor
                    exposed_services = []
                    for service in services:
                        if any(s in service["name"] for s in kdu_services):
                            exposed_services.append(service)
                    await self.vca_map[vca_type].exec_primitive(
                        ee_id=ee_id,
                        primitive_name="config",
                        params_dict={
                            "osm-config": json.dumps(
                                OsmConfigBuilder(
                                    k8s={"services": exposed_services}
                                ).build()
                            )
                        },
                        vca_id=vca_id,
                    )

                # This verification is needed in order to avoid trying to add a public key
                # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
                # for a KNF and not for its KDUs, the previous verification gives False, and the code
                # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
                # or it is a KNF)
                elif db_vnfr.get("vdur"):
                    rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                        logging_text,
                        nsr_id,
                        vnfr_id,
                        vdu_id,
                        vdu_index,
                        user=user,
                        pub_key=pub_key,
                    )

            self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))

        # store rw_mgmt_ip in deploy params for later replacement
        deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

        # n2vc_redesign STEP 6  Execute initial config primitive
        step = "execute initial config primitive"

        # wait for dependent primitives execution (NS -> VNF -> VDU)
        if initial_config_primitive_list:
            await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

        # stage, in function of element type: vdu, kdu, vnf or ns
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # VDU or KDU
            stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
        elif my_vca.get("member-vnf-index"):
            # VNF
            stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
        else:
            # NS
            stage[0] = "Stage 5/5: running Day-1 primitives for NS."

        self._write_configuration_status(
            nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
        )

        self._write_op_status(op_id=nslcmop_id, stage=stage)

        check_if_terminated_needed = True
        for initial_config_primitive in initial_config_primitive_list:
            # adding information on the vca_deployed if it is a NS execution environment
            if not vca_deployed["member-vnf-index"]:
                deploy_params["ns_config_info"] = json.dumps(
                    self._get_ns_config_info(nsr_id)
                )
            # TODO check if already done
            primitive_params_ = self._map_primitive_params(
                initial_config_primitive, {}, deploy_params
            )

            step = "execute primitive '{}' params '{}'".format(
                initial_config_primitive["name"], primitive_params_
            )
            self.logger.debug(logging_text + step)
            await self.vca_map[vca_type].exec_primitive(
                ee_id=ee_id,
                primitive_name=initial_config_primitive["name"],
                params_dict=primitive_params_,
                db_dict=db_dict,
                vca_id=vca_id,
                vca_type=vca_type,
            )
            # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
            if check_if_terminated_needed:
                if config_descriptor.get("terminate-config-primitive"):
                    self.update_db_2(
                        "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
                    )
                check_if_terminated_needed = False

            # TODO register in database that primitive is done

        # STEP 7 Configure metrics
        if vca_type == "helm" or vca_type == "helm-v3":
            # TODO: review for those cases where the helm chart is a reference and
            # is not part of the NF package
            # NOTE(review): db_vnfr.get() below fails if db_vnfr is empty (NS level element) - confirm
            # whether helm execution environments can be used at NS level
            prometheus_jobs = await self.extract_prometheus_scrape_jobs(
                ee_id=ee_id,
                artifact_path=artifact_path,
                ee_config_descriptor=ee_config_descriptor,
                vnfr_id=vnfr_id,
                nsr_id=nsr_id,
                target_ip=rw_mgmt_ip,
                vnf_member_index=db_vnfr.get("member-vnf-index-ref", ""),
                vdu_id=vdu_id,
                vdu_index=vdu_index,
                kdu_name=kdu_name,
                kdu_index=kdu_index,
            )
            if prometheus_jobs:
                self.update_db_2(
                    "nsrs",
                    nsr_id,
                    {db_update_entry + "prometheus_jobs": prometheus_jobs},
                )

                for job in prometheus_jobs:
                    self.db.set_one(
                        "prometheus_jobs",
                        {"job_name": job["job_name"]},
                        job,
                        upsert=True,
                        fail_on_empty=False,
                    )

        step = "instantiated at VCA"
        self.logger.debug(logging_text + step)

        self._write_configuration_status(
            nsr_id=nsr_id, vca_index=vca_index, status="READY"
        )

    except Exception as e:  # TODO not use Exception but N2VC exception
        # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
        # the traceback is logged only for exceptions that are not one of the listed ones
        if not isinstance(
            e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
        ):
            self.logger.error(
                "Exception while {} : {}".format(step, e), exc_info=True
            )
        self._write_configuration_status(
            nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
        )
        raise LcmException("{}. {}".format(step, e)) from e
2202 def _write_ns_status(
2206 current_operation
: str,
2207 current_operation_id
: str,
2208 error_description
: str = None,
2209 error_detail
: str = None,
2210 other_update
: dict = None,
2213 Update db_nsr fields.
2216 :param current_operation:
2217 :param current_operation_id:
2218 :param error_description:
2219 :param error_detail:
2220 :param other_update: Other required changes at database if provided, will be cleared
2224 db_dict
= other_update
or {}
2227 ] = current_operation_id
# for backward compatibility
2228 db_dict
["_admin.current-operation"] = current_operation_id
2229 db_dict
["_admin.operation-type"] = (
2230 current_operation
if current_operation
!= "IDLE" else None
2232 db_dict
["currentOperation"] = current_operation
2233 db_dict
["currentOperationID"] = current_operation_id
2234 db_dict
["errorDescription"] = error_description
2235 db_dict
["errorDetail"] = error_detail
2238 db_dict
["nsState"] = ns_state
2239 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2240 except DbException
as e
:
2241 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2243 def _write_op_status(
2247 error_message
: str = None,
2248 queuePosition
: int = 0,
2249 operation_state
: str = None,
2250 other_update
: dict = None,
2253 db_dict
= other_update
or {}
2254 db_dict
["queuePosition"] = queuePosition
2255 if isinstance(stage
, list):
2256 db_dict
["stage"] = stage
[0]
2257 db_dict
["detailed-status"] = " ".join(stage
)
2258 elif stage
is not None:
2259 db_dict
["stage"] = str(stage
)
2261 if error_message
is not None:
2262 db_dict
["errorMessage"] = error_message
2263 if operation_state
is not None:
2264 db_dict
["operationState"] = operation_state
2265 db_dict
["statusEnteredTime"] = time()
2266 self
.update_db_2("nslcmops", op_id
, db_dict
)
2267 except DbException
as e
:
2269 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2272 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2274 nsr_id
= db_nsr
["_id"]
2275 # configurationStatus
2276 config_status
= db_nsr
.get("configurationStatus")
2279 "configurationStatus.{}.status".format(index
): status
2280 for index
, v
in enumerate(config_status
)
2284 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2286 except DbException
as e
:
2288 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2291 def _write_configuration_status(
2296 element_under_configuration
: str = None,
2297 element_type
: str = None,
2298 other_update
: dict = None,
2301 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2302 # .format(vca_index, status))
2305 db_path
= "configurationStatus.{}.".format(vca_index
)
2306 db_dict
= other_update
or {}
2308 db_dict
[db_path
+ "status"] = status
2309 if element_under_configuration
:
2311 db_path
+ "elementUnderConfiguration"
2312 ] = element_under_configuration
2314 db_dict
[db_path
+ "elementType"] = element_type
2315 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2316 except DbException
as e
:
2318 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2319 status
, nsr_id
, vca_index
, e
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Check and compute the placement (vim account where to deploy). If it is decided by an external tool,
    the request is sent via kafka and the result is awaited at database (nslcmops _admin.pla).
    Database is used because the result can be obtained from a different LCM worker in case of HA.
    :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
    :param db_nslcmop: database content of nslcmop
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
    :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
        computed 'vim-account-id'
    """
    nslcmop_id = db_nslcmop["_id"]
    engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
    if engine != "PLA":
        # placement is only requested when the engine is PLA
        return False

    self.logger.debug(
        logging_text + "Invoke and wait for placement optimization"
    )
    await self.msg.aiowrite(
        "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
    )

    # poll the operation at database until the result is there or the time is over
    poll_seconds = 5
    seconds_left = poll_seconds * 10
    placement = None
    while not placement and seconds_left >= 0:
        await asyncio.sleep(poll_seconds)
        seconds_left -= poll_seconds
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        placement = deep_get(db_nslcmop, ("_admin", "pla"))

    if not placement:
        raise LcmException(
            "Placement timeout for nslcmopId={}".format(nslcmop_id)
        )

    changed = False
    for placed_vnf in placement["vnf"]:
        target_vnfr = db_vnfrs.get(placed_vnf["member-vnf-index"])
        if placed_vnf.get("vimAccountId") and target_vnfr:
            changed = True
            self.db.set_one(
                "vnfrs",
                {"_id": target_vnfr["_id"]},
                {"vim-account-id": placed_vnf["vimAccountId"]},
            )
            # keep the vnfr received as parameter aligned with database
            target_vnfr["vim-account-id"] = placed_vnf["vimAccountId"]
    return changed
def update_nsrs_with_pla_result(self, params):
    """
    Store the placement result at the "_admin.pla" of the operation it belongs to.
    :param params: dictionary with a "placement" item, that contains the "nslcmopId"
    :return: None. Any exception is logged as a warning and not propagated
    """
    # defined before the try, so that the warning can be composed whatever is the failing step
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
        self.update_db_2(
            "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
        )
    except Exception as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning(
            "Update failed for nslcmop_id={}:{}".format(nslcmop_id, e)
        )
async def instantiate(self, nsr_id, nslcmop_id):
    """
    Execute the instantiate operation of a NS.

    Reads the nslcmop, nsr, nsd, vnfrs and vnfds from database, then launches the
    deployment of KDUs, the deployment at VIM (instantiate_RO) and one execution
    environment deployment (_deploy_n2vc) for every VNF, VDU, KDU and NS configuration
    found in the descriptors. In the finally clause it waits for the launched tasks,
    writes the resulting ns and operation status to database and publishes the
    "instantiated" message.

    :param nsr_id: ns instance to deploy
    :param nslcmop_id: operation to run
    :return: None. The outcome is reported through the nsrs/nslcmops records
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        self.logger.debug(
            "instantiate() task is not locked by me, ns={}".format(nsr_id)
        )
        return

    logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")

    # get all needed from database

    # database nsrs record
    db_nsr = None

    # database nslcmops record
    db_nslcmop = None

    # update operation on nsrs
    db_nsr_update = {}
    # update operation on nslcmops
    db_nslcmop_update = {}

    timeout_ns_deploy = self.timeout.ns_deploy

    nslcmop_operation_state = None
    db_vnfrs = {}  # vnf's info indexed by member-index
    tasks_dict_info = {}  # from task to info text
    exc = None
    error_list = []
    stage = [
        "Stage 1/5: preparation of the environment.",
        "Waiting for previous operations to terminate.",
        "",
    ]
    # ^ stage, step, VIM progress
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
        stage[1] = "Reading from database."
        # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
        db_nsr_update["detailed-status"] = "creating"
        db_nsr_update["operational-status"] = "init"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="BUILDING",
            current_operation="INSTANTIATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update,
        )
        self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)

        # read from db: operation
        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
            # stored as a JSON string; decode it in place
            db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
                db_nslcmop["operationParams"]["additionalParamsForVnf"]
            )
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]

        # read from db: ns
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        self.logger.debug(logging_text + stage[1])
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
        self.logger.debug(logging_text + stage[1])
        nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
        self.fs.sync(db_nsr["nsd-id"])
        db_nsr["nsd"] = nsd
        # nsr_name = db_nsr["name"]   # TODO short-name??

        # read from db: vnf's of this ns
        stage[1] = "Getting vnfrs from db."
        self.logger.debug(logging_text + stage[1])
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})

        # read from db: vnfd's for every vnf
        db_vnfds = []  # every vnfd data
        # db_vnfds holds full descriptors, so membership of an id must be tracked
        # separately; testing the id against the list itself never matches
        loaded_vnfd_ids = set()

        # for each vnf in ns, read vnfd
        for vnfr in db_vnfrs_list:
            if vnfr.get("kdur"):
                kdur_list = []
                for kdur in vnfr["kdur"]:
                    if kdur.get("additionalParams"):
                        kdur["additionalParams"] = json.loads(
                            kdur["additionalParams"]
                        )
                    kdur_list.append(kdur)
                vnfr["kdur"] = kdur_list

            db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
            vnfd_id = vnfr["vnfd-id"]
            vnfd_ref = vnfr["vnfd-ref"]
            self.fs.sync(vnfd_id)

            # if we haven't this vnfd, read it from db
            if vnfd_id not in loaded_vnfd_ids:
                # read from db
                stage[1] = "Getting vnfd={} id='{}' from db.".format(
                    vnfd_id, vnfd_ref
                )
                self.logger.debug(logging_text + stage[1])
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})

                # store vnfd
                db_vnfds.append(vnfd)
                loaded_vnfd_ids.add(vnfd_id)

        # Get or generates the _admin.deployed.VCA list
        vca_deployed_list = None
        if db_nsr["_admin"].get("deployed"):
            vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
        if vca_deployed_list is None:
            vca_deployed_list = []
            configuration_status_list = []
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            db_nsr_update["configurationStatus"] = configuration_status_list
            # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
        elif isinstance(vca_deployed_list, dict):
            # maintain backward compatibility. Change a dict to list at database
            vca_deployed_list = list(vca_deployed_list.values())
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)

        if not isinstance(
            deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
        ):
            populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
            db_nsr_update["_admin.deployed.RO.vnfd"] = []

        # set state to INSTANTIATED. When instantiated NBI will not delete directly
        db_nsr_update["_admin.nsState"] = "INSTANTIATED"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self.db.set_list(
            "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
        )

        # n2vc_redesign STEP 2 Deploy Network Scenario
        stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
        self._write_op_status(op_id=nslcmop_id, stage=stage)

        stage[1] = "Deploying KDUs."
        # self.logger.debug(logging_text + "Before deploy_kdus")
        # Call to deploy_kdus in case exists the "vdu:kdu" param
        await self.deploy_kdus(
            logging_text=logging_text,
            nsr_id=nsr_id,
            nslcmop_id=nslcmop_id,
            db_vnfrs=db_vnfrs,
            db_vnfds=db_vnfds,
            task_instantiation_info=tasks_dict_info,
        )

        stage[1] = "Getting VCA public key."
        # n2vc_redesign STEP 1 Get VCA public ssh-key
        # feature 1429. Add n2vc public key to needed VMs
        n2vc_key = self.n2vc.get_public_key()
        n2vc_key_list = [n2vc_key]
        if self.vca_config.public_key:
            n2vc_key_list.append(self.vca_config.public_key)

        stage[1] = "Deploying NS at VIM."
        task_ro = asyncio.ensure_future(
            self.instantiate_RO(
                logging_text=logging_text,
                nsr_id=nsr_id,
                nsd=nsd,
                db_nsr=db_nsr,
                db_nslcmop=db_nslcmop,
                db_vnfrs=db_vnfrs,
                db_vnfds=db_vnfds,
                n2vc_key_list=n2vc_key_list,
                stage=stage,
            )
        )
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
        tasks_dict_info[task_ro] = "Deploying at VIM"

        # n2vc_redesign STEP 3 to 6 Deploy N2VC
        stage[1] = "Deploying Execution Environments."
        self.logger.debug(logging_text + stage[1])

        # create namespace and certificate if any helm based EE is present in the NS
        if check_helm_ee_in_ns(db_vnfds):
            # TODO: create EE namespace
            # create TLS certificates
            await self.vca_map["helm-v3"].create_tls_certificate(
                secret_name="ee-tls-{}".format(nsr_id),
                dns_prefix="*",
                nsr_id=nsr_id,
                usage="server auth",
            )

        nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
        for vnf_profile in get_vnf_profiles(nsd):
            vnfd_id = vnf_profile["vnfd-id"]
            vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
            member_vnf_index = str(vnf_profile["id"])
            db_vnfr = db_vnfrs[member_vnf_index]
            base_folder = vnfd["_admin"]["storage"]
            vdu_id = None
            vdu_index = 0
            vdu_name = None
            kdu_name = None
            kdu_index = None

            # Get additional parameters
            deploy_params = {"OSM": get_osm_params(db_vnfr)}
            if db_vnfr.get("additionalParamsForVnf"):
                deploy_params.update(
                    parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
                )

            descriptor_config = get_configuration(vnfd, vnfd["id"])
            if descriptor_config:
                self._deploy_n2vc(
                    logging_text=logging_text
                    + "member_vnf_index={} ".format(member_vnf_index),
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    nslcmop_id=nslcmop_id,
                    nsr_id=nsr_id,
                    nsi_id=nsi_id,
                    vnfd_id=vnfd_id,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    member_vnf_index=member_vnf_index,
                    vdu_index=vdu_index,
                    kdu_index=kdu_index,
                    vdu_name=vdu_name,
                    deploy_params=deploy_params,
                    descriptor_config=descriptor_config,
                    base_folder=base_folder,
                    task_instantiation_info=tasks_dict_info,
                    stage=stage,
                )

            # Deploy charms for each VDU that supports one.
            for vdud in get_vdu_list(vnfd):
                vdu_id = vdud["id"]
                descriptor_config = get_configuration(vnfd, vdu_id)
                vdur = find_in_list(
                    db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
                )

                if vdur.get("additionalParams"):
                    deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
                else:
                    # Work on a copy: the VNF-level dict has already been handed to
                    # other deployments, and the "OSM" entry written below is
                    # specific to this VDU
                    deploy_params_vdu = dict(deploy_params)
                deploy_params_vdu["OSM"] = get_osm_params(
                    db_vnfr, vdu_id, vdu_count_index=0
                )
                vdud_count = get_number_of_instances(vnfd, vdu_id)

                self.logger.debug("VDUD > {}".format(vdud))
                self.logger.debug(
                    "Descriptor config > {}".format(descriptor_config)
                )
                if descriptor_config:
                    vdu_name = None
                    kdu_name = None
                    kdu_index = None
                    for vdu_index in range(vdud_count):
                        # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
                        self._deploy_n2vc(
                            logging_text=logging_text
                            + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                member_vnf_index, vdu_id, vdu_index
                            ),
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_id,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            kdu_index=kdu_index,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_vdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage,
                        )
            for kdud in get_kdu_list(vnfd):
                kdu_name = kdud["name"]
                descriptor_config = get_configuration(vnfd, kdu_name)
                if descriptor_config:
                    vdu_id = None
                    vdu_index = 0
                    vdu_name = None
                    # position and record of this kdu inside the vnfr
                    kdu_index, kdur = next(
                        x
                        for x in enumerate(db_vnfr["kdur"])
                        if x[1]["kdu-name"] == kdu_name
                    )
                    deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
                    if kdur.get("additionalParams"):
                        deploy_params_kdu.update(
                            parse_yaml_strings(kdur["additionalParams"].copy())
                        )

                    self._deploy_n2vc(
                        logging_text=logging_text,
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_id,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        kdu_index=kdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params_kdu,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_info=tasks_dict_info,
                        stage=stage,
                    )

        # Check if this NS has a charm configuration
        descriptor_config = nsd.get("ns-configuration")
        if descriptor_config and descriptor_config.get("juju"):
            vnfd_id = None
            db_vnfr = None
            member_vnf_index = None
            vdu_id = None
            kdu_name = None
            kdu_index = None
            vdu_index = 0
            vdu_name = None

            # Get additional parameters
            deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
            if db_nsr.get("additionalParamsForNs"):
                deploy_params.update(
                    parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
                )
            base_folder = nsd["_admin"]["storage"]
            self._deploy_n2vc(
                logging_text=logging_text,
                db_nsr=db_nsr,
                db_vnfr=db_vnfr,
                nslcmop_id=nslcmop_id,
                nsr_id=nsr_id,
                nsi_id=nsi_id,
                vnfd_id=vnfd_id,
                vdu_id=vdu_id,
                kdu_name=kdu_name,
                member_vnf_index=member_vnf_index,
                vdu_index=vdu_index,
                kdu_index=kdu_index,
                vdu_name=vdu_name,
                deploy_params=deploy_params,
                descriptor_config=descriptor_config,
                base_folder=base_folder,
                task_instantiation_info=tasks_dict_info,
                stage=stage,
            )

        # rest of staff will be done at finally

    except (
        ROclient.ROClientException,
        DbException,
        LcmException,
        N2VCException,
    ) as e:
        self.logger.error(
            logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
        )
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(stage[1])
        )
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
            exc_info=True,
        )
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for instantiate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(
                    logging_text,
                    tasks_dict_info,
                    timeout_ns_deploy,
                    stage,
                    nslcmop_id,
                    nsr_id=nsr_id,
                )
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancel all tasks
        except Exception as exc:
            error_list.append(str(exc))

        # update operation-status
        db_nsr_update["operational-status"] = "running"
        # let's begin with VCA 'configured' status (later we can change it)
        db_nsr_update["config-status"] = "configured"
        for task, task_name in tasks_dict_info.items():
            if not task.done() or task.cancelled() or task.exception():
                if task_name.startswith(self.task_name_deploy_vca):
                    # A N2VC task is pending
                    db_nsr_update["config-status"] = "failed"
                else:
                    # RO or KDU task is pending
                    db_nsr_update["operational-status"] = "failed"

        # update status at database
        if error_list:
            error_detail = ". ".join(error_list)
            self.logger.error(logging_text + error_detail)
            error_description_nslcmop = "{} Detail: {}".format(
                stage[0], error_detail
            )
            error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
                nslcmop_id, stage[0]
            )

            db_nsr_update["detailed-status"] = (
                error_description_nsr + " Detail: " + error_detail
            )
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "READY"
            db_nsr_update["detailed-status"] = "Done"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        # db_nsr stays None when the failure happened before the nsr could be read
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update,
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite(
                    "ns",
                    "instantiated",
                    {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                    },
                    loop=self.loop,
                )
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
def _get_vnfd(self, vnfd_id: str, projects_read: str, cached_vnfds: Dict[str, Any]):
    """
    Return the vnfd identified by vnfd_id, reading it from database only once.

    :param vnfd_id: value matched against the "id" field of the "vnfds" collection
    :param projects_read: value matched against "_admin.projects_read"
    :param cached_vnfds: cache indexed by vnfd_id; filled in place on a miss
    :return: the cached vnfd
    """
    if vnfd_id in cached_vnfds:
        return cached_vnfds[vnfd_id]

    # cache miss: fetch the descriptor and remember it for later calls
    query = {"id": vnfd_id, "_admin.projects_read": projects_read}
    cached_vnfds[vnfd_id] = self.db.get_one("vnfds", query)
    return cached_vnfds[vnfd_id]
def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
    """
    Return the vnfr of a vnf profile inside a NS, reading it from database only once.

    :param nsr_id: value matched against "nsr-id-ref"
    :param vnf_profile_id: value matched against "member-vnf-index-ref"; also the cache key
    :param cached_vnfrs: cache indexed by vnf_profile_id; filled in place on a miss
    :return: the cached vnfr
    """
    if vnf_profile_id in cached_vnfrs:
        return cached_vnfrs[vnf_profile_id]

    # cache miss: fetch the record and remember it for later calls
    query = {
        "member-vnf-index-ref": vnf_profile_id,
        "nsr-id-ref": nsr_id,
    }
    cached_vnfrs[vnf_profile_id] = self.db.get_one("vnfrs", query)
    return cached_vnfrs[vnf_profile_id]
def _is_deployed_vca_in_relation(
    self, vca: DeployedVCA, relation: Relation
) -> bool:
    """
    Tell whether a deployed VCA is one of the two endpoints of a relation.

    Endpoints carrying a "kdu-resource-profile-id" are skipped. A remaining endpoint
    matches when its vnf profile, vdu profile and execution environment reference are
    all equal to those of the VCA.

    :param vca: deployed VCA to look for
    :param relation: relation whose provider and requirer are inspected
    :return: True if the provider or the requirer matches the VCA
    """
    return any(
        vca.vnf_profile_id == candidate.vnf_profile_id
        and vca.vdu_profile_id == candidate.vdu_profile_id
        and vca.execution_environment_ref == candidate.execution_environment_ref
        for candidate in (relation.provider, relation.requirer)
        if not candidate["kdu-resource-profile-id"]
    )
def _update_ee_relation_data_with_implicit_data(
    self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
):
    """
    Complete the data of a relation endpoint with the values the descriptor leaves implicit.

    The endpoint is first normalized with safe_get_ee_relation. For VNF and VDU level
    endpoints without "execution-environment-ref", the reference is taken from the juju
    execution environment of the VNF (VNF level) or of the VDU (VDU level).

    :param nsr_id: ns instance the endpoint belongs to
    :param nsd: ns descriptor, used to locate the vnf profile and its project
    :param ee_relation_data: endpoint data as found in the descriptor
    :param cached_vnfds: vnfd cache passed to _get_vnfd
    :param vnf_profile_id: vnf profile to apply to the endpoint, if any
    :return: the completed endpoint data
    :raises Exception: if no execution environment is found for the endpoint
    """
    ee_relation_data = safe_get_ee_relation(
        nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
    )
    level = EELevel.get_level(ee_relation_data)

    # nothing to infer for other levels or when the reference is explicit
    if (
        level not in (EELevel.VNF, EELevel.VDU)
        or ee_relation_data["execution-environment-ref"]
    ):
        return ee_relation_data

    vnfd_id = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])["vnfd-id"]
    project = nsd["_admin"]["projects_read"][0]
    db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)

    if level == EELevel.VNF:
        entity_id = vnfd_id
    else:
        entity_id = ee_relation_data["vdu-profile-id"]

    ee = get_juju_ee_ref(db_vnfd, entity_id)
    if not ee:
        raise Exception(
            f"not execution environments found for ee_relation {ee_relation_data}"
        )
    ee_relation_data["execution-environment-ref"] = ee["id"]
    return ee_relation_data
def _get_ns_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """
    Collect the NS level relations in which the given VCA takes part.

    A relation is described either with "provider"/"requirer" or with a two item
    "entities" list (first item provider, second item requirer).

    :param nsr_id: ns instance id
    :param nsd: ns descriptor holding the relation list
    :param vca: deployed VCA that must be an endpoint of the returned relations
    :param cached_vnfds: vnfd cache, filled while resolving implicit data
    :return: list of Relation
    :raises Exception: if a relation has neither provider/requirer nor entities
    """

    def endpoint_from_entity(entity):
        # An entity whose id is the nsd id is the NS itself; any other id is taken
        # as a vnf profile id
        entity_id = entity["id"]
        endpoint_data = {
            "nsr-id": nsr_id,
            "endpoint": entity["endpoint"],
        }
        if entity_id != nsd["id"]:
            endpoint_data["vnf-profile-id"] = entity_id
        return endpoint_data

    vca_relations = []
    for relation_desc in get_ns_configuration_relation_list(nsd):
        if "provider" in relation_desc and "requirer" in relation_desc:
            provider_data = relation_desc["provider"]
            requirer_data = relation_desc["requirer"]
        elif "entities" in relation_desc:
            provider_data = endpoint_from_entity(relation_desc["entities"][0])
            requirer_data = endpoint_from_entity(relation_desc["entities"][1])
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )

        full_provider = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, provider_data, cached_vnfds
        )
        full_requirer = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, requirer_data, cached_vnfds
        )
        candidate = Relation(
            relation_desc["name"], EERelation(full_provider), EERelation(full_requirer)
        )
        if self._is_deployed_vca_in_relation(vca, candidate):
            vca_relations.append(candidate)
    return vca_relations
def _get_vnf_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """
    Collect the VNF level relations in which the given VCA takes part.

    Relations are read from the vnfd of the vnf profile the VCA belongs to. A relation
    is described either with "provider"/"requirer" or with a two item "entities" list
    (first item provider, second item requirer).

    :param nsr_id: ns instance id
    :param nsd: ns descriptor, used to locate the vnf profile and its project
    :param vca: deployed VCA that must be an endpoint of the returned relations
    :param cached_vnfds: vnfd cache passed to _get_vnfd
    :return: list of Relation; empty when the VCA targets the NS
    :raises Exception: if a relation has neither provider/requirer nor entities
    """
    vca_relations = []
    if vca.target_element == "ns":
        self.logger.debug("VCA is a NS charm, not a VNF.")
        return vca_relations

    vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
    vnf_profile_id = vnf_profile["id"]
    vnfd_id = vnf_profile["vnfd-id"]
    project = nsd["_admin"]["projects_read"][0]
    db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)

    def endpoint_from_entity(entity):
        # An entity whose id is the vnfd id is the VNF itself; any other id is taken
        # as a vdu profile id
        entity_id = entity["id"]
        endpoint_data = {
            "nsr-id": nsr_id,
            "vnf-profile-id": vnf_profile_id,
            "endpoint": entity["endpoint"],
        }
        if entity_id != vnfd_id:
            endpoint_data["vdu-profile-id"] = entity_id
        return endpoint_data

    for relation_desc in get_relation_list(db_vnfd, vnfd_id):
        if "provider" in relation_desc and "requirer" in relation_desc:
            provider_data = relation_desc["provider"]
            requirer_data = relation_desc["requirer"]
        elif "entities" in relation_desc:
            provider_data = endpoint_from_entity(relation_desc["entities"][0])
            requirer_data = endpoint_from_entity(relation_desc["entities"][1])
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )

        full_provider = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, provider_data, cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        full_requirer = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, requirer_data, cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        candidate = Relation(
            relation_desc["name"], EERelation(full_provider), EERelation(full_requirer)
        )
        if self._is_deployed_vca_in_relation(vca, candidate):
            vca_relations.append(candidate)
    return vca_relations
def _get_kdu_resource_data(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedK8sResource:
    """
    Get the deployed KDU data of a KDU level relation endpoint.

    The kdu resource profile referenced by the endpoint gives the kdu name; the
    matching entry is then looked up in db_nsr "_admin.deployed" and completed with
    the "resource-name" of the profile.

    :param ee_relation: relation endpoint pointing to a kdu resource profile
    :param db_nsr: nsr database content
    :param cached_vnfds: vnfd cache passed to _get_vnfd
    :return: the deployed kdu data, updated in place with "resource-name", or None
        when no deployed kdu is reported for the endpoint
    """
    nsd = get_nsd(db_nsr)
    vnf_profiles = get_vnf_profiles(nsd)
    vnfd_id = find_in_list(
        vnf_profiles,
        lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
    )["vnfd-id"]
    project = nsd["_admin"]["projects_read"][0]
    db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)
    kdu_resource_profile = get_kdu_resource_profile(
        db_vnfd, ee_relation.kdu_resource_profile_id
    )
    kdu_name = kdu_resource_profile["kdu-name"]
    # Defaults must be mappings: a tuple default has no .get() and would turn a
    # missing "_admin"/"deployed" section into an AttributeError
    deployed_kdu, _ = get_deployed_kdu(
        db_nsr.get("_admin", {}).get("deployed", {}),
        kdu_name,
        ee_relation.vnf_profile_id,
    )
    # NOTE(review): assumes get_deployed_kdu reports "not found" as None - confirm.
    # The caller already treats a falsy result as "component not deployed yet".
    if deployed_kdu is None:
        return None
    deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
    return deployed_kdu
def _get_deployed_component(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedComponent:
    """
    Find the deployed component a relation endpoint refers to.

    NS, VNF and VDU level endpoints resolve to a DeployedVCA looked up in db_nsr;
    KDU level endpoints resolve to a DeployedK8sResource.

    :param ee_relation: relation endpoint
    :param db_nsr: nsr database content
    :param cached_vnfds: vnfd cache, only used for KDU level endpoints
    :return: the deployed component, or None if it is not found or the level is unknown
    """
    nsr_id = db_nsr["_id"]
    level = EELevel.get_level(ee_relation)

    if level == EELevel.KDU:
        resource_data = self._get_kdu_resource_data(
            ee_relation, db_nsr, cached_vnfds
        )
        return DeployedK8sResource(resource_data) if resource_data else None

    # the remaining levels only differ in the filter used to locate the VCA
    if level == EELevel.NS:
        vca_filter = {"vdu_id": None, "member-vnf-index": None}
    elif level == EELevel.VNF:
        vca_filter = {
            "vdu_id": None,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif level == EELevel.VDU:
        vca_filter = {
            "vdu_id": ee_relation.vdu_profile_id,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    else:
        return None

    vca = get_deployed_vca(db_nsr, vca_filter)
    return DeployedVCA(nsr_id, vca) if vca else None
async def _add_relation(
    self,
    relation: Relation,
    vca_type: str,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
    cached_vnfrs: Dict[str, Any],
) -> bool:
    """
    Add a relation once both of its endpoints are deployed and have their software installed.

    :param relation: relation to add
    :param vca_type: key of self.vca_map selecting the connector that adds the relation
    :param db_nsr: nsr database content
    :param cached_vnfds: vnfd cache
    :param cached_vnfrs: vnfr cache
    :return: True if the relation has been added, False if some endpoint is not ready yet
    :raises LcmException: if the connector fails with N2VCException
    """
    provider_component = self._get_deployed_component(
        relation.provider, db_nsr, cached_vnfds
    )
    requirer_component = self._get_deployed_component(
        relation.requirer, db_nsr, cached_vnfds
    )
    both_ready = (
        provider_component
        and requirer_component
        and provider_component.config_sw_installed
        and requirer_component.config_sw_installed
    )
    if not both_ready:
        return False

    # endpoints at NS level have no vnf profile and therefore no vnfr
    provider_db_vnfr = None
    if relation.provider.vnf_profile_id:
        provider_db_vnfr = self._get_vnfr(
            relation.provider.nsr_id,
            relation.provider.vnf_profile_id,
            cached_vnfrs,
        )
    requirer_db_vnfr = None
    if relation.requirer.vnf_profile_id:
        requirer_db_vnfr = self._get_vnfr(
            relation.requirer.nsr_id,
            relation.requirer.vnf_profile_id,
            cached_vnfrs,
        )

    provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
    requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
    provider_endpoint = RelationEndpoint(
        provider_component.ee_id,
        provider_vca_id,
        relation.provider.endpoint,
    )
    requirer_endpoint = RelationEndpoint(
        requirer_component.ee_id,
        requirer_vca_id,
        relation.requirer.endpoint,
    )
    try:
        await self.vca_map[vca_type].add_relation(
            provider=provider_endpoint,
            requirer=requirer_endpoint,
        )
    except N2VCException as exception:
        self.logger.error(exception)
        raise LcmException(exception)
    return True
async def _add_vca_relations(
    self,
    logging_text,
    nsr_id,
    vca_type: str,
    vca_index: int,
    timeout: int = 3600,
) -> bool:
    """
    Add every NS and VNF level relation in which a deployed VCA takes part.

    Relations whose peers are not ready yet are retried every 5 seconds until all of
    them are added or the timeout expires. Errors are logged and reported through the
    return value; no exception is propagated.

    :param logging_text: prefix for logging
    :param nsr_id: ns instance id
    :param vca_type: key of self.vca_map selecting the connector that adds relations
    :param vca_index: position of this VCA in the deployed VCA list of the nsr
    :param timeout: maximum time, in seconds, to keep retrying
    :return: True if there are no relations or all were added; False on timeout or error
    """
    # steps:
    # 1. find all relations for this VCA
    # 2. wait for other peers related
    # 3. add relations

    try:
        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = get_nsd(db_nsr)

        # this VCA data
        deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
        my_vca = DeployedVCA(nsr_id, deployed_vca_dict)

        cached_vnfds = {}
        cached_vnfrs = {}
        relations = []
        relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
        relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))

        # if no relations, terminate
        if not relations:
            self.logger.debug(logging_text + " No relations")
            return True

        self.logger.debug(logging_text + " adding relations {}".format(relations))

        # add all relations
        start = time()
        while True:
            # check timeout
            now = time()
            if now - start >= timeout:
                self.logger.error(logging_text + " : timeout adding relations")
                return False

            # reload nsr from database (we need to update record: _admin.deployed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            # for each relation, find the VCA's related
            # iterate over a copy because added relations are removed from the list
            for relation in relations.copy():
                added = await self._add_relation(
                    relation,
                    vca_type,
                    db_nsr,
                    cached_vnfds,
                    cached_vnfrs,
                )
                if added:
                    relations.remove(relation)

            if not relations:
                self.logger.debug("Relations added")
                break
            await asyncio.sleep(5.0)

        return True

    except Exception as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning(logging_text + " ERROR adding relations: {}".format(e))
        return False
async def _install_kdu(
    self,
    nsr_id: str,
    nsr_db_path: str,
    vnfr_data: dict,
    kdu_index: int,
    kdud: dict,
    vnfd: dict,
    k8s_instance_info: dict,
    k8params: dict = None,
    timeout: int = 600,
    vca_id: str = None,
):
    """
    Install a KDU in its K8s cluster and record the result in database.

    Chooses the kdu instance name, installs the KDU, stores its services and management
    ip address in the vnfr, marks the kdur as READY and, when the KDU has
    initial-config-primitives and no juju execution environment, runs them in "seq"
    order. On any error the nsr entry at nsr_db_path and the kdur status are updated
    (best effort) and the original exception is re-raised.

    :param nsr_id: ns instance id
    :param nsr_db_path: path inside the nsr record where this KDU keeps its data
    :param vnfr_data: vnfr database content; only its "_id" is read here
    :param kdu_index: position of the kdu inside vnfr "kdur"
    :param kdud: kdu descriptor
    :param vnfd: vnf descriptor
    :param k8s_instance_info: deployment data (k8scluster-type, k8scluster-uuid, kdu-model,
        kdu-name, namespace, optional kdu-deployment-name). "namespace" may be
        rewritten in place for juju clusters
    :param k8params: parameters passed to the install
    :param timeout: timeout for the install and for every initial primitive
    :param vca_id: VCA id passed to the connector
    :return: the kdu instance name
    """
    try:
        k8sclustertype = k8s_instance_info["k8scluster-type"]
        # Instantiate kdu
        db_dict_install = {
            "collection": "nsrs",
            "filter": {"_id": nsr_id},
            "path": nsr_db_path,
        }

        if k8s_instance_info.get("kdu-deployment-name"):
            kdu_instance = k8s_instance_info.get("kdu-deployment-name")
        else:
            kdu_instance = self.k8scluster_map[
                k8sclustertype
            ].generate_kdu_instance_name(
                db_dict=db_dict_install,
                kdu_model=k8s_instance_info["kdu-model"],
                kdu_name=k8s_instance_info["kdu-name"],
            )

        # Update the nsrs table with the kdu-instance value
        self.update_db_2(
            item="nsrs",
            _id=nsr_id,
            _desc={nsr_db_path + ".kdu-instance": kdu_instance},
        )

        # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
        # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
        # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
        # namespace, this first verification could be removed, and the next step would be done for any kind
        # of KNF.
        # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
        # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
        if k8sclustertype in ("juju", "juju-bundle"):
            # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
            # that the user passed a namespace which he wants its KDU to be deployed in)
            if (
                self.db.count(
                    table="nsrs",
                    q_filter={
                        "_id": nsr_id,
                        "_admin.projects_write": k8s_instance_info["namespace"],
                        "_admin.projects_read": k8s_instance_info["namespace"],
                    },
                )
                > 0
            ):
                self.logger.debug(
                    f"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
                )
                self.update_db_2(
                    item="nsrs",
                    _id=nsr_id,
                    _desc={f"{nsr_db_path}.namespace": kdu_instance},
                )
                k8s_instance_info["namespace"] = kdu_instance

        await self.k8scluster_map[k8sclustertype].install(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_model=k8s_instance_info["kdu-model"],
            atomic=True,
            params=k8params,
            db_dict=db_dict_install,
            timeout=timeout,
            kdu_name=k8s_instance_info["kdu-name"],
            namespace=k8s_instance_info["namespace"],
            kdu_instance=kdu_instance,
            vca_id=vca_id,
        )

        # Obtain services to obtain management service ip
        services = await self.k8scluster_map[k8sclustertype].get_services(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_instance=kdu_instance,
            namespace=k8s_instance_info["namespace"],
        )

        # Obtain management service info (if exists)
        vnfr_update_dict = {}
        kdu_config = get_configuration(vnfd, kdud["name"])
        if kdu_config:
            target_ee_list = kdu_config.get("execution-environment-list", [])
        else:
            target_ee_list = []

        if services:
            vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
            mgmt_services = [
                service
                for service in kdud.get("service", [])
                if service.get("mgmt-service")
            ]
            for mgmt_service in mgmt_services:
                for service in services:
                    if service["name"].startswith(mgmt_service["name"]):
                        # Mgmt service found, Obtain service ip
                        ip = service.get("external_ip", service.get("cluster_ip"))
                        if isinstance(ip, list) and len(ip) == 1:
                            ip = ip[0]

                        vnfr_update_dict[
                            "kdur.{}.ip-address".format(kdu_index)
                        ] = ip

                        # Check if must update also mgmt ip at the vnf
                        service_external_cp = mgmt_service.get(
                            "external-connection-point-ref"
                        )
                        if service_external_cp:
                            if (
                                deep_get(vnfd, ("mgmt-interface", "cp"))
                                == service_external_cp
                            ):
                                vnfr_update_dict["ip-address"] = ip

                            if find_in_list(
                                target_ee_list,
                                lambda ee: ee.get(
                                    "external-connection-point-ref", ""
                                )
                                == service_external_cp,
                            ):
                                vnfr_update_dict[
                                    "kdur.{}.ip-address".format(kdu_index)
                                ] = ip
                        break
                else:
                    # no deployed service matches this management service
                    # Logger.warn is a deprecated alias of Logger.warning
                    self.logger.warning(
                        "Mgmt service name: {} not found".format(
                            mgmt_service["name"]
                        )
                    )

        vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
        self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)

        kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
        if (
            kdu_config
            and kdu_config.get("initial-config-primitive")
            and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
        ):
            initial_config_primitive_list = kdu_config.get(
                "initial-config-primitive"
            )
            initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))

            for initial_config_primitive in initial_config_primitive_list:
                primitive_params_ = self._map_primitive_params(
                    initial_config_primitive, {}, {}
                )

                await asyncio.wait_for(
                    self.k8scluster_map[k8sclustertype].exec_primitive(
                        cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                        kdu_instance=kdu_instance,
                        primitive_name=initial_config_primitive["name"],
                        params=primitive_params_,
                        db_dict=db_dict_install,
                        vca_id=vca_id,
                    ),
                    timeout=timeout,
                )

    except Exception as e:
        # Prepare update db with error and raise exception
        try:
            self.update_db_2(
                "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
            )
            self.update_db_2(
                "vnfrs",
                vnfr_data.get("_id"),
                {"kdur.{}.status".format(kdu_index): "ERROR"},
            )
        except Exception:
            # ignore to keep original exception
            pass
        # reraise original error
        raise

    return kdu_instance
3453 async def deploy_kdus(
3460 task_instantiation_info
,
3462 # Launch kdus if present in the descriptor
3464 k8scluster_id_2_uuic
= {
3465 "helm-chart-v3": {},
3470 async def _get_cluster_id(cluster_id
, cluster_type
):
3471 nonlocal k8scluster_id_2_uuic
3472 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3473 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3475 # check if K8scluster is creating and wait look if previous tasks in process
3476 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3477 "k8scluster", cluster_id
3480 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3481 task_name
, cluster_id
3483 self
.logger
.debug(logging_text
+ text
)
3484 await asyncio
.wait(task_dependency
, timeout
=3600)
3486 db_k8scluster
= self
.db
.get_one(
3487 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3489 if not db_k8scluster
:
3490 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3492 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3494 if cluster_type
== "helm-chart-v3":
3496 # backward compatibility for existing clusters that have not been initialized for helm v3
3497 k8s_credentials
= yaml
.safe_dump(
3498 db_k8scluster
.get("credentials")
3500 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3501 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3503 db_k8scluster_update
= {}
3504 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3505 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3506 db_k8scluster_update
[
3507 "_admin.helm-chart-v3.created"
3509 db_k8scluster_update
[
3510 "_admin.helm-chart-v3.operationalState"
3513 "k8sclusters", cluster_id
, db_k8scluster_update
3515 except Exception as e
:
3518 + "error initializing helm-v3 cluster: {}".format(str(e
))
3521 "K8s cluster '{}' has not been initialized for '{}'".format(
3522 cluster_id
, cluster_type
3527 "K8s cluster '{}' has not been initialized for '{}'".format(
3528 cluster_id
, cluster_type
3531 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3534 logging_text
+= "Deploy kdus: "
3537 db_nsr_update
= {"_admin.deployed.K8s": []}
3538 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3541 updated_cluster_list
= []
3542 updated_v3_cluster_list
= []
3544 for vnfr_data
in db_vnfrs
.values():
3545 vca_id
= self
.get_vca_id(vnfr_data
, {})
3546 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3547 # Step 0: Prepare and set parameters
3548 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3549 vnfd_id
= vnfr_data
.get("vnfd-id")
3550 vnfd_with_id
= find_in_list(
3551 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3555 for kdud
in vnfd_with_id
["kdu"]
3556 if kdud
["name"] == kdur
["kdu-name"]
3558 namespace
= kdur
.get("k8s-namespace")
3559 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3560 if kdur
.get("helm-chart"):
3561 kdumodel
= kdur
["helm-chart"]
3562 # Default version: helm3, if helm-version is v2 assign v2
3563 k8sclustertype
= "helm-chart-v3"
3564 self
.logger
.debug("kdur: {}".format(kdur
))
3566 kdur
.get("helm-version")
3567 and kdur
.get("helm-version") == "v2"
3569 k8sclustertype
= "helm-chart"
3570 elif kdur
.get("juju-bundle"):
3571 kdumodel
= kdur
["juju-bundle"]
3572 k8sclustertype
= "juju-bundle"
3575 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3576 "juju-bundle. Maybe an old NBI version is running".format(
3577 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3580 # check if kdumodel is a file and exists
3582 vnfd_with_id
= find_in_list(
3583 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3585 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3586 if storage
: # may be not present if vnfd has not artifacts
3587 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3588 if storage
["pkg-dir"]:
3589 filename
= "{}/{}/{}s/{}".format(
3596 filename
= "{}/Scripts/{}s/{}".format(
3601 if self
.fs
.file_exists(
3602 filename
, mode
="file"
3603 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3604 kdumodel
= self
.fs
.path
+ filename
3605 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3607 except Exception: # it is not a file
3610 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3611 step
= "Synchronize repos for k8s cluster '{}'".format(
3614 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3618 k8sclustertype
== "helm-chart"
3619 and cluster_uuid
not in updated_cluster_list
3621 k8sclustertype
== "helm-chart-v3"
3622 and cluster_uuid
not in updated_v3_cluster_list
3624 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3625 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3626 cluster_uuid
=cluster_uuid
3629 if del_repo_list
or added_repo_dict
:
3630 if k8sclustertype
== "helm-chart":
3632 "_admin.helm_charts_added." + item
: None
3633 for item
in del_repo_list
3636 "_admin.helm_charts_added." + item
: name
3637 for item
, name
in added_repo_dict
.items()
3639 updated_cluster_list
.append(cluster_uuid
)
3640 elif k8sclustertype
== "helm-chart-v3":
3642 "_admin.helm_charts_v3_added." + item
: None
3643 for item
in del_repo_list
3646 "_admin.helm_charts_v3_added." + item
: name
3647 for item
, name
in added_repo_dict
.items()
3649 updated_v3_cluster_list
.append(cluster_uuid
)
3651 logging_text
+ "repos synchronized on k8s cluster "
3652 "'{}' to_delete: {}, to_add: {}".format(
3653 k8s_cluster_id
, del_repo_list
, added_repo_dict
3658 {"_id": k8s_cluster_id
},
3664 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3665 vnfr_data
["member-vnf-index-ref"],
3669 k8s_instance_info
= {
3670 "kdu-instance": None,
3671 "k8scluster-uuid": cluster_uuid
,
3672 "k8scluster-type": k8sclustertype
,
3673 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3674 "kdu-name": kdur
["kdu-name"],
3675 "kdu-model": kdumodel
,
3676 "namespace": namespace
,
3677 "kdu-deployment-name": kdu_deployment_name
,
3679 db_path
= "_admin.deployed.K8s.{}".format(index
)
3680 db_nsr_update
[db_path
] = k8s_instance_info
3681 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3682 vnfd_with_id
= find_in_list(
3683 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3685 task
= asyncio
.ensure_future(
3694 k8params
=desc_params
,
3699 self
.lcm_tasks
.register(
3703 "instantiate_KDU-{}".format(index
),
3706 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3712 except (LcmException
, asyncio
.CancelledError
):
3714 except Exception as e
:
3715 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3716 if isinstance(e
, (N2VCException
, DbException
)):
3717 self
.logger
.error(logging_text
+ msg
)
3719 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3720 raise LcmException(msg
)
3723 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3743 task_instantiation_info
,
3746 # launch instantiate_N2VC in a asyncio task and register task object
3747 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3748 # if not found, create one entry and update database
3749 # fill db_nsr._admin.deployed.VCA.<index>
3752 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3756 get_charm_name
= False
3757 if "execution-environment-list" in descriptor_config
:
3758 ee_list
= descriptor_config
.get("execution-environment-list", [])
3759 elif "juju" in descriptor_config
:
3760 ee_list
= [descriptor_config
] # ns charms
3761 if "execution-environment-list" not in descriptor_config
:
3762 # charm name is only required for ns charms
3763 get_charm_name
= True
3764 else: # other types as script are not supported
3767 for ee_item
in ee_list
:
3770 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3771 ee_item
.get("juju"), ee_item
.get("helm-chart")
3774 ee_descriptor_id
= ee_item
.get("id")
3775 if ee_item
.get("juju"):
3776 vca_name
= ee_item
["juju"].get("charm")
3778 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
3781 if ee_item
["juju"].get("charm") is not None
3784 if ee_item
["juju"].get("cloud") == "k8s":
3785 vca_type
= "k8s_proxy_charm"
3786 elif ee_item
["juju"].get("proxy") is False:
3787 vca_type
= "native_charm"
3788 elif ee_item
.get("helm-chart"):
3789 vca_name
= ee_item
["helm-chart"]
3790 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3793 vca_type
= "helm-v3"
3796 logging_text
+ "skipping non juju neither charm configuration"
3801 for vca_index
, vca_deployed
in enumerate(
3802 db_nsr
["_admin"]["deployed"]["VCA"]
3804 if not vca_deployed
:
3807 vca_deployed
.get("member-vnf-index") == member_vnf_index
3808 and vca_deployed
.get("vdu_id") == vdu_id
3809 and vca_deployed
.get("kdu_name") == kdu_name
3810 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3811 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3815 # not found, create one.
3817 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3820 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3822 target
+= "/kdu/{}".format(kdu_name
)
3824 "target_element": target
,
3825 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3826 "member-vnf-index": member_vnf_index
,
3828 "kdu_name": kdu_name
,
3829 "vdu_count_index": vdu_index
,
3830 "operational-status": "init", # TODO revise
3831 "detailed-status": "", # TODO revise
3832 "step": "initial-deploy", # TODO revise
3834 "vdu_name": vdu_name
,
3836 "ee_descriptor_id": ee_descriptor_id
,
3837 "charm_name": charm_name
,
3841 # create VCA and configurationStatus in db
3843 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3844 "configurationStatus.{}".format(vca_index
): dict(),
3846 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3848 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3850 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3851 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3852 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3855 task_n2vc
= asyncio
.ensure_future(
3856 self
.instantiate_N2VC(
3857 logging_text
=logging_text
,
3858 vca_index
=vca_index
,
3864 vdu_index
=vdu_index
,
3865 kdu_index
=kdu_index
,
3866 deploy_params
=deploy_params
,
3867 config_descriptor
=descriptor_config
,
3868 base_folder
=base_folder
,
3869 nslcmop_id
=nslcmop_id
,
3873 ee_config_descriptor
=ee_item
,
3876 self
.lcm_tasks
.register(
3880 "instantiate_N2VC-{}".format(vca_index
),
3883 task_instantiation_info
[
3885 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3886 member_vnf_index
or "", vdu_id
or ""
3890 def _create_nslcmop(nsr_id
, operation
, params
):
3892 Creates a ns-lcm-opp content to be stored at database.
3893 :param nsr_id: internal id of the instance
3894 :param operation: instantiate, terminate, scale, action, ...
3895 :param params: user parameters for the operation
3896 :return: dictionary following SOL005 format
3898 # Raise exception if invalid arguments
3899 if not (nsr_id
and operation
and params
):
3901 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3908 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3909 "operationState": "PROCESSING",
3910 "statusEnteredTime": now
,
3911 "nsInstanceId": nsr_id
,
3912 "lcmOperationType": operation
,
3914 "isAutomaticInvocation": False,
3915 "operationParams": params
,
3916 "isCancelPending": False,
3918 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3919 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3924 def _format_additional_params(self
, params
):
3925 params
= params
or {}
3926 for key
, value
in params
.items():
3927 if str(value
).startswith("!!yaml "):
3928 params
[key
] = yaml
.safe_load(value
[7:])
3931 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3932 primitive
= seq
.get("name")
3933 primitive_params
= {}
3935 "member_vnf_index": vnf_index
,
3936 "primitive": primitive
,
3937 "primitive_params": primitive_params
,
3940 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide what to do with a sub-operation that is already registered.

    :param db_nslcmop: nslcmop record; sub-operations are read from _admin.operations
    :param op_index: position of the sub-operation inside _admin.operations
    :return: SUBOPERATION_STATUS_SKIP when the sub-operation is COMPLETED; otherwise
        op_index, after marking the sub-operation as PROCESSING again
    """
    operations = deep_get(db_nslcmop, ("_admin", "operations"), [])
    suboperation = operations[op_index]
    if suboperation.get("operationState") != "COMPLETED":
        # c. retry executing sub-operation
        # The sub-operation exists, and operationState != 'COMPLETED'
        # Update operationState = 'PROCESSING' to indicate a retry.
        self._update_suboperation_status(
            db_nslcmop, op_index, "PROCESSING", "In progress"
        )
        # Return the sub-operation index
        # _ns_execute_primitive() or RO.create_action() will be called from scale()
        # with arguments extracted from the sub-operation
        return op_index
    # b. Skip sub-operation
    # _ns_execute_primitive() or RO.create_action() will NOT be executed
    return self.SUBOPERATION_STATUS_SKIP
3964 # Find a sub-operation where all keys in a matching dictionary must match
3965 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3966 def _find_suboperation(self
, db_nslcmop
, match
):
3967 if db_nslcmop
and match
:
3968 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3969 for i
, op
in enumerate(op_list
):
3970 if all(op
.get(k
) == match
[k
] for k
in match
):
3972 return self
.SUBOPERATION_STATUS_NOT_FOUND
3974 # Update status for a sub-operation given its index
3975 def _update_suboperation_status(
3976 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3978 # Update DB for HA tasks
3979 q_filter
= {"_id": db_nslcmop
["_id"]}
3981 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3982 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3985 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3988 # Add sub-operation, return the index of the added sub-operation
3989 # Optionally, set operationState, detailed-status, and operationType
3990 # Status and type are currently set for 'scale' sub-operations:
3991 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3992 # 'detailed-status' : status message
3993 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3994 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3995 def _add_suboperation(
4003 mapped_primitive_params
,
4004 operationState
=None,
4005 detailed_status
=None,
4008 RO_scaling_info
=None,
4011 return self
.SUBOPERATION_STATUS_NOT_FOUND
4012 # Get the "_admin.operations" list, if it exists
4013 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
4014 op_list
= db_nslcmop_admin
.get("operations")
4015 # Create or append to the "_admin.operations" list
4017 "member_vnf_index": vnf_index
,
4019 "vdu_count_index": vdu_count_index
,
4020 "primitive": primitive
,
4021 "primitive_params": mapped_primitive_params
,
4024 new_op
["operationState"] = operationState
4026 new_op
["detailed-status"] = detailed_status
4028 new_op
["lcmOperationType"] = operationType
4030 new_op
["RO_nsr_id"] = RO_nsr_id
4032 new_op
["RO_scaling_info"] = RO_scaling_info
4034 # No existing operations, create key 'operations' with current operation as first list element
4035 db_nslcmop_admin
.update({"operations": [new_op
]})
4036 op_list
= db_nslcmop_admin
.get("operations")
4038 # Existing operations, append operation to list
4039 op_list
.append(new_op
)
4041 db_nslcmop_update
= {"_admin.operations": op_list
}
4042 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4043 op_index
= len(op_list
) - 1
4046 # Helper methods for scale() sub-operations
4048 # pre-scale/post-scale:
4049 # Check for 3 different cases:
4050 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4051 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4052 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4053 def _check_or_add_scale_suboperation(
4057 vnf_config_primitive
,
4061 RO_scaling_info
=None,
4063 # Find this sub-operation
4064 if RO_nsr_id
and RO_scaling_info
:
4065 operationType
= "SCALE-RO"
4067 "member_vnf_index": vnf_index
,
4068 "RO_nsr_id": RO_nsr_id
,
4069 "RO_scaling_info": RO_scaling_info
,
4073 "member_vnf_index": vnf_index
,
4074 "primitive": vnf_config_primitive
,
4075 "primitive_params": primitive_params
,
4076 "lcmOperationType": operationType
,
4078 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4079 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4080 # a. New sub-operation
4081 # The sub-operation does not exist, add it.
4082 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4083 # The following parameters are set to None for all kind of scaling:
4085 vdu_count_index
= None
4087 if RO_nsr_id
and RO_scaling_info
:
4088 vnf_config_primitive
= None
4089 primitive_params
= None
4092 RO_scaling_info
= None
4093 # Initial status for sub-operation
4094 operationState
= "PROCESSING"
4095 detailed_status
= "In progress"
4096 # Add sub-operation for pre/post-scaling (zero or more operations)
4097 self
._add
_suboperation
(
4103 vnf_config_primitive
,
4111 return self
.SUBOPERATION_STATUS_NEW
4113 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4114 # or op_index (operationState != 'COMPLETED')
4115 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4117 # Function to return execution_environment id
4119 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4120 # TODO vdu_index_count
4121 for vca
in vca_deployed_list
:
4122 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
4125 async def destroy_N2VC(
4133 exec_primitives
=True,
4138 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4139 :param logging_text:
4141 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4142 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4143 :param vca_index: index in the database _admin.deployed.VCA
4144 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4145 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4146 not executed properly
4147 :param scaling_in: True destroys the application, False destroys the model
4148 :return: None or exception
4153 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4154 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4158 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4160 # execute terminate_primitives
4162 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4163 config_descriptor
.get("terminate-config-primitive"),
4164 vca_deployed
.get("ee_descriptor_id"),
4166 vdu_id
= vca_deployed
.get("vdu_id")
4167 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4168 vdu_name
= vca_deployed
.get("vdu_name")
4169 vnf_index
= vca_deployed
.get("member-vnf-index")
4170 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4171 for seq
in terminate_primitives
:
4172 # For each sequence in list, get primitive and call _ns_execute_primitive()
4173 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4174 vnf_index
, seq
.get("name")
4176 self
.logger
.debug(logging_text
+ step
)
4177 # Create the primitive for each sequence, i.e. "primitive": "touch"
4178 primitive
= seq
.get("name")
4179 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4184 self
._add
_suboperation
(
4191 mapped_primitive_params
,
4193 # Sub-operations: Call _ns_execute_primitive() instead of action()
4195 result
, result_detail
= await self
._ns
_execute
_primitive
(
4196 vca_deployed
["ee_id"],
4198 mapped_primitive_params
,
4202 except LcmException
:
4203 # this happens when VCA is not deployed. In this case it is not needed to terminate
4205 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4206 if result
not in result_ok
:
4208 "terminate_primitive {} for vnf_member_index={} fails with "
4209 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4211 # set that this VCA do not need terminated
4212 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4216 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4219 # Delete Prometheus Jobs if any
4220 # This uses NSR_ID, so it will destroy any jobs under this index
4221 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4224 await self
.vca_map
[vca_type
].delete_execution_environment(
4225 vca_deployed
["ee_id"],
4226 scaling_in
=scaling_in
,
4231 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4232 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4233 namespace
= "." + db_nsr
["_id"]
4235 await self
.n2vc
.delete_namespace(
4236 namespace
=namespace
,
4237 total_timeout
=self
.timeout
.charm_delete
,
4240 except N2VCNotFound
: # already deleted. Skip
4242 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4244 async def terminate(self
, nsr_id
, nslcmop_id
):
4245 # Try to lock HA task here
4246 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4247 if not task_is_locked_by_me
:
4250 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4251 self
.logger
.debug(logging_text
+ "Enter")
4252 timeout_ns_terminate
= self
.timeout
.ns_terminate
4255 operation_params
= None
4257 error_list
= [] # annotates all failed error messages
4258 db_nslcmop_update
= {}
4259 autoremove
= False # autoremove after terminated
4260 tasks_dict_info
= {}
4263 "Stage 1/3: Preparing task.",
4264 "Waiting for previous operations to terminate.",
4267 # ^ contains [stage, step, VIM-status]
4269 # wait for any previous tasks in process
4270 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4272 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4273 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4274 operation_params
= db_nslcmop
.get("operationParams") or {}
4275 if operation_params
.get("timeout_ns_terminate"):
4276 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4277 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4278 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4280 db_nsr_update
["operational-status"] = "terminating"
4281 db_nsr_update
["config-status"] = "terminating"
4282 self
._write
_ns
_status
(
4284 ns_state
="TERMINATING",
4285 current_operation
="TERMINATING",
4286 current_operation_id
=nslcmop_id
,
4287 other_update
=db_nsr_update
,
4289 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4290 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4291 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4294 stage
[1] = "Getting vnf descriptors from db."
4295 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4297 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4299 db_vnfds_from_id
= {}
4300 db_vnfds_from_member_index
= {}
4302 for vnfr
in db_vnfrs_list
:
4303 vnfd_id
= vnfr
["vnfd-id"]
4304 if vnfd_id
not in db_vnfds_from_id
:
4305 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4306 db_vnfds_from_id
[vnfd_id
] = vnfd
4307 db_vnfds_from_member_index
[
4308 vnfr
["member-vnf-index-ref"]
4309 ] = db_vnfds_from_id
[vnfd_id
]
4311 # Destroy individual execution environments when there are terminating primitives.
4312 # Rest of EE will be deleted at once
4313 # TODO - check before calling _destroy_N2VC
4314 # if not operation_params.get("skip_terminate_primitives"):#
4315 # or not vca.get("needed_terminate"):
4316 stage
[0] = "Stage 2/3 execute terminating primitives."
4317 self
.logger
.debug(logging_text
+ stage
[0])
4318 stage
[1] = "Looking execution environment that needs terminate."
4319 self
.logger
.debug(logging_text
+ stage
[1])
4321 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4322 config_descriptor
= None
4323 vca_member_vnf_index
= vca
.get("member-vnf-index")
4324 vca_id
= self
.get_vca_id(
4325 db_vnfrs_dict
.get(vca_member_vnf_index
)
4326 if vca_member_vnf_index
4330 if not vca
or not vca
.get("ee_id"):
4332 if not vca
.get("member-vnf-index"):
4334 config_descriptor
= db_nsr
.get("ns-configuration")
4335 elif vca
.get("vdu_id"):
4336 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4337 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4338 elif vca
.get("kdu_name"):
4339 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4340 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4342 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4343 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4344 vca_type
= vca
.get("type")
4345 exec_terminate_primitives
= not operation_params
.get(
4346 "skip_terminate_primitives"
4347 ) and vca
.get("needed_terminate")
4348 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4349 # pending native charms
4351 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4353 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4354 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4355 task
= asyncio
.ensure_future(
4363 exec_terminate_primitives
,
4367 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4369 # wait for pending tasks of terminate primitives
4373 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4375 error_list
= await self
._wait
_for
_tasks
(
4378 min(self
.timeout
.charm_delete
, timeout_ns_terminate
),
4382 tasks_dict_info
.clear()
4384 return # raise LcmException("; ".join(error_list))
4386 # remove All execution environments at once
4387 stage
[0] = "Stage 3/3 delete all."
4389 if nsr_deployed
.get("VCA"):
4390 stage
[1] = "Deleting all execution environments."
4391 self
.logger
.debug(logging_text
+ stage
[1])
4392 vca_id
= self
.get_vca_id({}, db_nsr
)
4393 task_delete_ee
= asyncio
.ensure_future(
4395 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4396 timeout
=self
.timeout
.charm_delete
,
4399 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4400 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4402 # Delete Namespace and Certificates if necessary
4403 if check_helm_ee_in_ns(list(db_vnfds_from_member_index
.values())):
4404 await self
.vca_map
["helm-v3"].delete_tls_certificate(
4405 certificate_name
=db_nslcmop
["nsInstanceId"],
4407 # TODO: Delete namespace
4409 # Delete from k8scluster
4410 stage
[1] = "Deleting KDUs."
4411 self
.logger
.debug(logging_text
+ stage
[1])
4412 # print(nsr_deployed)
4413 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4414 if not kdu
or not kdu
.get("kdu-instance"):
4416 kdu_instance
= kdu
.get("kdu-instance")
4417 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4418 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4419 vca_id
= self
.get_vca_id({}, db_nsr
)
4420 task_delete_kdu_instance
= asyncio
.ensure_future(
4421 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4422 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4423 kdu_instance
=kdu_instance
,
4425 namespace
=kdu
.get("namespace"),
4431 + "Unknown k8s deployment type {}".format(
4432 kdu
.get("k8scluster-type")
4437 task_delete_kdu_instance
4438 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4441 stage
[1] = "Deleting ns from VIM."
4442 if self
.ro_config
.ng
:
4443 task_delete_ro
= asyncio
.ensure_future(
4444 self
._terminate
_ng
_ro
(
4445 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4448 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4450 # rest of staff will be done at finally
4453 ROclient
.ROClientException
,
4458 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4460 except asyncio
.CancelledError
:
4462 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4464 exc
= "Operation was cancelled"
4465 except Exception as e
:
4466 exc
= traceback
.format_exc()
4467 self
.logger
.critical(
4468 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4473 error_list
.append(str(exc
))
4475 # wait for pending tasks
4477 stage
[1] = "Waiting for terminate pending tasks."
4478 self
.logger
.debug(logging_text
+ stage
[1])
4479 error_list
+= await self
._wait
_for
_tasks
(
4482 timeout_ns_terminate
,
4486 stage
[1] = stage
[2] = ""
4487 except asyncio
.CancelledError
:
4488 error_list
.append("Cancelled")
4489 # TODO cancell all tasks
4490 except Exception as exc
:
4491 error_list
.append(str(exc
))
4492 # update status at database
4494 error_detail
= "; ".join(error_list
)
4495 # self.logger.error(logging_text + error_detail)
4496 error_description_nslcmop
= "{} Detail: {}".format(
4497 stage
[0], error_detail
4499 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4500 nslcmop_id
, stage
[0]
4503 db_nsr_update
["operational-status"] = "failed"
4504 db_nsr_update
["detailed-status"] = (
4505 error_description_nsr
+ " Detail: " + error_detail
4507 db_nslcmop_update
["detailed-status"] = error_detail
4508 nslcmop_operation_state
= "FAILED"
4512 error_description_nsr
= error_description_nslcmop
= None
4513 ns_state
= "NOT_INSTANTIATED"
4514 db_nsr_update
["operational-status"] = "terminated"
4515 db_nsr_update
["detailed-status"] = "Done"
4516 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4517 db_nslcmop_update
["detailed-status"] = "Done"
4518 nslcmop_operation_state
= "COMPLETED"
4521 self
._write
_ns
_status
(
4524 current_operation
="IDLE",
4525 current_operation_id
=None,
4526 error_description
=error_description_nsr
,
4527 error_detail
=error_detail
,
4528 other_update
=db_nsr_update
,
4530 self
._write
_op
_status
(
4533 error_message
=error_description_nslcmop
,
4534 operation_state
=nslcmop_operation_state
,
4535 other_update
=db_nslcmop_update
,
4537 if ns_state
== "NOT_INSTANTIATED":
4541 {"nsr-id-ref": nsr_id
},
4542 {"_admin.nsState": "NOT_INSTANTIATED"},
4544 except DbException
as e
:
4547 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4551 if operation_params
:
4552 autoremove
= operation_params
.get("autoremove", False)
4553 if nslcmop_operation_state
:
4555 await self
.msg
.aiowrite(
4560 "nslcmop_id": nslcmop_id
,
4561 "operationState": nslcmop_operation_state
,
4562 "autoremove": autoremove
,
4566 except Exception as e
:
4568 logging_text
+ "kafka_write notification Exception {}".format(e
)
4571 self
.logger
.debug(logging_text
+ "Exit")
4572 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4574 async def _wait_for_tasks(
4575 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4578 error_detail_list
= []
4580 pending_tasks
= list(created_tasks_info
.keys())
4581 num_tasks
= len(pending_tasks
)
4583 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4584 self
._write
_op
_status
(nslcmop_id
, stage
)
4585 while pending_tasks
:
4587 _timeout
= timeout
+ time_start
- time()
4588 done
, pending_tasks
= await asyncio
.wait(
4589 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4591 num_done
+= len(done
)
4592 if not done
: # Timeout
4593 for task
in pending_tasks
:
4594 new_error
= created_tasks_info
[task
] + ": Timeout"
4595 error_detail_list
.append(new_error
)
4596 error_list
.append(new_error
)
4599 if task
.cancelled():
4602 exc
= task
.exception()
4604 if isinstance(exc
, asyncio
.TimeoutError
):
4606 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4607 error_list
.append(created_tasks_info
[task
])
4608 error_detail_list
.append(new_error
)
4615 ROclient
.ROClientException
,
4621 self
.logger
.error(logging_text
+ new_error
)
4623 exc_traceback
= "".join(
4624 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4628 + created_tasks_info
[task
]
4634 logging_text
+ created_tasks_info
[task
] + ": Done"
4636 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4638 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4639 if nsr_id
: # update also nsr
4644 "errorDescription": "Error at: " + ", ".join(error_list
),
4645 "errorDetail": ". ".join(error_detail_list
),
4648 self
._write
_op
_status
(nslcmop_id
, stage
)
4649 return error_detail_list
4652 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4654 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4655 The default-value is used. If it is between < > it look for a value at instantiation_params
4656 :param primitive_desc: portion of VNFD/NSD that describes primitive
4657 :param params: Params provided by user
4658 :param instantiation_params: Instantiation params provided by user
4659 :return: a dictionary with the calculated params
4661 calculated_params
= {}
4662 for parameter
in primitive_desc
.get("parameter", ()):
4663 param_name
= parameter
["name"]
4664 if param_name
in params
:
4665 calculated_params
[param_name
] = params
[param_name
]
4666 elif "default-value" in parameter
or "value" in parameter
:
4667 if "value" in parameter
:
4668 calculated_params
[param_name
] = parameter
["value"]
4670 calculated_params
[param_name
] = parameter
["default-value"]
4672 isinstance(calculated_params
[param_name
], str)
4673 and calculated_params
[param_name
].startswith("<")
4674 and calculated_params
[param_name
].endswith(">")
4676 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4677 calculated_params
[param_name
] = instantiation_params
[
4678 calculated_params
[param_name
][1:-1]
4682 "Parameter {} needed to execute primitive {} not provided".format(
4683 calculated_params
[param_name
], primitive_desc
["name"]
4688 "Parameter {} needed to execute primitive {} not provided".format(
4689 param_name
, primitive_desc
["name"]
4693 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4694 calculated_params
[param_name
] = yaml
.safe_dump(
4695 calculated_params
[param_name
], default_flow_style
=True, width
=256
4697 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4699 ].startswith("!!yaml "):
4700 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4701 if parameter
.get("data-type") == "INTEGER":
4703 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4704 except ValueError: # error converting string to int
4706 "Parameter {} of primitive {} must be integer".format(
4707 param_name
, primitive_desc
["name"]
4710 elif parameter
.get("data-type") == "BOOLEAN":
4711 calculated_params
[param_name
] = not (
4712 (str(calculated_params
[param_name
])).lower() == "false"
4715 # add always ns_config_info if primitive name is config
4716 if primitive_desc
["name"] == "config":
4717 if "ns_config_info" in instantiation_params
:
4718 calculated_params
["ns_config_info"] = instantiation_params
[
4721 return calculated_params
4723 def _look_for_deployed_vca(
4730 ee_descriptor_id
=None,
4732 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4733 for vca
in deployed_vca
:
4736 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4739 vdu_count_index
is not None
4740 and vdu_count_index
!= vca
["vdu_count_index"]
4743 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4745 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4749 # vca_deployed not found
4751 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4752 " is not deployed".format(
4761 ee_id
= vca
.get("ee_id")
4763 "type", "lxc_proxy_charm"
4764 ) # default value for backward compatibility - proxy charm
4767 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4768 "execution environment".format(
4769 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4772 return ee_id
, vca_type
4774 async def _ns_execute_primitive(
4780 retries_interval
=30,
4787 if primitive
== "config":
4788 primitive_params
= {"params": primitive_params
}
4790 vca_type
= vca_type
or "lxc_proxy_charm"
4794 output
= await asyncio
.wait_for(
4795 self
.vca_map
[vca_type
].exec_primitive(
4797 primitive_name
=primitive
,
4798 params_dict
=primitive_params
,
4799 progress_timeout
=self
.timeout
.progress_primitive
,
4800 total_timeout
=self
.timeout
.primitive
,
4805 timeout
=timeout
or self
.timeout
.primitive
,
4809 except asyncio
.CancelledError
:
4811 except Exception as e
:
4815 "Error executing action {} on {} -> {}".format(
4820 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
4822 if isinstance(e
, asyncio
.TimeoutError
):
4824 message
="Timed out waiting for action to complete"
4826 return "FAILED", getattr(e
, "message", repr(e
))
4828 return "COMPLETED", output
4830 except (LcmException
, asyncio
.CancelledError
):
4832 except Exception as e
:
4833 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
4835 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
4837 Updating the vca_status with latest juju information in nsrs record
4838 :param: nsr_id: Id of the nsr
4839 :param: nslcmop_id: Id of the nslcmop
4843 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
4844 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4845 vca_id
= self
.get_vca_id({}, db_nsr
)
4846 if db_nsr
["_admin"]["deployed"]["K8s"]:
4847 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
4848 cluster_uuid
, kdu_instance
, cluster_type
= (
4849 k8s
["k8scluster-uuid"],
4850 k8s
["kdu-instance"],
4851 k8s
["k8scluster-type"],
4853 await self
._on
_update
_k
8s
_db
(
4854 cluster_uuid
=cluster_uuid
,
4855 kdu_instance
=kdu_instance
,
4856 filter={"_id": nsr_id
},
4858 cluster_type
=cluster_type
,
4861 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
4862 table
, filter = "nsrs", {"_id": nsr_id
}
4863 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
4864 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
4866 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
4867 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
4869 async def action(self
, nsr_id
, nslcmop_id
):
4870 # Try to lock HA task here
4871 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4872 if not task_is_locked_by_me
:
4875 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
4876 self
.logger
.debug(logging_text
+ "Enter")
4877 # get all needed from database
4881 db_nslcmop_update
= {}
4882 nslcmop_operation_state
= None
4883 error_description_nslcmop
= None
4887 # wait for any previous tasks in process
4888 step
= "Waiting for previous operations to terminate"
4889 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4891 self
._write
_ns
_status
(
4894 current_operation
="RUNNING ACTION",
4895 current_operation_id
=nslcmop_id
,
4898 step
= "Getting information from database"
4899 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4900 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4901 if db_nslcmop
["operationParams"].get("primitive_params"):
4902 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
4903 db_nslcmop
["operationParams"]["primitive_params"]
4906 nsr_deployed
= db_nsr
["_admin"].get("deployed")
4907 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
4908 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
4909 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
4910 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
4911 primitive
= db_nslcmop
["operationParams"]["primitive"]
4912 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
4913 timeout_ns_action
= db_nslcmop
["operationParams"].get(
4914 "timeout_ns_action", self
.timeout
.primitive
4918 step
= "Getting vnfr from database"
4919 db_vnfr
= self
.db
.get_one(
4920 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
4922 if db_vnfr
.get("kdur"):
4924 for kdur
in db_vnfr
["kdur"]:
4925 if kdur
.get("additionalParams"):
4926 kdur
["additionalParams"] = json
.loads(
4927 kdur
["additionalParams"]
4929 kdur_list
.append(kdur
)
4930 db_vnfr
["kdur"] = kdur_list
4931 step
= "Getting vnfd from database"
4932 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
4934 # Sync filesystem before running a primitive
4935 self
.fs
.sync(db_vnfr
["vnfd-id"])
4937 step
= "Getting nsd from database"
4938 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
4940 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
4941 # for backward compatibility
4942 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
4943 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
4944 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
4945 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4947 # look for primitive
4948 config_primitive_desc
= descriptor_configuration
= None
4950 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
4952 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
4954 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
4956 descriptor_configuration
= db_nsd
.get("ns-configuration")
4958 if descriptor_configuration
and descriptor_configuration
.get(
4961 for config_primitive
in descriptor_configuration
["config-primitive"]:
4962 if config_primitive
["name"] == primitive
:
4963 config_primitive_desc
= config_primitive
4966 if not config_primitive_desc
:
4967 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
4969 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4973 primitive_name
= primitive
4974 ee_descriptor_id
= None
4976 primitive_name
= config_primitive_desc
.get(
4977 "execution-environment-primitive", primitive
4979 ee_descriptor_id
= config_primitive_desc
.get(
4980 "execution-environment-ref"
4986 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
4988 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
4991 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
4993 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
4995 desc_params
= parse_yaml_strings(
4996 db_vnfr
.get("additionalParamsForVnf")
4999 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5000 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5001 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5003 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5004 actions
.add(primitive
["name"])
5005 for primitive
in kdu_configuration
.get("config-primitive", []):
5006 actions
.add(primitive
["name"])
5008 nsr_deployed
["K8s"],
5009 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5010 and kdu
["member-vnf-index"] == vnf_index
,
5014 if primitive_name
in actions
5015 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5019 # TODO check if ns is in a proper status
5021 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5023 # kdur and desc_params already set from before
5024 if primitive_params
:
5025 desc_params
.update(primitive_params
)
5026 # TODO Check if we will need something at vnf level
5027 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5029 kdu_name
== kdu
["kdu-name"]
5030 and kdu
["member-vnf-index"] == vnf_index
5035 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5038 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5039 msg
= "unknown k8scluster-type '{}'".format(
5040 kdu
.get("k8scluster-type")
5042 raise LcmException(msg
)
5045 "collection": "nsrs",
5046 "filter": {"_id": nsr_id
},
5047 "path": "_admin.deployed.K8s.{}".format(index
),
5051 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5053 step
= "Executing kdu {}".format(primitive_name
)
5054 if primitive_name
== "upgrade":
5055 if desc_params
.get("kdu_model"):
5056 kdu_model
= desc_params
.get("kdu_model")
5057 del desc_params
["kdu_model"]
5059 kdu_model
= kdu
.get("kdu-model")
5060 parts
= kdu_model
.split(sep
=":")
5062 kdu_model
= parts
[0]
5063 if desc_params
.get("kdu_atomic_upgrade"):
5064 atomic_upgrade
= desc_params
.get(
5065 "kdu_atomic_upgrade"
5066 ).lower() in ("yes", "true", "1")
5067 del desc_params
["kdu_atomic_upgrade"]
5069 atomic_upgrade
= True
5071 detailed_status
= await asyncio
.wait_for(
5072 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5073 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5074 kdu_instance
=kdu
.get("kdu-instance"),
5075 atomic
=atomic_upgrade
,
5076 kdu_model
=kdu_model
,
5079 timeout
=timeout_ns_action
,
5081 timeout
=timeout_ns_action
+ 10,
5084 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5086 elif primitive_name
== "rollback":
5087 detailed_status
= await asyncio
.wait_for(
5088 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5089 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5090 kdu_instance
=kdu
.get("kdu-instance"),
5093 timeout
=timeout_ns_action
,
5095 elif primitive_name
== "status":
5096 detailed_status
= await asyncio
.wait_for(
5097 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5098 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5099 kdu_instance
=kdu
.get("kdu-instance"),
5102 timeout
=timeout_ns_action
,
5105 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5106 kdu
["kdu-name"], nsr_id
5108 params
= self
._map
_primitive
_params
(
5109 config_primitive_desc
, primitive_params
, desc_params
5112 detailed_status
= await asyncio
.wait_for(
5113 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5114 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5115 kdu_instance
=kdu_instance
,
5116 primitive_name
=primitive_name
,
5119 timeout
=timeout_ns_action
,
5122 timeout
=timeout_ns_action
,
5126 nslcmop_operation_state
= "COMPLETED"
5128 detailed_status
= ""
5129 nslcmop_operation_state
= "FAILED"
5131 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5132 nsr_deployed
["VCA"],
5133 member_vnf_index
=vnf_index
,
5135 vdu_count_index
=vdu_count_index
,
5136 ee_descriptor_id
=ee_descriptor_id
,
5138 for vca_index
, vca_deployed
in enumerate(
5139 db_nsr
["_admin"]["deployed"]["VCA"]
5141 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5143 "collection": "nsrs",
5144 "filter": {"_id": nsr_id
},
5145 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5149 nslcmop_operation_state
,
5151 ) = await self
._ns
_execute
_primitive
(
5153 primitive
=primitive_name
,
5154 primitive_params
=self
._map
_primitive
_params
(
5155 config_primitive_desc
, primitive_params
, desc_params
5157 timeout
=timeout_ns_action
,
5163 db_nslcmop_update
["detailed-status"] = detailed_status
5164 error_description_nslcmop
= (
5165 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5169 + "Done with result {} {}".format(
5170 nslcmop_operation_state
, detailed_status
5173 return # database update is called inside finally
5175 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5176 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5178 except asyncio
.CancelledError
:
5180 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5182 exc
= "Operation was cancelled"
5183 except asyncio
.TimeoutError
:
5184 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5186 except Exception as e
:
5187 exc
= traceback
.format_exc()
5188 self
.logger
.critical(
5189 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5198 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5199 nslcmop_operation_state
= "FAILED"
5201 self
._write
_ns
_status
(
5205 ], # TODO check if degraded. For the moment use previous status
5206 current_operation
="IDLE",
5207 current_operation_id
=None,
5208 # error_description=error_description_nsr,
5209 # error_detail=error_detail,
5210 other_update
=db_nsr_update
,
5213 self
._write
_op
_status
(
5216 error_message
=error_description_nslcmop
,
5217 operation_state
=nslcmop_operation_state
,
5218 other_update
=db_nslcmop_update
,
5221 if nslcmop_operation_state
:
5223 await self
.msg
.aiowrite(
5228 "nslcmop_id": nslcmop_id
,
5229 "operationState": nslcmop_operation_state
,
5233 except Exception as e
:
5235 logging_text
+ "kafka_write notification Exception {}".format(e
)
5237 self
.logger
.debug(logging_text
+ "Exit")
5238 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5239 return nslcmop_operation_state
, detailed_status
5241 async def terminate_vdus(
5242 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5244 """This method terminates VDUs
5247 db_vnfr: VNF instance record
5248 member_vnf_index: VNF index to identify the VDUs to be removed
5249 db_nsr: NS instance record
5250 update_db_nslcmops: Nslcmop update record
5252 vca_scaling_info
= []
5253 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5254 scaling_info
["scaling_direction"] = "IN"
5255 scaling_info
["vdu-delete"] = {}
5256 scaling_info
["kdu-delete"] = {}
5257 db_vdur
= db_vnfr
.get("vdur")
5258 vdur_list
= copy(db_vdur
)
5260 for index
, vdu
in enumerate(vdur_list
):
5261 vca_scaling_info
.append(
5263 "osm_vdu_id": vdu
["vdu-id-ref"],
5264 "member-vnf-index": member_vnf_index
,
5266 "vdu_index": count_index
,
5269 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5270 scaling_info
["vdu"].append(
5272 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5273 "vdu_id": vdu
["vdu-id-ref"],
5277 for interface
in vdu
["interfaces"]:
5278 scaling_info
["vdu"][index
]["interface"].append(
5280 "name": interface
["name"],
5281 "ip_address": interface
["ip-address"],
5282 "mac_address": interface
.get("mac-address"),
5285 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5286 stage
[2] = "Terminating VDUs"
5287 if scaling_info
.get("vdu-delete"):
5288 # scale_process = "RO"
5289 if self
.ro_config
.ng
:
5290 await self
._scale
_ng
_ro
(
5299 async def remove_vnf(self
, nsr_id
, nslcmop_id
, vnf_instance_id
):
5300 """This method is to Remove VNF instances from NS.
5303 nsr_id: NS instance id
5304 nslcmop_id: nslcmop id of update
5305 vnf_instance_id: id of the VNF instance to be removed
5308 result: (str, str) COMPLETED/FAILED, details
5312 logging_text
= "Task ns={} update ".format(nsr_id
)
5313 check_vnfr_count
= len(self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}))
5314 self
.logger
.info("check_vnfr_count {}".format(check_vnfr_count
))
5315 if check_vnfr_count
> 1:
5316 stage
= ["", "", ""]
5317 step
= "Getting nslcmop from database"
5319 step
+ " after having waited for previous tasks to be completed"
5321 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5322 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5323 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5324 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5325 """ db_vnfr = self.db.get_one(
5326 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5328 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5329 await self
.terminate_vdus(
5338 constituent_vnfr
= db_nsr
.get("constituent-vnfr-ref")
5339 constituent_vnfr
.remove(db_vnfr
.get("_id"))
5340 db_nsr_update
["constituent-vnfr-ref"] = db_nsr
.get(
5341 "constituent-vnfr-ref"
5343 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5344 self
.db
.del_one("vnfrs", {"_id": db_vnfr
.get("_id")})
5345 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5346 return "COMPLETED", "Done"
5348 step
= "Terminate VNF Failed with"
5350 "{} Cannot terminate the last VNF in this NS.".format(
5354 except (LcmException
, asyncio
.CancelledError
):
5356 except Exception as e
:
5357 self
.logger
.debug("Error removing VNF {}".format(e
))
5358 return "FAILED", "Error removing VNF {}".format(e
)
5360 async def _ns_redeploy_vnf(
5368 """This method updates and redeploys VNF instances
5371 nsr_id: NS instance id
5372 nslcmop_id: nslcmop id
5373 db_vnfd: VNF descriptor
5374 db_vnfr: VNF instance record
5375 db_nsr: NS instance record
5378 result: (str, str) COMPLETED/FAILED, details
5382 stage
= ["", "", ""]
5383 logging_text
= "Task ns={} update ".format(nsr_id
)
5384 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5385 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5387 # Terminate old VNF resources
5388 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5389 await self
.terminate_vdus(
5398 # old_vnfd_id = db_vnfr["vnfd-id"]
5399 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5400 new_db_vnfd
= db_vnfd
5401 # new_vnfd_ref = new_db_vnfd["id"]
5402 # new_vnfd_id = vnfd_id
5406 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5408 "name": cp
.get("id"),
5409 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5410 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5413 new_vnfr_cp
.append(vnf_cp
)
5414 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5415 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5416 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5418 "revision": latest_vnfd_revision
,
5419 "connection-point": new_vnfr_cp
,
5423 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5424 updated_db_vnfr
= self
.db
.get_one(
5426 {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
},
5429 # Instantiate new VNF resources
5430 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5431 vca_scaling_info
= []
5432 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5433 scaling_info
["scaling_direction"] = "OUT"
5434 scaling_info
["vdu-create"] = {}
5435 scaling_info
["kdu-create"] = {}
5436 vdud_instantiate_list
= db_vnfd
["vdu"]
5437 for index
, vdud
in enumerate(vdud_instantiate_list
):
5438 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
5440 additional_params
= (
5441 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5444 cloud_init_list
= []
5446 # TODO Information of its own ip is not available because db_vnfr is not updated.
5447 additional_params
["OSM"] = get_osm_params(
5448 updated_db_vnfr
, vdud
["id"], 1
5450 cloud_init_list
.append(
5451 self
._parse
_cloud
_init
(
5458 vca_scaling_info
.append(
5460 "osm_vdu_id": vdud
["id"],
5461 "member-vnf-index": member_vnf_index
,
5463 "vdu_index": count_index
,
5466 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5467 if self
.ro_config
.ng
:
5469 "New Resources to be deployed: {}".format(scaling_info
)
5471 await self
._scale
_ng
_ro
(
5479 return "COMPLETED", "Done"
5480 except (LcmException
, asyncio
.CancelledError
):
5482 except Exception as e
:
5483 self
.logger
.debug("Error updating VNF {}".format(e
))
5484 return "FAILED", "Error updating VNF {}".format(e
)
5486 async def _ns_charm_upgrade(
5492 timeout
: float = None,
5494 """This method upgrade charms in VNF instances
5497 ee_id: Execution environment id
5498 path: Local path to the charm
5500 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5501 timeout: (Float) Timeout for the ns update operation
5504 result: (str, str) COMPLETED/FAILED, details
5507 charm_type
= charm_type
or "lxc_proxy_charm"
5508 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5512 charm_type
=charm_type
,
5513 timeout
=timeout
or self
.timeout
.ns_update
,
5517 return "COMPLETED", output
5519 except (LcmException
, asyncio
.CancelledError
):
5522 except Exception as e
:
5524 self
.logger
.debug("Error upgrading charm {}".format(path
))
5526 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5528 async def update(self
, nsr_id
, nslcmop_id
):
5529 """Update NS according to different update types
5531 This method performs upgrade of VNF instances then updates the revision
5532 number in VNF record
5535 nsr_id: Network service will be updated
5536 nslcmop_id: ns lcm operation id
5539 It may raise DbException, LcmException, N2VCException, K8sException
5542 # Try to lock HA task here
5543 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5544 if not task_is_locked_by_me
:
5547 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5548 self
.logger
.debug(logging_text
+ "Enter")
5550 # Set the required variables to be filled up later
5552 db_nslcmop_update
= {}
5554 nslcmop_operation_state
= None
5556 error_description_nslcmop
= ""
5558 change_type
= "updated"
5559 detailed_status
= ""
5560 member_vnf_index
= None
5563 # wait for any previous tasks in process
5564 step
= "Waiting for previous operations to terminate"
5565 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5566 self
._write
_ns
_status
(
5569 current_operation
="UPDATING",
5570 current_operation_id
=nslcmop_id
,
5573 step
= "Getting nslcmop from database"
5574 db_nslcmop
= self
.db
.get_one(
5575 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5577 update_type
= db_nslcmop
["operationParams"]["updateType"]
5579 step
= "Getting nsr from database"
5580 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5581 old_operational_status
= db_nsr
["operational-status"]
5582 db_nsr_update
["operational-status"] = "updating"
5583 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5584 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5586 if update_type
== "CHANGE_VNFPKG":
5588 # Get the input parameters given through update request
5589 vnf_instance_id
= db_nslcmop
["operationParams"][
5590 "changeVnfPackageData"
5591 ].get("vnfInstanceId")
5593 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5596 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5598 step
= "Getting vnfr from database"
5599 db_vnfr
= self
.db
.get_one(
5600 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5603 step
= "Getting vnfds from database"
5605 latest_vnfd
= self
.db
.get_one(
5606 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5608 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5611 current_vnf_revision
= db_vnfr
.get("revision", 1)
5612 current_vnfd
= self
.db
.get_one(
5614 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5615 fail_on_empty
=False,
5617 # Charm artifact paths will be filled up later
5619 current_charm_artifact_path
,
5620 target_charm_artifact_path
,
5621 charm_artifact_paths
,
5623 ) = ([], [], [], [])
5625 step
= "Checking if revision has changed in VNFD"
5626 if current_vnf_revision
!= latest_vnfd_revision
:
5628 change_type
= "policy_updated"
5630 # There is new revision of VNFD, update operation is required
5631 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5632 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5634 step
= "Removing the VNFD packages if they exist in the local path"
5635 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5636 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5638 step
= "Get the VNFD packages from FSMongo"
5639 self
.fs
.sync(from_path
=latest_vnfd_path
)
5640 self
.fs
.sync(from_path
=current_vnfd_path
)
5643 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5645 current_base_folder
= current_vnfd
["_admin"]["storage"]
5646 latest_base_folder
= latest_vnfd
["_admin"]["storage"]
5648 for vca_index
, vca_deployed
in enumerate(
5649 get_iterable(nsr_deployed
, "VCA")
5651 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5653 # Getting charm-id and charm-type
5654 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5655 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5656 vca_type
= vca_deployed
.get("type")
5657 vdu_count_index
= vca_deployed
.get("vdu_count_index")
5660 ee_id
= vca_deployed
.get("ee_id")
5662 step
= "Getting descriptor config"
5663 if current_vnfd
.get("kdu"):
5665 search_key
= "kdu_name"
5667 search_key
= "vnfd_id"
5669 entity_id
= vca_deployed
.get(search_key
)
5671 descriptor_config
= get_configuration(
5672 current_vnfd
, entity_id
5675 if "execution-environment-list" in descriptor_config
:
5676 ee_list
= descriptor_config
.get(
5677 "execution-environment-list", []
5682 # There could be several charm used in the same VNF
5683 for ee_item
in ee_list
:
5684 if ee_item
.get("juju"):
5686 step
= "Getting charm name"
5687 charm_name
= ee_item
["juju"].get("charm")
5689 step
= "Setting Charm artifact paths"
5690 current_charm_artifact_path
.append(
5691 get_charm_artifact_path(
5692 current_base_folder
,
5695 current_vnf_revision
,
5698 target_charm_artifact_path
.append(
5699 get_charm_artifact_path(
5703 latest_vnfd_revision
,
5706 elif ee_item
.get("helm-chart"):
5707 # add chart to list and all parameters
5708 step
= "Getting helm chart name"
5709 chart_name
= ee_item
.get("helm-chart")
5711 ee_item
.get("helm-version")
5712 and ee_item
.get("helm-version") == "v2"
5716 vca_type
= "helm-v3"
5717 step
= "Setting Helm chart artifact paths"
5719 helm_artifacts
.append(
5721 "current_artifact_path": get_charm_artifact_path(
5722 current_base_folder
,
5725 current_vnf_revision
,
5727 "target_artifact_path": get_charm_artifact_path(
5731 latest_vnfd_revision
,
5734 "vca_index": vca_index
,
5735 "vdu_index": vdu_count_index
,
5739 charm_artifact_paths
= zip(
5740 current_charm_artifact_path
, target_charm_artifact_path
5743 step
= "Checking if software version has changed in VNFD"
5744 if find_software_version(current_vnfd
) != find_software_version(
5748 step
= "Checking if existing VNF has charm"
5749 for current_charm_path
, target_charm_path
in list(
5750 charm_artifact_paths
5752 if current_charm_path
:
5754 "Software version change is not supported as VNF instance {} has charm.".format(
5759 # There is no change in the charm package, then redeploy the VNF
5760 # based on new descriptor
5761 step
= "Redeploying VNF"
5762 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5763 (result
, detailed_status
) = await self
._ns
_redeploy
_vnf
(
5764 nsr_id
, nslcmop_id
, latest_vnfd
, db_vnfr
, db_nsr
5766 if result
== "FAILED":
5767 nslcmop_operation_state
= result
5768 error_description_nslcmop
= detailed_status
5769 db_nslcmop_update
["detailed-status"] = detailed_status
5772 + " step {} Done with result {} {}".format(
5773 step
, nslcmop_operation_state
, detailed_status
5778 step
= "Checking if any charm package has changed or not"
5779 for current_charm_path
, target_charm_path
in list(
5780 charm_artifact_paths
5784 and target_charm_path
5785 and self
.check_charm_hash_changed(
5786 current_charm_path
, target_charm_path
5790 step
= "Checking whether VNF uses juju bundle"
5791 if check_juju_bundle_existence(current_vnfd
):
5794 "Charm upgrade is not supported for the instance which"
5795 " uses juju-bundle: {}".format(
5796 check_juju_bundle_existence(current_vnfd
)
5800 step
= "Upgrading Charm"
5804 ) = await self
._ns
_charm
_upgrade
(
5807 charm_type
=vca_type
,
5808 path
=self
.fs
.path
+ target_charm_path
,
5809 timeout
=timeout_seconds
,
5812 if result
== "FAILED":
5813 nslcmop_operation_state
= result
5814 error_description_nslcmop
= detailed_status
5816 db_nslcmop_update
["detailed-status"] = detailed_status
5819 + " step {} Done with result {} {}".format(
5820 step
, nslcmop_operation_state
, detailed_status
5824 step
= "Updating policies"
5825 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5826 result
= "COMPLETED"
5827 detailed_status
= "Done"
5828 db_nslcmop_update
["detailed-status"] = "Done"
5831 for item
in helm_artifacts
:
5833 item
["current_artifact_path"]
5834 and item
["target_artifact_path"]
5835 and self
.check_charm_hash_changed(
5836 item
["current_artifact_path"],
5837 item
["target_artifact_path"],
5841 db_update_entry
= "_admin.deployed.VCA.{}.".format(
5844 vnfr_id
= db_vnfr
["_id"]
5845 osm_config
= {"osm": {"ns_id": nsr_id
, "vnf_id": vnfr_id
}}
5847 "collection": "nsrs",
5848 "filter": {"_id": nsr_id
},
5849 "path": db_update_entry
,
5851 vca_type
, namespace
, helm_id
= get_ee_id_parts(item
["ee_id"])
5852 await self
.vca_map
[vca_type
].upgrade_execution_environment(
5853 namespace
=namespace
,
5857 artifact_path
=item
["target_artifact_path"],
5860 vnf_id
= db_vnfr
.get("vnfd-ref")
5861 config_descriptor
= get_configuration(latest_vnfd
, vnf_id
)
5862 self
.logger
.debug("get ssh key block")
5866 ("config-access", "ssh-access", "required"),
5868 # Needed to inject a ssh key
5871 ("config-access", "ssh-access", "default-user"),
5874 "Install configuration Software, getting public ssh key"
5876 pub_key
= await self
.vca_map
[
5878 ].get_ee_ssh_public__key(
5879 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
5883 "Insert public key into VM user={} ssh_key={}".format(
5887 self
.logger
.debug(logging_text
+ step
)
5889 # wait for RO (ip-address) Insert pub_key into VM
5890 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
5900 initial_config_primitive_list
= config_descriptor
.get(
5901 "initial-config-primitive"
5903 config_primitive
= next(
5906 for p
in initial_config_primitive_list
5907 if p
["name"] == "config"
5911 if not config_primitive
:
5914 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
5916 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
5917 if db_vnfr
.get("additionalParamsForVnf"):
5918 deploy_params
.update(
5920 db_vnfr
["additionalParamsForVnf"].copy()
5923 primitive_params_
= self
._map
_primitive
_params
(
5924 config_primitive
, {}, deploy_params
5927 step
= "execute primitive '{}' params '{}'".format(
5928 config_primitive
["name"], primitive_params_
5930 self
.logger
.debug(logging_text
+ step
)
5931 await self
.vca_map
[vca_type
].exec_primitive(
5933 primitive_name
=config_primitive
["name"],
5934 params_dict
=primitive_params_
,
5940 step
= "Updating policies"
5941 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5942 detailed_status
= "Done"
5943 db_nslcmop_update
["detailed-status"] = "Done"
5945 # If nslcmop_operation_state is None, so any operation is not failed.
5946 if not nslcmop_operation_state
:
5947 nslcmop_operation_state
= "COMPLETED"
5949 # If update CHANGE_VNFPKG nslcmop_operation is successful
5950 # vnf revision need to be updated
5951 vnfr_update
["revision"] = latest_vnfd_revision
5952 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
5956 + " task Done with result {} {}".format(
5957 nslcmop_operation_state
, detailed_status
5960 elif update_type
== "REMOVE_VNF":
5961 # This part is included in https://osm.etsi.org/gerrit/11876
5962 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
5963 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5964 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5965 step
= "Removing VNF"
5966 (result
, detailed_status
) = await self
.remove_vnf(
5967 nsr_id
, nslcmop_id
, vnf_instance_id
5969 if result
== "FAILED":
5970 nslcmop_operation_state
= result
5971 error_description_nslcmop
= detailed_status
5972 db_nslcmop_update
["detailed-status"] = detailed_status
5973 change_type
= "vnf_terminated"
5974 if not nslcmop_operation_state
:
5975 nslcmop_operation_state
= "COMPLETED"
5978 + " task Done with result {} {}".format(
5979 nslcmop_operation_state
, detailed_status
5983 elif update_type
== "OPERATE_VNF":
5984 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"][
5987 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"][
5990 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"][
5993 (result
, detailed_status
) = await self
.rebuild_start_stop(
5994 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
5996 if result
== "FAILED":
5997 nslcmop_operation_state
= result
5998 error_description_nslcmop
= detailed_status
5999 db_nslcmop_update
["detailed-status"] = detailed_status
6000 if not nslcmop_operation_state
:
6001 nslcmop_operation_state
= "COMPLETED"
6004 + " task Done with result {} {}".format(
6005 nslcmop_operation_state
, detailed_status
6009 # If nslcmop_operation_state is None, no operation has failed.
6010 # All operations have been executed.
6011 if not nslcmop_operation_state
:
6012 nslcmop_operation_state
= "COMPLETED"
6013 db_nsr_update
["operational-status"] = old_operational_status
6015 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
6016 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6018 except asyncio
.CancelledError
:
6020 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6022 exc
= "Operation was cancelled"
6023 except asyncio
.TimeoutError
:
6024 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
6026 except Exception as e
:
6027 exc
= traceback
.format_exc()
6028 self
.logger
.critical(
6029 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6038 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6039 nslcmop_operation_state
= "FAILED"
6040 db_nsr_update
["operational-status"] = old_operational_status
6042 self
._write
_ns
_status
(
6044 ns_state
=db_nsr
["nsState"],
6045 current_operation
="IDLE",
6046 current_operation_id
=None,
6047 other_update
=db_nsr_update
,
6050 self
._write
_op
_status
(
6053 error_message
=error_description_nslcmop
,
6054 operation_state
=nslcmop_operation_state
,
6055 other_update
=db_nslcmop_update
,
6058 if nslcmop_operation_state
:
6062 "nslcmop_id": nslcmop_id
,
6063 "operationState": nslcmop_operation_state
,
6066 change_type
in ("vnf_terminated", "policy_updated")
6067 and member_vnf_index
6069 msg
.update({"vnf_member_index": member_vnf_index
})
6070 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6071 except Exception as e
:
6073 logging_text
+ "kafka_write notification Exception {}".format(e
)
6075 self
.logger
.debug(logging_text
+ "Exit")
6076 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6077 return nslcmop_operation_state
, detailed_status
6079 async def scale(self
, nsr_id
, nslcmop_id
):
6080 # Try to lock HA task here
6081 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6082 if not task_is_locked_by_me
:
6085 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6086 stage
= ["", "", ""]
6087 tasks_dict_info
= {}
6088 # ^ stage, step, VIM progress
6089 self
.logger
.debug(logging_text
+ "Enter")
6090 # get all needed from database
6092 db_nslcmop_update
= {}
6095 # in case of error, indicates which part of the scale failed, to put the nsr in error status
6096 scale_process
= None
6097 old_operational_status
= ""
6098 old_config_status
= ""
6101 # wait for any previous tasks in process
6102 step
= "Waiting for previous operations to terminate"
6103 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6104 self
._write
_ns
_status
(
6107 current_operation
="SCALING",
6108 current_operation_id
=nslcmop_id
,
6111 step
= "Getting nslcmop from database"
6113 step
+ " after having waited for previous tasks to be completed"
6115 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6117 step
= "Getting nsr from database"
6118 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6119 old_operational_status
= db_nsr
["operational-status"]
6120 old_config_status
= db_nsr
["config-status"]
6122 step
= "Parsing scaling parameters"
6123 db_nsr_update
["operational-status"] = "scaling"
6124 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6125 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6127 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6129 ]["member-vnf-index"]
6130 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6132 ]["scaling-group-descriptor"]
6133 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6134 # for backward compatibility
6135 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6136 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6137 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6138 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6140 step
= "Getting vnfr from database"
6141 db_vnfr
= self
.db
.get_one(
6142 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6145 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6147 step
= "Getting vnfd from database"
6148 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6150 base_folder
= db_vnfd
["_admin"]["storage"]
6152 step
= "Getting scaling-group-descriptor"
6153 scaling_descriptor
= find_in_list(
6154 get_scaling_aspect(db_vnfd
),
6155 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6157 if not scaling_descriptor
:
6159 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6160 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6163 step
= "Sending scale order to VIM"
6164 # TODO check if ns is in a proper status
6166 if not db_nsr
["_admin"].get("scaling-group"):
6171 "_admin.scaling-group": [
6172 {"name": scaling_group
, "nb-scale-op": 0}
6176 admin_scale_index
= 0
6178 for admin_scale_index
, admin_scale_info
in enumerate(
6179 db_nsr
["_admin"]["scaling-group"]
6181 if admin_scale_info
["name"] == scaling_group
:
6182 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6184 else: # not found, set index one plus last element and add new entry with the name
6185 admin_scale_index
+= 1
6187 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6190 vca_scaling_info
= []
6191 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6192 if scaling_type
== "SCALE_OUT":
6193 if "aspect-delta-details" not in scaling_descriptor
:
6195 "Aspect delta details not fount in scaling descriptor {}".format(
6196 scaling_descriptor
["name"]
6199 # count if max-instance-count is reached
6200 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6202 scaling_info
["scaling_direction"] = "OUT"
6203 scaling_info
["vdu-create"] = {}
6204 scaling_info
["kdu-create"] = {}
6205 for delta
in deltas
:
6206 for vdu_delta
in delta
.get("vdu-delta", {}):
6207 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6208 # vdu_index also provides the number of instance of the targeted vdu
6209 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6210 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6214 additional_params
= (
6215 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6218 cloud_init_list
= []
6220 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6221 max_instance_count
= 10
6222 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6223 max_instance_count
= vdu_profile
.get(
6224 "max-number-of-instances", 10
6227 default_instance_num
= get_number_of_instances(
6230 instances_number
= vdu_delta
.get("number-of-instances", 1)
6231 nb_scale_op
+= instances_number
6233 new_instance_count
= nb_scale_op
+ default_instance_num
6234 # Control if new count is over max and vdu count is less than max.
6235 # Then assign new instance count
6236 if new_instance_count
> max_instance_count
> vdu_count
:
6237 instances_number
= new_instance_count
- max_instance_count
6239 instances_number
= instances_number
6241 if new_instance_count
> max_instance_count
:
6243 "reached the limit of {} (max-instance-count) "
6244 "scaling-out operations for the "
6245 "scaling-group-descriptor '{}'".format(
6246 nb_scale_op
, scaling_group
6249 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6251 # TODO Information of its own ip is not available because db_vnfr is not updated.
6252 additional_params
["OSM"] = get_osm_params(
6253 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6255 cloud_init_list
.append(
6256 self
._parse
_cloud
_init
(
6263 vca_scaling_info
.append(
6265 "osm_vdu_id": vdu_delta
["id"],
6266 "member-vnf-index": vnf_index
,
6268 "vdu_index": vdu_index
+ x
,
6271 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6272 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6273 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6274 kdu_name
= kdu_profile
["kdu-name"]
6275 resource_name
= kdu_profile
.get("resource-name", "")
6277 # Might have different kdus in the same delta
6278 # Should have list for each kdu
6279 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6280 scaling_info
["kdu-create"][kdu_name
] = []
6282 kdur
= get_kdur(db_vnfr
, kdu_name
)
6283 if kdur
.get("helm-chart"):
6284 k8s_cluster_type
= "helm-chart-v3"
6285 self
.logger
.debug("kdur: {}".format(kdur
))
6287 kdur
.get("helm-version")
6288 and kdur
.get("helm-version") == "v2"
6290 k8s_cluster_type
= "helm-chart"
6291 elif kdur
.get("juju-bundle"):
6292 k8s_cluster_type
= "juju-bundle"
6295 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6296 "juju-bundle. Maybe an old NBI version is running".format(
6297 db_vnfr
["member-vnf-index-ref"], kdu_name
6301 max_instance_count
= 10
6302 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6303 max_instance_count
= kdu_profile
.get(
6304 "max-number-of-instances", 10
6307 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6308 deployed_kdu
, _
= get_deployed_kdu(
6309 nsr_deployed
, kdu_name
, vnf_index
6311 if deployed_kdu
is None:
6313 "KDU '{}' for vnf '{}' not deployed".format(
6317 kdu_instance
= deployed_kdu
.get("kdu-instance")
6318 instance_num
= await self
.k8scluster_map
[
6324 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6325 kdu_model
=deployed_kdu
.get("kdu-model"),
6327 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6328 "number-of-instances", 1
6331 # Control if new count is over max and instance_num is less than max.
6332 # Then assign max instance number to kdu replica count
6333 if kdu_replica_count
> max_instance_count
> instance_num
:
6334 kdu_replica_count
= max_instance_count
6335 if kdu_replica_count
> max_instance_count
:
6337 "reached the limit of {} (max-instance-count) "
6338 "scaling-out operations for the "
6339 "scaling-group-descriptor '{}'".format(
6340 instance_num
, scaling_group
6344 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6345 vca_scaling_info
.append(
6347 "osm_kdu_id": kdu_name
,
6348 "member-vnf-index": vnf_index
,
6350 "kdu_index": instance_num
+ x
- 1,
6353 scaling_info
["kdu-create"][kdu_name
].append(
6355 "member-vnf-index": vnf_index
,
6357 "k8s-cluster-type": k8s_cluster_type
,
6358 "resource-name": resource_name
,
6359 "scale": kdu_replica_count
,
6362 elif scaling_type
== "SCALE_IN":
6363 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6365 scaling_info
["scaling_direction"] = "IN"
6366 scaling_info
["vdu-delete"] = {}
6367 scaling_info
["kdu-delete"] = {}
6369 for delta
in deltas
:
6370 for vdu_delta
in delta
.get("vdu-delta", {}):
6371 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6372 min_instance_count
= 0
6373 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6374 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6375 min_instance_count
= vdu_profile
["min-number-of-instances"]
6377 default_instance_num
= get_number_of_instances(
6378 db_vnfd
, vdu_delta
["id"]
6380 instance_num
= vdu_delta
.get("number-of-instances", 1)
6381 nb_scale_op
-= instance_num
6383 new_instance_count
= nb_scale_op
+ default_instance_num
6385 if new_instance_count
< min_instance_count
< vdu_count
:
6386 instances_number
= min_instance_count
- new_instance_count
6388 instances_number
= instance_num
6390 if new_instance_count
< min_instance_count
:
6392 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6393 "scaling-group-descriptor '{}'".format(
6394 nb_scale_op
, scaling_group
6397 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6398 vca_scaling_info
.append(
6400 "osm_vdu_id": vdu_delta
["id"],
6401 "member-vnf-index": vnf_index
,
6403 "vdu_index": vdu_index
- 1 - x
,
6406 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6407 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6408 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6409 kdu_name
= kdu_profile
["kdu-name"]
6410 resource_name
= kdu_profile
.get("resource-name", "")
6412 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6413 scaling_info
["kdu-delete"][kdu_name
] = []
6415 kdur
= get_kdur(db_vnfr
, kdu_name
)
6416 if kdur
.get("helm-chart"):
6417 k8s_cluster_type
= "helm-chart-v3"
6418 self
.logger
.debug("kdur: {}".format(kdur
))
6420 kdur
.get("helm-version")
6421 and kdur
.get("helm-version") == "v2"
6423 k8s_cluster_type
= "helm-chart"
6424 elif kdur
.get("juju-bundle"):
6425 k8s_cluster_type
= "juju-bundle"
6428 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6429 "juju-bundle. Maybe an old NBI version is running".format(
6430 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6434 min_instance_count
= 0
6435 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6436 min_instance_count
= kdu_profile
["min-number-of-instances"]
6438 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6439 deployed_kdu
, _
= get_deployed_kdu(
6440 nsr_deployed
, kdu_name
, vnf_index
6442 if deployed_kdu
is None:
6444 "KDU '{}' for vnf '{}' not deployed".format(
6448 kdu_instance
= deployed_kdu
.get("kdu-instance")
6449 instance_num
= await self
.k8scluster_map
[
6455 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6456 kdu_model
=deployed_kdu
.get("kdu-model"),
6458 kdu_replica_count
= instance_num
- kdu_delta
.get(
6459 "number-of-instances", 1
6462 if kdu_replica_count
< min_instance_count
< instance_num
:
6463 kdu_replica_count
= min_instance_count
6464 if kdu_replica_count
< min_instance_count
:
6466 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6467 "scaling-group-descriptor '{}'".format(
6468 instance_num
, scaling_group
6472 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6473 vca_scaling_info
.append(
6475 "osm_kdu_id": kdu_name
,
6476 "member-vnf-index": vnf_index
,
6478 "kdu_index": instance_num
- x
- 1,
6481 scaling_info
["kdu-delete"][kdu_name
].append(
6483 "member-vnf-index": vnf_index
,
6485 "k8s-cluster-type": k8s_cluster_type
,
6486 "resource-name": resource_name
,
6487 "scale": kdu_replica_count
,
6491 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6492 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6493 if scaling_info
["scaling_direction"] == "IN":
6494 for vdur
in reversed(db_vnfr
["vdur"]):
6495 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6496 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6497 scaling_info
["vdu"].append(
6499 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6500 "vdu_id": vdur
["vdu-id-ref"],
6504 for interface
in vdur
["interfaces"]:
6505 scaling_info
["vdu"][-1]["interface"].append(
6507 "name": interface
["name"],
6508 "ip_address": interface
["ip-address"],
6509 "mac_address": interface
.get("mac-address"),
6512 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6515 step
= "Executing pre-scale vnf-config-primitive"
6516 if scaling_descriptor
.get("scaling-config-action"):
6517 for scaling_config_action
in scaling_descriptor
[
6518 "scaling-config-action"
6521 scaling_config_action
.get("trigger") == "pre-scale-in"
6522 and scaling_type
== "SCALE_IN"
6524 scaling_config_action
.get("trigger") == "pre-scale-out"
6525 and scaling_type
== "SCALE_OUT"
6527 vnf_config_primitive
= scaling_config_action
[
6528 "vnf-config-primitive-name-ref"
6530 step
= db_nslcmop_update
[
6532 ] = "executing pre-scale scaling-config-action '{}'".format(
6533 vnf_config_primitive
6536 # look for primitive
6537 for config_primitive
in (
6538 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6539 ).get("config-primitive", ()):
6540 if config_primitive
["name"] == vnf_config_primitive
:
6544 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6545 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6546 "primitive".format(scaling_group
, vnf_config_primitive
)
6549 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6550 if db_vnfr
.get("additionalParamsForVnf"):
6551 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6553 scale_process
= "VCA"
6554 db_nsr_update
["config-status"] = "configuring pre-scaling"
6555 primitive_params
= self
._map
_primitive
_params
(
6556 config_primitive
, {}, vnfr_params
6559 # Pre-scale retry check: Check if this sub-operation has been executed before
6560 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6563 vnf_config_primitive
,
6567 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6568 # Skip sub-operation
6569 result
= "COMPLETED"
6570 result_detail
= "Done"
6573 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6574 vnf_config_primitive
, result
, result_detail
6578 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6579 # New sub-operation: Get index of this sub-operation
6581 len(db_nslcmop
.get("_admin", {}).get("operations"))
6586 + "vnf_config_primitive={} New sub-operation".format(
6587 vnf_config_primitive
6591 # retry: Get registered params for this existing sub-operation
6592 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6595 vnf_index
= op
.get("member_vnf_index")
6596 vnf_config_primitive
= op
.get("primitive")
6597 primitive_params
= op
.get("primitive_params")
6600 + "vnf_config_primitive={} Sub-operation retry".format(
6601 vnf_config_primitive
6604 # Execute the primitive, either with new (first-time) or registered (retry) args
6605 ee_descriptor_id
= config_primitive
.get(
6606 "execution-environment-ref"
6608 primitive_name
= config_primitive
.get(
6609 "execution-environment-primitive", vnf_config_primitive
6611 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6612 nsr_deployed
["VCA"],
6613 member_vnf_index
=vnf_index
,
6615 vdu_count_index
=None,
6616 ee_descriptor_id
=ee_descriptor_id
,
6618 result
, result_detail
= await self
._ns
_execute
_primitive
(
6627 + "vnf_config_primitive={} Done with result {} {}".format(
6628 vnf_config_primitive
, result
, result_detail
6631 # Update operationState = COMPLETED | FAILED
6632 self
._update
_suboperation
_status
(
6633 db_nslcmop
, op_index
, result
, result_detail
6636 if result
== "FAILED":
6637 raise LcmException(result_detail
)
6638 db_nsr_update
["config-status"] = old_config_status
6639 scale_process
= None
6643 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6646 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6649 # SCALE-IN VCA - BEGIN
6650 if vca_scaling_info
:
6651 step
= db_nslcmop_update
[
6653 ] = "Deleting the execution environments"
6654 scale_process
= "VCA"
6655 for vca_info
in vca_scaling_info
:
6656 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6657 member_vnf_index
= str(vca_info
["member-vnf-index"])
6659 logging_text
+ "vdu info: {}".format(vca_info
)
6661 if vca_info
.get("osm_vdu_id"):
6662 vdu_id
= vca_info
["osm_vdu_id"]
6663 vdu_index
= int(vca_info
["vdu_index"])
6666 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6667 member_vnf_index
, vdu_id
, vdu_index
6669 stage
[2] = step
= "Scaling in VCA"
6670 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6671 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6672 config_update
= db_nsr
["configurationStatus"]
6673 for vca_index
, vca
in enumerate(vca_update
):
6675 (vca
or vca
.get("ee_id"))
6676 and vca
["member-vnf-index"] == member_vnf_index
6677 and vca
["vdu_count_index"] == vdu_index
6679 if vca
.get("vdu_id"):
6680 config_descriptor
= get_configuration(
6681 db_vnfd
, vca
.get("vdu_id")
6683 elif vca
.get("kdu_name"):
6684 config_descriptor
= get_configuration(
6685 db_vnfd
, vca
.get("kdu_name")
6688 config_descriptor
= get_configuration(
6689 db_vnfd
, db_vnfd
["id"]
6691 operation_params
= (
6692 db_nslcmop
.get("operationParams") or {}
6694 exec_terminate_primitives
= not operation_params
.get(
6695 "skip_terminate_primitives"
6696 ) and vca
.get("needed_terminate")
6697 task
= asyncio
.ensure_future(
6706 exec_primitives
=exec_terminate_primitives
,
6710 timeout
=self
.timeout
.charm_delete
,
6713 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6716 del vca_update
[vca_index
]
6717 del config_update
[vca_index
]
6718 # wait for pending tasks of terminate primitives
6722 + "Waiting for tasks {}".format(
6723 list(tasks_dict_info
.keys())
6726 error_list
= await self
._wait
_for
_tasks
(
6730 self
.timeout
.charm_delete
, self
.timeout
.ns_terminate
6735 tasks_dict_info
.clear()
6737 raise LcmException("; ".join(error_list
))
6739 db_vca_and_config_update
= {
6740 "_admin.deployed.VCA": vca_update
,
6741 "configurationStatus": config_update
,
6744 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6746 scale_process
= None
6747 # SCALE-IN VCA - END
6750 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6751 scale_process
= "RO"
6752 if self
.ro_config
.ng
:
6753 await self
._scale
_ng
_ro
(
6754 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6756 scaling_info
.pop("vdu-create", None)
6757 scaling_info
.pop("vdu-delete", None)
6759 scale_process
= None
6763 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6764 scale_process
= "KDU"
6765 await self
._scale
_kdu
(
6766 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6768 scaling_info
.pop("kdu-create", None)
6769 scaling_info
.pop("kdu-delete", None)
6771 scale_process
= None
6775 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6777 # SCALE-UP VCA - BEGIN
6778 if vca_scaling_info
:
6779 step
= db_nslcmop_update
[
6781 ] = "Creating new execution environments"
6782 scale_process
= "VCA"
6783 for vca_info
in vca_scaling_info
:
6784 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6785 member_vnf_index
= str(vca_info
["member-vnf-index"])
6787 logging_text
+ "vdu info: {}".format(vca_info
)
6789 vnfd_id
= db_vnfr
["vnfd-ref"]
6790 if vca_info
.get("osm_vdu_id"):
6791 vdu_index
= int(vca_info
["vdu_index"])
6792 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6793 if db_vnfr
.get("additionalParamsForVnf"):
6794 deploy_params
.update(
6796 db_vnfr
["additionalParamsForVnf"].copy()
6799 descriptor_config
= get_configuration(
6800 db_vnfd
, db_vnfd
["id"]
6802 if descriptor_config
:
6808 logging_text
=logging_text
6809 + "member_vnf_index={} ".format(member_vnf_index
),
6812 nslcmop_id
=nslcmop_id
,
6818 kdu_index
=kdu_index
,
6819 member_vnf_index
=member_vnf_index
,
6820 vdu_index
=vdu_index
,
6822 deploy_params
=deploy_params
,
6823 descriptor_config
=descriptor_config
,
6824 base_folder
=base_folder
,
6825 task_instantiation_info
=tasks_dict_info
,
6828 vdu_id
= vca_info
["osm_vdu_id"]
6829 vdur
= find_in_list(
6830 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6832 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6833 if vdur
.get("additionalParams"):
6834 deploy_params_vdu
= parse_yaml_strings(
6835 vdur
["additionalParams"]
6838 deploy_params_vdu
= deploy_params
6839 deploy_params_vdu
["OSM"] = get_osm_params(
6840 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6842 if descriptor_config
:
6848 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6849 member_vnf_index
, vdu_id
, vdu_index
6851 stage
[2] = step
= "Scaling out VCA"
6852 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6854 logging_text
=logging_text
6855 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6856 member_vnf_index
, vdu_id
, vdu_index
6860 nslcmop_id
=nslcmop_id
,
6866 member_vnf_index
=member_vnf_index
,
6867 vdu_index
=vdu_index
,
6868 kdu_index
=kdu_index
,
6870 deploy_params
=deploy_params_vdu
,
6871 descriptor_config
=descriptor_config
,
6872 base_folder
=base_folder
,
6873 task_instantiation_info
=tasks_dict_info
,
6876 # SCALE-UP VCA - END
6877 scale_process
= None
6880 # execute primitive service POST-SCALING
6881 step
= "Executing post-scale vnf-config-primitive"
6882 if scaling_descriptor
.get("scaling-config-action"):
6883 for scaling_config_action
in scaling_descriptor
[
6884 "scaling-config-action"
6887 scaling_config_action
.get("trigger") == "post-scale-in"
6888 and scaling_type
== "SCALE_IN"
6890 scaling_config_action
.get("trigger") == "post-scale-out"
6891 and scaling_type
== "SCALE_OUT"
6893 vnf_config_primitive
= scaling_config_action
[
6894 "vnf-config-primitive-name-ref"
6896 step
= db_nslcmop_update
[
6898 ] = "executing post-scale scaling-config-action '{}'".format(
6899 vnf_config_primitive
6902 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6903 if db_vnfr
.get("additionalParamsForVnf"):
6904 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6906 # look for primitive
6907 for config_primitive
in (
6908 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6909 ).get("config-primitive", ()):
6910 if config_primitive
["name"] == vnf_config_primitive
:
6914 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6915 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6916 "config-primitive".format(
6917 scaling_group
, vnf_config_primitive
6920 scale_process
= "VCA"
6921 db_nsr_update
["config-status"] = "configuring post-scaling"
6922 primitive_params
= self
._map
_primitive
_params
(
6923 config_primitive
, {}, vnfr_params
6926 # Post-scale retry check: Check if this sub-operation has been executed before
6927 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6930 vnf_config_primitive
,
6934 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6935 # Skip sub-operation
6936 result
= "COMPLETED"
6937 result_detail
= "Done"
6940 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6941 vnf_config_primitive
, result
, result_detail
6945 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6946 # New sub-operation: Get index of this sub-operation
6948 len(db_nslcmop
.get("_admin", {}).get("operations"))
6953 + "vnf_config_primitive={} New sub-operation".format(
6954 vnf_config_primitive
6958 # retry: Get registered params for this existing sub-operation
6959 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6962 vnf_index
= op
.get("member_vnf_index")
6963 vnf_config_primitive
= op
.get("primitive")
6964 primitive_params
= op
.get("primitive_params")
6967 + "vnf_config_primitive={} Sub-operation retry".format(
6968 vnf_config_primitive
6971 # Execute the primitive, either with new (first-time) or registered (retry) args
6972 ee_descriptor_id
= config_primitive
.get(
6973 "execution-environment-ref"
6975 primitive_name
= config_primitive
.get(
6976 "execution-environment-primitive", vnf_config_primitive
6978 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6979 nsr_deployed
["VCA"],
6980 member_vnf_index
=vnf_index
,
6982 vdu_count_index
=None,
6983 ee_descriptor_id
=ee_descriptor_id
,
6985 result
, result_detail
= await self
._ns
_execute
_primitive
(
6994 + "vnf_config_primitive={} Done with result {} {}".format(
6995 vnf_config_primitive
, result
, result_detail
6998 # Update operationState = COMPLETED | FAILED
6999 self
._update
_suboperation
_status
(
7000 db_nslcmop
, op_index
, result
, result_detail
7003 if result
== "FAILED":
7004 raise LcmException(result_detail
)
7005 db_nsr_update
["config-status"] = old_config_status
7006 scale_process
= None
7011 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
7012 db_nsr_update
["operational-status"] = (
7014 if old_operational_status
== "failed"
7015 else old_operational_status
7017 db_nsr_update
["config-status"] = old_config_status
7020 ROclient
.ROClientException
,
7025 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7027 except asyncio
.CancelledError
:
7029 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7031 exc
= "Operation was cancelled"
7032 except Exception as e
:
7033 exc
= traceback
.format_exc()
7034 self
.logger
.critical(
7035 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7039 self
._write
_ns
_status
(
7042 current_operation
="IDLE",
7043 current_operation_id
=None,
7046 stage
[1] = "Waiting for instantiate pending tasks."
7047 self
.logger
.debug(logging_text
+ stage
[1])
7048 exc
= await self
._wait
_for
_tasks
(
7051 self
.timeout
.ns_deploy
,
7059 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7060 nslcmop_operation_state
= "FAILED"
7062 db_nsr_update
["operational-status"] = old_operational_status
7063 db_nsr_update
["config-status"] = old_config_status
7064 db_nsr_update
["detailed-status"] = ""
7066 if "VCA" in scale_process
:
7067 db_nsr_update
["config-status"] = "failed"
7068 if "RO" in scale_process
:
7069 db_nsr_update
["operational-status"] = "failed"
7072 ] = "FAILED scaling nslcmop={} {}: {}".format(
7073 nslcmop_id
, step
, exc
7076 error_description_nslcmop
= None
7077 nslcmop_operation_state
= "COMPLETED"
7078 db_nslcmop_update
["detailed-status"] = "Done"
7080 self
._write
_op
_status
(
7083 error_message
=error_description_nslcmop
,
7084 operation_state
=nslcmop_operation_state
,
7085 other_update
=db_nslcmop_update
,
7088 self
._write
_ns
_status
(
7091 current_operation
="IDLE",
7092 current_operation_id
=None,
7093 other_update
=db_nsr_update
,
7096 if nslcmop_operation_state
:
7100 "nslcmop_id": nslcmop_id
,
7101 "operationState": nslcmop_operation_state
,
7103 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7104 except Exception as e
:
7106 logging_text
+ "kafka_write notification Exception {}".format(e
)
7108 self
.logger
.debug(logging_text
+ "Exit")
7109 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
async def _scale_kdu(
    self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
):
    """Scale the KDUs listed in ``scaling_info`` to their target replica count.

    The entries under ``scaling_info["kdu-create"]`` are processed or, when
    that mapping is empty or missing, the ones under
    ``scaling_info["kdu-delete"]``.  For each entry the K8s connector selected
    by its ``k8s-cluster-type`` is asked to scale ``resource-name`` to
    ``scale`` replicas.  Entries whose ``type`` is ``"delete"`` first run the
    KDU's ``terminate-config-primitive`` list; entries whose ``type`` is
    ``"create"`` run the ``initial-config-primitive`` list after scaling.
    Primitives are executed in ascending ``seq`` order and only when
    ``get_juju_ee_ref`` returns ``None`` for the KDU.

    :param logging_text: prefix prepended to every debug message
    :param nsr_id: ``_id`` of the nsr record, used as filter of the
        ``db_dict`` handed to the connector
    :param nsr_deployed: deployment info passed to ``get_deployed_kdu``
    :param db_vnfd: VNF descriptor from which the KDU configuration is read
    :param vca_id: VCA identifier handed to the connector calls
    :param scaling_info: dict that may hold ``kdu-create`` / ``kdu-delete``
        mappings of kdu name -> list of scaling entries
    :return: None
    :raises LcmException: if a KDU named in ``scaling_info`` is not deployed
    """

    def _sorted_config_primitives(kdu_name, list_name):
        # Return the KDU's primitives of the given list ordered by "seq", or an
        # empty list when they must not be run from here.
        kdu_config = get_configuration(db_vnfd, kdu_name)
        if (
            kdu_config
            and kdu_config.get(list_name)
            and get_juju_ee_ref(db_vnfd, kdu_name) is None
        ):
            # sorted() instead of list.sort(): do not reorder the descriptor
            # that the caller handed in.
            return sorted(kdu_config[list_name], key=lambda val: int(val["seq"]))
        return []

    # Fall back to an empty mapping so that a scaling_info without any KDU
    # entry is a no-op instead of a TypeError when iterating None.
    kdus_to_scale = (
        scaling_info.get("kdu-create") or scaling_info.get("kdu-delete") or {}
    )

    for kdu_name, kdu_entries in kdus_to_scale.items():
        for kdu_scaling_info in kdu_entries:
            member_vnf_index = kdu_scaling_info["member-vnf-index"]
            deployed_kdu, index = get_deployed_kdu(
                nsr_deployed, kdu_name, member_vnf_index
            )
            if deployed_kdu is None:
                # Same error the scale operation reports for an unknown KDU,
                # rather than a TypeError on the subscripts below.
                raise LcmException(
                    "KDU '{}' for vnf '{}' not deployed".format(
                        kdu_name, member_vnf_index
                    )
                )
            cluster_uuid = deployed_kdu["k8scluster-uuid"]
            kdu_instance = deployed_kdu["kdu-instance"]
            kdu_model = deployed_kdu.get("kdu-model")
            scale = int(kdu_scaling_info["scale"])
            k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]

            # Location in the database of the record of this deployed KDU.
            db_dict = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": "_admin.deployed.K8s.{}".format(index),
            }

            step = "scaling application {}".format(
                kdu_scaling_info["resource-name"]
            )
            self.logger.debug(logging_text + step)

            # NOTE(review): the db_dict / vca_id keyword arguments of the three
            # connector calls below, and scale / atomic of scale(), are
            # assumed to match the n2vc K8s connector signatures - confirm.
            if kdu_scaling_info["type"] == "delete":
                # Terminate primitives run BEFORE the replicas are removed.
                for terminate_config_primitive in _sorted_config_primitives(
                    kdu_name, "terminate-config-primitive"
                ):
                    primitive_params_ = self._map_primitive_params(
                        terminate_config_primitive, {}, {}
                    )
                    step = "execute terminate config primitive"
                    self.logger.debug(logging_text + step)
                    await asyncio.wait_for(
                        self.k8scluster_map[k8s_cluster_type].exec_primitive(
                            cluster_uuid=cluster_uuid,
                            kdu_instance=kdu_instance,
                            primitive_name=terminate_config_primitive["name"],
                            params=primitive_params_,
                            db_dict=db_dict,
                            total_timeout=self.timeout.primitive,
                            vca_id=vca_id,
                        ),
                        timeout=self.timeout.primitive
                        * self.timeout.primitive_outer_factor,
                    )

            await asyncio.wait_for(
                self.k8scluster_map[k8s_cluster_type].scale(
                    kdu_instance=kdu_instance,
                    scale=scale,
                    resource_name=kdu_scaling_info["resource-name"],
                    total_timeout=self.timeout.scale_on_error,
                    vca_id=vca_id,
                    cluster_uuid=cluster_uuid,
                    kdu_model=kdu_model,
                    atomic=True,
                    db_dict=db_dict,
                ),
                timeout=self.timeout.scale_on_error
                * self.timeout.scale_on_error_outer_factor,
            )

            if kdu_scaling_info["type"] == "create":
                # Initial primitives run AFTER the new replicas exist.
                for initial_config_primitive in _sorted_config_primitives(
                    kdu_name, "initial-config-primitive"
                ):
                    primitive_params_ = self._map_primitive_params(
                        initial_config_primitive, {}, {}
                    )
                    step = "execute initial config primitive"
                    self.logger.debug(logging_text + step)
                    await asyncio.wait_for(
                        self.k8scluster_map[k8s_cluster_type].exec_primitive(
                            cluster_uuid=cluster_uuid,
                            kdu_instance=kdu_instance,
                            primitive_name=initial_config_primitive["name"],
                            params=primitive_params_,
                            db_dict=db_dict,
                            vca_id=vca_id,
                        ),
                        # NOTE(review): fixed outer timeout, presumably in
                        # seconds - confirm the value against upstream.
                        timeout=600,
                    )
# Apply a VDU scaling operation through NG-RO.
#
# Reads the NSD of db_nsr, collects every VNFR of the NS (indexed by
# "member-vnf-index-ref") together with the distinct VNFDs they reference,
# obtains the N2VC public key and then awaits self._instantiate_ng_ro(...)
# with start_deploy=time() and the ns_deploy timeout.
# NOTE(review): several original lines are not visible in this view (the
# initialisation of db_vnfrs/db_vnfds and the names of the calls that receive
# the "vdu-create"/"vdu-delete" data), so those steps are not described here.
7221 async def _scale_ng_ro(
7222 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7224 nsr_id
= db_nslcmop
["nsInstanceId"]
7225 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7228 # read from db: vnfd's for every vnf
7231 # for each vnf in ns, read vnfd
7232 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7233 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7234 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7235 # if we haven't this vnfd, read it from db
7236 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7238 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7239 db_vnfds
.append(vnfd
)
# The N2VC public key is wrapped in a one-element list
7240 n2vc_key
= self
.n2vc
.get_public_key()
7241 n2vc_key_list
= [n2vc_key
]
7244 vdu_scaling_info
.get("vdu-create"),
7245 vdu_scaling_info
.get("vdu-delete"),
7248 # db_vnfr has been updated, update db_vnfrs to use it
7249 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7250 await self
._instantiate
_ng
_ro
(
7260 start_deploy
=time(),
7261 timeout_ns_deploy
=self
.timeout
.ns_deploy
,
# Extra step executed only when the scaling info contains "vdu-delete" entries
7263 if vdu_scaling_info
.get("vdu-delete"):
7265 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
# Build the Prometheus scrape jobs of an execution environment.
#
# Visible flow: list artifact_path with self.fs.dir_ls() and look for a file
# whose name starts with "prometheus" and ends with ".j2"; open that template;
# poll the "vnfrs" record up to 360 times, sleeping 10 seconds between polls,
# until the matching VDU/KDU record has a "name"; render the template through
# parse_job() and tag every job with nsr_id and vnfr_id.
# NOTE(review): several original lines are not visible in this view (part of
# the signature, the raise statements, the return statement), so parameters and
# results beyond what the existing docstring states are not described here.
7268 async def extract_prometheus_scrape_jobs(
7272 ee_config_descriptor
: dict,
7276 vnf_member_index
: str = "",
7278 vdu_index
: int = None,
7280 kdu_index
: int = None,
7282 """Method to extract prometheus scrape jobs from EE's Prometheus template job file
7283 This method will wait until the corresponding VDU or KDU is fully instantiated
7286 ee_id (str): Execution Environment ID
7287 artifact_path (str): Path where the EE's content is (including the Prometheus template file)
7288 ee_config_descriptor (dict): Execution Environment's configuration descriptor
7289 vnfr_id (str): VNFR ID where this EE applies
7290 nsr_id (str): NSR ID where this EE applies
7291 target_ip (str): VDU/KDU instance IP address
7292 vnf_member_index (str, optional): VNF index where this EE applies. Defaults to "".
7293 vdu_id (str, optional): VDU ID where this EE applies. Defaults to "".
7294 vdu_index (int, optional): VDU index where this EE applies. Defaults to None.
7295 kdu_name (str, optional): KDU name where this EE applies. Defaults to "".
7296 kdu_index (int, optional): KDU index where this EE applies. Defaults to None.
7299 LcmException: When the VDU or KDU instance was not found in an hour
7302 _type_: Prometheus jobs
7304 self
.logger
.debug(f
"KDU: {kdu_name}; KDU INDEX: {kdu_index}")
7305 # look if exist a file called 'prometheus*.j2' and
7306 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7310 for f
in artifact_content
7311 if f
.startswith("prometheus") and f
.endswith(".j2")
7317 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
# 360 polls x 10 s sleep = the one-hour limit mentioned in the docstring
7322 for r
in range(360):
7323 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
# VDU case: match the vdur entry by "vdu-id-ref" and "count-index"
7324 if vdu_id
and vdu_index
is not None:
7328 for x
in get_iterable(db_vnfr
, "vdur")
7330 x
.get("vdu-id-ref") == vdu_id
7331 and x
.get("count-index") == vdu_index
7336 if vdur
.get("name"):
7337 vdur_name
= vdur
.get("name")
# KDU case: match the kdur entry by "kdu-name" and "count-index"
7339 if kdu_name
and kdu_index
is not None:
7343 for x
in get_iterable(db_vnfr
, "kdur")
7345 x
.get("kdu-name") == kdu_name
7346 and x
.get("count-index") == kdu_index
7351 if kdur
.get("name"):
7352 kdur_name
= kdur
.get("name")
# NOTE(review): the "loop" argument of asyncio.sleep() was deprecated in
# Python 3.8 and removed in Python 3.10, where this call raises TypeError.
7355 await asyncio
.sleep(10, loop
=self
.loop
)
7357 if vdu_id
and vdu_index
is not None:
7359 f
"Timeout waiting VDU with name={vdu_id} and index={vdu_index} to be intantiated"
7361 if kdu_name
and kdu_index
is not None:
7363 f
"Timeout waiting KDU with name={kdu_name} and index={kdu_index} to be intantiated"
# Exporter host name is "<ee_id without its first dot-separated part>-<metric-service>"
7367 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7368 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
# From here on vnfr_id is the VNFR id with the dashes removed
7370 vnfr_id
= vnfr_id
.replace("-", "")
# Variables handed to parse_job() together with the template content
7372 "JOB_NAME": vnfr_id
,
7373 "TARGET_IP": target_ip
,
7374 "EXPORTER_POD_IP": host_name
,
7375 "EXPORTER_POD_PORT": host_port
,
7377 "VNF_MEMBER_INDEX": vnf_member_index
,
7378 "VDUR_NAME": vdur_name
,
7379 "KDUR_NAME": kdur_name
,
7381 job_list
= parse_job(job_data
, variables
)
7382 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7383 for job
in job_list
:
7385 not isinstance(job
.get("job_name"), str)
7386 or vnfr_id
not in job
["job_name"]
# Replacement name: "<vnfr_id>_<random integer between 1 and 10000>"
7388 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
7389 job
["nsr_id"] = nsr_id
7390 job
["vnfr_id"] = vnfr_id
# Run a rebuild/start/stop style operation (operation_type) on one VDU
# instance of a VNF through RO.
#
# Visible flow: read the VNFR vnf_id, select the vdur whose "vdu-id-ref" equals
# additional_param["vdu_id"] and whose "count-index" equals
# additional_param["count-index"], wait for related operations, record the
# operation in the NS/operation status, call self.RO.operate() and wait for the
# returned action with the timeout.operate timeout.
# Returns the tuple ("COMPLETED", "Done") on success, otherwise
# ("FAILED", "Error in operate VNF <error>").
# NOTE(review): several original lines are not visible in this view (the
# "try:" line, the condition guarding the vdur handling, parts of the call
# arguments), so they are not described here.
7393 async def rebuild_start_stop(
7394 self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
7396 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7397 self
.logger
.info(logging_text
+ "Enter")
7398 stage
= ["Preparing the environment", ""]
7399 # database nsrs record
7403 # in case of error, indicates what part of scale was failed to put nsr at error status
7404 start_deploy
= time()
7406 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
# NOTE(review): vim_account_id comes from .get(); if the key is absent the
# string concatenation below raises TypeError - confirm it is always present.
7407 vim_account_id
= db_vnfr
.get("vim-account-id")
7408 vim_info_key
= "vim:" + vim_account_id
7409 vdu_id
= additional_param
["vdu_id"]
# Narrow first by vdu id, then by count-index
7410 vdurs
= [item
for item
in db_vnfr
["vdur"] if item
["vdu-id-ref"] == vdu_id
]
7411 vdur
= find_in_list(
7412 vdurs
, lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7415 vdu_vim_name
= vdur
["name"]
7416 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
# target_vim is the first key found in the vdur "vim_info" mapping
7417 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7419 raise LcmException("Target vdu is not found")
7420 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7421 # wait for any previous tasks in process
7422 stage
[1] = "Waiting for previous operations to terminate"
7423 self
.logger
.info(stage
[1])
7424 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7426 stage
[1] = "Reading from database."
7427 self
.logger
.info(stage
[1])
# The NS current operation is recorded as the upper-cased operation_type
7428 self
._write
_ns
_status
(
7431 current_operation
=operation_type
.upper(),
7432 current_operation_id
=nslcmop_id
,
7434 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7437 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7438 db_nsr_update
["operational-status"] = operation_type
7439 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
# Fields of the request description sent to RO
7443 "vim_vm_id": vim_vm_id
,
7445 "vdu_index": additional_param
["count-index"],
7446 "vdu_id": vdur
["id"],
7447 "target_vim": target_vim
,
7448 "vim_account_id": vim_account_id
,
7451 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7452 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7453 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7454 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7455 self
.logger
.info("response from RO: {}".format(result_dict
))
7456 action_id
= result_dict
["action_id"]
7457 await self
._wait
_ng
_ro
(
7462 self
.timeout
.operate
,
7464 "start_stop_rebuild",
7466 return "COMPLETED", "Done"
# Error handling: expected RO/DB/LCM errors, cancellation, and any other exception
7467 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7468 self
.logger
.error("Exit Exception {}".format(e
))
7470 except asyncio
.CancelledError
:
7471 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7472 exc
= "Operation was cancelled"
7473 except Exception as e
:
7474 exc
= traceback
.format_exc()
7475 self
.logger
.critical(
7476 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7478 return "FAILED", "Error in operate VNF {}".format(exc
)
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Look up the VCA cloud settings stored in the configuration of a VIM account

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential); an item is None when its key
        is not present in the account "config"
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    # An account without a "config" section yields (None, None)
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_cloud")
    cloud_credential = vim_config.get("vca_cloud_credential")
    return cloud_name, cloud_credential
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Look up the VCA K8s cloud settings stored in the configuration of a VIM account

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential); an item is None when its key
        is not present in the account "config"
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    # An account without a "config" section yields (None, None)
    vim_config = vim_account.get("config", {})
    k8s_cloud_name = vim_config.get("vca_k8s_cloud")
    k8s_cloud_credential = vim_config.get("vca_k8s_cloud_credential")
    return k8s_cloud_name, k8s_cloud_credential
# Task entry point for the NS "migrate" operation.
#
# Visible flow: try to take the HA lock for the operation, wait for related
# operations, mark the NS current operation as "MIGRATING", read the nslcmop
# and merge its "operationParams" into the target sent to self.RO.migrate(),
# wait for the RO action with the timeout.migrate timeout, then set the NS
# back to "IDLE", store the final operation state ("COMPLETED" or "FAILED"),
# publish an "ns"/"migrated" message and remove the task from self.lcm_tasks.
# NOTE(review): several original lines are not visible in this view (the
# "try:"/"finally:" lines, the statement under "if not task_is_locked_by_me",
# parts of the call arguments), so they are not described here.
7502 async def migrate(self
, nsr_id
, nslcmop_id
):
7504 Migrate VNFs and VDUs instances in a NS
7506 :param: nsr_id: NS Instance ID
7507 :param: nslcmop_id: nslcmop ID of migrate
7510 # Try to lock HA task here
7511 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7512 if not task_is_locked_by_me
:
7514 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7515 self
.logger
.debug(logging_text
+ "Enter")
7516 # get all needed from database
7518 db_nslcmop_update
= {}
7519 nslcmop_operation_state
= None
7523 # in case of error, indicates what part of scale was failed to put nsr at error status
7524 start_deploy
= time()
7527 # wait for any previous tasks in process
7528 step
= "Waiting for previous operations to terminate"
7529 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7531 self
._write
_ns
_status
(
7534 current_operation
="MIGRATING",
7535 current_operation_id
=nslcmop_id
,
7537 step
= "Getting nslcmop from database"
7539 step
+ " after having waited for previous tasks to be completed"
7541 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
# The operation parameters are merged into the target passed to RO
7542 migrate_params
= db_nslcmop
.get("operationParams")
7545 target
.update(migrate_params
)
7546 desc
= await self
.RO
.migrate(nsr_id
, target
)
7547 self
.logger
.debug("RO return > {}".format(desc
))
7548 action_id
= desc
["action_id"]
7549 await self
._wait
_ng
_ro
(
7554 self
.timeout
.migrate
,
7555 operation
="migrate",
# Error handling: expected RO/DB/LCM errors, cancellation, and any other exception
7557 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7558 self
.logger
.error("Exit Exception {}".format(e
))
7560 except asyncio
.CancelledError
:
7561 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7562 exc
= "Operation was cancelled"
7563 except Exception as e
:
7564 exc
= traceback
.format_exc()
7565 self
.logger
.critical(
7566 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
# The NS is returned to the "IDLE" current operation
7569 self
._write
_ns
_status
(
7572 current_operation
="IDLE",
7573 current_operation_id
=None,
7576 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7577 nslcmop_operation_state
= "FAILED"
7579 nslcmop_operation_state
= "COMPLETED"
7580 db_nslcmop_update
["detailed-status"] = "Done"
7581 db_nsr_update
["detailed-status"] = "Done"
7583 self
._write
_op
_status
(
7587 operation_state
=nslcmop_operation_state
,
7588 other_update
=db_nslcmop_update
,
# Notify the result on the "ns" topic with key "migrated"; a failure to publish
# is caught by the "except Exception" below
7590 if nslcmop_operation_state
:
7594 "nslcmop_id": nslcmop_id
,
7595 "operationState": nslcmop_operation_state
,
7597 await self
.msg
.aiowrite("ns", "migrated", msg
, loop
=self
.loop
)
7598 except Exception as e
:
7600 logging_text
+ "kafka_write notification Exception {}".format(e
)
7602 self
.logger
.debug(logging_text
+ "Exit")
7603 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
# Task entry point for the NS "heal" operation.
#
# Visible flow: try to take the HA lock, wait for related operations, mark the
# NS current operation as "HEALING", read the nslcmop and the nsr (remembering
# the previous operational/config status), read the NSD and all VNFRs of the
# NS, and for every target in operationParams.healVnfData and every target VDU
# look up the VNF-level and VDU-level configuration with get_configuration().
# Afterwards it waits for the pending tasks, stores the final status
# ("COMPLETED" or "FAILED"), sets the NS back to "IDLE", publishes an
# "ns"/"healed" message and removes the task from self.lcm_tasks.
# NOTE(review): many original lines are not visible in this view (the
# "try:"/"finally:" lines, the names of the calls that receive the keyword
# arguments listed below, several conditions), so they are not described here.
7605 async def heal(self
, nsr_id
, nslcmop_id
):
7609 :param nsr_id: ns instance to heal
7610 :param nslcmop_id: operation to run
7614 # Try to lock HA task here
7615 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7616 if not task_is_locked_by_me
:
7619 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7620 stage
= ["", "", ""]
7621 tasks_dict_info
= {}
7622 # ^ stage, step, VIM progress
7623 self
.logger
.debug(logging_text
+ "Enter")
7624 # get all needed from database
7626 db_nslcmop_update
= {}
7628 db_vnfrs
= {} # vnf's info indexed by _id
7630 old_operational_status
= ""
7631 old_config_status
= ""
7634 # wait for any previous tasks in process
7635 step
= "Waiting for previous operations to terminate"
7636 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7637 self
._write
_ns
_status
(
7640 current_operation
="HEALING",
7641 current_operation_id
=nslcmop_id
,
7644 step
= "Getting nslcmop from database"
7646 step
+ " after having waited for previous tasks to be completed"
7648 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7650 step
= "Getting nsr from database"
7651 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
# Remember the current statuses; they are written back if the operation fails
7652 old_operational_status
= db_nsr
["operational-status"]
7653 old_config_status
= db_nsr
["config-status"]
7656 "_admin.deployed.RO.operational-status": "healing",
7658 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7660 step
= "Sending heal order to VIM"
7662 logging_text
=logging_text
,
7664 db_nslcmop
=db_nslcmop
,
7669 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7670 self
.logger
.debug(logging_text
+ stage
[1])
7671 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7672 self
.fs
.sync(db_nsr
["nsd-id"])
7674 # read from db: vnfr's of this ns
7675 step
= "Getting vnfrs from db"
7676 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7677 for vnfr
in db_vnfrs_list
:
7678 db_vnfrs
[vnfr
["_id"]] = vnfr
7679 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7681 # Check for each target VNF
# Each target_vnf is read with .get(), i.e. healVnfData is iterated as a
# sequence of dict-like items
7682 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7683 for target_vnf
in target_list
:
7684 # Find this VNF in the list from DB
7685 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7687 db_vnfr
= db_vnfrs
[vnfr_id
]
7688 vnfd_id
= db_vnfr
.get("vnfd-id")
7689 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7690 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7691 base_folder
= vnfd
["_admin"]["storage"]
7696 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7697 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7699 # Check each target VDU and deploy N2VC
7700 target_vdu_list
= target_vnf
.get("additionalParams", {}).get(
# No explicit VDU list in the request: build one from every vdur of the VNFR
7703 if not target_vdu_list
:
7704 # New code to build the dictionary
7705 target_vdu_list
= []
7706 for existing_vdu
in db_vnfr
.get("vdur"):
7707 vdu_name
= existing_vdu
.get("vdu-name", None)
7708 vdu_index
= existing_vdu
.get("count-index", 0)
7709 vdu_run_day1
= target_vnf
.get("additionalParams", {}).get(
7712 vdu_to_be_healed
= {
7714 "count-index": vdu_index
,
7715 "run-day1": vdu_run_day1
,
7717 target_vdu_list
.append(vdu_to_be_healed
)
7718 for target_vdu
in target_vdu_list
:
7719 deploy_params_vdu
= target_vdu
7720 # Set run-day1 vnf level value if not vdu level value exists
7721 if not deploy_params_vdu
.get("run-day1") and target_vnf
[
7724 deploy_params_vdu
["run-day1"] = target_vnf
[
7727 vdu_name
= target_vdu
.get("vdu-id", None)
7728 # TODO: Get vdu_id from vdud.
7730 # For multi instance VDU count-index is mandatory
7731 # For single session VDU count-index is 0
7732 vdu_index
= target_vdu
.get("count-index", 0)
7734 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7735 stage
[1] = "Deploying Execution Environments."
7736 self
.logger
.debug(logging_text
+ stage
[1])
7738 # VNF Level charm. Normal case when proxy charms.
7739 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7740 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7741 if descriptor_config
:
7742 # Continue if healed machine is management machine
7743 vnf_ip_address
= db_vnfr
.get("ip-address")
7744 target_instance
= None
# NOTE(review): db_vnfr.get("vdur", None) is iterated directly, and
# target_instance stays None when no vdur matches, in which case the
# .get("ip-address") below would raise AttributeError - confirm both cannot
# happen for a VNFR that reaches this point.
7745 for instance
in db_vnfr
.get("vdur", None):
7747 instance
["vdu-name"] == vdu_name
7748 and instance
["count-index"] == vdu_index
7750 target_instance
= instance
7752 if vnf_ip_address
== target_instance
.get("ip-address"):
7754 logging_text
=logging_text
7755 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7756 member_vnf_index
, vdu_name
, vdu_index
7760 nslcmop_id
=nslcmop_id
,
7766 member_vnf_index
=member_vnf_index
,
7769 deploy_params
=deploy_params_vdu
,
7770 descriptor_config
=descriptor_config
,
7771 base_folder
=base_folder
,
7772 task_instantiation_info
=tasks_dict_info
,
7776 # VDU Level charm. Normal case with native charms.
7777 descriptor_config
= get_configuration(vnfd
, vdu_name
)
7778 if descriptor_config
:
7780 logging_text
=logging_text
7781 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7782 member_vnf_index
, vdu_name
, vdu_index
7786 nslcmop_id
=nslcmop_id
,
7792 member_vnf_index
=member_vnf_index
,
7793 vdu_index
=vdu_index
,
7795 deploy_params
=deploy_params_vdu
,
7796 descriptor_config
=descriptor_config
,
7797 base_folder
=base_folder
,
7798 task_instantiation_info
=tasks_dict_info
,
# Error handling: expected errors, cancellation, and any other exception
7803 ROclient
.ROClientException
,
7808 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7810 except asyncio
.CancelledError
:
7812 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7814 exc
= "Operation was cancelled"
7815 except Exception as e
:
7816 exc
= traceback
.format_exc()
7817 self
.logger
.critical(
7818 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
# Wait for the tasks registered in tasks_dict_info, bounded by timeout.ns_deploy
7823 stage
[1] = "Waiting for healing pending tasks."
7824 self
.logger
.debug(logging_text
+ stage
[1])
7825 exc
= await self
._wait
_for
_tasks
(
7828 self
.timeout
.ns_deploy
,
7836 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7837 nslcmop_operation_state
= "FAILED"
# On failure the previous statuses are restored first ...
7839 db_nsr_update
["operational-status"] = old_operational_status
7840 db_nsr_update
["config-status"] = old_config_status
7843 ] = "FAILED healing nslcmop={} {}: {}".format(nslcmop_id
, step
, exc
)
# ... and then overridden with "failed" for the kind of task that did not finish cleanly
7844 for task
, task_name
in tasks_dict_info
.items():
7845 if not task
.done() or task
.cancelled() or task
.exception():
7846 if task_name
.startswith(self
.task_name_deploy_vca
):
7847 # A N2VC task is pending
7848 db_nsr_update
["config-status"] = "failed"
7850 # RO task is pending
7851 db_nsr_update
["operational-status"] = "failed"
7853 error_description_nslcmop
= None
7854 nslcmop_operation_state
= "COMPLETED"
7855 db_nslcmop_update
["detailed-status"] = "Done"
7856 db_nsr_update
["detailed-status"] = "Done"
7857 db_nsr_update
["operational-status"] = "running"
7858 db_nsr_update
["config-status"] = "configured"
7860 self
._write
_op
_status
(
7863 error_message
=error_description_nslcmop
,
7864 operation_state
=nslcmop_operation_state
,
7865 other_update
=db_nslcmop_update
,
7868 self
._write
_ns
_status
(
7871 current_operation
="IDLE",
7872 current_operation_id
=None,
7873 other_update
=db_nsr_update
,
# Notify the result on the "ns" topic with key "healed"; a failure to publish
# is caught by the "except Exception" below
7876 if nslcmop_operation_state
:
7880 "nslcmop_id": nslcmop_id
,
7881 "operationState": nslcmop_operation_state
,
7883 await self
.msg
.aiowrite("ns", "healed", msg
, loop
=self
.loop
)
7884 except Exception as e
:
7886 logging_text
+ "kafka_write notification Exception {}".format(e
)
7888 self
.logger
.debug(logging_text
+ "Exit")
7889 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
7900 :param logging_text: prefix text to use at logging
7901 :param nsr_id: nsr identity
7902 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
7903 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
7904 :return: None or exception
7907 def get_vim_account(vim_account_id
):
7909 if vim_account_id
in db_vims
:
7910 return db_vims
[vim_account_id
]
7911 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
7912 db_vims
[vim_account_id
] = db_vim
7917 ns_params
= db_nslcmop
.get("operationParams")
7918 if ns_params
and ns_params
.get("timeout_ns_heal"):
7919 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
7921 timeout_ns_heal
= self
.timeout
.ns_heal
7925 nslcmop_id
= db_nslcmop
["_id"]
7927 "action_id": nslcmop_id
,
7929 self
.logger
.warning(
7930 "db_nslcmop={} and timeout_ns_heal={}".format(
7931 db_nslcmop
, timeout_ns_heal
7934 target
.update(db_nslcmop
.get("operationParams", {}))
7936 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
7937 desc
= await self
.RO
.recreate(nsr_id
, target
)
7938 self
.logger
.debug("RO return > {}".format(desc
))
7939 action_id
= desc
["action_id"]
7940 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
7941 await self
._wait
_ng
_ro
(
7948 operation
="healing",
7953 "_admin.deployed.RO.operational-status": "running",
7954 "detailed-status": " ".join(stage
),
7956 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7957 self
._write
_op
_status
(nslcmop_id
, stage
)
7959 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
7962 except Exception as e
:
7963 stage
[2] = "ERROR healing at VIM"
7964 # self.set_vnfr_at_error(db_vnfrs, str(e))
7966 "Error healing at VIM {}".format(e
),
7967 exc_info
=not isinstance(
7970 ROclient
.ROClientException
,
7996 task_instantiation_info
,
7999 # launch instantiate_N2VC in a asyncio task and register task object
8000 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
8001 # if not found, create one entry and update database
8002 # fill db_nsr._admin.deployed.VCA.<index>
8005 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
8009 get_charm_name
= False
8010 if "execution-environment-list" in descriptor_config
:
8011 ee_list
= descriptor_config
.get("execution-environment-list", [])
8012 elif "juju" in descriptor_config
:
8013 ee_list
= [descriptor_config
] # ns charms
8014 if "execution-environment-list" not in descriptor_config
:
8015 # charm name is only required for ns charms
8016 get_charm_name
= True
8017 else: # other types as script are not supported
8020 for ee_item
in ee_list
:
8023 + "_deploy_n2vc ee_item juju={}, helm={}".format(
8024 ee_item
.get("juju"), ee_item
.get("helm-chart")
8027 ee_descriptor_id
= ee_item
.get("id")
8028 if ee_item
.get("juju"):
8029 vca_name
= ee_item
["juju"].get("charm")
8031 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
8034 if ee_item
["juju"].get("charm") is not None
8037 if ee_item
["juju"].get("cloud") == "k8s":
8038 vca_type
= "k8s_proxy_charm"
8039 elif ee_item
["juju"].get("proxy") is False:
8040 vca_type
= "native_charm"
8041 elif ee_item
.get("helm-chart"):
8042 vca_name
= ee_item
["helm-chart"]
8043 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
8046 vca_type
= "helm-v3"
8049 logging_text
+ "skipping non juju neither charm configuration"
8054 for vca_index
, vca_deployed
in enumerate(
8055 db_nsr
["_admin"]["deployed"]["VCA"]
8057 if not vca_deployed
:
8060 vca_deployed
.get("member-vnf-index") == member_vnf_index
8061 and vca_deployed
.get("vdu_id") == vdu_id
8062 and vca_deployed
.get("kdu_name") == kdu_name
8063 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
8064 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
8068 # not found, create one.
8070 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
8073 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
8075 target
+= "/kdu/{}".format(kdu_name
)
8077 "target_element": target
,
8078 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
8079 "member-vnf-index": member_vnf_index
,
8081 "kdu_name": kdu_name
,
8082 "vdu_count_index": vdu_index
,
8083 "operational-status": "init", # TODO revise
8084 "detailed-status": "", # TODO revise
8085 "step": "initial-deploy", # TODO revise
8087 "vdu_name": vdu_name
,
8089 "ee_descriptor_id": ee_descriptor_id
,
8090 "charm_name": charm_name
,
8094 # create VCA and configurationStatus in db
8096 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
8097 "configurationStatus.{}".format(vca_index
): dict(),
8099 self
.update_db_2("nsrs", nsr_id
, db_dict
)
8101 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
8103 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
8104 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
8105 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
8108 task_n2vc
= asyncio
.ensure_future(
8110 logging_text
=logging_text
,
8111 vca_index
=vca_index
,
8117 vdu_index
=vdu_index
,
8118 deploy_params
=deploy_params
,
8119 config_descriptor
=descriptor_config
,
8120 base_folder
=base_folder
,
8121 nslcmop_id
=nslcmop_id
,
8125 ee_config_descriptor
=ee_item
,
8128 self
.lcm_tasks
.register(
8132 "instantiate_N2VC-{}".format(vca_index
),
8135 task_instantiation_info
[
8137 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
8138 member_vnf_index
or "", vdu_id
or ""
8141 async def heal_N2VC(
8158 ee_config_descriptor
,
8160 nsr_id
= db_nsr
["_id"]
8161 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
8162 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
8163 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
8164 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
8166 "collection": "nsrs",
8167 "filter": {"_id": nsr_id
},
8168 "path": db_update_entry
,
8174 element_under_configuration
= nsr_id
8178 vnfr_id
= db_vnfr
["_id"]
8179 osm_config
["osm"]["vnf_id"] = vnfr_id
8181 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8183 if vca_type
== "native_charm":
8186 index_number
= vdu_index
or 0
8189 element_type
= "VNF"
8190 element_under_configuration
= vnfr_id
8191 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8193 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8194 element_type
= "VDU"
8195 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8196 osm_config
["osm"]["vdu_id"] = vdu_id
8198 namespace
+= ".{}".format(kdu_name
)
8199 element_type
= "KDU"
8200 element_under_configuration
= kdu_name
8201 osm_config
["osm"]["kdu_name"] = kdu_name
8204 if base_folder
["pkg-dir"]:
8205 artifact_path
= "{}/{}/{}/{}".format(
8206 base_folder
["folder"],
8207 base_folder
["pkg-dir"],
8210 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8215 artifact_path
= "{}/Scripts/{}/{}/".format(
8216 base_folder
["folder"],
8219 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8224 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8226 # get initial_config_primitive_list that applies to this element
8227 initial_config_primitive_list
= config_descriptor
.get(
8228 "initial-config-primitive"
8232 "Initial config primitive list > {}".format(
8233 initial_config_primitive_list
8237 # add config if not present for NS charm
8238 ee_descriptor_id
= ee_config_descriptor
.get("id")
8239 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8240 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8241 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8245 "Initial config primitive list #2 > {}".format(
8246 initial_config_primitive_list
8249 # n2vc_redesign STEP 3.1
8250 # find old ee_id if exists
8251 ee_id
= vca_deployed
.get("ee_id")
8253 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8254 # create or register execution environment in VCA. Only for native charms when healing
8255 if vca_type
== "native_charm":
8256 step
= "Waiting to VM being up and getting IP address"
8257 self
.logger
.debug(logging_text
+ step
)
8258 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8267 credentials
= {"hostname": rw_mgmt_ip
}
8269 username
= deep_get(
8270 config_descriptor
, ("config-access", "ssh-access", "default-user")
8272 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8273 # merged. Meanwhile let's get username from initial-config-primitive
8274 if not username
and initial_config_primitive_list
:
8275 for config_primitive
in initial_config_primitive_list
:
8276 for param
in config_primitive
.get("parameter", ()):
8277 if param
["name"] == "ssh-username":
8278 username
= param
["value"]
8282 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8283 "'config-access.ssh-access.default-user'"
8285 credentials
["username"] = username
8287 # n2vc_redesign STEP 3.2
8288 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8289 self
._write
_configuration
_status
(
8291 vca_index
=vca_index
,
8292 status
="REGISTERING",
8293 element_under_configuration
=element_under_configuration
,
8294 element_type
=element_type
,
8297 step
= "register execution environment {}".format(credentials
)
8298 self
.logger
.debug(logging_text
+ step
)
8299 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8300 credentials
=credentials
,
8301 namespace
=namespace
,
8306 # update ee_id in db
8308 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8310 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8312 # for compatibility with MON/POL modules, the need model and application name at database
8313 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8314 # Not sure if this need to be done when healing
8316 ee_id_parts = ee_id.split(".")
8317 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8318 if len(ee_id_parts) >= 2:
8319 model_name = ee_id_parts[0]
8320 application_name = ee_id_parts[1]
8321 db_nsr_update[db_update_entry + "model"] = model_name
8322 db_nsr_update[db_update_entry + "application"] = application_name
8325 # n2vc_redesign STEP 3.3
8326 # Install configuration software. Only for native charms.
8327 step
= "Install configuration Software"
8329 self
._write
_configuration
_status
(
8331 vca_index
=vca_index
,
8332 status
="INSTALLING SW",
8333 element_under_configuration
=element_under_configuration
,
8334 element_type
=element_type
,
8335 # other_update=db_nsr_update,
8339 # TODO check if already done
8340 self
.logger
.debug(logging_text
+ step
)
8342 if vca_type
== "native_charm":
8343 config_primitive
= next(
8344 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8347 if config_primitive
:
8348 config
= self
._map
_primitive
_params
(
8349 config_primitive
, {}, deploy_params
8351 await self
.vca_map
[vca_type
].install_configuration_sw(
8353 artifact_path
=artifact_path
,
8361 # write in db flag of configuration_sw already installed
8363 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8366 # Not sure if this need to be done when healing
8368 # add relations for this VCA (wait for other peers related with this VCA)
8369 await self._add_vca_relations(
8370 logging_text=logging_text,
8373 vca_index=vca_index,
8377 # if SSH access is required, then get execution environment SSH public
8378 # if native charm we have waited already to VM be UP
8379 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8382 # self.logger.debug("get ssh key block")
8384 config_descriptor
, ("config-access", "ssh-access", "required")
8386 # self.logger.debug("ssh key needed")
8387 # Needed to inject a ssh key
8390 ("config-access", "ssh-access", "default-user"),
8392 step
= "Install configuration Software, getting public ssh key"
8393 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8394 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8397 step
= "Insert public key into VM user={} ssh_key={}".format(
8401 # self.logger.debug("no need to get ssh key")
8402 step
= "Waiting to VM being up and getting IP address"
8403 self
.logger
.debug(logging_text
+ step
)
8405 # n2vc_redesign STEP 5.1
8406 # wait for RO (ip-address) Insert pub_key into VM
8407 # IMPORTANT: We need do wait for RO to complete healing operation.
8408 await self
._wait
_heal
_ro
(nsr_id
, self
.timeout
.ns_heal
)
8411 rw_mgmt_ip
= await self
.wait_kdu_up(
8412 logging_text
, nsr_id
, vnfr_id
, kdu_name
8415 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8425 rw_mgmt_ip
= None # This is for a NS configuration
8427 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8429 # store rw_mgmt_ip in deploy params for later replacement
8430 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8433 # get run-day1 operation parameter
8434 runDay1
= deploy_params
.get("run-day1", False)
8436 "Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
, vdu_id
, runDay1
)
8439 # n2vc_redesign STEP 6 Execute initial config primitive
8440 step
= "execute initial config primitive"
8442 # wait for dependent primitives execution (NS -> VNF -> VDU)
8443 if initial_config_primitive_list
:
8444 await self
._wait
_dependent
_n
2vc
(
8445 nsr_id
, vca_deployed_list
, vca_index
8448 # stage, in function of element type: vdu, kdu, vnf or ns
8449 my_vca
= vca_deployed_list
[vca_index
]
8450 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8452 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8453 elif my_vca
.get("member-vnf-index"):
8455 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8458 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8460 self
._write
_configuration
_status
(
8461 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8464 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8466 check_if_terminated_needed
= True
8467 for initial_config_primitive
in initial_config_primitive_list
:
8468 # adding information on the vca_deployed if it is a NS execution environment
8469 if not vca_deployed
["member-vnf-index"]:
8470 deploy_params
["ns_config_info"] = json
.dumps(
8471 self
._get
_ns
_config
_info
(nsr_id
)
8473 # TODO check if already done
8474 primitive_params_
= self
._map
_primitive
_params
(
8475 initial_config_primitive
, {}, deploy_params
8478 step
= "execute primitive '{}' params '{}'".format(
8479 initial_config_primitive
["name"], primitive_params_
8481 self
.logger
.debug(logging_text
+ step
)
8482 await self
.vca_map
[vca_type
].exec_primitive(
8484 primitive_name
=initial_config_primitive
["name"],
8485 params_dict
=primitive_params_
,
8490 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8491 if check_if_terminated_needed
:
8492 if config_descriptor
.get("terminate-config-primitive"):
8496 {db_update_entry
+ "needed_terminate": True},
8498 check_if_terminated_needed
= False
8500 # TODO register in database that primitive is done
8502 # STEP 7 Configure metrics
8503 # Not sure if this need to be done when healing
8505 if vca_type == "helm" or vca_type == "helm-v3":
8506 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8508 artifact_path=artifact_path,
8509 ee_config_descriptor=ee_config_descriptor,
8512 target_ip=rw_mgmt_ip,
8518 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8521 for job in prometheus_jobs:
8524 {"job_name": job["job_name"]},
8527 fail_on_empty=False,
8531 step
= "instantiated at VCA"
8532 self
.logger
.debug(logging_text
+ step
)
8534 self
._write
_configuration
_status
(
8535 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8538 except Exception as e
: # TODO not use Exception but N2VC exception
8539 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8541 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8544 "Exception while {} : {}".format(step
, e
), exc_info
=True
8546 self
._write
_configuration
_status
(
8547 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8549 raise LcmException("{} {}".format(step
, e
)) from e
async def _wait_heal_ro(
    self,
    nsr_id,
    # NOTE(review): the visible caller always passes self.timeout.ns_heal;
    # confirm this default against the other callers.
    timeout=600,
):
    """
    Poll the NS record until RO no longer reports the "healing" status.

    Every 15 seconds the "nsrs" entry is read again and the value stored at
    _admin.deployed.RO.operational-status is checked. The method returns as
    soon as that value is anything other than "healing".

    :param nsr_id: NS instance id, used as "_id" in the "nsrs" collection
    :param timeout: maximum number of seconds to keep polling
    :return: None
    :raises NgRoException: if the polling window elapses before a status other
        than "healing" has been read
    """
    start_time = time()
    while time() <= start_time + timeout:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        operational_status_ro = db_nsr["_admin"]["deployed"]["RO"][
            "operational-status"
        ]
        self.logger.debug("Wait Heal RO > {}".format(operational_status_ro))
        if operational_status_ro != "healing":
            break
        # Do not pass "loop": the argument was removed from asyncio.sleep() in
        # Python 3.10 and raises TypeError there. Inside a coroutine the
        # running loop is used automatically.
        await asyncio.sleep(15)
    else:  # the while condition became false: the polling window is over
        raise NgRoException("Timeout waiting ns to deploy")
async def vertical_scale(self, nsr_id, nslcmop_id):
    """
    Vertical Scale the VDUs in a NS

    Takes the "operationParams" of the nslcmop record as the target, hands it
    to RO and waits for the resulting RO action to finish. Whatever the
    outcome, the NS current operation is set back to IDLE, the nslcmop is
    marked COMPLETED or FAILED, a "verticalscaled" message is written on the
    "ns" topic and the task is removed from lcm_tasks.

    :param: nsr_id: NS Instance ID
    :param: nslcmop_id: nslcmop ID of the vertical scale operation
    :return: None. Errors are logged and recorded in the nslcmop
        "detailed-status" instead of being propagated.
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return
    logging_text = "Task ns={} vertical scale ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")
    db_nslcmop_update = {}
    # stays None on success; otherwise holds the reason of the failure
    exc = None
    # start of the operation, handed to _wait_ng_ro
    start_deploy = time()

    try:
        # wait for any previous tasks in process
        # "step" names the phase in progress; it ends up in the FAILED message
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        # NOTE(review): ns_state=None restored from an elided line - confirm
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="VerticalScale",
            current_operation_id=nslcmop_id,
        )
        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operationParams = db_nslcmop.get("operationParams")
        # work on a copy so the record read from the database is not shared
        target = {}
        target.update(operationParams)
        desc = await self.RO.vertical_scale(nsr_id, target)
        self.logger.debug("RO return > {}".format(desc))
        action_id = desc["action_id"]
        # NOTE(review): the four leading positional arguments were restored
        # from elided lines - confirm against _wait_ng_ro's signature
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            self.timeout.verticalscale,
            operation="verticalscale",
        )
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        # cancellation is recorded as a failure and not re-raised, so that
        # the status updates and the notification below are still performed
        self.logger.error("Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )
        if exc is not None:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        else:
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"

        # NOTE(review): stage="" and error_message="" restored from elided
        # lines - confirm
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message="",
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        # best effort: a failure to notify must not hide the operation result
        try:
            msg = {
                "nsr_id": nsr_id,
                "nslcmop_id": nslcmop_id,
                "operationState": nslcmop_operation_state,
            }
            await self.msg.aiowrite("ns", "verticalscaled", msg, loop=self.loop)
        except Exception as e:
            self.logger.error(
                logging_text + "kafka_write notification Exception {}".format(e)
            )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_verticalscale")