1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
36 from osm_lcm
import ROclient
37 from osm_lcm
.data_utils
.lcm_config
import LcmCfg
38 from osm_lcm
.data_utils
.nsr
import (
41 get_deployed_vca_list
,
44 from osm_lcm
.data_utils
.vca
import (
53 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
54 from osm_lcm
.lcm_utils
import (
61 check_juju_bundle_existence
,
62 get_charm_artifact_path
,
65 from osm_lcm
.data_utils
.nsd
import (
66 get_ns_configuration_relation_list
,
70 from osm_lcm
.data_utils
.vnfd
import (
76 get_ee_sorted_initial_config_primitive_list
,
77 get_ee_sorted_terminate_config_primitive_list
,
79 get_virtual_link_profiles
,
84 get_number_of_instances
,
86 get_kdu_resource_profile
,
87 find_software_version
,
90 from osm_lcm
.data_utils
.list_utils
import find_in_list
91 from osm_lcm
.data_utils
.vnfr
import (
95 get_volumes_from_instantiation_params
,
97 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
98 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
99 from n2vc
.definitions
import RelationEndpoint
100 from n2vc
.k8s_helm_conn
import K8sHelmConnector
101 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
102 from n2vc
.k8s_juju_conn
import K8sJujuConnector
104 from osm_common
.dbbase
import DbException
105 from osm_common
.fsbase
import FsException
107 from osm_lcm
.data_utils
.database
.database
import Database
108 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
109 from osm_lcm
.data_utils
.wim
import (
111 get_target_wim_attrs
,
112 select_feasible_wim_account
,
115 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
116 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
118 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
119 from osm_lcm
.osm_config
import OsmConfigBuilder
120 from osm_lcm
.prometheus
import parse_job
122 from copy
import copy
, deepcopy
123 from time
import time
124 from uuid
import uuid4
126 from random
import randint
128 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
131 class NsLcm(LcmBase
):
132 SUBOPERATION_STATUS_NOT_FOUND
= -1
133 SUBOPERATION_STATUS_NEW
= -2
134 SUBOPERATION_STATUS_SKIP
= -3
135 task_name_deploy_vca
= "Deploying VCA"
137 def __init__(self
, msg
, lcm_tasks
, config
: LcmCfg
, loop
):
139 Init, Connect to database, filesystem storage, and messaging
140 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
143 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
145 self
.db
= Database().instance
.db
146 self
.fs
= Filesystem().instance
.fs
148 self
.lcm_tasks
= lcm_tasks
149 self
.timeout
= config
.timeout
150 self
.ro_config
= config
.RO
151 self
.vca_config
= config
.VCA
153 # create N2VC connector
154 self
.n2vc
= N2VCJujuConnector(
157 on_update_db
=self
._on
_update
_n
2vc
_db
,
162 self
.conn_helm_ee
= LCMHelmConn(
165 vca_config
=self
.vca_config
,
166 on_update_db
=self
._on
_update
_n
2vc
_db
,
169 self
.k8sclusterhelm2
= K8sHelmConnector(
170 kubectl_command
=self
.vca_config
.kubectlpath
,
171 helm_command
=self
.vca_config
.helmpath
,
178 self
.k8sclusterhelm3
= K8sHelm3Connector(
179 kubectl_command
=self
.vca_config
.kubectlpath
,
180 helm_command
=self
.vca_config
.helm3path
,
187 self
.k8sclusterjuju
= K8sJujuConnector(
188 kubectl_command
=self
.vca_config
.kubectlpath
,
189 juju_command
=self
.vca_config
.jujupath
,
192 on_update_db
=self
._on
_update
_k
8s
_db
,
197 self
.k8scluster_map
= {
198 "helm-chart": self
.k8sclusterhelm2
,
199 "helm-chart-v3": self
.k8sclusterhelm3
,
200 "chart": self
.k8sclusterhelm3
,
201 "juju-bundle": self
.k8sclusterjuju
,
202 "juju": self
.k8sclusterjuju
,
206 "lxc_proxy_charm": self
.n2vc
,
207 "native_charm": self
.n2vc
,
208 "k8s_proxy_charm": self
.n2vc
,
209 "helm": self
.conn_helm_ee
,
210 "helm-v3": self
.conn_helm_ee
,
214 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
.to_dict())
216 self
.op_status_map
= {
217 "instantiation": self
.RO
.status
,
218 "termination": self
.RO
.status
,
219 "migrate": self
.RO
.status
,
220 "healing": self
.RO
.recreate_status
,
221 "verticalscale": self
.RO
.status
,
222 "start_stop_rebuild": self
.RO
.status
,
226 def increment_ip_mac(ip_mac
, vm_index
=1):
227 if not isinstance(ip_mac
, str):
230 # try with ipv4 look for last dot
231 i
= ip_mac
.rfind(".")
234 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
235 # try with ipv6 or mac look for last colon. Operate in hex
236 i
= ip_mac
.rfind(":")
239 # format in hex, len can be 2 for mac or 4 for ipv6
240 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
241 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
def _on_update_ro_db(self, nsrs_id, ro_descriptor):
    """
    Store the RO descriptor as the 'deploymentStatus' of a ns record. Best effort: any failure is
    logged and never propagated to the caller.
    :param nsrs_id: _id of the record at 'nsrs' collection
    :param ro_descriptor: content to be stored under 'deploymentStatus'
    :return: None
    """
    # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))

    try:
        # TODO filter RO descriptor fields...

        # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
        db_dict = {"deploymentStatus": ro_descriptor}
        self.update_db_2("nsrs", nsrs_id, db_dict)

    except Exception as e:
        self.logger.warning(
            "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
        )
265 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
267 # remove last dot from path (if exists)
268 if path
.endswith("."):
271 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
272 # .format(table, filter, path, updated_data))
275 nsr_id
= filter.get("_id")
277 # read ns record from database
278 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
279 current_ns_status
= nsr
.get("nsState")
281 # get vca status for NS
282 status_dict
= await self
.n2vc
.get_status(
283 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
288 db_dict
["vcaStatus"] = status_dict
290 # update configurationStatus for this VCA
292 vca_index
= int(path
[path
.rfind(".") + 1 :])
295 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
297 vca_status
= vca_list
[vca_index
].get("status")
299 configuration_status_list
= nsr
.get("configurationStatus")
300 config_status
= configuration_status_list
[vca_index
].get("status")
302 if config_status
== "BROKEN" and vca_status
!= "failed":
303 db_dict
["configurationStatus"][vca_index
] = "READY"
304 elif config_status
!= "BROKEN" and vca_status
== "failed":
305 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
306 except Exception as e
:
307 # not update configurationStatus
308 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
310 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
311 # if nsState = 'DEGRADED' check if all is OK
313 if current_ns_status
in ("READY", "DEGRADED"):
314 error_description
= ""
316 if status_dict
.get("machines"):
317 for machine_id
in status_dict
.get("machines"):
318 machine
= status_dict
.get("machines").get(machine_id
)
319 # check machine agent-status
320 if machine
.get("agent-status"):
321 s
= machine
.get("agent-status").get("status")
324 error_description
+= (
325 "machine {} agent-status={} ; ".format(
329 # check machine instance status
330 if machine
.get("instance-status"):
331 s
= machine
.get("instance-status").get("status")
334 error_description
+= (
335 "machine {} instance-status={} ; ".format(
340 if status_dict
.get("applications"):
341 for app_id
in status_dict
.get("applications"):
342 app
= status_dict
.get("applications").get(app_id
)
343 # check application status
344 if app
.get("status"):
345 s
= app
.get("status").get("status")
348 error_description
+= (
349 "application {} status={} ; ".format(app_id
, s
)
352 if error_description
:
353 db_dict
["errorDescription"] = error_description
354 if current_ns_status
== "READY" and is_degraded
:
355 db_dict
["nsState"] = "DEGRADED"
356 if current_ns_status
== "DEGRADED" and not is_degraded
:
357 db_dict
["nsState"] = "READY"
360 self
.update_db_2("nsrs", nsr_id
, db_dict
)
362 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
364 except Exception as e
:
365 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
367 async def _on_update_k8s_db(
368 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
371 Updating vca status in NSR record
372 :param cluster_uuid: UUID of a k8s cluster
373 :param kdu_instance: The unique name of the KDU instance
374 :param filter: To get nsr_id
375 :cluster_type: The cluster type (juju, k8s)
379 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
380 # .format(cluster_uuid, kdu_instance, filter))
382 nsr_id
= filter.get("_id")
384 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
385 cluster_uuid
=cluster_uuid
,
386 kdu_instance
=kdu_instance
,
388 complete_status
=True,
394 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
397 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
401 self
.update_db_2("nsrs", nsr_id
, db_dict
)
402 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
404 except Exception as e
:
405 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
@staticmethod
def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
    """
    Render a cloud-init text as a Jinja2 template using the instantiation parameters.
    :param cloud_init_text: cloud-init content, optionally containing Jinja2 variables
    :param additional_params: dictionary with the values for the template variables. Can be None
    :param vnfd_id: vnfd id, only used to compose error messages
    :param vdu_id: vdu id, only used to compose error messages
    :return: the rendered text. LcmException is raised if a variable is not provided or the
        template cannot be parsed
    """
    try:
        env = Environment(
            # a variable missing at additional_params must be an error, not an empty string
            undefined=StrictUndefined,
            autoescape=select_autoescape(default_for_string=True, default=True),
        )
        template = env.from_string(cloud_init_text)
        return template.render(additional_params or {})
    except UndefinedError as e:
        raise LcmException(
            "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
            "file, must be provided in the instantiation parameters inside the "
            "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
        ) from e
    except (TemplateError, TemplateNotFound) as e:
        raise LcmException(
            "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
                vnfd_id, vdu_id, e
            )
        ) from e
def _get_vdu_cloud_init_content(self, vdu, vnfd):
    """
    Obtain the cloud-init text of a vdu, either read from the file named at 'cloud-init-file'
    inside the stored vnf package, or taken inline from 'cloud-init'.
    :param vdu: vdu descriptor
    :param vnfd: vnfd descriptor that contains the vdu. '_admin.storage' locates the package
    :return: the cloud-init text, or None if the vdu defines none. LcmException is raised if the
        file cannot be read
    """
    cloud_init_content = cloud_init_file = None
    try:
        if vdu.get("cloud-init-file"):
            base_folder = vnfd["_admin"]["storage"]
            if base_folder["pkg-dir"]:
                cloud_init_file = "{}/{}/cloud_init/{}".format(
                    base_folder["folder"],
                    base_folder["pkg-dir"],
                    vdu["cloud-init-file"],
                )
            else:
                cloud_init_file = "{}/Scripts/cloud_init/{}".format(
                    base_folder["folder"],
                    vdu["cloud-init-file"],
                )
            with self.fs.file_open(cloud_init_file, "r") as ci_file:
                cloud_init_content = ci_file.read()
        elif vdu.get("cloud-init"):
            cloud_init_content = vdu["cloud-init"]

        return cloud_init_content
    except FsException as e:
        raise LcmException(
            "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
                vnfd["id"], vdu["id"], cloud_init_file, e
            )
        ) from e
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
    """
    Get the 'additionalParams' of the first vdur of a vnfr that references the given vdu.
    :param db_vnfr: vnfr database content
    :param vdu_id: vdu id to look for at vdur 'vdu-id-ref'
    :return: result of parse_yaml_strings over the 'additionalParams' of the vdur found (None is
        passed to it when no vdur matches or it has no additional params)
    """
    vdur = next(
        (
            vdur
            # 'vdur' can be missing or None at the vnfr; treat it as empty
            for vdur in db_vnfr.get("vdur") or ()
            if vdu_id == vdur["vdu-id-ref"]
        ),
        {},
    )
    additional_params = vdur.get("additionalParams")
    return parse_yaml_strings(additional_params)
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
    :param vnfd: input vnfd. It is not modified
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided (not used by this method)
    :param nsrId: Id of the NSR (not used by this method)
    :return: copy of vnfd
    """
    vnfd_RO = deepcopy(vnfd)
    # remove unused by RO configuration, monitoring, scaling and internal keys
    for unused_key in (
        "_id",
        "_admin",
        "monitoring-param",
        "scaling-group-descriptor",
        "kdu",
        "k8s-cluster",
    ):
        vnfd_RO.pop(unused_key, None)
    if new_id:
        vnfd_RO["id"] = new_id

    # cloud-init and cloud-init-file are removed from every vdu of the copy
    for vdu in get_iterable(vnfd_RO, "vdu"):
        vdu.pop("cloud-init-file", None)
        vdu.pop("cloud-init", None)
    return vnfd_RO
492 def ip_profile_2_RO(ip_profile
):
493 RO_ip_profile
= deepcopy(ip_profile
)
494 if "dns-server" in RO_ip_profile
:
495 if isinstance(RO_ip_profile
["dns-server"], list):
496 RO_ip_profile
["dns-address"] = []
497 for ds
in RO_ip_profile
.pop("dns-server"):
498 RO_ip_profile
["dns-address"].append(ds
["address"])
500 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
501 if RO_ip_profile
.get("ip-version") == "ipv4":
502 RO_ip_profile
["ip-version"] = "IPv4"
503 if RO_ip_profile
.get("ip-version") == "ipv6":
504 RO_ip_profile
["ip-version"] = "IPv6"
505 if "dhcp-params" in RO_ip_profile
:
506 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
def _get_ro_vim_id_for_vim_account(self, vim_account):
    """
    Get the RO identifier of a VIM account.
    :param vim_account: _id of the record at 'vim_accounts' collection
    :return: content of '_admin.deployed.RO' of the vim account. LcmException is raised if the
        vim account operationalState is not ENABLED
    """
    db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
    if db_vim["_admin"]["operationalState"] != "ENABLED":
        raise LcmException(
            "VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]
            )
        )
    RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
    return RO_vim_id
def get_ro_wim_id_for_wim_account(self, wim_account):
    """
    Get the RO identifier of a WIM account.
    :param wim_account: _id of the record at 'wim_accounts' collection. Any value that is not a
        string is returned as it is, without looking at the database
    :return: content of '_admin.deployed.RO-account' of the wim account. LcmException is raised
        if the wim account operationalState is not ENABLED
    """
    if not isinstance(wim_account, str):
        return wim_account
    db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
    if db_wim["_admin"]["operationalState"] != "ENABLED":
        raise LcmException(
            "WIM={} is not available. operationalState={}".format(
                wim_account, db_wim["_admin"]["operationalState"]
            )
        )
    RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
    return RO_wim_id
534 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
536 db_vdu_push_list
= []
538 db_update
= {"_admin.modified": time()}
540 for vdu_id
, vdu_count
in vdu_create
.items():
544 for vdur
in reversed(db_vnfr
["vdur"])
545 if vdur
["vdu-id-ref"] == vdu_id
550 # Read the template saved in the db:
552 "No vdur in the database. Using the vdur-template to scale"
554 vdur_template
= db_vnfr
.get("vdur-template")
555 if not vdur_template
:
557 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
561 vdur
= vdur_template
[0]
562 # Delete a template from the database after using it
565 {"_id": db_vnfr
["_id"]},
567 pull
={"vdur-template": {"_id": vdur
["_id"]}},
569 for count
in range(vdu_count
):
570 vdur_copy
= deepcopy(vdur
)
571 vdur_copy
["status"] = "BUILD"
572 vdur_copy
["status-detailed"] = None
573 vdur_copy
["ip-address"] = None
574 vdur_copy
["_id"] = str(uuid4())
575 vdur_copy
["count-index"] += count
+ 1
576 vdur_copy
["id"] = "{}-{}".format(
577 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
579 vdur_copy
.pop("vim_info", None)
580 for iface
in vdur_copy
["interfaces"]:
581 if iface
.get("fixed-ip"):
582 iface
["ip-address"] = self
.increment_ip_mac(
583 iface
["ip-address"], count
+ 1
586 iface
.pop("ip-address", None)
587 if iface
.get("fixed-mac"):
588 iface
["mac-address"] = self
.increment_ip_mac(
589 iface
["mac-address"], count
+ 1
592 iface
.pop("mac-address", None)
596 ) # only first vdu can be managment of vnf
597 db_vdu_push_list
.append(vdur_copy
)
598 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
600 if len(db_vnfr
["vdur"]) == 1:
601 # The scale will move to 0 instances
603 "Scaling to 0 !, creating the template with the last vdur"
605 template_vdur
= [db_vnfr
["vdur"][0]]
606 for vdu_id
, vdu_count
in vdu_delete
.items():
608 indexes_to_delete
= [
610 for iv
in enumerate(db_vnfr
["vdur"])
611 if iv
[1]["vdu-id-ref"] == vdu_id
615 "vdur.{}.status".format(i
): "DELETING"
616 for i
in indexes_to_delete
[-vdu_count
:]
620 # it must be deleted one by one because common.db does not allow otherwise
623 for v
in reversed(db_vnfr
["vdur"])
624 if v
["vdu-id-ref"] == vdu_id
626 for vdu
in vdus_to_delete
[:vdu_count
]:
629 {"_id": db_vnfr
["_id"]},
631 pull
={"vdur": {"_id": vdu
["_id"]}},
635 db_push
["vdur"] = db_vdu_push_list
637 db_push
["vdur-template"] = template_vdur
640 db_vnfr
["vdur-template"] = template_vdur
641 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
642 # modify passed dictionary db_vnfr
643 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
644 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Updates database nsr with the RO info for the created vld
    :param ns_update_nsr: dictionary to be filled with the updated info
    :param db_nsr: content of db_nsr. This is also modified
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised on errors
    """

    for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
        for net_RO in get_iterable(nsr_desc_RO, "nets"):
            if vld["id"] != net_RO.get("ns_net_osm_id"):
                continue
            vld["vim-id"] = net_RO.get("vim_net_id")
            vld["name"] = net_RO.get("vim_name")
            vld["status"] = net_RO.get("status")
            vld["status-detailed"] = net_RO.get("error_msg")
            ns_update_nsr["vld.{}".format(vld_index)] = vld
            break
        else:
            # the loop ended without finding a RO net for this vld
            raise LcmException(
                "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
            )
def set_vnfr_at_error(self, db_vnfrs, error_text):
    """
    Set every vnfr at status ERROR, both at database and at the provided content. Every vdur
    without a 'status' is also set at ERROR. Database errors are logged, not raised.
    :param db_vnfrs: dictionary whose values are the vnfr database contents. They are modified
    :param error_text: if provided, it is stored as 'status-detailed' of the vdur set at ERROR
    :return: None
    """
    try:
        for db_vnfr in db_vnfrs.values():
            vnfr_update = {"status": "ERROR"}
            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                if "status" not in vdur:
                    vdur["status"] = "ERROR"
                    vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
                    if error_text:
                        # keep database and in-memory record with the same detail
                        vdur["status-detailed"] = str(error_text)
                        vnfr_update[
                            "vdur.{}.status-detailed".format(vdu_index)
                        ] = str(error_text)
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
    except DbException as e:
        self.logger.error("Cannot update vnf. {}".format(e))
687 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
689 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
690 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
691 :param nsr_desc_RO: nsr descriptor from RO
692 :return: Nothing, LcmException is raised on errors
694 for vnf_index
, db_vnfr
in db_vnfrs
.items():
695 for vnf_RO
in nsr_desc_RO
["vnfs"]:
696 if vnf_RO
["member_vnf_index"] != vnf_index
:
699 if vnf_RO
.get("ip_address"):
700 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
703 elif not db_vnfr
.get("ip-address"):
704 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
705 raise LcmExceptionNoMgmtIP(
706 "ns member_vnf_index '{}' has no IP address".format(
711 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
712 vdur_RO_count_index
= 0
713 if vdur
.get("pdu-type"):
715 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
716 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
718 if vdur
["count-index"] != vdur_RO_count_index
:
719 vdur_RO_count_index
+= 1
721 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
722 if vdur_RO
.get("ip_address"):
723 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
725 vdur
["ip-address"] = None
726 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
727 vdur
["name"] = vdur_RO
.get("vim_name")
728 vdur
["status"] = vdur_RO
.get("status")
729 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
730 for ifacer
in get_iterable(vdur
, "interfaces"):
731 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
732 if ifacer
["name"] == interface_RO
.get("internal_name"):
733 ifacer
["ip-address"] = interface_RO
.get(
736 ifacer
["mac-address"] = interface_RO
.get(
742 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
743 "from VIM info".format(
744 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
747 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
751 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
753 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
757 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
758 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
759 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
761 vld
["vim-id"] = net_RO
.get("vim_net_id")
762 vld
["name"] = net_RO
.get("vim_name")
763 vld
["status"] = net_RO
.get("status")
764 vld
["status-detailed"] = net_RO
.get("error_msg")
765 vnfr_update
["vld.{}".format(vld_index
)] = vld
769 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
774 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
779 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
def _get_ns_config_info(self, nsr_id):
    """
    Generates a mapping between vnf,vdu elements and the N2VC id
    :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
    :return: a dictionary with {osm-config-mapping: {}} where its element contains:
        "<member-vnf-index>": <N2VC-id>  for a vnf configuration, or
        "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id>  for a vdu configuration
    """
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    mapping = {}
    ns_config_info = {"osm-config-mapping": mapping}
    for vca in vca_deployed_list:
        if not vca["member-vnf-index"]:
            # entries without member-vnf-index are not part of the mapping
            continue
        if not vca["vdu_id"]:
            mapping[vca["member-vnf-index"]] = vca["application"]
        else:
            mapping[
                "{}.{}.{}".format(
                    vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
                )
            ] = vca["application"]
    return ns_config_info
809 async def _instantiate_ng_ro(
826 def get_vim_account(vim_account_id
):
828 if vim_account_id
in db_vims
:
829 return db_vims
[vim_account_id
]
830 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
831 db_vims
[vim_account_id
] = db_vim
834 # modify target_vld info with instantiation parameters
835 def parse_vld_instantiation_params(
836 target_vim
, target_vld
, vld_params
, target_sdn
838 if vld_params
.get("ip-profile"):
839 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
842 if vld_params
.get("provider-network"):
843 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
846 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
847 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
851 # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
852 # if wim_account_id is specified in vld_params, validate if it is feasible.
853 wim_account_id
, db_wim
= select_feasible_wim_account(
854 db_nsr
, db_vnfrs
, target_vld
, vld_params
, self
.logger
858 # WIM is needed and a feasible one was found, populate WIM target and SDN ports
859 self
.logger
.info("WIM selected: {:s}".format(str(wim_account_id
)))
860 # update vld_params with correct WIM account Id
861 vld_params
["wimAccountId"] = wim_account_id
863 target_wim
= "wim:{}".format(wim_account_id
)
864 target_wim_attrs
= get_target_wim_attrs(nsr_id
, target_vld
, vld_params
)
865 sdn_ports
= get_sdn_ports(vld_params
, db_wim
)
866 if len(sdn_ports
) > 0:
867 target_vld
["vim_info"][target_wim
] = target_wim_attrs
868 target_vld
["vim_info"][target_wim
]["sdn-ports"] = sdn_ports
871 "Target VLD with WIM data: {:s}".format(str(target_vld
))
874 for param
in ("vim-network-name", "vim-network-id"):
875 if vld_params
.get(param
):
876 if isinstance(vld_params
[param
], dict):
877 for vim
, vim_net
in vld_params
[param
].items():
878 other_target_vim
= "vim:" + vim
880 target_vld
["vim_info"],
881 (other_target_vim
, param
.replace("-", "_")),
884 else: # isinstance str
885 target_vld
["vim_info"][target_vim
][
886 param
.replace("-", "_")
887 ] = vld_params
[param
]
888 if vld_params
.get("common_id"):
889 target_vld
["common_id"] = vld_params
.get("common_id")
891 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
892 def update_ns_vld_target(target
, ns_params
):
893 for vnf_params
in ns_params
.get("vnf", ()):
894 if vnf_params
.get("vimAccountId"):
898 for vnfr
in db_vnfrs
.values()
899 if vnf_params
["member-vnf-index"]
900 == vnfr
["member-vnf-index-ref"]
904 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
907 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
908 target_vld
= find_in_list(
909 get_iterable(vdur
, "interfaces"),
910 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
913 vld_params
= find_in_list(
914 get_iterable(ns_params
, "vld"),
915 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
919 if vnf_params
.get("vimAccountId") not in a_vld
.get(
922 target_vim_network_list
= [
923 v
for _
, v
in a_vld
.get("vim_info").items()
925 target_vim_network_name
= next(
927 item
.get("vim_network_name", "")
928 for item
in target_vim_network_list
933 target
["ns"]["vld"][a_index
].get("vim_info").update(
935 "vim:{}".format(vnf_params
["vimAccountId"]): {
936 "vim_network_name": target_vim_network_name
,
942 for param
in ("vim-network-name", "vim-network-id"):
943 if vld_params
.get(param
) and isinstance(
944 vld_params
[param
], dict
946 for vim
, vim_net
in vld_params
[
949 other_target_vim
= "vim:" + vim
951 target
["ns"]["vld"][a_index
].get(
956 param
.replace("-", "_"),
961 nslcmop_id
= db_nslcmop
["_id"]
963 "name": db_nsr
["name"],
966 "image": deepcopy(db_nsr
["image"]),
967 "flavor": deepcopy(db_nsr
["flavor"]),
968 "action_id": nslcmop_id
,
969 "cloud_init_content": {},
971 for image
in target
["image"]:
972 image
["vim_info"] = {}
973 for flavor
in target
["flavor"]:
974 flavor
["vim_info"] = {}
975 if db_nsr
.get("affinity-or-anti-affinity-group"):
976 target
["affinity-or-anti-affinity-group"] = deepcopy(
977 db_nsr
["affinity-or-anti-affinity-group"]
979 for affinity_or_anti_affinity_group
in target
[
980 "affinity-or-anti-affinity-group"
982 affinity_or_anti_affinity_group
["vim_info"] = {}
984 if db_nslcmop
.get("lcmOperationType") != "instantiate":
985 # get parameters of instantiation:
986 db_nslcmop_instantiate
= self
.db
.get_list(
989 "nsInstanceId": db_nslcmop
["nsInstanceId"],
990 "lcmOperationType": "instantiate",
993 ns_params
= db_nslcmop_instantiate
.get("operationParams")
995 ns_params
= db_nslcmop
.get("operationParams")
996 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
997 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
1000 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
1001 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
1004 "name": vld
["name"],
1005 "mgmt-network": vld
.get("mgmt-network", False),
1006 "type": vld
.get("type"),
1009 "vim_network_name": vld
.get("vim-network-name"),
1010 "vim_account_id": ns_params
["vimAccountId"],
1014 # check if this network needs SDN assist
1015 if vld
.get("pci-interfaces"):
1016 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1017 if vim_config
:= db_vim
.get("config"):
1018 if sdnc_id
:= vim_config
.get("sdn-controller"):
1019 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1020 target_sdn
= "sdn:{}".format(sdnc_id
)
1021 target_vld
["vim_info"][target_sdn
] = {
1023 "target_vim": target_vim
,
1025 "type": vld
.get("type"),
1028 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1029 for nsd_vnf_profile
in nsd_vnf_profiles
:
1030 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1031 if cp
["virtual-link-profile-id"] == vld
["id"]:
1033 "member_vnf:{}.{}".format(
1034 cp
["constituent-cpd-id"][0][
1035 "constituent-base-element-id"
1037 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1039 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1041 # check at nsd descriptor, if there is an ip-profile
1043 nsd_vlp
= find_in_list(
1044 get_virtual_link_profiles(nsd
),
1045 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1050 and nsd_vlp
.get("virtual-link-protocol-data")
1051 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1053 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1056 ip_profile_dest_data
= {}
1057 if "ip-version" in ip_profile_source_data
:
1058 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1061 if "cidr" in ip_profile_source_data
:
1062 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1065 if "gateway-ip" in ip_profile_source_data
:
1066 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1069 if "dhcp-enabled" in ip_profile_source_data
:
1070 ip_profile_dest_data
["dhcp-params"] = {
1071 "enabled": ip_profile_source_data
["dhcp-enabled"]
1073 vld_params
["ip-profile"] = ip_profile_dest_data
1075 # update vld_params with instantiation params
1076 vld_instantiation_params
= find_in_list(
1077 get_iterable(ns_params
, "vld"),
1078 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1080 if vld_instantiation_params
:
1081 vld_params
.update(vld_instantiation_params
)
1082 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1083 target
["ns"]["vld"].append(target_vld
)
1084 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1085 update_ns_vld_target(target
, ns_params
)
1087 for vnfr
in db_vnfrs
.values():
1088 vnfd
= find_in_list(
1089 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1091 vnf_params
= find_in_list(
1092 get_iterable(ns_params
, "vnf"),
1093 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1095 target_vnf
= deepcopy(vnfr
)
1096 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1097 for vld
in target_vnf
.get("vld", ()):
1098 # check if connected to a ns.vld, to fill target'
1099 vnf_cp
= find_in_list(
1100 vnfd
.get("int-virtual-link-desc", ()),
1101 lambda cpd
: cpd
.get("id") == vld
["id"],
1104 ns_cp
= "member_vnf:{}.{}".format(
1105 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1107 if cp2target
.get(ns_cp
):
1108 vld
["target"] = cp2target
[ns_cp
]
1111 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1113 # check if this network needs SDN assist
1115 if vld
.get("pci-interfaces"):
1116 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1117 sdnc_id
= db_vim
["config"].get("sdn-controller")
1119 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1120 target_sdn
= "sdn:{}".format(sdnc_id
)
1121 vld
["vim_info"][target_sdn
] = {
1123 "target_vim": target_vim
,
1125 "type": vld
.get("type"),
1128 # check at vnfd descriptor, if there is an ip-profile
1130 vnfd_vlp
= find_in_list(
1131 get_virtual_link_profiles(vnfd
),
1132 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1136 and vnfd_vlp
.get("virtual-link-protocol-data")
1137 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1139 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1142 ip_profile_dest_data
= {}
1143 if "ip-version" in ip_profile_source_data
:
1144 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1147 if "cidr" in ip_profile_source_data
:
1148 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1151 if "gateway-ip" in ip_profile_source_data
:
1152 ip_profile_dest_data
[
1154 ] = ip_profile_source_data
["gateway-ip"]
1155 if "dhcp-enabled" in ip_profile_source_data
:
1156 ip_profile_dest_data
["dhcp-params"] = {
1157 "enabled": ip_profile_source_data
["dhcp-enabled"]
1160 vld_params
["ip-profile"] = ip_profile_dest_data
1161 # update vld_params with instantiation params
1163 vld_instantiation_params
= find_in_list(
1164 get_iterable(vnf_params
, "internal-vld"),
1165 lambda i_vld
: i_vld
["name"] == vld
["id"],
1167 if vld_instantiation_params
:
1168 vld_params
.update(vld_instantiation_params
)
1169 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1172 for vdur
in target_vnf
.get("vdur", ()):
1173 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1174 continue # This vdu must not be created
1175 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1177 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1180 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1181 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1184 and vdu_configuration
.get("config-access")
1185 and vdu_configuration
.get("config-access").get("ssh-access")
1187 vdur
["ssh-keys"] = ssh_keys_all
1188 vdur
["ssh-access-required"] = vdu_configuration
[
1190 ]["ssh-access"]["required"]
1193 and vnf_configuration
.get("config-access")
1194 and vnf_configuration
.get("config-access").get("ssh-access")
1195 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1197 vdur
["ssh-keys"] = ssh_keys_all
1198 vdur
["ssh-access-required"] = vnf_configuration
[
1200 ]["ssh-access"]["required"]
1201 elif ssh_keys_instantiation
and find_in_list(
1202 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1204 vdur
["ssh-keys"] = ssh_keys_instantiation
1206 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1208 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1210 if vdud
.get("cloud-init-file"):
1211 vdur
["cloud-init"] = "{}:file:{}".format(
1212 vnfd
["_id"], vdud
.get("cloud-init-file")
1214 # read file and put content at target.cloud_init_content. This avoids ng_ro having to use the shared package system
1215 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1216 base_folder
= vnfd
["_admin"]["storage"]
1217 if base_folder
["pkg-dir"]:
1218 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1219 base_folder
["folder"],
1220 base_folder
["pkg-dir"],
1221 vdud
.get("cloud-init-file"),
1224 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1225 base_folder
["folder"],
1226 vdud
.get("cloud-init-file"),
1228 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1229 target
["cloud_init_content"][
1232 elif vdud
.get("cloud-init"):
1233 vdur
["cloud-init"] = "{}:vdu:{}".format(
1234 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1236 # put content at target.cloud_init_content. This avoids ng_ro having to read the vnfd descriptor
1237 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1240 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1241 deploy_params_vdu
= self
._format
_additional
_params
(
1242 vdur
.get("additionalParams") or {}
1244 deploy_params_vdu
["OSM"] = get_osm_params(
1245 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1247 vdur
["additionalParams"] = deploy_params_vdu
1250 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1251 if target_vim
not in ns_flavor
["vim_info"]:
1252 ns_flavor
["vim_info"][target_vim
] = {}
1255 # in case alternative images are provided we must check if they should be applied
1256 # for the vim_type, modify the vim_type taking into account
1257 ns_image_id
= int(vdur
["ns-image-id"])
1258 if vdur
.get("alt-image-ids"):
1259 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1260 vim_type
= db_vim
["vim_type"]
1261 for alt_image_id
in vdur
.get("alt-image-ids"):
1262 ns_alt_image
= target
["image"][int(alt_image_id
)]
1263 if vim_type
== ns_alt_image
.get("vim-type"):
1264 # must use alternative image
1266 "use alternative image id: {}".format(alt_image_id
)
1268 ns_image_id
= alt_image_id
1269 vdur
["ns-image-id"] = ns_image_id
1271 ns_image
= target
["image"][int(ns_image_id
)]
1272 if target_vim
not in ns_image
["vim_info"]:
1273 ns_image
["vim_info"][target_vim
] = {}
1276 if vdur
.get("affinity-or-anti-affinity-group-id"):
1277 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1278 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1279 if target_vim
not in ns_ags
["vim_info"]:
1280 ns_ags
["vim_info"][target_vim
] = {}
1282 vdur
["vim_info"] = {target_vim
: {}}
1283 # instantiation parameters
1285 vdu_instantiation_params
= find_in_list(
1286 get_iterable(vnf_params
, "vdu"),
1287 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1289 if vdu_instantiation_params
:
1290 # Parse the vdu_volumes from the instantiation params
1291 vdu_volumes
= get_volumes_from_instantiation_params(
1292 vdu_instantiation_params
, vdud
1294 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1295 vdur_list
.append(vdur
)
1296 target_vnf
["vdur"] = vdur_list
1297 target
["vnf"].append(target_vnf
)
1299 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1300 desc
= await self
.RO
.deploy(nsr_id
, target
)
1301 self
.logger
.debug("RO return > {}".format(desc
))
1302 action_id
= desc
["action_id"]
1303 await self
._wait
_ng
_ro
(
1310 operation
="instantiation",
1315 "_admin.deployed.RO.operational-status": "running",
1316 "detailed-status": " ".join(stage
),
1318 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1319 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1320 self
._write
_op
_status
(nslcmop_id
, stage
)
1322 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1326 async def _wait_ng_ro(
1336 detailed_status_old
= None
1338 start_time
= start_time
or time()
1339 while time() <= start_time
+ timeout
:
1340 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1341 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1342 if desc_status
["status"] == "FAILED":
1343 raise NgRoException(desc_status
["details"])
1344 elif desc_status
["status"] == "BUILD":
1346 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1347 elif desc_status
["status"] == "DONE":
1349 stage
[2] = "Deployed at VIM"
1352 assert False, "ROclient.check_ns_status returns unknown {}".format(
1353 desc_status
["status"]
1355 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1356 detailed_status_old
= stage
[2]
1357 db_nsr_update
["detailed-status"] = " ".join(stage
)
1358 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1359 self
._write
_op
_status
(nslcmop_id
, stage
)
1360 await asyncio
.sleep(15, loop
=self
.loop
)
1361 else: # timeout_ns_deploy
1362 raise NgRoException("Timeout waiting ns to deploy")
1364 async def _terminate_ng_ro(
1365 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1370 start_deploy
= time()
1377 "action_id": nslcmop_id
,
1379 desc
= await self
.RO
.deploy(nsr_id
, target
)
1380 action_id
= desc
["action_id"]
1381 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1384 + "ns terminate action at RO. action_id={}".format(action_id
)
1388 delete_timeout
= 20 * 60 # 20 minutes
1389 await self
._wait
_ng
_ro
(
1396 operation
="termination",
1398 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1400 await self
.RO
.delete(nsr_id
)
1401 except NgRoException
as e
:
1402 if e
.http_code
== 404: # not found
1403 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1404 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1406 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1408 elif e
.http_code
== 409: # conflict
1409 failed_detail
.append("delete conflict: {}".format(e
))
1412 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1415 failed_detail
.append("delete error: {}".format(e
))
1418 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1420 except Exception as e
:
1421 failed_detail
.append("delete error: {}".format(e
))
1423 logging_text
+ "RO_action_id={} delete error: {}".format(action_id
, e
)
1427 stage
[2] = "Error deleting from VIM"
1429 stage
[2] = "Deleted from VIM"
1430 db_nsr_update
["detailed-status"] = " ".join(stage
)
1431 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1432 self
._write
_op
_status
(nslcmop_id
, stage
)
1435 raise LcmException("; ".join(failed_detail
))
async def instantiate_RO(
    self,
    logging_text,
    nsr_id,
    nsd,
    db_nsr,
    db_nslcmop,
    db_vnfrs,
    db_vnfds,
    n2vc_key_list,
    stage,
):
    """
    Instantiate at RO
    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: database content of ns descriptor
    :param db_nsr: database content of ns record
    :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
    :param db_vnfrs: database content of vnfrs; its values are checked against the ns vim account
    :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
    :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: None or exception
    """
    try:
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.ns_deploy

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
            # in case of placement, change ns_params["vimAccountId"] if it is not present at any vnfr
            vnfr = None
            for vnfr in db_vnfrs.values():
                if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
                    break
            else:
                # This was a no-op '==' comparison; an assignment is what the comment
                # above describes. Guarded because db_vnfrs may be empty.
                if vnfr is not None:
                    ns_params["vimAccountId"] = vnfr["vim-account-id"]

        return await self._instantiate_ng_ro(
            logging_text,
            nsr_id,
            nsd,
            db_nsr,
            db_nslcmop,
            db_vnfrs,
            db_vnfds,
            n2vc_key_list,
            stage,
            start_deploy,
            timeout_ns_deploy,
        )
    except Exception as e:
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        # the traceback is only logged for exception types that are not expected here
        self.logger.error(
            "Error deploying at VIM {}".format(e),
            exc_info=not isinstance(
                e,
                (
                    ROclient.ROClientException,
                    LcmException,
                    DbException,
                    NgRoException,
                ),
            ),
        )
        raise
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address
    :param logging_text: prefix use for logging
    :param nsr_id: not used by this method
    :param vnfr_id: _id of the vnfr that is read from database at every try
    :param kdu_name: value of "kdu-name" of the wanted entry at vnfr "kdur"
    :return: IP address, K8s services
    :raises LcmException: if the kdur is not found, it has a status other than
        "READY"/"ENABLED", or no status appears after 360 tries
    """

    # self.logger.debug(logging_text + "Starting wait_kdu_up")
    nb_tries = 0

    while nb_tries < 360:
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next(
            (
                x
                for x in get_iterable(db_vnfr, "kdur")
                if x.get("kdu-name") == kdu_name
            ),
            None,
        )
        if not kdur:
            raise LcmException(
                "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
            )
        if kdur.get("status"):
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address"), kdur.get("services")
            else:
                raise LcmException(
                    "target KDU={} is in error state".format(kdu_name)
                )

        # no 'loop' argument: it was removed from asyncio.sleep in Python 3.10
        await asyncio.sleep(10)
        nb_tries += 1
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
async def wait_vm_up_insert_key_ro(
    self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
):
    """
    Wait for ip address at RO, and optionally, insert public key in virtual machine
    :param logging_text: prefix use for logging
    :param nsr_id: nsr identity, used for the RO key injection action
    :param vnfr_id: _id of the vnfr that is read from database
    :param vdu_id: "vdu-id-ref" of the target vdur. If empty, the vdur holding the vnfr ip-address is used
    :param vdu_index: "count-index" of the target vdur
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    :raises LcmException: on timeout, target in error state, vdur not found or key injection failure
    """

    self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ip_address = None
    target_vdu_id = None
    ro_retries = 0

    while True:

        ro_retries += 1
        if ro_retries >= 360:  # 1 hour
            raise LcmException(
                "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
            )

        # no 'loop' argument: it was removed from asyncio.sleep in Python 3.10
        await asyncio.sleep(10)

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                if db_vnfr.get("status") == "ERROR":
                    raise LcmException(
                        "Cannot inject ssh-key because target VNF is in error state"
                    )
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next(
                    (
                        x
                        for x in get_iterable(db_vnfr, "vdur")
                        if x.get("ip-address") == ip_address
                    ),
                    None,
                )
            else:  # VDU case
                vdur = next(
                    (
                        x
                        for x in get_iterable(db_vnfr, "vdur")
                        if x.get("vdu-id-ref") == vdu_id
                        and x.get("count-index") == vdu_index
                    ),
                    None,
                )

            if (
                not vdur and len(db_vnfr.get("vdur", ())) == 1
            ):  # If only one, this should be the target vdu
                vdur = db_vnfr["vdur"][0]
            if not vdur:
                raise LcmException(
                    "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                        vnfr_id, vdu_id, vdu_index
                    )
                )
            # New generation RO stores information at "vim_info"
            ng_ro_status = None
            target_vim = None
            if vdur.get("vim_info"):
                target_vim = next(
                    t for t in vdur["vim_info"]
                )  # there should be only one key
                ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
            if (
                vdur.get("pdu-type")
                or vdur.get("status") == "ACTIVE"
                or ng_ro_status == "ACTIVE"
            ):
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
                raise LcmException(
                    "Cannot inject ssh-key because target VM is in error state"
                )

        if not target_vdu_id:
            continue

        # inject public key into machine
        if pub_key and user:
            self.logger.debug(logging_text + "Inserting RO key")
            self.logger.debug("SSH > PubKey > {}".format(pub_key))
            if vdur.get("pdu-type"):
                self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                return ip_address
            try:
                target = {
                    "action": {
                        "action": "inject_ssh_key",
                        "key": pub_key,
                        "user": user,
                    },
                    "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
                }
                desc = await self.RO.deploy(nsr_id, target)
                action_id = desc["action_id"]
                await self._wait_ng_ro(
                    nsr_id, action_id, timeout=600, operation="instantiation"
                )
                break
            except NgRoException as e:
                # chained so that the RO error is kept as the explicit cause
                raise LcmException(
                    "Reaching max tries injecting key. Error: {}".format(e)
                ) from e
        else:
            break

    return ip_address
1677 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1679 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1681 my_vca
= vca_deployed_list
[vca_index
]
1682 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1683 # vdu or kdu: no dependencies
1687 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1688 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1689 configuration_status_list
= db_nsr
["configurationStatus"]
1690 for index
, vca_deployed
in enumerate(configuration_status_list
):
1691 if index
== vca_index
:
1694 if not my_vca
.get("member-vnf-index") or (
1695 vca_deployed
.get("member-vnf-index")
1696 == my_vca
.get("member-vnf-index")
1698 internal_status
= configuration_status_list
[index
].get("status")
1699 if internal_status
== "READY":
1701 elif internal_status
== "BROKEN":
1703 "Configuration aborted because dependent charm/s has failed"
1708 # no dependencies, return
1710 await asyncio
.sleep(10)
1713 raise LcmException("Configuration aborted because dependent charm/s timeout")
def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
    """
    Obtain the VCA identity to use.
    :param db_vnfr: vnf record; when not empty its "vca-id" is returned
    :param db_nsr: ns record, only looked at when db_vnfr is empty. The "vca" of the
        vim account at instantiate_params.vimAccountId is returned
    :return: the vca id, or None if both records are empty
    """
    if db_vnfr:
        return deep_get(db_vnfr, ("vca-id",))
    if db_nsr:
        vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
        vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
        return vim_account.get("vca")
    return None
1724 async def instantiate_N2VC(
1742 ee_config_descriptor
,
# Purpose: deploy and configure the execution environment (VCA) of one element
# of the ns. The element is reported as "VNF", "VDU" or "KDU" depending on
# which of vnfr_id / vdu_id / kdu_name applies ("NS" is also checked below).
# Steps visible below: create or register the execution environment, install
# the configuration software, add the VCA relations, get and inject the ssh
# public key when required, run the initial config primitives and, for helm
# based environments, extract the prometheus scrape jobs.
# Progress is recorded with _write_configuration_status; on any exception the
# status is set to "BROKEN" and an LcmException is raised.
1744 nsr_id
= db_nsr
["_id"]
1745 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1746 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1747 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1748 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1750 "collection": "nsrs",
1751 "filter": {"_id": nsr_id
},
1752 "path": db_update_entry
,
1758 element_under_configuration
= nsr_id
1762 vnfr_id
= db_vnfr
["_id"]
1763 osm_config
["osm"]["vnf_id"] = vnfr_id
1765 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1767 if vca_type
== "native_charm":
1770 index_number
= vdu_index
or 0
1773 element_type
= "VNF"
1774 element_under_configuration
= vnfr_id
1775 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1777 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1778 element_type
= "VDU"
1779 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1780 osm_config
["osm"]["vdu_id"] = vdu_id
1782 namespace
+= ".{}".format(kdu_name
)
1783 element_type
= "KDU"
1784 element_under_configuration
= kdu_name
1785 osm_config
["osm"]["kdu_name"] = kdu_name
1788 if base_folder
["pkg-dir"]:
1789 artifact_path
= "{}/{}/{}/{}".format(
1790 base_folder
["folder"],
1791 base_folder
["pkg-dir"],
1794 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1799 artifact_path
= "{}/Scripts/{}/{}/".format(
1800 base_folder
["folder"],
1803 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1808 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1810 # get initial_config_primitive_list that applies to this element
1811 initial_config_primitive_list
= config_descriptor
.get(
1812 "initial-config-primitive"
1816 "Initial config primitive list > {}".format(
1817 initial_config_primitive_list
1821 # add config if not present for NS charm
1822 ee_descriptor_id
= ee_config_descriptor
.get("id")
1823 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1824 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1825 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1829 "Initial config primitive list #2 > {}".format(
1830 initial_config_primitive_list
1833 # n2vc_redesign STEP 3.1
1834 # find old ee_id if exists
1835 ee_id
= vca_deployed
.get("ee_id")
1837 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1838 # create or register execution environment in VCA
1839 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1841 self
._write
_configuration
_status
(
1843 vca_index
=vca_index
,
1845 element_under_configuration
=element_under_configuration
,
1846 element_type
=element_type
,
1849 step
= "create execution environment"
1850 self
.logger
.debug(logging_text
+ step
)
1854 if vca_type
== "k8s_proxy_charm":
1855 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1856 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1857 namespace
=namespace
,
1858 artifact_path
=artifact_path
,
1862 elif vca_type
== "helm" or vca_type
== "helm-v3":
1863 ee_id
, credentials
= await self
.vca_map
[
1865 ].create_execution_environment(
1866 namespace
=namespace
,
1870 artifact_path
=artifact_path
,
1871 chart_model
=vca_name
,
1875 ee_id
, credentials
= await self
.vca_map
[
1877 ].create_execution_environment(
1878 namespace
=namespace
,
1884 elif vca_type
== "native_charm":
1885 step
= "Waiting to VM being up and getting IP address"
1886 self
.logger
.debug(logging_text
+ step
)
1887 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1896 credentials
= {"hostname": rw_mgmt_ip
}
1898 username
= deep_get(
1899 config_descriptor
, ("config-access", "ssh-access", "default-user")
1901 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1902 # merged. Meanwhile let's get username from initial-config-primitive
1903 if not username
and initial_config_primitive_list
:
1904 for config_primitive
in initial_config_primitive_list
:
1905 for param
in config_primitive
.get("parameter", ()):
1906 if param
["name"] == "ssh-username":
1907 username
= param
["value"]
1911 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1912 "'config-access.ssh-access.default-user'"
1914 credentials
["username"] = username
1915 # n2vc_redesign STEP 3.2
1917 self
._write
_configuration
_status
(
1919 vca_index
=vca_index
,
1920 status
="REGISTERING",
1921 element_under_configuration
=element_under_configuration
,
1922 element_type
=element_type
,
1925 step
= "register execution environment {}".format(credentials
)
1926 self
.logger
.debug(logging_text
+ step
)
1927 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1928 credentials
=credentials
,
1929 namespace
=namespace
,
1934 # for compatibility with MON/POL modules, the need model and application name at database
1935 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1936 ee_id_parts
= ee_id
.split(".")
1937 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1938 if len(ee_id_parts
) >= 2:
1939 model_name
= ee_id_parts
[0]
1940 application_name
= ee_id_parts
[1]
1941 db_nsr_update
[db_update_entry
+ "model"] = model_name
1942 db_nsr_update
[db_update_entry
+ "application"] = application_name
1944 # n2vc_redesign STEP 3.3
1945 step
= "Install configuration Software"
1947 self
._write
_configuration
_status
(
1949 vca_index
=vca_index
,
1950 status
="INSTALLING SW",
1951 element_under_configuration
=element_under_configuration
,
1952 element_type
=element_type
,
1953 other_update
=db_nsr_update
,
1956 # TODO check if already done
1957 self
.logger
.debug(logging_text
+ step
)
1959 if vca_type
== "native_charm":
1960 config_primitive
= next(
1961 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1964 if config_primitive
:
1965 config
= self
._map
_primitive
_params
(
1966 config_primitive
, {}, deploy_params
1969 if vca_type
== "lxc_proxy_charm":
1970 if element_type
== "NS":
1971 num_units
= db_nsr
.get("config-units") or 1
1972 elif element_type
== "VNF":
1973 num_units
= db_vnfr
.get("config-units") or 1
1974 elif element_type
== "VDU":
# NOTE(review): in this branch num_units is only assigned when some vdur
# matches vdu_id; presumably a default is set before this chain - confirm.
1975 for v
in db_vnfr
["vdur"]:
1976 if vdu_id
== v
["vdu-id-ref"]:
1977 num_units
= v
.get("config-units") or 1
1979 if vca_type
!= "k8s_proxy_charm":
1980 await self
.vca_map
[vca_type
].install_configuration_sw(
1982 artifact_path
=artifact_path
,
1985 num_units
=num_units
,
1990 # write in db flag of configuration_sw already installed
1992 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1995 # add relations for this VCA (wait for other peers related with this VCA)
1996 is_relation_added
= await self
._add
_vca
_relations
(
1997 logging_text
=logging_text
,
2000 vca_index
=vca_index
,
2003 if not is_relation_added
:
2004 raise LcmException("Relations could not be added to VCA.")
2006 # if SSH access is required, then get execution environment SSH public
2007 # if native charm we have waited already to VM be UP
2008 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
2011 # self.logger.debug("get ssh key block")
2013 config_descriptor
, ("config-access", "ssh-access", "required")
2015 # self.logger.debug("ssh key needed")
2016 # Needed to inject a ssh key
2019 ("config-access", "ssh-access", "default-user"),
2021 step
= "Install configuration Software, getting public ssh key"
2022 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
2023 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
2026 step
= "Insert public key into VM user={} ssh_key={}".format(
2030 # self.logger.debug("no need to get ssh key")
2031 step
= "Waiting to VM being up and getting IP address"
2032 self
.logger
.debug(logging_text
+ step
)
2034 # default rw_mgmt_ip to None, avoiding the non definition of the variable
2037 # n2vc_redesign STEP 5.1
2038 # wait for RO (ip-address) Insert pub_key into VM
2041 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
2042 logging_text
, nsr_id
, vnfr_id
, kdu_name
2044 vnfd
= self
.db
.get_one(
2046 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2048 kdu
= get_kdu(vnfd
, kdu_name
)
2050 service
["name"] for service
in get_kdu_services(kdu
)
2052 exposed_services
= []
2053 for service
in services
:
2054 if any(s
in service
["name"] for s
in kdu_services
):
2055 exposed_services
.append(service
)
2056 await self
.vca_map
[vca_type
].exec_primitive(
2058 primitive_name
="config",
2060 "osm-config": json
.dumps(
2062 k8s
={"services": exposed_services
}
2069 # This verification is needed in order to avoid trying to add a public key
2070 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2071 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2072 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2074 elif db_vnfr
.get("vdur"):
2075 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2085 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2087 # store rw_mgmt_ip in deploy params for later replacement
2088 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2090 # n2vc_redesign STEP 6 Execute initial config primitive
2091 step
= "execute initial config primitive"
2093 # wait for dependent primitives execution (NS -> VNF -> VDU)
2094 if initial_config_primitive_list
:
2095 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2097 # stage, in function of element type: vdu, kdu, vnf or ns
2098 my_vca
= vca_deployed_list
[vca_index
]
2099 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2101 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2102 elif my_vca
.get("member-vnf-index"):
2104 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2107 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2109 self
._write
_configuration
_status
(
2110 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2113 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2115 check_if_terminated_needed
= True
2116 for initial_config_primitive
in initial_config_primitive_list
:
2117 # adding information on the vca_deployed if it is a NS execution environment
# NOTE(review): direct indexing raises KeyError if "member-vnf-index" is
# missing from vca_deployed; presumably it is always present - confirm.
2118 if not vca_deployed
["member-vnf-index"]:
2119 deploy_params
["ns_config_info"] = json
.dumps(
2120 self
._get
_ns
_config
_info
(nsr_id
)
2122 # TODO check if already done
2123 primitive_params_
= self
._map
_primitive
_params
(
2124 initial_config_primitive
, {}, deploy_params
2127 step
= "execute primitive '{}' params '{}'".format(
2128 initial_config_primitive
["name"], primitive_params_
2130 self
.logger
.debug(logging_text
+ step
)
2131 await self
.vca_map
[vca_type
].exec_primitive(
2133 primitive_name
=initial_config_primitive
["name"],
2134 params_dict
=primitive_params_
,
2139 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2140 if check_if_terminated_needed
:
2141 if config_descriptor
.get("terminate-config-primitive"):
2143 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2145 check_if_terminated_needed
= False
2147 # TODO register in database that primitive is done
2149 # STEP 7 Configure metrics
2150 if vca_type
== "helm" or vca_type
== "helm-v3":
2151 # TODO: review for those cases where the helm chart is a reference and
2152 # is not part of the NF package
2153 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2155 artifact_path
=artifact_path
,
2156 ee_config_descriptor
=ee_config_descriptor
,
2159 target_ip
=rw_mgmt_ip
,
2160 vnf_member_index
=db_vnfr
.get("member-vnf-index-ref", ""),
2162 vdu_index
=vdu_index
,
2164 kdu_index
=kdu_index
,
2170 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2173 for job
in prometheus_jobs
:
2176 {"job_name": job
["job_name"]},
2179 fail_on_empty
=False,
2182 step
= "instantiated at VCA"
2183 self
.logger
.debug(logging_text
+ step
)
2185 self
._write
_configuration
_status
(
2186 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
# Failure path: the configuration status of this VCA is set to "BROKEN" and
# the error is re-raised as LcmException, prefixed with the step that was running.
2189 except Exception as e
: # TODO not use Exception but N2VC exception
2190 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2192 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2195 "Exception while {} : {}".format(step
, e
), exc_info
=True
2197 self
._write
_configuration
_status
(
2198 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2200 raise LcmException("{}. {}".format(step
, e
)) from e
2202 def _write_ns_status(
2206 current_operation
: str,
2207 current_operation_id
: str,
2208 error_description
: str = None,
2209 error_detail
: str = None,
2210 other_update
: dict = None,
2213 Update db_nsr fields.
2216 :param current_operation:
2217 :param current_operation_id:
2218 :param error_description:
2219 :param error_detail:
2220 :param other_update: Other required changes at database if provided, will be cleared
2224 db_dict
= other_update
or {}
2227 ] = current_operation_id
# for backward compatibility
2228 db_dict
["_admin.current-operation"] = current_operation_id
2229 db_dict
["_admin.operation-type"] = (
2230 current_operation
if current_operation
!= "IDLE" else None
2232 db_dict
["currentOperation"] = current_operation
2233 db_dict
["currentOperationID"] = current_operation_id
2234 db_dict
["errorDescription"] = error_description
2235 db_dict
["errorDetail"] = error_detail
2238 db_dict
["nsState"] = ns_state
2239 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2240 except DbException
as e
:
2241 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2243 def _write_op_status(
2247 error_message
: str = None,
2248 queuePosition
: int = 0,
2249 operation_state
: str = None,
2250 other_update
: dict = None,
2253 db_dict
= other_update
or {}
2254 db_dict
["queuePosition"] = queuePosition
2255 if isinstance(stage
, list):
2256 db_dict
["stage"] = stage
[0]
2257 db_dict
["detailed-status"] = " ".join(stage
)
2258 elif stage
is not None:
2259 db_dict
["stage"] = str(stage
)
2261 if error_message
is not None:
2262 db_dict
["errorMessage"] = error_message
2263 if operation_state
is not None:
2264 db_dict
["operationState"] = operation_state
2265 db_dict
["statusEnteredTime"] = time()
2266 self
.update_db_2("nslcmops", op_id
, db_dict
)
2267 except DbException
as e
:
2269 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2272 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2274 nsr_id
= db_nsr
["_id"]
2275 # configurationStatus
2276 config_status
= db_nsr
.get("configurationStatus")
2279 "configurationStatus.{}.status".format(index
): status
2280 for index
, v
in enumerate(config_status
)
2284 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2286 except DbException
as e
:
2288 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2291 def _write_configuration_status(
2296 element_under_configuration
: str = None,
2297 element_type
: str = None,
2298 other_update
: dict = None,
2301 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2302 # .format(vca_index, status))
2305 db_path
= "configurationStatus.{}.".format(vca_index
)
2306 db_dict
= other_update
or {}
2308 db_dict
[db_path
+ "status"] = status
2309 if element_under_configuration
:
2311 db_path
+ "elementUnderConfiguration"
2312 ] = element_under_configuration
2314 db_dict
[db_path
+ "elementType"] = element_type
2315 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2316 except DbException
as e
:
2318 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2319 status
, nsr_id
, vca_index
, e
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Check and compute the placement (vim account where to deploy). When the operation asks for the
    "PLA" placement engine, the request is sent via kafka and the result is polled from database
    (nslcmops _admin.pla). Database is used because the result can be obtained from a different
    LCM worker in case of HA.
    :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
    :param db_nslcmop: database content of nslcmop
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
    :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
        computed 'vim-account-id'
    :raises LcmException: if no placement result is available when polling ends
    """
    nslcmop_id = db_nslcmop["_id"]
    if deep_get(db_nslcmop, ("operationParams", "placement-engine")) != "PLA":
        # nothing to compute
        return False

    self.logger.debug(
        logging_text + "Invoke and wait for placement optimization"
    )
    await self.msg.aiowrite(
        "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
    )

    # poll database until the result is written or the time budget is exhausted
    poll_seconds = 5
    budget = poll_seconds * 10
    placement = None
    while not placement and budget >= 0:
        await asyncio.sleep(poll_seconds)
        budget -= poll_seconds
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        placement = deep_get(db_nslcmop, ("_admin", "pla"))

    if not placement:
        raise LcmException(
            "Placement timeout for nslcmopId={}".format(nslcmop_id)
        )

    changed = False
    for placed_vnf in placement["vnf"]:
        vnfr = db_vnfrs.get(placed_vnf["member-vnf-index"])
        if not placed_vnf.get("vimAccountId") or not vnfr:
            continue
        changed = True
        new_vim_account = {"vim-account-id": placed_vnf["vimAccountId"]}
        self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, new_vim_account)
        # Modifies db_vnfrs
        vnfr["vim-account-id"] = placed_vnf["vimAccountId"]
    return changed
def update_nsrs_with_pla_result(self, params):
    """
    Store a placement result at the "_admin.pla" field of its nslcmops entry.
    :param params: content with a "placement" item, which holds the "nslcmopId" of the operation
    :return: None. Any exception is logged as a warning, not raised
    """
    # bound before the try so that the except clause can always report it
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
        self.update_db_2(
            "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
        )
    except Exception as e:
        # Logger.warn is a deprecated alias, removed in Python 3.13
        self.logger.warning(
            "Update failed for nslcmop_id={}:{}".format(nslcmop_id, e)
        )
2381 async def instantiate(self
, nsr_id
, nslcmop_id
):
2384 :param nsr_id: ns instance to deploy
2385 :param nslcmop_id: operation to run
2389 # Try to lock HA task here
2390 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2391 if not task_is_locked_by_me
:
2393 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2397 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2398 self
.logger
.debug(logging_text
+ "Enter")
2400 # get all needed from database
2402 # database nsrs record
2405 # database nslcmops record
2408 # update operation on nsrs
2410 # update operation on nslcmops
2411 db_nslcmop_update
= {}
2413 timeout_ns_deploy
= self
.timeout
.ns_deploy
2415 nslcmop_operation_state
= None
2416 db_vnfrs
= {} # vnf's info indexed by member-index
2418 tasks_dict_info
= {} # from task to info text
2422 "Stage 1/5: preparation of the environment.",
2423 "Waiting for previous operations to terminate.",
2426 # ^ stage, step, VIM progress
2428 # wait for any previous tasks in process
2429 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2431 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2432 stage
[1] = "Reading from database."
2433 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2434 db_nsr_update
["detailed-status"] = "creating"
2435 db_nsr_update
["operational-status"] = "init"
2436 self
._write
_ns
_status
(
2438 ns_state
="BUILDING",
2439 current_operation
="INSTANTIATING",
2440 current_operation_id
=nslcmop_id
,
2441 other_update
=db_nsr_update
,
2443 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2445 # read from db: operation
2446 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2447 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2448 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2449 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2450 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2452 ns_params
= db_nslcmop
.get("operationParams")
2453 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2454 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2457 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2458 self
.logger
.debug(logging_text
+ stage
[1])
2459 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2460 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2461 self
.logger
.debug(logging_text
+ stage
[1])
2462 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2463 self
.fs
.sync(db_nsr
["nsd-id"])
2465 # nsr_name = db_nsr["name"] # TODO short-name??
2467 # read from db: vnf's of this ns
2468 stage
[1] = "Getting vnfrs from db."
2469 self
.logger
.debug(logging_text
+ stage
[1])
2470 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2472 # read from db: vnfd's for every vnf
2473 db_vnfds
= [] # every vnfd data
2475 # for each vnf in ns, read vnfd
2476 for vnfr
in db_vnfrs_list
:
2477 if vnfr
.get("kdur"):
2479 for kdur
in vnfr
["kdur"]:
2480 if kdur
.get("additionalParams"):
2481 kdur
["additionalParams"] = json
.loads(
2482 kdur
["additionalParams"]
2484 kdur_list
.append(kdur
)
2485 vnfr
["kdur"] = kdur_list
2487 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2488 vnfd_id
= vnfr
["vnfd-id"]
2489 vnfd_ref
= vnfr
["vnfd-ref"]
2490 self
.fs
.sync(vnfd_id
)
2492 # if we haven't this vnfd, read it from db
2493 if vnfd_id
not in db_vnfds
:
2495 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2498 self
.logger
.debug(logging_text
+ stage
[1])
2499 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2502 db_vnfds
.append(vnfd
)
2504 # Get or generates the _admin.deployed.VCA list
2505 vca_deployed_list
= None
2506 if db_nsr
["_admin"].get("deployed"):
2507 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2508 if vca_deployed_list
is None:
2509 vca_deployed_list
= []
2510 configuration_status_list
= []
2511 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2512 db_nsr_update
["configurationStatus"] = configuration_status_list
2513 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2514 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2515 elif isinstance(vca_deployed_list
, dict):
2516 # maintain backward compatibility. Change a dict to list at database
2517 vca_deployed_list
= list(vca_deployed_list
.values())
2518 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2519 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2522 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2524 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2525 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2527 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2528 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2529 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2531 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2534 # n2vc_redesign STEP 2 Deploy Network Scenario
2535 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2536 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2538 stage
[1] = "Deploying KDUs."
2539 # self.logger.debug(logging_text + "Before deploy_kdus")
2540 # Call to deploy_kdus in case exists the "vdu:kdu" param
2541 await self
.deploy_kdus(
2542 logging_text
=logging_text
,
2544 nslcmop_id
=nslcmop_id
,
2547 task_instantiation_info
=tasks_dict_info
,
2550 stage
[1] = "Getting VCA public key."
2551 # n2vc_redesign STEP 1 Get VCA public ssh-key
2552 # feature 1429. Add n2vc public key to needed VMs
2553 n2vc_key
= self
.n2vc
.get_public_key()
2554 n2vc_key_list
= [n2vc_key
]
2555 if self
.vca_config
.public_key
:
2556 n2vc_key_list
.append(self
.vca_config
.public_key
)
2558 stage
[1] = "Deploying NS at VIM."
2559 task_ro
= asyncio
.ensure_future(
2560 self
.instantiate_RO(
2561 logging_text
=logging_text
,
2565 db_nslcmop
=db_nslcmop
,
2568 n2vc_key_list
=n2vc_key_list
,
2572 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2573 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2575 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2576 stage
[1] = "Deploying Execution Environments."
2577 self
.logger
.debug(logging_text
+ stage
[1])
2579 # create namespace and certificate if any helm based EE is present in the NS
2580 if check_helm_ee_in_ns(db_vnfds
):
2581 # TODO: create EE namespace
2582 # create TLS certificates
2583 await self
.vca_map
["helm-v3"].create_tls_certificate(
2584 secret_name
="ee-tls-{}".format(nsr_id
),
2587 usage
="server auth",
2590 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2591 for vnf_profile
in get_vnf_profiles(nsd
):
2592 vnfd_id
= vnf_profile
["vnfd-id"]
2593 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2594 member_vnf_index
= str(vnf_profile
["id"])
2595 db_vnfr
= db_vnfrs
[member_vnf_index
]
2596 base_folder
= vnfd
["_admin"]["storage"]
2603 # Get additional parameters
2604 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2605 if db_vnfr
.get("additionalParamsForVnf"):
2606 deploy_params
.update(
2607 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2610 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2611 if descriptor_config
:
2613 logging_text
=logging_text
2614 + "member_vnf_index={} ".format(member_vnf_index
),
2617 nslcmop_id
=nslcmop_id
,
2623 member_vnf_index
=member_vnf_index
,
2624 vdu_index
=vdu_index
,
2625 kdu_index
=kdu_index
,
2627 deploy_params
=deploy_params
,
2628 descriptor_config
=descriptor_config
,
2629 base_folder
=base_folder
,
2630 task_instantiation_info
=tasks_dict_info
,
2634 # Deploy charms for each VDU that supports one.
2635 for vdud
in get_vdu_list(vnfd
):
2637 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2638 vdur
= find_in_list(
2639 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2642 if vdur
.get("additionalParams"):
2643 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2645 deploy_params_vdu
= deploy_params
2646 deploy_params_vdu
["OSM"] = get_osm_params(
2647 db_vnfr
, vdu_id
, vdu_count_index
=0
2649 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2651 self
.logger
.debug("VDUD > {}".format(vdud
))
2653 "Descriptor config > {}".format(descriptor_config
)
2655 if descriptor_config
:
2659 for vdu_index
in range(vdud_count
):
2660 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2662 logging_text
=logging_text
2663 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2664 member_vnf_index
, vdu_id
, vdu_index
2668 nslcmop_id
=nslcmop_id
,
2674 kdu_index
=kdu_index
,
2675 member_vnf_index
=member_vnf_index
,
2676 vdu_index
=vdu_index
,
2678 deploy_params
=deploy_params_vdu
,
2679 descriptor_config
=descriptor_config
,
2680 base_folder
=base_folder
,
2681 task_instantiation_info
=tasks_dict_info
,
2684 for kdud
in get_kdu_list(vnfd
):
2685 kdu_name
= kdud
["name"]
2686 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2687 if descriptor_config
:
2691 kdu_index
, kdur
= next(
2693 for x
in enumerate(db_vnfr
["kdur"])
2694 if x
[1]["kdu-name"] == kdu_name
2696 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2697 if kdur
.get("additionalParams"):
2698 deploy_params_kdu
.update(
2699 parse_yaml_strings(kdur
["additionalParams"].copy())
2703 logging_text
=logging_text
,
2706 nslcmop_id
=nslcmop_id
,
2712 member_vnf_index
=member_vnf_index
,
2713 vdu_index
=vdu_index
,
2714 kdu_index
=kdu_index
,
2716 deploy_params
=deploy_params_kdu
,
2717 descriptor_config
=descriptor_config
,
2718 base_folder
=base_folder
,
2719 task_instantiation_info
=tasks_dict_info
,
2723 # Check if this NS has a charm configuration
2724 descriptor_config
= nsd
.get("ns-configuration")
2725 if descriptor_config
and descriptor_config
.get("juju"):
2728 member_vnf_index
= None
2735 # Get additional parameters
2736 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2737 if db_nsr
.get("additionalParamsForNs"):
2738 deploy_params
.update(
2739 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2741 base_folder
= nsd
["_admin"]["storage"]
2743 logging_text
=logging_text
,
2746 nslcmop_id
=nslcmop_id
,
2752 member_vnf_index
=member_vnf_index
,
2753 vdu_index
=vdu_index
,
2754 kdu_index
=kdu_index
,
2756 deploy_params
=deploy_params
,
2757 descriptor_config
=descriptor_config
,
2758 base_folder
=base_folder
,
2759 task_instantiation_info
=tasks_dict_info
,
2763 # rest of staff will be done at finally
2766 ROclient
.ROClientException
,
2772 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2775 except asyncio
.CancelledError
:
2777 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2779 exc
= "Operation was cancelled"
2780 except Exception as e
:
2781 exc
= traceback
.format_exc()
2782 self
.logger
.critical(
2783 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2788 error_list
.append(str(exc
))
2790 # wait for pending tasks
2792 stage
[1] = "Waiting for instantiate pending tasks."
2793 self
.logger
.debug(logging_text
+ stage
[1])
2794 error_list
+= await self
._wait
_for
_tasks
(
2802 stage
[1] = stage
[2] = ""
2803 except asyncio
.CancelledError
:
2804 error_list
.append("Cancelled")
2805 # TODO cancel all tasks
2806 except Exception as exc
:
2807 error_list
.append(str(exc
))
2809 # update operation-status
2810 db_nsr_update
["operational-status"] = "running"
2811 # let's begin with VCA 'configured' status (later we can change it)
2812 db_nsr_update
["config-status"] = "configured"
2813 for task
, task_name
in tasks_dict_info
.items():
2814 if not task
.done() or task
.cancelled() or task
.exception():
2815 if task_name
.startswith(self
.task_name_deploy_vca
):
2816 # A N2VC task is pending
2817 db_nsr_update
["config-status"] = "failed"
2819 # RO or KDU task is pending
2820 db_nsr_update
["operational-status"] = "failed"
2822 # update status at database
2824 error_detail
= ". ".join(error_list
)
2825 self
.logger
.error(logging_text
+ error_detail
)
2826 error_description_nslcmop
= "{} Detail: {}".format(
2827 stage
[0], error_detail
2829 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2830 nslcmop_id
, stage
[0]
2833 db_nsr_update
["detailed-status"] = (
2834 error_description_nsr
+ " Detail: " + error_detail
2836 db_nslcmop_update
["detailed-status"] = error_detail
2837 nslcmop_operation_state
= "FAILED"
2841 error_description_nsr
= error_description_nslcmop
= None
2843 db_nsr_update
["detailed-status"] = "Done"
2844 db_nslcmop_update
["detailed-status"] = "Done"
2845 nslcmop_operation_state
= "COMPLETED"
2848 self
._write
_ns
_status
(
2851 current_operation
="IDLE",
2852 current_operation_id
=None,
2853 error_description
=error_description_nsr
,
2854 error_detail
=error_detail
,
2855 other_update
=db_nsr_update
,
2857 self
._write
_op
_status
(
2860 error_message
=error_description_nslcmop
,
2861 operation_state
=nslcmop_operation_state
,
2862 other_update
=db_nslcmop_update
,
2865 if nslcmop_operation_state
:
2867 await self
.msg
.aiowrite(
2872 "nslcmop_id": nslcmop_id
,
2873 "operationState": nslcmop_operation_state
,
2877 except Exception as e
:
2879 logging_text
+ "kafka_write notification Exception {}".format(e
)
2882 self
.logger
.debug(logging_text
+ "Exit")
2883 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2885 def _get_vnfd(self
, vnfd_id
: str, projects_read
: str, cached_vnfds
: Dict
[str, Any
]):
2886 if vnfd_id
not in cached_vnfds
:
2887 cached_vnfds
[vnfd_id
] = self
.db
.get_one(
2888 "vnfds", {"id": vnfd_id
, "_admin.projects_read": projects_read
}
2890 return cached_vnfds
[vnfd_id
]
2892 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2893 if vnf_profile_id
not in cached_vnfrs
:
2894 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2897 "member-vnf-index-ref": vnf_profile_id
,
2898 "nsr-id-ref": nsr_id
,
2901 return cached_vnfrs
[vnf_profile_id
]
def _is_deployed_vca_in_relation(
    self, vca: DeployedVCA, relation: Relation
) -> bool:
    """
    Tell whether a deployed VCA is one of the two endpoints of a relation.

    Endpoints with a "kdu-resource-profile-id" are ignored. For the others the
    vnf profile, vdu profile and execution environment reference must all match.
    :param vca: deployed VCA to look for
    :param relation: relation whose provider and requirer are inspected
    :return: True if the provider or the requirer matches the VCA
    """
    for side in (relation.provider, relation.requirer):
        if side["kdu-resource-profile-id"]:
            continue
        matches = (
            vca.vnf_profile_id == side.vnf_profile_id
            and vca.vdu_profile_id == side.vdu_profile_id
            and vca.execution_environment_ref == side.execution_environment_ref
        )
        if matches:
            return matches
    return False
def _update_ee_relation_data_with_implicit_data(
    self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
):
    """
    Complete the data of a relation endpoint with the values left implicit.

    After normalising the endpoint with safe_get_ee_relation, a VNF or VDU level
    endpoint without "execution-environment-ref" gets the id of the juju
    execution environment found in its vnfd.
    :param nsr_id: ns instance id
    :param nsd: ns descriptor; its first "_admin.projects_read" entry is used to read the vnfd
    :param ee_relation_data: endpoint data as written in the descriptor
    :param cached_vnfds: vnfd cache, passed on to _get_vnfd
    :param vnf_profile_id: passed on to safe_get_ee_relation
    :return: the completed endpoint data
    :raises Exception: if no execution environment is found for the endpoint
    """
    ee_relation_data = safe_get_ee_relation(
        nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
    )
    level = EELevel.get_level(ee_relation_data)

    # nothing to complete for other levels or when the reference is explicit
    if level not in (EELevel.VNF, EELevel.VDU):
        return ee_relation_data
    if ee_relation_data["execution-environment-ref"]:
        return ee_relation_data

    vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
    vnfd_id = vnf_profile["vnfd-id"]
    project = nsd["_admin"]["projects_read"][0]
    db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)

    if level == EELevel.VNF:
        entity_id = vnfd_id
    else:
        entity_id = ee_relation_data["vdu-profile-id"]

    ee = get_juju_ee_ref(db_vnfd, entity_id)
    if not ee:
        raise Exception(
            f"not execution environments found for ee_relation {ee_relation_data}"
        )
    ee_relation_data["execution-environment-ref"] = ee["id"]
    return ee_relation_data
def _get_ns_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """
    Collect the relations of the ns configuration in which a VCA takes part.

    A relation is given either with "provider"/"requirer" entries or with a list
    of two "entities"; in the latter case an entity whose id differs from the
    nsd id is recorded as a vnf profile.
    :param nsr_id: ns instance id
    :param nsd: ns descriptor
    :param vca: deployed VCA that must be an endpoint of the returned relations
    :param cached_vnfds: vnfd cache
    :return: list of Relation
    :raises Exception: if a relation has neither provider/requirer nor entities
    """
    selected = []
    for descriptor in get_ns_configuration_relation_list(nsd):
        if "provider" in descriptor and "requirer" in descriptor:
            endpoints = [descriptor["provider"], descriptor["requirer"]]
        elif "entities" in descriptor:
            endpoints = []
            # first entity is the provider, second one the requirer
            for position in (0, 1):
                entity_id = descriptor["entities"][position]["id"]
                endpoint = {
                    "nsr-id": nsr_id,
                    "endpoint": descriptor["entities"][position]["endpoint"],
                }
                if entity_id != nsd["id"]:
                    endpoint["vnf-profile-id"] = entity_id
                endpoints.append(endpoint)
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )

        provider_data = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, endpoints[0], cached_vnfds
        )
        requirer_data = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, endpoints[1], cached_vnfds
        )
        provider = EERelation(provider_data)
        requirer = EERelation(requirer_data)
        candidate = Relation(descriptor["name"], provider, requirer)
        if self._is_deployed_vca_in_relation(vca, candidate):
            selected.append(candidate)
    return selected
def _get_vnf_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """
    Collect the relations declared in the vnfd of a VCA in which that VCA takes part.

    A relation is given either with "provider"/"requirer" entries or with a list
    of two "entities"; in the latter case an entity whose id differs from the
    vnfd id is recorded as a vdu profile.
    :param nsr_id: ns instance id
    :param nsd: ns descriptor, used to find the vnf profile of the VCA
    :param vca: deployed VCA that must be an endpoint of the returned relations
    :param cached_vnfds: vnfd cache
    :return: list of Relation
    :raises Exception: if a relation has neither provider/requirer nor entities
    """
    selected = []
    vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
    vnf_profile_id = vnf_profile["id"]
    vnfd_id = vnf_profile["vnfd-id"]
    project = nsd["_admin"]["projects_read"][0]
    db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)

    for descriptor in get_relation_list(db_vnfd, vnfd_id):
        if "provider" in descriptor and "requirer" in descriptor:
            endpoints = [descriptor["provider"], descriptor["requirer"]]
        elif "entities" in descriptor:
            endpoints = []
            # first entity is the provider, second one the requirer
            for position in (0, 1):
                entity_id = descriptor["entities"][position]["id"]
                endpoint = {
                    "nsr-id": nsr_id,
                    "vnf-profile-id": vnf_profile_id,
                    "endpoint": descriptor["entities"][position]["endpoint"],
                }
                if entity_id != vnfd_id:
                    endpoint["vdu-profile-id"] = entity_id
                endpoints.append(endpoint)
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )

        provider_data = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, endpoints[0], cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        requirer_data = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, endpoints[1], cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        provider = EERelation(provider_data)
        requirer = EERelation(requirer_data)
        candidate = Relation(descriptor["name"], provider, requirer)
        if self._is_deployed_vca_in_relation(vca, candidate):
            selected.append(candidate)
    return selected
def _get_kdu_resource_data(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedK8sResource:
    """
    Return the deployed KDU data of a KDU level relation endpoint.

    The kdu resource profile of the endpoint is looked up in the vnfd of its vnf
    profile, and its "resource-name" is added to the deployed KDU record.
    :param ee_relation: relation endpoint (vnf_profile_id and kdu_resource_profile_id are used)
    :param db_nsr: database content of the nsr
    :param cached_vnfds: vnfd cache, passed on to _get_vnfd
    :return: the deployed KDU record including "resource-name", or None when
        get_deployed_kdu provides no record
    """
    nsd = get_nsd(db_nsr)
    vnf_profiles = get_vnf_profiles(nsd)
    vnfd_id = find_in_list(
        vnf_profiles,
        lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
    )["vnfd-id"]
    project = nsd["_admin"]["projects_read"][0]
    db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)
    kdu_resource_profile = get_kdu_resource_profile(
        db_vnfd, ee_relation.kdu_resource_profile_id
    )
    kdu_name = kdu_resource_profile["kdu-name"]
    deployed_kdu, _ = get_deployed_kdu(
        # default must be a dict: a tuple default has no .get() and would fail
        # with AttributeError when the nsr has no "_admin"
        db_nsr.get("_admin", {}).get("deployed", ()),
        kdu_name,
        ee_relation.vnf_profile_id,
    )
    # NOTE(review): presumably get_deployed_kdu yields None while the KDU is not
    # deployed yet (the caller tests the result for truthiness) - confirm.
    if deployed_kdu is None:
        return None
    deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
    return deployed_kdu
def _get_deployed_component(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedComponent:
    """
    Find the deployed element that a relation endpoint refers to.

    NS, VNF and VDU level endpoints are resolved to a DeployedVCA through
    get_deployed_vca; KDU level endpoints to a DeployedK8sResource.
    :param ee_relation: relation endpoint
    :param db_nsr: database content of the nsr
    :param cached_vnfds: vnfd cache, used for KDU level endpoints
    :return: the deployed component, or None if nothing is found or the level is
        none of the above
    """
    nsr_id = db_nsr["_id"]
    level = EELevel.get_level(ee_relation)

    if level == EELevel.NS:
        vca_filter = {"vdu_id": None, "member-vnf-index": None}
    elif level == EELevel.VNF:
        vca_filter = {
            "vdu_id": None,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif level == EELevel.VDU:
        vca_filter = {
            "vdu_id": ee_relation.vdu_profile_id,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif level == EELevel.KDU:
        kdu_resource_data = self._get_kdu_resource_data(
            ee_relation, db_nsr, cached_vnfds
        )
        if kdu_resource_data:
            return DeployedK8sResource(kdu_resource_data)
        return None
    else:
        return None

    # the three VCA based levels only differ in the filter used for the lookup
    vca = get_deployed_vca(db_nsr, vca_filter)
    if vca:
        return DeployedVCA(nsr_id, vca)
    return None
async def _add_relation(
    self,
    relation: Relation,
    vca_type: str,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
    cached_vnfrs: Dict[str, Any],
) -> bool:
    """
    Add a relation if both of its endpoints are deployed and have their
    configuration software installed.

    :param relation: relation to add
    :param vca_type: key of self.vca_map selecting the connector that adds the relation
    :param db_nsr: database content of the nsr
    :param cached_vnfds: vnfd cache
    :param cached_vnfrs: vnfr cache
    :return: True if the relation has been added, False if an endpoint is not ready
    :raises LcmException: if the connector raises N2VCException
    """
    deployed_provider = self._get_deployed_component(
        relation.provider, db_nsr, cached_vnfds
    )
    deployed_requirer = self._get_deployed_component(
        relation.requirer, db_nsr, cached_vnfds
    )

    both_ready = (
        deployed_provider
        and deployed_requirer
        and deployed_provider.config_sw_installed
        and deployed_requirer.config_sw_installed
    )
    if not both_ready:
        return False

    # endpoints without vnf profile have no vnfr
    provider_db_vnfr = None
    if relation.provider.vnf_profile_id:
        provider_db_vnfr = self._get_vnfr(
            relation.provider.nsr_id,
            relation.provider.vnf_profile_id,
            cached_vnfrs,
        )
    requirer_db_vnfr = None
    if relation.requirer.vnf_profile_id:
        requirer_db_vnfr = self._get_vnfr(
            relation.requirer.nsr_id,
            relation.requirer.vnf_profile_id,
            cached_vnfrs,
        )

    provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
    requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
    provider_relation_endpoint = RelationEndpoint(
        deployed_provider.ee_id,
        provider_vca_id,
        relation.provider.endpoint,
    )
    requirer_relation_endpoint = RelationEndpoint(
        deployed_requirer.ee_id,
        requirer_vca_id,
        relation.requirer.endpoint,
    )

    connector = self.vca_map[vca_type]
    try:
        await connector.add_relation(
            provider=provider_relation_endpoint,
            requirer=requirer_relation_endpoint,
        )
    except N2VCException as exception:
        self.logger.error(exception)
        raise LcmException(exception)
    return True
async def _add_vca_relations(
    self,
    logging_text,
    nsr_id,
    vca_type: str,
    vca_index: int,
    timeout: int = 3600,
) -> bool:
    """
    Add all the relations in which a deployed VCA takes part.

    :param logging_text: prefix for logging
    :param nsr_id: ns instance id
    :param vca_type: key of self.vca_map, passed on to _add_relation
    :param vca_index: position of the VCA in the deployed VCA list of the nsr
    :param timeout: seconds to keep retrying until all relations are added
    :return: True if there are no relations or all of them have been added;
        False on timeout or if any exception is raised (it is logged)
    """
    # steps:
    # 1. find all relations for this VCA
    # 2. wait for other peers related
    # 3. add relations

    try:
        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = get_nsd(db_nsr)

        # this VCA data
        deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
        my_vca = DeployedVCA(nsr_id, deployed_vca_dict)

        cached_vnfds = {}
        cached_vnfrs = {}
        relations = []
        relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
        relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))

        # if no relations, terminate
        if not relations:
            self.logger.debug(logging_text + " No relations")
            return True

        self.logger.debug(logging_text + " adding relations {}".format(relations))

        # add all relations
        start = time()
        while True:
            # check timeout
            now = time()
            if now - start >= timeout:
                self.logger.error(logging_text + " : timeout adding relations")
                return False

            # reload nsr from database (we need to update record: _admin.deployed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            # for each relation, find the VCA's related
            # (iterate over a copy: added relations are removed from the list)
            for relation in relations.copy():
                added = await self._add_relation(
                    relation,
                    vca_type,
                    db_nsr,
                    cached_vnfds,
                    cached_vnfrs,
                )
                if added:
                    relations.remove(relation)

            if not relations:
                self.logger.debug("Relations added")
                break
            # some peer is not ready yet; poll again later
            await asyncio.sleep(5.0)

        return True

    except Exception as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning(logging_text + " ERROR adding relations: {}".format(e))
        return False
3252 async def _install_kdu(
3260 k8s_instance_info
: dict,
3261 k8params
: dict = None,
3267 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3270 "collection": "nsrs",
3271 "filter": {"_id": nsr_id
},
3272 "path": nsr_db_path
,
3275 if k8s_instance_info
.get("kdu-deployment-name"):
3276 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3278 kdu_instance
= self
.k8scluster_map
[
3280 ].generate_kdu_instance_name(
3281 db_dict
=db_dict_install
,
3282 kdu_model
=k8s_instance_info
["kdu-model"],
3283 kdu_name
=k8s_instance_info
["kdu-name"],
3286 # Update the nsrs table with the kdu-instance value
3290 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3293 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3294 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3295 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3296 # namespace, this first verification could be removed, and the next step would be done for any kind
3298 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3299 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3300 if k8sclustertype
in ("juju", "juju-bundle"):
3301 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3302 # that the user passed a namespace which he wants its KDU to be deployed in)
3308 "_admin.projects_write": k8s_instance_info
["namespace"],
3309 "_admin.projects_read": k8s_instance_info
["namespace"],
3315 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3320 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3322 k8s_instance_info
["namespace"] = kdu_instance
3324 await self
.k8scluster_map
[k8sclustertype
].install(
3325 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3326 kdu_model
=k8s_instance_info
["kdu-model"],
3329 db_dict
=db_dict_install
,
3331 kdu_name
=k8s_instance_info
["kdu-name"],
3332 namespace
=k8s_instance_info
["namespace"],
3333 kdu_instance
=kdu_instance
,
3337 # Obtain services to obtain management service ip
3338 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3339 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3340 kdu_instance
=kdu_instance
,
3341 namespace
=k8s_instance_info
["namespace"],
3344 # Obtain management service info (if exists)
3345 vnfr_update_dict
= {}
3346 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3348 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3353 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3356 for service
in kdud
.get("service", [])
3357 if service
.get("mgmt-service")
3359 for mgmt_service
in mgmt_services
:
3360 for service
in services
:
3361 if service
["name"].startswith(mgmt_service
["name"]):
3362 # Mgmt service found, Obtain service ip
3363 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3364 if isinstance(ip
, list) and len(ip
) == 1:
3368 "kdur.{}.ip-address".format(kdu_index
)
3371 # Check if must update also mgmt ip at the vnf
3372 service_external_cp
= mgmt_service
.get(
3373 "external-connection-point-ref"
3375 if service_external_cp
:
3377 deep_get(vnfd
, ("mgmt-interface", "cp"))
3378 == service_external_cp
3380 vnfr_update_dict
["ip-address"] = ip
3385 "external-connection-point-ref", ""
3387 == service_external_cp
,
3390 "kdur.{}.ip-address".format(kdu_index
)
3395 "Mgmt service name: {} not found".format(
3396 mgmt_service
["name"]
3400 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3401 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3403 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3406 and kdu_config
.get("initial-config-primitive")
3407 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3409 initial_config_primitive_list
= kdu_config
.get(
3410 "initial-config-primitive"
3412 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3414 for initial_config_primitive
in initial_config_primitive_list
:
3415 primitive_params_
= self
._map
_primitive
_params
(
3416 initial_config_primitive
, {}, {}
3419 await asyncio
.wait_for(
3420 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3421 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3422 kdu_instance
=kdu_instance
,
3423 primitive_name
=initial_config_primitive
["name"],
3424 params
=primitive_params_
,
3425 db_dict
=db_dict_install
,
3431 except Exception as e
:
3432 # Prepare update db with error and raise exception
3435 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3439 vnfr_data
.get("_id"),
3440 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3443 # ignore to keep original exception
3445 # reraise original error
3450 async def deploy_kdus(
3457 task_instantiation_info
,
3459 # Launch kdus if present in the descriptor
3461 k8scluster_id_2_uuic
= {
3462 "helm-chart-v3": {},
3467 async def _get_cluster_id(cluster_id
, cluster_type
):
3468 nonlocal k8scluster_id_2_uuic
3469 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3470 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3472 # check if K8scluster is creating and wait look if previous tasks in process
3473 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3474 "k8scluster", cluster_id
3477 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3478 task_name
, cluster_id
3480 self
.logger
.debug(logging_text
+ text
)
3481 await asyncio
.wait(task_dependency
, timeout
=3600)
3483 db_k8scluster
= self
.db
.get_one(
3484 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3486 if not db_k8scluster
:
3487 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3489 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3491 if cluster_type
== "helm-chart-v3":
3493 # backward compatibility for existing clusters that have not been initialized for helm v3
3494 k8s_credentials
= yaml
.safe_dump(
3495 db_k8scluster
.get("credentials")
3497 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3498 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3500 db_k8scluster_update
= {}
3501 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3502 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3503 db_k8scluster_update
[
3504 "_admin.helm-chart-v3.created"
3506 db_k8scluster_update
[
3507 "_admin.helm-chart-v3.operationalState"
3510 "k8sclusters", cluster_id
, db_k8scluster_update
3512 except Exception as e
:
3515 + "error initializing helm-v3 cluster: {}".format(str(e
))
3518 "K8s cluster '{}' has not been initialized for '{}'".format(
3519 cluster_id
, cluster_type
3524 "K8s cluster '{}' has not been initialized for '{}'".format(
3525 cluster_id
, cluster_type
3528 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3531 logging_text
+= "Deploy kdus: "
3534 db_nsr_update
= {"_admin.deployed.K8s": []}
3535 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3538 updated_cluster_list
= []
3539 updated_v3_cluster_list
= []
3541 for vnfr_data
in db_vnfrs
.values():
3542 vca_id
= self
.get_vca_id(vnfr_data
, {})
3543 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3544 # Step 0: Prepare and set parameters
3545 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3546 vnfd_id
= vnfr_data
.get("vnfd-id")
3547 vnfd_with_id
= find_in_list(
3548 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3552 for kdud
in vnfd_with_id
["kdu"]
3553 if kdud
["name"] == kdur
["kdu-name"]
3555 namespace
= kdur
.get("k8s-namespace")
3556 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3557 if kdur
.get("helm-chart"):
3558 kdumodel
= kdur
["helm-chart"]
3559 # Default version: helm3, if helm-version is v2 assign v2
3560 k8sclustertype
= "helm-chart-v3"
3561 self
.logger
.debug("kdur: {}".format(kdur
))
3563 kdur
.get("helm-version")
3564 and kdur
.get("helm-version") == "v2"
3566 k8sclustertype
= "helm-chart"
3567 elif kdur
.get("juju-bundle"):
3568 kdumodel
= kdur
["juju-bundle"]
3569 k8sclustertype
= "juju-bundle"
3572 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3573 "juju-bundle. Maybe an old NBI version is running".format(
3574 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3577 # check if kdumodel is a file and exists
3579 vnfd_with_id
= find_in_list(
3580 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3582 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3583 if storage
: # may be not present if vnfd has not artifacts
3584 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3585 if storage
["pkg-dir"]:
3586 filename
= "{}/{}/{}s/{}".format(
3593 filename
= "{}/Scripts/{}s/{}".format(
3598 if self
.fs
.file_exists(
3599 filename
, mode
="file"
3600 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3601 kdumodel
= self
.fs
.path
+ filename
3602 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3604 except Exception: # it is not a file
3607 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3608 step
= "Synchronize repos for k8s cluster '{}'".format(
3611 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3615 k8sclustertype
== "helm-chart"
3616 and cluster_uuid
not in updated_cluster_list
3618 k8sclustertype
== "helm-chart-v3"
3619 and cluster_uuid
not in updated_v3_cluster_list
3621 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3622 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3623 cluster_uuid
=cluster_uuid
3626 if del_repo_list
or added_repo_dict
:
3627 if k8sclustertype
== "helm-chart":
3629 "_admin.helm_charts_added." + item
: None
3630 for item
in del_repo_list
3633 "_admin.helm_charts_added." + item
: name
3634 for item
, name
in added_repo_dict
.items()
3636 updated_cluster_list
.append(cluster_uuid
)
3637 elif k8sclustertype
== "helm-chart-v3":
3639 "_admin.helm_charts_v3_added." + item
: None
3640 for item
in del_repo_list
3643 "_admin.helm_charts_v3_added." + item
: name
3644 for item
, name
in added_repo_dict
.items()
3646 updated_v3_cluster_list
.append(cluster_uuid
)
3648 logging_text
+ "repos synchronized on k8s cluster "
3649 "'{}' to_delete: {}, to_add: {}".format(
3650 k8s_cluster_id
, del_repo_list
, added_repo_dict
3655 {"_id": k8s_cluster_id
},
3661 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3662 vnfr_data
["member-vnf-index-ref"],
3666 k8s_instance_info
= {
3667 "kdu-instance": None,
3668 "k8scluster-uuid": cluster_uuid
,
3669 "k8scluster-type": k8sclustertype
,
3670 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3671 "kdu-name": kdur
["kdu-name"],
3672 "kdu-model": kdumodel
,
3673 "namespace": namespace
,
3674 "kdu-deployment-name": kdu_deployment_name
,
3676 db_path
= "_admin.deployed.K8s.{}".format(index
)
3677 db_nsr_update
[db_path
] = k8s_instance_info
3678 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3679 vnfd_with_id
= find_in_list(
3680 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3682 task
= asyncio
.ensure_future(
3691 k8params
=desc_params
,
3696 self
.lcm_tasks
.register(
3700 "instantiate_KDU-{}".format(index
),
3703 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3709 except (LcmException
, asyncio
.CancelledError
):
3711 except Exception as e
:
3712 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3713 if isinstance(e
, (N2VCException
, DbException
)):
3714 self
.logger
.error(logging_text
+ msg
)
3716 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3717 raise LcmException(msg
)
3720 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3740 task_instantiation_info
,
3743 # launch instantiate_N2VC in an asyncio task and register task object
3744 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3745 # if not found, create one entry and update database
3746 # fill db_nsr._admin.deployed.VCA.<index>
3749 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3753 get_charm_name
= False
3754 if "execution-environment-list" in descriptor_config
:
3755 ee_list
= descriptor_config
.get("execution-environment-list", [])
3756 elif "juju" in descriptor_config
:
3757 ee_list
= [descriptor_config
] # ns charms
3758 if "execution-environment-list" not in descriptor_config
:
3759 # charm name is only required for ns charms
3760 get_charm_name
= True
3761 else: # other types as script are not supported
3764 for ee_item
in ee_list
:
3767 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3768 ee_item
.get("juju"), ee_item
.get("helm-chart")
3771 ee_descriptor_id
= ee_item
.get("id")
3772 if ee_item
.get("juju"):
3773 vca_name
= ee_item
["juju"].get("charm")
3775 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
3778 if ee_item
["juju"].get("charm") is not None
3781 if ee_item
["juju"].get("cloud") == "k8s":
3782 vca_type
= "k8s_proxy_charm"
3783 elif ee_item
["juju"].get("proxy") is False:
3784 vca_type
= "native_charm"
3785 elif ee_item
.get("helm-chart"):
3786 vca_name
= ee_item
["helm-chart"]
3787 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3790 vca_type
= "helm-v3"
3793 logging_text
+ "skipping non juju neither charm configuration"
3798 for vca_index
, vca_deployed
in enumerate(
3799 db_nsr
["_admin"]["deployed"]["VCA"]
3801 if not vca_deployed
:
3804 vca_deployed
.get("member-vnf-index") == member_vnf_index
3805 and vca_deployed
.get("vdu_id") == vdu_id
3806 and vca_deployed
.get("kdu_name") == kdu_name
3807 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3808 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3812 # not found, create one.
3814 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3817 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3819 target
+= "/kdu/{}".format(kdu_name
)
3821 "target_element": target
,
3822 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3823 "member-vnf-index": member_vnf_index
,
3825 "kdu_name": kdu_name
,
3826 "vdu_count_index": vdu_index
,
3827 "operational-status": "init", # TODO revise
3828 "detailed-status": "", # TODO revise
3829 "step": "initial-deploy", # TODO revise
3831 "vdu_name": vdu_name
,
3833 "ee_descriptor_id": ee_descriptor_id
,
3834 "charm_name": charm_name
,
3838 # create VCA and configurationStatus in db
3840 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3841 "configurationStatus.{}".format(vca_index
): dict(),
3843 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3845 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3847 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3848 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3849 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3852 task_n2vc
= asyncio
.ensure_future(
3853 self
.instantiate_N2VC(
3854 logging_text
=logging_text
,
3855 vca_index
=vca_index
,
3861 vdu_index
=vdu_index
,
3862 kdu_index
=kdu_index
,
3863 deploy_params
=deploy_params
,
3864 config_descriptor
=descriptor_config
,
3865 base_folder
=base_folder
,
3866 nslcmop_id
=nslcmop_id
,
3870 ee_config_descriptor
=ee_item
,
3873 self
.lcm_tasks
.register(
3877 "instantiate_N2VC-{}".format(vca_index
),
3880 task_instantiation_info
[
3882 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3883 member_vnf_index
or "", vdu_id
or ""
3887 def _create_nslcmop(nsr_id
, operation
, params
):
3889 Creates the ns-lcm-op content to be stored in the database.
3890 :param nsr_id: internal id of the instance
3891 :param operation: instantiate, terminate, scale, action, ...
3892 :param params: user parameters for the operation
3893 :return: dictionary following SOL005 format
3895 # Raise exception if invalid arguments
3896 if not (nsr_id
and operation
and params
):
3898 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3905 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3906 "operationState": "PROCESSING",
3907 "statusEnteredTime": now
,
3908 "nsInstanceId": nsr_id
,
3909 "lcmOperationType": operation
,
3911 "isAutomaticInvocation": False,
3912 "operationParams": params
,
3913 "isCancelPending": False,
3915 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3916 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3921 def _format_additional_params(self
, params
):
3922 params
= params
or {}
3923 for key
, value
in params
.items():
3924 if str(value
).startswith("!!yaml "):
3925 params
[key
] = yaml
.safe_load(value
[7:])
3928 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3929 primitive
= seq
.get("name")
3930 primitive_params
= {}
3932 "member_vnf_index": vnf_index
,
3933 "primitive": primitive
,
3934 "primitive_params": primitive_params
,
3937 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide whether an already registered sub-operation must be skipped or retried.

    :param db_nslcmop: nslcmop record; the sub-operations are read from _admin.operations
    :param op_index: position of the sub-operation inside _admin.operations
    :return: SUBOPERATION_STATUS_SKIP when the sub-operation is already 'COMPLETED';
        otherwise op_index, after flagging the sub-operation as 'PROCESSING'
    """
    operations = deep_get(db_nslcmop, ("_admin", "operations"), [])
    sub_op = operations[op_index]

    if sub_op.get("operationState") != "COMPLETED":
        # c. retry executing sub-operation
        # The sub-operation exists, and operationState != 'COMPLETED'
        # Update operationState = 'PROCESSING' to indicate a retry.
        self._update_suboperation_status(
            db_nslcmop, op_index, "PROCESSING", "In progress"
        )
        # Return the sub-operation index
        # _ns_execute_primitive() or RO.create_action() will be called from scale()
        # with arguments extracted from the sub-operation
        return op_index

    # b. Skip sub-operation
    # _ns_execute_primitive() or RO.create_action() will NOT be executed
    return self.SUBOPERATION_STATUS_SKIP
3961 # Find a sub-operation where all keys in a matching dictionary must match
3962 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3963 def _find_suboperation(self
, db_nslcmop
, match
):
3964 if db_nslcmop
and match
:
3965 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3966 for i
, op
in enumerate(op_list
):
3967 if all(op
.get(k
) == match
[k
] for k
in match
):
3969 return self
.SUBOPERATION_STATUS_NOT_FOUND
3971 # Update status for a sub-operation given its index
3972 def _update_suboperation_status(
3973 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3975 # Update DB for HA tasks
3976 q_filter
= {"_id": db_nslcmop
["_id"]}
3978 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3979 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3982 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3985 # Add sub-operation, return the index of the added sub-operation
3986 # Optionally, set operationState, detailed-status, and operationType
3987 # Status and type are currently set for 'scale' sub-operations:
3988 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3989 # 'detailed-status' : status message
3990 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3991 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3992 def _add_suboperation(
4000 mapped_primitive_params
,
4001 operationState
=None,
4002 detailed_status
=None,
4005 RO_scaling_info
=None,
4008 return self
.SUBOPERATION_STATUS_NOT_FOUND
4009 # Get the "_admin.operations" list, if it exists
4010 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
4011 op_list
= db_nslcmop_admin
.get("operations")
4012 # Create or append to the "_admin.operations" list
4014 "member_vnf_index": vnf_index
,
4016 "vdu_count_index": vdu_count_index
,
4017 "primitive": primitive
,
4018 "primitive_params": mapped_primitive_params
,
4021 new_op
["operationState"] = operationState
4023 new_op
["detailed-status"] = detailed_status
4025 new_op
["lcmOperationType"] = operationType
4027 new_op
["RO_nsr_id"] = RO_nsr_id
4029 new_op
["RO_scaling_info"] = RO_scaling_info
4031 # No existing operations, create key 'operations' with current operation as first list element
4032 db_nslcmop_admin
.update({"operations": [new_op
]})
4033 op_list
= db_nslcmop_admin
.get("operations")
4035 # Existing operations, append operation to list
4036 op_list
.append(new_op
)
4038 db_nslcmop_update
= {"_admin.operations": op_list
}
4039 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4040 op_index
= len(op_list
) - 1
4043 # Helper methods for scale() sub-operations
4045 # pre-scale/post-scale:
4046 # Check for 3 different cases:
4047 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4048 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4049 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4050 def _check_or_add_scale_suboperation(
4054 vnf_config_primitive
,
4058 RO_scaling_info
=None,
4060 # Find this sub-operation
4061 if RO_nsr_id
and RO_scaling_info
:
4062 operationType
= "SCALE-RO"
4064 "member_vnf_index": vnf_index
,
4065 "RO_nsr_id": RO_nsr_id
,
4066 "RO_scaling_info": RO_scaling_info
,
4070 "member_vnf_index": vnf_index
,
4071 "primitive": vnf_config_primitive
,
4072 "primitive_params": primitive_params
,
4073 "lcmOperationType": operationType
,
4075 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4076 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4077 # a. New sub-operation
4078 # The sub-operation does not exist, add it.
4079 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4080 # The following parameters are set to None for all kind of scaling:
4082 vdu_count_index
= None
4084 if RO_nsr_id
and RO_scaling_info
:
4085 vnf_config_primitive
= None
4086 primitive_params
= None
4089 RO_scaling_info
= None
4090 # Initial status for sub-operation
4091 operationState
= "PROCESSING"
4092 detailed_status
= "In progress"
4093 # Add sub-operation for pre/post-scaling (zero or more operations)
4094 self
._add
_suboperation
(
4100 vnf_config_primitive
,
4108 return self
.SUBOPERATION_STATUS_NEW
4110 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4111 # or op_index (operationState != 'COMPLETED')
4112 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4114 # Function to return execution_environment id
4116 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4117 # TODO vdu_index_count
4118 for vca
in vca_deployed_list
:
4119 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
4122 async def destroy_N2VC(
4130 exec_primitives
=True,
4135 Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
4136 :param logging_text:
4138 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
4139 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4140 :param vca_index: index in the database _admin.deployed.VCA
4141 :param destroy_ee: False to not destroy it here, because all of them will be destroyed at once
4142 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4143 not executed properly
4144 :param scaling_in: True destroys the application, False destroys the model
4145 :return: None or exception
4150 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4151 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4155 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4157 # execute terminate_primitives
4159 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4160 config_descriptor
.get("terminate-config-primitive"),
4161 vca_deployed
.get("ee_descriptor_id"),
4163 vdu_id
= vca_deployed
.get("vdu_id")
4164 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4165 vdu_name
= vca_deployed
.get("vdu_name")
4166 vnf_index
= vca_deployed
.get("member-vnf-index")
4167 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4168 for seq
in terminate_primitives
:
4169 # For each sequence in list, get primitive and call _ns_execute_primitive()
4170 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4171 vnf_index
, seq
.get("name")
4173 self
.logger
.debug(logging_text
+ step
)
4174 # Create the primitive for each sequence, i.e. "primitive": "touch"
4175 primitive
= seq
.get("name")
4176 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4181 self
._add
_suboperation
(
4188 mapped_primitive_params
,
4190 # Sub-operations: Call _ns_execute_primitive() instead of action()
4192 result
, result_detail
= await self
._ns
_execute
_primitive
(
4193 vca_deployed
["ee_id"],
4195 mapped_primitive_params
,
4199 except LcmException
:
4200 # this happens when VCA is not deployed. In this case it is not needed to terminate
4202 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4203 if result
not in result_ok
:
4205 "terminate_primitive {} for vnf_member_index={} fails with "
4206 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4208 # set that this VCA do not need terminated
4209 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4213 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4216 # Delete Prometheus Jobs if any
4217 # This uses NSR_ID, so it will destroy any jobs under this index
4218 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4221 await self
.vca_map
[vca_type
].delete_execution_environment(
4222 vca_deployed
["ee_id"],
4223 scaling_in
=scaling_in
,
4228 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4229 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4230 namespace
= "." + db_nsr
["_id"]
4232 await self
.n2vc
.delete_namespace(
4233 namespace
=namespace
,
4234 total_timeout
=self
.timeout
.charm_delete
,
4237 except N2VCNotFound
: # already deleted. Skip
4239 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4241 async def terminate(self
, nsr_id
, nslcmop_id
):
4242 # Try to lock HA task here
4243 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4244 if not task_is_locked_by_me
:
4247 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4248 self
.logger
.debug(logging_text
+ "Enter")
4249 timeout_ns_terminate
= self
.timeout
.ns_terminate
4252 operation_params
= None
4254 error_list
= [] # annotates all failed error messages
4255 db_nslcmop_update
= {}
4256 autoremove
= False # autoremove after terminated
4257 tasks_dict_info
= {}
4260 "Stage 1/3: Preparing task.",
4261 "Waiting for previous operations to terminate.",
4264 # ^ contains [stage, step, VIM-status]
4266 # wait for any previous tasks in process
4267 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4269 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4270 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4271 operation_params
= db_nslcmop
.get("operationParams") or {}
4272 if operation_params
.get("timeout_ns_terminate"):
4273 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4274 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4275 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4277 db_nsr_update
["operational-status"] = "terminating"
4278 db_nsr_update
["config-status"] = "terminating"
4279 self
._write
_ns
_status
(
4281 ns_state
="TERMINATING",
4282 current_operation
="TERMINATING",
4283 current_operation_id
=nslcmop_id
,
4284 other_update
=db_nsr_update
,
4286 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4287 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4288 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4291 stage
[1] = "Getting vnf descriptors from db."
4292 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4294 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4296 db_vnfds_from_id
= {}
4297 db_vnfds_from_member_index
= {}
4299 for vnfr
in db_vnfrs_list
:
4300 vnfd_id
= vnfr
["vnfd-id"]
4301 if vnfd_id
not in db_vnfds_from_id
:
4302 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4303 db_vnfds_from_id
[vnfd_id
] = vnfd
4304 db_vnfds_from_member_index
[
4305 vnfr
["member-vnf-index-ref"]
4306 ] = db_vnfds_from_id
[vnfd_id
]
4308 # Destroy individual execution environments when there are terminating primitives.
4309 # Rest of EE will be deleted at once
4310 # TODO - check before calling _destroy_N2VC
4311 # if not operation_params.get("skip_terminate_primitives"):#
4312 # or not vca.get("needed_terminate"):
4313 stage
[0] = "Stage 2/3 execute terminating primitives."
4314 self
.logger
.debug(logging_text
+ stage
[0])
4315 stage
[1] = "Looking execution environment that needs terminate."
4316 self
.logger
.debug(logging_text
+ stage
[1])
4318 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4319 config_descriptor
= None
4320 vca_member_vnf_index
= vca
.get("member-vnf-index")
4321 vca_id
= self
.get_vca_id(
4322 db_vnfrs_dict
.get(vca_member_vnf_index
)
4323 if vca_member_vnf_index
4327 if not vca
or not vca
.get("ee_id"):
4329 if not vca
.get("member-vnf-index"):
4331 config_descriptor
= db_nsr
.get("ns-configuration")
4332 elif vca
.get("vdu_id"):
4333 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4334 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4335 elif vca
.get("kdu_name"):
4336 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4337 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4339 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4340 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4341 vca_type
= vca
.get("type")
4342 exec_terminate_primitives
= not operation_params
.get(
4343 "skip_terminate_primitives"
4344 ) and vca
.get("needed_terminate")
4345 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4346 # pending native charms
4348 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4350 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4351 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4352 task
= asyncio
.ensure_future(
4360 exec_terminate_primitives
,
4364 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4366 # wait for pending tasks of terminate primitives
4370 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4372 error_list
= await self
._wait
_for
_tasks
(
4375 min(self
.timeout
.charm_delete
, timeout_ns_terminate
),
4379 tasks_dict_info
.clear()
4381 return # raise LcmException("; ".join(error_list))
4383 # remove All execution environments at once
4384 stage
[0] = "Stage 3/3 delete all."
4386 if nsr_deployed
.get("VCA"):
4387 stage
[1] = "Deleting all execution environments."
4388 self
.logger
.debug(logging_text
+ stage
[1])
4389 vca_id
= self
.get_vca_id({}, db_nsr
)
4390 task_delete_ee
= asyncio
.ensure_future(
4392 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4393 timeout
=self
.timeout
.charm_delete
,
4396 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4397 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4399 # Delete Namespace and Certificates if necessary
4400 if check_helm_ee_in_ns(list(db_vnfds_from_member_index
.values())):
4401 await self
.vca_map
["helm-v3"].delete_tls_certificate(
4402 certificate_name
=db_nslcmop
["nsInstanceId"],
4404 # TODO: Delete namespace
4406 # Delete from k8scluster
4407 stage
[1] = "Deleting KDUs."
4408 self
.logger
.debug(logging_text
+ stage
[1])
4409 # print(nsr_deployed)
4410 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4411 if not kdu
or not kdu
.get("kdu-instance"):
4413 kdu_instance
= kdu
.get("kdu-instance")
4414 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4415 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4416 vca_id
= self
.get_vca_id({}, db_nsr
)
4417 task_delete_kdu_instance
= asyncio
.ensure_future(
4418 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4419 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4420 kdu_instance
=kdu_instance
,
4422 namespace
=kdu
.get("namespace"),
4428 + "Unknown k8s deployment type {}".format(
4429 kdu
.get("k8scluster-type")
4434 task_delete_kdu_instance
4435 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4438 stage
[1] = "Deleting ns from VIM."
4439 if self
.ro_config
.ng
:
4440 task_delete_ro
= asyncio
.ensure_future(
4441 self
._terminate
_ng
_ro
(
4442 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4445 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4447 # rest of stuff will be done at finally
4450 ROclient
.ROClientException
,
4455 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4457 except asyncio
.CancelledError
:
4459 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4461 exc
= "Operation was cancelled"
4462 except Exception as e
:
4463 exc
= traceback
.format_exc()
4464 self
.logger
.critical(
4465 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4470 error_list
.append(str(exc
))
4472 # wait for pending tasks
4474 stage
[1] = "Waiting for terminate pending tasks."
4475 self
.logger
.debug(logging_text
+ stage
[1])
4476 error_list
+= await self
._wait
_for
_tasks
(
4479 timeout_ns_terminate
,
4483 stage
[1] = stage
[2] = ""
4484 except asyncio
.CancelledError
:
4485 error_list
.append("Cancelled")
4486 # TODO cancel all tasks
4487 except Exception as exc
:
4488 error_list
.append(str(exc
))
4489 # update status at database
4491 error_detail
= "; ".join(error_list
)
4492 # self.logger.error(logging_text + error_detail)
4493 error_description_nslcmop
= "{} Detail: {}".format(
4494 stage
[0], error_detail
4496 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4497 nslcmop_id
, stage
[0]
4500 db_nsr_update
["operational-status"] = "failed"
4501 db_nsr_update
["detailed-status"] = (
4502 error_description_nsr
+ " Detail: " + error_detail
4504 db_nslcmop_update
["detailed-status"] = error_detail
4505 nslcmop_operation_state
= "FAILED"
4509 error_description_nsr
= error_description_nslcmop
= None
4510 ns_state
= "NOT_INSTANTIATED"
4511 db_nsr_update
["operational-status"] = "terminated"
4512 db_nsr_update
["detailed-status"] = "Done"
4513 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4514 db_nslcmop_update
["detailed-status"] = "Done"
4515 nslcmop_operation_state
= "COMPLETED"
4518 self
._write
_ns
_status
(
4521 current_operation
="IDLE",
4522 current_operation_id
=None,
4523 error_description
=error_description_nsr
,
4524 error_detail
=error_detail
,
4525 other_update
=db_nsr_update
,
4527 self
._write
_op
_status
(
4530 error_message
=error_description_nslcmop
,
4531 operation_state
=nslcmop_operation_state
,
4532 other_update
=db_nslcmop_update
,
4534 if ns_state
== "NOT_INSTANTIATED":
4538 {"nsr-id-ref": nsr_id
},
4539 {"_admin.nsState": "NOT_INSTANTIATED"},
4541 except DbException
as e
:
4544 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4548 if operation_params
:
4549 autoremove
= operation_params
.get("autoremove", False)
4550 if nslcmop_operation_state
:
4552 await self
.msg
.aiowrite(
4557 "nslcmop_id": nslcmop_id
,
4558 "operationState": nslcmop_operation_state
,
4559 "autoremove": autoremove
,
4563 except Exception as e
:
4565 logging_text
+ "kafka_write notification Exception {}".format(e
)
4568 self
.logger
.debug(logging_text
+ "Exit")
4569 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4571 async def _wait_for_tasks(
4572 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4575 error_detail_list
= []
4577 pending_tasks
= list(created_tasks_info
.keys())
4578 num_tasks
= len(pending_tasks
)
4580 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4581 self
._write
_op
_status
(nslcmop_id
, stage
)
4582 while pending_tasks
:
4584 _timeout
= timeout
+ time_start
- time()
4585 done
, pending_tasks
= await asyncio
.wait(
4586 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4588 num_done
+= len(done
)
4589 if not done
: # Timeout
4590 for task
in pending_tasks
:
4591 new_error
= created_tasks_info
[task
] + ": Timeout"
4592 error_detail_list
.append(new_error
)
4593 error_list
.append(new_error
)
4596 if task
.cancelled():
4599 exc
= task
.exception()
4601 if isinstance(exc
, asyncio
.TimeoutError
):
4603 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4604 error_list
.append(created_tasks_info
[task
])
4605 error_detail_list
.append(new_error
)
4612 ROclient
.ROClientException
,
4618 self
.logger
.error(logging_text
+ new_error
)
4620 exc_traceback
= "".join(
4621 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4625 + created_tasks_info
[task
]
4631 logging_text
+ created_tasks_info
[task
] + ": Done"
4633 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4635 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4636 if nsr_id
: # update also nsr
4641 "errorDescription": "Error at: " + ", ".join(error_list
),
4642 "errorDetail": ". ".join(error_detail_list
),
4645 self
._write
_op
_status
(nslcmop_id
, stage
)
4646 return error_detail_list
4649 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4651 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4652 The default-value is used. If it is between < > it look for a value at instantiation_params
4653 :param primitive_desc: portion of VNFD/NSD that describes primitive
4654 :param params: Params provided by user
4655 :param instantiation_params: Instantiation params provided by user
4656 :return: a dictionary with the calculated params
4658 calculated_params
= {}
4659 for parameter
in primitive_desc
.get("parameter", ()):
4660 param_name
= parameter
["name"]
4661 if param_name
in params
:
4662 calculated_params
[param_name
] = params
[param_name
]
4663 elif "default-value" in parameter
or "value" in parameter
:
4664 if "value" in parameter
:
4665 calculated_params
[param_name
] = parameter
["value"]
4667 calculated_params
[param_name
] = parameter
["default-value"]
4669 isinstance(calculated_params
[param_name
], str)
4670 and calculated_params
[param_name
].startswith("<")
4671 and calculated_params
[param_name
].endswith(">")
4673 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4674 calculated_params
[param_name
] = instantiation_params
[
4675 calculated_params
[param_name
][1:-1]
4679 "Parameter {} needed to execute primitive {} not provided".format(
4680 calculated_params
[param_name
], primitive_desc
["name"]
4685 "Parameter {} needed to execute primitive {} not provided".format(
4686 param_name
, primitive_desc
["name"]
4690 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4691 calculated_params
[param_name
] = yaml
.safe_dump(
4692 calculated_params
[param_name
], default_flow_style
=True, width
=256
4694 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4696 ].startswith("!!yaml "):
4697 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4698 if parameter
.get("data-type") == "INTEGER":
4700 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4701 except ValueError: # error converting string to int
4703 "Parameter {} of primitive {} must be integer".format(
4704 param_name
, primitive_desc
["name"]
4707 elif parameter
.get("data-type") == "BOOLEAN":
4708 calculated_params
[param_name
] = not (
4709 (str(calculated_params
[param_name
])).lower() == "false"
4712 # add always ns_config_info if primitive name is config
4713 if primitive_desc
["name"] == "config":
4714 if "ns_config_info" in instantiation_params
:
4715 calculated_params
["ns_config_info"] = instantiation_params
[
4718 return calculated_params
4720 def _look_for_deployed_vca(
4727 ee_descriptor_id
=None,
4729 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4730 for vca
in deployed_vca
:
4733 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4736 vdu_count_index
is not None
4737 and vdu_count_index
!= vca
["vdu_count_index"]
4740 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4742 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4746 # vca_deployed not found
4748 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4749 " is not deployed".format(
4758 ee_id
= vca
.get("ee_id")
4760 "type", "lxc_proxy_charm"
4761 ) # default value for backward compatibility - proxy charm
4764 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4765 "execution environment".format(
4766 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4769 return ee_id
, vca_type
4771 async def _ns_execute_primitive(
4777 retries_interval
=30,
4784 if primitive
== "config":
4785 primitive_params
= {"params": primitive_params
}
4787 vca_type
= vca_type
or "lxc_proxy_charm"
4791 output
= await asyncio
.wait_for(
4792 self
.vca_map
[vca_type
].exec_primitive(
4794 primitive_name
=primitive
,
4795 params_dict
=primitive_params
,
4796 progress_timeout
=self
.timeout
.progress_primitive
,
4797 total_timeout
=self
.timeout
.primitive
,
4802 timeout
=timeout
or self
.timeout
.primitive
,
4806 except asyncio
.CancelledError
:
4808 except Exception as e
:
4812 "Error executing action {} on {} -> {}".format(
4817 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
4819 if isinstance(e
, asyncio
.TimeoutError
):
4821 message
="Timed out waiting for action to complete"
4823 return "FAILED", getattr(e
, "message", repr(e
))
4825 return "COMPLETED", output
4827 except (LcmException
, asyncio
.CancelledError
):
4829 except Exception as e
:
4830 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
async def vca_status_refresh(self, nsr_id, nslcmop_id):
    """
    Updating the vca_status with latest juju information in nsrs record

    When the NS record lists deployed K8s items, each of them is refreshed
    through ``_on_update_k8s_db``; otherwise every deployed VCA entry is
    refreshed through ``_on_update_n2vc_db``. The task is then removed from
    ``self.lcm_tasks``.

    :param: nsr_id: Id of the nsr
    :param: nslcmop_id: Id of the nslcmop
    :return: None
    """

    self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_id = self.get_vca_id({}, db_nsr)

    # Tolerate records where "_admin.deployed" (or its K8s/VCA lists) is not
    # populated: there is simply nothing to refresh, and the task must still
    # be removed below.
    deployed = (db_nsr.get("_admin") or {}).get("deployed") or {}
    k8s_items = deployed.get("K8s")
    if k8s_items:
        for k8s in k8s_items:
            cluster_uuid = k8s["k8scluster-uuid"]
            kdu_instance = k8s["kdu-instance"]
            cluster_type = k8s["k8scluster-type"]
            await self._on_update_k8s_db(
                cluster_uuid=cluster_uuid,
                kdu_instance=kdu_instance,
                filter={"_id": nsr_id},
                vca_id=vca_id,
                cluster_type=cluster_type,
            )
    else:
        for vca_index in range(len(deployed.get("VCA") or ())):
            path = "_admin.deployed.VCA.{}.".format(vca_index)
            await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, path, {})

    self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
    self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4866 async def action(self
, nsr_id
, nslcmop_id
):
4867 # Try to lock HA task here
4868 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4869 if not task_is_locked_by_me
:
4872 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
4873 self
.logger
.debug(logging_text
+ "Enter")
4874 # get all needed from database
4878 db_nslcmop_update
= {}
4879 nslcmop_operation_state
= None
4880 error_description_nslcmop
= None
4884 # wait for any previous tasks in process
4885 step
= "Waiting for previous operations to terminate"
4886 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4888 self
._write
_ns
_status
(
4891 current_operation
="RUNNING ACTION",
4892 current_operation_id
=nslcmop_id
,
4895 step
= "Getting information from database"
4896 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4897 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4898 if db_nslcmop
["operationParams"].get("primitive_params"):
4899 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
4900 db_nslcmop
["operationParams"]["primitive_params"]
4903 nsr_deployed
= db_nsr
["_admin"].get("deployed")
4904 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
4905 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
4906 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
4907 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
4908 primitive
= db_nslcmop
["operationParams"]["primitive"]
4909 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
4910 timeout_ns_action
= db_nslcmop
["operationParams"].get(
4911 "timeout_ns_action", self
.timeout
.primitive
4915 step
= "Getting vnfr from database"
4916 db_vnfr
= self
.db
.get_one(
4917 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
4919 if db_vnfr
.get("kdur"):
4921 for kdur
in db_vnfr
["kdur"]:
4922 if kdur
.get("additionalParams"):
4923 kdur
["additionalParams"] = json
.loads(
4924 kdur
["additionalParams"]
4926 kdur_list
.append(kdur
)
4927 db_vnfr
["kdur"] = kdur_list
4928 step
= "Getting vnfd from database"
4929 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
4931 # Sync filesystem before running a primitive
4932 self
.fs
.sync(db_vnfr
["vnfd-id"])
4934 step
= "Getting nsd from database"
4935 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
4937 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
4938 # for backward compatibility
4939 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
4940 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
4941 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
4942 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4944 # look for primitive
4945 config_primitive_desc
= descriptor_configuration
= None
4947 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
4949 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
4951 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
4953 descriptor_configuration
= db_nsd
.get("ns-configuration")
4955 if descriptor_configuration
and descriptor_configuration
.get(
4958 for config_primitive
in descriptor_configuration
["config-primitive"]:
4959 if config_primitive
["name"] == primitive
:
4960 config_primitive_desc
= config_primitive
4963 if not config_primitive_desc
:
4964 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
4966 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4970 primitive_name
= primitive
4971 ee_descriptor_id
= None
4973 primitive_name
= config_primitive_desc
.get(
4974 "execution-environment-primitive", primitive
4976 ee_descriptor_id
= config_primitive_desc
.get(
4977 "execution-environment-ref"
4983 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
4985 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
4988 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
4990 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
4992 desc_params
= parse_yaml_strings(
4993 db_vnfr
.get("additionalParamsForVnf")
4996 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
4997 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
4998 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5000 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5001 actions
.add(primitive
["name"])
5002 for primitive
in kdu_configuration
.get("config-primitive", []):
5003 actions
.add(primitive
["name"])
5005 nsr_deployed
["K8s"],
5006 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5007 and kdu
["member-vnf-index"] == vnf_index
,
5011 if primitive_name
in actions
5012 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5016 # TODO check if ns is in a proper status
5018 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5020 # kdur and desc_params already set from before
5021 if primitive_params
:
5022 desc_params
.update(primitive_params
)
5023 # TODO Check if we will need something at vnf level
5024 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5026 kdu_name
== kdu
["kdu-name"]
5027 and kdu
["member-vnf-index"] == vnf_index
5032 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5035 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5036 msg
= "unknown k8scluster-type '{}'".format(
5037 kdu
.get("k8scluster-type")
5039 raise LcmException(msg
)
5042 "collection": "nsrs",
5043 "filter": {"_id": nsr_id
},
5044 "path": "_admin.deployed.K8s.{}".format(index
),
5048 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5050 step
= "Executing kdu {}".format(primitive_name
)
5051 if primitive_name
== "upgrade":
5052 if desc_params
.get("kdu_model"):
5053 kdu_model
= desc_params
.get("kdu_model")
5054 del desc_params
["kdu_model"]
5056 kdu_model
= kdu
.get("kdu-model")
5057 parts
= kdu_model
.split(sep
=":")
5059 kdu_model
= parts
[0]
5060 if desc_params
.get("kdu_atomic_upgrade"):
5061 atomic_upgrade
= desc_params
.get(
5062 "kdu_atomic_upgrade"
5063 ).lower() in ("yes", "true", "1")
5064 del desc_params
["kdu_atomic_upgrade"]
5066 atomic_upgrade
= True
5068 detailed_status
= await asyncio
.wait_for(
5069 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5070 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5071 kdu_instance
=kdu
.get("kdu-instance"),
5072 atomic
=atomic_upgrade
,
5073 kdu_model
=kdu_model
,
5076 timeout
=timeout_ns_action
,
5078 timeout
=timeout_ns_action
+ 10,
5081 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5083 elif primitive_name
== "rollback":
5084 detailed_status
= await asyncio
.wait_for(
5085 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5086 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5087 kdu_instance
=kdu
.get("kdu-instance"),
5090 timeout
=timeout_ns_action
,
5092 elif primitive_name
== "status":
5093 detailed_status
= await asyncio
.wait_for(
5094 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5095 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5096 kdu_instance
=kdu
.get("kdu-instance"),
5099 timeout
=timeout_ns_action
,
5102 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5103 kdu
["kdu-name"], nsr_id
5105 params
= self
._map
_primitive
_params
(
5106 config_primitive_desc
, primitive_params
, desc_params
5109 detailed_status
= await asyncio
.wait_for(
5110 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5111 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5112 kdu_instance
=kdu_instance
,
5113 primitive_name
=primitive_name
,
5116 timeout
=timeout_ns_action
,
5119 timeout
=timeout_ns_action
,
5123 nslcmop_operation_state
= "COMPLETED"
5125 detailed_status
= ""
5126 nslcmop_operation_state
= "FAILED"
5128 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5129 nsr_deployed
["VCA"],
5130 member_vnf_index
=vnf_index
,
5132 vdu_count_index
=vdu_count_index
,
5133 ee_descriptor_id
=ee_descriptor_id
,
5135 for vca_index
, vca_deployed
in enumerate(
5136 db_nsr
["_admin"]["deployed"]["VCA"]
5138 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5140 "collection": "nsrs",
5141 "filter": {"_id": nsr_id
},
5142 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5146 nslcmop_operation_state
,
5148 ) = await self
._ns
_execute
_primitive
(
5150 primitive
=primitive_name
,
5151 primitive_params
=self
._map
_primitive
_params
(
5152 config_primitive_desc
, primitive_params
, desc_params
5154 timeout
=timeout_ns_action
,
5160 db_nslcmop_update
["detailed-status"] = detailed_status
5161 error_description_nslcmop
= (
5162 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5166 + "Done with result {} {}".format(
5167 nslcmop_operation_state
, detailed_status
5170 return # database update is called inside finally
5172 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5173 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5175 except asyncio
.CancelledError
:
5177 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5179 exc
= "Operation was cancelled"
5180 except asyncio
.TimeoutError
:
5181 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5183 except Exception as e
:
5184 exc
= traceback
.format_exc()
5185 self
.logger
.critical(
5186 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5195 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5196 nslcmop_operation_state
= "FAILED"
5198 self
._write
_ns
_status
(
5202 ], # TODO check if degraded. For the moment use previous status
5203 current_operation
="IDLE",
5204 current_operation_id
=None,
5205 # error_description=error_description_nsr,
5206 # error_detail=error_detail,
5207 other_update
=db_nsr_update
,
5210 self
._write
_op
_status
(
5213 error_message
=error_description_nslcmop
,
5214 operation_state
=nslcmop_operation_state
,
5215 other_update
=db_nslcmop_update
,
5218 if nslcmop_operation_state
:
5220 await self
.msg
.aiowrite(
5225 "nslcmop_id": nslcmop_id
,
5226 "operationState": nslcmop_operation_state
,
5230 except Exception as e
:
5232 logging_text
+ "kafka_write notification Exception {}".format(e
)
5234 self
.logger
.debug(logging_text
+ "Exit")
5235 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5236 return nslcmop_operation_state
, detailed_status
5238 async def terminate_vdus(
5239 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5241 """This method terminates VDUs
5244 db_vnfr: VNF instance record
5245 member_vnf_index: VNF index to identify the VDUs to be removed
5246 db_nsr: NS instance record
5247 update_db_nslcmops: Nslcmop update record
5249 vca_scaling_info
= []
5250 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5251 scaling_info
["scaling_direction"] = "IN"
5252 scaling_info
["vdu-delete"] = {}
5253 scaling_info
["kdu-delete"] = {}
5254 db_vdur
= db_vnfr
.get("vdur")
5255 vdur_list
= copy(db_vdur
)
5257 for index
, vdu
in enumerate(vdur_list
):
5258 vca_scaling_info
.append(
5260 "osm_vdu_id": vdu
["vdu-id-ref"],
5261 "member-vnf-index": member_vnf_index
,
5263 "vdu_index": count_index
,
5266 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5267 scaling_info
["vdu"].append(
5269 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5270 "vdu_id": vdu
["vdu-id-ref"],
5274 for interface
in vdu
["interfaces"]:
5275 scaling_info
["vdu"][index
]["interface"].append(
5277 "name": interface
["name"],
5278 "ip_address": interface
["ip-address"],
5279 "mac_address": interface
.get("mac-address"),
5282 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5283 stage
[2] = "Terminating VDUs"
5284 if scaling_info
.get("vdu-delete"):
5285 # scale_process = "RO"
5286 if self
.ro_config
.ng
:
5287 await self
._scale
_ng
_ro
(
async def remove_vnf(self, nsr_id, nslcmop_id, vnf_instance_id):
    """This method is to Remove VNF instances from NS.

    The VNF is only removed when the NS has more than one VNF record. Its
    VDUs are terminated through ``terminate_vdus``, its id is dropped from
    the NS "constituent-vnfr-ref" list and the VNF record is deleted.

    Args:
        nsr_id: NS instance id
        nslcmop_id: nslcmop id of update
        vnf_instance_id: id of the VNF instance to be removed

    Returns:
        result: (str, str) COMPLETED/FAILED, details

    Raises:
        LcmException: when the VNF is the last one in the NS.
        asyncio.CancelledError: propagated untouched.
    """
    try:
        db_nsr_update = {}
        logging_text = "Task ns={} update ".format(nsr_id)
        check_vnfr_count = len(self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}))
        self.logger.info("check_vnfr_count {}".format(check_vnfr_count))
        if check_vnfr_count <= 1:
            raise LcmException(
                "{} Cannot terminate the last VNF in this NS.".format(
                    vnf_instance_id
                )
            )

        stage = ["", "", ""]
        self.logger.debug(
            "Getting nslcmop from database"
            + " after having waited for previous tasks to be completed"
        )
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
        member_vnf_index = db_vnfr["member-vnf-index-ref"]

        update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        await self.terminate_vdus(
            db_vnfr,
            member_vnf_index,
            db_nsr,
            update_db_nslcmops,
            stage,
            logging_text,
        )

        # The VDUs are already gone at this point, so a missing reference
        # must not abort the record clean-up: only remove the id when it is
        # actually listed.
        vnfr_id = db_vnfr.get("_id")
        constituent_vnfr = db_nsr.get("constituent-vnfr-ref") or []
        if vnfr_id in constituent_vnfr:
            constituent_vnfr.remove(vnfr_id)
        db_nsr_update["constituent-vnfr-ref"] = constituent_vnfr

        # One write of the NS record is enough; the payload does not change
        # after the VNF record is deleted.
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self.db.del_one("vnfrs", {"_id": vnfr_id})
        return "COMPLETED", "Done"
    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        self.logger.debug("Error removing VNF {}".format(e))
        return "FAILED", "Error removing VNF {}".format(e)
5357 async def _ns_redeploy_vnf(
5365 """This method updates and redeploys VNF instances
5368 nsr_id: NS instance id
5369 nslcmop_id: nslcmop id
5370 db_vnfd: VNF descriptor
5371 db_vnfr: VNF instance record
5372 db_nsr: NS instance record
5375 result: (str, str) COMPLETED/FAILED, details
5379 stage
= ["", "", ""]
5380 logging_text
= "Task ns={} update ".format(nsr_id
)
5381 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5382 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5384 # Terminate old VNF resources
5385 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5386 await self
.terminate_vdus(
5395 # old_vnfd_id = db_vnfr["vnfd-id"]
5396 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5397 new_db_vnfd
= db_vnfd
5398 # new_vnfd_ref = new_db_vnfd["id"]
5399 # new_vnfd_id = vnfd_id
5403 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5405 "name": cp
.get("id"),
5406 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5407 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5410 new_vnfr_cp
.append(vnf_cp
)
5411 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5412 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5413 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5415 "revision": latest_vnfd_revision
,
5416 "connection-point": new_vnfr_cp
,
5420 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5421 updated_db_vnfr
= self
.db
.get_one(
5423 {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
},
5426 # Instantiate new VNF resources
5427 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5428 vca_scaling_info
= []
5429 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5430 scaling_info
["scaling_direction"] = "OUT"
5431 scaling_info
["vdu-create"] = {}
5432 scaling_info
["kdu-create"] = {}
5433 vdud_instantiate_list
= db_vnfd
["vdu"]
5434 for index
, vdud
in enumerate(vdud_instantiate_list
):
5435 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
5437 additional_params
= (
5438 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5441 cloud_init_list
= []
5443 # TODO Information of its own ip is not available because db_vnfr is not updated.
5444 additional_params
["OSM"] = get_osm_params(
5445 updated_db_vnfr
, vdud
["id"], 1
5447 cloud_init_list
.append(
5448 self
._parse
_cloud
_init
(
5455 vca_scaling_info
.append(
5457 "osm_vdu_id": vdud
["id"],
5458 "member-vnf-index": member_vnf_index
,
5460 "vdu_index": count_index
,
5463 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5464 if self
.ro_config
.ng
:
5466 "New Resources to be deployed: {}".format(scaling_info
)
5468 await self
._scale
_ng
_ro
(
5476 return "COMPLETED", "Done"
5477 except (LcmException
, asyncio
.CancelledError
):
5479 except Exception as e
:
5480 self
.logger
.debug("Error updating VNF {}".format(e
))
5481 return "FAILED", "Error updating VNF {}".format(e
)
5483 async def _ns_charm_upgrade(
5489 timeout
: float = None,
5491 """This method upgrade charms in VNF instances
5494 ee_id: Execution environment id
5495 path: Local path to the charm
5497 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5498 timeout: (Float) Timeout for the ns update operation
5501 result: (str, str) COMPLETED/FAILED, details
5504 charm_type
= charm_type
or "lxc_proxy_charm"
5505 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5509 charm_type
=charm_type
,
5510 timeout
=timeout
or self
.timeout
.ns_update
,
5514 return "COMPLETED", output
5516 except (LcmException
, asyncio
.CancelledError
):
5519 except Exception as e
:
5521 self
.logger
.debug("Error upgrading charm {}".format(path
))
5523 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5525 async def update(self
, nsr_id
, nslcmop_id
):
5526 """Update NS according to different update types
5528 This method performs upgrade of VNF instances then updates the revision
5529 number in VNF record
5532 nsr_id: Network service will be updated
5533 nslcmop_id: ns lcm operation id
5536 It may raise DbException, LcmException, N2VCException, K8sException
5539 # Try to lock HA task here
5540 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5541 if not task_is_locked_by_me
:
5544 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5545 self
.logger
.debug(logging_text
+ "Enter")
5547 # Set the required variables to be filled up later
5549 db_nslcmop_update
= {}
5551 nslcmop_operation_state
= None
5553 error_description_nslcmop
= ""
5555 change_type
= "updated"
5556 detailed_status
= ""
5557 member_vnf_index
= None
5560 # wait for any previous tasks in process
5561 step
= "Waiting for previous operations to terminate"
5562 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5563 self
._write
_ns
_status
(
5566 current_operation
="UPDATING",
5567 current_operation_id
=nslcmop_id
,
5570 step
= "Getting nslcmop from database"
5571 db_nslcmop
= self
.db
.get_one(
5572 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5574 update_type
= db_nslcmop
["operationParams"]["updateType"]
5576 step
= "Getting nsr from database"
5577 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5578 old_operational_status
= db_nsr
["operational-status"]
5579 db_nsr_update
["operational-status"] = "updating"
5580 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5581 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5583 if update_type
== "CHANGE_VNFPKG":
5585 # Get the input parameters given through update request
5586 vnf_instance_id
= db_nslcmop
["operationParams"][
5587 "changeVnfPackageData"
5588 ].get("vnfInstanceId")
5590 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5593 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5595 step
= "Getting vnfr from database"
5596 db_vnfr
= self
.db
.get_one(
5597 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5600 step
= "Getting vnfds from database"
5602 latest_vnfd
= self
.db
.get_one(
5603 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5605 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5608 current_vnf_revision
= db_vnfr
.get("revision", 1)
5609 current_vnfd
= self
.db
.get_one(
5611 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5612 fail_on_empty
=False,
5614 # Charm artifact paths will be filled up later
5616 current_charm_artifact_path
,
5617 target_charm_artifact_path
,
5618 charm_artifact_paths
,
5620 ) = ([], [], [], [])
5622 step
= "Checking if revision has changed in VNFD"
5623 if current_vnf_revision
!= latest_vnfd_revision
:
5625 change_type
= "policy_updated"
5627 # There is new revision of VNFD, update operation is required
5628 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5629 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5631 step
= "Removing the VNFD packages if they exist in the local path"
5632 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5633 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5635 step
= "Get the VNFD packages from FSMongo"
5636 self
.fs
.sync(from_path
=latest_vnfd_path
)
5637 self
.fs
.sync(from_path
=current_vnfd_path
)
5640 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5642 current_base_folder
= current_vnfd
["_admin"]["storage"]
5643 latest_base_folder
= latest_vnfd
["_admin"]["storage"]
5645 for vca_index
, vca_deployed
in enumerate(
5646 get_iterable(nsr_deployed
, "VCA")
5648 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5650 # Getting charm-id and charm-type
5651 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5652 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5653 vca_type
= vca_deployed
.get("type")
5654 vdu_count_index
= vca_deployed
.get("vdu_count_index")
5657 ee_id
= vca_deployed
.get("ee_id")
5659 step
= "Getting descriptor config"
5660 if current_vnfd
.get("kdu"):
5662 search_key
= "kdu_name"
5664 search_key
= "vnfd_id"
5666 entity_id
= vca_deployed
.get(search_key
)
5668 descriptor_config
= get_configuration(
5669 current_vnfd
, entity_id
5672 if "execution-environment-list" in descriptor_config
:
5673 ee_list
= descriptor_config
.get(
5674 "execution-environment-list", []
5679 # There could be several charm used in the same VNF
5680 for ee_item
in ee_list
:
5681 if ee_item
.get("juju"):
5683 step
= "Getting charm name"
5684 charm_name
= ee_item
["juju"].get("charm")
5686 step
= "Setting Charm artifact paths"
5687 current_charm_artifact_path
.append(
5688 get_charm_artifact_path(
5689 current_base_folder
,
5692 current_vnf_revision
,
5695 target_charm_artifact_path
.append(
5696 get_charm_artifact_path(
5700 latest_vnfd_revision
,
5703 elif ee_item
.get("helm-chart"):
5704 # add chart to list and all parameters
5705 step
= "Getting helm chart name"
5706 chart_name
= ee_item
.get("helm-chart")
5708 ee_item
.get("helm-version")
5709 and ee_item
.get("helm-version") == "v2"
5713 vca_type
= "helm-v3"
5714 step
= "Setting Helm chart artifact paths"
5716 helm_artifacts
.append(
5718 "current_artifact_path": get_charm_artifact_path(
5719 current_base_folder
,
5722 current_vnf_revision
,
5724 "target_artifact_path": get_charm_artifact_path(
5728 latest_vnfd_revision
,
5731 "vca_index": vca_index
,
5732 "vdu_index": vdu_count_index
,
5736 charm_artifact_paths
= zip(
5737 current_charm_artifact_path
, target_charm_artifact_path
5740 step
= "Checking if software version has changed in VNFD"
5741 if find_software_version(current_vnfd
) != find_software_version(
5745 step
= "Checking if existing VNF has charm"
5746 for current_charm_path
, target_charm_path
in list(
5747 charm_artifact_paths
5749 if current_charm_path
:
5751 "Software version change is not supported as VNF instance {} has charm.".format(
5756 # There is no change in the charm package, then redeploy the VNF
5757 # based on new descriptor
5758 step
= "Redeploying VNF"
5759 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5760 (result
, detailed_status
) = await self
._ns
_redeploy
_vnf
(
5761 nsr_id
, nslcmop_id
, latest_vnfd
, db_vnfr
, db_nsr
5763 if result
== "FAILED":
5764 nslcmop_operation_state
= result
5765 error_description_nslcmop
= detailed_status
5766 db_nslcmop_update
["detailed-status"] = detailed_status
5769 + " step {} Done with result {} {}".format(
5770 step
, nslcmop_operation_state
, detailed_status
5775 step
= "Checking if any charm package has changed or not"
5776 for current_charm_path
, target_charm_path
in list(
5777 charm_artifact_paths
5781 and target_charm_path
5782 and self
.check_charm_hash_changed(
5783 current_charm_path
, target_charm_path
5787 step
= "Checking whether VNF uses juju bundle"
5788 if check_juju_bundle_existence(current_vnfd
):
5791 "Charm upgrade is not supported for the instance which"
5792 " uses juju-bundle: {}".format(
5793 check_juju_bundle_existence(current_vnfd
)
5797 step
= "Upgrading Charm"
5801 ) = await self
._ns
_charm
_upgrade
(
5804 charm_type
=vca_type
,
5805 path
=self
.fs
.path
+ target_charm_path
,
5806 timeout
=timeout_seconds
,
5809 if result
== "FAILED":
5810 nslcmop_operation_state
= result
5811 error_description_nslcmop
= detailed_status
5813 db_nslcmop_update
["detailed-status"] = detailed_status
5816 + " step {} Done with result {} {}".format(
5817 step
, nslcmop_operation_state
, detailed_status
5821 step
= "Updating policies"
5822 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5823 result
= "COMPLETED"
5824 detailed_status
= "Done"
5825 db_nslcmop_update
["detailed-status"] = "Done"
5828 for item
in helm_artifacts
:
5830 item
["current_artifact_path"]
5831 and item
["target_artifact_path"]
5832 and self
.check_charm_hash_changed(
5833 item
["current_artifact_path"],
5834 item
["target_artifact_path"],
5838 db_update_entry
= "_admin.deployed.VCA.{}.".format(
5841 vnfr_id
= db_vnfr
["_id"]
5842 osm_config
= {"osm": {"ns_id": nsr_id
, "vnf_id": vnfr_id
}}
5844 "collection": "nsrs",
5845 "filter": {"_id": nsr_id
},
5846 "path": db_update_entry
,
5848 vca_type
, namespace
, helm_id
= get_ee_id_parts(item
["ee_id"])
5849 await self
.vca_map
[vca_type
].upgrade_execution_environment(
5850 namespace
=namespace
,
5854 artifact_path
=item
["target_artifact_path"],
5857 vnf_id
= db_vnfr
.get("vnfd-ref")
5858 config_descriptor
= get_configuration(latest_vnfd
, vnf_id
)
5859 self
.logger
.debug("get ssh key block")
5863 ("config-access", "ssh-access", "required"),
5865 # Needed to inject a ssh key
5868 ("config-access", "ssh-access", "default-user"),
5871 "Install configuration Software, getting public ssh key"
5873 pub_key
= await self
.vca_map
[
5875 ].get_ee_ssh_public__key(
5876 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
5880 "Insert public key into VM user={} ssh_key={}".format(
5884 self
.logger
.debug(logging_text
+ step
)
5886 # wait for RO (ip-address) Insert pub_key into VM
5887 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
5897 initial_config_primitive_list
= config_descriptor
.get(
5898 "initial-config-primitive"
5900 config_primitive
= next(
5903 for p
in initial_config_primitive_list
5904 if p
["name"] == "config"
5908 if not config_primitive
:
5911 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
5913 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
5914 if db_vnfr
.get("additionalParamsForVnf"):
5915 deploy_params
.update(
5917 db_vnfr
["additionalParamsForVnf"].copy()
5920 primitive_params_
= self
._map
_primitive
_params
(
5921 config_primitive
, {}, deploy_params
5924 step
= "execute primitive '{}' params '{}'".format(
5925 config_primitive
["name"], primitive_params_
5927 self
.logger
.debug(logging_text
+ step
)
5928 await self
.vca_map
[vca_type
].exec_primitive(
5930 primitive_name
=config_primitive
["name"],
5931 params_dict
=primitive_params_
,
5937 step
= "Updating policies"
5938 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5939 detailed_status
= "Done"
5940 db_nslcmop_update
["detailed-status"] = "Done"
5942 # If nslcmop_operation_state is None, so any operation is not failed.
5943 if not nslcmop_operation_state
:
5944 nslcmop_operation_state
= "COMPLETED"
5946 # If update CHANGE_VNFPKG nslcmop_operation is successful
5947 # vnf revision need to be updated
5948 vnfr_update
["revision"] = latest_vnfd_revision
5949 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
5953 + " task Done with result {} {}".format(
5954 nslcmop_operation_state
, detailed_status
5957 elif update_type
== "REMOVE_VNF":
5958 # This part is included in https://osm.etsi.org/gerrit/11876
5959 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
5960 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5961 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5962 step
= "Removing VNF"
5963 (result
, detailed_status
) = await self
.remove_vnf(
5964 nsr_id
, nslcmop_id
, vnf_instance_id
5966 if result
== "FAILED":
5967 nslcmop_operation_state
= result
5968 error_description_nslcmop
= detailed_status
5969 db_nslcmop_update
["detailed-status"] = detailed_status
5970 change_type
= "vnf_terminated"
5971 if not nslcmop_operation_state
:
5972 nslcmop_operation_state
= "COMPLETED"
5975 + " task Done with result {} {}".format(
5976 nslcmop_operation_state
, detailed_status
5980 elif update_type
== "OPERATE_VNF":
5981 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"][
5984 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"][
5987 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"][
5990 (result
, detailed_status
) = await self
.rebuild_start_stop(
5991 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
5993 if result
== "FAILED":
5994 nslcmop_operation_state
= result
5995 error_description_nslcmop
= detailed_status
5996 db_nslcmop_update
["detailed-status"] = detailed_status
5997 if not nslcmop_operation_state
:
5998 nslcmop_operation_state
= "COMPLETED"
6001 + " task Done with result {} {}".format(
6002 nslcmop_operation_state
, detailed_status
6006 # If nslcmop_operation_state is None, no operation has failed.
6007 # All operations have been executed.
6008 if not nslcmop_operation_state
:
6009 nslcmop_operation_state
= "COMPLETED"
6010 db_nsr_update
["operational-status"] = old_operational_status
6012 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
6013 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6015 except asyncio
.CancelledError
:
6017 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6019 exc
= "Operation was cancelled"
6020 except asyncio
.TimeoutError
:
6021 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
6023 except Exception as e
:
6024 exc
= traceback
.format_exc()
6025 self
.logger
.critical(
6026 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6035 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6036 nslcmop_operation_state
= "FAILED"
6037 db_nsr_update
["operational-status"] = old_operational_status
6039 self
._write
_ns
_status
(
6041 ns_state
=db_nsr
["nsState"],
6042 current_operation
="IDLE",
6043 current_operation_id
=None,
6044 other_update
=db_nsr_update
,
6047 self
._write
_op
_status
(
6050 error_message
=error_description_nslcmop
,
6051 operation_state
=nslcmop_operation_state
,
6052 other_update
=db_nslcmop_update
,
6055 if nslcmop_operation_state
:
6059 "nslcmop_id": nslcmop_id
,
6060 "operationState": nslcmop_operation_state
,
6063 change_type
in ("vnf_terminated", "policy_updated")
6064 and member_vnf_index
6066 msg
.update({"vnf_member_index": member_vnf_index
})
6067 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6068 except Exception as e
:
6070 logging_text
+ "kafka_write notification Exception {}".format(e
)
6072 self
.logger
.debug(logging_text
+ "Exit")
6073 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6074 return nslcmop_operation_state
, detailed_status
6076 async def scale(self
, nsr_id
, nslcmop_id
):
6077 # Try to lock HA task here
6078 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6079 if not task_is_locked_by_me
:
6082 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6083 stage
= ["", "", ""]
6084 tasks_dict_info
= {}
6085 # ^ stage, step, VIM progress
6086 self
.logger
.debug(logging_text
+ "Enter")
6087 # get all needed from database
6089 db_nslcmop_update
= {}
6092 # in case of error, indicates which part of the scale failed, in order to put the nsr in error status
6093 scale_process
= None
6094 old_operational_status
= ""
6095 old_config_status
= ""
6098 # wait for any previous tasks in process
6099 step
= "Waiting for previous operations to terminate"
6100 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6101 self
._write
_ns
_status
(
6104 current_operation
="SCALING",
6105 current_operation_id
=nslcmop_id
,
6108 step
= "Getting nslcmop from database"
6110 step
+ " after having waited for previous tasks to be completed"
6112 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6114 step
= "Getting nsr from database"
6115 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6116 old_operational_status
= db_nsr
["operational-status"]
6117 old_config_status
= db_nsr
["config-status"]
6119 step
= "Parsing scaling parameters"
6120 db_nsr_update
["operational-status"] = "scaling"
6121 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6122 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6124 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6126 ]["member-vnf-index"]
6127 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6129 ]["scaling-group-descriptor"]
6130 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6131 # for backward compatibility
6132 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6133 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6134 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6135 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6137 step
= "Getting vnfr from database"
6138 db_vnfr
= self
.db
.get_one(
6139 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6142 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6144 step
= "Getting vnfd from database"
6145 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6147 base_folder
= db_vnfd
["_admin"]["storage"]
6149 step
= "Getting scaling-group-descriptor"
6150 scaling_descriptor
= find_in_list(
6151 get_scaling_aspect(db_vnfd
),
6152 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6154 if not scaling_descriptor
:
6156 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6157 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6160 step
= "Sending scale order to VIM"
6161 # TODO check if ns is in a proper status
6163 if not db_nsr
["_admin"].get("scaling-group"):
6168 "_admin.scaling-group": [
6169 {"name": scaling_group
, "nb-scale-op": 0}
6173 admin_scale_index
= 0
6175 for admin_scale_index
, admin_scale_info
in enumerate(
6176 db_nsr
["_admin"]["scaling-group"]
6178 if admin_scale_info
["name"] == scaling_group
:
6179 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6181 else: # not found, set index one plus last element and add new entry with the name
6182 admin_scale_index
+= 1
6184 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6187 vca_scaling_info
= []
6188 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6189 if scaling_type
== "SCALE_OUT":
6190 if "aspect-delta-details" not in scaling_descriptor
:
6192 "Aspect delta details not fount in scaling descriptor {}".format(
6193 scaling_descriptor
["name"]
6196 # count if max-instance-count is reached
6197 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6199 scaling_info
["scaling_direction"] = "OUT"
6200 scaling_info
["vdu-create"] = {}
6201 scaling_info
["kdu-create"] = {}
6202 for delta
in deltas
:
6203 for vdu_delta
in delta
.get("vdu-delta", {}):
6204 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6205 # vdu_index also provides the number of instances of the targeted vdu
6206 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6207 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6211 additional_params
= (
6212 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6215 cloud_init_list
= []
6217 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6218 max_instance_count
= 10
6219 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6220 max_instance_count
= vdu_profile
.get(
6221 "max-number-of-instances", 10
6224 default_instance_num
= get_number_of_instances(
6227 instances_number
= vdu_delta
.get("number-of-instances", 1)
6228 nb_scale_op
+= instances_number
6230 new_instance_count
= nb_scale_op
+ default_instance_num
6231 # Control if new count is over max and vdu count is less than max.
6232 # Then assign new instance count
6233 if new_instance_count
> max_instance_count
> vdu_count
:
6234 instances_number
= new_instance_count
- max_instance_count
6236 instances_number
= instances_number
6238 if new_instance_count
> max_instance_count
:
6240 "reached the limit of {} (max-instance-count) "
6241 "scaling-out operations for the "
6242 "scaling-group-descriptor '{}'".format(
6243 nb_scale_op
, scaling_group
6246 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6248 # TODO Information of its own ip is not available because db_vnfr is not updated.
6249 additional_params
["OSM"] = get_osm_params(
6250 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6252 cloud_init_list
.append(
6253 self
._parse
_cloud
_init
(
6260 vca_scaling_info
.append(
6262 "osm_vdu_id": vdu_delta
["id"],
6263 "member-vnf-index": vnf_index
,
6265 "vdu_index": vdu_index
+ x
,
6268 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6269 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6270 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6271 kdu_name
= kdu_profile
["kdu-name"]
6272 resource_name
= kdu_profile
.get("resource-name", "")
6274 # Might have different kdus in the same delta
6275 # Should have list for each kdu
6276 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6277 scaling_info
["kdu-create"][kdu_name
] = []
6279 kdur
= get_kdur(db_vnfr
, kdu_name
)
6280 if kdur
.get("helm-chart"):
6281 k8s_cluster_type
= "helm-chart-v3"
6282 self
.logger
.debug("kdur: {}".format(kdur
))
6284 kdur
.get("helm-version")
6285 and kdur
.get("helm-version") == "v2"
6287 k8s_cluster_type
= "helm-chart"
6288 elif kdur
.get("juju-bundle"):
6289 k8s_cluster_type
= "juju-bundle"
6292 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6293 "juju-bundle. Maybe an old NBI version is running".format(
6294 db_vnfr
["member-vnf-index-ref"], kdu_name
6298 max_instance_count
= 10
6299 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6300 max_instance_count
= kdu_profile
.get(
6301 "max-number-of-instances", 10
6304 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6305 deployed_kdu
, _
= get_deployed_kdu(
6306 nsr_deployed
, kdu_name
, vnf_index
6308 if deployed_kdu
is None:
6310 "KDU '{}' for vnf '{}' not deployed".format(
6314 kdu_instance
= deployed_kdu
.get("kdu-instance")
6315 instance_num
= await self
.k8scluster_map
[
6321 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6322 kdu_model
=deployed_kdu
.get("kdu-model"),
6324 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6325 "number-of-instances", 1
6328 # Control if new count is over max and instance_num is less than max.
6329 # Then assign max instance number to kdu replica count
6330 if kdu_replica_count
> max_instance_count
> instance_num
:
6331 kdu_replica_count
= max_instance_count
6332 if kdu_replica_count
> max_instance_count
:
6334 "reached the limit of {} (max-instance-count) "
6335 "scaling-out operations for the "
6336 "scaling-group-descriptor '{}'".format(
6337 instance_num
, scaling_group
6341 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6342 vca_scaling_info
.append(
6344 "osm_kdu_id": kdu_name
,
6345 "member-vnf-index": vnf_index
,
6347 "kdu_index": instance_num
+ x
- 1,
6350 scaling_info
["kdu-create"][kdu_name
].append(
6352 "member-vnf-index": vnf_index
,
6354 "k8s-cluster-type": k8s_cluster_type
,
6355 "resource-name": resource_name
,
6356 "scale": kdu_replica_count
,
6359 elif scaling_type
== "SCALE_IN":
6360 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6362 scaling_info
["scaling_direction"] = "IN"
6363 scaling_info
["vdu-delete"] = {}
6364 scaling_info
["kdu-delete"] = {}
6366 for delta
in deltas
:
6367 for vdu_delta
in delta
.get("vdu-delta", {}):
6368 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6369 min_instance_count
= 0
6370 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6371 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6372 min_instance_count
= vdu_profile
["min-number-of-instances"]
6374 default_instance_num
= get_number_of_instances(
6375 db_vnfd
, vdu_delta
["id"]
6377 instance_num
= vdu_delta
.get("number-of-instances", 1)
6378 nb_scale_op
-= instance_num
6380 new_instance_count
= nb_scale_op
+ default_instance_num
6382 if new_instance_count
< min_instance_count
< vdu_count
:
6383 instances_number
= min_instance_count
- new_instance_count
6385 instances_number
= instance_num
6387 if new_instance_count
< min_instance_count
:
6389 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6390 "scaling-group-descriptor '{}'".format(
6391 nb_scale_op
, scaling_group
6394 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6395 vca_scaling_info
.append(
6397 "osm_vdu_id": vdu_delta
["id"],
6398 "member-vnf-index": vnf_index
,
6400 "vdu_index": vdu_index
- 1 - x
,
6403 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6404 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6405 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6406 kdu_name
= kdu_profile
["kdu-name"]
6407 resource_name
= kdu_profile
.get("resource-name", "")
6409 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6410 scaling_info
["kdu-delete"][kdu_name
] = []
6412 kdur
= get_kdur(db_vnfr
, kdu_name
)
6413 if kdur
.get("helm-chart"):
6414 k8s_cluster_type
= "helm-chart-v3"
6415 self
.logger
.debug("kdur: {}".format(kdur
))
6417 kdur
.get("helm-version")
6418 and kdur
.get("helm-version") == "v2"
6420 k8s_cluster_type
= "helm-chart"
6421 elif kdur
.get("juju-bundle"):
6422 k8s_cluster_type
= "juju-bundle"
6425 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6426 "juju-bundle. Maybe an old NBI version is running".format(
6427 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6431 min_instance_count
= 0
6432 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6433 min_instance_count
= kdu_profile
["min-number-of-instances"]
6435 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6436 deployed_kdu
, _
= get_deployed_kdu(
6437 nsr_deployed
, kdu_name
, vnf_index
6439 if deployed_kdu
is None:
6441 "KDU '{}' for vnf '{}' not deployed".format(
6445 kdu_instance
= deployed_kdu
.get("kdu-instance")
6446 instance_num
= await self
.k8scluster_map
[
6452 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6453 kdu_model
=deployed_kdu
.get("kdu-model"),
6455 kdu_replica_count
= instance_num
- kdu_delta
.get(
6456 "number-of-instances", 1
6459 if kdu_replica_count
< min_instance_count
< instance_num
:
6460 kdu_replica_count
= min_instance_count
6461 if kdu_replica_count
< min_instance_count
:
6463 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6464 "scaling-group-descriptor '{}'".format(
6465 instance_num
, scaling_group
6469 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6470 vca_scaling_info
.append(
6472 "osm_kdu_id": kdu_name
,
6473 "member-vnf-index": vnf_index
,
6475 "kdu_index": instance_num
- x
- 1,
6478 scaling_info
["kdu-delete"][kdu_name
].append(
6480 "member-vnf-index": vnf_index
,
6482 "k8s-cluster-type": k8s_cluster_type
,
6483 "resource-name": resource_name
,
6484 "scale": kdu_replica_count
,
6488 # update VDU_SCALING_INFO with the ip_addresses of the VDUs to delete
6489 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6490 if scaling_info
["scaling_direction"] == "IN":
6491 for vdur
in reversed(db_vnfr
["vdur"]):
6492 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6493 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6494 scaling_info
["vdu"].append(
6496 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6497 "vdu_id": vdur
["vdu-id-ref"],
6501 for interface
in vdur
["interfaces"]:
6502 scaling_info
["vdu"][-1]["interface"].append(
6504 "name": interface
["name"],
6505 "ip_address": interface
["ip-address"],
6506 "mac_address": interface
.get("mac-address"),
6509 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6512 step
= "Executing pre-scale vnf-config-primitive"
6513 if scaling_descriptor
.get("scaling-config-action"):
6514 for scaling_config_action
in scaling_descriptor
[
6515 "scaling-config-action"
6518 scaling_config_action
.get("trigger") == "pre-scale-in"
6519 and scaling_type
== "SCALE_IN"
6521 scaling_config_action
.get("trigger") == "pre-scale-out"
6522 and scaling_type
== "SCALE_OUT"
6524 vnf_config_primitive
= scaling_config_action
[
6525 "vnf-config-primitive-name-ref"
6527 step
= db_nslcmop_update
[
6529 ] = "executing pre-scale scaling-config-action '{}'".format(
6530 vnf_config_primitive
6533 # look for primitive
6534 for config_primitive
in (
6535 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6536 ).get("config-primitive", ()):
6537 if config_primitive
["name"] == vnf_config_primitive
:
6541 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6542 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6543 "primitive".format(scaling_group
, vnf_config_primitive
)
6546 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6547 if db_vnfr
.get("additionalParamsForVnf"):
6548 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6550 scale_process
= "VCA"
6551 db_nsr_update
["config-status"] = "configuring pre-scaling"
6552 primitive_params
= self
._map
_primitive
_params
(
6553 config_primitive
, {}, vnfr_params
6556 # Pre-scale retry check: Check if this sub-operation has been executed before
6557 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6560 vnf_config_primitive
,
6564 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6565 # Skip sub-operation
6566 result
= "COMPLETED"
6567 result_detail
= "Done"
6570 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6571 vnf_config_primitive
, result
, result_detail
6575 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6576 # New sub-operation: Get index of this sub-operation
6578 len(db_nslcmop
.get("_admin", {}).get("operations"))
6583 + "vnf_config_primitive={} New sub-operation".format(
6584 vnf_config_primitive
6588 # retry: Get registered params for this existing sub-operation
6589 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6592 vnf_index
= op
.get("member_vnf_index")
6593 vnf_config_primitive
= op
.get("primitive")
6594 primitive_params
= op
.get("primitive_params")
6597 + "vnf_config_primitive={} Sub-operation retry".format(
6598 vnf_config_primitive
6601 # Execute the primitive, either with new (first-time) or registered (retry) args
6602 ee_descriptor_id
= config_primitive
.get(
6603 "execution-environment-ref"
6605 primitive_name
= config_primitive
.get(
6606 "execution-environment-primitive", vnf_config_primitive
6608 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6609 nsr_deployed
["VCA"],
6610 member_vnf_index
=vnf_index
,
6612 vdu_count_index
=None,
6613 ee_descriptor_id
=ee_descriptor_id
,
6615 result
, result_detail
= await self
._ns
_execute
_primitive
(
6624 + "vnf_config_primitive={} Done with result {} {}".format(
6625 vnf_config_primitive
, result
, result_detail
6628 # Update operationState = COMPLETED | FAILED
6629 self
._update
_suboperation
_status
(
6630 db_nslcmop
, op_index
, result
, result_detail
6633 if result
== "FAILED":
6634 raise LcmException(result_detail
)
6635 db_nsr_update
["config-status"] = old_config_status
6636 scale_process
= None
6640 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6643 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6646 # SCALE-IN VCA - BEGIN
6647 if vca_scaling_info
:
6648 step
= db_nslcmop_update
[
6650 ] = "Deleting the execution environments"
6651 scale_process
= "VCA"
6652 for vca_info
in vca_scaling_info
:
6653 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6654 member_vnf_index
= str(vca_info
["member-vnf-index"])
6656 logging_text
+ "vdu info: {}".format(vca_info
)
6658 if vca_info
.get("osm_vdu_id"):
6659 vdu_id
= vca_info
["osm_vdu_id"]
6660 vdu_index
= int(vca_info
["vdu_index"])
6663 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6664 member_vnf_index
, vdu_id
, vdu_index
6666 stage
[2] = step
= "Scaling in VCA"
6667 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6668 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6669 config_update
= db_nsr
["configurationStatus"]
6670 for vca_index
, vca
in enumerate(vca_update
):
6672 (vca
or vca
.get("ee_id"))
6673 and vca
["member-vnf-index"] == member_vnf_index
6674 and vca
["vdu_count_index"] == vdu_index
6676 if vca
.get("vdu_id"):
6677 config_descriptor
= get_configuration(
6678 db_vnfd
, vca
.get("vdu_id")
6680 elif vca
.get("kdu_name"):
6681 config_descriptor
= get_configuration(
6682 db_vnfd
, vca
.get("kdu_name")
6685 config_descriptor
= get_configuration(
6686 db_vnfd
, db_vnfd
["id"]
6688 operation_params
= (
6689 db_nslcmop
.get("operationParams") or {}
6691 exec_terminate_primitives
= not operation_params
.get(
6692 "skip_terminate_primitives"
6693 ) and vca
.get("needed_terminate")
6694 task
= asyncio
.ensure_future(
6703 exec_primitives
=exec_terminate_primitives
,
6707 timeout
=self
.timeout
.charm_delete
,
6710 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6713 del vca_update
[vca_index
]
6714 del config_update
[vca_index
]
6715 # wait for pending tasks of terminate primitives
6719 + "Waiting for tasks {}".format(
6720 list(tasks_dict_info
.keys())
6723 error_list
= await self
._wait
_for
_tasks
(
6727 self
.timeout
.charm_delete
, self
.timeout
.ns_terminate
6732 tasks_dict_info
.clear()
6734 raise LcmException("; ".join(error_list
))
6736 db_vca_and_config_update
= {
6737 "_admin.deployed.VCA": vca_update
,
6738 "configurationStatus": config_update
,
6741 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6743 scale_process
= None
6744 # SCALE-IN VCA - END
6747 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6748 scale_process
= "RO"
6749 if self
.ro_config
.ng
:
6750 await self
._scale
_ng
_ro
(
6751 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6753 scaling_info
.pop("vdu-create", None)
6754 scaling_info
.pop("vdu-delete", None)
6756 scale_process
= None
6760 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6761 scale_process
= "KDU"
6762 await self
._scale
_kdu
(
6763 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6765 scaling_info
.pop("kdu-create", None)
6766 scaling_info
.pop("kdu-delete", None)
6768 scale_process
= None
6772 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6774 # SCALE-UP VCA - BEGIN
6775 if vca_scaling_info
:
6776 step
= db_nslcmop_update
[
6778 ] = "Creating new execution environments"
6779 scale_process
= "VCA"
6780 for vca_info
in vca_scaling_info
:
6781 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6782 member_vnf_index
= str(vca_info
["member-vnf-index"])
6784 logging_text
+ "vdu info: {}".format(vca_info
)
6786 vnfd_id
= db_vnfr
["vnfd-ref"]
6787 if vca_info
.get("osm_vdu_id"):
6788 vdu_index
= int(vca_info
["vdu_index"])
6789 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6790 if db_vnfr
.get("additionalParamsForVnf"):
6791 deploy_params
.update(
6793 db_vnfr
["additionalParamsForVnf"].copy()
6796 descriptor_config
= get_configuration(
6797 db_vnfd
, db_vnfd
["id"]
6799 if descriptor_config
:
6805 logging_text
=logging_text
6806 + "member_vnf_index={} ".format(member_vnf_index
),
6809 nslcmop_id
=nslcmop_id
,
6815 kdu_index
=kdu_index
,
6816 member_vnf_index
=member_vnf_index
,
6817 vdu_index
=vdu_index
,
6819 deploy_params
=deploy_params
,
6820 descriptor_config
=descriptor_config
,
6821 base_folder
=base_folder
,
6822 task_instantiation_info
=tasks_dict_info
,
6825 vdu_id
= vca_info
["osm_vdu_id"]
6826 vdur
= find_in_list(
6827 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6829 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6830 if vdur
.get("additionalParams"):
6831 deploy_params_vdu
= parse_yaml_strings(
6832 vdur
["additionalParams"]
6835 deploy_params_vdu
= deploy_params
6836 deploy_params_vdu
["OSM"] = get_osm_params(
6837 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6839 if descriptor_config
:
6845 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6846 member_vnf_index
, vdu_id
, vdu_index
6848 stage
[2] = step
= "Scaling out VCA"
6849 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6851 logging_text
=logging_text
6852 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6853 member_vnf_index
, vdu_id
, vdu_index
6857 nslcmop_id
=nslcmop_id
,
6863 member_vnf_index
=member_vnf_index
,
6864 vdu_index
=vdu_index
,
6865 kdu_index
=kdu_index
,
6867 deploy_params
=deploy_params_vdu
,
6868 descriptor_config
=descriptor_config
,
6869 base_folder
=base_folder
,
6870 task_instantiation_info
=tasks_dict_info
,
6873 # SCALE-UP VCA - END
6874 scale_process
= None
6877 # execute primitive service POST-SCALING
6878 step
= "Executing post-scale vnf-config-primitive"
6879 if scaling_descriptor
.get("scaling-config-action"):
6880 for scaling_config_action
in scaling_descriptor
[
6881 "scaling-config-action"
6884 scaling_config_action
.get("trigger") == "post-scale-in"
6885 and scaling_type
== "SCALE_IN"
6887 scaling_config_action
.get("trigger") == "post-scale-out"
6888 and scaling_type
== "SCALE_OUT"
6890 vnf_config_primitive
= scaling_config_action
[
6891 "vnf-config-primitive-name-ref"
6893 step
= db_nslcmop_update
[
6895 ] = "executing post-scale scaling-config-action '{}'".format(
6896 vnf_config_primitive
6899 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6900 if db_vnfr
.get("additionalParamsForVnf"):
6901 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6903 # look for primitive
6904 for config_primitive
in (
6905 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6906 ).get("config-primitive", ()):
6907 if config_primitive
["name"] == vnf_config_primitive
:
6911 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6912 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6913 "config-primitive".format(
6914 scaling_group
, vnf_config_primitive
6917 scale_process
= "VCA"
6918 db_nsr_update
["config-status"] = "configuring post-scaling"
6919 primitive_params
= self
._map
_primitive
_params
(
6920 config_primitive
, {}, vnfr_params
6923 # Post-scale retry check: Check if this sub-operation has been executed before
6924 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6927 vnf_config_primitive
,
6931 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6932 # Skip sub-operation
6933 result
= "COMPLETED"
6934 result_detail
= "Done"
6937 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6938 vnf_config_primitive
, result
, result_detail
6942 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6943 # New sub-operation: Get index of this sub-operation
6945 len(db_nslcmop
.get("_admin", {}).get("operations"))
6950 + "vnf_config_primitive={} New sub-operation".format(
6951 vnf_config_primitive
6955 # retry: Get registered params for this existing sub-operation
6956 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6959 vnf_index
= op
.get("member_vnf_index")
6960 vnf_config_primitive
= op
.get("primitive")
6961 primitive_params
= op
.get("primitive_params")
6964 + "vnf_config_primitive={} Sub-operation retry".format(
6965 vnf_config_primitive
6968 # Execute the primitive, either with new (first-time) or registered (retry) args
6969 ee_descriptor_id
= config_primitive
.get(
6970 "execution-environment-ref"
6972 primitive_name
= config_primitive
.get(
6973 "execution-environment-primitive", vnf_config_primitive
6975 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6976 nsr_deployed
["VCA"],
6977 member_vnf_index
=vnf_index
,
6979 vdu_count_index
=None,
6980 ee_descriptor_id
=ee_descriptor_id
,
6982 result
, result_detail
= await self
._ns
_execute
_primitive
(
6991 + "vnf_config_primitive={} Done with result {} {}".format(
6992 vnf_config_primitive
, result
, result_detail
6995 # Update operationState = COMPLETED | FAILED
6996 self
._update
_suboperation
_status
(
6997 db_nslcmop
, op_index
, result
, result_detail
7000 if result
== "FAILED":
7001 raise LcmException(result_detail
)
7002 db_nsr_update
["config-status"] = old_config_status
7003 scale_process
= None
7008 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
7009 db_nsr_update
["operational-status"] = (
7011 if old_operational_status
== "failed"
7012 else old_operational_status
7014 db_nsr_update
["config-status"] = old_config_status
7017 ROclient
.ROClientException
,
7022 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7024 except asyncio
.CancelledError
:
7026 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7028 exc
= "Operation was cancelled"
7029 except Exception as e
:
7030 exc
= traceback
.format_exc()
7031 self
.logger
.critical(
7032 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7036 self
._write
_ns
_status
(
7039 current_operation
="IDLE",
7040 current_operation_id
=None,
7043 stage
[1] = "Waiting for instantiate pending tasks."
7044 self
.logger
.debug(logging_text
+ stage
[1])
7045 exc
= await self
._wait
_for
_tasks
(
7048 self
.timeout
.ns_deploy
,
7056 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7057 nslcmop_operation_state
= "FAILED"
7059 db_nsr_update
["operational-status"] = old_operational_status
7060 db_nsr_update
["config-status"] = old_config_status
7061 db_nsr_update
["detailed-status"] = ""
7063 if "VCA" in scale_process
:
7064 db_nsr_update
["config-status"] = "failed"
7065 if "RO" in scale_process
:
7066 db_nsr_update
["operational-status"] = "failed"
7069 ] = "FAILED scaling nslcmop={} {}: {}".format(
7070 nslcmop_id
, step
, exc
7073 error_description_nslcmop
= None
7074 nslcmop_operation_state
= "COMPLETED"
7075 db_nslcmop_update
["detailed-status"] = "Done"
7077 self
._write
_op
_status
(
7080 error_message
=error_description_nslcmop
,
7081 operation_state
=nslcmop_operation_state
,
7082 other_update
=db_nslcmop_update
,
7085 self
._write
_ns
_status
(
7088 current_operation
="IDLE",
7089 current_operation_id
=None,
7090 other_update
=db_nsr_update
,
7093 if nslcmop_operation_state
:
7097 "nslcmop_id": nslcmop_id
,
7098 "operationState": nslcmop_operation_state
,
7100 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7101 except Exception as e
:
7103 logging_text
+ "kafka_write notification Exception {}".format(e
)
7105 self
.logger
.debug(logging_text
+ "Exit")
7106 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7108 async def _scale_kdu(
7109 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7111 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7112 for kdu_name
in _scaling_info
:
7113 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7114 deployed_kdu
, index
= get_deployed_kdu(
7115 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7117 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7118 kdu_instance
= deployed_kdu
["kdu-instance"]
7119 kdu_model
= deployed_kdu
.get("kdu-model")
7120 scale
= int(kdu_scaling_info
["scale"])
7121 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7124 "collection": "nsrs",
7125 "filter": {"_id": nsr_id
},
7126 "path": "_admin.deployed.K8s.{}".format(index
),
7129 step
= "scaling application {}".format(
7130 kdu_scaling_info
["resource-name"]
7132 self
.logger
.debug(logging_text
+ step
)
7134 if kdu_scaling_info
["type"] == "delete":
7135 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7138 and kdu_config
.get("terminate-config-primitive")
7139 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7141 terminate_config_primitive_list
= kdu_config
.get(
7142 "terminate-config-primitive"
7144 terminate_config_primitive_list
.sort(
7145 key
=lambda val
: int(val
["seq"])
7149 terminate_config_primitive
7150 ) in terminate_config_primitive_list
:
7151 primitive_params_
= self
._map
_primitive
_params
(
7152 terminate_config_primitive
, {}, {}
7154 step
= "execute terminate config primitive"
7155 self
.logger
.debug(logging_text
+ step
)
7156 await asyncio
.wait_for(
7157 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7158 cluster_uuid
=cluster_uuid
,
7159 kdu_instance
=kdu_instance
,
7160 primitive_name
=terminate_config_primitive
["name"],
7161 params
=primitive_params_
,
7163 total_timeout
=self
.timeout
.primitive
,
7166 timeout
=self
.timeout
.primitive
7167 * self
.timeout
.primitive_outer_factor
,
7170 await asyncio
.wait_for(
7171 self
.k8scluster_map
[k8s_cluster_type
].scale(
7172 kdu_instance
=kdu_instance
,
7174 resource_name
=kdu_scaling_info
["resource-name"],
7175 total_timeout
=self
.timeout
.scale_on_error
,
7177 cluster_uuid
=cluster_uuid
,
7178 kdu_model
=kdu_model
,
7182 timeout
=self
.timeout
.scale_on_error
7183 * self
.timeout
.scale_on_error_outer_factor
,
7186 if kdu_scaling_info
["type"] == "create":
7187 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7190 and kdu_config
.get("initial-config-primitive")
7191 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7193 initial_config_primitive_list
= kdu_config
.get(
7194 "initial-config-primitive"
7196 initial_config_primitive_list
.sort(
7197 key
=lambda val
: int(val
["seq"])
7200 for initial_config_primitive
in initial_config_primitive_list
:
7201 primitive_params_
= self
._map
_primitive
_params
(
7202 initial_config_primitive
, {}, {}
7204 step
= "execute initial config primitive"
7205 self
.logger
.debug(logging_text
+ step
)
7206 await asyncio
.wait_for(
7207 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7208 cluster_uuid
=cluster_uuid
,
7209 kdu_instance
=kdu_instance
,
7210 primitive_name
=initial_config_primitive
["name"],
7211 params
=primitive_params_
,
7218 async def _scale_ng_ro(
7219 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7221 nsr_id
= db_nslcmop
["nsInstanceId"]
7222 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7225 # read from db: vnfd's for every vnf
7228 # for each vnf in ns, read vnfd
7229 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7230 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7231 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7232 # if we haven't this vnfd, read it from db
7233 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7235 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7236 db_vnfds
.append(vnfd
)
7237 n2vc_key
= self
.n2vc
.get_public_key()
7238 n2vc_key_list
= [n2vc_key
]
7241 vdu_scaling_info
.get("vdu-create"),
7242 vdu_scaling_info
.get("vdu-delete"),
7245 # db_vnfr has been updated, update db_vnfrs to use it
7246 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7247 await self
._instantiate
_ng
_ro
(
7257 start_deploy
=time(),
7258 timeout_ns_deploy
=self
.timeout
.ns_deploy
,
7260 if vdu_scaling_info
.get("vdu-delete"):
7262 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
async def extract_prometheus_scrape_jobs(
    self,
    ee_id: str,
    artifact_path: str,
    ee_config_descriptor: dict,
    vnfr_id: str,
    nsr_id: str,
    target_ip: str,
    vnf_member_index: str = "",
    vdu_id: str = "",
    vdu_index: int = None,
    kdu_name: str = "",
    kdu_index: int = None,
) -> dict:
    """Method to extract prometheus scrape jobs from EE's Prometheus template job file
        This method will wait until the corresponding VDU or KDU is fully instantiated

    Args:
        ee_id (str): Execution Environment ID
        artifact_path (str): Path where the EE's content is (including the Prometheus template file)
        ee_config_descriptor (dict): Execution Environment's configuration descriptor
        vnfr_id (str): VNFR ID where this EE applies
        nsr_id (str): NSR ID where this EE applies
        target_ip (str): VDU/KDU instance IP address
        vnf_member_index (str, optional): VNF index where this EE applies. Defaults to "".
        vdu_id (str, optional): VDU ID where this EE applies. Defaults to "".
        vdu_index (int, optional): VDU index where this EE applies. Defaults to None.
        kdu_name (str, optional): KDU name where this EE applies. Defaults to "".
        kdu_index (int, optional): KDU index where this EE applies. Defaults to None.

    Raises:
        LcmException: When the VDU or KDU instance was not found in an hour

    Returns:
        The list of jobs produced by ``parse_job``, each one tagged with
        "nsr_id" and "vnfr_id"; None when the artifact folder contains no
        'prometheus*.j2' template.
    """
    self.logger.debug(f"KDU: {kdu_name}; KDU INDEX: {kdu_index}")
    # look for a template file called 'prometheus*.j2'
    artifact_content = self.fs.dir_ls(artifact_path)
    job_file = next(
        (
            f
            for f in artifact_content
            if f.startswith("prometheus") and f.endswith(".j2")
        ),
        None,
    )
    if not job_file:
        return
    with self.fs.file_open((artifact_path, job_file), "r") as f:
        job_data = f.read()

    vdur_name = ""
    kdur_name = ""
    # Poll the vnfr every 10 seconds, 360 times (one hour), until the
    # requested VDU/KDU record has a name.
    for _ in range(360):
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        if vdu_id and vdu_index is not None:
            vdur = next(
                (
                    x
                    for x in get_iterable(db_vnfr, "vdur")
                    if (
                        x.get("vdu-id-ref") == vdu_id
                        and x.get("count-index") == vdu_index
                    )
                ),
                {},
            )
            if vdur.get("name"):
                vdur_name = vdur.get("name")
                break
        if kdu_name and kdu_index is not None:
            kdur = next(
                (
                    x
                    for x in get_iterable(db_vnfr, "kdur")
                    if (
                        x.get("kdu-name") == kdu_name
                        and x.get("count-index") == kdu_index
                    )
                ),
                {},
            )
            if kdur.get("name"):
                kdur_name = kdur.get("name")
                break

        # asyncio.sleep() no longer accepts a 'loop' argument (removed in
        # Python 3.10); the running loop is used implicitly.
        await asyncio.sleep(10)
    else:
        if vdu_id and vdu_index is not None:
            raise LcmException(
                f"Timeout waiting VDU with name={vdu_id} and index={vdu_index} to be intantiated"
            )
        if kdu_name and kdu_index is not None:
            raise LcmException(
                f"Timeout waiting KDU with name={kdu_name} and index={kdu_index} to be intantiated"
            )

    # TODO get_service
    _, _, service = ee_id.partition(".")  # remove prefix "namespace."
    host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
    host_port = "80"
    vnfr_id = vnfr_id.replace("-", "")
    variables = {
        "JOB_NAME": vnfr_id,
        "TARGET_IP": target_ip,
        "EXPORTER_POD_IP": host_name,
        "EXPORTER_POD_PORT": host_port,
        "NSR_ID": nsr_id,
        "VNF_MEMBER_INDEX": vnf_member_index,
        "VDUR_NAME": vdur_name,
        "KDUR_NAME": kdur_name,
    }
    job_list = parse_job(job_data, variables)
    # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
    for job in job_list:
        if (
            not isinstance(job.get("job_name"), str)
            or vnfr_id not in job["job_name"]
        ):
            job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
        job["nsr_id"] = nsr_id
        job["vnfr_id"] = vnfr_id
    return job_list
async def rebuild_start_stop(
    self, nsr_id, nslcmop_id, vnf_id, additional_param, operation_type
):
    """Send a start/stop/rebuild style operation for one VDU to RO.

    The target VDU record is selected from the vnfr by
    ``additional_param["vdu_id"]`` and ``additional_param["count-index"]``.
    Errors are not propagated: they are logged and reported in the result.

    :param nsr_id: NS record id
    :param nslcmop_id: id of the operation being executed
    :param vnf_id: ``_id`` of the vnfr that owns the target VDU
    :param additional_param: dict with the keys "vdu_id" and "count-index"
    :param operation_type: operation name; used as key of the RO payload,
        as the third argument of ``RO.operate`` and, upper-cased, as the
        NS current operation
    :return: ("COMPLETED", "Done") on success, otherwise
        ("FAILED", "Error in operate VNF <detail>")
    """
    logging_text = "Task ns={} {}={} ".format(nsr_id, operation_type, nslcmop_id)
    self.logger.info(logging_text + "Enter")
    stage = ["Preparing the environment", ""]
    # database nsrs record
    db_nsr_update = {}
    # in case of error, indicates what part of scale was failed to put nsr at error status
    start_deploy = time()
    try:
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_id})
        vim_account_id = db_vnfr.get("vim-account-id")
        vim_info_key = "vim:" + vim_account_id
        vdu_id = additional_param["vdu_id"]
        # first vdur of the requested VDU with the requested count-index
        vdur = next(
            (
                item
                for item in db_vnfr["vdur"]
                if item["vdu-id-ref"] == vdu_id
                and item["count-index"] == additional_param["count-index"]
            ),
            None,
        )
        if not vdur:
            raise LcmException("Target vdu is not found")
        vdu_vim_name = vdur["name"]
        vim_vm_id = vdur["vim_info"][vim_info_key]["vim_id"]
        # the target VIM is the first key of the vdur "vim_info" dict
        target_vim = next(iter(vdur["vim_info"]))
        self.logger.info("vdu_vim_name >> {} ".format(vdu_vim_name))
        # wait for any previous tasks in process
        stage[1] = "Waiting for previous operations to terminate"
        self.logger.info(stage[1])
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        stage[1] = "Reading from database."
        self.logger.info(stage[1])
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation=operation_type.upper(),
            current_operation_id=nslcmop_id,
        )
        self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)

        # read from db: ns
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr_update["operational-status"] = operation_type
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        # Payload for RO
        desc = {
            operation_type: {
                "vim_vm_id": vim_vm_id,
                "vnf_id": vnf_id,
                "vdu_index": additional_param["count-index"],
                "vdu_id": vdur["id"],
                "target_vim": target_vim,
                "vim_account_id": vim_account_id,
            }
        }
        stage[1] = "Sending rebuild request to RO... {}".format(desc)
        self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
        self.logger.info("ro nsr id: {}".format(nsr_id))
        result_dict = await self.RO.operate(nsr_id, desc, operation_type)
        self.logger.info("response from RO: {}".format(result_dict))
        action_id = result_dict["action_id"]
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            self.timeout.operate,
            None,
            "start_stop_rebuild",
        )
        return "COMPLETED", "Done"
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(stage))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    return "FAILED", "Error in operate VNF {}".format(exc)
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Get VCA Cloud and VCA Cloud Credentials for the VIM account

    Both values are read from the "config" section of the VIM account;
    each is None when the corresponding key ("vca_cloud",
    "vca_cloud_credential") is not present.

    :param: vim_account_id:     VIM Account ID

    :return: (cloud_name, cloud_credential)
    """
    config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
    return config.get("vca_cloud"), config.get("vca_cloud_credential")
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account

    Both values are read from the "config" section of the VIM account;
    each is None when the corresponding key ("vca_k8s_cloud",
    "vca_k8s_cloud_credential") is not present.

    :param: vim_account_id:     VIM Account ID

    :return: (cloud_name, cloud_credential)
    """
    config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
    return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")
async def migrate(self, nsr_id, nslcmop_id):
    """
    Migrate VNFs and VDUs instances in a NS

    The operation parameters stored in the nslcmop are sent to RO as the
    migration target and the RO action is awaited. The final operation
    state is written to the database and published on the "ns" topic with
    the "migrated" key. Nothing is done when the HA lock for the operation
    cannot be acquired.

    :param: nsr_id: NS Instance ID
    :param: nslcmop_id: nslcmop ID of migrate

    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return
    logging_text = "Task ns={} migrate ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nslcmop = None
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    start_deploy = time()

    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="MIGRATING",
            current_operation_id=nslcmop_id,
        )
        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        migrate_params = db_nslcmop.get("operationParams")

        # Copy the parameters; a missing "operationParams" yields an empty
        # target instead of a TypeError in dict.update(None).
        target = dict(migrate_params or {})
        desc = await self.RO.migrate(nsr_id, target)
        self.logger.debug("RO return > {}".format(desc))
        action_id = desc["action_id"]
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            self.timeout.migrate,
            operation="migrate",
        )
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )
        if exc:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        else:
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message="",
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if nslcmop_operation_state:
            try:
                msg = {
                    "nsr_id": nsr_id,
                    "nslcmop_id": nslcmop_id,
                    "operationState": nslcmop_operation_state,
                }
                await self.msg.aiowrite("ns", "migrated", msg, loop=self.loop)
            except Exception as e:
                # notification is best effort: log and continue with cleanup
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_migrate")
async def heal(self, nsr_id, nslcmop_id):
    """
    Heal NS

    Sends the heal order to the VIM through RO and then, for every VNF in
    the "healVnfData" operation parameter, schedules the N2VC healing of
    the VNF-level configuration (only when the healed VDU instance holds
    the VNF management IP address) and of the VDU-level configuration.
    The final state is written to the database and published on the "ns"
    topic with the "healed" key. Nothing is done when the HA lock for the
    operation cannot be acquired.

    :param nsr_id: ns instance to heal
    :param nslcmop_id: operation to run
    :return:
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} heal={} ".format(nsr_id, nslcmop_id)
    stage = ["", "", ""]  # stage, step, VIM progress
    tasks_dict_info = {}
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop_update = {}
    db_nsr_update = {}
    db_vnfrs = {}  # vnf's info indexed by _id
    exc = None
    old_operational_status = ""
    old_config_status = ""
    nsi_id = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="HEALING",
            current_operation_id=nslcmop_id,
        )

        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})

        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]

        db_nsr_update = {
            "_admin.deployed.RO.operational-status": "healing",
        }
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Sending heal order to VIM"
        await self.heal_RO(
            logging_text=logging_text,
            nsr_id=nsr_id,
            db_nslcmop=db_nslcmop,
            stage=stage,
        )
        # VCA tasks
        # read from db: nsd
        stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
        self.logger.debug(logging_text + stage[1])
        nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
        self.fs.sync(db_nsr["nsd-id"])
        db_nsr["nsd"] = nsd
        # read from db: vnfr's of this ns
        step = "Getting vnfrs from db"
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        for vnfr in db_vnfrs_list:
            db_vnfrs[vnfr["_id"]] = vnfr
        self.logger.debug("ns.heal db_vnfrs={}".format(db_vnfrs))

        # Check for each target VNF
        target_list = db_nslcmop.get("operationParams", {}).get("healVnfData", {})
        for target_vnf in target_list:
            # Find this VNF in the list from DB
            vnfr_id = target_vnf.get("vnfInstanceId", None)
            if vnfr_id:
                db_vnfr = db_vnfrs[vnfr_id]
                vnfd_id = db_vnfr.get("vnfd-id")
                vnfd_ref = db_vnfr.get("vnfd-ref")
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                base_folder = vnfd["_admin"]["storage"]
                vdu_id = None
                vdu_index = 0
                vdu_name = None
                kdu_name = None
                nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
                member_vnf_index = db_vnfr.get("member-vnf-index-ref")
                # "additionalParams" is optional: read it once, tolerating
                # its absence everywhere it is used below.
                additional_params = target_vnf.get("additionalParams") or {}
                vnf_run_day1 = additional_params.get("run-day1")

                # Check each target VDU and deploy N2VC
                target_vdu_list = additional_params.get("vdu", None)
                if not target_vdu_list:
                    # No VDU was given: build the list with every existing
                    # VDU instance of the VNF
                    target_vdu_list = []
                    for existing_vdu in db_vnfr.get("vdur") or ():
                        vdu_name = existing_vdu.get("vdu-name", None)
                        vdu_index = existing_vdu.get("count-index", 0)
                        vdu_run_day1 = additional_params.get("run-day1", False)
                        vdu_to_be_healed = {
                            "vdu-id": vdu_name,
                            "count-index": vdu_index,
                            "run-day1": vdu_run_day1,
                        }
                        target_vdu_list.append(vdu_to_be_healed)
                for target_vdu in target_vdu_list:
                    deploy_params_vdu = target_vdu
                    # Set run-day1 vnf level value if not vdu level value exists
                    if not deploy_params_vdu.get("run-day1") and vnf_run_day1:
                        deploy_params_vdu["run-day1"] = vnf_run_day1
                    vdu_name = target_vdu.get("vdu-id", None)
                    # TODO: Get vdu_id from vdud.
                    vdu_id = vdu_name
                    # For multi instance VDU count-index is mandatory
                    # For single session VDU count-index is 0
                    vdu_index = target_vdu.get("count-index", 0)

                    # n2vc_redesign STEP 3 to 6 Deploy N2VC
                    stage[1] = "Deploying Execution Environments."
                    self.logger.debug(logging_text + stage[1])

                    # VNF Level charm. Normal case when proxy charms.
                    # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
                    descriptor_config = get_configuration(vnfd, vnfd_ref)
                    if descriptor_config:
                        # Continue if healed machine is management machine
                        vnf_ip_address = db_vnfr.get("ip-address")
                        target_instance = None
                        for instance in db_vnfr.get("vdur") or ():
                            if (
                                instance["vdu-name"] == vdu_name
                                and instance["count-index"] == vdu_index
                            ):
                                target_instance = instance
                                break
                        # target_instance stays None when no vdur matches;
                        # in that case there is no VNF-level healing to do.
                        if (
                            target_instance is not None
                            and vnf_ip_address == target_instance.get("ip-address")
                        ):
                            self._heal_n2vc(
                                logging_text=logging_text
                                + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
                                    member_vnf_index, vdu_name, vdu_index
                                ),
                                db_nsr=db_nsr,
                                db_vnfr=db_vnfr,
                                nslcmop_id=nslcmop_id,
                                nsr_id=nsr_id,
                                nsi_id=nsi_id,
                                vnfd_id=vnfd_ref,
                                vdu_id=None,
                                kdu_name=None,
                                member_vnf_index=member_vnf_index,
                                vdu_index=0,
                                vdu_name=None,
                                deploy_params=deploy_params_vdu,
                                descriptor_config=descriptor_config,
                                base_folder=base_folder,
                                task_instantiation_info=tasks_dict_info,
                                stage=stage,
                            )

                    # VDU Level charm. Normal case with native charms.
                    descriptor_config = get_configuration(vnfd, vdu_name)
                    if descriptor_config:
                        self._heal_n2vc(
                            logging_text=logging_text
                            + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
                                member_vnf_index, vdu_name, vdu_index
                            ),
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_ref,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_vdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage,
                        )

    except (
        ROclient.ROClientException,
        DbException,
        LcmException,
        NgRoException,
    ) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(step)
        )
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
            exc_info=True,
        )
    finally:
        if tasks_dict_info:
            stage[1] = "Waiting for healing pending tasks."
            self.logger.debug(logging_text + stage[1])
            pending_error = await self._wait_for_tasks(
                logging_text,
                tasks_dict_info,
                self.timeout.ns_deploy,
                stage,
                nslcmop_id,
                nsr_id=nsr_id,
            )
            # Keep an error raised above when the pending tasks themselves
            # finished without error, so the failure is not masked.
            exc = pending_error or exc
        if exc:
            db_nslcmop_update[
                "detailed-status"
            ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
            if db_nsr:
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update[
                    "detailed-status"
                ] = "FAILED healing nslcmop={} {}: {}".format(nslcmop_id, step, exc)
                for task, task_name in tasks_dict_info.items():
                    if not task.done() or task.cancelled() or task.exception():
                        if task_name.startswith(self.task_name_deploy_vca):
                            # A N2VC task is pending
                            db_nsr_update["config-status"] = "failed"
                        else:
                            # RO task is pending
                            db_nsr_update["operational-status"] = "failed"
        else:
            error_description_nslcmop = None
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["operational-status"] = "running"
            db_nsr_update["config-status"] = "configured"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="IDLE",
                current_operation_id=None,
                other_update=db_nsr_update,
            )

        if nslcmop_operation_state:
            try:
                msg = {
                    "nsr_id": nsr_id,
                    "nslcmop_id": nslcmop_id,
                    "operationState": nslcmop_operation_state,
                }
                await self.msg.aiowrite("ns", "healed", msg, loop=self.loop)
            except Exception as e:
                # notification is best effort: log and continue with cleanup
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_heal")
7897 :param logging_text: preffix text to use at logging
7898 :param nsr_id: nsr identity
7899 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
7900 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
7901 :return: None or exception
7904 def get_vim_account(vim_account_id
):
7906 if vim_account_id
in db_vims
:
7907 return db_vims
[vim_account_id
]
7908 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
7909 db_vims
[vim_account_id
] = db_vim
7914 ns_params
= db_nslcmop
.get("operationParams")
7915 if ns_params
and ns_params
.get("timeout_ns_heal"):
7916 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
7918 timeout_ns_heal
= self
.timeout
.ns_heal
7922 nslcmop_id
= db_nslcmop
["_id"]
7924 "action_id": nslcmop_id
,
7926 self
.logger
.warning(
7927 "db_nslcmop={} and timeout_ns_heal={}".format(
7928 db_nslcmop
, timeout_ns_heal
7931 target
.update(db_nslcmop
.get("operationParams", {}))
7933 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
7934 desc
= await self
.RO
.recreate(nsr_id
, target
)
7935 self
.logger
.debug("RO return > {}".format(desc
))
7936 action_id
= desc
["action_id"]
7937 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
7938 await self
._wait
_ng
_ro
(
7945 operation
="healing",
7950 "_admin.deployed.RO.operational-status": "running",
7951 "detailed-status": " ".join(stage
),
7953 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7954 self
._write
_op
_status
(nslcmop_id
, stage
)
7956 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
7959 except Exception as e
:
7960 stage
[2] = "ERROR healing at VIM"
7961 # self.set_vnfr_at_error(db_vnfrs, str(e))
7963 "Error healing at VIM {}".format(e
),
7964 exc_info
=not isinstance(
7967 ROclient
.ROClientException
,
7993 task_instantiation_info
,
7996 # launch instantiate_N2VC in a asyncio task and register task object
7997 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
7998 # if not found, create one entry and update database
7999 # fill db_nsr._admin.deployed.VCA.<index>
8002 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
8006 get_charm_name
= False
8007 if "execution-environment-list" in descriptor_config
:
8008 ee_list
= descriptor_config
.get("execution-environment-list", [])
8009 elif "juju" in descriptor_config
:
8010 ee_list
= [descriptor_config
] # ns charms
8011 if "execution-environment-list" not in descriptor_config
:
8012 # charm name is only required for ns charms
8013 get_charm_name
= True
8014 else: # other types as script are not supported
8017 for ee_item
in ee_list
:
8020 + "_deploy_n2vc ee_item juju={}, helm={}".format(
8021 ee_item
.get("juju"), ee_item
.get("helm-chart")
8024 ee_descriptor_id
= ee_item
.get("id")
8025 if ee_item
.get("juju"):
8026 vca_name
= ee_item
["juju"].get("charm")
8028 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
8031 if ee_item
["juju"].get("charm") is not None
8034 if ee_item
["juju"].get("cloud") == "k8s":
8035 vca_type
= "k8s_proxy_charm"
8036 elif ee_item
["juju"].get("proxy") is False:
8037 vca_type
= "native_charm"
8038 elif ee_item
.get("helm-chart"):
8039 vca_name
= ee_item
["helm-chart"]
8040 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
8043 vca_type
= "helm-v3"
8046 logging_text
+ "skipping non juju neither charm configuration"
8051 for vca_index
, vca_deployed
in enumerate(
8052 db_nsr
["_admin"]["deployed"]["VCA"]
8054 if not vca_deployed
:
8057 vca_deployed
.get("member-vnf-index") == member_vnf_index
8058 and vca_deployed
.get("vdu_id") == vdu_id
8059 and vca_deployed
.get("kdu_name") == kdu_name
8060 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
8061 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
8065 # not found, create one.
8067 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
8070 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
8072 target
+= "/kdu/{}".format(kdu_name
)
8074 "target_element": target
,
8075 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
8076 "member-vnf-index": member_vnf_index
,
8078 "kdu_name": kdu_name
,
8079 "vdu_count_index": vdu_index
,
8080 "operational-status": "init", # TODO revise
8081 "detailed-status": "", # TODO revise
8082 "step": "initial-deploy", # TODO revise
8084 "vdu_name": vdu_name
,
8086 "ee_descriptor_id": ee_descriptor_id
,
8087 "charm_name": charm_name
,
8091 # create VCA and configurationStatus in db
8093 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
8094 "configurationStatus.{}".format(vca_index
): dict(),
8096 self
.update_db_2("nsrs", nsr_id
, db_dict
)
8098 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
8100 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
8101 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
8102 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
8105 task_n2vc
= asyncio
.ensure_future(
8107 logging_text
=logging_text
,
8108 vca_index
=vca_index
,
8114 vdu_index
=vdu_index
,
8115 deploy_params
=deploy_params
,
8116 config_descriptor
=descriptor_config
,
8117 base_folder
=base_folder
,
8118 nslcmop_id
=nslcmop_id
,
8122 ee_config_descriptor
=ee_item
,
8125 self
.lcm_tasks
.register(
8129 "instantiate_N2VC-{}".format(vca_index
),
8132 task_instantiation_info
[
8134 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
8135 member_vnf_index
or "", vdu_id
or ""
8138 async def heal_N2VC(
8155 ee_config_descriptor
,
8157 nsr_id
= db_nsr
["_id"]
8158 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
8159 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
8160 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
8161 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
8163 "collection": "nsrs",
8164 "filter": {"_id": nsr_id
},
8165 "path": db_update_entry
,
8171 element_under_configuration
= nsr_id
8175 vnfr_id
= db_vnfr
["_id"]
8176 osm_config
["osm"]["vnf_id"] = vnfr_id
8178 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8180 if vca_type
== "native_charm":
8183 index_number
= vdu_index
or 0
8186 element_type
= "VNF"
8187 element_under_configuration
= vnfr_id
8188 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8190 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8191 element_type
= "VDU"
8192 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8193 osm_config
["osm"]["vdu_id"] = vdu_id
8195 namespace
+= ".{}".format(kdu_name
)
8196 element_type
= "KDU"
8197 element_under_configuration
= kdu_name
8198 osm_config
["osm"]["kdu_name"] = kdu_name
8201 if base_folder
["pkg-dir"]:
8202 artifact_path
= "{}/{}/{}/{}".format(
8203 base_folder
["folder"],
8204 base_folder
["pkg-dir"],
8207 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8212 artifact_path
= "{}/Scripts/{}/{}/".format(
8213 base_folder
["folder"],
8216 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8221 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8223 # get initial_config_primitive_list that applies to this element
8224 initial_config_primitive_list
= config_descriptor
.get(
8225 "initial-config-primitive"
8229 "Initial config primitive list > {}".format(
8230 initial_config_primitive_list
8234 # add config if not present for NS charm
8235 ee_descriptor_id
= ee_config_descriptor
.get("id")
8236 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8237 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8238 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8242 "Initial config primitive list #2 > {}".format(
8243 initial_config_primitive_list
8246 # n2vc_redesign STEP 3.1
8247 # find old ee_id if exists
8248 ee_id
= vca_deployed
.get("ee_id")
8250 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8251 # create or register execution environment in VCA. Only for native charms when healing
8252 if vca_type
== "native_charm":
8253 step
= "Waiting to VM being up and getting IP address"
8254 self
.logger
.debug(logging_text
+ step
)
8255 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8264 credentials
= {"hostname": rw_mgmt_ip
}
8266 username
= deep_get(
8267 config_descriptor
, ("config-access", "ssh-access", "default-user")
8269 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8270 # merged. Meanwhile let's get username from initial-config-primitive
8271 if not username
and initial_config_primitive_list
:
8272 for config_primitive
in initial_config_primitive_list
:
8273 for param
in config_primitive
.get("parameter", ()):
8274 if param
["name"] == "ssh-username":
8275 username
= param
["value"]
8279 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8280 "'config-access.ssh-access.default-user'"
8282 credentials
["username"] = username
8284 # n2vc_redesign STEP 3.2
8285 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8286 self
._write
_configuration
_status
(
8288 vca_index
=vca_index
,
8289 status
="REGISTERING",
8290 element_under_configuration
=element_under_configuration
,
8291 element_type
=element_type
,
8294 step
= "register execution environment {}".format(credentials
)
8295 self
.logger
.debug(logging_text
+ step
)
8296 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8297 credentials
=credentials
,
8298 namespace
=namespace
,
8303 # update ee_id en db
8305 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8307 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8309 # for compatibility with MON/POL modules, the need model and application name at database
8310 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8311 # Not sure if this need to be done when healing
8313 ee_id_parts = ee_id.split(".")
8314 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8315 if len(ee_id_parts) >= 2:
8316 model_name = ee_id_parts[0]
8317 application_name = ee_id_parts[1]
8318 db_nsr_update[db_update_entry + "model"] = model_name
8319 db_nsr_update[db_update_entry + "application"] = application_name
8322 # n2vc_redesign STEP 3.3
8323 # Install configuration software. Only for native charms.
8324 step
= "Install configuration Software"
8326 self
._write
_configuration
_status
(
8328 vca_index
=vca_index
,
8329 status
="INSTALLING SW",
8330 element_under_configuration
=element_under_configuration
,
8331 element_type
=element_type
,
8332 # other_update=db_nsr_update,
8336 # TODO check if already done
8337 self
.logger
.debug(logging_text
+ step
)
8339 if vca_type
== "native_charm":
8340 config_primitive
= next(
8341 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8344 if config_primitive
:
8345 config
= self
._map
_primitive
_params
(
8346 config_primitive
, {}, deploy_params
8348 await self
.vca_map
[vca_type
].install_configuration_sw(
8350 artifact_path
=artifact_path
,
8358 # write in db flag of configuration_sw already installed
8360 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8363 # Not sure if this need to be done when healing
8365 # add relations for this VCA (wait for other peers related with this VCA)
8366 await self._add_vca_relations(
8367 logging_text=logging_text,
8370 vca_index=vca_index,
8374 # if SSH access is required, then get execution environment SSH public
8375 # if native charm we have waited already to VM be UP
8376 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8379 # self.logger.debug("get ssh key block")
8381 config_descriptor
, ("config-access", "ssh-access", "required")
8383 # self.logger.debug("ssh key needed")
8384 # Needed to inject a ssh key
8387 ("config-access", "ssh-access", "default-user"),
8389 step
= "Install configuration Software, getting public ssh key"
8390 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8391 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8394 step
= "Insert public key into VM user={} ssh_key={}".format(
8398 # self.logger.debug("no need to get ssh key")
8399 step
= "Waiting to VM being up and getting IP address"
8400 self
.logger
.debug(logging_text
+ step
)
8402 # n2vc_redesign STEP 5.1
8403 # wait for RO (ip-address) Insert pub_key into VM
8404 # IMPORTANT: We need do wait for RO to complete healing operation.
8405 await self
._wait
_heal
_ro
(nsr_id
, self
.timeout
.ns_heal
)
8408 rw_mgmt_ip
= await self
.wait_kdu_up(
8409 logging_text
, nsr_id
, vnfr_id
, kdu_name
8412 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8422 rw_mgmt_ip
= None # This is for a NS configuration
8424 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8426 # store rw_mgmt_ip in deploy params for later replacement
8427 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8430 # get run-day1 operation parameter
8431 runDay1
= deploy_params
.get("run-day1", False)
8433 "Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
, vdu_id
, runDay1
)
8436 # n2vc_redesign STEP 6 Execute initial config primitive
8437 step
= "execute initial config primitive"
8439 # wait for dependent primitives execution (NS -> VNF -> VDU)
8440 if initial_config_primitive_list
:
8441 await self
._wait
_dependent
_n
2vc
(
8442 nsr_id
, vca_deployed_list
, vca_index
8445 # stage, in function of element type: vdu, kdu, vnf or ns
8446 my_vca
= vca_deployed_list
[vca_index
]
8447 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8449 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8450 elif my_vca
.get("member-vnf-index"):
8452 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8455 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8457 self
._write
_configuration
_status
(
8458 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8461 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8463 check_if_terminated_needed
= True
8464 for initial_config_primitive
in initial_config_primitive_list
:
8465 # adding information on the vca_deployed if it is a NS execution environment
8466 if not vca_deployed
["member-vnf-index"]:
8467 deploy_params
["ns_config_info"] = json
.dumps(
8468 self
._get
_ns
_config
_info
(nsr_id
)
8470 # TODO check if already done
8471 primitive_params_
= self
._map
_primitive
_params
(
8472 initial_config_primitive
, {}, deploy_params
8475 step
= "execute primitive '{}' params '{}'".format(
8476 initial_config_primitive
["name"], primitive_params_
8478 self
.logger
.debug(logging_text
+ step
)
8479 await self
.vca_map
[vca_type
].exec_primitive(
8481 primitive_name
=initial_config_primitive
["name"],
8482 params_dict
=primitive_params_
,
8487 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8488 if check_if_terminated_needed
:
8489 if config_descriptor
.get("terminate-config-primitive"):
8493 {db_update_entry
+ "needed_terminate": True},
8495 check_if_terminated_needed
= False
8497 # TODO register in database that primitive is done
8499 # STEP 7 Configure metrics
8500 # Not sure if this need to be done when healing
8502 if vca_type == "helm" or vca_type == "helm-v3":
8503 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8505 artifact_path=artifact_path,
8506 ee_config_descriptor=ee_config_descriptor,
8509 target_ip=rw_mgmt_ip,
8515 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8518 for job in prometheus_jobs:
8521 {"job_name": job["job_name"]},
8524 fail_on_empty=False,
8528 step
= "instantiated at VCA"
8529 self
.logger
.debug(logging_text
+ step
)
8531 self
._write
_configuration
_status
(
8532 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8535 except Exception as e
: # TODO not use Exception but N2VC exception
8536 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8538 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8541 "Exception while {} : {}".format(step
, e
), exc_info
=True
8543 self
._write
_configuration
_status
(
8544 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8546 raise LcmException("{} {}".format(step
, e
)) from e
async def _wait_heal_ro(
    self,
    nsr_id,
    timeout=600,
    poll_interval=15,
):
    """
    Wait until RO has finished the healing operation of a NS.

    The "nsrs" record is read repeatedly and the method returns as soon as
    _admin.deployed.RO.operational-status is different from "healing".

    :param nsr_id: NS instance ID (_id of the "nsrs" record)
    :param timeout: maximum time to wait, in seconds
    :param poll_interval: seconds to sleep between two database reads
    :return: None
    :raises NgRoException: if RO is still healing when the timeout expires
    """
    start_time = time()
    while time() <= start_time + timeout:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        operational_status_ro = db_nsr["_admin"]["deployed"]["RO"][
            "operational-status"
        ]
        self.logger.debug("Wait Heal RO > {}".format(operational_status_ro))
        if operational_status_ro != "healing":
            break
        # No explicit "loop" argument: it was deprecated in Python 3.8 and
        # removed in 3.10, where passing it raises TypeError. asyncio.sleep
        # always uses the running loop, which is the one executing this task.
        await asyncio.sleep(poll_interval)
    else:  # the while condition became false: timeout expired while healing
        raise NgRoException("Timeout waiting ns to deploy")
async def vertical_scale(self, nsr_id, nslcmop_id):
    """
    Run a vertical scale operation over the VDUs of a NS through RO.

    The operationParams of the nslcmop record are sent to RO as the vertical
    scale target and the resulting RO action is awaited. The final operation
    state (COMPLETED or FAILED) is written at the nslcmop record and notified
    on the "ns" topic with the "verticalscaled" key.

    :param nsr_id: NS Instance ID
    :param nslcmop_id: ID of the nslcmop record of this operation
    :return: None. Errors are not raised; they are stored as FAILED state
    """
    # Try to lock HA task here; nothing to do if the lock is not obtained
    if not self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id):
        return

    logging_text = "Task ns={} vertical scale ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")

    db_nslcmop_update = {}
    db_nsr_update = {}
    nslcmop_operation_state = None
    exc = None
    # reference time handed over to the RO wait
    start_deploy = time()

    try:
        # "step" names the phase in progress; it is used in the error texts
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="VerticalScale",
            current_operation_id=nslcmop_id,
        )

        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        nslcmop_record = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        # RO gets its own copy of the operation parameters as target
        target = dict(nslcmop_record.get("operationParams"))

        ro_reply = await self.RO.vertical_scale(nsr_id, target)
        self.logger.debug("RO return > {}".format(ro_reply))
        await self._wait_ng_ro(
            nsr_id,
            ro_reply["action_id"],
            nslcmop_id,
            start_deploy,
            self.timeout.verticalscale,
            operation="verticalscale",
        )
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    finally:
        # The NS goes back to IDLE whatever the result has been
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )
        if not exc:
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nsr_update["detailed-status"] = "Done"
        else:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message="",
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if nslcmop_operation_state:
            notification = {
                "nsr_id": nsr_id,
                "nslcmop_id": nslcmop_id,
                "operationState": nslcmop_operation_state,
            }
            # Best effort: a failed notification is only logged
            try:
                await self.msg.aiowrite(
                    "ns", "verticalscaled", notification, loop=self.loop
                )
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_verticalscale")