1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
36 from osm_lcm
import ROclient
37 from osm_lcm
.data_utils
.lcm_config
import LcmCfg
38 from osm_lcm
.data_utils
.nsr
import (
41 get_deployed_vca_list
,
44 from osm_lcm
.data_utils
.vca
import (
53 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
54 from osm_lcm
.lcm_utils
import (
61 check_juju_bundle_existence
,
62 get_charm_artifact_path
,
65 from osm_lcm
.data_utils
.nsd
import (
66 get_ns_configuration_relation_list
,
70 from osm_lcm
.data_utils
.vnfd
import (
76 get_ee_sorted_initial_config_primitive_list
,
77 get_ee_sorted_terminate_config_primitive_list
,
79 get_virtual_link_profiles
,
84 get_number_of_instances
,
86 get_kdu_resource_profile
,
87 find_software_version
,
90 from osm_lcm
.data_utils
.list_utils
import find_in_list
91 from osm_lcm
.data_utils
.vnfr
import (
95 get_volumes_from_instantiation_params
,
97 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
98 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
99 from n2vc
.definitions
import RelationEndpoint
100 from n2vc
.k8s_helm_conn
import K8sHelmConnector
101 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
102 from n2vc
.k8s_juju_conn
import K8sJujuConnector
104 from osm_common
.dbbase
import DbException
105 from osm_common
.fsbase
import FsException
107 from osm_lcm
.data_utils
.database
.database
import Database
108 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
109 from osm_lcm
.data_utils
.wim
import (
111 get_target_wim_attrs
,
112 select_feasible_wim_account
,
115 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
116 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
118 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
119 from osm_lcm
.osm_config
import OsmConfigBuilder
120 from osm_lcm
.prometheus
import parse_job
122 from copy
import copy
, deepcopy
123 from time
import time
124 from uuid
import uuid4
126 from random
import randint
128 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
131 class NsLcm(LcmBase
):
132 SUBOPERATION_STATUS_NOT_FOUND
= -1
133 SUBOPERATION_STATUS_NEW
= -2
134 SUBOPERATION_STATUS_SKIP
= -3
135 task_name_deploy_vca
= "Deploying VCA"
137 def __init__(self
, msg
, lcm_tasks
, config
: LcmCfg
, loop
):
139 Init, Connect to database, filesystem storage, and messaging
140 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
143 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
145 self
.db
= Database().instance
.db
146 self
.fs
= Filesystem().instance
.fs
148 self
.lcm_tasks
= lcm_tasks
149 self
.timeout
= config
.timeout
150 self
.ro_config
= config
.RO
151 self
.vca_config
= config
.VCA
153 # create N2VC connector
154 self
.n2vc
= N2VCJujuConnector(
157 on_update_db
=self
._on
_update
_n
2vc
_db
,
162 self
.conn_helm_ee
= LCMHelmConn(
165 vca_config
=self
.vca_config
,
166 on_update_db
=self
._on
_update
_n
2vc
_db
,
169 self
.k8sclusterhelm2
= K8sHelmConnector(
170 kubectl_command
=self
.vca_config
.kubectlpath
,
171 helm_command
=self
.vca_config
.helmpath
,
178 self
.k8sclusterhelm3
= K8sHelm3Connector(
179 kubectl_command
=self
.vca_config
.kubectlpath
,
180 helm_command
=self
.vca_config
.helm3path
,
187 self
.k8sclusterjuju
= K8sJujuConnector(
188 kubectl_command
=self
.vca_config
.kubectlpath
,
189 juju_command
=self
.vca_config
.jujupath
,
192 on_update_db
=self
._on
_update
_k
8s
_db
,
197 self
.k8scluster_map
= {
198 "helm-chart": self
.k8sclusterhelm2
,
199 "helm-chart-v3": self
.k8sclusterhelm3
,
200 "chart": self
.k8sclusterhelm3
,
201 "juju-bundle": self
.k8sclusterjuju
,
202 "juju": self
.k8sclusterjuju
,
206 "lxc_proxy_charm": self
.n2vc
,
207 "native_charm": self
.n2vc
,
208 "k8s_proxy_charm": self
.n2vc
,
209 "helm": self
.conn_helm_ee
,
210 "helm-v3": self
.conn_helm_ee
,
214 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
.to_dict())
216 self
.op_status_map
= {
217 "instantiation": self
.RO
.status
,
218 "termination": self
.RO
.status
,
219 "migrate": self
.RO
.status
,
220 "healing": self
.RO
.recreate_status
,
221 "verticalscale": self
.RO
.status
,
222 "start_stop_rebuild": self
.RO
.status
,
@staticmethod
def increment_ip_mac(ip_mac, vm_index=1):
    """
    Derive a new IPv4/IPv6/MAC address by adding vm_index to the last group of ip_mac
    :param ip_mac: address in text form. IPv4 is detected by the presence of a dot and its last
        group is handled in decimal; otherwise the text after the last colon (IPv6 or MAC) is
        handled in hexadecimal and zero-padded to its original width
    :param vm_index: amount to add to the last group
    :return: ip_mac unchanged when it is not a string; the new address as a string; or None when
        no dot/colon separator is found or the last group is not a valid number
    """
    if not isinstance(ip_mac, str):
        return ip_mac
    try:
        # try with ipv4: look for the last dot, the last group is decimal
        dot = ip_mac.rfind(".")
        if dot > 0:
            prefix, last_group = ip_mac[: dot + 1], ip_mac[dot + 1 :]
            return "{}{}".format(prefix, int(last_group) + vm_index)
        # try with ipv6 or mac: look for the last colon. Operate in hex
        colon = ip_mac.rfind(":")
        if colon > 0:
            prefix, last_group = ip_mac[: colon + 1], ip_mac[colon + 1 :]
            # keep the width of the original group: 2 for mac, up to 4 for ipv6
            return "{}{:0{}x}".format(
                prefix, int(last_group, 16) + vm_index, len(last_group)
            )
    except (ValueError, TypeError):
        # last group (or vm_index) is not a number: the address cannot be incremented
        pass
    return None
247 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
249 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
252 # TODO filter RO descriptor fields...
256 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
257 db_dict
["deploymentStatus"] = ro_descriptor
258 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
260 except Exception as e
:
262 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
265 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
267 # remove last dot from path (if exists)
268 if path
.endswith("."):
271 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
272 # .format(table, filter, path, updated_data))
275 nsr_id
= filter.get("_id")
277 # read ns record from database
278 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
279 current_ns_status
= nsr
.get("nsState")
281 # get vca status for NS
282 status_dict
= await self
.n2vc
.get_status(
283 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
288 db_dict
["vcaStatus"] = status_dict
290 # update configurationStatus for this VCA
292 vca_index
= int(path
[path
.rfind(".") + 1 :])
295 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
297 vca_status
= vca_list
[vca_index
].get("status")
299 configuration_status_list
= nsr
.get("configurationStatus")
300 config_status
= configuration_status_list
[vca_index
].get("status")
302 if config_status
== "BROKEN" and vca_status
!= "failed":
303 db_dict
["configurationStatus"][vca_index
] = "READY"
304 elif config_status
!= "BROKEN" and vca_status
== "failed":
305 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
306 except Exception as e
:
307 # not update configurationStatus
308 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
310 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
311 # if nsState = 'DEGRADED' check if all is OK
313 if current_ns_status
in ("READY", "DEGRADED"):
314 error_description
= ""
316 if status_dict
.get("machines"):
317 for machine_id
in status_dict
.get("machines"):
318 machine
= status_dict
.get("machines").get(machine_id
)
319 # check machine agent-status
320 if machine
.get("agent-status"):
321 s
= machine
.get("agent-status").get("status")
324 error_description
+= (
325 "machine {} agent-status={} ; ".format(
329 # check machine instance status
330 if machine
.get("instance-status"):
331 s
= machine
.get("instance-status").get("status")
334 error_description
+= (
335 "machine {} instance-status={} ; ".format(
340 if status_dict
.get("applications"):
341 for app_id
in status_dict
.get("applications"):
342 app
= status_dict
.get("applications").get(app_id
)
343 # check application status
344 if app
.get("status"):
345 s
= app
.get("status").get("status")
348 error_description
+= (
349 "application {} status={} ; ".format(app_id
, s
)
352 if error_description
:
353 db_dict
["errorDescription"] = error_description
354 if current_ns_status
== "READY" and is_degraded
:
355 db_dict
["nsState"] = "DEGRADED"
356 if current_ns_status
== "DEGRADED" and not is_degraded
:
357 db_dict
["nsState"] = "READY"
360 self
.update_db_2("nsrs", nsr_id
, db_dict
)
362 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
364 except Exception as e
:
365 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
367 async def _on_update_k8s_db(
368 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
371 Updating vca status in NSR record
372 :param cluster_uuid: UUID of a k8s cluster
373 :param kdu_instance: The unique name of the KDU instance
374 :param filter: To get nsr_id
375 :cluster_type: The cluster type (juju, k8s)
379 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
380 # .format(cluster_uuid, kdu_instance, filter))
382 nsr_id
= filter.get("_id")
384 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
385 cluster_uuid
=cluster_uuid
,
386 kdu_instance
=kdu_instance
,
388 complete_status
=True,
394 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
397 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
401 self
.update_db_2("nsrs", nsr_id
, db_dict
)
402 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
404 except Exception as e
:
405 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
408 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
411 undefined
=StrictUndefined
,
412 autoescape
=select_autoescape(default_for_string
=True, default
=True),
414 template
= env
.from_string(cloud_init_text
)
415 return template
.render(additional_params
or {})
416 except UndefinedError
as e
:
418 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
419 "file, must be provided in the instantiation parameters inside the "
420 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
422 except (TemplateError
, TemplateNotFound
) as e
:
424 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
429 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
430 cloud_init_content
= cloud_init_file
= None
432 if vdu
.get("cloud-init-file"):
433 base_folder
= vnfd
["_admin"]["storage"]
434 if base_folder
["pkg-dir"]:
435 cloud_init_file
= "{}/{}/cloud_init/{}".format(
436 base_folder
["folder"],
437 base_folder
["pkg-dir"],
438 vdu
["cloud-init-file"],
441 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
442 base_folder
["folder"],
443 vdu
["cloud-init-file"],
445 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
446 cloud_init_content
= ci_file
.read()
447 elif vdu
.get("cloud-init"):
448 cloud_init_content
= vdu
["cloud-init"]
450 return cloud_init_content
451 except FsException
as e
:
453 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
454 vnfd
["id"], vdu
["id"], cloud_init_file
, e
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
    """
    Obtain the additional params stored at the vdur of a vnfr for a given vdu
    :param db_vnfr: vnfr database content; its "vdur" list is searched
    :param vdu_id: value to match against the "vdu-id-ref" of each vdur
    :return: the result of parse_yaml_strings over the "additionalParams" of the first matching
        vdur (None is passed when there is no matching vdur or it has no additionalParams)
    """
    # a vnfr may have no "vdur" entry at all: treat it as an empty list
    vdur_list = db_vnfr.get("vdur") or ()
    matching_vdur = next(
        (vdur for vdur in vdur_list if vdu_id == vdur["vdu-id-ref"]), {}
    )
    additional_params = matching_vdur.get("additionalParams")
    return parse_yaml_strings(additional_params)
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
    :param vnfd: input vnfd. It is not modified, a deep copy is returned
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided. Not read by this method
    :param nsrId: Id of the NSR. Not read by this method
    :return: copy of vnfd
    """
    vnfd_RO = deepcopy(vnfd)
    # remove unused by RO configuration, monitoring, scaling and internal keys
    for unused_key in (
        "_id",
        "_admin",
        "monitoring-param",
        "scaling-group-descriptor",
        "kdu",
        "k8s-cluster",
    ):
        vnfd_RO.pop(unused_key, None)

    if new_id:
        vnfd_RO["id"] = new_id

    # cloud-init and cloud-init-file are removed from every vdu of the copy
    for vdu in get_iterable(vnfd_RO, "vdu"):
        vdu.pop("cloud-init-file", None)
        vdu.pop("cloud-init", None)
    return vnfd_RO
@staticmethod
def ip_profile_2_RO(ip_profile):
    """
    Translates an ip-profile into the key names and values used for RO
    :param ip_profile: dictionary with the ip-profile. It is not modified
    :return: a deep copy of ip_profile where:
        "dns-server" is renamed to "dns-address" (a list of {"address": ...} items becomes a
        plain list of addresses; any other value is moved as it is),
        "ip-version" values "ipv4"/"ipv6" become "IPv4"/"IPv6",
        "dhcp-params" is renamed to "dhcp"
    """
    RO_ip_profile = deepcopy(ip_profile)
    if "dns-server" in RO_ip_profile:
        dns_server = RO_ip_profile.pop("dns-server")
        if isinstance(dns_server, list):
            RO_ip_profile["dns-address"] = [ds["address"] for ds in dns_server]
        else:
            RO_ip_profile["dns-address"] = dns_server
    if RO_ip_profile.get("ip-version") == "ipv4":
        RO_ip_profile["ip-version"] = "IPv4"
    if RO_ip_profile.get("ip-version") == "ipv6":
        RO_ip_profile["ip-version"] = "IPv6"
    if "dhcp-params" in RO_ip_profile:
        RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
    return RO_ip_profile
509 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
510 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
511 if db_vim
["_admin"]["operationalState"] != "ENABLED":
513 "VIM={} is not available. operationalState={}".format(
514 vim_account
, db_vim
["_admin"]["operationalState"]
517 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
520 def get_ro_wim_id_for_wim_account(self
, wim_account
):
521 if isinstance(wim_account
, str):
522 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
523 if db_wim
["_admin"]["operationalState"] != "ENABLED":
525 "WIM={} is not available. operationalState={}".format(
526 wim_account
, db_wim
["_admin"]["operationalState"]
529 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
534 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
536 db_vdu_push_list
= []
538 db_update
= {"_admin.modified": time()}
540 for vdu_id
, vdu_count
in vdu_create
.items():
544 for vdur
in reversed(db_vnfr
["vdur"])
545 if vdur
["vdu-id-ref"] == vdu_id
550 # Read the template saved in the db:
552 "No vdur in the database. Using the vdur-template to scale"
554 vdur_template
= db_vnfr
.get("vdur-template")
555 if not vdur_template
:
557 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
561 vdur
= vdur_template
[0]
562 # Delete a template from the database after using it
565 {"_id": db_vnfr
["_id"]},
567 pull
={"vdur-template": {"_id": vdur
["_id"]}},
569 for count
in range(vdu_count
):
570 vdur_copy
= deepcopy(vdur
)
571 vdur_copy
["status"] = "BUILD"
572 vdur_copy
["status-detailed"] = None
573 vdur_copy
["ip-address"] = None
574 vdur_copy
["_id"] = str(uuid4())
575 vdur_copy
["count-index"] += count
+ 1
576 vdur_copy
["id"] = "{}-{}".format(
577 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
579 vdur_copy
.pop("vim_info", None)
580 for iface
in vdur_copy
["interfaces"]:
581 if iface
.get("fixed-ip"):
582 iface
["ip-address"] = self
.increment_ip_mac(
583 iface
["ip-address"], count
+ 1
586 iface
.pop("ip-address", None)
587 if iface
.get("fixed-mac"):
588 iface
["mac-address"] = self
.increment_ip_mac(
589 iface
["mac-address"], count
+ 1
592 iface
.pop("mac-address", None)
596 ) # only first vdu can be managment of vnf
597 db_vdu_push_list
.append(vdur_copy
)
598 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
600 if len(db_vnfr
["vdur"]) == 1:
601 # The scale will move to 0 instances
603 "Scaling to 0 !, creating the template with the last vdur"
605 template_vdur
= [db_vnfr
["vdur"][0]]
606 for vdu_id
, vdu_count
in vdu_delete
.items():
608 indexes_to_delete
= [
610 for iv
in enumerate(db_vnfr
["vdur"])
611 if iv
[1]["vdu-id-ref"] == vdu_id
615 "vdur.{}.status".format(i
): "DELETING"
616 for i
in indexes_to_delete
[-vdu_count
:]
620 # it must be deleted one by one because common.db does not allow otherwise
623 for v
in reversed(db_vnfr
["vdur"])
624 if v
["vdu-id-ref"] == vdu_id
626 for vdu
in vdus_to_delete
[:vdu_count
]:
629 {"_id": db_vnfr
["_id"]},
631 pull
={"vdur": {"_id": vdu
["_id"]}},
635 db_push
["vdur"] = db_vdu_push_list
637 db_push
["vdur-template"] = template_vdur
640 db_vnfr
["vdur-template"] = template_vdur
641 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
642 # modify passed dictionary db_vnfr
643 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
644 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
646 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
648 Updates database nsr with the RO info for the created vld
649 :param ns_update_nsr: dictionary to be filled with the updated info
650 :param db_nsr: content of db_nsr. This is also modified
651 :param nsr_desc_RO: nsr descriptor from RO
652 :return: Nothing, LcmException is raised on errors
655 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
656 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
657 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
659 vld
["vim-id"] = net_RO
.get("vim_net_id")
660 vld
["name"] = net_RO
.get("vim_name")
661 vld
["status"] = net_RO
.get("status")
662 vld
["status-detailed"] = net_RO
.get("error_msg")
663 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
667 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
670 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
672 for db_vnfr
in db_vnfrs
.values():
673 vnfr_update
= {"status": "ERROR"}
674 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
675 if "status" not in vdur
:
676 vdur
["status"] = "ERROR"
677 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
679 vdur
["status-detailed"] = str(error_text
)
681 "vdur.{}.status-detailed".format(vdu_index
)
683 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
684 except DbException
as e
:
685 self
.logger
.error("Cannot update vnf. {}".format(e
))
687 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
689 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
690 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
691 :param nsr_desc_RO: nsr descriptor from RO
692 :return: Nothing, LcmException is raised on errors
694 for vnf_index
, db_vnfr
in db_vnfrs
.items():
695 for vnf_RO
in nsr_desc_RO
["vnfs"]:
696 if vnf_RO
["member_vnf_index"] != vnf_index
:
699 if vnf_RO
.get("ip_address"):
700 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
703 elif not db_vnfr
.get("ip-address"):
704 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
705 raise LcmExceptionNoMgmtIP(
706 "ns member_vnf_index '{}' has no IP address".format(
711 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
712 vdur_RO_count_index
= 0
713 if vdur
.get("pdu-type"):
715 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
716 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
718 if vdur
["count-index"] != vdur_RO_count_index
:
719 vdur_RO_count_index
+= 1
721 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
722 if vdur_RO
.get("ip_address"):
723 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
725 vdur
["ip-address"] = None
726 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
727 vdur
["name"] = vdur_RO
.get("vim_name")
728 vdur
["status"] = vdur_RO
.get("status")
729 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
730 for ifacer
in get_iterable(vdur
, "interfaces"):
731 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
732 if ifacer
["name"] == interface_RO
.get("internal_name"):
733 ifacer
["ip-address"] = interface_RO
.get(
736 ifacer
["mac-address"] = interface_RO
.get(
742 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
743 "from VIM info".format(
744 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
747 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
751 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
753 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
757 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
758 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
759 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
761 vld
["vim-id"] = net_RO
.get("vim_net_id")
762 vld
["name"] = net_RO
.get("vim_name")
763 vld
["status"] = net_RO
.get("status")
764 vld
["status-detailed"] = net_RO
.get("error_msg")
765 vnfr_update
["vld.{}".format(vld_index
)] = vld
769 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
774 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
779 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
def _get_ns_config_info(self, nsr_id):
    """
    Generates a mapping between vnf,vdu elements and the N2VC id
    :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
    :return: a dictionary with {osm-config-mapping: {}} where its element contains:
        "<member-vnf-index>": <N2VC-id>  for a vnf configuration, or
        "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id>  for a vdu configuration
    """
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    mapping = {}
    ns_config_info = {"osm-config-mapping": mapping}
    for vca in vca_deployed_list:
        member_vnf_index = vca["member-vnf-index"]
        if not member_vnf_index:
            # entry without member-vnf-index: neither a vnf nor a vdu configuration
            continue
        if not vca["vdu_id"]:
            key = member_vnf_index
        else:
            key = "{}.{}.{}".format(
                member_vnf_index, vca["vdu_id"], vca["vdu_count_index"]
            )
        mapping[key] = vca["application"]
    return ns_config_info
809 async def _instantiate_ng_ro(
826 def get_vim_account(vim_account_id
):
828 if vim_account_id
in db_vims
:
829 return db_vims
[vim_account_id
]
830 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
831 db_vims
[vim_account_id
] = db_vim
834 # modify target_vld info with instantiation parameters
835 def parse_vld_instantiation_params(
836 target_vim
, target_vld
, vld_params
, target_sdn
838 if vld_params
.get("ip-profile"):
839 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
842 if vld_params
.get("provider-network"):
843 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
846 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
847 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
851 # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
852 # if wim_account_id is specified in vld_params, validate if it is feasible.
853 wim_account_id
, db_wim
= select_feasible_wim_account(
854 db_nsr
, db_vnfrs
, target_vld
, vld_params
, self
.logger
858 # WIM is needed and a feasible one was found, populate WIM target and SDN ports
859 self
.logger
.info("WIM selected: {:s}".format(str(wim_account_id
)))
860 # update vld_params with correct WIM account Id
861 vld_params
["wimAccountId"] = wim_account_id
863 target_wim
= "wim:{}".format(wim_account_id
)
864 target_wim_attrs
= get_target_wim_attrs(nsr_id
, target_vld
, vld_params
)
865 sdn_ports
= get_sdn_ports(vld_params
, db_wim
)
866 if len(sdn_ports
) > 0:
867 target_vld
["vim_info"][target_wim
] = target_wim_attrs
868 target_vld
["vim_info"][target_wim
]["sdn-ports"] = sdn_ports
871 "Target VLD with WIM data: {:s}".format(str(target_vld
))
874 for param
in ("vim-network-name", "vim-network-id"):
875 if vld_params
.get(param
):
876 if isinstance(vld_params
[param
], dict):
877 for vim
, vim_net
in vld_params
[param
].items():
878 other_target_vim
= "vim:" + vim
880 target_vld
["vim_info"],
881 (other_target_vim
, param
.replace("-", "_")),
884 else: # isinstance str
885 target_vld
["vim_info"][target_vim
][
886 param
.replace("-", "_")
887 ] = vld_params
[param
]
888 if vld_params
.get("common_id"):
889 target_vld
["common_id"] = vld_params
.get("common_id")
891 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
892 def update_ns_vld_target(target
, ns_params
):
893 for vnf_params
in ns_params
.get("vnf", ()):
894 if vnf_params
.get("vimAccountId"):
898 for vnfr
in db_vnfrs
.values()
899 if vnf_params
["member-vnf-index"]
900 == vnfr
["member-vnf-index-ref"]
904 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
907 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
908 target_vld
= find_in_list(
909 get_iterable(vdur
, "interfaces"),
910 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
913 vld_params
= find_in_list(
914 get_iterable(ns_params
, "vld"),
915 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
919 if vnf_params
.get("vimAccountId") not in a_vld
.get(
922 target_vim_network_list
= [
923 v
for _
, v
in a_vld
.get("vim_info").items()
925 target_vim_network_name
= next(
927 item
.get("vim_network_name", "")
928 for item
in target_vim_network_list
933 target
["ns"]["vld"][a_index
].get("vim_info").update(
935 "vim:{}".format(vnf_params
["vimAccountId"]): {
936 "vim_network_name": target_vim_network_name
,
942 for param
in ("vim-network-name", "vim-network-id"):
943 if vld_params
.get(param
) and isinstance(
944 vld_params
[param
], dict
946 for vim
, vim_net
in vld_params
[
949 other_target_vim
= "vim:" + vim
951 target
["ns"]["vld"][a_index
].get(
956 param
.replace("-", "_"),
961 nslcmop_id
= db_nslcmop
["_id"]
963 "name": db_nsr
["name"],
966 "image": deepcopy(db_nsr
["image"]),
967 "flavor": deepcopy(db_nsr
["flavor"]),
968 "action_id": nslcmop_id
,
969 "cloud_init_content": {},
971 for image
in target
["image"]:
972 image
["vim_info"] = {}
973 for flavor
in target
["flavor"]:
974 flavor
["vim_info"] = {}
975 if db_nsr
.get("affinity-or-anti-affinity-group"):
976 target
["affinity-or-anti-affinity-group"] = deepcopy(
977 db_nsr
["affinity-or-anti-affinity-group"]
979 for affinity_or_anti_affinity_group
in target
[
980 "affinity-or-anti-affinity-group"
982 affinity_or_anti_affinity_group
["vim_info"] = {}
984 if db_nslcmop
.get("lcmOperationType") != "instantiate":
985 # get parameters of instantiation:
986 db_nslcmop_instantiate
= self
.db
.get_list(
989 "nsInstanceId": db_nslcmop
["nsInstanceId"],
990 "lcmOperationType": "instantiate",
993 ns_params
= db_nslcmop_instantiate
.get("operationParams")
995 ns_params
= db_nslcmop
.get("operationParams")
996 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
997 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
1000 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
1001 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
1004 "name": vld
["name"],
1005 "mgmt-network": vld
.get("mgmt-network", False),
1006 "type": vld
.get("type"),
1009 "vim_network_name": vld
.get("vim-network-name"),
1010 "vim_account_id": ns_params
["vimAccountId"],
1014 # check if this network needs SDN assist
1015 if vld
.get("pci-interfaces"):
1016 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1017 if vim_config
:= db_vim
.get("config"):
1018 if sdnc_id
:= vim_config
.get("sdn-controller"):
1019 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1020 target_sdn
= "sdn:{}".format(sdnc_id
)
1021 target_vld
["vim_info"][target_sdn
] = {
1023 "target_vim": target_vim
,
1025 "type": vld
.get("type"),
1028 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1029 for nsd_vnf_profile
in nsd_vnf_profiles
:
1030 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1031 if cp
["virtual-link-profile-id"] == vld
["id"]:
1033 "member_vnf:{}.{}".format(
1034 cp
["constituent-cpd-id"][0][
1035 "constituent-base-element-id"
1037 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1039 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1041 # check at nsd descriptor, if there is an ip-profile
1043 nsd_vlp
= find_in_list(
1044 get_virtual_link_profiles(nsd
),
1045 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1050 and nsd_vlp
.get("virtual-link-protocol-data")
1051 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1053 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1056 ip_profile_dest_data
= {}
1057 if "ip-version" in ip_profile_source_data
:
1058 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1061 if "cidr" in ip_profile_source_data
:
1062 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1065 if "gateway-ip" in ip_profile_source_data
:
1066 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1069 if "dhcp-enabled" in ip_profile_source_data
:
1070 ip_profile_dest_data
["dhcp-params"] = {
1071 "enabled": ip_profile_source_data
["dhcp-enabled"]
1073 vld_params
["ip-profile"] = ip_profile_dest_data
1075 # update vld_params with instantiation params
1076 vld_instantiation_params
= find_in_list(
1077 get_iterable(ns_params
, "vld"),
1078 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1080 if vld_instantiation_params
:
1081 vld_params
.update(vld_instantiation_params
)
1082 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1083 target
["ns"]["vld"].append(target_vld
)
1084 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1085 update_ns_vld_target(target
, ns_params
)
1087 for vnfr
in db_vnfrs
.values():
1088 vnfd
= find_in_list(
1089 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1091 vnf_params
= find_in_list(
1092 get_iterable(ns_params
, "vnf"),
1093 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1095 target_vnf
= deepcopy(vnfr
)
1096 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1097 for vld
in target_vnf
.get("vld", ()):
1098 # check if connected to a ns.vld, to fill target'
1099 vnf_cp
= find_in_list(
1100 vnfd
.get("int-virtual-link-desc", ()),
1101 lambda cpd
: cpd
.get("id") == vld
["id"],
1104 ns_cp
= "member_vnf:{}.{}".format(
1105 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1107 if cp2target
.get(ns_cp
):
1108 vld
["target"] = cp2target
[ns_cp
]
1111 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1113 # check if this network needs SDN assist
1115 if vld
.get("pci-interfaces"):
1116 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1117 sdnc_id
= db_vim
["config"].get("sdn-controller")
1119 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1120 target_sdn
= "sdn:{}".format(sdnc_id
)
1121 vld
["vim_info"][target_sdn
] = {
1123 "target_vim": target_vim
,
1125 "type": vld
.get("type"),
1128 # check at vnfd descriptor, if there is an ip-profile
1130 vnfd_vlp
= find_in_list(
1131 get_virtual_link_profiles(vnfd
),
1132 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1136 and vnfd_vlp
.get("virtual-link-protocol-data")
1137 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1139 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1142 ip_profile_dest_data
= {}
1143 if "ip-version" in ip_profile_source_data
:
1144 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1147 if "cidr" in ip_profile_source_data
:
1148 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1151 if "gateway-ip" in ip_profile_source_data
:
1152 ip_profile_dest_data
[
1154 ] = ip_profile_source_data
["gateway-ip"]
1155 if "dhcp-enabled" in ip_profile_source_data
:
1156 ip_profile_dest_data
["dhcp-params"] = {
1157 "enabled": ip_profile_source_data
["dhcp-enabled"]
1160 vld_params
["ip-profile"] = ip_profile_dest_data
1161 # update vld_params with instantiation params
1163 vld_instantiation_params
= find_in_list(
1164 get_iterable(vnf_params
, "internal-vld"),
1165 lambda i_vld
: i_vld
["name"] == vld
["id"],
1167 if vld_instantiation_params
:
1168 vld_params
.update(vld_instantiation_params
)
1169 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1172 for vdur
in target_vnf
.get("vdur", ()):
1173 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1174 continue # This vdu must not be created
1175 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1177 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1180 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1181 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1184 and vdu_configuration
.get("config-access")
1185 and vdu_configuration
.get("config-access").get("ssh-access")
1187 vdur
["ssh-keys"] = ssh_keys_all
1188 vdur
["ssh-access-required"] = vdu_configuration
[
1190 ]["ssh-access"]["required"]
1193 and vnf_configuration
.get("config-access")
1194 and vnf_configuration
.get("config-access").get("ssh-access")
1195 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1197 vdur
["ssh-keys"] = ssh_keys_all
1198 vdur
["ssh-access-required"] = vnf_configuration
[
1200 ]["ssh-access"]["required"]
1201 elif ssh_keys_instantiation
and find_in_list(
1202 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1204 vdur
["ssh-keys"] = ssh_keys_instantiation
1206 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1208 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1210 if vdud
.get("cloud-init-file"):
1211 vdur
["cloud-init"] = "{}:file:{}".format(
1212 vnfd
["_id"], vdud
.get("cloud-init-file")
1214 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1215 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1216 base_folder
= vnfd
["_admin"]["storage"]
1217 if base_folder
["pkg-dir"]:
1218 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1219 base_folder
["folder"],
1220 base_folder
["pkg-dir"],
1221 vdud
.get("cloud-init-file"),
1224 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1225 base_folder
["folder"],
1226 vdud
.get("cloud-init-file"),
1228 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1229 target
["cloud_init_content"][
1232 elif vdud
.get("cloud-init"):
1233 vdur
["cloud-init"] = "{}:vdu:{}".format(
1234 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1236 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1237 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1240 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1241 deploy_params_vdu
= self
._format
_additional
_params
(
1242 vdur
.get("additionalParams") or {}
1244 deploy_params_vdu
["OSM"] = get_osm_params(
1245 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1247 vdur
["additionalParams"] = deploy_params_vdu
1250 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1251 if target_vim
not in ns_flavor
["vim_info"]:
1252 ns_flavor
["vim_info"][target_vim
] = {}
1255 # in case alternative images are provided we must check if they should be applied
1256 # for the vim_type, modify the vim_type taking into account
1257 ns_image_id
= int(vdur
["ns-image-id"])
1258 if vdur
.get("alt-image-ids"):
1259 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1260 vim_type
= db_vim
["vim_type"]
1261 for alt_image_id
in vdur
.get("alt-image-ids"):
1262 ns_alt_image
= target
["image"][int(alt_image_id
)]
1263 if vim_type
== ns_alt_image
.get("vim-type"):
1264 # must use alternative image
1266 "use alternative image id: {}".format(alt_image_id
)
1268 ns_image_id
= alt_image_id
1269 vdur
["ns-image-id"] = ns_image_id
1271 ns_image
= target
["image"][int(ns_image_id
)]
1272 if target_vim
not in ns_image
["vim_info"]:
1273 ns_image
["vim_info"][target_vim
] = {}
1276 if vdur
.get("affinity-or-anti-affinity-group-id"):
1277 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1278 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1279 if target_vim
not in ns_ags
["vim_info"]:
1280 ns_ags
["vim_info"][target_vim
] = {}
1282 vdur
["vim_info"] = {target_vim
: {}}
1283 # instantiation parameters
1285 vdu_instantiation_params
= find_in_list(
1286 get_iterable(vnf_params
, "vdu"),
1287 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1289 if vdu_instantiation_params
:
1290 # Parse the vdu_volumes from the instantiation params
1291 vdu_volumes
= get_volumes_from_instantiation_params(
1292 vdu_instantiation_params
, vdud
1294 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1295 vdur_list
.append(vdur
)
1296 target_vnf
["vdur"] = vdur_list
1297 target
["vnf"].append(target_vnf
)
1299 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1300 desc
= await self
.RO
.deploy(nsr_id
, target
)
1301 self
.logger
.debug("RO return > {}".format(desc
))
1302 action_id
= desc
["action_id"]
1303 await self
._wait
_ng
_ro
(
1310 operation
="instantiation",
1315 "_admin.deployed.RO.operational-status": "running",
1316 "detailed-status": " ".join(stage
),
1318 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1319 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1320 self
._write
_op
_status
(nslcmop_id
, stage
)
1322 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1326 async def _wait_ng_ro(
1336 detailed_status_old
= None
1338 start_time
= start_time
or time()
1339 while time() <= start_time
+ timeout
:
1340 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1341 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1342 if desc_status
["status"] == "FAILED":
1343 raise NgRoException(desc_status
["details"])
1344 elif desc_status
["status"] == "BUILD":
1346 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1347 elif desc_status
["status"] == "DONE":
1349 stage
[2] = "Deployed at VIM"
1352 assert False, "ROclient.check_ns_status returns unknown {}".format(
1353 desc_status
["status"]
1355 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1356 detailed_status_old
= stage
[2]
1357 db_nsr_update
["detailed-status"] = " ".join(stage
)
1358 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1359 self
._write
_op
_status
(nslcmop_id
, stage
)
1360 await asyncio
.sleep(15, loop
=self
.loop
)
1361 else: # timeout_ns_deploy
1362 raise NgRoException("Timeout waiting ns to deploy")
1364 async def _terminate_ng_ro(
1365 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1370 start_deploy
= time()
1377 "action_id": nslcmop_id
,
1379 desc
= await self
.RO
.deploy(nsr_id
, target
)
1380 action_id
= desc
["action_id"]
1381 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1384 + "ns terminate action at RO. action_id={}".format(action_id
)
1388 delete_timeout
= 20 * 60 # 20 minutes
1389 await self
._wait
_ng
_ro
(
1396 operation
="termination",
1398 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1400 await self
.RO
.delete(nsr_id
)
1401 except NgRoException
as e
:
1402 if e
.http_code
== 404: # not found
1403 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1404 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1406 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1408 elif e
.http_code
== 409: # conflict
1409 failed_detail
.append("delete conflict: {}".format(e
))
1412 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1415 failed_detail
.append("delete error: {}".format(e
))
1418 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1420 except Exception as e
:
1421 failed_detail
.append("delete error: {}".format(e
))
1423 logging_text
+ "RO_action_id={} delete error: {}".format(action_id
, e
)
1427 stage
[2] = "Error deleting from VIM"
1429 stage
[2] = "Deleted from VIM"
1430 db_nsr_update
["detailed-status"] = " ".join(stage
)
1431 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1432 self
._write
_op
_status
(nslcmop_id
, stage
)
1435 raise LcmException("; ".join(failed_detail
))
1438 async def instantiate_RO(
1452 :param logging_text: prefix text to use at logging
1453 :param nsr_id: nsr identity
1454 :param nsd: database content of ns descriptor
1455 :param db_nsr: database content of ns record
1456 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1458 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1459 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1460 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1461 :return: None or exception
1464 start_deploy
= time()
1465 ns_params
= db_nslcmop
.get("operationParams")
1466 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1467 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1469 timeout_ns_deploy
= self
.timeout
.ns_deploy
1471 # Check for and optionally request placement optimization. Database will be updated if placement activated
1472 stage
[2] = "Waiting for Placement."
1473 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1474 # in case of placement, change ns_params["vimAccountId"] if it is not present at any vnfr
1475 for vnfr
in db_vnfrs
.values():
1476 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1479 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1481 return await self
._instantiate
_ng
_ro
(
1494 except Exception as e
:
1495 stage
[2] = "ERROR deploying at VIM"
1496 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1498 "Error deploying at VIM {}".format(e
),
1499 exc_info
=not isinstance(
1502 ROclient
.ROClientException
,
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address
    :param logging_text: prefix use for logging
    :param nsr_id: nsr identity
    :param vnfr_id: identity of the vnfr that is read from database at each try
    :param kdu_name: value of "kdu-name" of the wanted item of the vnfr "kdur" list
    :return: IP address, K8s services
    """
    nb_tries = 0

    # 360 tries with 10 seconds of wait between them
    while nb_tries < 360:
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next(
            (
                x
                for x in get_iterable(db_vnfr, "kdur")
                if x.get("kdu-name") == kdu_name
            ),
            None,
        )
        if not kdur:
            raise LcmException(
                "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
            )
        if kdur.get("status"):
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address"), kdur.get("services")
            else:
                # any other status that has been set is considered an error
                raise LcmException(
                    "target KDU={} is in error state".format(kdu_name)
                )

        # the 'loop' argument of asyncio.sleep was removed in python 3.10
        await asyncio.sleep(10)
        nb_tries += 1
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1550 async def wait_vm_up_insert_key_ro(
1551 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1554 Wait for the ip address at RO and, optionally, insert the public key in the virtual machine
1555 :param logging_text: prefix use for logging
1560 :param pub_key: public ssh key to inject, None to skip
1561 :param user: user to apply the public ssh key
1565 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1567 target_vdu_id
= None
1573 if ro_retries
>= 360: # 1 hour
1575 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1578 await asyncio
.sleep(10, loop
=self
.loop
)
1581 if not target_vdu_id
:
1582 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1584 if not vdu_id
: # for the VNF case
1585 if db_vnfr
.get("status") == "ERROR":
1587 "Cannot inject ssh-key because target VNF is in error state"
1589 ip_address
= db_vnfr
.get("ip-address")
1595 for x
in get_iterable(db_vnfr
, "vdur")
1596 if x
.get("ip-address") == ip_address
1604 for x
in get_iterable(db_vnfr
, "vdur")
1605 if x
.get("vdu-id-ref") == vdu_id
1606 and x
.get("count-index") == vdu_index
1612 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1613 ): # If only one, this should be the target vdu
1614 vdur
= db_vnfr
["vdur"][0]
1617 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1618 vnfr_id
, vdu_id
, vdu_index
1621 # New generation RO stores information at "vim_info"
1624 if vdur
.get("vim_info"):
1626 t
for t
in vdur
["vim_info"]
1627 ) # there should be only one key
1628 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1630 vdur
.get("pdu-type")
1631 or vdur
.get("status") == "ACTIVE"
1632 or ng_ro_status
== "ACTIVE"
1634 ip_address
= vdur
.get("ip-address")
1637 target_vdu_id
= vdur
["vdu-id-ref"]
1638 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1640 "Cannot inject ssh-key because target VM is in error state"
1643 if not target_vdu_id
:
1646 # inject public key into machine
1647 if pub_key
and user
:
1648 self
.logger
.debug(logging_text
+ "Inserting RO key")
1649 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1650 if vdur
.get("pdu-type"):
1651 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1656 "action": "inject_ssh_key",
1660 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1662 desc
= await self
.RO
.deploy(nsr_id
, target
)
1663 action_id
= desc
["action_id"]
1664 await self
._wait
_ng
_ro
(
1665 nsr_id
, action_id
, timeout
=600, operation
="instantiation"
1668 except NgRoException
as e
:
1670 "Reaching max tries injecting key. Error: {}".format(e
)
1677 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1679 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1681 my_vca
= vca_deployed_list
[vca_index
]
1682 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1683 # vdu or kdu: no dependencies
1687 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1688 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1689 configuration_status_list
= db_nsr
["configurationStatus"]
1690 for index
, vca_deployed
in enumerate(configuration_status_list
):
1691 if index
== vca_index
:
1694 if not my_vca
.get("member-vnf-index") or (
1695 vca_deployed
.get("member-vnf-index")
1696 == my_vca
.get("member-vnf-index")
1698 internal_status
= configuration_status_list
[index
].get("status")
1699 if internal_status
== "READY":
1701 elif internal_status
== "BROKEN":
1703 "Configuration aborted because dependent charm/s has failed"
1708 # no dependencies, return
1710 await asyncio
.sleep(10)
1713 raise LcmException("Configuration aborted because dependent charm/s timeout")
def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
    """
    Get the vca id that applies to a vnf or, when no vnf record is given, to the ns
    :param db_vnfr: database content of the vnf record. If not empty, its "vca-id" is returned
    :param db_nsr: database content of the ns record. Used only when db_vnfr is empty: the "vca" of
        the vim account referenced at instantiate_params.vimAccountId is returned
    :return: vca id, or None if both records are empty
    """
    if db_vnfr:
        return deep_get(db_vnfr, ("vca-id",))
    if not db_nsr:
        return None
    vim_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
    vim_account = VimAccountDB.get_vim_account_with_id(vim_id)
    return vim_account.get("vca")
1724 async def instantiate_N2VC(
1741 ee_config_descriptor
,
1743 nsr_id
= db_nsr
["_id"]
1744 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1745 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1746 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1747 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1749 "collection": "nsrs",
1750 "filter": {"_id": nsr_id
},
1751 "path": db_update_entry
,
1757 element_under_configuration
= nsr_id
1761 vnfr_id
= db_vnfr
["_id"]
1762 osm_config
["osm"]["vnf_id"] = vnfr_id
1764 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1766 if vca_type
== "native_charm":
1769 index_number
= vdu_index
or 0
1772 element_type
= "VNF"
1773 element_under_configuration
= vnfr_id
1774 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1776 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1777 element_type
= "VDU"
1778 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1779 osm_config
["osm"]["vdu_id"] = vdu_id
1781 namespace
+= ".{}".format(kdu_name
)
1782 element_type
= "KDU"
1783 element_under_configuration
= kdu_name
1784 osm_config
["osm"]["kdu_name"] = kdu_name
1787 if base_folder
["pkg-dir"]:
1788 artifact_path
= "{}/{}/{}/{}".format(
1789 base_folder
["folder"],
1790 base_folder
["pkg-dir"],
1793 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1798 artifact_path
= "{}/Scripts/{}/{}/".format(
1799 base_folder
["folder"],
1802 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1807 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1809 # get initial_config_primitive_list that applies to this element
1810 initial_config_primitive_list
= config_descriptor
.get(
1811 "initial-config-primitive"
1815 "Initial config primitive list > {}".format(
1816 initial_config_primitive_list
1820 # add config if not present for NS charm
1821 ee_descriptor_id
= ee_config_descriptor
.get("id")
1822 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1823 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1824 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1828 "Initial config primitive list #2 > {}".format(
1829 initial_config_primitive_list
1832 # n2vc_redesign STEP 3.1
1833 # find old ee_id if exists
1834 ee_id
= vca_deployed
.get("ee_id")
1836 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1837 # create or register execution environment in VCA
1838 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1840 self
._write
_configuration
_status
(
1842 vca_index
=vca_index
,
1844 element_under_configuration
=element_under_configuration
,
1845 element_type
=element_type
,
1848 step
= "create execution environment"
1849 self
.logger
.debug(logging_text
+ step
)
1853 if vca_type
== "k8s_proxy_charm":
1854 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1855 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1856 namespace
=namespace
,
1857 artifact_path
=artifact_path
,
1861 elif vca_type
== "helm" or vca_type
== "helm-v3":
1862 ee_id
, credentials
= await self
.vca_map
[
1864 ].create_execution_environment(
1865 namespace
=namespace
,
1869 artifact_path
=artifact_path
,
1870 chart_model
=vca_name
,
1874 ee_id
, credentials
= await self
.vca_map
[
1876 ].create_execution_environment(
1877 namespace
=namespace
,
1883 elif vca_type
== "native_charm":
1884 step
= "Waiting to VM being up and getting IP address"
1885 self
.logger
.debug(logging_text
+ step
)
1886 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1895 credentials
= {"hostname": rw_mgmt_ip
}
1897 username
= deep_get(
1898 config_descriptor
, ("config-access", "ssh-access", "default-user")
1900 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1901 # merged. Meanwhile let's get username from initial-config-primitive
1902 if not username
and initial_config_primitive_list
:
1903 for config_primitive
in initial_config_primitive_list
:
1904 for param
in config_primitive
.get("parameter", ()):
1905 if param
["name"] == "ssh-username":
1906 username
= param
["value"]
1910 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1911 "'config-access.ssh-access.default-user'"
1913 credentials
["username"] = username
1914 # n2vc_redesign STEP 3.2
1916 self
._write
_configuration
_status
(
1918 vca_index
=vca_index
,
1919 status
="REGISTERING",
1920 element_under_configuration
=element_under_configuration
,
1921 element_type
=element_type
,
1924 step
= "register execution environment {}".format(credentials
)
1925 self
.logger
.debug(logging_text
+ step
)
1926 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1927 credentials
=credentials
,
1928 namespace
=namespace
,
1933 # for compatibility with MON/POL modules, the need model and application name at database
1934 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1935 ee_id_parts
= ee_id
.split(".")
1936 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1937 if len(ee_id_parts
) >= 2:
1938 model_name
= ee_id_parts
[0]
1939 application_name
= ee_id_parts
[1]
1940 db_nsr_update
[db_update_entry
+ "model"] = model_name
1941 db_nsr_update
[db_update_entry
+ "application"] = application_name
1943 # n2vc_redesign STEP 3.3
1944 step
= "Install configuration Software"
1946 self
._write
_configuration
_status
(
1948 vca_index
=vca_index
,
1949 status
="INSTALLING SW",
1950 element_under_configuration
=element_under_configuration
,
1951 element_type
=element_type
,
1952 other_update
=db_nsr_update
,
1955 # TODO check if already done
1956 self
.logger
.debug(logging_text
+ step
)
1958 if vca_type
== "native_charm":
1959 config_primitive
= next(
1960 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1963 if config_primitive
:
1964 config
= self
._map
_primitive
_params
(
1965 config_primitive
, {}, deploy_params
1968 if vca_type
== "lxc_proxy_charm":
1969 if element_type
== "NS":
1970 num_units
= db_nsr
.get("config-units") or 1
1971 elif element_type
== "VNF":
1972 num_units
= db_vnfr
.get("config-units") or 1
1973 elif element_type
== "VDU":
1974 for v
in db_vnfr
["vdur"]:
1975 if vdu_id
== v
["vdu-id-ref"]:
1976 num_units
= v
.get("config-units") or 1
1978 if vca_type
!= "k8s_proxy_charm":
1979 await self
.vca_map
[vca_type
].install_configuration_sw(
1981 artifact_path
=artifact_path
,
1984 num_units
=num_units
,
1989 # write in db flag of configuration_sw already installed
1991 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1994 # add relations for this VCA (wait for other peers related with this VCA)
1995 is_relation_added
= await self
._add
_vca
_relations
(
1996 logging_text
=logging_text
,
1999 vca_index
=vca_index
,
2002 if not is_relation_added
:
2003 raise LcmException("Relations could not be added to VCA.")
2005 # if SSH access is required, then get execution environment SSH public
2006 # if native charm we have waited already to VM be UP
2007 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
2010 # self.logger.debug("get ssh key block")
2012 config_descriptor
, ("config-access", "ssh-access", "required")
2014 # self.logger.debug("ssh key needed")
2015 # Needed to inject a ssh key
2018 ("config-access", "ssh-access", "default-user"),
2020 step
= "Install configuration Software, getting public ssh key"
2021 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
2022 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
2025 step
= "Insert public key into VM user={} ssh_key={}".format(
2029 # self.logger.debug("no need to get ssh key")
2030 step
= "Waiting to VM being up and getting IP address"
2031 self
.logger
.debug(logging_text
+ step
)
2033 # default rw_mgmt_ip to None, avoiding the non definition of the variable
2036 # n2vc_redesign STEP 5.1
2037 # wait for RO (ip-address) Insert pub_key into VM
2040 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
2041 logging_text
, nsr_id
, vnfr_id
, kdu_name
2043 vnfd
= self
.db
.get_one(
2045 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2047 kdu
= get_kdu(vnfd
, kdu_name
)
2049 service
["name"] for service
in get_kdu_services(kdu
)
2051 exposed_services
= []
2052 for service
in services
:
2053 if any(s
in service
["name"] for s
in kdu_services
):
2054 exposed_services
.append(service
)
2055 await self
.vca_map
[vca_type
].exec_primitive(
2057 primitive_name
="config",
2059 "osm-config": json
.dumps(
2061 k8s
={"services": exposed_services
}
2068 # This verification is needed in order to avoid trying to add a public key
2069 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2070 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2071 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2073 elif db_vnfr
.get("vdur"):
2074 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2084 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2086 # store rw_mgmt_ip in deploy params for later replacement
2087 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2089 # n2vc_redesign STEP 6 Execute initial config primitive
2090 step
= "execute initial config primitive"
2092 # wait for dependent primitives execution (NS -> VNF -> VDU)
2093 if initial_config_primitive_list
:
2094 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2096 # stage, in function of element type: vdu, kdu, vnf or ns
2097 my_vca
= vca_deployed_list
[vca_index
]
2098 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2100 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2101 elif my_vca
.get("member-vnf-index"):
2103 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2106 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2108 self
._write
_configuration
_status
(
2109 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2112 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2114 check_if_terminated_needed
= True
2115 for initial_config_primitive
in initial_config_primitive_list
:
2116 # adding information on the vca_deployed if it is a NS execution environment
2117 if not vca_deployed
["member-vnf-index"]:
2118 deploy_params
["ns_config_info"] = json
.dumps(
2119 self
._get
_ns
_config
_info
(nsr_id
)
2121 # TODO check if already done
2122 primitive_params_
= self
._map
_primitive
_params
(
2123 initial_config_primitive
, {}, deploy_params
2126 step
= "execute primitive '{}' params '{}'".format(
2127 initial_config_primitive
["name"], primitive_params_
2129 self
.logger
.debug(logging_text
+ step
)
2130 await self
.vca_map
[vca_type
].exec_primitive(
2132 primitive_name
=initial_config_primitive
["name"],
2133 params_dict
=primitive_params_
,
2138 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2139 if check_if_terminated_needed
:
2140 if config_descriptor
.get("terminate-config-primitive"):
2142 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2144 check_if_terminated_needed
= False
2146 # TODO register in database that primitive is done
2148 # STEP 7 Configure metrics
2149 if vca_type
== "helm" or vca_type
== "helm-v3":
2150 # TODO: review for those cases where the helm chart is a reference and
2151 # is not part of the NF package
2152 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2154 artifact_path
=artifact_path
,
2155 ee_config_descriptor
=ee_config_descriptor
,
2158 target_ip
=rw_mgmt_ip
,
2164 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2167 for job
in prometheus_jobs
:
2170 {"job_name": job
["job_name"]},
2173 fail_on_empty
=False,
2176 step
= "instantiated at VCA"
2177 self
.logger
.debug(logging_text
+ step
)
2179 self
._write
_configuration
_status
(
2180 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2183 except Exception as e
: # TODO not use Exception but N2VC exception
2184 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2186 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2189 "Exception while {} : {}".format(step
, e
), exc_info
=True
2191 self
._write
_configuration
_status
(
2192 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2194 raise LcmException("{}. {}".format(step
, e
)) from e
2196 def _write_ns_status(
2200 current_operation
: str,
2201 current_operation_id
: str,
2202 error_description
: str = None,
2203 error_detail
: str = None,
2204 other_update
: dict = None,
2207 Update db_nsr fields.
2210 :param current_operation:
2211 :param current_operation_id:
2212 :param error_description:
2213 :param error_detail:
2214 :param other_update: Other required changes at database if provided, will be cleared
2218 db_dict
= other_update
or {}
2221 ] = current_operation_id
# for backward compatibility
2222 db_dict
["_admin.current-operation"] = current_operation_id
2223 db_dict
["_admin.operation-type"] = (
2224 current_operation
if current_operation
!= "IDLE" else None
2226 db_dict
["currentOperation"] = current_operation
2227 db_dict
["currentOperationID"] = current_operation_id
2228 db_dict
["errorDescription"] = error_description
2229 db_dict
["errorDetail"] = error_detail
2232 db_dict
["nsState"] = ns_state
2233 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2234 except DbException
as e
:
2235 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2237 def _write_op_status(
2241 error_message
: str = None,
2242 queuePosition
: int = 0,
2243 operation_state
: str = None,
2244 other_update
: dict = None,
2247 db_dict
= other_update
or {}
2248 db_dict
["queuePosition"] = queuePosition
2249 if isinstance(stage
, list):
2250 db_dict
["stage"] = stage
[0]
2251 db_dict
["detailed-status"] = " ".join(stage
)
2252 elif stage
is not None:
2253 db_dict
["stage"] = str(stage
)
2255 if error_message
is not None:
2256 db_dict
["errorMessage"] = error_message
2257 if operation_state
is not None:
2258 db_dict
["operationState"] = operation_state
2259 db_dict
["statusEnteredTime"] = time()
2260 self
.update_db_2("nslcmops", op_id
, db_dict
)
2261 except DbException
as e
:
2263 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2266 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2268 nsr_id
= db_nsr
["_id"]
2269 # configurationStatus
2270 config_status
= db_nsr
.get("configurationStatus")
2273 "configurationStatus.{}.status".format(index
): status
2274 for index
, v
in enumerate(config_status
)
2278 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2280 except DbException
as e
:
2282 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2285 def _write_configuration_status(
2290 element_under_configuration
: str = None,
2291 element_type
: str = None,
2292 other_update
: dict = None,
2295 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2296 # .format(vca_index, status))
2299 db_path
= "configurationStatus.{}.".format(vca_index
)
2300 db_dict
= other_update
or {}
2302 db_dict
[db_path
+ "status"] = status
2303 if element_under_configuration
:
2305 db_path
+ "elementUnderConfiguration"
2306 ] = element_under_configuration
2308 db_dict
[db_path
+ "elementType"] = element_type
2309 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2310 except DbException
as e
:
2312 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2313 status
, nsr_id
, vca_index
, e
2317 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2319 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2320 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2321 Database is used because the result can be obtained from a different LCM worker in case of HA.
2322 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2323 :param db_nslcmop: database content of nslcmop
2324 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2325 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2326 computed 'vim-account-id'
2329 nslcmop_id
= db_nslcmop
["_id"]
2330 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2331 if placement_engine
== "PLA":
2333 logging_text
+ "Invoke and wait for placement optimization"
2335 await self
.msg
.aiowrite(
2336 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2338 db_poll_interval
= 5
2339 wait
= db_poll_interval
* 10
2341 while not pla_result
and wait
>= 0:
2342 await asyncio
.sleep(db_poll_interval
)
2343 wait
-= db_poll_interval
2344 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2345 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2349 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2352 for pla_vnf
in pla_result
["vnf"]:
2353 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2354 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2359 {"_id": vnfr
["_id"]},
2360 {"vim-account-id": pla_vnf
["vimAccountId"]},
2363 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
def update_nsrs_with_pla_result(self, params):
    """
    Store the received placement result at "_admin.pla" of the operation at "nslcmops".
    Any exception is logged as a warning, not raised
    :param params: content with a "placement" item, that contains the operation identity at "nslcmopId"
    :return: None
    """
    # defined before the 'try' so that it can be always used at the 'except'
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
        self.update_db_2(
            "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
        )
    except Exception as e:
        # Logger.warn is deprecated, use Logger.warning
        self.logger.warning(
            "Update failed for nslcmop_id={}:{}".format(nslcmop_id, e)
        )
# Coroutine that runs the NS "instantiate" operation `nslcmop_id` for the NS
# instance `nsr_id`: it reads the operation, nsr, nsd, vnfr and vnfd records,
# launches the KDU, VIM (RO) and execution-environment deployments, and in its
# final section waits for the pending tasks and writes the resulting states to
# the nsr and nslcmop records.
2375 async def instantiate(self
, nsr_id
, nslcmop_id
):
2378 :param nsr_id: ns instance to deploy
2379 :param nslcmop_id: operation to run
2383 # Try to lock HA task here
2384 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
# When the lock is not owned by this worker the situation is logged at debug level.
2385 if not task_is_locked_by_me
:
2387 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
# Common prefix for every log line emitted by this operation.
2391 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2392 self
.logger
.debug(logging_text
+ "Enter")
2394 # get all needed from database
2396 # database nsrs record
2399 # database nslcmops record
2402 # update operation on nsrs
2404 # update operation on nslcmops
2405 db_nslcmop_update
= {}
# Default deployment timeout; may be overridden by the operation parameters below.
2407 timeout_ns_deploy
= self
.timeout
.ns_deploy
2409 nslcmop_operation_state
= None
2410 db_vnfrs
= {} # vnf's info indexed by member-index
# Every launched task is recorded here so the final section can wait for it and report on it.
2412 tasks_dict_info
= {} # from task to info text
2416 "Stage 1/5: preparation of the environment.",
2417 "Waiting for previous operations to terminate.",
2420 # ^ stage, step, VIM progress
2422 # wait for any previous tasks in process
2423 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2425 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2426 stage
[1] = "Reading from database."
2427 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2428 db_nsr_update
["detailed-status"] = "creating"
2429 db_nsr_update
["operational-status"] = "init"
2430 self
._write
_ns
_status
(
2432 ns_state
="BUILDING",
2433 current_operation
="INSTANTIATING",
2434 current_operation_id
=nslcmop_id
,
2435 other_update
=db_nsr_update
,
2437 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2439 # read from db: operation
2440 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2441 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
# additionalParamsForVnf is decoded with json.loads and written back in place.
2442 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2443 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2444 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2446 ns_params
= db_nslcmop
.get("operationParams")
# A truthy "timeout_ns_deploy" operation parameter replaces the configured default.
2447 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2448 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2451 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2452 self
.logger
.debug(logging_text
+ stage
[1])
2453 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2454 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2455 self
.logger
.debug(logging_text
+ stage
[1])
2456 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2457 self
.fs
.sync(db_nsr
["nsd-id"])
2459 # nsr_name = db_nsr["name"] # TODO short-name??
2461 # read from db: vnf's of this ns
2462 stage
[1] = "Getting vnfrs from db."
2463 self
.logger
.debug(logging_text
+ stage
[1])
2464 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2466 # read from db: vnfd's for every vnf
2467 db_vnfds
= [] # every vnfd data
2469 # for each vnf in ns, read vnfd
2470 for vnfr
in db_vnfrs_list
:
# The additionalParams of each kdur are decoded with json.loads before the kdur list is stored back.
2471 if vnfr
.get("kdur"):
2473 for kdur
in vnfr
["kdur"]:
2474 if kdur
.get("additionalParams"):
2475 kdur
["additionalParams"] = json
.loads(
2476 kdur
["additionalParams"]
2478 kdur_list
.append(kdur
)
2479 vnfr
["kdur"] = kdur_list
2481 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2482 vnfd_id
= vnfr
["vnfd-id"]
2483 vnfd_ref
= vnfr
["vnfd-ref"]
2484 self
.fs
.sync(vnfd_id
)
2486 # if we haven't this vnfd, read it from db
# NOTE(review): db_vnfds collects the vnfd documents returned by get_one (see the
# append below), so testing an id against it looks like it can never match, which
# would re-read and re-append the same vnfd for every vnfr - confirm.
2487 if vnfd_id
not in db_vnfds
:
2489 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2492 self
.logger
.debug(logging_text
+ stage
[1])
2493 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2496 db_vnfds
.append(vnfd
)
2498 # Get or generates the _admin.deployed.VCA list
2499 vca_deployed_list
= None
2500 if db_nsr
["_admin"].get("deployed"):
2501 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2502 if vca_deployed_list
is None:
2503 vca_deployed_list
= []
2504 configuration_status_list
= []
2505 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2506 db_nsr_update
["configurationStatus"] = configuration_status_list
2507 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2508 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2509 elif isinstance(vca_deployed_list
, dict):
2510 # maintain backward compatibility. Change a dict to list at database
2511 vca_deployed_list
= list(vca_deployed_list
.values())
2512 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2513 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
# _admin.deployed.RO.vnfd is (re)set to an empty list both in memory and in the pending update.
2516 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2518 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2519 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2521 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2522 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2523 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2525 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2528 # n2vc_redesign STEP 2 Deploy Network Scenario
2529 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2530 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2532 stage
[1] = "Deploying KDUs."
2533 # self.logger.debug(logging_text + "Before deploy_kdus")
2534 # Call to deploy_kdus in case exists the "vdu:kdu" param
2535 await self
.deploy_kdus(
2536 logging_text
=logging_text
,
2538 nslcmop_id
=nslcmop_id
,
2541 task_instantiation_info
=tasks_dict_info
,
2544 stage
[1] = "Getting VCA public key."
2545 # n2vc_redesign STEP 1 Get VCA public ssh-key
2546 # feature 1429. Add n2vc public key to needed VMs
2547 n2vc_key
= self
.n2vc
.get_public_key()
2548 n2vc_key_list
= [n2vc_key
]
2549 if self
.vca_config
.public_key
:
2550 n2vc_key_list
.append(self
.vca_config
.public_key
)
2552 stage
[1] = "Deploying NS at VIM."
# The RO deployment is wrapped in a future, registered in lcm_tasks and tracked in tasks_dict_info.
2553 task_ro
= asyncio
.ensure_future(
2554 self
.instantiate_RO(
2555 logging_text
=logging_text
,
2559 db_nslcmop
=db_nslcmop
,
2562 n2vc_key_list
=n2vc_key_list
,
2566 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2567 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2569 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2570 stage
[1] = "Deploying Execution Environments."
2571 self
.logger
.debug(logging_text
+ stage
[1])
2573 # create namespace and certificate if any helm based EE is present in the NS
2574 if check_helm_ee_in_ns(db_vnfds
):
2575 # TODO: create EE namespace
2576 # create TLS certificates
2577 await self
.vca_map
["helm-v3"].create_tls_certificate(
2578 secret_name
="ee-tls-{}".format(nsr_id
),
2581 usage
="server auth",
2584 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
# One pass per VNF profile of the NSD: VNF-level, then per-VDU, then per-KDU configurations.
2585 for vnf_profile
in get_vnf_profiles(nsd
):
2586 vnfd_id
= vnf_profile
["vnfd-id"]
2587 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2588 member_vnf_index
= str(vnf_profile
["id"])
2589 db_vnfr
= db_vnfrs
[member_vnf_index
]
2590 base_folder
= vnfd
["_admin"]["storage"]
2596 # Get additional parameters
2597 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2598 if db_vnfr
.get("additionalParamsForVnf"):
2599 deploy_params
.update(
2600 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2603 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2604 if descriptor_config
:
2606 logging_text
=logging_text
2607 + "member_vnf_index={} ".format(member_vnf_index
),
2610 nslcmop_id
=nslcmop_id
,
2616 member_vnf_index
=member_vnf_index
,
2617 vdu_index
=vdu_index
,
2619 deploy_params
=deploy_params
,
2620 descriptor_config
=descriptor_config
,
2621 base_folder
=base_folder
,
2622 task_instantiation_info
=tasks_dict_info
,
2626 # Deploy charms for each VDU that supports one.
2627 for vdud
in get_vdu_list(vnfd
):
2629 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2630 vdur
= find_in_list(
2631 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2634 if vdur
.get("additionalParams"):
2635 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
# NOTE(review): this binds the same dict object as the VNF-level deploy_params (no
# copy), so the "OSM" assignment that follows also rewrites the dict passed as
# deploy_params above - confirm this sharing is intended.
2637 deploy_params_vdu
= deploy_params
2638 deploy_params_vdu
["OSM"] = get_osm_params(
2639 db_vnfr
, vdu_id
, vdu_count_index
=0
2641 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2643 self
.logger
.debug("VDUD > {}".format(vdud
))
2645 "Descriptor config > {}".format(descriptor_config
)
2647 if descriptor_config
:
2650 for vdu_index
in range(vdud_count
):
2651 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2653 logging_text
=logging_text
2654 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2655 member_vnf_index
, vdu_id
, vdu_index
2659 nslcmop_id
=nslcmop_id
,
2665 member_vnf_index
=member_vnf_index
,
2666 vdu_index
=vdu_index
,
2668 deploy_params
=deploy_params_vdu
,
2669 descriptor_config
=descriptor_config
,
2670 base_folder
=base_folder
,
2671 task_instantiation_info
=tasks_dict_info
,
2674 for kdud
in get_kdu_list(vnfd
):
2675 kdu_name
= kdud
["name"]
2676 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2677 if descriptor_config
:
2682 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2684 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2685 if kdur
.get("additionalParams"):
2686 deploy_params_kdu
.update(
2687 parse_yaml_strings(kdur
["additionalParams"].copy())
2691 logging_text
=logging_text
,
2694 nslcmop_id
=nslcmop_id
,
2700 member_vnf_index
=member_vnf_index
,
2701 vdu_index
=vdu_index
,
2703 deploy_params
=deploy_params_kdu
,
2704 descriptor_config
=descriptor_config
,
2705 base_folder
=base_folder
,
2706 task_instantiation_info
=tasks_dict_info
,
2710 # Check if this NS has a charm configuration
2711 descriptor_config
= nsd
.get("ns-configuration")
2712 if descriptor_config
and descriptor_config
.get("juju"):
2715 member_vnf_index
= None
2721 # Get additional parameters
2722 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2723 if db_nsr
.get("additionalParamsForNs"):
2724 deploy_params
.update(
2725 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2727 base_folder
= nsd
["_admin"]["storage"]
2729 logging_text
=logging_text
,
2732 nslcmop_id
=nslcmop_id
,
2738 member_vnf_index
=member_vnf_index
,
2739 vdu_index
=vdu_index
,
2741 deploy_params
=deploy_params
,
2742 descriptor_config
=descriptor_config
,
2743 base_folder
=base_folder
,
2744 task_instantiation_info
=tasks_dict_info
,
2748 # rest of staff will be done at finally
2751 ROclient
.ROClientException
,
2757 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2760 except asyncio
.CancelledError
:
2762 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2764 exc
= "Operation was cancelled"
# Unexpected errors keep their full traceback text as the error detail.
2765 except Exception as e
:
2766 exc
= traceback
.format_exc()
2767 self
.logger
.critical(
2768 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2773 error_list
.append(str(exc
))
2775 # wait for pending tasks
2777 stage
[1] = "Waiting for instantiate pending tasks."
2778 self
.logger
.debug(logging_text
+ stage
[1])
2779 error_list
+= await self
._wait
_for
_tasks
(
2787 stage
[1] = stage
[2] = ""
2788 except asyncio
.CancelledError
:
2789 error_list
.append("Cancelled")
2790 # TODO cancel all tasks
2791 except Exception as exc
:
2792 error_list
.append(str(exc
))
2794 # update operation-status
2795 db_nsr_update
["operational-status"] = "running"
2796 # let's begin with VCA 'configured' status (later we can change it)
2797 db_nsr_update
["config-status"] = "configured"
# A task that is unfinished, cancelled or ended with an exception downgrades a status:
# tasks named with the deploy-VCA prefix affect config-status, all others operational-status.
2798 for task
, task_name
in tasks_dict_info
.items():
2799 if not task
.done() or task
.cancelled() or task
.exception():
2800 if task_name
.startswith(self
.task_name_deploy_vca
):
2801 # A N2VC task is pending
2802 db_nsr_update
["config-status"] = "failed"
2804 # RO or KDU task is pending
2805 db_nsr_update
["operational-status"] = "failed"
2807 # update status at database
2809 error_detail
= ". ".join(error_list
)
2810 self
.logger
.error(logging_text
+ error_detail
)
2811 error_description_nslcmop
= "{} Detail: {}".format(
2812 stage
[0], error_detail
2814 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2815 nslcmop_id
, stage
[0]
2818 db_nsr_update
["detailed-status"] = (
2819 error_description_nsr
+ " Detail: " + error_detail
2821 db_nslcmop_update
["detailed-status"] = error_detail
2822 nslcmop_operation_state
= "FAILED"
2826 error_description_nsr
= error_description_nslcmop
= None
2828 db_nsr_update
["detailed-status"] = "Done"
2829 db_nslcmop_update
["detailed-status"] = "Done"
2830 nslcmop_operation_state
= "COMPLETED"
# Persist the final NS state and the final operation state.
2833 self
._write
_ns
_status
(
2836 current_operation
="IDLE",
2837 current_operation_id
=None,
2838 error_description
=error_description_nsr
,
2839 error_detail
=error_detail
,
2840 other_update
=db_nsr_update
,
2842 self
._write
_op
_status
(
2845 error_message
=error_description_nslcmop
,
2846 operation_state
=nslcmop_operation_state
,
2847 other_update
=db_nslcmop_update
,
# Publish the operation outcome through self.msg; an exception raised while notifying is caught here.
2850 if nslcmop_operation_state
:
2852 await self
.msg
.aiowrite(
2857 "nslcmop_id": nslcmop_id
,
2858 "operationState": nslcmop_operation_state
,
2862 except Exception as e
:
2864 logging_text
+ "kafka_write notification Exception {}".format(e
)
2867 self
.logger
.debug(logging_text
+ "Exit")
2868 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
def _get_vnfd(self, vnfd_id: str, projects_read: str, cached_vnfds: Dict[str, Any]):
    """Return the VNFD with descriptor id ``vnfd_id``, reading it at most once.

    :param vnfd_id: value matched against the ``id`` field of the vnfd.
    :param projects_read: value matched against ``_admin.projects_read``.
    :param cached_vnfds: caller-owned cache keyed by ``vnfd_id``; a missing
        entry is filled in from the database.
    :return: the cached (or freshly read) vnfd document.
    """
    if vnfd_id in cached_vnfds:
        return cached_vnfds[vnfd_id]

    # Cache miss: one database read, then remember the result for next time.
    query = {"id": vnfd_id, "_admin.projects_read": projects_read}
    descriptor = self.db.get_one("vnfds", query)
    cached_vnfds[vnfd_id] = descriptor
    return descriptor
def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
    """Return the vnfr of ``vnf_profile_id`` within NS ``nsr_id``, reading it at most once.

    :param nsr_id: value matched against ``nsr-id-ref``.
    :param vnf_profile_id: value matched against ``member-vnf-index-ref``;
        it is also the cache key.
    :param cached_vnfrs: caller-owned cache; a missing entry is filled in
        from the database.
    :return: the cached (or freshly read) vnfr document.
    """
    if vnf_profile_id in cached_vnfrs:
        return cached_vnfrs[vnf_profile_id]

    # Cache miss: look the record up by member index inside this NS.
    query = {
        "member-vnf-index-ref": vnf_profile_id,
        "nsr-id-ref": nsr_id,
    }
    record = self.db.get_one("vnfrs", query)
    cached_vnfrs[vnf_profile_id] = record
    return record
def _is_deployed_vca_in_relation(
    self, vca: DeployedVCA, relation: Relation
) -> bool:
    """Tell whether ``vca`` is one of the two endpoints of ``relation``.

    Endpoints with a truthy ``kdu-resource-profile-id`` are not compared.
    The remaining ones match when the vnf profile id, the vdu profile id and
    the execution environment reference all equal those of ``vca``.

    :param vca: deployed VCA to look for.
    :param relation: relation whose provider and requirer are inspected.
    :return: True as soon as one endpoint matches, False otherwise.
    """
    comparable_endpoints = (
        endpoint
        for endpoint in (relation.provider, relation.requirer)
        if not endpoint["kdu-resource-profile-id"]
    )
    # any() stops at the first match, like the explicit loop-and-break form.
    return any(
        vca.vnf_profile_id == endpoint.vnf_profile_id
        and vca.vdu_profile_id == endpoint.vdu_profile_id
        and vca.execution_environment_ref == endpoint.execution_environment_ref
        for endpoint in comparable_endpoints
    )
def _update_ee_relation_data_with_implicit_data(
    self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
):
    """Normalise relation endpoint data and fill in a missing execution environment.

    The data is first passed through ``safe_get_ee_relation``. For VNF- and
    VDU-level endpoints that carry no ``execution-environment-ref``, the
    reference is taken from the juju execution environment declared in the
    VNFD for the VNF (VNF level) or for the referenced VDU profile (VDU level).

    :param nsr_id: NS record id, forwarded to ``safe_get_ee_relation``.
    :param nsd: NS descriptor used to resolve the VNF profile and the project.
    :param ee_relation_data: raw endpoint description.
    :param cached_vnfds: VNFD cache shared with ``_get_vnfd``.
    :param vnf_profile_id: optional profile id forwarded to ``safe_get_ee_relation``.
    :return: the normalised endpoint data.
    :raises Exception: when no execution environment can be found.
    """
    ee_relation_data = safe_get_ee_relation(
        nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
    )
    level = EELevel.get_level(ee_relation_data)

    needs_implicit_ee = (
        level in (EELevel.VNF, EELevel.VDU)
        and not ee_relation_data["execution-environment-ref"]
    )
    if not needs_implicit_ee:
        return ee_relation_data

    profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
    vnfd_id = profile["vnfd-id"]
    project = nsd["_admin"]["projects_read"][0]
    db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)

    # The entity owning the execution environment is the VNF itself or one VDU.
    if level == EELevel.VNF:
        entity_id = vnfd_id
    else:
        entity_id = ee_relation_data["vdu-profile-id"]

    ee = get_juju_ee_ref(db_vnfd, entity_id)
    if not ee:
        raise Exception(
            f"not execution environments found for ee_relation {ee_relation_data}"
        )
    ee_relation_data["execution-environment-ref"] = ee["id"]
    return ee_relation_data
def _get_ns_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """Return the NS-level relations of ``nsd`` in which ``vca`` takes part.

    Each relation either names its ``provider`` and ``requirer`` directly or
    uses the two-item ``entities`` notation, which is converted here.

    :param nsr_id: NS record id stored in every generated endpoint.
    :param nsd: NS descriptor holding the relation list.
    :param vca: deployed VCA that must be one of the relation endpoints.
    :param cached_vnfds: VNFD cache shared with ``_get_vnfd``.
    :return: list of matching ``Relation`` objects (possibly empty).
    :raises Exception: when a relation has neither notation.
    """

    def _endpoint_from_entity(entity):
        # An entity id different from the NSD id is recorded as the
        # vnf-profile-id of the endpoint.
        entity_id = entity["id"]
        endpoint = {
            "nsr-id": nsr_id,
            "endpoint": entity["endpoint"],
        }
        if entity_id != nsd["id"]:
            endpoint["vnf-profile-id"] = entity_id
        return endpoint

    matching = []
    for rel in get_ns_configuration_relation_list(nsd):
        if "provider" in rel and "requirer" in rel:
            raw_provider = rel["provider"]
            raw_requirer = rel["requirer"]
        elif "entities" in rel:
            raw_provider = _endpoint_from_entity(rel["entities"][0])
            raw_requirer = _endpoint_from_entity(rel["entities"][1])
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )

        # Complete both endpoints first, then wrap them (provider before requirer).
        completed = [
            self._update_ee_relation_data_with_implicit_data(
                nsr_id, nsd, raw_endpoint, cached_vnfds
            )
            for raw_endpoint in (raw_provider, raw_requirer)
        ]
        provider, requirer = [EERelation(data) for data in completed]
        relation = Relation(rel["name"], provider, requirer)
        if self._is_deployed_vca_in_relation(vca, relation):
            matching.append(relation)
    return matching
def _get_vnf_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """Return the VNF-level relations in which ``vca`` takes part.

    The relations are read from the VNFD of the VNF profile that ``vca``
    belongs to. A VCA whose target element is ``"ns"`` has none.

    :param nsr_id: NS record id stored in every generated endpoint.
    :param nsd: NS descriptor used to resolve the VNF profile and the project.
    :param vca: deployed VCA that must be one of the relation endpoints.
    :param cached_vnfds: VNFD cache shared with ``_get_vnfd``.
    :return: list of matching ``Relation`` objects (possibly empty).
    :raises Exception: when a relation has neither notation.
    """
    matching = []
    if vca.target_element == "ns":
        self.logger.debug("VCA is a NS charm, not a VNF.")
        return matching

    profile = get_vnf_profile(nsd, vca.vnf_profile_id)
    vnf_profile_id = profile["id"]
    vnfd_id = profile["vnfd-id"]
    project = nsd["_admin"]["projects_read"][0]
    db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)

    def _endpoint_from_entity(entity):
        # An entity id different from the VNFD id is recorded as the
        # vdu-profile-id of the endpoint.
        entity_id = entity["id"]
        endpoint = {
            "nsr-id": nsr_id,
            "vnf-profile-id": vnf_profile_id,
            "endpoint": entity["endpoint"],
        }
        if entity_id != vnfd_id:
            endpoint["vdu-profile-id"] = entity_id
        return endpoint

    for rel in get_relation_list(db_vnfd, vnfd_id):
        if "provider" in rel and "requirer" in rel:
            raw_provider = rel["provider"]
            raw_requirer = rel["requirer"]
        elif "entities" in rel:
            raw_provider = _endpoint_from_entity(rel["entities"][0])
            raw_requirer = _endpoint_from_entity(rel["entities"][1])
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )

        # Complete both endpoints first, then wrap them (provider before requirer).
        completed = [
            self._update_ee_relation_data_with_implicit_data(
                nsr_id, nsd, raw_endpoint, cached_vnfds, vnf_profile_id=vnf_profile_id
            )
            for raw_endpoint in (raw_provider, raw_requirer)
        ]
        provider, requirer = [EERelation(data) for data in completed]
        relation = Relation(rel["name"], provider, requirer)
        if self._is_deployed_vca_in_relation(vca, relation):
            matching.append(relation)
    return matching
def _get_kdu_resource_data(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedK8sResource:
    """Return the deployed-KDU data behind a KDU-resource relation endpoint.

    The VNFD of the endpoint's VNF profile provides the KDU resource profile;
    its ``kdu-name`` selects the deployed KDU in ``_admin.deployed`` of the
    nsr, and its ``resource-name`` is added to the returned data.

    :param ee_relation: relation endpoint carrying ``vnf_profile_id`` and
        ``kdu_resource_profile_id``.
    :param db_nsr: NS record.
    :param cached_vnfds: VNFD cache shared with ``_get_vnfd``.
    :return: the deployed KDU data extended with ``resource-name``, or None
        when no deployed KDU is found (the caller only uses a truthy result).
    """
    nsd = get_nsd(db_nsr)
    vnf_profiles = get_vnf_profiles(nsd)
    vnfd_id = find_in_list(
        vnf_profiles,
        lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
    )["vnfd-id"]
    project = nsd["_admin"]["projects_read"][0]
    db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)
    kdu_resource_profile = get_kdu_resource_profile(
        db_vnfd, ee_relation.kdu_resource_profile_id
    )
    kdu_name = kdu_resource_profile["kdu-name"]
    # The "_admin" default must be a mapping: a tuple default has no .get(),
    # so a record without "_admin" would raise AttributeError here.
    deployed_section = db_nsr.get("_admin", {}).get("deployed", ())
    deployed_kdu, _ = get_deployed_kdu(
        deployed_section,
        kdu_name,
        ee_relation.vnf_profile_id,
    )
    if deployed_kdu is None:
        # Nothing deployed for this KDU (yet): report "no data" instead of
        # failing on None.update().
        return None
    deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
    return deployed_kdu
def _get_deployed_component(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedComponent:
    """Resolve a relation endpoint to the component deployed for it.

    NS, VNF and VDU endpoints resolve to a ``DeployedVCA`` selected from the
    nsr by vdu id, member vnf index and (VNF/VDU only) execution environment
    descriptor id. KDU endpoints resolve to a ``DeployedK8sResource``.

    :param ee_relation: relation endpoint to resolve.
    :param db_nsr: NS record.
    :param cached_vnfds: VNFD cache, used for KDU endpoints.
    :return: the deployed component, or None when nothing matching is deployed.
    """
    nsr_id = db_nsr["_id"]
    level = EELevel.get_level(ee_relation)

    # Work out the VCA selection filter for the three VCA-backed levels.
    vca_filter = None
    if level == EELevel.NS:
        vca_filter = {"vdu_id": None, "member-vnf-index": None}
    elif level == EELevel.VNF:
        vca_filter = {
            "vdu_id": None,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif level == EELevel.VDU:
        vca_filter = {
            "vdu_id": ee_relation.vdu_profile_id,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif level == EELevel.KDU:
        kdu_resource_data = self._get_kdu_resource_data(
            ee_relation, db_nsr, cached_vnfds
        )
        if kdu_resource_data:
            return DeployedK8sResource(kdu_resource_data)
        return None

    if vca_filter is None:
        # Level not handled above: there is nothing to resolve.
        return None

    vca = get_deployed_vca(db_nsr, vca_filter)
    if vca:
        return DeployedVCA(nsr_id, vca)
    return None
async def _add_relation(
    self,
    relation: Relation,
    vca_type: str,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
    cached_vnfrs: Dict[str, Any],
) -> bool:
    """Try to establish ``relation`` between its two deployed endpoints.

    The relation is only added when both endpoints resolve to a deployed
    component and both report ``config_sw_installed``.

    :param relation: relation to add.
    :param vca_type: key of ``self.vca_map`` selecting the connector to use.
    :param db_nsr: NS record.
    :param cached_vnfds: VNFD cache.
    :param cached_vnfrs: vnfr cache shared with ``_get_vnfr``.
    :return: True when the relation was added, False when the endpoints are
        not ready yet.
    :raises LcmException: when the connector raises ``N2VCException``.
    """
    provider_component = self._get_deployed_component(
        relation.provider, db_nsr, cached_vnfds
    )
    requirer_component = self._get_deployed_component(
        relation.requirer, db_nsr, cached_vnfds
    )
    both_ready = (
        provider_component
        and requirer_component
        and provider_component.config_sw_installed
        and requirer_component.config_sw_installed
    )
    if not both_ready:
        return False

    def _vnfr_of(endpoint):
        # Endpoints without a vnf profile id have no vnfr to look up.
        if endpoint.vnf_profile_id:
            return self._get_vnfr(
                endpoint.nsr_id,
                endpoint.vnf_profile_id,
                cached_vnfrs,
            )
        return None

    provider_db_vnfr = _vnfr_of(relation.provider)
    requirer_db_vnfr = _vnfr_of(relation.requirer)
    provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
    requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)

    provider_relation_endpoint = RelationEndpoint(
        provider_component.ee_id,
        provider_vca_id,
        relation.provider.endpoint,
    )
    requirer_relation_endpoint = RelationEndpoint(
        requirer_component.ee_id,
        requirer_vca_id,
        relation.requirer.endpoint,
    )
    try:
        await self.vca_map[vca_type].add_relation(
            provider=provider_relation_endpoint,
            requirer=requirer_relation_endpoint,
        )
    except N2VCException as exception:
        self.logger.error(exception)
        raise LcmException(exception)
    return True
async def _add_vca_relations(
    self,
    logging_text,
    nsr_id,
    vca_type: str,
    vca_index: int,
    timeout: int = 3600,
) -> bool:
    """Add every relation in which one deployed VCA takes part.

    Steps:
      1. find all NS-level and VNF-level relations for this VCA
      2. wait for the other peers (the nsr is reloaded on every round)
      3. add each relation as soon as both of its endpoints are ready

    :param logging_text: prefix for log messages.
    :param nsr_id: NS record id.
    :param vca_type: key of ``self.vca_map`` selecting the connector to use.
    :param vca_index: position of this VCA in the deployed VCA list of the nsr.
    :param timeout: maximum number of seconds spent retrying.
    :return: True when there were no relations or all of them were added;
        False on timeout or on any error (errors are logged, not raised).
    """
    try:
        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = get_nsd(db_nsr)

        # this VCA data
        deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
        my_vca = DeployedVCA(nsr_id, deployed_vca_dict)

        cached_vnfds = {}
        cached_vnfrs = {}
        relations = []
        relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
        relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))

        # if no relations, terminate
        if not relations:
            self.logger.debug(logging_text + " No relations")
            return True

        self.logger.debug(logging_text + " adding relations {}".format(relations))

        # add all relations
        start = time()
        while True:
            # check timeout
            now = time()
            if now - start >= timeout:
                self.logger.error(logging_text + " : timeout adding relations")
                return False

            # reload nsr from database (we need to update record: _admin.deployed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            # for each relation, find the VCA's related
            # (iterate over a copy because added relations are removed from the list)
            for relation in relations.copy():
                added = await self._add_relation(
                    relation,
                    vca_type,
                    db_nsr,
                    cached_vnfds,
                    cached_vnfrs,
                )
                if added:
                    relations.remove(relation)

            if not relations:
                self.logger.debug("Relations added")
                break
            await asyncio.sleep(5.0)

        return True

    except Exception as e:
        # Logger.warn() is a deprecated alias that no longer exists in
        # Python 3.13; Logger.warning() is the supported spelling.
        self.logger.warning(logging_text + " ERROR adding relations: {}".format(e))
        return False
async def _install_kdu(
    self,
    nsr_id: str,
    nsr_db_path: str,
    vnfr_data: dict,
    kdu_index: int,
    kdud: dict,
    vnfd: dict,
    k8s_instance_info: dict,
    k8params: dict = None,
    timeout: int = 600,
    vca_id: str = None,
):
    """Install one KDU in its K8s cluster and record the outcome.

    The KDU instance name is taken from ``kdu-deployment-name`` or generated
    by the cluster connector, and is written to the nsr before installing.
    After the installation the services are read back, the management IP
    addresses are stored in the vnfr, the kdur is marked ``READY`` and, when
    the KDU has initial config primitives but no juju execution environment,
    those primitives are executed in ``seq`` order.

    :param nsr_id: NS record id.
    :param nsr_db_path: path inside the nsr where this KDU's data is kept.
    :param vnfr_data: vnfr owning the KDU; its ``_id`` is used for updates.
    :param kdu_index: position of the kdur inside the vnfr.
    :param kdud: KDU descriptor.
    :param vnfd: VNF descriptor.
    :param k8s_instance_info: deployment data of the KDU; its ``namespace``
        may be replaced in place for juju based KDUs.
    :param k8params: parameters passed to the installation.
    :param timeout: seconds allowed for the installation and for each primitive.
    :param vca_id: VCA identifier forwarded to the connector.
    :return: the KDU instance name.
    :raises Exception: the original error, after the nsr detailed-status and
        the kdur status (``ERROR``) have been updated on a best-effort basis.
    """
    try:
        k8sclustertype = k8s_instance_info["k8scluster-type"]
        # Instantiate kdu
        db_dict_install = {
            "collection": "nsrs",
            "filter": {"_id": nsr_id},
            "path": nsr_db_path,
        }

        if k8s_instance_info.get("kdu-deployment-name"):
            kdu_instance = k8s_instance_info.get("kdu-deployment-name")
        else:
            kdu_instance = self.k8scluster_map[
                k8sclustertype
            ].generate_kdu_instance_name(
                db_dict=db_dict_install,
                kdu_model=k8s_instance_info["kdu-model"],
                kdu_name=k8s_instance_info["kdu-name"],
            )

        # Update the nsrs table with the kdu-instance value
        self.update_db_2(
            item="nsrs",
            _id=nsr_id,
            _desc={nsr_db_path + ".kdu-instance": kdu_instance},
        )

        # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
        # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
        # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
        # namespace, this first verification could be removed, and the next step would be done for any kind
        # of KNF.
        # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
        # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
        if k8sclustertype in ("juju", "juju-bundle"):
            # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
            # that the user passed a namespace which he wants its KDU to be deployed in)
            if (
                self.db.count(
                    table="nsrs",
                    q_filter={
                        "_id": nsr_id,
                        "_admin.projects_write": k8s_instance_info["namespace"],
                        "_admin.projects_read": k8s_instance_info["namespace"],
                    },
                )
                > 0
            ):
                self.logger.debug(
                    f"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
                )
                self.update_db_2(
                    item="nsrs",
                    _id=nsr_id,
                    _desc={f"{nsr_db_path}.namespace": kdu_instance},
                )
                k8s_instance_info["namespace"] = kdu_instance

        await self.k8scluster_map[k8sclustertype].install(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_model=k8s_instance_info["kdu-model"],
            atomic=True,
            params=k8params,
            db_dict=db_dict_install,
            timeout=timeout,
            kdu_name=k8s_instance_info["kdu-name"],
            namespace=k8s_instance_info["namespace"],
            kdu_instance=kdu_instance,
            vca_id=vca_id,
        )

        # Obtain services to obtain management service ip
        services = await self.k8scluster_map[k8sclustertype].get_services(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_instance=kdu_instance,
            namespace=k8s_instance_info["namespace"],
        )

        # Obtain management service info (if exists)
        vnfr_update_dict = {}
        kdu_config = get_configuration(vnfd, kdud["name"])
        if kdu_config:
            target_ee_list = kdu_config.get("execution-environment-list", [])
        else:
            target_ee_list = []

        if services:
            vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
            mgmt_services = [
                service
                for service in kdud.get("service", [])
                if service.get("mgmt-service")
            ]
            for mgmt_service in mgmt_services:
                for service in services:
                    if service["name"].startswith(mgmt_service["name"]):
                        # Mgmt service found, Obtain service ip
                        ip = service.get("external_ip", service.get("cluster_ip"))
                        if isinstance(ip, list) and len(ip) == 1:
                            ip = ip[0]

                        vnfr_update_dict[
                            "kdur.{}.ip-address".format(kdu_index)
                        ] = ip

                        # Check if must update also mgmt ip at the vnf
                        service_external_cp = mgmt_service.get(
                            "external-connection-point-ref"
                        )
                        if service_external_cp:
                            if (
                                deep_get(vnfd, ("mgmt-interface", "cp"))
                                == service_external_cp
                            ):
                                vnfr_update_dict["ip-address"] = ip

                            if find_in_list(
                                target_ee_list,
                                lambda ee: ee.get(
                                    "external-connection-point-ref", ""
                                )
                                == service_external_cp,
                            ):
                                vnfr_update_dict[
                                    "kdur.{}.ip-address".format(kdu_index)
                                ] = ip
                        break
                else:
                    # for/else: no deployed service matched this mgmt service.
                    # Logger.warn() is a deprecated alias that no longer exists
                    # in Python 3.13; Logger.warning() is the supported spelling.
                    self.logger.warning(
                        "Mgmt service name: {} not found".format(
                            mgmt_service["name"]
                        )
                    )

        vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
        self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)

        kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
        if (
            kdu_config
            and kdu_config.get("initial-config-primitive")
            and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
        ):
            initial_config_primitive_list = kdu_config.get(
                "initial-config-primitive"
            )
            initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))

            for initial_config_primitive in initial_config_primitive_list:
                primitive_params_ = self._map_primitive_params(
                    initial_config_primitive, {}, {}
                )

                await asyncio.wait_for(
                    self.k8scluster_map[k8sclustertype].exec_primitive(
                        cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                        kdu_instance=kdu_instance,
                        primitive_name=initial_config_primitive["name"],
                        params=primitive_params_,
                        db_dict=db_dict_install,
                        vca_id=vca_id,
                    ),
                    timeout=timeout,
                )

    except Exception as e:
        # Prepare update db with error and raise exception
        try:
            self.update_db_2(
                "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
            )
            self.update_db_2(
                "vnfrs",
                vnfr_data.get("_id"),
                {"kdur.{}.status".format(kdu_index): "ERROR"},
            )
        except Exception:
            # ignore to keep original exception
            pass
        # reraise original error
        raise

    return kdu_instance
3438 async def deploy_kdus(
3445 task_instantiation_info
,
3447 # Launch kdus if present in the descriptor
3449 k8scluster_id_2_uuic
= {
3450 "helm-chart-v3": {},
3455 async def _get_cluster_id(cluster_id
, cluster_type
):
3456 nonlocal k8scluster_id_2_uuic
3457 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3458 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3460 # check if K8scluster is creating and wait look if previous tasks in process
3461 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3462 "k8scluster", cluster_id
3465 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3466 task_name
, cluster_id
3468 self
.logger
.debug(logging_text
+ text
)
3469 await asyncio
.wait(task_dependency
, timeout
=3600)
3471 db_k8scluster
= self
.db
.get_one(
3472 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3474 if not db_k8scluster
:
3475 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3477 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3479 if cluster_type
== "helm-chart-v3":
3481 # backward compatibility for existing clusters that have not been initialized for helm v3
3482 k8s_credentials
= yaml
.safe_dump(
3483 db_k8scluster
.get("credentials")
3485 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3486 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3488 db_k8scluster_update
= {}
3489 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3490 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3491 db_k8scluster_update
[
3492 "_admin.helm-chart-v3.created"
3494 db_k8scluster_update
[
3495 "_admin.helm-chart-v3.operationalState"
3498 "k8sclusters", cluster_id
, db_k8scluster_update
3500 except Exception as e
:
3503 + "error initializing helm-v3 cluster: {}".format(str(e
))
3506 "K8s cluster '{}' has not been initialized for '{}'".format(
3507 cluster_id
, cluster_type
3512 "K8s cluster '{}' has not been initialized for '{}'".format(
3513 cluster_id
, cluster_type
3516 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3519 logging_text
+= "Deploy kdus: "
3522 db_nsr_update
= {"_admin.deployed.K8s": []}
3523 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3526 updated_cluster_list
= []
3527 updated_v3_cluster_list
= []
3529 for vnfr_data
in db_vnfrs
.values():
3530 vca_id
= self
.get_vca_id(vnfr_data
, {})
3531 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3532 # Step 0: Prepare and set parameters
3533 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3534 vnfd_id
= vnfr_data
.get("vnfd-id")
3535 vnfd_with_id
= find_in_list(
3536 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3540 for kdud
in vnfd_with_id
["kdu"]
3541 if kdud
["name"] == kdur
["kdu-name"]
3543 namespace
= kdur
.get("k8s-namespace")
3544 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3545 if kdur
.get("helm-chart"):
3546 kdumodel
= kdur
["helm-chart"]
3547 # Default version: helm3, if helm-version is v2 assign v2
3548 k8sclustertype
= "helm-chart-v3"
3549 self
.logger
.debug("kdur: {}".format(kdur
))
3551 kdur
.get("helm-version")
3552 and kdur
.get("helm-version") == "v2"
3554 k8sclustertype
= "helm-chart"
3555 elif kdur
.get("juju-bundle"):
3556 kdumodel
= kdur
["juju-bundle"]
3557 k8sclustertype
= "juju-bundle"
3560 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3561 "juju-bundle. Maybe an old NBI version is running".format(
3562 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3565 # check if kdumodel is a file and exists
3567 vnfd_with_id
= find_in_list(
3568 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3570 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3571 if storage
: # may be not present if vnfd has not artifacts
3572 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3573 if storage
["pkg-dir"]:
3574 filename
= "{}/{}/{}s/{}".format(
3581 filename
= "{}/Scripts/{}s/{}".format(
3586 if self
.fs
.file_exists(
3587 filename
, mode
="file"
3588 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3589 kdumodel
= self
.fs
.path
+ filename
3590 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3592 except Exception: # it is not a file
3595 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3596 step
= "Synchronize repos for k8s cluster '{}'".format(
3599 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3603 k8sclustertype
== "helm-chart"
3604 and cluster_uuid
not in updated_cluster_list
3606 k8sclustertype
== "helm-chart-v3"
3607 and cluster_uuid
not in updated_v3_cluster_list
3609 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3610 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3611 cluster_uuid
=cluster_uuid
3614 if del_repo_list
or added_repo_dict
:
3615 if k8sclustertype
== "helm-chart":
3617 "_admin.helm_charts_added." + item
: None
3618 for item
in del_repo_list
3621 "_admin.helm_charts_added." + item
: name
3622 for item
, name
in added_repo_dict
.items()
3624 updated_cluster_list
.append(cluster_uuid
)
3625 elif k8sclustertype
== "helm-chart-v3":
3627 "_admin.helm_charts_v3_added." + item
: None
3628 for item
in del_repo_list
3631 "_admin.helm_charts_v3_added." + item
: name
3632 for item
, name
in added_repo_dict
.items()
3634 updated_v3_cluster_list
.append(cluster_uuid
)
3636 logging_text
+ "repos synchronized on k8s cluster "
3637 "'{}' to_delete: {}, to_add: {}".format(
3638 k8s_cluster_id
, del_repo_list
, added_repo_dict
3643 {"_id": k8s_cluster_id
},
3649 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3650 vnfr_data
["member-vnf-index-ref"],
3654 k8s_instance_info
= {
3655 "kdu-instance": None,
3656 "k8scluster-uuid": cluster_uuid
,
3657 "k8scluster-type": k8sclustertype
,
3658 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3659 "kdu-name": kdur
["kdu-name"],
3660 "kdu-model": kdumodel
,
3661 "namespace": namespace
,
3662 "kdu-deployment-name": kdu_deployment_name
,
3664 db_path
= "_admin.deployed.K8s.{}".format(index
)
3665 db_nsr_update
[db_path
] = k8s_instance_info
3666 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3667 vnfd_with_id
= find_in_list(
3668 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3670 task
= asyncio
.ensure_future(
3679 k8params
=desc_params
,
3684 self
.lcm_tasks
.register(
3688 "instantiate_KDU-{}".format(index
),
3691 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3697 except (LcmException
, asyncio
.CancelledError
):
3699 except Exception as e
:
3700 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3701 if isinstance(e
, (N2VCException
, DbException
)):
3702 self
.logger
.error(logging_text
+ msg
)
3704 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3705 raise LcmException(msg
)
3708 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3727 task_instantiation_info
,
3730 # launch instantiate_N2VC in a asyncio task and register task object
3731 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3732 # if not found, create one entry and update database
3733 # fill db_nsr._admin.deployed.VCA.<index>
3736 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3740 get_charm_name
= False
3741 if "execution-environment-list" in descriptor_config
:
3742 ee_list
= descriptor_config
.get("execution-environment-list", [])
3743 elif "juju" in descriptor_config
:
3744 ee_list
= [descriptor_config
] # ns charms
3745 if "execution-environment-list" not in descriptor_config
:
3746 # charm name is only required for ns charms
3747 get_charm_name
= True
3748 else: # other types as script are not supported
3751 for ee_item
in ee_list
:
3754 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3755 ee_item
.get("juju"), ee_item
.get("helm-chart")
3758 ee_descriptor_id
= ee_item
.get("id")
3759 if ee_item
.get("juju"):
3760 vca_name
= ee_item
["juju"].get("charm")
3762 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
3765 if ee_item
["juju"].get("charm") is not None
3768 if ee_item
["juju"].get("cloud") == "k8s":
3769 vca_type
= "k8s_proxy_charm"
3770 elif ee_item
["juju"].get("proxy") is False:
3771 vca_type
= "native_charm"
3772 elif ee_item
.get("helm-chart"):
3773 vca_name
= ee_item
["helm-chart"]
3774 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3777 vca_type
= "helm-v3"
3780 logging_text
+ "skipping non juju neither charm configuration"
3785 for vca_index
, vca_deployed
in enumerate(
3786 db_nsr
["_admin"]["deployed"]["VCA"]
3788 if not vca_deployed
:
3791 vca_deployed
.get("member-vnf-index") == member_vnf_index
3792 and vca_deployed
.get("vdu_id") == vdu_id
3793 and vca_deployed
.get("kdu_name") == kdu_name
3794 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3795 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3799 # not found, create one.
3801 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3804 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3806 target
+= "/kdu/{}".format(kdu_name
)
3808 "target_element": target
,
3809 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3810 "member-vnf-index": member_vnf_index
,
3812 "kdu_name": kdu_name
,
3813 "vdu_count_index": vdu_index
,
3814 "operational-status": "init", # TODO revise
3815 "detailed-status": "", # TODO revise
3816 "step": "initial-deploy", # TODO revise
3818 "vdu_name": vdu_name
,
3820 "ee_descriptor_id": ee_descriptor_id
,
3821 "charm_name": charm_name
,
3825 # create VCA and configurationStatus in db
3827 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3828 "configurationStatus.{}".format(vca_index
): dict(),
3830 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3832 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3834 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3835 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3836 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3839 task_n2vc
= asyncio
.ensure_future(
3840 self
.instantiate_N2VC(
3841 logging_text
=logging_text
,
3842 vca_index
=vca_index
,
3848 vdu_index
=vdu_index
,
3849 deploy_params
=deploy_params
,
3850 config_descriptor
=descriptor_config
,
3851 base_folder
=base_folder
,
3852 nslcmop_id
=nslcmop_id
,
3856 ee_config_descriptor
=ee_item
,
3859 self
.lcm_tasks
.register(
3863 "instantiate_N2VC-{}".format(vca_index
),
3866 task_instantiation_info
[
3868 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3869 member_vnf_index
or "", vdu_id
or ""
3873 def _create_nslcmop(nsr_id
, operation
, params
):
3875 Creates a ns-lcm-opp content to be stored at database.
3876 :param nsr_id: internal id of the instance
3877 :param operation: instantiate, terminate, scale, action, ...
3878 :param params: user parameters for the operation
3879 :return: dictionary following SOL005 format
3881 # Raise exception if invalid arguments
3882 if not (nsr_id
and operation
and params
):
3884 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3891 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3892 "operationState": "PROCESSING",
3893 "statusEnteredTime": now
,
3894 "nsInstanceId": nsr_id
,
3895 "lcmOperationType": operation
,
3897 "isAutomaticInvocation": False,
3898 "operationParams": params
,
3899 "isCancelPending": False,
3901 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3902 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3907 def _format_additional_params(self
, params
):
3908 params
= params
or {}
3909 for key
, value
in params
.items():
3910 if str(value
).startswith("!!yaml "):
3911 params
[key
] = yaml
.safe_load(value
[7:])
3914 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3915 primitive
= seq
.get("name")
3916 primitive_params
= {}
3918 "member_vnf_index": vnf_index
,
3919 "primitive": primitive
,
3920 "primitive_params": primitive_params
,
3923 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide whether an already registered sub-operation is skipped or executed again.

    :param db_nslcmop: nslcmop record holding the list "_admin.operations"
    :param op_index: position of the sub-operation inside that list
    :return: self.SUBOPERATION_STATUS_SKIP when the sub-operation is already
        'COMPLETED', otherwise op_index
    """
    operations = deep_get(db_nslcmop, ("_admin", "operations"), [])
    suboperation = operations[op_index]
    if suboperation.get("operationState") != "COMPLETED":
        # c. retry executing sub-operation
        # The sub-operation exists, and operationState != 'COMPLETED'
        # Update operationState = 'PROCESSING' to indicate a retry.
        self._update_suboperation_status(
            db_nslcmop, op_index, "PROCESSING", "In progress"
        )
        # Return the sub-operation index
        # _ns_execute_primitive() or RO.create_action() will be called from scale()
        # with arguments extracted from the sub-operation
        return op_index
    # b. Skip sub-operation
    # _ns_execute_primitive() or RO.create_action() will NOT be executed
    return self.SUBOPERATION_STATUS_SKIP
3947 # Find a sub-operation where all keys in a matching dictionary must match
3948 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3949 def _find_suboperation(self
, db_nslcmop
, match
):
3950 if db_nslcmop
and match
:
3951 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3952 for i
, op
in enumerate(op_list
):
3953 if all(op
.get(k
) == match
[k
] for k
in match
):
3955 return self
.SUBOPERATION_STATUS_NOT_FOUND
3957 # Update status for a sub-operation given its index
3958 def _update_suboperation_status(
3959 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3961 # Update DB for HA tasks
3962 q_filter
= {"_id": db_nslcmop
["_id"]}
3964 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3965 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3968 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3971 # Add sub-operation, return the index of the added sub-operation
3972 # Optionally, set operationState, detailed-status, and operationType
3973 # Status and type are currently set for 'scale' sub-operations:
3974 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3975 # 'detailed-status' : status message
3976 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3977 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3978 def _add_suboperation(
3986 mapped_primitive_params
,
3987 operationState
=None,
3988 detailed_status
=None,
3991 RO_scaling_info
=None,
3994 return self
.SUBOPERATION_STATUS_NOT_FOUND
3995 # Get the "_admin.operations" list, if it exists
3996 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3997 op_list
= db_nslcmop_admin
.get("operations")
3998 # Create or append to the "_admin.operations" list
4000 "member_vnf_index": vnf_index
,
4002 "vdu_count_index": vdu_count_index
,
4003 "primitive": primitive
,
4004 "primitive_params": mapped_primitive_params
,
4007 new_op
["operationState"] = operationState
4009 new_op
["detailed-status"] = detailed_status
4011 new_op
["lcmOperationType"] = operationType
4013 new_op
["RO_nsr_id"] = RO_nsr_id
4015 new_op
["RO_scaling_info"] = RO_scaling_info
4017 # No existing operations, create key 'operations' with current operation as first list element
4018 db_nslcmop_admin
.update({"operations": [new_op
]})
4019 op_list
= db_nslcmop_admin
.get("operations")
4021 # Existing operations, append operation to list
4022 op_list
.append(new_op
)
4024 db_nslcmop_update
= {"_admin.operations": op_list
}
4025 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4026 op_index
= len(op_list
) - 1
4029 # Helper methods for scale() sub-operations
4031 # pre-scale/post-scale:
4032 # Check for 3 different cases:
4033 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4034 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4035 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4036 def _check_or_add_scale_suboperation(
4040 vnf_config_primitive
,
4044 RO_scaling_info
=None,
4046 # Find this sub-operation
4047 if RO_nsr_id
and RO_scaling_info
:
4048 operationType
= "SCALE-RO"
4050 "member_vnf_index": vnf_index
,
4051 "RO_nsr_id": RO_nsr_id
,
4052 "RO_scaling_info": RO_scaling_info
,
4056 "member_vnf_index": vnf_index
,
4057 "primitive": vnf_config_primitive
,
4058 "primitive_params": primitive_params
,
4059 "lcmOperationType": operationType
,
4061 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4062 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4063 # a. New sub-operation
4064 # The sub-operation does not exist, add it.
4065 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4066 # The following parameters are set to None for all kind of scaling:
4068 vdu_count_index
= None
4070 if RO_nsr_id
and RO_scaling_info
:
4071 vnf_config_primitive
= None
4072 primitive_params
= None
4075 RO_scaling_info
= None
4076 # Initial status for sub-operation
4077 operationState
= "PROCESSING"
4078 detailed_status
= "In progress"
4079 # Add sub-operation for pre/post-scaling (zero or more operations)
4080 self
._add
_suboperation
(
4086 vnf_config_primitive
,
4094 return self
.SUBOPERATION_STATUS_NEW
4096 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4097 # or op_index (operationState != 'COMPLETED')
4098 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4100 # Function to return execution_environment id
4102 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4103 # TODO vdu_index_count
4104 for vca
in vca_deployed_list
:
4105 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
4108 async def destroy_N2VC(
4116 exec_primitives
=True,
4121 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4122 :param logging_text:
4124 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4125 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4126 :param vca_index: index in the database _admin.deployed.VCA
4127 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4128 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4129 not executed properly
4130 :param scaling_in: True destroys the application, False destroys the model
4131 :return: None or exception
4136 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4137 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4141 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4143 # execute terminate_primitives
4145 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4146 config_descriptor
.get("terminate-config-primitive"),
4147 vca_deployed
.get("ee_descriptor_id"),
4149 vdu_id
= vca_deployed
.get("vdu_id")
4150 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4151 vdu_name
= vca_deployed
.get("vdu_name")
4152 vnf_index
= vca_deployed
.get("member-vnf-index")
4153 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4154 for seq
in terminate_primitives
:
4155 # For each sequence in list, get primitive and call _ns_execute_primitive()
4156 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4157 vnf_index
, seq
.get("name")
4159 self
.logger
.debug(logging_text
+ step
)
4160 # Create the primitive for each sequence, i.e. "primitive": "touch"
4161 primitive
= seq
.get("name")
4162 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4167 self
._add
_suboperation
(
4174 mapped_primitive_params
,
4176 # Sub-operations: Call _ns_execute_primitive() instead of action()
4178 result
, result_detail
= await self
._ns
_execute
_primitive
(
4179 vca_deployed
["ee_id"],
4181 mapped_primitive_params
,
4185 except LcmException
:
4186 # this happens when VCA is not deployed. In this case it is not needed to terminate
4188 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4189 if result
not in result_ok
:
4191 "terminate_primitive {} for vnf_member_index={} fails with "
4192 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4194 # set that this VCA do not need terminated
4195 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4199 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4202 # Delete Prometheus Jobs if any
4203 # This uses NSR_ID, so it will destroy any jobs under this index
4204 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4207 await self
.vca_map
[vca_type
].delete_execution_environment(
4208 vca_deployed
["ee_id"],
4209 scaling_in
=scaling_in
,
4214 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4215 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4216 namespace
= "." + db_nsr
["_id"]
4218 await self
.n2vc
.delete_namespace(
4219 namespace
=namespace
,
4220 total_timeout
=self
.timeout
.charm_delete
,
4223 except N2VCNotFound
: # already deleted. Skip
4225 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4227 async def terminate(self
, nsr_id
, nslcmop_id
):
4228 # Try to lock HA task here
4229 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4230 if not task_is_locked_by_me
:
4233 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4234 self
.logger
.debug(logging_text
+ "Enter")
4235 timeout_ns_terminate
= self
.timeout
.ns_terminate
4238 operation_params
= None
4240 error_list
= [] # annotates all failed error messages
4241 db_nslcmop_update
= {}
4242 autoremove
= False # autoremove after terminated
4243 tasks_dict_info
= {}
4246 "Stage 1/3: Preparing task.",
4247 "Waiting for previous operations to terminate.",
4250 # ^ contains [stage, step, VIM-status]
4252 # wait for any previous tasks in process
4253 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4255 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4256 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4257 operation_params
= db_nslcmop
.get("operationParams") or {}
4258 if operation_params
.get("timeout_ns_terminate"):
4259 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4260 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4261 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4263 db_nsr_update
["operational-status"] = "terminating"
4264 db_nsr_update
["config-status"] = "terminating"
4265 self
._write
_ns
_status
(
4267 ns_state
="TERMINATING",
4268 current_operation
="TERMINATING",
4269 current_operation_id
=nslcmop_id
,
4270 other_update
=db_nsr_update
,
4272 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4273 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4274 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4277 stage
[1] = "Getting vnf descriptors from db."
4278 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4280 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4282 db_vnfds_from_id
= {}
4283 db_vnfds_from_member_index
= {}
4285 for vnfr
in db_vnfrs_list
:
4286 vnfd_id
= vnfr
["vnfd-id"]
4287 if vnfd_id
not in db_vnfds_from_id
:
4288 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4289 db_vnfds_from_id
[vnfd_id
] = vnfd
4290 db_vnfds_from_member_index
[
4291 vnfr
["member-vnf-index-ref"]
4292 ] = db_vnfds_from_id
[vnfd_id
]
4294 # Destroy individual execution environments when there are terminating primitives.
4295 # Rest of EE will be deleted at once
4296 # TODO - check before calling _destroy_N2VC
4297 # if not operation_params.get("skip_terminate_primitives"):#
4298 # or not vca.get("needed_terminate"):
4299 stage
[0] = "Stage 2/3 execute terminating primitives."
4300 self
.logger
.debug(logging_text
+ stage
[0])
4301 stage
[1] = "Looking execution environment that needs terminate."
4302 self
.logger
.debug(logging_text
+ stage
[1])
4304 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4305 config_descriptor
= None
4306 vca_member_vnf_index
= vca
.get("member-vnf-index")
4307 vca_id
= self
.get_vca_id(
4308 db_vnfrs_dict
.get(vca_member_vnf_index
)
4309 if vca_member_vnf_index
4313 if not vca
or not vca
.get("ee_id"):
4315 if not vca
.get("member-vnf-index"):
4317 config_descriptor
= db_nsr
.get("ns-configuration")
4318 elif vca
.get("vdu_id"):
4319 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4320 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4321 elif vca
.get("kdu_name"):
4322 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4323 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4325 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4326 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4327 vca_type
= vca
.get("type")
4328 exec_terminate_primitives
= not operation_params
.get(
4329 "skip_terminate_primitives"
4330 ) and vca
.get("needed_terminate")
4331 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4332 # pending native charms
4334 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4336 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4337 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4338 task
= asyncio
.ensure_future(
4346 exec_terminate_primitives
,
4350 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4352 # wait for pending tasks of terminate primitives
4356 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4358 error_list
= await self
._wait
_for
_tasks
(
4361 min(self
.timeout
.charm_delete
, timeout_ns_terminate
),
4365 tasks_dict_info
.clear()
4367 return # raise LcmException("; ".join(error_list))
4369 # remove All execution environments at once
4370 stage
[0] = "Stage 3/3 delete all."
4372 if nsr_deployed
.get("VCA"):
4373 stage
[1] = "Deleting all execution environments."
4374 self
.logger
.debug(logging_text
+ stage
[1])
4375 vca_id
= self
.get_vca_id({}, db_nsr
)
4376 task_delete_ee
= asyncio
.ensure_future(
4378 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4379 timeout
=self
.timeout
.charm_delete
,
4382 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4383 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4385 # Delete Namespace and Certificates if necessary
4386 if check_helm_ee_in_ns(list(db_vnfds_from_member_index
.values())):
4387 await self
.vca_map
["helm-v3"].delete_tls_certificate(
4388 certificate_name
=db_nslcmop
["nsInstanceId"],
4390 # TODO: Delete namespace
4392 # Delete from k8scluster
4393 stage
[1] = "Deleting KDUs."
4394 self
.logger
.debug(logging_text
+ stage
[1])
4395 # print(nsr_deployed)
4396 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4397 if not kdu
or not kdu
.get("kdu-instance"):
4399 kdu_instance
= kdu
.get("kdu-instance")
4400 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4401 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4402 vca_id
= self
.get_vca_id({}, db_nsr
)
4403 task_delete_kdu_instance
= asyncio
.ensure_future(
4404 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4405 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4406 kdu_instance
=kdu_instance
,
4408 namespace
=kdu
.get("namespace"),
4414 + "Unknown k8s deployment type {}".format(
4415 kdu
.get("k8scluster-type")
4420 task_delete_kdu_instance
4421 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4424 stage
[1] = "Deleting ns from VIM."
4425 if self
.ro_config
.ng
:
4426 task_delete_ro
= asyncio
.ensure_future(
4427 self
._terminate
_ng
_ro
(
4428 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4431 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4433 # rest of staff will be done at finally
4436 ROclient
.ROClientException
,
4441 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4443 except asyncio
.CancelledError
:
4445 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4447 exc
= "Operation was cancelled"
4448 except Exception as e
:
4449 exc
= traceback
.format_exc()
4450 self
.logger
.critical(
4451 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4456 error_list
.append(str(exc
))
4458 # wait for pending tasks
4460 stage
[1] = "Waiting for terminate pending tasks."
4461 self
.logger
.debug(logging_text
+ stage
[1])
4462 error_list
+= await self
._wait
_for
_tasks
(
4465 timeout_ns_terminate
,
4469 stage
[1] = stage
[2] = ""
4470 except asyncio
.CancelledError
:
4471 error_list
.append("Cancelled")
4472 # TODO cancell all tasks
4473 except Exception as exc
:
4474 error_list
.append(str(exc
))
4475 # update status at database
4477 error_detail
= "; ".join(error_list
)
4478 # self.logger.error(logging_text + error_detail)
4479 error_description_nslcmop
= "{} Detail: {}".format(
4480 stage
[0], error_detail
4482 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4483 nslcmop_id
, stage
[0]
4486 db_nsr_update
["operational-status"] = "failed"
4487 db_nsr_update
["detailed-status"] = (
4488 error_description_nsr
+ " Detail: " + error_detail
4490 db_nslcmop_update
["detailed-status"] = error_detail
4491 nslcmop_operation_state
= "FAILED"
4495 error_description_nsr
= error_description_nslcmop
= None
4496 ns_state
= "NOT_INSTANTIATED"
4497 db_nsr_update
["operational-status"] = "terminated"
4498 db_nsr_update
["detailed-status"] = "Done"
4499 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4500 db_nslcmop_update
["detailed-status"] = "Done"
4501 nslcmop_operation_state
= "COMPLETED"
4504 self
._write
_ns
_status
(
4507 current_operation
="IDLE",
4508 current_operation_id
=None,
4509 error_description
=error_description_nsr
,
4510 error_detail
=error_detail
,
4511 other_update
=db_nsr_update
,
4513 self
._write
_op
_status
(
4516 error_message
=error_description_nslcmop
,
4517 operation_state
=nslcmop_operation_state
,
4518 other_update
=db_nslcmop_update
,
4520 if ns_state
== "NOT_INSTANTIATED":
4524 {"nsr-id-ref": nsr_id
},
4525 {"_admin.nsState": "NOT_INSTANTIATED"},
4527 except DbException
as e
:
4530 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4534 if operation_params
:
4535 autoremove
= operation_params
.get("autoremove", False)
4536 if nslcmop_operation_state
:
4538 await self
.msg
.aiowrite(
4543 "nslcmop_id": nslcmop_id
,
4544 "operationState": nslcmop_operation_state
,
4545 "autoremove": autoremove
,
4549 except Exception as e
:
4551 logging_text
+ "kafka_write notification Exception {}".format(e
)
4554 self
.logger
.debug(logging_text
+ "Exit")
4555 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4557 async def _wait_for_tasks(
4558 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4561 error_detail_list
= []
4563 pending_tasks
= list(created_tasks_info
.keys())
4564 num_tasks
= len(pending_tasks
)
4566 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4567 self
._write
_op
_status
(nslcmop_id
, stage
)
4568 while pending_tasks
:
4570 _timeout
= timeout
+ time_start
- time()
4571 done
, pending_tasks
= await asyncio
.wait(
4572 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4574 num_done
+= len(done
)
4575 if not done
: # Timeout
4576 for task
in pending_tasks
:
4577 new_error
= created_tasks_info
[task
] + ": Timeout"
4578 error_detail_list
.append(new_error
)
4579 error_list
.append(new_error
)
4582 if task
.cancelled():
4585 exc
= task
.exception()
4587 if isinstance(exc
, asyncio
.TimeoutError
):
4589 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4590 error_list
.append(created_tasks_info
[task
])
4591 error_detail_list
.append(new_error
)
4598 ROclient
.ROClientException
,
4604 self
.logger
.error(logging_text
+ new_error
)
4606 exc_traceback
= "".join(
4607 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4611 + created_tasks_info
[task
]
4617 logging_text
+ created_tasks_info
[task
] + ": Done"
4619 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4621 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4622 if nsr_id
: # update also nsr
4627 "errorDescription": "Error at: " + ", ".join(error_list
),
4628 "errorDetail": ". ".join(error_detail_list
),
4631 self
._write
_op
_status
(nslcmop_id
, stage
)
4632 return error_detail_list
4635 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4637 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4638 The default-value is used. If it is between < > it look for a value at instantiation_params
4639 :param primitive_desc: portion of VNFD/NSD that describes primitive
4640 :param params: Params provided by user
4641 :param instantiation_params: Instantiation params provided by user
4642 :return: a dictionary with the calculated params
4644 calculated_params
= {}
4645 for parameter
in primitive_desc
.get("parameter", ()):
4646 param_name
= parameter
["name"]
4647 if param_name
in params
:
4648 calculated_params
[param_name
] = params
[param_name
]
4649 elif "default-value" in parameter
or "value" in parameter
:
4650 if "value" in parameter
:
4651 calculated_params
[param_name
] = parameter
["value"]
4653 calculated_params
[param_name
] = parameter
["default-value"]
4655 isinstance(calculated_params
[param_name
], str)
4656 and calculated_params
[param_name
].startswith("<")
4657 and calculated_params
[param_name
].endswith(">")
4659 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4660 calculated_params
[param_name
] = instantiation_params
[
4661 calculated_params
[param_name
][1:-1]
4665 "Parameter {} needed to execute primitive {} not provided".format(
4666 calculated_params
[param_name
], primitive_desc
["name"]
4671 "Parameter {} needed to execute primitive {} not provided".format(
4672 param_name
, primitive_desc
["name"]
4676 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4677 calculated_params
[param_name
] = yaml
.safe_dump(
4678 calculated_params
[param_name
], default_flow_style
=True, width
=256
4680 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4682 ].startswith("!!yaml "):
4683 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4684 if parameter
.get("data-type") == "INTEGER":
4686 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4687 except ValueError: # error converting string to int
4689 "Parameter {} of primitive {} must be integer".format(
4690 param_name
, primitive_desc
["name"]
4693 elif parameter
.get("data-type") == "BOOLEAN":
4694 calculated_params
[param_name
] = not (
4695 (str(calculated_params
[param_name
])).lower() == "false"
4698 # add always ns_config_info if primitive name is config
4699 if primitive_desc
["name"] == "config":
4700 if "ns_config_info" in instantiation_params
:
4701 calculated_params
["ns_config_info"] = instantiation_params
[
4704 return calculated_params
4706 def _look_for_deployed_vca(
4713 ee_descriptor_id
=None,
4715 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4716 for vca
in deployed_vca
:
4719 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4722 vdu_count_index
is not None
4723 and vdu_count_index
!= vca
["vdu_count_index"]
4726 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4728 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4732 # vca_deployed not found
4734 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4735 " is not deployed".format(
4744 ee_id
= vca
.get("ee_id")
4746 "type", "lxc_proxy_charm"
4747 ) # default value for backward compatibility - proxy charm
4750 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4751 "execution environment".format(
4752 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4755 return ee_id
, vca_type
4757 async def _ns_execute_primitive(
4763 retries_interval
=30,
4770 if primitive
== "config":
4771 primitive_params
= {"params": primitive_params
}
4773 vca_type
= vca_type
or "lxc_proxy_charm"
4777 output
= await asyncio
.wait_for(
4778 self
.vca_map
[vca_type
].exec_primitive(
4780 primitive_name
=primitive
,
4781 params_dict
=primitive_params
,
4782 progress_timeout
=self
.timeout
.progress_primitive
,
4783 total_timeout
=self
.timeout
.primitive
,
4788 timeout
=timeout
or self
.timeout
.primitive
,
4792 except asyncio
.CancelledError
:
4794 except Exception as e
:
4798 "Error executing action {} on {} -> {}".format(
4803 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
4805 if isinstance(e
, asyncio
.TimeoutError
):
4807 message
="Timed out waiting for action to complete"
4809 return "FAILED", getattr(e
, "message", repr(e
))
4811 return "COMPLETED", output
4813 except (LcmException
, asyncio
.CancelledError
):
4815 except Exception as e
:
4816 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
async def vca_status_refresh(self, nsr_id, nslcmop_id):
    """
    Refresh the vca_status stored in the nsrs record.

    When the NS record lists deployed K8s entries, each one is refreshed
    through ``_on_update_k8s_db``; otherwise every deployed VCA entry is
    refreshed through ``_on_update_n2vc_db``. The task is finally removed
    from ``lcm_tasks``.
    :param: nsr_id: Id of the nsr
    :param: nslcmop_id: Id of the nslcmop
    :return: None
    """
    task_label = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(task_label + "Enter")

    ns_record = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_id = self.get_vca_id({}, ns_record)
    deployed = ns_record["_admin"]["deployed"]
    k8s_records = deployed["K8s"]

    if not k8s_records:
        # No KDU deployed: refresh each VCA entry by its position in the list.
        for position, _unused in enumerate(deployed["VCA"]):
            await self._on_update_n2vc_db(
                "nsrs",
                {"_id": nsr_id},
                "_admin.deployed.VCA.{}.".format(position),
                {},
            )
    else:
        for record in k8s_records:
            await self._on_update_k8s_db(
                cluster_uuid=record["k8scluster-uuid"],
                kdu_instance=record["kdu-instance"],
                filter={"_id": nsr_id},
                vca_id=vca_id,
                cluster_type=record["k8scluster-type"],
            )

    self.logger.debug(task_label + "Exit")
    self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4852 async def action(self
, nsr_id
, nslcmop_id
):
4853 # Try to lock HA task here
4854 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4855 if not task_is_locked_by_me
:
4858 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
4859 self
.logger
.debug(logging_text
+ "Enter")
4860 # get all needed from database
4864 db_nslcmop_update
= {}
4865 nslcmop_operation_state
= None
4866 error_description_nslcmop
= None
4870 # wait for any previous tasks in process
4871 step
= "Waiting for previous operations to terminate"
4872 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4874 self
._write
_ns
_status
(
4877 current_operation
="RUNNING ACTION",
4878 current_operation_id
=nslcmop_id
,
4881 step
= "Getting information from database"
4882 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4883 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4884 if db_nslcmop
["operationParams"].get("primitive_params"):
4885 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
4886 db_nslcmop
["operationParams"]["primitive_params"]
4889 nsr_deployed
= db_nsr
["_admin"].get("deployed")
4890 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
4891 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
4892 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
4893 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
4894 primitive
= db_nslcmop
["operationParams"]["primitive"]
4895 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
4896 timeout_ns_action
= db_nslcmop
["operationParams"].get(
4897 "timeout_ns_action", self
.timeout
.primitive
4901 step
= "Getting vnfr from database"
4902 db_vnfr
= self
.db
.get_one(
4903 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
4905 if db_vnfr
.get("kdur"):
4907 for kdur
in db_vnfr
["kdur"]:
4908 if kdur
.get("additionalParams"):
4909 kdur
["additionalParams"] = json
.loads(
4910 kdur
["additionalParams"]
4912 kdur_list
.append(kdur
)
4913 db_vnfr
["kdur"] = kdur_list
4914 step
= "Getting vnfd from database"
4915 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
4917 # Sync filesystem before running a primitive
4918 self
.fs
.sync(db_vnfr
["vnfd-id"])
4920 step
= "Getting nsd from database"
4921 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
4923 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
4924 # for backward compatibility
4925 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
4926 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
4927 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
4928 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4930 # look for primitive
4931 config_primitive_desc
= descriptor_configuration
= None
4933 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
4935 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
4937 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
4939 descriptor_configuration
= db_nsd
.get("ns-configuration")
4941 if descriptor_configuration
and descriptor_configuration
.get(
4944 for config_primitive
in descriptor_configuration
["config-primitive"]:
4945 if config_primitive
["name"] == primitive
:
4946 config_primitive_desc
= config_primitive
4949 if not config_primitive_desc
:
4950 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
4952 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4956 primitive_name
= primitive
4957 ee_descriptor_id
= None
4959 primitive_name
= config_primitive_desc
.get(
4960 "execution-environment-primitive", primitive
4962 ee_descriptor_id
= config_primitive_desc
.get(
4963 "execution-environment-ref"
4969 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
4971 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
4974 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
4976 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
4978 desc_params
= parse_yaml_strings(
4979 db_vnfr
.get("additionalParamsForVnf")
4982 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
4983 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
4984 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
4986 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
4987 actions
.add(primitive
["name"])
4988 for primitive
in kdu_configuration
.get("config-primitive", []):
4989 actions
.add(primitive
["name"])
4991 nsr_deployed
["K8s"],
4992 lambda kdu
: kdu_name
== kdu
["kdu-name"]
4993 and kdu
["member-vnf-index"] == vnf_index
,
4997 if primitive_name
in actions
4998 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5002 # TODO check if ns is in a proper status
5004 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5006 # kdur and desc_params already set from before
5007 if primitive_params
:
5008 desc_params
.update(primitive_params
)
5009 # TODO Check if we will need something at vnf level
5010 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5012 kdu_name
== kdu
["kdu-name"]
5013 and kdu
["member-vnf-index"] == vnf_index
5018 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5021 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5022 msg
= "unknown k8scluster-type '{}'".format(
5023 kdu
.get("k8scluster-type")
5025 raise LcmException(msg
)
5028 "collection": "nsrs",
5029 "filter": {"_id": nsr_id
},
5030 "path": "_admin.deployed.K8s.{}".format(index
),
5034 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5036 step
= "Executing kdu {}".format(primitive_name
)
5037 if primitive_name
== "upgrade":
5038 if desc_params
.get("kdu_model"):
5039 kdu_model
= desc_params
.get("kdu_model")
5040 del desc_params
["kdu_model"]
5042 kdu_model
= kdu
.get("kdu-model")
5043 parts
= kdu_model
.split(sep
=":")
5045 kdu_model
= parts
[0]
5046 if desc_params
.get("kdu_atomic_upgrade"):
5047 atomic_upgrade
= desc_params
.get(
5048 "kdu_atomic_upgrade"
5049 ).lower() in ("yes", "true", "1")
5050 del desc_params
["kdu_atomic_upgrade"]
5052 atomic_upgrade
= True
5054 detailed_status
= await asyncio
.wait_for(
5055 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5056 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5057 kdu_instance
=kdu
.get("kdu-instance"),
5058 atomic
=atomic_upgrade
,
5059 kdu_model
=kdu_model
,
5062 timeout
=timeout_ns_action
,
5064 timeout
=timeout_ns_action
+ 10,
5067 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5069 elif primitive_name
== "rollback":
5070 detailed_status
= await asyncio
.wait_for(
5071 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5072 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5073 kdu_instance
=kdu
.get("kdu-instance"),
5076 timeout
=timeout_ns_action
,
5078 elif primitive_name
== "status":
5079 detailed_status
= await asyncio
.wait_for(
5080 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5081 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5082 kdu_instance
=kdu
.get("kdu-instance"),
5085 timeout
=timeout_ns_action
,
5088 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5089 kdu
["kdu-name"], nsr_id
5091 params
= self
._map
_primitive
_params
(
5092 config_primitive_desc
, primitive_params
, desc_params
5095 detailed_status
= await asyncio
.wait_for(
5096 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5097 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5098 kdu_instance
=kdu_instance
,
5099 primitive_name
=primitive_name
,
5102 timeout
=timeout_ns_action
,
5105 timeout
=timeout_ns_action
,
5109 nslcmop_operation_state
= "COMPLETED"
5111 detailed_status
= ""
5112 nslcmop_operation_state
= "FAILED"
5114 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5115 nsr_deployed
["VCA"],
5116 member_vnf_index
=vnf_index
,
5118 vdu_count_index
=vdu_count_index
,
5119 ee_descriptor_id
=ee_descriptor_id
,
5121 for vca_index
, vca_deployed
in enumerate(
5122 db_nsr
["_admin"]["deployed"]["VCA"]
5124 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5126 "collection": "nsrs",
5127 "filter": {"_id": nsr_id
},
5128 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5132 nslcmop_operation_state
,
5134 ) = await self
._ns
_execute
_primitive
(
5136 primitive
=primitive_name
,
5137 primitive_params
=self
._map
_primitive
_params
(
5138 config_primitive_desc
, primitive_params
, desc_params
5140 timeout
=timeout_ns_action
,
5146 db_nslcmop_update
["detailed-status"] = detailed_status
5147 error_description_nslcmop
= (
5148 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5152 + "Done with result {} {}".format(
5153 nslcmop_operation_state
, detailed_status
5156 return # database update is called inside finally
5158 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5159 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5161 except asyncio
.CancelledError
:
5163 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5165 exc
= "Operation was cancelled"
5166 except asyncio
.TimeoutError
:
5167 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5169 except Exception as e
:
5170 exc
= traceback
.format_exc()
5171 self
.logger
.critical(
5172 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5181 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5182 nslcmop_operation_state
= "FAILED"
5184 self
._write
_ns
_status
(
5188 ], # TODO check if degraded. For the moment use previous status
5189 current_operation
="IDLE",
5190 current_operation_id
=None,
5191 # error_description=error_description_nsr,
5192 # error_detail=error_detail,
5193 other_update
=db_nsr_update
,
5196 self
._write
_op
_status
(
5199 error_message
=error_description_nslcmop
,
5200 operation_state
=nslcmop_operation_state
,
5201 other_update
=db_nslcmop_update
,
5204 if nslcmop_operation_state
:
5206 await self
.msg
.aiowrite(
5211 "nslcmop_id": nslcmop_id
,
5212 "operationState": nslcmop_operation_state
,
5216 except Exception as e
:
5218 logging_text
+ "kafka_write notification Exception {}".format(e
)
5220 self
.logger
.debug(logging_text
+ "Exit")
5221 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5222 return nslcmop_operation_state
, detailed_status
5224 async def terminate_vdus(
5225 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5227 """This method terminates VDUs
5230 db_vnfr: VNF instance record
5231 member_vnf_index: VNF index to identify the VDUs to be removed
5232 db_nsr: NS instance record
5233 update_db_nslcmops: Nslcmop update record
5235 vca_scaling_info
= []
5236 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5237 scaling_info
["scaling_direction"] = "IN"
5238 scaling_info
["vdu-delete"] = {}
5239 scaling_info
["kdu-delete"] = {}
5240 db_vdur
= db_vnfr
.get("vdur")
5241 vdur_list
= copy(db_vdur
)
5243 for index
, vdu
in enumerate(vdur_list
):
5244 vca_scaling_info
.append(
5246 "osm_vdu_id": vdu
["vdu-id-ref"],
5247 "member-vnf-index": member_vnf_index
,
5249 "vdu_index": count_index
,
5252 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5253 scaling_info
["vdu"].append(
5255 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5256 "vdu_id": vdu
["vdu-id-ref"],
5260 for interface
in vdu
["interfaces"]:
5261 scaling_info
["vdu"][index
]["interface"].append(
5263 "name": interface
["name"],
5264 "ip_address": interface
["ip-address"],
5265 "mac_address": interface
.get("mac-address"),
5268 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5269 stage
[2] = "Terminating VDUs"
5270 if scaling_info
.get("vdu-delete"):
5271 # scale_process = "RO"
5272 if self
.ro_config
.ng
:
5273 await self
._scale
_ng
_ro
(
async def remove_vnf(self, nsr_id, nslcmop_id, vnf_instance_id):
    """This method is to Remove VNF instances from NS.

    The VNF's VDUs are terminated, the VNF id is dropped from the NS
    record's "constituent-vnfr-ref" list and the vnfr record is deleted.
    The last remaining VNF of an NS is never removed.

    Args:
        nsr_id: NS instance id
        nslcmop_id: nslcmop id of update
        vnf_instance_id: id of the VNF instance to be removed

    Returns:
        result: (str, str) COMPLETED/FAILED, details

    Raises:
        LcmException: the NS has no more than one vnfr record.
        asyncio.CancelledError: propagated untouched.
    """
    try:
        db_nsr_update = {}
        logging_text = "Task ns={} update ".format(nsr_id)
        check_vnfr_count = len(self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}))
        self.logger.info("check_vnfr_count {}".format(check_vnfr_count))
        if check_vnfr_count <= 1:
            raise LcmException(
                "{} Cannot terminate the last VNF in this NS.".format(
                    vnf_instance_id
                )
            )

        stage = ["", "", ""]
        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
        member_vnf_index = db_vnfr["member-vnf-index-ref"]

        update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        await self.terminate_vdus(
            db_vnfr,
            member_vnf_index,
            db_nsr,
            update_db_nslcmops,
            stage,
            logging_text,
        )

        vnfr_id = db_vnfr.get("_id")
        constituent_vnfr = db_nsr.get("constituent-vnfr-ref")
        # The VDUs are already gone at this point, so a reference that is
        # already missing from the list must not abort the clean-up:
        # list.remove() would raise ValueError and leave the vnfr record
        # behind.
        if vnfr_id in constituent_vnfr:
            constituent_vnfr.remove(vnfr_id)
        db_nsr_update["constituent-vnfr-ref"] = constituent_vnfr
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self.db.del_one("vnfrs", {"_id": vnfr_id})
        # NOTE(review): second write of the same update kept from the
        # original; it looks redundant - confirm against update_db_2.
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        return "COMPLETED", "Done"
    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        self.logger.debug("Error removing VNF {}".format(e))
        return "FAILED", "Error removing VNF {}".format(e)
5343 async def _ns_redeploy_vnf(
5351 """This method updates and redeploys VNF instances
5354 nsr_id: NS instance id
5355 nslcmop_id: nslcmop id
5356 db_vnfd: VNF descriptor
5357 db_vnfr: VNF instance record
5358 db_nsr: NS instance record
5361 result: (str, str) COMPLETED/FAILED, details
5365 stage
= ["", "", ""]
5366 logging_text
= "Task ns={} update ".format(nsr_id
)
5367 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5368 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5370 # Terminate old VNF resources
5371 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5372 await self
.terminate_vdus(
5381 # old_vnfd_id = db_vnfr["vnfd-id"]
5382 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5383 new_db_vnfd
= db_vnfd
5384 # new_vnfd_ref = new_db_vnfd["id"]
5385 # new_vnfd_id = vnfd_id
5389 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5391 "name": cp
.get("id"),
5392 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5393 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5396 new_vnfr_cp
.append(vnf_cp
)
5397 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5398 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5399 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5401 "revision": latest_vnfd_revision
,
5402 "connection-point": new_vnfr_cp
,
5406 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5407 updated_db_vnfr
= self
.db
.get_one(
5409 {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
},
5412 # Instantiate new VNF resources
5413 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5414 vca_scaling_info
= []
5415 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5416 scaling_info
["scaling_direction"] = "OUT"
5417 scaling_info
["vdu-create"] = {}
5418 scaling_info
["kdu-create"] = {}
5419 vdud_instantiate_list
= db_vnfd
["vdu"]
5420 for index
, vdud
in enumerate(vdud_instantiate_list
):
5421 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
5423 additional_params
= (
5424 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5427 cloud_init_list
= []
5429 # TODO Information of its own ip is not available because db_vnfr is not updated.
5430 additional_params
["OSM"] = get_osm_params(
5431 updated_db_vnfr
, vdud
["id"], 1
5433 cloud_init_list
.append(
5434 self
._parse
_cloud
_init
(
5441 vca_scaling_info
.append(
5443 "osm_vdu_id": vdud
["id"],
5444 "member-vnf-index": member_vnf_index
,
5446 "vdu_index": count_index
,
5449 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5450 if self
.ro_config
.ng
:
5452 "New Resources to be deployed: {}".format(scaling_info
)
5454 await self
._scale
_ng
_ro
(
5462 return "COMPLETED", "Done"
5463 except (LcmException
, asyncio
.CancelledError
):
5465 except Exception as e
:
5466 self
.logger
.debug("Error updating VNF {}".format(e
))
5467 return "FAILED", "Error updating VNF {}".format(e
)
5469 async def _ns_charm_upgrade(
5475 timeout
: float = None,
5477 """This method upgrade charms in VNF instances
5480 ee_id: Execution environment id
5481 path: Local path to the charm
5483 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5484 timeout: (Float) Timeout for the ns update operation
5487 result: (str, str) COMPLETED/FAILED, details
5490 charm_type
= charm_type
or "lxc_proxy_charm"
5491 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5495 charm_type
=charm_type
,
5496 timeout
=timeout
or self
.timeout
.ns_update
,
5500 return "COMPLETED", output
5502 except (LcmException
, asyncio
.CancelledError
):
5505 except Exception as e
:
5507 self
.logger
.debug("Error upgrading charm {}".format(path
))
5509 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5511 async def update(self
, nsr_id
, nslcmop_id
):
5512 """Update NS according to different update types
5514 This method performs upgrade of VNF instances then updates the revision
5515 number in VNF record
5518 nsr_id: Network service will be updated
5519 nslcmop_id: ns lcm operation id
5522 It may raise DbException, LcmException, N2VCException, K8sException
5525 # Try to lock HA task here
5526 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5527 if not task_is_locked_by_me
:
5530 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5531 self
.logger
.debug(logging_text
+ "Enter")
5533 # Set the required variables to be filled up later
5535 db_nslcmop_update
= {}
5537 nslcmop_operation_state
= None
5539 error_description_nslcmop
= ""
5541 change_type
= "updated"
5542 detailed_status
= ""
5543 member_vnf_index
= None
5546 # wait for any previous tasks in process
5547 step
= "Waiting for previous operations to terminate"
5548 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5549 self
._write
_ns
_status
(
5552 current_operation
="UPDATING",
5553 current_operation_id
=nslcmop_id
,
5556 step
= "Getting nslcmop from database"
5557 db_nslcmop
= self
.db
.get_one(
5558 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5560 update_type
= db_nslcmop
["operationParams"]["updateType"]
5562 step
= "Getting nsr from database"
5563 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5564 old_operational_status
= db_nsr
["operational-status"]
5565 db_nsr_update
["operational-status"] = "updating"
5566 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5567 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5569 if update_type
== "CHANGE_VNFPKG":
5571 # Get the input parameters given through update request
5572 vnf_instance_id
= db_nslcmop
["operationParams"][
5573 "changeVnfPackageData"
5574 ].get("vnfInstanceId")
5576 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5579 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5581 step
= "Getting vnfr from database"
5582 db_vnfr
= self
.db
.get_one(
5583 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5586 step
= "Getting vnfds from database"
5588 latest_vnfd
= self
.db
.get_one(
5589 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5591 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5594 current_vnf_revision
= db_vnfr
.get("revision", 1)
5595 current_vnfd
= self
.db
.get_one(
5597 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5598 fail_on_empty
=False,
5600 # Charm artifact paths will be filled up later
5602 current_charm_artifact_path
,
5603 target_charm_artifact_path
,
5604 charm_artifact_paths
,
5606 ) = ([], [], [], [])
5608 step
= "Checking if revision has changed in VNFD"
5609 if current_vnf_revision
!= latest_vnfd_revision
:
5611 change_type
= "policy_updated"
5613 # There is new revision of VNFD, update operation is required
5614 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5615 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5617 step
= "Removing the VNFD packages if they exist in the local path"
5618 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5619 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5621 step
= "Get the VNFD packages from FSMongo"
5622 self
.fs
.sync(from_path
=latest_vnfd_path
)
5623 self
.fs
.sync(from_path
=current_vnfd_path
)
5626 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5628 current_base_folder
= current_vnfd
["_admin"]["storage"]
5629 latest_base_folder
= latest_vnfd
["_admin"]["storage"]
5631 for vca_index
, vca_deployed
in enumerate(
5632 get_iterable(nsr_deployed
, "VCA")
5634 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5636 # Getting charm-id and charm-type
5637 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5638 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5639 vca_type
= vca_deployed
.get("type")
5640 vdu_count_index
= vca_deployed
.get("vdu_count_index")
5643 ee_id
= vca_deployed
.get("ee_id")
5645 step
= "Getting descriptor config"
5646 if current_vnfd
.get("kdu"):
5648 search_key
= "kdu_name"
5650 search_key
= "vnfd_id"
5652 entity_id
= vca_deployed
.get(search_key
)
5654 descriptor_config
= get_configuration(
5655 current_vnfd
, entity_id
5658 if "execution-environment-list" in descriptor_config
:
5659 ee_list
= descriptor_config
.get(
5660 "execution-environment-list", []
5665 # There could be several charm used in the same VNF
5666 for ee_item
in ee_list
:
5667 if ee_item
.get("juju"):
5669 step
= "Getting charm name"
5670 charm_name
= ee_item
["juju"].get("charm")
5672 step
= "Setting Charm artifact paths"
5673 current_charm_artifact_path
.append(
5674 get_charm_artifact_path(
5675 current_base_folder
,
5678 current_vnf_revision
,
5681 target_charm_artifact_path
.append(
5682 get_charm_artifact_path(
5686 latest_vnfd_revision
,
5689 elif ee_item
.get("helm-chart"):
5690 # add chart to list and all parameters
5691 step
= "Getting helm chart name"
5692 chart_name
= ee_item
.get("helm-chart")
5694 ee_item
.get("helm-version")
5695 and ee_item
.get("helm-version") == "v2"
5699 vca_type
= "helm-v3"
5700 step
= "Setting Helm chart artifact paths"
5702 helm_artifacts
.append(
5704 "current_artifact_path": get_charm_artifact_path(
5705 current_base_folder
,
5708 current_vnf_revision
,
5710 "target_artifact_path": get_charm_artifact_path(
5714 latest_vnfd_revision
,
5717 "vca_index": vca_index
,
5718 "vdu_index": vdu_count_index
,
5722 charm_artifact_paths
= zip(
5723 current_charm_artifact_path
, target_charm_artifact_path
5726 step
= "Checking if software version has changed in VNFD"
5727 if find_software_version(current_vnfd
) != find_software_version(
5731 step
= "Checking if existing VNF has charm"
5732 for current_charm_path
, target_charm_path
in list(
5733 charm_artifact_paths
5735 if current_charm_path
:
5737 "Software version change is not supported as VNF instance {} has charm.".format(
5742 # There is no change in the charm package, then redeploy the VNF
5743 # based on new descriptor
5744 step
= "Redeploying VNF"
5745 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5746 (result
, detailed_status
) = await self
._ns
_redeploy
_vnf
(
5747 nsr_id
, nslcmop_id
, latest_vnfd
, db_vnfr
, db_nsr
5749 if result
== "FAILED":
5750 nslcmop_operation_state
= result
5751 error_description_nslcmop
= detailed_status
5752 db_nslcmop_update
["detailed-status"] = detailed_status
5755 + " step {} Done with result {} {}".format(
5756 step
, nslcmop_operation_state
, detailed_status
5761 step
= "Checking if any charm package has changed or not"
5762 for current_charm_path
, target_charm_path
in list(
5763 charm_artifact_paths
5767 and target_charm_path
5768 and self
.check_charm_hash_changed(
5769 current_charm_path
, target_charm_path
5773 step
= "Checking whether VNF uses juju bundle"
5774 if check_juju_bundle_existence(current_vnfd
):
5777 "Charm upgrade is not supported for the instance which"
5778 " uses juju-bundle: {}".format(
5779 check_juju_bundle_existence(current_vnfd
)
5783 step
= "Upgrading Charm"
5787 ) = await self
._ns
_charm
_upgrade
(
5790 charm_type
=vca_type
,
5791 path
=self
.fs
.path
+ target_charm_path
,
5792 timeout
=timeout_seconds
,
5795 if result
== "FAILED":
5796 nslcmop_operation_state
= result
5797 error_description_nslcmop
= detailed_status
5799 db_nslcmop_update
["detailed-status"] = detailed_status
5802 + " step {} Done with result {} {}".format(
5803 step
, nslcmop_operation_state
, detailed_status
5807 step
= "Updating policies"
5808 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5809 result
= "COMPLETED"
5810 detailed_status
= "Done"
5811 db_nslcmop_update
["detailed-status"] = "Done"
5814 for item
in helm_artifacts
:
5816 item
["current_artifact_path"]
5817 and item
["target_artifact_path"]
5818 and self
.check_charm_hash_changed(
5819 item
["current_artifact_path"],
5820 item
["target_artifact_path"],
5824 db_update_entry
= "_admin.deployed.VCA.{}.".format(
5827 vnfr_id
= db_vnfr
["_id"]
5828 osm_config
= {"osm": {"ns_id": nsr_id
, "vnf_id": vnfr_id
}}
5830 "collection": "nsrs",
5831 "filter": {"_id": nsr_id
},
5832 "path": db_update_entry
,
5834 vca_type
, namespace
, helm_id
= get_ee_id_parts(item
["ee_id"])
5835 await self
.vca_map
[vca_type
].upgrade_execution_environment(
5836 namespace
=namespace
,
5840 artifact_path
=item
["target_artifact_path"],
5843 vnf_id
= db_vnfr
.get("vnfd-ref")
5844 config_descriptor
= get_configuration(latest_vnfd
, vnf_id
)
5845 self
.logger
.debug("get ssh key block")
5849 ("config-access", "ssh-access", "required"),
5851 # Needed to inject a ssh key
5854 ("config-access", "ssh-access", "default-user"),
5857 "Install configuration Software, getting public ssh key"
5859 pub_key
= await self
.vca_map
[
5861 ].get_ee_ssh_public__key(
5862 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
5866 "Insert public key into VM user={} ssh_key={}".format(
5870 self
.logger
.debug(logging_text
+ step
)
5872 # wait for RO (ip-address) Insert pub_key into VM
5873 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
5883 initial_config_primitive_list
= config_descriptor
.get(
5884 "initial-config-primitive"
5886 config_primitive
= next(
5889 for p
in initial_config_primitive_list
5890 if p
["name"] == "config"
5894 if not config_primitive
:
5897 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
5899 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
5900 if db_vnfr
.get("additionalParamsForVnf"):
5901 deploy_params
.update(
5903 db_vnfr
["additionalParamsForVnf"].copy()
5906 primitive_params_
= self
._map
_primitive
_params
(
5907 config_primitive
, {}, deploy_params
5910 step
= "execute primitive '{}' params '{}'".format(
5911 config_primitive
["name"], primitive_params_
5913 self
.logger
.debug(logging_text
+ step
)
5914 await self
.vca_map
[vca_type
].exec_primitive(
5916 primitive_name
=config_primitive
["name"],
5917 params_dict
=primitive_params_
,
5923 step
= "Updating policies"
5924 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5925 detailed_status
= "Done"
5926 db_nslcmop_update
["detailed-status"] = "Done"
5928 # If nslcmop_operation_state is None, so any operation is not failed.
5929 if not nslcmop_operation_state
:
5930 nslcmop_operation_state
= "COMPLETED"
5932 # If update CHANGE_VNFPKG nslcmop_operation is successful
5933 # vnf revision need to be updated
5934 vnfr_update
["revision"] = latest_vnfd_revision
5935 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
5939 + " task Done with result {} {}".format(
5940 nslcmop_operation_state
, detailed_status
5943 elif update_type
== "REMOVE_VNF":
5944 # This part is included in https://osm.etsi.org/gerrit/11876
5945 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
5946 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5947 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5948 step
= "Removing VNF"
5949 (result
, detailed_status
) = await self
.remove_vnf(
5950 nsr_id
, nslcmop_id
, vnf_instance_id
5952 if result
== "FAILED":
5953 nslcmop_operation_state
= result
5954 error_description_nslcmop
= detailed_status
5955 db_nslcmop_update
["detailed-status"] = detailed_status
5956 change_type
= "vnf_terminated"
5957 if not nslcmop_operation_state
:
5958 nslcmop_operation_state
= "COMPLETED"
5961 + " task Done with result {} {}".format(
5962 nslcmop_operation_state
, detailed_status
5966 elif update_type
== "OPERATE_VNF":
5967 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"][
5970 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"][
5973 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"][
5976 (result
, detailed_status
) = await self
.rebuild_start_stop(
5977 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
5979 if result
== "FAILED":
5980 nslcmop_operation_state
= result
5981 error_description_nslcmop
= detailed_status
5982 db_nslcmop_update
["detailed-status"] = detailed_status
5983 if not nslcmop_operation_state
:
5984 nslcmop_operation_state
= "COMPLETED"
5987 + " task Done with result {} {}".format(
5988 nslcmop_operation_state
, detailed_status
5992 # If nslcmop_operation_state is None, so any operation is not failed.
5993 # All operations are executed in overall.
5994 if not nslcmop_operation_state
:
5995 nslcmop_operation_state
= "COMPLETED"
5996 db_nsr_update
["operational-status"] = old_operational_status
5998 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5999 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6001 except asyncio
.CancelledError
:
6003 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6005 exc
= "Operation was cancelled"
6006 except asyncio
.TimeoutError
:
6007 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
6009 except Exception as e
:
6010 exc
= traceback
.format_exc()
6011 self
.logger
.critical(
6012 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6021 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6022 nslcmop_operation_state
= "FAILED"
6023 db_nsr_update
["operational-status"] = old_operational_status
6025 self
._write
_ns
_status
(
6027 ns_state
=db_nsr
["nsState"],
6028 current_operation
="IDLE",
6029 current_operation_id
=None,
6030 other_update
=db_nsr_update
,
6033 self
._write
_op
_status
(
6036 error_message
=error_description_nslcmop
,
6037 operation_state
=nslcmop_operation_state
,
6038 other_update
=db_nslcmop_update
,
6041 if nslcmop_operation_state
:
6045 "nslcmop_id": nslcmop_id
,
6046 "operationState": nslcmop_operation_state
,
6049 change_type
in ("vnf_terminated", "policy_updated")
6050 and member_vnf_index
6052 msg
.update({"vnf_member_index": member_vnf_index
})
6053 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6054 except Exception as e
:
6056 logging_text
+ "kafka_write notification Exception {}".format(e
)
6058 self
.logger
.debug(logging_text
+ "Exit")
6059 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6060 return nslcmop_operation_state
, detailed_status
async def scale(self, nsr_id, nslcmop_id):
    """Run a VNF scale-out / scale-in operation requested through an nslcmop.

    The method reads the operation ("nslcmops"), the NS record ("nsrs"), the
    VNF record ("vnfrs") and the VNF descriptor ("vnfds") from the database,
    works out how many VDU / KDU instances must be created or deleted for the
    requested scaling group, and then runs, in this order: pre-scale config
    primitives, removal of execution environments (scale-in), the RO scaling,
    the KDU scaling, creation of execution environments (scale-out) and
    post-scale config primitives.

    The outcome is not returned: it is written to the operation and NS records
    and published with ``self.msg.aiowrite("ns", "scaled", ...)``.

    :param nsr_id: "_id" of the NS record being scaled
    :param nslcmop_id: "_id" of the operation record that describes the scaling
    :return: None
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
    stage = ["", "", ""]
    tasks_dict_info = {}
    # ^ stage, step, VIM progress
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop_update = {}
    db_nsr_update = {}
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    scale_process = None
    old_operational_status = ""
    old_config_status = ""
    nsi_id = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="SCALING",
            current_operation_id=nslcmop_id,
        )

        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})

        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # remembered so they can be restored when the operation ends
        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]

        step = "Parsing scaling parameters"
        db_nsr_update["operational-status"] = "scaling"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        nsr_deployed = db_nsr["_admin"].get("deployed")

        vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
            "scaleByStepData"
        ]["member-vnf-index"]
        scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
            "scaleByStepData"
        ]["scaling-group-descriptor"]
        scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
        # for backward compatibility: "VCA" used to be stored as a dict, convert it to a list
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Getting vnfr from database"
        db_vnfr = self.db.get_one(
            "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
        )

        vca_id = self.get_vca_id(db_vnfr, db_nsr)

        step = "Getting vnfd from database"
        db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

        base_folder = db_vnfd["_admin"]["storage"]

        step = "Getting scaling-group-descriptor"
        scaling_descriptor = find_in_list(
            get_scaling_aspect(db_vnfd),
            lambda scale_desc: scale_desc["name"] == scaling_group,
        )
        if not scaling_descriptor:
            raise LcmException(
                "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
                "at vnfd:scaling-group-descriptor".format(scaling_group)
            )

        step = "Sending scale order to VIM"
        # TODO check if ns is in a proper status
        # nb_scale_op: net number of scale operations already recorded for this scaling group
        nb_scale_op = 0
        if not db_nsr["_admin"].get("scaling-group"):
            self.update_db_2(
                "nsrs",
                nsr_id,
                {
                    "_admin.scaling-group": [
                        {"name": scaling_group, "nb-scale-op": 0}
                    ]
                },
            )
            admin_scale_index = 0
        else:
            for admin_scale_index, admin_scale_info in enumerate(
                db_nsr["_admin"]["scaling-group"]
            ):
                if admin_scale_info["name"] == scaling_group:
                    nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
                    break
            else:  # not found, set index one plus last element and add new entry with the name
                admin_scale_index += 1
                db_nsr_update[
                    "_admin.scaling-group.{}.name".format(admin_scale_index)
                ] = scaling_group

        # vca_scaling_info: one entry per VDU/KDU instance whose execution environment
        # must be created or deleted; scaling_info: what is sent to RO / K8s connectors
        vca_scaling_info = []
        scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
        if scaling_type == "SCALE_OUT":
            if "aspect-delta-details" not in scaling_descriptor:
                raise LcmException(
                    "Aspect delta details not fount in scaling descriptor {}".format(
                        scaling_descriptor["name"]
                    )
                )
            # count if max-instance-count is reached
            deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]

            scaling_info["scaling_direction"] = "OUT"
            scaling_info["vdu-create"] = {}
            scaling_info["kdu-create"] = {}
            for delta in deltas:
                for vdu_delta in delta.get("vdu-delta", {}):
                    vdud = get_vdu(db_vnfd, vdu_delta["id"])
                    # vdu_index also provides the number of instance of the targeted vdu
                    vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
                    cloud_init_text = self._get_vdu_cloud_init_content(
                        vdud, db_vnfd
                    )
                    # additional_params is only bound (and only used below) when
                    # there is cloud-init content
                    if cloud_init_text:
                        additional_params = (
                            self._get_vdu_additional_params(db_vnfr, vdud["id"])
                            or {}
                        )
                    cloud_init_list = []

                    vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
                    max_instance_count = 10
                    if vdu_profile and "max-number-of-instances" in vdu_profile:
                        max_instance_count = vdu_profile.get(
                            "max-number-of-instances", 10
                        )

                    default_instance_num = get_number_of_instances(
                        db_vnfd, vdud["id"]
                    )
                    instances_number = vdu_delta.get("number-of-instances", 1)
                    nb_scale_op += instances_number

                    new_instance_count = nb_scale_op + default_instance_num
                    # Control if new count is over max and vdu count is less than max.
                    # Then assign new instance count
                    if new_instance_count > max_instance_count > vdu_count:
                        instances_number = new_instance_count - max_instance_count
                    else:
                        # NOTE(review): self-assignment, has no effect
                        instances_number = instances_number

                    if new_instance_count > max_instance_count:
                        raise LcmException(
                            "reached the limit of {} (max-instance-count) "
                            "scaling-out operations for the "
                            "scaling-group-descriptor '{}'".format(
                                nb_scale_op, scaling_group
                            )
                        )
                    for x in range(vdu_delta.get("number-of-instances", 1)):
                        if cloud_init_text:
                            # TODO Information of its own ip is not available because db_vnfr is not updated.
                            additional_params["OSM"] = get_osm_params(
                                db_vnfr, vdu_delta["id"], vdu_index + x
                            )
                            cloud_init_list.append(
                                self._parse_cloud_init(
                                    cloud_init_text,
                                    additional_params,
                                    db_vnfd["id"],
                                    vdud["id"],
                                )
                            )
                        vca_scaling_info.append(
                            {
                                "osm_vdu_id": vdu_delta["id"],
                                "member-vnf-index": vnf_index,
                                "type": "create",
                                "vdu_index": vdu_index + x,
                            }
                        )
                    scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
                for kdu_delta in delta.get("kdu-resource-delta", {}):
                    kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
                    kdu_name = kdu_profile["kdu-name"]
                    resource_name = kdu_profile.get("resource-name", "")

                    # Might have different kdus in the same delta
                    # Should have list for each kdu
                    if not scaling_info["kdu-create"].get(kdu_name, None):
                        scaling_info["kdu-create"][kdu_name] = []

                    # pick the K8s connector key from the kind of KDU recorded in the vnfr
                    kdur = get_kdur(db_vnfr, kdu_name)
                    if kdur.get("helm-chart"):
                        k8s_cluster_type = "helm-chart-v3"
                        self.logger.debug("kdur: {}".format(kdur))
                        if (
                            kdur.get("helm-version")
                            and kdur.get("helm-version") == "v2"
                        ):
                            k8s_cluster_type = "helm-chart"
                    elif kdur.get("juju-bundle"):
                        k8s_cluster_type = "juju-bundle"
                    else:
                        raise LcmException(
                            "kdu type for kdu='{}.{}' is neither helm-chart nor "
                            "juju-bundle. Maybe an old NBI version is running".format(
                                db_vnfr["member-vnf-index-ref"], kdu_name
                            )
                        )

                    max_instance_count = 10
                    if kdu_profile and "max-number-of-instances" in kdu_profile:
                        max_instance_count = kdu_profile.get(
                            "max-number-of-instances", 10
                        )

                    nb_scale_op += kdu_delta.get("number-of-instances", 1)
                    deployed_kdu, _ = get_deployed_kdu(
                        nsr_deployed, kdu_name, vnf_index
                    )
                    if deployed_kdu is None:
                        raise LcmException(
                            "KDU '{}' for vnf '{}' not deployed".format(
                                kdu_name, vnf_index
                            )
                        )
                    kdu_instance = deployed_kdu.get("kdu-instance")
                    # current replica count as reported by the K8s connector
                    instance_num = await self.k8scluster_map[
                        k8s_cluster_type
                    ].get_scale_count(
                        resource_name,
                        kdu_instance,
                        vca_id=vca_id,
                        cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
                        kdu_model=deployed_kdu.get("kdu-model"),
                    )
                    kdu_replica_count = instance_num + kdu_delta.get(
                        "number-of-instances", 1
                    )

                    # Control if new count is over max and instance_num is less than max.
                    # Then assign max instance number to kdu replica count
                    if kdu_replica_count > max_instance_count > instance_num:
                        kdu_replica_count = max_instance_count
                    if kdu_replica_count > max_instance_count:
                        raise LcmException(
                            "reached the limit of {} (max-instance-count) "
                            "scaling-out operations for the "
                            "scaling-group-descriptor '{}'".format(
                                instance_num, scaling_group
                            )
                        )

                    for x in range(kdu_delta.get("number-of-instances", 1)):
                        vca_scaling_info.append(
                            {
                                "osm_kdu_id": kdu_name,
                                "member-vnf-index": vnf_index,
                                "type": "create",
                                "kdu_index": instance_num + x - 1,
                            }
                        )
                    scaling_info["kdu-create"][kdu_name].append(
                        {
                            "member-vnf-index": vnf_index,
                            "type": "create",
                            "k8s-cluster-type": k8s_cluster_type,
                            "resource-name": resource_name,
                            "scale": kdu_replica_count,
                        }
                    )
        elif scaling_type == "SCALE_IN":
            deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]

            scaling_info["scaling_direction"] = "IN"
            scaling_info["vdu-delete"] = {}
            scaling_info["kdu-delete"] = {}

            for delta in deltas:
                for vdu_delta in delta.get("vdu-delta", {}):
                    vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
                    min_instance_count = 0
                    vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
                    if vdu_profile and "min-number-of-instances" in vdu_profile:
                        min_instance_count = vdu_profile["min-number-of-instances"]

                    default_instance_num = get_number_of_instances(
                        db_vnfd, vdu_delta["id"]
                    )
                    instance_num = vdu_delta.get("number-of-instances", 1)
                    nb_scale_op -= instance_num

                    new_instance_count = nb_scale_op + default_instance_num

                    if new_instance_count < min_instance_count < vdu_count:
                        instances_number = min_instance_count - new_instance_count
                    else:
                        instances_number = instance_num

                    if new_instance_count < min_instance_count:
                        raise LcmException(
                            "reached the limit of {} (min-instance-count) scaling-in operations for the "
                            "scaling-group-descriptor '{}'".format(
                                nb_scale_op, scaling_group
                            )
                        )
                    # instances are removed starting from the highest index
                    for x in range(vdu_delta.get("number-of-instances", 1)):
                        vca_scaling_info.append(
                            {
                                "osm_vdu_id": vdu_delta["id"],
                                "member-vnf-index": vnf_index,
                                "type": "delete",
                                "vdu_index": vdu_index - 1 - x,
                            }
                        )
                    scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
                for kdu_delta in delta.get("kdu-resource-delta", {}):
                    kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
                    kdu_name = kdu_profile["kdu-name"]
                    resource_name = kdu_profile.get("resource-name", "")

                    if not scaling_info["kdu-delete"].get(kdu_name, None):
                        scaling_info["kdu-delete"][kdu_name] = []

                    kdur = get_kdur(db_vnfr, kdu_name)
                    if kdur.get("helm-chart"):
                        k8s_cluster_type = "helm-chart-v3"
                        self.logger.debug("kdur: {}".format(kdur))
                        if (
                            kdur.get("helm-version")
                            and kdur.get("helm-version") == "v2"
                        ):
                            k8s_cluster_type = "helm-chart"
                    elif kdur.get("juju-bundle"):
                        k8s_cluster_type = "juju-bundle"
                    else:
                        raise LcmException(
                            "kdu type for kdu='{}.{}' is neither helm-chart nor "
                            "juju-bundle. Maybe an old NBI version is running".format(
                                db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
                            )
                        )

                    min_instance_count = 0
                    if kdu_profile and "min-number-of-instances" in kdu_profile:
                        min_instance_count = kdu_profile["min-number-of-instances"]

                    nb_scale_op -= kdu_delta.get("number-of-instances", 1)
                    deployed_kdu, _ = get_deployed_kdu(
                        nsr_deployed, kdu_name, vnf_index
                    )
                    if deployed_kdu is None:
                        raise LcmException(
                            "KDU '{}' for vnf '{}' not deployed".format(
                                kdu_name, vnf_index
                            )
                        )
                    kdu_instance = deployed_kdu.get("kdu-instance")
                    instance_num = await self.k8scluster_map[
                        k8s_cluster_type
                    ].get_scale_count(
                        resource_name,
                        kdu_instance,
                        vca_id=vca_id,
                        cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
                        kdu_model=deployed_kdu.get("kdu-model"),
                    )
                    kdu_replica_count = instance_num - kdu_delta.get(
                        "number-of-instances", 1
                    )

                    # clamp to the minimum when the request would go below it
                    if kdu_replica_count < min_instance_count < instance_num:
                        kdu_replica_count = min_instance_count
                    if kdu_replica_count < min_instance_count:
                        raise LcmException(
                            "reached the limit of {} (min-instance-count) scaling-in operations for the "
                            "scaling-group-descriptor '{}'".format(
                                instance_num, scaling_group
                            )
                        )

                    for x in range(kdu_delta.get("number-of-instances", 1)):
                        vca_scaling_info.append(
                            {
                                "osm_kdu_id": kdu_name,
                                "member-vnf-index": vnf_index,
                                "type": "delete",
                                "kdu_index": instance_num - x - 1,
                            }
                        )
                    scaling_info["kdu-delete"][kdu_name].append(
                        {
                            "member-vnf-index": vnf_index,
                            "type": "delete",
                            "k8s-cluster-type": k8s_cluster_type,
                            "resource-name": resource_name,
                            "scale": kdu_replica_count,
                        }
                    )

        # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
        # (works on a copy so the counters in scaling_info["vdu-delete"] are kept)
        vdu_delete = copy(scaling_info.get("vdu-delete"))
        if scaling_info["scaling_direction"] == "IN":
            for vdur in reversed(db_vnfr["vdur"]):
                if vdu_delete.get(vdur["vdu-id-ref"]):
                    vdu_delete[vdur["vdu-id-ref"]] -= 1
                    scaling_info["vdu"].append(
                        {
                            "name": vdur.get("name") or vdur.get("vdu-name"),
                            "vdu_id": vdur["vdu-id-ref"],
                            "interface": [],
                        }
                    )
                    for interface in vdur["interfaces"]:
                        scaling_info["vdu"][-1]["interface"].append(
                            {
                                "name": interface["name"],
                                "ip_address": interface["ip-address"],
                                "mac_address": interface.get("mac-address"),
                            }
                        )
            # vdu_delete = vdu_scaling_info.pop("vdu-delete")

        # PRE-SCALE BEGIN
        step = "Executing pre-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor[
                "scaling-config-action"
            ]:
                if (
                    scaling_config_action.get("trigger") == "pre-scale-in"
                    and scaling_type == "SCALE_IN"
                ) or (
                    scaling_config_action.get("trigger") == "pre-scale-out"
                    and scaling_type == "SCALE_OUT"
                ):
                    vnf_config_primitive = scaling_config_action[
                        "vnf-config-primitive-name-ref"
                    ]
                    step = db_nslcmop_update[
                        "detailed-status"
                    ] = "executing pre-scale scaling-config-action '{}'".format(
                        vnf_config_primitive
                    )

                    # look for primitive
                    for config_primitive in (
                        get_configuration(db_vnfd, db_vnfd["id"]) or {}
                    ).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        raise LcmException(
                            "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
                            "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
                            "primitive".format(scaling_group, vnf_config_primitive)
                        )

                    vnfr_params = {"VDU_SCALE_INFO": scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring pre-scaling"
                    primitive_params = self._map_primitive_params(
                        config_primitive, {}, vnfr_params
                    )

                    # Pre-scale retry check: Check if this sub-operation has been executed before
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop,
                        vnf_index,
                        vnf_config_primitive,
                        primitive_params,
                        "PRE-SCALE",
                    )
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = "COMPLETED"
                        result_detail = "Done"
                        self.logger.debug(
                            logging_text
                            + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
                                vnf_config_primitive, result, result_detail
                            )
                        )
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = (
                                len(db_nslcmop.get("_admin", {}).get("operations"))
                                - 1
                            )
                            self.logger.debug(
                                logging_text
                                + "vnf_config_primitive={} New sub-operation".format(
                                    vnf_config_primitive
                                )
                            )
                        else:
                            # retry: Get registered params for this existing sub-operation
                            op = db_nslcmop.get("_admin", {}).get("operations", [])[
                                op_index
                            ]
                            vnf_index = op.get("member_vnf_index")
                            vnf_config_primitive = op.get("primitive")
                            primitive_params = op.get("primitive_params")
                            self.logger.debug(
                                logging_text
                                + "vnf_config_primitive={} Sub-operation retry".format(
                                    vnf_config_primitive
                                )
                            )
                        # Execute the primitive, either with new (first-time) or registered (retry) args
                        ee_descriptor_id = config_primitive.get(
                            "execution-environment-ref"
                        )
                        primitive_name = config_primitive.get(
                            "execution-environment-primitive", vnf_config_primitive
                        )
                        ee_id, vca_type = self._look_for_deployed_vca(
                            nsr_deployed["VCA"],
                            member_vnf_index=vnf_index,
                            vdu_id=None,
                            vdu_count_index=None,
                            ee_descriptor_id=ee_descriptor_id,
                        )
                        result, result_detail = await self._ns_execute_primitive(
                            ee_id,
                            primitive_name,
                            primitive_params,
                            vca_type=vca_type,
                            vca_id=vca_id,
                        )
                        self.logger.debug(
                            logging_text
                            + "vnf_config_primitive={} Done with result {} {}".format(
                                vnf_config_primitive, result, result_detail
                            )
                        )
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail
                        )

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # PRE-SCALE END

        db_nsr_update[
            "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
        ] = nb_scale_op
        db_nsr_update[
            "_admin.scaling-group.{}.time".format(admin_scale_index)
        ] = time()

        # SCALE-IN VCA - BEGIN
        if vca_scaling_info:
            step = db_nslcmop_update[
                "detailed-status"
            ] = "Deleting the execution environments"
            scale_process = "VCA"
            for vca_info in vca_scaling_info:
                if vca_info["type"] == "delete" and not vca_info.get("osm_kdu_id"):
                    member_vnf_index = str(vca_info["member-vnf-index"])
                    self.logger.debug(
                        logging_text + "vdu info: {}".format(vca_info)
                    )
                    if vca_info.get("osm_vdu_id"):
                        vdu_id = vca_info["osm_vdu_id"]
                        vdu_index = int(vca_info["vdu_index"])
                        stage[
                            1
                        ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                            member_vnf_index, vdu_id, vdu_index
                        )
                    stage[2] = step = "Scaling in VCA"
                    self._write_op_status(op_id=nslcmop_id, stage=stage)
                    vca_update = db_nsr["_admin"]["deployed"]["VCA"]
                    config_update = db_nsr["configurationStatus"]
                    for vca_index, vca in enumerate(vca_update):
                        # NOTE(review): with a falsy vca the right operand calls
                        # .get on it; `vca and vca.get("ee_id")` may have been
                        # intended - confirm
                        if (
                            (vca or vca.get("ee_id"))
                            and vca["member-vnf-index"] == member_vnf_index
                            and vca["vdu_count_index"] == vdu_index
                        ):
                            if vca.get("vdu_id"):
                                config_descriptor = get_configuration(
                                    db_vnfd, vca.get("vdu_id")
                                )
                            elif vca.get("kdu_name"):
                                config_descriptor = get_configuration(
                                    db_vnfd, vca.get("kdu_name")
                                )
                            else:
                                config_descriptor = get_configuration(
                                    db_vnfd, db_vnfd["id"]
                                )
                            operation_params = (
                                db_nslcmop.get("operationParams") or {}
                            )
                            exec_terminate_primitives = not operation_params.get(
                                "skip_terminate_primitives"
                            ) and vca.get("needed_terminate")
                            task = asyncio.ensure_future(
                                asyncio.wait_for(
                                    self.destroy_N2VC(
                                        logging_text,
                                        db_nslcmop,
                                        vca,
                                        config_descriptor,
                                        vca_index,
                                        destroy_ee=True,
                                        exec_primitives=exec_terminate_primitives,
                                        scaling_in=True,
                                        vca_id=vca_id,
                                    ),
                                    timeout=self.timeout.charm_delete,
                                )
                            )
                            tasks_dict_info[task] = "Terminating VCA {}".format(
                                vca.get("ee_id")
                            )
                            # NOTE(review): entries are removed from the list that is
                            # being enumerated, so later entries shift position while
                            # the loop continues - confirm this is intended
                            del vca_update[vca_index]
                            del config_update[vca_index]
                    # wait for pending tasks of terminate primitives
                    if tasks_dict_info:
                        self.logger.debug(
                            logging_text
                            + "Waiting for tasks {}".format(
                                list(tasks_dict_info.keys())
                            )
                        )
                        error_list = await self._wait_for_tasks(
                            logging_text,
                            tasks_dict_info,
                            min(
                                self.timeout.charm_delete, self.timeout.ns_terminate
                            ),
                            stage,
                            nslcmop_id,
                        )
                        tasks_dict_info.clear()
                        if error_list:
                            raise LcmException("; ".join(error_list))

                    db_vca_and_config_update = {
                        "_admin.deployed.VCA": vca_update,
                        "configurationStatus": config_update,
                    }
                    self.update_db_2(
                        "nsrs", db_nsr["_id"], db_vca_and_config_update
                    )
        scale_process = None
        # SCALE-IN VCA - END

        # SCALE RO - BEGIN
        if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
            scale_process = "RO"
            if self.ro_config.ng:
                await self._scale_ng_ro(
                    logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
                )
        scaling_info.pop("vdu-create", None)
        scaling_info.pop("vdu-delete", None)

        scale_process = None
        # SCALE RO - END

        # SCALE KDU - BEGIN
        if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
            scale_process = "KDU"
            await self._scale_kdu(
                logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
            )
        scaling_info.pop("kdu-create", None)
        scaling_info.pop("kdu-delete", None)

        scale_process = None
        # SCALE KDU - END

        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # SCALE-UP VCA - BEGIN
        if vca_scaling_info:
            step = db_nslcmop_update[
                "detailed-status"
            ] = "Creating new execution environments"
            scale_process = "VCA"
            for vca_info in vca_scaling_info:
                if vca_info["type"] == "create" and not vca_info.get("osm_kdu_id"):
                    member_vnf_index = str(vca_info["member-vnf-index"])
                    self.logger.debug(
                        logging_text + "vdu info: {}".format(vca_info)
                    )
                    vnfd_id = db_vnfr["vnfd-ref"]
                    if vca_info.get("osm_vdu_id"):
                        vdu_index = int(vca_info["vdu_index"])
                        deploy_params = {"OSM": get_osm_params(db_vnfr)}
                        if db_vnfr.get("additionalParamsForVnf"):
                            deploy_params.update(
                                parse_yaml_strings(
                                    db_vnfr["additionalParamsForVnf"].copy()
                                )
                            )
                        # first the VNF-level configuration, if the descriptor has one
                        descriptor_config = get_configuration(
                            db_vnfd, db_vnfd["id"]
                        )
                        if descriptor_config:
                            vdu_id = None
                            vdu_name = None
                            kdu_name = None
                            self._deploy_n2vc(
                                logging_text=logging_text
                                + "member_vnf_index={} ".format(member_vnf_index),
                                db_nsr=db_nsr,
                                db_vnfr=db_vnfr,
                                nslcmop_id=nslcmop_id,
                                nsr_id=nsr_id,
                                nsi_id=nsi_id,
                                vnfd_id=vnfd_id,
                                vdu_id=vdu_id,
                                kdu_name=kdu_name,
                                member_vnf_index=member_vnf_index,
                                vdu_index=vdu_index,
                                vdu_name=vdu_name,
                                deploy_params=deploy_params,
                                descriptor_config=descriptor_config,
                                base_folder=base_folder,
                                task_instantiation_info=tasks_dict_info,
                                stage=stage,
                            )
                        # then the VDU-level configuration of the new instance
                        vdu_id = vca_info["osm_vdu_id"]
                        vdur = find_in_list(
                            db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
                        )
                        descriptor_config = get_configuration(db_vnfd, vdu_id)
                        if vdur.get("additionalParams"):
                            deploy_params_vdu = parse_yaml_strings(
                                vdur["additionalParams"]
                            )
                        else:
                            deploy_params_vdu = deploy_params
                        deploy_params_vdu["OSM"] = get_osm_params(
                            db_vnfr, vdu_id, vdu_count_index=vdu_index
                        )
                        if descriptor_config:
                            vdu_name = None
                            kdu_name = None
                            stage[
                                1
                            ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                member_vnf_index, vdu_id, vdu_index
                            )
                            stage[2] = step = "Scaling out VCA"
                            self._write_op_status(op_id=nslcmop_id, stage=stage)
                            self._deploy_n2vc(
                                logging_text=logging_text
                                + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                    member_vnf_index, vdu_id, vdu_index
                                ),
                                db_nsr=db_nsr,
                                db_vnfr=db_vnfr,
                                nslcmop_id=nslcmop_id,
                                nsr_id=nsr_id,
                                nsi_id=nsi_id,
                                vnfd_id=vnfd_id,
                                vdu_id=vdu_id,
                                kdu_name=kdu_name,
                                member_vnf_index=member_vnf_index,
                                vdu_index=vdu_index,
                                vdu_name=vdu_name,
                                deploy_params=deploy_params_vdu,
                                descriptor_config=descriptor_config,
                                base_folder=base_folder,
                                task_instantiation_info=tasks_dict_info,
                                stage=stage,
                            )
        # SCALE-UP VCA - END
        scale_process = None

        # POST-SCALE BEGIN
        # execute primitive service POST-SCALING
        step = "Executing post-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor[
                "scaling-config-action"
            ]:
                if (
                    scaling_config_action.get("trigger") == "post-scale-in"
                    and scaling_type == "SCALE_IN"
                ) or (
                    scaling_config_action.get("trigger") == "post-scale-out"
                    and scaling_type == "SCALE_OUT"
                ):
                    vnf_config_primitive = scaling_config_action[
                        "vnf-config-primitive-name-ref"
                    ]
                    step = db_nslcmop_update[
                        "detailed-status"
                    ] = "executing post-scale scaling-config-action '{}'".format(
                        vnf_config_primitive
                    )

                    vnfr_params = {"VDU_SCALE_INFO": scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    # look for primitive
                    for config_primitive in (
                        get_configuration(db_vnfd, db_vnfd["id"]) or {}
                    ).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        raise LcmException(
                            "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
                            "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
                            "config-primitive".format(
                                scaling_group, vnf_config_primitive
                            )
                        )
                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring post-scaling"
                    primitive_params = self._map_primitive_params(
                        config_primitive, {}, vnfr_params
                    )

                    # Post-scale retry check: Check if this sub-operation has been executed before
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop,
                        vnf_index,
                        vnf_config_primitive,
                        primitive_params,
                        "POST-SCALE",
                    )
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = "COMPLETED"
                        result_detail = "Done"
                        self.logger.debug(
                            logging_text
                            + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
                                vnf_config_primitive, result, result_detail
                            )
                        )
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = (
                                len(db_nslcmop.get("_admin", {}).get("operations"))
                                - 1
                            )
                            self.logger.debug(
                                logging_text
                                + "vnf_config_primitive={} New sub-operation".format(
                                    vnf_config_primitive
                                )
                            )
                        else:
                            # retry: Get registered params for this existing sub-operation
                            op = db_nslcmop.get("_admin", {}).get("operations", [])[
                                op_index
                            ]
                            vnf_index = op.get("member_vnf_index")
                            vnf_config_primitive = op.get("primitive")
                            primitive_params = op.get("primitive_params")
                            self.logger.debug(
                                logging_text
                                + "vnf_config_primitive={} Sub-operation retry".format(
                                    vnf_config_primitive
                                )
                            )
                        # Execute the primitive, either with new (first-time) or registered (retry) args
                        ee_descriptor_id = config_primitive.get(
                            "execution-environment-ref"
                        )
                        primitive_name = config_primitive.get(
                            "execution-environment-primitive", vnf_config_primitive
                        )
                        ee_id, vca_type = self._look_for_deployed_vca(
                            nsr_deployed["VCA"],
                            member_vnf_index=vnf_index,
                            vdu_id=None,
                            vdu_count_index=None,
                            ee_descriptor_id=ee_descriptor_id,
                        )
                        result, result_detail = await self._ns_execute_primitive(
                            ee_id,
                            primitive_name,
                            primitive_params,
                            vca_type=vca_type,
                            vca_id=vca_id,
                        )
                        self.logger.debug(
                            logging_text
                            + "vnf_config_primitive={} Done with result {} {}".format(
                                vnf_config_primitive, result, result_detail
                            )
                        )
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail
                        )

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # POST-SCALE END

        db_nsr_update[
            "detailed-status"
        ] = ""  # "scaled {} {}".format(scaling_group, scaling_type)
        # an NS that was "failed" before scaling is reported as "running" afterwards
        db_nsr_update["operational-status"] = (
            "running"
            if old_operational_status == "failed"
            else old_operational_status
        )
        db_nsr_update["config-status"] = old_config_status
        return
    except (
        ROclient.ROClientException,
        DbException,
        LcmException,
        NgRoException,
    ) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(step)
        )
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
            exc_info=True,
        )
    finally:
        # runs on success (the `return` above) as well as on any failure
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )
        if tasks_dict_info:
            stage[1] = "Waiting for instantiate pending tasks."
            self.logger.debug(logging_text + stage[1])
            # NOTE(review): this assignment replaces whatever `exc` was set by the
            # except clauses above with the result of waiting - confirm intended
            exc = await self._wait_for_tasks(
                logging_text,
                tasks_dict_info,
                self.timeout.ns_deploy,
                stage,
                nslcmop_id,
                nsr_id=nsr_id,
            )
        if exc:
            db_nslcmop_update[
                "detailed-status"
            ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
            if db_nsr:
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update["detailed-status"] = ""
                # scale_process names the phase that was running when the failure happened
                if scale_process:
                    if "VCA" in scale_process:
                        db_nsr_update["config-status"] = "failed"
                    if "RO" in scale_process:
                        db_nsr_update["operational-status"] = "failed"
                    db_nsr_update[
                        "detailed-status"
                    ] = "FAILED scaling nslcmop={} {}: {}".format(
                        nslcmop_id, step, exc
                    )
        else:
            error_description_nslcmop = None
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="IDLE",
                current_operation_id=None,
                other_update=db_nsr_update,
            )

        if nslcmop_operation_state:
            # best-effort notification: a failure to publish is only logged
            try:
                msg = {
                    "nsr_id": nsr_id,
                    "nslcmop_id": nslcmop_id,
                    "operationState": nslcmop_operation_state,
                }
                await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
async def _scale_kdu(
    self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
):
    """Apply the KDU scaling requests collected in ``scaling_info``.

    The entries under "kdu-create" are processed (or, when that is empty, the
    ones under "kdu-delete"). For each entry:

    1. on "delete", the KDU terminate-config-primitives are executed first;
    2. the K8s connector selected by "k8s-cluster-type" scales the resource;
    3. on "create", the KDU initial-config-primitives are executed afterwards.

    Config primitives are only executed here when the KDU has a configuration
    and no juju execution-environment reference in the descriptor.

    :param logging_text: prefix used for log messages
    :param nsr_id: "_id" of the NS record, used to build the connector db_dict
    :param nsr_deployed: deployed information of the NS, searched for the KDU
    :param db_vnfd: VNF descriptor that holds the KDU configuration
    :param vca_id: VCA identifier forwarded to the K8s connector calls
    :param scaling_info: dict with "kdu-create" / "kdu-delete", each mapping a
        KDU name to a list of dicts with the keys "member-vnf-index", "type",
        "k8s-cluster-type", "resource-name" and "scale"
    :raises LcmException: when a KDU to scale is not found among the deployed ones
    """
    # Fall back to an empty mapping so that a scaling_info without KDU entries
    # is a no-op instead of a TypeError when iterating None
    _scaling_info = (
        scaling_info.get("kdu-create") or scaling_info.get("kdu-delete") or {}
    )
    for kdu_name, kdu_scaling_list in _scaling_info.items():
        for kdu_scaling_info in kdu_scaling_list:
            member_vnf_index = kdu_scaling_info["member-vnf-index"]
            deployed_kdu, index = get_deployed_kdu(
                nsr_deployed, kdu_name, member_vnf_index
            )
            # Same check and message that scale() applies to this helper
            if deployed_kdu is None:
                raise LcmException(
                    "KDU '{}' for vnf '{}' not deployed".format(
                        kdu_name, member_vnf_index
                    )
                )
            cluster_uuid = deployed_kdu["k8scluster-uuid"]
            kdu_instance = deployed_kdu["kdu-instance"]
            kdu_model = deployed_kdu.get("kdu-model")
            scale = int(kdu_scaling_info["scale"])
            k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]

            # where the connector reports the status of this KDU
            db_dict = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": "_admin.deployed.K8s.{}".format(index),
            }

            step = "scaling application {}".format(
                kdu_scaling_info["resource-name"]
            )
            self.logger.debug(logging_text + step)

            if kdu_scaling_info["type"] == "delete":
                kdu_config = get_configuration(db_vnfd, kdu_name)
                if (
                    kdu_config
                    and kdu_config.get("terminate-config-primitive")
                    and get_juju_ee_ref(db_vnfd, kdu_name) is None
                ):
                    # sorted() keeps the list stored in the descriptor untouched
                    terminate_config_primitive_list = sorted(
                        kdu_config.get("terminate-config-primitive"),
                        key=lambda val: int(val["seq"]),
                    )

                    for (
                        terminate_config_primitive
                    ) in terminate_config_primitive_list:
                        primitive_params_ = self._map_primitive_params(
                            terminate_config_primitive, {}, {}
                        )
                        step = "execute terminate config primitive"
                        self.logger.debug(logging_text + step)
                        await asyncio.wait_for(
                            self.k8scluster_map[k8s_cluster_type].exec_primitive(
                                cluster_uuid=cluster_uuid,
                                kdu_instance=kdu_instance,
                                primitive_name=terminate_config_primitive["name"],
                                params=primitive_params_,
                                db_dict=db_dict,
                                total_timeout=self.timeout.primitive,
                                vca_id=vca_id,
                            ),
                            timeout=self.timeout.primitive
                            * self.timeout.primitive_outer_factor,
                        )

            await asyncio.wait_for(
                self.k8scluster_map[k8s_cluster_type].scale(
                    kdu_instance=kdu_instance,
                    scale=scale,
                    resource_name=kdu_scaling_info["resource-name"],
                    total_timeout=self.timeout.scale_on_error,
                    vca_id=vca_id,
                    cluster_uuid=cluster_uuid,
                    kdu_model=kdu_model,
                    atomic=True,
                    db_dict=db_dict,
                ),
                timeout=self.timeout.scale_on_error
                * self.timeout.scale_on_error_outer_factor,
            )

            if kdu_scaling_info["type"] == "create":
                kdu_config = get_configuration(db_vnfd, kdu_name)
                if (
                    kdu_config
                    and kdu_config.get("initial-config-primitive")
                    and get_juju_ee_ref(db_vnfd, kdu_name) is None
                ):
                    # sorted() keeps the list stored in the descriptor untouched
                    initial_config_primitive_list = sorted(
                        kdu_config.get("initial-config-primitive"),
                        key=lambda val: int(val["seq"]),
                    )

                    for initial_config_primitive in initial_config_primitive_list:
                        primitive_params_ = self._map_primitive_params(
                            initial_config_primitive, {}, {}
                        )
                        step = "execute initial config primitive"
                        self.logger.debug(logging_text + step)
                        await asyncio.wait_for(
                            self.k8scluster_map[k8s_cluster_type].exec_primitive(
                                cluster_uuid=cluster_uuid,
                                kdu_instance=kdu_instance,
                                primitive_name=initial_config_primitive["name"],
                                params=primitive_params_,
                                db_dict=db_dict,
                                vca_id=vca_id,
                            ),
                            timeout=600,
                        )
async def _scale_ng_ro(
    self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
):
    """
    Apply a VDU scaling at RO by deploying the NS again with the updated vnfr

    :param logging_text: prefix text to use at logging
    :param db_nsr: database content of the ns record; "nsd-id" is read from it
    :param db_nslcmop: database content of the ns operation; "nsInstanceId" is read from it
    :param db_vnfr: database content of the vnfr being scaled, updated by scale_vnfr
    :param vdu_scaling_info: dict whose "vdu-create" and "vdu-delete" entries are
        handed to scale_vnfr
    :param stage: stage list forwarded to _instantiate_ng_ro
    :return: None
    """
    nsr_id = db_nslcmop["nsInstanceId"]
    db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
    db_vnfrs = {}  # vnfrs of this ns indexed by member-vnf-index-ref

    # read from db: vnfd's for every vnf
    db_vnfds = []
    # _id of the vnfds already read. vnfr["vnfd-id"] holds the vnfd "_id", so the
    # check must be done against "_id": comparing with the descriptor "id" never
    # matches and makes every vnfr read and append its vnfd again.
    loaded_vnfd_ids = set()

    # for each vnf in ns, read vnfd
    for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
        db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
        vnfd_id = vnfr["vnfd-id"]  # vnfd uuid for this vnf
        # if we haven't this vnfd, read it from db
        if vnfd_id in loaded_vnfd_ids:
            continue
        db_vnfds.append(self.db.get_one("vnfds", {"_id": vnfd_id}))
        loaded_vnfd_ids.add(vnfd_id)

    n2vc_key = self.n2vc.get_public_key()
    n2vc_key_list = [n2vc_key]
    # first pass: register the new vdus and only mark the removed ones
    self.scale_vnfr(
        db_vnfr,
        vdu_scaling_info.get("vdu-create"),
        vdu_scaling_info.get("vdu-delete"),
        mark_delete=True,
    )
    # db_vnfr has been updated, update db_vnfrs to use it
    db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
    await self._instantiate_ng_ro(
        logging_text,
        nsr_id,
        db_nsd,
        db_nsr,
        db_nslcmop,
        db_vnfrs,
        db_vnfds,
        n2vc_key_list,
        stage=stage,
        start_deploy=time(),
        timeout_ns_deploy=self.timeout.ns_deploy,
    )
    # second pass, once RO has finished: the marked vdus are really removed
    if vdu_scaling_info.get("vdu-delete"):
        self.scale_vnfr(
            db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
        )
async def extract_prometheus_scrape_jobs(
    self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
):
    """
    Build the prometheus scrape jobs described by a 'prometheus*.j2' template

    :param ee_id: execution environment id; the text after the first "." is used
        as service name
    :param artifact_path: folder where the template is searched
    :param ee_config_descriptor: descriptor providing "metric-service"
    :param vnfr_id: vnfr id; its dashes are removed before being used as job name
    :param nsr_id: ns record id stored in every job
    :param target_ip: value given to the template as TARGET_IP
    :return: list of jobs, or None when the folder contains no template
    """
    # take the first file called 'prometheus*.j2' found in the artifact folder
    template_name = None
    for entry in self.fs.dir_ls(artifact_path):
        if entry.startswith("prometheus") and entry.endswith(".j2"):
            template_name = entry
            break
    if not template_name:
        return

    with self.fs.file_open((artifact_path, template_name), "r") as template_file:
        job_data = template_file.read()

    # TODO get_service
    service = ee_id.partition(".")[2]  # remove prefix "namespace."
    host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
    host_port = "80"
    vnfr_id = vnfr_id.replace("-", "")
    variables = {
        "JOB_NAME": vnfr_id,
        "TARGET_IP": target_ip,
        "EXPORTER_POD_IP": host_name,
        "EXPORTER_POD_PORT": host_port,
    }
    job_list = parse_job(job_data, variables)

    # every job name must be a string containing the vnfr_id; when it is not, a
    # name is generated. The nsr_id and vnfr_id are added as metadata
    for job in job_list:
        current_name = job.get("job_name")
        if not (isinstance(current_name, str) and vnfr_id in current_name):
            job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
        job["nsr_id"] = nsr_id
        job["vnfr_id"] = vnfr_id
    return job_list
async def rebuild_start_stop(
    self, nsr_id, nslcmop_id, vnf_id, additional_param, operation_type
):
    """
    Ask RO to execute operation_type over one VDU instance and wait for the result

    :param nsr_id: ns instance id
    :param nslcmop_id: operation id
    :param vnf_id: _id of the vnfr that owns the target VDU
    :param additional_param: dict providing "vdu_id" and "count-index" of the target
    :param operation_type: operation name; it is the key of the payload sent to RO
        and it is also written as the ns "operational-status"
    :return: ("COMPLETED", "Done") on success, otherwise ("FAILED", <error text>).
        Errors are not raised, cancellation included
    """
    logging_text = "Task ns={} {}={} ".format(nsr_id, operation_type, nslcmop_id)
    self.logger.info(logging_text + "Enter")
    stage = ["Preparing the environment", ""]
    # database nsrs record
    db_nsr_update = {}
    vdu_vim_name = None
    vim_vm_id = None
    # start time of the operation, handed to _wait_ng_ro
    start_deploy = time()
    try:
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_id})
        vim_account_id = db_vnfr.get("vim-account-id")
        vim_info_key = "vim:" + vim_account_id
        vdu_id = additional_param["vdu_id"]
        # target instance: same vdu-id-ref and same count-index
        vdurs = [item for item in db_vnfr["vdur"] if item["vdu-id-ref"] == vdu_id]
        vdur = find_in_list(
            vdurs, lambda vdu: vdu["count-index"] == additional_param["count-index"]
        )
        if vdur:
            vdu_vim_name = vdur["name"]
            vim_vm_id = vdur["vim_info"][vim_info_key]["vim_id"]
            # NOTE(review): target_vim is the first key of vim_info, whereas
            # vim_vm_id is read from vim_info_key; confirm they always match
            target_vim, _ = next(k_v for k_v in vdur["vim_info"].items())
        else:
            raise LcmException("Target vdu is not found")
        self.logger.info("vdu_vim_name >> {} ".format(vdu_vim_name))
        # wait for any previous tasks in process
        stage[1] = "Waiting for previous operations to terminate"
        self.logger.info(stage[1])
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        stage[1] = "Reading from database."
        self.logger.info(stage[1])
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation=operation_type.upper(),
            current_operation_id=nslcmop_id,
        )
        self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)

        # read from db: ns
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr_update["operational-status"] = operation_type
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        # Payload for RO
        desc = {
            operation_type: {
                "vim_vm_id": vim_vm_id,
                "vnf_id": vnf_id,
                "vdu_index": additional_param["count-index"],
                "vdu_id": vdur["id"],
                "target_vim": target_vim,
                "vim_account_id": vim_account_id,
            }
        }
        stage[1] = "Sending rebuild request to RO... {}".format(desc)
        self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
        self.logger.info("ro nsr id: {}".format(nsr_id))
        result_dict = await self.RO.operate(nsr_id, desc, operation_type)
        self.logger.info("response from RO: {}".format(result_dict))
        action_id = result_dict["action_id"]
        # block until RO reports the action as finished or the timeout expires
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            self.timeout.operate,
            None,
            "start_stop_rebuild",
        )
        return "COMPLETED", "Done"
    except (ROclient.ROClientException, DbException, LcmException) as e:
        # expected errors: logged without traceback
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        # the cancellation is not propagated, it is reported as a failure
        self.logger.error("Cancelled Exception while '{}'".format(stage))
        exc = "Operation was cancelled"
    except Exception as e:
        # unexpected errors: the traceback is logged and returned to the caller
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    return "FAILED", "Error in operate VNF {}".format(exc)
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA Cloud and VCA Cloud Credentials configured at a VIM account

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential); an item is None when it is not
        present at the VIM account config
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_cloud")
    cloud_credential = vim_config.get("vca_cloud_credential")
    return cloud_name, cloud_credential
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA K8s Cloud and VCA K8s Cloud Credentials configured at a VIM account

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential); an item is None when it is not
        present at the VIM account config
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_k8s_cloud")
    cloud_credential = vim_config.get("vca_k8s_cloud_credential")
    return cloud_name, cloud_credential
async def migrate(self, nsr_id, nslcmop_id):
    """
    Migrate VNFs and VDUs instances in a NS

    The operation parameters stored at the nslcmop are sent to RO as they are and
    the RO action is awaited. The result is written at the nslcmop and notified
    through the message bus ("ns" topic, "migrated" key). Errors are not raised,
    they are reported as a FAILED operation state.

    :param: nsr_id: NS Instance ID
    :param: nslcmop_id: nslcmop ID of migrate

    :return: None
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        # nothing is done when the lock is not obtained
        return
    logging_text = "Task ns={} migrate ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nslcmop = None
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    db_nsr_update = {}
    target = {}
    exc = None
    # start time of the operation, handed to _wait_ng_ro
    start_deploy = time()

    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="MIGRATING",
            current_operation_id=nslcmop_id,
        )
        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        migrate_params = db_nslcmop.get("operationParams")

        # the RO payload is a copy of the operation parameters
        target = {}
        target.update(migrate_params)
        desc = await self.RO.migrate(nsr_id, target)
        self.logger.debug("RO return > {}".format(desc))
        action_id = desc["action_id"]
        # block until RO reports the action as finished or the timeout expires
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            self.timeout.migrate,
            operation="migrate",
        )
    except (ROclient.ROClientException, DbException, LcmException) as e:
        # expected errors: logged without traceback
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        # the cancellation is not propagated, it is reported as a failure
        self.logger.error("Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        # unexpected errors: the traceback is logged and stored as error detail
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    finally:
        # the ns is always released, whatever the result is
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )
        if exc:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        else:
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"
            # NOTE(review): db_nsr_update is filled here but it does not seem to
            # be written to the database in this method; confirm whether it
            # should be passed to _write_ns_status as other_update
            db_nsr_update["detailed-status"] = "Done"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message="",
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if nslcmop_operation_state:
            # best effort notification: a failure here is only logged
            try:
                msg = {
                    "nsr_id": nsr_id,
                    "nslcmop_id": nslcmop_id,
                    "operationState": nslcmop_operation_state,
                }
                await self.msg.aiowrite("ns", "migrated", msg, loop=self.loop)
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_migrate")
7500 async def heal(self
, nsr_id
, nslcmop_id
):
7504 :param nsr_id: ns instance to heal
7505 :param nslcmop_id: operation to run
7509 # Try to lock HA task here
7510 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7511 if not task_is_locked_by_me
:
7514 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7515 stage
= ["", "", ""]
7516 tasks_dict_info
= {}
7517 # ^ stage, step, VIM progress
7518 self
.logger
.debug(logging_text
+ "Enter")
7519 # get all needed from database
7521 db_nslcmop_update
= {}
7523 db_vnfrs
= {} # vnf's info indexed by _id
7525 old_operational_status
= ""
7526 old_config_status
= ""
7529 # wait for any previous tasks in process
7530 step
= "Waiting for previous operations to terminate"
7531 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7532 self
._write
_ns
_status
(
7535 current_operation
="HEALING",
7536 current_operation_id
=nslcmop_id
,
7539 step
= "Getting nslcmop from database"
7541 step
+ " after having waited for previous tasks to be completed"
7543 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7545 step
= "Getting nsr from database"
7546 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7547 old_operational_status
= db_nsr
["operational-status"]
7548 old_config_status
= db_nsr
["config-status"]
7551 "_admin.deployed.RO.operational-status": "healing",
7553 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7555 step
= "Sending heal order to VIM"
7557 logging_text
=logging_text
,
7559 db_nslcmop
=db_nslcmop
,
7564 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7565 self
.logger
.debug(logging_text
+ stage
[1])
7566 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7567 self
.fs
.sync(db_nsr
["nsd-id"])
7569 # read from db: vnfr's of this ns
7570 step
= "Getting vnfrs from db"
7571 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7572 for vnfr
in db_vnfrs_list
:
7573 db_vnfrs
[vnfr
["_id"]] = vnfr
7574 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7576 # Check for each target VNF
7577 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7578 for target_vnf
in target_list
:
7579 # Find this VNF in the list from DB
7580 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7582 db_vnfr
= db_vnfrs
[vnfr_id
]
7583 vnfd_id
= db_vnfr
.get("vnfd-id")
7584 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7585 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7586 base_folder
= vnfd
["_admin"]["storage"]
7591 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7592 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7594 # Check each target VDU and deploy N2VC
7595 target_vdu_list
= target_vnf
.get("additionalParams", {}).get(
7598 if not target_vdu_list
:
7599 # New code to build the dictionary of VDUs to be healed
7600 target_vdu_list
= []
7601 for existing_vdu
in db_vnfr
.get("vdur"):
7602 vdu_name
= existing_vdu
.get("vdu-name", None)
7603 vdu_index
= existing_vdu
.get("count-index", 0)
7604 vdu_run_day1
= target_vnf
.get("additionalParams", {}).get(
7607 vdu_to_be_healed
= {
7609 "count-index": vdu_index
,
7610 "run-day1": vdu_run_day1
,
7612 target_vdu_list
.append(vdu_to_be_healed
)
7613 for target_vdu
in target_vdu_list
:
7614 deploy_params_vdu
= target_vdu
7615 # Set run-day1 vnf level value if not vdu level value exists
7616 if not deploy_params_vdu
.get("run-day1") and target_vnf
[
7619 deploy_params_vdu
["run-day1"] = target_vnf
[
7622 vdu_name
= target_vdu
.get("vdu-id", None)
7623 # TODO: Get vdu_id from vdud.
7625 # For multi instance VDU count-index is mandatory
7626 # For single instance VDU count-index is 0
7627 vdu_index
= target_vdu
.get("count-index", 0)
7629 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7630 stage
[1] = "Deploying Execution Environments."
7631 self
.logger
.debug(logging_text
+ stage
[1])
7633 # VNF Level charm. Normal case when proxy charms.
7634 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7635 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7636 if descriptor_config
:
7637 # Continue if healed machine is management machine
7638 vnf_ip_address
= db_vnfr
.get("ip-address")
7639 target_instance
= None
7640 for instance
in db_vnfr
.get("vdur", None):
7642 instance
["vdu-name"] == vdu_name
7643 and instance
["count-index"] == vdu_index
7645 target_instance
= instance
7647 if vnf_ip_address
== target_instance
.get("ip-address"):
7649 logging_text
=logging_text
7650 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7651 member_vnf_index
, vdu_name
, vdu_index
7655 nslcmop_id
=nslcmop_id
,
7661 member_vnf_index
=member_vnf_index
,
7664 deploy_params
=deploy_params_vdu
,
7665 descriptor_config
=descriptor_config
,
7666 base_folder
=base_folder
,
7667 task_instantiation_info
=tasks_dict_info
,
7671 # VDU Level charm. Normal case with native charms.
7672 descriptor_config
= get_configuration(vnfd
, vdu_name
)
7673 if descriptor_config
:
7675 logging_text
=logging_text
7676 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7677 member_vnf_index
, vdu_name
, vdu_index
7681 nslcmop_id
=nslcmop_id
,
7687 member_vnf_index
=member_vnf_index
,
7688 vdu_index
=vdu_index
,
7690 deploy_params
=deploy_params_vdu
,
7691 descriptor_config
=descriptor_config
,
7692 base_folder
=base_folder
,
7693 task_instantiation_info
=tasks_dict_info
,
7698 ROclient
.ROClientException
,
7703 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7705 except asyncio
.CancelledError
:
7707 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7709 exc
= "Operation was cancelled"
7710 except Exception as e
:
7711 exc
= traceback
.format_exc()
7712 self
.logger
.critical(
7713 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7718 stage
[1] = "Waiting for healing pending tasks."
7719 self
.logger
.debug(logging_text
+ stage
[1])
7720 exc
= await self
._wait
_for
_tasks
(
7723 self
.timeout
.ns_deploy
,
7731 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7732 nslcmop_operation_state
= "FAILED"
7734 db_nsr_update
["operational-status"] = old_operational_status
7735 db_nsr_update
["config-status"] = old_config_status
7738 ] = "FAILED healing nslcmop={} {}: {}".format(nslcmop_id
, step
, exc
)
7739 for task
, task_name
in tasks_dict_info
.items():
7740 if not task
.done() or task
.cancelled() or task
.exception():
7741 if task_name
.startswith(self
.task_name_deploy_vca
):
7742 # A N2VC task is pending
7743 db_nsr_update
["config-status"] = "failed"
7745 # RO task is pending
7746 db_nsr_update
["operational-status"] = "failed"
7748 error_description_nslcmop
= None
7749 nslcmop_operation_state
= "COMPLETED"
7750 db_nslcmop_update
["detailed-status"] = "Done"
7751 db_nsr_update
["detailed-status"] = "Done"
7752 db_nsr_update
["operational-status"] = "running"
7753 db_nsr_update
["config-status"] = "configured"
7755 self
._write
_op
_status
(
7758 error_message
=error_description_nslcmop
,
7759 operation_state
=nslcmop_operation_state
,
7760 other_update
=db_nslcmop_update
,
7763 self
._write
_ns
_status
(
7766 current_operation
="IDLE",
7767 current_operation_id
=None,
7768 other_update
=db_nsr_update
,
7771 if nslcmop_operation_state
:
7775 "nslcmop_id": nslcmop_id
,
7776 "operationState": nslcmop_operation_state
,
7778 await self
.msg
.aiowrite("ns", "healed", msg
, loop
=self
.loop
)
7779 except Exception as e
:
7781 logging_text
+ "kafka_write notification Exception {}".format(e
)
7783 self
.logger
.debug(logging_text
+ "Exit")
7784 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
7795 :param logging_text: prefix text to use at logging
7796 :param nsr_id: nsr identity
7797 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
7798 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
7799 :return: None or exception
7802 def get_vim_account(vim_account_id
):
7804 if vim_account_id
in db_vims
:
7805 return db_vims
[vim_account_id
]
7806 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
7807 db_vims
[vim_account_id
] = db_vim
7812 ns_params
= db_nslcmop
.get("operationParams")
7813 if ns_params
and ns_params
.get("timeout_ns_heal"):
7814 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
7816 timeout_ns_heal
= self
.timeout
.ns_heal
7820 nslcmop_id
= db_nslcmop
["_id"]
7822 "action_id": nslcmop_id
,
7824 self
.logger
.warning(
7825 "db_nslcmop={} and timeout_ns_heal={}".format(
7826 db_nslcmop
, timeout_ns_heal
7829 target
.update(db_nslcmop
.get("operationParams", {}))
7831 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
7832 desc
= await self
.RO
.recreate(nsr_id
, target
)
7833 self
.logger
.debug("RO return > {}".format(desc
))
7834 action_id
= desc
["action_id"]
7835 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
7836 await self
._wait
_ng
_ro
(
7843 operation
="healing",
7848 "_admin.deployed.RO.operational-status": "running",
7849 "detailed-status": " ".join(stage
),
7851 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7852 self
._write
_op
_status
(nslcmop_id
, stage
)
7854 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
7857 except Exception as e
:
7858 stage
[2] = "ERROR healing at VIM"
7859 # self.set_vnfr_at_error(db_vnfrs, str(e))
7861 "Error healing at VIM {}".format(e
),
7862 exc_info
=not isinstance(
7865 ROclient
.ROClientException
,
7891 task_instantiation_info
,
7894 # launch instantiate_N2VC in a asyncio task and register task object
7895 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
7896 # if not found, create one entry and update database
7897 # fill db_nsr._admin.deployed.VCA.<index>
7900 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
7904 get_charm_name
= False
7905 if "execution-environment-list" in descriptor_config
:
7906 ee_list
= descriptor_config
.get("execution-environment-list", [])
7907 elif "juju" in descriptor_config
:
7908 ee_list
= [descriptor_config
] # ns charms
7909 if "execution-environment-list" not in descriptor_config
:
7910 # charm name is only required for ns charms
7911 get_charm_name
= True
7912 else: # other types as script are not supported
7915 for ee_item
in ee_list
:
7918 + "_deploy_n2vc ee_item juju={}, helm={}".format(
7919 ee_item
.get("juju"), ee_item
.get("helm-chart")
7922 ee_descriptor_id
= ee_item
.get("id")
7923 if ee_item
.get("juju"):
7924 vca_name
= ee_item
["juju"].get("charm")
7926 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
7929 if ee_item
["juju"].get("charm") is not None
7932 if ee_item
["juju"].get("cloud") == "k8s":
7933 vca_type
= "k8s_proxy_charm"
7934 elif ee_item
["juju"].get("proxy") is False:
7935 vca_type
= "native_charm"
7936 elif ee_item
.get("helm-chart"):
7937 vca_name
= ee_item
["helm-chart"]
7938 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
7941 vca_type
= "helm-v3"
7944 logging_text
+ "skipping non juju neither charm configuration"
7949 for vca_index
, vca_deployed
in enumerate(
7950 db_nsr
["_admin"]["deployed"]["VCA"]
7952 if not vca_deployed
:
7955 vca_deployed
.get("member-vnf-index") == member_vnf_index
7956 and vca_deployed
.get("vdu_id") == vdu_id
7957 and vca_deployed
.get("kdu_name") == kdu_name
7958 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
7959 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
7963 # not found, create one.
7965 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
7968 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
7970 target
+= "/kdu/{}".format(kdu_name
)
7972 "target_element": target
,
7973 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
7974 "member-vnf-index": member_vnf_index
,
7976 "kdu_name": kdu_name
,
7977 "vdu_count_index": vdu_index
,
7978 "operational-status": "init", # TODO revise
7979 "detailed-status": "", # TODO revise
7980 "step": "initial-deploy", # TODO revise
7982 "vdu_name": vdu_name
,
7984 "ee_descriptor_id": ee_descriptor_id
,
7985 "charm_name": charm_name
,
7989 # create VCA and configurationStatus in db
7991 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
7992 "configurationStatus.{}".format(vca_index
): dict(),
7994 self
.update_db_2("nsrs", nsr_id
, db_dict
)
7996 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
7998 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
7999 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
8000 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
8003 task_n2vc
= asyncio
.ensure_future(
8005 logging_text
=logging_text
,
8006 vca_index
=vca_index
,
8012 vdu_index
=vdu_index
,
8013 deploy_params
=deploy_params
,
8014 config_descriptor
=descriptor_config
,
8015 base_folder
=base_folder
,
8016 nslcmop_id
=nslcmop_id
,
8020 ee_config_descriptor
=ee_item
,
8023 self
.lcm_tasks
.register(
8027 "instantiate_N2VC-{}".format(vca_index
),
8030 task_instantiation_info
[
8032 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
8033 member_vnf_index
or "", vdu_id
or ""
8036 async def heal_N2VC(
8053 ee_config_descriptor
,
8055 nsr_id
= db_nsr
["_id"]
8056 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
8057 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
8058 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
8059 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
8061 "collection": "nsrs",
8062 "filter": {"_id": nsr_id
},
8063 "path": db_update_entry
,
8069 element_under_configuration
= nsr_id
8073 vnfr_id
= db_vnfr
["_id"]
8074 osm_config
["osm"]["vnf_id"] = vnfr_id
8076 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8078 if vca_type
== "native_charm":
8081 index_number
= vdu_index
or 0
8084 element_type
= "VNF"
8085 element_under_configuration
= vnfr_id
8086 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8088 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8089 element_type
= "VDU"
8090 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8091 osm_config
["osm"]["vdu_id"] = vdu_id
8093 namespace
+= ".{}".format(kdu_name
)
8094 element_type
= "KDU"
8095 element_under_configuration
= kdu_name
8096 osm_config
["osm"]["kdu_name"] = kdu_name
8099 if base_folder
["pkg-dir"]:
8100 artifact_path
= "{}/{}/{}/{}".format(
8101 base_folder
["folder"],
8102 base_folder
["pkg-dir"],
8105 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8110 artifact_path
= "{}/Scripts/{}/{}/".format(
8111 base_folder
["folder"],
8114 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8119 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8121 # get initial_config_primitive_list that applies to this element
8122 initial_config_primitive_list
= config_descriptor
.get(
8123 "initial-config-primitive"
8127 "Initial config primitive list > {}".format(
8128 initial_config_primitive_list
8132 # add config if not present for NS charm
8133 ee_descriptor_id
= ee_config_descriptor
.get("id")
8134 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8135 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8136 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8140 "Initial config primitive list #2 > {}".format(
8141 initial_config_primitive_list
8144 # n2vc_redesign STEP 3.1
8145 # find old ee_id if exists
8146 ee_id
= vca_deployed
.get("ee_id")
8148 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8149 # create or register execution environment in VCA. Only for native charms when healing
8150 if vca_type
== "native_charm":
8151 step
= "Waiting to VM being up and getting IP address"
8152 self
.logger
.debug(logging_text
+ step
)
8153 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8162 credentials
= {"hostname": rw_mgmt_ip
}
8164 username
= deep_get(
8165 config_descriptor
, ("config-access", "ssh-access", "default-user")
8167 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8168 # merged. Meanwhile let's get username from initial-config-primitive
8169 if not username
and initial_config_primitive_list
:
8170 for config_primitive
in initial_config_primitive_list
:
8171 for param
in config_primitive
.get("parameter", ()):
8172 if param
["name"] == "ssh-username":
8173 username
= param
["value"]
8177 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8178 "'config-access.ssh-access.default-user'"
8180 credentials
["username"] = username
8182 # n2vc_redesign STEP 3.2
8183 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8184 self
._write
_configuration
_status
(
8186 vca_index
=vca_index
,
8187 status
="REGISTERING",
8188 element_under_configuration
=element_under_configuration
,
8189 element_type
=element_type
,
8192 step
= "register execution environment {}".format(credentials
)
8193 self
.logger
.debug(logging_text
+ step
)
8194 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8195 credentials
=credentials
,
8196 namespace
=namespace
,
8201 # update ee_id en db
8203 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8205 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8207 # for compatibility with MON/POL modules, the need model and application name at database
8208 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8209 # Not sure if this need to be done when healing
8211 ee_id_parts = ee_id.split(".")
8212 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8213 if len(ee_id_parts) >= 2:
8214 model_name = ee_id_parts[0]
8215 application_name = ee_id_parts[1]
8216 db_nsr_update[db_update_entry + "model"] = model_name
8217 db_nsr_update[db_update_entry + "application"] = application_name
8220 # n2vc_redesign STEP 3.3
8221 # Install configuration software. Only for native charms.
8222 step
= "Install configuration Software"
8224 self
._write
_configuration
_status
(
8226 vca_index
=vca_index
,
8227 status
="INSTALLING SW",
8228 element_under_configuration
=element_under_configuration
,
8229 element_type
=element_type
,
8230 # other_update=db_nsr_update,
8234 # TODO check if already done
8235 self
.logger
.debug(logging_text
+ step
)
8237 if vca_type
== "native_charm":
8238 config_primitive
= next(
8239 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8242 if config_primitive
:
8243 config
= self
._map
_primitive
_params
(
8244 config_primitive
, {}, deploy_params
8246 await self
.vca_map
[vca_type
].install_configuration_sw(
8248 artifact_path
=artifact_path
,
8256 # write in db flag of configuration_sw already installed
8258 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8261 # Not sure if this need to be done when healing
8263 # add relations for this VCA (wait for other peers related with this VCA)
8264 await self._add_vca_relations(
8265 logging_text=logging_text,
8268 vca_index=vca_index,
8272 # if SSH access is required, then get execution environment SSH public
8273 # if native charm we have waited already to VM be UP
8274 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8277 # self.logger.debug("get ssh key block")
8279 config_descriptor
, ("config-access", "ssh-access", "required")
8281 # self.logger.debug("ssh key needed")
8282 # Needed to inject a ssh key
8285 ("config-access", "ssh-access", "default-user"),
8287 step
= "Install configuration Software, getting public ssh key"
8288 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8289 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8292 step
= "Insert public key into VM user={} ssh_key={}".format(
8296 # self.logger.debug("no need to get ssh key")
8297 step
= "Waiting to VM being up and getting IP address"
8298 self
.logger
.debug(logging_text
+ step
)
8300 # n2vc_redesign STEP 5.1
8301 # wait for RO (ip-address) Insert pub_key into VM
8302 # IMPORTANT: We need do wait for RO to complete healing operation.
8303 await self
._wait
_heal
_ro
(nsr_id
, self
.timeout
.ns_heal
)
8306 rw_mgmt_ip
= await self
.wait_kdu_up(
8307 logging_text
, nsr_id
, vnfr_id
, kdu_name
8310 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8320 rw_mgmt_ip
= None # This is for a NS configuration
8322 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8324 # store rw_mgmt_ip in deploy params for later replacement
8325 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8328 # get run-day1 operation parameter
8329 runDay1
= deploy_params
.get("run-day1", False)
8331 "Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
, vdu_id
, runDay1
)
8334 # n2vc_redesign STEP 6 Execute initial config primitive
8335 step
= "execute initial config primitive"
8337 # wait for dependent primitives execution (NS -> VNF -> VDU)
8338 if initial_config_primitive_list
:
8339 await self
._wait
_dependent
_n
2vc
(
8340 nsr_id
, vca_deployed_list
, vca_index
8343 # stage, in function of element type: vdu, kdu, vnf or ns
8344 my_vca
= vca_deployed_list
[vca_index
]
8345 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8347 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8348 elif my_vca
.get("member-vnf-index"):
8350 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8353 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8355 self
._write
_configuration
_status
(
8356 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8359 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8361 check_if_terminated_needed
= True
8362 for initial_config_primitive
in initial_config_primitive_list
:
8363 # adding information on the vca_deployed if it is a NS execution environment
8364 if not vca_deployed
["member-vnf-index"]:
8365 deploy_params
["ns_config_info"] = json
.dumps(
8366 self
._get
_ns
_config
_info
(nsr_id
)
8368 # TODO check if already done
8369 primitive_params_
= self
._map
_primitive
_params
(
8370 initial_config_primitive
, {}, deploy_params
8373 step
= "execute primitive '{}' params '{}'".format(
8374 initial_config_primitive
["name"], primitive_params_
8376 self
.logger
.debug(logging_text
+ step
)
8377 await self
.vca_map
[vca_type
].exec_primitive(
8379 primitive_name
=initial_config_primitive
["name"],
8380 params_dict
=primitive_params_
,
8385 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8386 if check_if_terminated_needed
:
8387 if config_descriptor
.get("terminate-config-primitive"):
8391 {db_update_entry
+ "needed_terminate": True},
8393 check_if_terminated_needed
= False
8395 # TODO register in database that primitive is done
8397 # STEP 7 Configure metrics
8398 # Not sure if this need to be done when healing
8400 if vca_type == "helm" or vca_type == "helm-v3":
8401 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8403 artifact_path=artifact_path,
8404 ee_config_descriptor=ee_config_descriptor,
8407 target_ip=rw_mgmt_ip,
8413 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8416 for job in prometheus_jobs:
8419 {"job_name": job["job_name"]},
8422 fail_on_empty=False,
8426 step
= "instantiated at VCA"
8427 self
.logger
.debug(logging_text
+ step
)
8429 self
._write
_configuration
_status
(
8430 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8433 except Exception as e
: # TODO not use Exception but N2VC exception
8434 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8436 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8439 "Exception while {} : {}".format(step
, e
), exc_info
=True
8441 self
._write
_configuration
_status
(
8442 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8444 raise LcmException("{} {}".format(step
, e
)) from e
async def _wait_heal_ro(
    self,
    nsr_id,
    timeout=600,
):
    """
    Wait until RO no longer reports a healing operation for the NS.

    The NS record is re-read from the "nsrs" collection on every iteration
    and the wait ends as soon as its RO "operational-status" differs from
    "healing". Between two reads the coroutine sleeps for 15 seconds.

    :param: nsr_id: NS Instance ID, used as "_id" to read the record
    :param: timeout: maximum time to keep polling, in seconds
    :raises NgRoException: when the polling window elapses while the status
        is still "healing"
    """
    start_time = time()
    while time() <= start_time + timeout:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        operational_status_ro = db_nsr["_admin"]["deployed"]["RO"][
            "operational-status"
        ]
        self.logger.debug("Wait Heal RO > {}".format(operational_status_ro))
        if operational_status_ro != "healing":
            break
        # asyncio.sleep() no longer accepts a "loop" argument (deprecated in
        # Python 3.8, removed in 3.10); it always uses the running loop.
        await asyncio.sleep(15)
    else:  # the while condition became false: polling window elapsed
        raise NgRoException("Timeout waiting ns to deploy")
async def vertical_scale(self, nsr_id, nslcmop_id):
    """
    Vertical Scale the VDUs in a NS

    Reads the operation parameters from the nslcmop record, sends them to RO
    as the vertical scale target and waits for the resulting RO action. The
    final operation state ("COMPLETED" or "FAILED") is written to the
    operation record and published on the "ns" topic as "verticalscaled".
    Failures are recorded in the operation record instead of being raised.

    :param: nsr_id: NS Instance ID
    :param: nslcmop_id: nslcmop ID of the vertical scale operation
    :return: None
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return
    logging_text = "Task ns={} vertical scale ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nslcmop = None
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    db_nsr_update = {}
    target = {}
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    start_deploy = time()

    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="VerticalScale",
            current_operation_id=nslcmop_id,
        )
        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operationParams = db_nslcmop.get("operationParams")
        # The operation parameters are passed to RO unchanged as the target
        target = {}
        target.update(operationParams)
        desc = await self.RO.vertical_scale(nsr_id, target)
        self.logger.debug("RO return > {}".format(desc))
        action_id = desc["action_id"]
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            self.timeout.verticalscale,
            operation="verticalscale",
        )
    except (
        ROclient.ROClientException,
        NgRoException,
        DbException,
        LcmException,
    ) as e:
        # Expected failures: reported by their message only.
        # NgRoException is what _wait_heal_ro raises on timeout; presumably
        # the NG-RO calls above raise it as well - TODO confirm.
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        # Unexpected failure: keep the whole traceback for the operation record
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )
        if exc:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        else:
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nsr_update["detailed-status"] = "Done"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message="",
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if nslcmop_operation_state:
            try:
                msg = {
                    "nsr_id": nsr_id,
                    "nslcmop_id": nslcmop_id,
                    "operationState": nslcmop_operation_state,
                }
                await self.msg.aiowrite("ns", "verticalscaled", msg, loop=self.loop)
            except Exception as e:
                # Best effort: a failed notification must not fail the operation
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_verticalscale")