1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
36 from osm_lcm
import ROclient
37 from osm_lcm
.data_utils
.lcm_config
import LcmCfg
38 from osm_lcm
.data_utils
.nsr
import (
41 get_deployed_vca_list
,
44 from osm_lcm
.data_utils
.vca
import (
53 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
54 from osm_lcm
.lcm_utils
import (
61 check_juju_bundle_existence
,
62 get_charm_artifact_path
,
65 from osm_lcm
.data_utils
.nsd
import (
66 get_ns_configuration_relation_list
,
70 from osm_lcm
.data_utils
.vnfd
import (
76 get_ee_sorted_initial_config_primitive_list
,
77 get_ee_sorted_terminate_config_primitive_list
,
79 get_virtual_link_profiles
,
84 get_number_of_instances
,
86 get_kdu_resource_profile
,
87 find_software_version
,
90 from osm_lcm
.data_utils
.list_utils
import find_in_list
91 from osm_lcm
.data_utils
.vnfr
import (
95 get_volumes_from_instantiation_params
,
97 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
98 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
99 from n2vc
.definitions
import RelationEndpoint
100 from n2vc
.k8s_helm_conn
import K8sHelmConnector
101 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
102 from n2vc
.k8s_juju_conn
import K8sJujuConnector
104 from osm_common
.dbbase
import DbException
105 from osm_common
.fsbase
import FsException
107 from osm_lcm
.data_utils
.database
.database
import Database
108 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
109 from osm_lcm
.data_utils
.wim
import (
111 get_target_wim_attrs
,
112 select_feasible_wim_account
,
115 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
116 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
118 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
119 from osm_lcm
.osm_config
import OsmConfigBuilder
120 from osm_lcm
.prometheus
import parse_job
122 from copy
import copy
, deepcopy
123 from time
import time
124 from uuid
import uuid4
126 from random
import randint
128 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
131 class NsLcm(LcmBase
):
132 SUBOPERATION_STATUS_NOT_FOUND
= -1
133 SUBOPERATION_STATUS_NEW
= -2
134 SUBOPERATION_STATUS_SKIP
= -3
135 task_name_deploy_vca
= "Deploying VCA"
137 def __init__(self
, msg
, lcm_tasks
, config
: LcmCfg
, loop
):
139 Init, Connect to database, filesystem storage, and messaging
140 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
143 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
145 self
.db
= Database().instance
.db
146 self
.fs
= Filesystem().instance
.fs
148 self
.lcm_tasks
= lcm_tasks
149 self
.timeout
= config
.timeout
150 self
.ro_config
= config
.RO
151 self
.vca_config
= config
.VCA
153 # create N2VC connector
154 self
.n2vc
= N2VCJujuConnector(
157 on_update_db
=self
._on
_update
_n
2vc
_db
,
162 self
.conn_helm_ee
= LCMHelmConn(
165 vca_config
=self
.vca_config
,
166 on_update_db
=self
._on
_update
_n
2vc
_db
,
169 self
.k8sclusterhelm2
= K8sHelmConnector(
170 kubectl_command
=self
.vca_config
.kubectlpath
,
171 helm_command
=self
.vca_config
.helmpath
,
178 self
.k8sclusterhelm3
= K8sHelm3Connector(
179 kubectl_command
=self
.vca_config
.kubectlpath
,
180 helm_command
=self
.vca_config
.helm3path
,
187 self
.k8sclusterjuju
= K8sJujuConnector(
188 kubectl_command
=self
.vca_config
.kubectlpath
,
189 juju_command
=self
.vca_config
.jujupath
,
192 on_update_db
=self
._on
_update
_k
8s
_db
,
197 self
.k8scluster_map
= {
198 "helm-chart": self
.k8sclusterhelm2
,
199 "helm-chart-v3": self
.k8sclusterhelm3
,
200 "chart": self
.k8sclusterhelm3
,
201 "juju-bundle": self
.k8sclusterjuju
,
202 "juju": self
.k8sclusterjuju
,
206 "lxc_proxy_charm": self
.n2vc
,
207 "native_charm": self
.n2vc
,
208 "k8s_proxy_charm": self
.n2vc
,
209 "helm": self
.conn_helm_ee
,
210 "helm-v3": self
.conn_helm_ee
,
214 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
.to_dict())
216 self
.op_status_map
= {
217 "instantiation": self
.RO
.status
,
218 "termination": self
.RO
.status
,
219 "migrate": self
.RO
.status
,
220 "healing": self
.RO
.recreate_status
,
221 "verticalscale": self
.RO
.status
,
222 "start_stop_rebuild": self
.RO
.status
,
226 def increment_ip_mac(ip_mac
, vm_index
=1):
227 if not isinstance(ip_mac
, str):
230 # try with ipv4 look for last dot
231 i
= ip_mac
.rfind(".")
234 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
235 # try with ipv6 or mac look for last colon. Operate in hex
236 i
= ip_mac
.rfind(":")
239 # format in hex, len can be 2 for mac or 4 for ipv6
240 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
241 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
247 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
249 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
252 # TODO filter RO descriptor fields...
256 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
257 db_dict
["deploymentStatus"] = ro_descriptor
258 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
260 except Exception as e
:
262 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
265 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
267 # remove last dot from path (if exists)
268 if path
.endswith("."):
271 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
272 # .format(table, filter, path, updated_data))
275 nsr_id
= filter.get("_id")
277 # read ns record from database
278 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
279 current_ns_status
= nsr
.get("nsState")
281 # get vca status for NS
282 status_dict
= await self
.n2vc
.get_status(
283 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
288 db_dict
["vcaStatus"] = status_dict
290 # update configurationStatus for this VCA
292 vca_index
= int(path
[path
.rfind(".") + 1 :])
295 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
297 vca_status
= vca_list
[vca_index
].get("status")
299 configuration_status_list
= nsr
.get("configurationStatus")
300 config_status
= configuration_status_list
[vca_index
].get("status")
302 if config_status
== "BROKEN" and vca_status
!= "failed":
303 db_dict
["configurationStatus"][vca_index
] = "READY"
304 elif config_status
!= "BROKEN" and vca_status
== "failed":
305 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
306 except Exception as e
:
307 # not update configurationStatus
308 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
310 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
311 # if nsState = 'DEGRADED' check if all is OK
313 if current_ns_status
in ("READY", "DEGRADED"):
314 error_description
= ""
316 if status_dict
.get("machines"):
317 for machine_id
in status_dict
.get("machines"):
318 machine
= status_dict
.get("machines").get(machine_id
)
319 # check machine agent-status
320 if machine
.get("agent-status"):
321 s
= machine
.get("agent-status").get("status")
324 error_description
+= (
325 "machine {} agent-status={} ; ".format(
329 # check machine instance status
330 if machine
.get("instance-status"):
331 s
= machine
.get("instance-status").get("status")
334 error_description
+= (
335 "machine {} instance-status={} ; ".format(
340 if status_dict
.get("applications"):
341 for app_id
in status_dict
.get("applications"):
342 app
= status_dict
.get("applications").get(app_id
)
343 # check application status
344 if app
.get("status"):
345 s
= app
.get("status").get("status")
348 error_description
+= (
349 "application {} status={} ; ".format(app_id
, s
)
352 if error_description
:
353 db_dict
["errorDescription"] = error_description
354 if current_ns_status
== "READY" and is_degraded
:
355 db_dict
["nsState"] = "DEGRADED"
356 if current_ns_status
== "DEGRADED" and not is_degraded
:
357 db_dict
["nsState"] = "READY"
360 self
.update_db_2("nsrs", nsr_id
, db_dict
)
362 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
364 except Exception as e
:
365 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
367 async def _on_update_k8s_db(
368 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
371 Updating vca status in NSR record
372 :param cluster_uuid: UUID of a k8s cluster
373 :param kdu_instance: The unique name of the KDU instance
374 :param filter: To get nsr_id
375 :cluster_type: The cluster type (juju, k8s)
379 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
380 # .format(cluster_uuid, kdu_instance, filter))
382 nsr_id
= filter.get("_id")
384 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
385 cluster_uuid
=cluster_uuid
,
386 kdu_instance
=kdu_instance
,
388 complete_status
=True,
394 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
397 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
401 self
.update_db_2("nsrs", nsr_id
, db_dict
)
402 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
404 except Exception as e
:
405 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
408 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
411 undefined
=StrictUndefined
,
412 autoescape
=select_autoescape(default_for_string
=True, default
=True),
414 template
= env
.from_string(cloud_init_text
)
415 return template
.render(additional_params
or {})
416 except UndefinedError
as e
:
418 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
419 "file, must be provided in the instantiation parameters inside the "
420 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
422 except (TemplateError
, TemplateNotFound
) as e
:
424 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
429 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
430 cloud_init_content
= cloud_init_file
= None
432 if vdu
.get("cloud-init-file"):
433 base_folder
= vnfd
["_admin"]["storage"]
434 if base_folder
["pkg-dir"]:
435 cloud_init_file
= "{}/{}/cloud_init/{}".format(
436 base_folder
["folder"],
437 base_folder
["pkg-dir"],
438 vdu
["cloud-init-file"],
441 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
442 base_folder
["folder"],
443 vdu
["cloud-init-file"],
445 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
446 cloud_init_content
= ci_file
.read()
447 elif vdu
.get("cloud-init"):
448 cloud_init_content
= vdu
["cloud-init"]
450 return cloud_init_content
451 except FsException
as e
:
453 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
454 vnfd
["id"], vdu
["id"], cloud_init_file
, e
458 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
460 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
462 additional_params
= vdur
.get("additionalParams")
463 return parse_yaml_strings(additional_params
)
465 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
467 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
468 :param vnfd: input vnfd
469 :param new_id: overrides vnf id if provided
470 :param additionalParams: Instantiation params for VNFs provided
471 :param nsrId: Id of the NSR
472 :return: copy of vnfd
474 vnfd_RO
= deepcopy(vnfd
)
475 # remove unused by RO configuration, monitoring, scaling and internal keys
476 vnfd_RO
.pop("_id", None)
477 vnfd_RO
.pop("_admin", None)
478 vnfd_RO
.pop("monitoring-param", None)
479 vnfd_RO
.pop("scaling-group-descriptor", None)
480 vnfd_RO
.pop("kdu", None)
481 vnfd_RO
.pop("k8s-cluster", None)
483 vnfd_RO
["id"] = new_id
485 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
486 for vdu
in get_iterable(vnfd_RO
, "vdu"):
487 vdu
.pop("cloud-init-file", None)
488 vdu
.pop("cloud-init", None)
492 def ip_profile_2_RO(ip_profile
):
493 RO_ip_profile
= deepcopy(ip_profile
)
494 if "dns-server" in RO_ip_profile
:
495 if isinstance(RO_ip_profile
["dns-server"], list):
496 RO_ip_profile
["dns-address"] = []
497 for ds
in RO_ip_profile
.pop("dns-server"):
498 RO_ip_profile
["dns-address"].append(ds
["address"])
500 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
501 if RO_ip_profile
.get("ip-version") == "ipv4":
502 RO_ip_profile
["ip-version"] = "IPv4"
503 if RO_ip_profile
.get("ip-version") == "ipv6":
504 RO_ip_profile
["ip-version"] = "IPv6"
505 if "dhcp-params" in RO_ip_profile
:
506 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
509 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
510 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
511 if db_vim
["_admin"]["operationalState"] != "ENABLED":
513 "VIM={} is not available. operationalState={}".format(
514 vim_account
, db_vim
["_admin"]["operationalState"]
517 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
520 def get_ro_wim_id_for_wim_account(self
, wim_account
):
521 if isinstance(wim_account
, str):
522 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
523 if db_wim
["_admin"]["operationalState"] != "ENABLED":
525 "WIM={} is not available. operationalState={}".format(
526 wim_account
, db_wim
["_admin"]["operationalState"]
529 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
534 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
536 db_vdu_push_list
= []
538 db_update
= {"_admin.modified": time()}
540 for vdu_id
, vdu_count
in vdu_create
.items():
544 for vdur
in reversed(db_vnfr
["vdur"])
545 if vdur
["vdu-id-ref"] == vdu_id
550 # Read the template saved in the db:
552 "No vdur in the database. Using the vdur-template to scale"
554 vdur_template
= db_vnfr
.get("vdur-template")
555 if not vdur_template
:
557 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
561 vdur
= vdur_template
[0]
562 # Delete a template from the database after using it
565 {"_id": db_vnfr
["_id"]},
567 pull
={"vdur-template": {"_id": vdur
["_id"]}},
569 for count
in range(vdu_count
):
570 vdur_copy
= deepcopy(vdur
)
571 vdur_copy
["status"] = "BUILD"
572 vdur_copy
["status-detailed"] = None
573 vdur_copy
["ip-address"] = None
574 vdur_copy
["_id"] = str(uuid4())
575 vdur_copy
["count-index"] += count
+ 1
576 vdur_copy
["id"] = "{}-{}".format(
577 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
579 vdur_copy
.pop("vim_info", None)
580 for iface
in vdur_copy
["interfaces"]:
581 if iface
.get("fixed-ip"):
582 iface
["ip-address"] = self
.increment_ip_mac(
583 iface
["ip-address"], count
+ 1
586 iface
.pop("ip-address", None)
587 if iface
.get("fixed-mac"):
588 iface
["mac-address"] = self
.increment_ip_mac(
589 iface
["mac-address"], count
+ 1
592 iface
.pop("mac-address", None)
596 ) # only first vdu can be managment of vnf
597 db_vdu_push_list
.append(vdur_copy
)
598 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
600 if len(db_vnfr
["vdur"]) == 1:
601 # The scale will move to 0 instances
603 "Scaling to 0 !, creating the template with the last vdur"
605 template_vdur
= [db_vnfr
["vdur"][0]]
606 for vdu_id
, vdu_count
in vdu_delete
.items():
608 indexes_to_delete
= [
610 for iv
in enumerate(db_vnfr
["vdur"])
611 if iv
[1]["vdu-id-ref"] == vdu_id
615 "vdur.{}.status".format(i
): "DELETING"
616 for i
in indexes_to_delete
[-vdu_count
:]
620 # it must be deleted one by one because common.db does not allow otherwise
623 for v
in reversed(db_vnfr
["vdur"])
624 if v
["vdu-id-ref"] == vdu_id
626 for vdu
in vdus_to_delete
[:vdu_count
]:
629 {"_id": db_vnfr
["_id"]},
631 pull
={"vdur": {"_id": vdu
["_id"]}},
635 db_push
["vdur"] = db_vdu_push_list
637 db_push
["vdur-template"] = template_vdur
640 db_vnfr
["vdur-template"] = template_vdur
641 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
642 # modify passed dictionary db_vnfr
643 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
644 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
646 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
648 Updates database nsr with the RO info for the created vld
649 :param ns_update_nsr: dictionary to be filled with the updated info
650 :param db_nsr: content of db_nsr. This is also modified
651 :param nsr_desc_RO: nsr descriptor from RO
652 :return: Nothing, LcmException is raised on errors
655 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
656 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
657 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
659 vld
["vim-id"] = net_RO
.get("vim_net_id")
660 vld
["name"] = net_RO
.get("vim_name")
661 vld
["status"] = net_RO
.get("status")
662 vld
["status-detailed"] = net_RO
.get("error_msg")
663 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
667 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
670 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
672 for db_vnfr
in db_vnfrs
.values():
673 vnfr_update
= {"status": "ERROR"}
674 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
675 if "status" not in vdur
:
676 vdur
["status"] = "ERROR"
677 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
679 vdur
["status-detailed"] = str(error_text
)
681 "vdur.{}.status-detailed".format(vdu_index
)
683 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
684 except DbException
as e
:
685 self
.logger
.error("Cannot update vnf. {}".format(e
))
687 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
689 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
690 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
691 :param nsr_desc_RO: nsr descriptor from RO
692 :return: Nothing, LcmException is raised on errors
694 for vnf_index
, db_vnfr
in db_vnfrs
.items():
695 for vnf_RO
in nsr_desc_RO
["vnfs"]:
696 if vnf_RO
["member_vnf_index"] != vnf_index
:
699 if vnf_RO
.get("ip_address"):
700 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
703 elif not db_vnfr
.get("ip-address"):
704 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
705 raise LcmExceptionNoMgmtIP(
706 "ns member_vnf_index '{}' has no IP address".format(
711 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
712 vdur_RO_count_index
= 0
713 if vdur
.get("pdu-type"):
715 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
716 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
718 if vdur
["count-index"] != vdur_RO_count_index
:
719 vdur_RO_count_index
+= 1
721 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
722 if vdur_RO
.get("ip_address"):
723 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
725 vdur
["ip-address"] = None
726 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
727 vdur
["name"] = vdur_RO
.get("vim_name")
728 vdur
["status"] = vdur_RO
.get("status")
729 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
730 for ifacer
in get_iterable(vdur
, "interfaces"):
731 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
732 if ifacer
["name"] == interface_RO
.get("internal_name"):
733 ifacer
["ip-address"] = interface_RO
.get(
736 ifacer
["mac-address"] = interface_RO
.get(
742 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
743 "from VIM info".format(
744 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
747 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
751 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
753 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
757 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
758 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
759 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
761 vld
["vim-id"] = net_RO
.get("vim_net_id")
762 vld
["name"] = net_RO
.get("vim_name")
763 vld
["status"] = net_RO
.get("status")
764 vld
["status-detailed"] = net_RO
.get("error_msg")
765 vnfr_update
["vld.{}".format(vld_index
)] = vld
769 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
774 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
779 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
784 def _get_ns_config_info(self
, nsr_id
):
786 Generates a mapping between vnf,vdu elements and the N2VC id
787 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
788 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
789 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
790 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
792 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
793 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
795 ns_config_info
= {"osm-config-mapping": mapping
}
796 for vca
in vca_deployed_list
:
797 if not vca
["member-vnf-index"]:
799 if not vca
["vdu_id"]:
800 mapping
[vca
["member-vnf-index"]] = vca
["application"]
804 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
806 ] = vca
["application"]
807 return ns_config_info
809 async def _instantiate_ng_ro(
826 def get_vim_account(vim_account_id
):
828 if vim_account_id
in db_vims
:
829 return db_vims
[vim_account_id
]
830 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
831 db_vims
[vim_account_id
] = db_vim
834 # modify target_vld info with instantiation parameters
835 def parse_vld_instantiation_params(
836 target_vim
, target_vld
, vld_params
, target_sdn
838 if vld_params
.get("ip-profile"):
839 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
842 if vld_params
.get("provider-network"):
843 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
846 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
847 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
851 # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
852 # if wim_account_id is specified in vld_params, validate if it is feasible.
853 wim_account_id
, db_wim
= select_feasible_wim_account(
854 db_nsr
, db_vnfrs
, target_vld
, vld_params
, self
.logger
858 # WIM is needed and a feasible one was found, populate WIM target and SDN ports
859 self
.logger
.info("WIM selected: {:s}".format(str(wim_account_id
)))
860 # update vld_params with correct WIM account Id
861 vld_params
["wimAccountId"] = wim_account_id
863 target_wim
= "wim:{}".format(wim_account_id
)
864 target_wim_attrs
= get_target_wim_attrs(nsr_id
, target_vld
, vld_params
)
865 sdn_ports
= get_sdn_ports(vld_params
, db_wim
)
866 if len(sdn_ports
) > 0:
867 target_vld
["vim_info"][target_wim
] = target_wim_attrs
868 target_vld
["vim_info"][target_wim
]["sdn-ports"] = sdn_ports
871 "Target VLD with WIM data: {:s}".format(str(target_vld
))
874 for param
in ("vim-network-name", "vim-network-id"):
875 if vld_params
.get(param
):
876 if isinstance(vld_params
[param
], dict):
877 for vim
, vim_net
in vld_params
[param
].items():
878 other_target_vim
= "vim:" + vim
880 target_vld
["vim_info"],
881 (other_target_vim
, param
.replace("-", "_")),
884 else: # isinstance str
885 target_vld
["vim_info"][target_vim
][
886 param
.replace("-", "_")
887 ] = vld_params
[param
]
888 if vld_params
.get("common_id"):
889 target_vld
["common_id"] = vld_params
.get("common_id")
891 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
892 def update_ns_vld_target(target
, ns_params
):
893 for vnf_params
in ns_params
.get("vnf", ()):
894 if vnf_params
.get("vimAccountId"):
898 for vnfr
in db_vnfrs
.values()
899 if vnf_params
["member-vnf-index"]
900 == vnfr
["member-vnf-index-ref"]
904 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
907 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
908 target_vld
= find_in_list(
909 get_iterable(vdur
, "interfaces"),
910 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
913 vld_params
= find_in_list(
914 get_iterable(ns_params
, "vld"),
915 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
919 if vnf_params
.get("vimAccountId") not in a_vld
.get(
922 target_vim_network_list
= [
923 v
for _
, v
in a_vld
.get("vim_info").items()
925 target_vim_network_name
= next(
927 item
.get("vim_network_name", "")
928 for item
in target_vim_network_list
933 target
["ns"]["vld"][a_index
].get("vim_info").update(
935 "vim:{}".format(vnf_params
["vimAccountId"]): {
936 "vim_network_name": target_vim_network_name
,
942 for param
in ("vim-network-name", "vim-network-id"):
943 if vld_params
.get(param
) and isinstance(
944 vld_params
[param
], dict
946 for vim
, vim_net
in vld_params
[
949 other_target_vim
= "vim:" + vim
951 target
["ns"]["vld"][a_index
].get(
956 param
.replace("-", "_"),
961 nslcmop_id
= db_nslcmop
["_id"]
963 "name": db_nsr
["name"],
966 "image": deepcopy(db_nsr
["image"]),
967 "flavor": deepcopy(db_nsr
["flavor"]),
968 "action_id": nslcmop_id
,
969 "cloud_init_content": {},
971 for image
in target
["image"]:
972 image
["vim_info"] = {}
973 for flavor
in target
["flavor"]:
974 flavor
["vim_info"] = {}
975 if db_nsr
.get("affinity-or-anti-affinity-group"):
976 target
["affinity-or-anti-affinity-group"] = deepcopy(
977 db_nsr
["affinity-or-anti-affinity-group"]
979 for affinity_or_anti_affinity_group
in target
[
980 "affinity-or-anti-affinity-group"
982 affinity_or_anti_affinity_group
["vim_info"] = {}
984 if db_nslcmop
.get("lcmOperationType") != "instantiate":
985 # get parameters of instantiation:
986 db_nslcmop_instantiate
= self
.db
.get_list(
989 "nsInstanceId": db_nslcmop
["nsInstanceId"],
990 "lcmOperationType": "instantiate",
993 ns_params
= db_nslcmop_instantiate
.get("operationParams")
995 ns_params
= db_nslcmop
.get("operationParams")
996 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
997 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
1000 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
1001 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
1004 "name": vld
["name"],
1005 "mgmt-network": vld
.get("mgmt-network", False),
1006 "type": vld
.get("type"),
1009 "vim_network_name": vld
.get("vim-network-name"),
1010 "vim_account_id": ns_params
["vimAccountId"],
1014 # check if this network needs SDN assist
1015 if vld
.get("pci-interfaces"):
1016 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1017 if vim_config
:= db_vim
.get("config"):
1018 if sdnc_id
:= vim_config
.get("sdn-controller"):
1019 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1020 target_sdn
= "sdn:{}".format(sdnc_id
)
1021 target_vld
["vim_info"][target_sdn
] = {
1023 "target_vim": target_vim
,
1025 "type": vld
.get("type"),
1028 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1029 for nsd_vnf_profile
in nsd_vnf_profiles
:
1030 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1031 if cp
["virtual-link-profile-id"] == vld
["id"]:
1033 "member_vnf:{}.{}".format(
1034 cp
["constituent-cpd-id"][0][
1035 "constituent-base-element-id"
1037 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1039 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1041 # check at nsd descriptor, if there is an ip-profile
1043 nsd_vlp
= find_in_list(
1044 get_virtual_link_profiles(nsd
),
1045 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1050 and nsd_vlp
.get("virtual-link-protocol-data")
1051 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1053 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1056 ip_profile_dest_data
= {}
1057 if "ip-version" in ip_profile_source_data
:
1058 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1061 if "cidr" in ip_profile_source_data
:
1062 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1065 if "gateway-ip" in ip_profile_source_data
:
1066 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1069 if "dhcp-enabled" in ip_profile_source_data
:
1070 ip_profile_dest_data
["dhcp-params"] = {
1071 "enabled": ip_profile_source_data
["dhcp-enabled"]
1073 vld_params
["ip-profile"] = ip_profile_dest_data
1075 # update vld_params with instantiation params
1076 vld_instantiation_params
= find_in_list(
1077 get_iterable(ns_params
, "vld"),
1078 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1080 if vld_instantiation_params
:
1081 vld_params
.update(vld_instantiation_params
)
1082 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1083 target
["ns"]["vld"].append(target_vld
)
1084 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1085 update_ns_vld_target(target
, ns_params
)
1087 for vnfr
in db_vnfrs
.values():
1088 vnfd
= find_in_list(
1089 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1091 vnf_params
= find_in_list(
1092 get_iterable(ns_params
, "vnf"),
1093 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1095 target_vnf
= deepcopy(vnfr
)
1096 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1097 for vld
in target_vnf
.get("vld", ()):
1098 # check if connected to a ns.vld, to fill target'
1099 vnf_cp
= find_in_list(
1100 vnfd
.get("int-virtual-link-desc", ()),
1101 lambda cpd
: cpd
.get("id") == vld
["id"],
1104 ns_cp
= "member_vnf:{}.{}".format(
1105 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1107 if cp2target
.get(ns_cp
):
1108 vld
["target"] = cp2target
[ns_cp
]
1111 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1113 # check if this network needs SDN assist
1115 if vld
.get("pci-interfaces"):
1116 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1117 sdnc_id
= db_vim
["config"].get("sdn-controller")
1119 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1120 target_sdn
= "sdn:{}".format(sdnc_id
)
1121 vld
["vim_info"][target_sdn
] = {
1123 "target_vim": target_vim
,
1125 "type": vld
.get("type"),
1128 # check at vnfd descriptor, if there is an ip-profile
1130 vnfd_vlp
= find_in_list(
1131 get_virtual_link_profiles(vnfd
),
1132 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1136 and vnfd_vlp
.get("virtual-link-protocol-data")
1137 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1139 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1142 ip_profile_dest_data
= {}
1143 if "ip-version" in ip_profile_source_data
:
1144 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1147 if "cidr" in ip_profile_source_data
:
1148 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1151 if "gateway-ip" in ip_profile_source_data
:
1152 ip_profile_dest_data
[
1154 ] = ip_profile_source_data
["gateway-ip"]
1155 if "dhcp-enabled" in ip_profile_source_data
:
1156 ip_profile_dest_data
["dhcp-params"] = {
1157 "enabled": ip_profile_source_data
["dhcp-enabled"]
1160 vld_params
["ip-profile"] = ip_profile_dest_data
1161 # update vld_params with instantiation params
1163 vld_instantiation_params
= find_in_list(
1164 get_iterable(vnf_params
, "internal-vld"),
1165 lambda i_vld
: i_vld
["name"] == vld
["id"],
1167 if vld_instantiation_params
:
1168 vld_params
.update(vld_instantiation_params
)
1169 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1172 for vdur
in target_vnf
.get("vdur", ()):
1173 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1174 continue # This vdu must not be created
1175 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1177 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1180 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1181 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1184 and vdu_configuration
.get("config-access")
1185 and vdu_configuration
.get("config-access").get("ssh-access")
1187 vdur
["ssh-keys"] = ssh_keys_all
1188 vdur
["ssh-access-required"] = vdu_configuration
[
1190 ]["ssh-access"]["required"]
1193 and vnf_configuration
.get("config-access")
1194 and vnf_configuration
.get("config-access").get("ssh-access")
1195 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1197 vdur
["ssh-keys"] = ssh_keys_all
1198 vdur
["ssh-access-required"] = vnf_configuration
[
1200 ]["ssh-access"]["required"]
1201 elif ssh_keys_instantiation
and find_in_list(
1202 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1204 vdur
["ssh-keys"] = ssh_keys_instantiation
1206 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1208 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1210 if vdud
.get("cloud-init-file"):
1211 vdur
["cloud-init"] = "{}:file:{}".format(
1212 vnfd
["_id"], vdud
.get("cloud-init-file")
1214 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1215 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1216 base_folder
= vnfd
["_admin"]["storage"]
1217 if base_folder
["pkg-dir"]:
1218 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1219 base_folder
["folder"],
1220 base_folder
["pkg-dir"],
1221 vdud
.get("cloud-init-file"),
1224 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1225 base_folder
["folder"],
1226 vdud
.get("cloud-init-file"),
1228 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1229 target
["cloud_init_content"][
1232 elif vdud
.get("cloud-init"):
1233 vdur
["cloud-init"] = "{}:vdu:{}".format(
1234 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1236 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1237 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1240 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1241 deploy_params_vdu
= self
._format
_additional
_params
(
1242 vdur
.get("additionalParams") or {}
1244 deploy_params_vdu
["OSM"] = get_osm_params(
1245 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1247 vdur
["additionalParams"] = deploy_params_vdu
1250 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1251 if target_vim
not in ns_flavor
["vim_info"]:
1252 ns_flavor
["vim_info"][target_vim
] = {}
1255 # in case alternative images are provided we must check if they should be applied
1256 # for the vim_type, modify the vim_type taking into account
1257 ns_image_id
= int(vdur
["ns-image-id"])
1258 if vdur
.get("alt-image-ids"):
1259 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1260 vim_type
= db_vim
["vim_type"]
1261 for alt_image_id
in vdur
.get("alt-image-ids"):
1262 ns_alt_image
= target
["image"][int(alt_image_id
)]
1263 if vim_type
== ns_alt_image
.get("vim-type"):
1264 # must use alternative image
1266 "use alternative image id: {}".format(alt_image_id
)
1268 ns_image_id
= alt_image_id
1269 vdur
["ns-image-id"] = ns_image_id
1271 ns_image
= target
["image"][int(ns_image_id
)]
1272 if target_vim
not in ns_image
["vim_info"]:
1273 ns_image
["vim_info"][target_vim
] = {}
1276 if vdur
.get("affinity-or-anti-affinity-group-id"):
1277 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1278 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1279 if target_vim
not in ns_ags
["vim_info"]:
1280 ns_ags
["vim_info"][target_vim
] = {}
1282 vdur
["vim_info"] = {target_vim
: {}}
1283 # instantiation parameters
1285 vdu_instantiation_params
= find_in_list(
1286 get_iterable(vnf_params
, "vdu"),
1287 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1289 if vdu_instantiation_params
:
1290 # Parse the vdu_volumes from the instantiation params
1291 vdu_volumes
= get_volumes_from_instantiation_params(
1292 vdu_instantiation_params
, vdud
1294 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1295 vdur_list
.append(vdur
)
1296 target_vnf
["vdur"] = vdur_list
1297 target
["vnf"].append(target_vnf
)
1299 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1300 desc
= await self
.RO
.deploy(nsr_id
, target
)
1301 self
.logger
.debug("RO return > {}".format(desc
))
1302 action_id
= desc
["action_id"]
1303 await self
._wait
_ng
_ro
(
1310 operation
="instantiation",
1315 "_admin.deployed.RO.operational-status": "running",
1316 "detailed-status": " ".join(stage
),
1318 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1319 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1320 self
._write
_op
_status
(nslcmop_id
, stage
)
1322 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1326 async def _wait_ng_ro(
1336 detailed_status_old
= None
1338 start_time
= start_time
or time()
1339 while time() <= start_time
+ timeout
:
1340 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1341 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1342 if desc_status
["status"] == "FAILED":
1343 raise NgRoException(desc_status
["details"])
1344 elif desc_status
["status"] == "BUILD":
1346 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1347 elif desc_status
["status"] == "DONE":
1349 stage
[2] = "Deployed at VIM"
1352 assert False, "ROclient.check_ns_status returns unknown {}".format(
1353 desc_status
["status"]
1355 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1356 detailed_status_old
= stage
[2]
1357 db_nsr_update
["detailed-status"] = " ".join(stage
)
1358 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1359 self
._write
_op
_status
(nslcmop_id
, stage
)
1360 await asyncio
.sleep(15, loop
=self
.loop
)
1361 else: # timeout_ns_deploy
1362 raise NgRoException("Timeout waiting ns to deploy")
1364 async def _terminate_ng_ro(
1365 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1370 start_deploy
= time()
1377 "action_id": nslcmop_id
,
1379 desc
= await self
.RO
.deploy(nsr_id
, target
)
1380 action_id
= desc
["action_id"]
1381 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1384 + "ns terminate action at RO. action_id={}".format(action_id
)
1388 delete_timeout
= 20 * 60 # 20 minutes
1389 await self
._wait
_ng
_ro
(
1396 operation
="termination",
1398 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1400 await self
.RO
.delete(nsr_id
)
1401 except NgRoException
as e
:
1402 if e
.http_code
== 404: # not found
1403 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1404 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1406 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1408 elif e
.http_code
== 409: # conflict
1409 failed_detail
.append("delete conflict: {}".format(e
))
1412 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1415 failed_detail
.append("delete error: {}".format(e
))
1418 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1420 except Exception as e
:
1421 failed_detail
.append("delete error: {}".format(e
))
1423 logging_text
+ "RO_action_id={} delete error: {}".format(action_id
, e
)
1427 stage
[2] = "Error deleting from VIM"
1429 stage
[2] = "Deleted from VIM"
1430 db_nsr_update
["detailed-status"] = " ".join(stage
)
1431 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1432 self
._write
_op
_status
(nslcmop_id
, stage
)
1435 raise LcmException("; ".join(failed_detail
))
1438 async def instantiate_RO(
1452 :param logging_text: preffix text to use at logging
1453 :param nsr_id: nsr identity
1454 :param nsd: database content of ns descriptor
1455 :param db_nsr: database content of ns record
1456 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1458 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1459 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1460 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1461 :return: None or exception
1464 start_deploy
= time()
1465 ns_params
= db_nslcmop
.get("operationParams")
1466 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1467 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1469 timeout_ns_deploy
= self
.timeout
.ns_deploy
1471 # Check for and optionally request placement optimization. Database will be updated if placement activated
1472 stage
[2] = "Waiting for Placement."
1473 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1474 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1475 for vnfr
in db_vnfrs
.values():
1476 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1479 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1481 return await self
._instantiate
_ng
_ro
(
1494 except Exception as e
:
1495 stage
[2] = "ERROR deploying at VIM"
1496 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1498 "Error deploying at VIM {}".format(e
),
1499 exc_info
=not isinstance(
1502 ROclient
.ROClientException
,
1511 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1513 Wait for kdu to be up, get ip address
1514 :param logging_text: prefix use for logging
1518 :return: IP address, K8s services
1521 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1524 while nb_tries
< 360:
1525 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1529 for x
in get_iterable(db_vnfr
, "kdur")
1530 if x
.get("kdu-name") == kdu_name
1536 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1538 if kdur
.get("status"):
1539 if kdur
["status"] in ("READY", "ENABLED"):
1540 return kdur
.get("ip-address"), kdur
.get("services")
1543 "target KDU={} is in error state".format(kdu_name
)
1546 await asyncio
.sleep(10, loop
=self
.loop
)
1548 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1550 async def wait_vm_up_insert_key_ro(
1551 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1554 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1555 :param logging_text: prefix use for logging
1560 :param pub_key: public ssh key to inject, None to skip
1561 :param user: user to apply the public ssh key
1565 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1567 target_vdu_id
= None
1573 if ro_retries
>= 360: # 1 hour
1575 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1578 await asyncio
.sleep(10, loop
=self
.loop
)
1581 if not target_vdu_id
:
1582 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1584 if not vdu_id
: # for the VNF case
1585 if db_vnfr
.get("status") == "ERROR":
1587 "Cannot inject ssh-key because target VNF is in error state"
1589 ip_address
= db_vnfr
.get("ip-address")
1595 for x
in get_iterable(db_vnfr
, "vdur")
1596 if x
.get("ip-address") == ip_address
1604 for x
in get_iterable(db_vnfr
, "vdur")
1605 if x
.get("vdu-id-ref") == vdu_id
1606 and x
.get("count-index") == vdu_index
1612 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1613 ): # If only one, this should be the target vdu
1614 vdur
= db_vnfr
["vdur"][0]
1617 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1618 vnfr_id
, vdu_id
, vdu_index
1621 # New generation RO stores information at "vim_info"
1624 if vdur
.get("vim_info"):
1626 t
for t
in vdur
["vim_info"]
1627 ) # there should be only one key
1628 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1630 vdur
.get("pdu-type")
1631 or vdur
.get("status") == "ACTIVE"
1632 or ng_ro_status
== "ACTIVE"
1634 ip_address
= vdur
.get("ip-address")
1637 target_vdu_id
= vdur
["vdu-id-ref"]
1638 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1640 "Cannot inject ssh-key because target VM is in error state"
1643 if not target_vdu_id
:
1646 # inject public key into machine
1647 if pub_key
and user
:
1648 self
.logger
.debug(logging_text
+ "Inserting RO key")
1649 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1650 if vdur
.get("pdu-type"):
1651 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1656 "action": "inject_ssh_key",
1660 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1662 desc
= await self
.RO
.deploy(nsr_id
, target
)
1663 action_id
= desc
["action_id"]
1664 await self
._wait
_ng
_ro
(
1665 nsr_id
, action_id
, timeout
=600, operation
="instantiation"
1668 except NgRoException
as e
:
1670 "Reaching max tries injecting key. Error: {}".format(e
)
1677 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1679 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1681 my_vca
= vca_deployed_list
[vca_index
]
1682 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1683 # vdu or kdu: no dependencies
1687 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1688 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1689 configuration_status_list
= db_nsr
["configurationStatus"]
1690 for index
, vca_deployed
in enumerate(configuration_status_list
):
1691 if index
== vca_index
:
1694 if not my_vca
.get("member-vnf-index") or (
1695 vca_deployed
.get("member-vnf-index")
1696 == my_vca
.get("member-vnf-index")
1698 internal_status
= configuration_status_list
[index
].get("status")
1699 if internal_status
== "READY":
1701 elif internal_status
== "BROKEN":
1703 "Configuration aborted because dependent charm/s has failed"
1708 # no dependencies, return
1710 await asyncio
.sleep(10)
1713 raise LcmException("Configuration aborted because dependent charm/s timeout")
1715 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1718 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1720 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1721 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1724 async def instantiate_N2VC(
1742 ee_config_descriptor
,
1744 nsr_id
= db_nsr
["_id"]
1745 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1746 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1747 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1748 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1750 "collection": "nsrs",
1751 "filter": {"_id": nsr_id
},
1752 "path": db_update_entry
,
1758 element_under_configuration
= nsr_id
1762 vnfr_id
= db_vnfr
["_id"]
1763 osm_config
["osm"]["vnf_id"] = vnfr_id
1765 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1767 if vca_type
== "native_charm":
1770 index_number
= vdu_index
or 0
1773 element_type
= "VNF"
1774 element_under_configuration
= vnfr_id
1775 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1777 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1778 element_type
= "VDU"
1779 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1780 osm_config
["osm"]["vdu_id"] = vdu_id
1782 namespace
+= ".{}".format(kdu_name
)
1783 element_type
= "KDU"
1784 element_under_configuration
= kdu_name
1785 osm_config
["osm"]["kdu_name"] = kdu_name
1788 if base_folder
["pkg-dir"]:
1789 artifact_path
= "{}/{}/{}/{}".format(
1790 base_folder
["folder"],
1791 base_folder
["pkg-dir"],
1794 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1799 artifact_path
= "{}/Scripts/{}/{}/".format(
1800 base_folder
["folder"],
1803 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1808 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1810 # get initial_config_primitive_list that applies to this element
1811 initial_config_primitive_list
= config_descriptor
.get(
1812 "initial-config-primitive"
1816 "Initial config primitive list > {}".format(
1817 initial_config_primitive_list
1821 # add config if not present for NS charm
1822 ee_descriptor_id
= ee_config_descriptor
.get("id")
1823 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1824 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1825 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1829 "Initial config primitive list #2 > {}".format(
1830 initial_config_primitive_list
1833 # n2vc_redesign STEP 3.1
1834 # find old ee_id if exists
1835 ee_id
= vca_deployed
.get("ee_id")
1837 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1838 # create or register execution environment in VCA
1839 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1841 self
._write
_configuration
_status
(
1843 vca_index
=vca_index
,
1845 element_under_configuration
=element_under_configuration
,
1846 element_type
=element_type
,
1849 step
= "create execution environment"
1850 self
.logger
.debug(logging_text
+ step
)
1854 if vca_type
== "k8s_proxy_charm":
1855 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1856 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1857 namespace
=namespace
,
1858 artifact_path
=artifact_path
,
1862 elif vca_type
== "helm" or vca_type
== "helm-v3":
1863 ee_id
, credentials
= await self
.vca_map
[
1865 ].create_execution_environment(
1866 namespace
=namespace
,
1870 artifact_path
=artifact_path
,
1871 chart_model
=vca_name
,
1875 ee_id
, credentials
= await self
.vca_map
[
1877 ].create_execution_environment(
1878 namespace
=namespace
,
1884 elif vca_type
== "native_charm":
1885 step
= "Waiting to VM being up and getting IP address"
1886 self
.logger
.debug(logging_text
+ step
)
1887 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1896 credentials
= {"hostname": rw_mgmt_ip
}
1898 username
= deep_get(
1899 config_descriptor
, ("config-access", "ssh-access", "default-user")
1901 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1902 # merged. Meanwhile let's get username from initial-config-primitive
1903 if not username
and initial_config_primitive_list
:
1904 for config_primitive
in initial_config_primitive_list
:
1905 for param
in config_primitive
.get("parameter", ()):
1906 if param
["name"] == "ssh-username":
1907 username
= param
["value"]
1911 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1912 "'config-access.ssh-access.default-user'"
1914 credentials
["username"] = username
1915 # n2vc_redesign STEP 3.2
1917 self
._write
_configuration
_status
(
1919 vca_index
=vca_index
,
1920 status
="REGISTERING",
1921 element_under_configuration
=element_under_configuration
,
1922 element_type
=element_type
,
1925 step
= "register execution environment {}".format(credentials
)
1926 self
.logger
.debug(logging_text
+ step
)
1927 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1928 credentials
=credentials
,
1929 namespace
=namespace
,
1934 # for compatibility with MON/POL modules, the need model and application name at database
1935 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1936 ee_id_parts
= ee_id
.split(".")
1937 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1938 if len(ee_id_parts
) >= 2:
1939 model_name
= ee_id_parts
[0]
1940 application_name
= ee_id_parts
[1]
1941 db_nsr_update
[db_update_entry
+ "model"] = model_name
1942 db_nsr_update
[db_update_entry
+ "application"] = application_name
1944 # n2vc_redesign STEP 3.3
1945 step
= "Install configuration Software"
1947 self
._write
_configuration
_status
(
1949 vca_index
=vca_index
,
1950 status
="INSTALLING SW",
1951 element_under_configuration
=element_under_configuration
,
1952 element_type
=element_type
,
1953 other_update
=db_nsr_update
,
1956 # TODO check if already done
1957 self
.logger
.debug(logging_text
+ step
)
1959 if vca_type
== "native_charm":
1960 config_primitive
= next(
1961 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1964 if config_primitive
:
1965 config
= self
._map
_primitive
_params
(
1966 config_primitive
, {}, deploy_params
1969 if vca_type
== "lxc_proxy_charm":
1970 if element_type
== "NS":
1971 num_units
= db_nsr
.get("config-units") or 1
1972 elif element_type
== "VNF":
1973 num_units
= db_vnfr
.get("config-units") or 1
1974 elif element_type
== "VDU":
1975 for v
in db_vnfr
["vdur"]:
1976 if vdu_id
== v
["vdu-id-ref"]:
1977 num_units
= v
.get("config-units") or 1
1979 if vca_type
!= "k8s_proxy_charm":
1980 await self
.vca_map
[vca_type
].install_configuration_sw(
1982 artifact_path
=artifact_path
,
1985 num_units
=num_units
,
1990 # write in db flag of configuration_sw already installed
1992 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1995 # add relations for this VCA (wait for other peers related with this VCA)
1996 is_relation_added
= await self
._add
_vca
_relations
(
1997 logging_text
=logging_text
,
2000 vca_index
=vca_index
,
2003 if not is_relation_added
:
2004 raise LcmException("Relations could not be added to VCA.")
2006 # if SSH access is required, then get execution environment SSH public
2007 # if native charm we have waited already to VM be UP
2008 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
2011 # self.logger.debug("get ssh key block")
2013 config_descriptor
, ("config-access", "ssh-access", "required")
2015 # self.logger.debug("ssh key needed")
2016 # Needed to inject a ssh key
2019 ("config-access", "ssh-access", "default-user"),
2021 step
= "Install configuration Software, getting public ssh key"
2022 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
2023 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
2026 step
= "Insert public key into VM user={} ssh_key={}".format(
2030 # self.logger.debug("no need to get ssh key")
2031 step
= "Waiting to VM being up and getting IP address"
2032 self
.logger
.debug(logging_text
+ step
)
2034 # default rw_mgmt_ip to None, avoiding the non definition of the variable
2037 # n2vc_redesign STEP 5.1
2038 # wait for RO (ip-address) Insert pub_key into VM
2041 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
2042 logging_text
, nsr_id
, vnfr_id
, kdu_name
2044 vnfd
= self
.db
.get_one(
2046 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2048 kdu
= get_kdu(vnfd
, kdu_name
)
2050 service
["name"] for service
in get_kdu_services(kdu
)
2052 exposed_services
= []
2053 for service
in services
:
2054 if any(s
in service
["name"] for s
in kdu_services
):
2055 exposed_services
.append(service
)
2056 await self
.vca_map
[vca_type
].exec_primitive(
2058 primitive_name
="config",
2060 "osm-config": json
.dumps(
2062 k8s
={"services": exposed_services
}
2069 # This verification is needed in order to avoid trying to add a public key
2070 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2071 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2072 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2074 elif db_vnfr
.get("vdur"):
2075 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2085 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2087 # store rw_mgmt_ip in deploy params for later replacement
2088 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2090 # n2vc_redesign STEP 6 Execute initial config primitive
2091 step
= "execute initial config primitive"
2093 # wait for dependent primitives execution (NS -> VNF -> VDU)
2094 if initial_config_primitive_list
:
2095 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2097 # stage, in function of element type: vdu, kdu, vnf or ns
2098 my_vca
= vca_deployed_list
[vca_index
]
2099 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2101 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2102 elif my_vca
.get("member-vnf-index"):
2104 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2107 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2109 self
._write
_configuration
_status
(
2110 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2113 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2115 check_if_terminated_needed
= True
2116 for initial_config_primitive
in initial_config_primitive_list
:
2117 # adding information on the vca_deployed if it is a NS execution environment
2118 if not vca_deployed
["member-vnf-index"]:
2119 deploy_params
["ns_config_info"] = json
.dumps(
2120 self
._get
_ns
_config
_info
(nsr_id
)
2122 # TODO check if already done
2123 primitive_params_
= self
._map
_primitive
_params
(
2124 initial_config_primitive
, {}, deploy_params
2127 step
= "execute primitive '{}' params '{}'".format(
2128 initial_config_primitive
["name"], primitive_params_
2130 self
.logger
.debug(logging_text
+ step
)
2131 await self
.vca_map
[vca_type
].exec_primitive(
2133 primitive_name
=initial_config_primitive
["name"],
2134 params_dict
=primitive_params_
,
2139 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2140 if check_if_terminated_needed
:
2141 if config_descriptor
.get("terminate-config-primitive"):
2143 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2145 check_if_terminated_needed
= False
2147 # TODO register in database that primitive is done
2149 # STEP 7 Configure metrics
2150 if vca_type
== "helm" or vca_type
== "helm-v3":
2151 # TODO: review for those cases where the helm chart is a reference and
2152 # is not part of the NF package
2153 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2155 artifact_path
=artifact_path
,
2156 ee_config_descriptor
=ee_config_descriptor
,
2159 target_ip
=rw_mgmt_ip
,
2160 element_type
=element_type
,
2161 vnf_member_index
=db_vnfr
.get("member-vnf-index-ref", ""),
2163 vdu_index
=vdu_index
,
2165 kdu_index
=kdu_index
,
2171 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2174 for job
in prometheus_jobs
:
2177 {"job_name": job
["job_name"]},
2180 fail_on_empty
=False,
2183 step
= "instantiated at VCA"
2184 self
.logger
.debug(logging_text
+ step
)
2186 self
._write
_configuration
_status
(
2187 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2190 except Exception as e
: # TODO not use Exception but N2VC exception
2191 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2193 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2196 "Exception while {} : {}".format(step
, e
), exc_info
=True
2198 self
._write
_configuration
_status
(
2199 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2201 raise LcmException("{}. {}".format(step
, e
)) from e
2203 def _write_ns_status(
2207 current_operation
: str,
2208 current_operation_id
: str,
2209 error_description
: str = None,
2210 error_detail
: str = None,
2211 other_update
: dict = None,
2214 Update db_nsr fields.
2217 :param current_operation:
2218 :param current_operation_id:
2219 :param error_description:
2220 :param error_detail:
2221 :param other_update: Other required changes at database if provided, will be cleared
2225 db_dict
= other_update
or {}
2228 ] = current_operation_id
# for backward compatibility
2229 db_dict
["_admin.current-operation"] = current_operation_id
2230 db_dict
["_admin.operation-type"] = (
2231 current_operation
if current_operation
!= "IDLE" else None
2233 db_dict
["currentOperation"] = current_operation
2234 db_dict
["currentOperationID"] = current_operation_id
2235 db_dict
["errorDescription"] = error_description
2236 db_dict
["errorDetail"] = error_detail
2239 db_dict
["nsState"] = ns_state
2240 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2241 except DbException
as e
:
2242 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2244 def _write_op_status(
2248 error_message
: str = None,
2249 queuePosition
: int = 0,
2250 operation_state
: str = None,
2251 other_update
: dict = None,
2254 db_dict
= other_update
or {}
2255 db_dict
["queuePosition"] = queuePosition
2256 if isinstance(stage
, list):
2257 db_dict
["stage"] = stage
[0]
2258 db_dict
["detailed-status"] = " ".join(stage
)
2259 elif stage
is not None:
2260 db_dict
["stage"] = str(stage
)
2262 if error_message
is not None:
2263 db_dict
["errorMessage"] = error_message
2264 if operation_state
is not None:
2265 db_dict
["operationState"] = operation_state
2266 db_dict
["statusEnteredTime"] = time()
2267 self
.update_db_2("nslcmops", op_id
, db_dict
)
2268 except DbException
as e
:
2270 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2273 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2275 nsr_id
= db_nsr
["_id"]
2276 # configurationStatus
2277 config_status
= db_nsr
.get("configurationStatus")
2280 "configurationStatus.{}.status".format(index
): status
2281 for index
, v
in enumerate(config_status
)
2285 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2287 except DbException
as e
:
2289 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2292 def _write_configuration_status(
2297 element_under_configuration
: str = None,
2298 element_type
: str = None,
2299 other_update
: dict = None,
2302 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2303 # .format(vca_index, status))
2306 db_path
= "configurationStatus.{}.".format(vca_index
)
2307 db_dict
= other_update
or {}
2309 db_dict
[db_path
+ "status"] = status
2310 if element_under_configuration
:
2312 db_path
+ "elementUnderConfiguration"
2313 ] = element_under_configuration
2315 db_dict
[db_path
+ "elementType"] = element_type
2316 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2317 except DbException
as e
:
2319 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2320 status
, nsr_id
, vca_index
, e
2324 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2326 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2327 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2328 Database is used because the result can be obtained from a different LCM worker in case of HA.
2329 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2330 :param db_nslcmop: database content of nslcmop
2331 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2332 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2333 computed 'vim-account-id'
2336 nslcmop_id
= db_nslcmop
["_id"]
2337 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2338 if placement_engine
== "PLA":
2340 logging_text
+ "Invoke and wait for placement optimization"
2342 await self
.msg
.aiowrite(
2343 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2345 db_poll_interval
= 5
2346 wait
= db_poll_interval
* 10
2348 while not pla_result
and wait
>= 0:
2349 await asyncio
.sleep(db_poll_interval
)
2350 wait
-= db_poll_interval
2351 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2352 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2356 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2359 for pla_vnf
in pla_result
["vnf"]:
2360 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2361 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2366 {"_id": vnfr
["_id"]},
2367 {"vim-account-id": pla_vnf
["vimAccountId"]},
2370 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2373 def update_nsrs_with_pla_result(self
, params
):
2375 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2377 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2379 except Exception as e
:
2380 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2382 async def instantiate(self
, nsr_id
, nslcmop_id
):
2385 :param nsr_id: ns instance to deploy
2386 :param nslcmop_id: operation to run
2390 # Try to lock HA task here
2391 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2392 if not task_is_locked_by_me
:
2394 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2398 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2399 self
.logger
.debug(logging_text
+ "Enter")
2401 # get all needed from database
2403 # database nsrs record
2406 # database nslcmops record
2409 # update operation on nsrs
2411 # update operation on nslcmops
2412 db_nslcmop_update
= {}
2414 timeout_ns_deploy
= self
.timeout
.ns_deploy
2416 nslcmop_operation_state
= None
2417 db_vnfrs
= {} # vnf's info indexed by member-index
2419 tasks_dict_info
= {} # from task to info text
2423 "Stage 1/5: preparation of the environment.",
2424 "Waiting for previous operations to terminate.",
2427 # ^ stage, step, VIM progress
2429 # wait for any previous tasks in process
2430 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2432 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2433 stage
[1] = "Reading from database."
2434 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2435 db_nsr_update
["detailed-status"] = "creating"
2436 db_nsr_update
["operational-status"] = "init"
2437 self
._write
_ns
_status
(
2439 ns_state
="BUILDING",
2440 current_operation
="INSTANTIATING",
2441 current_operation_id
=nslcmop_id
,
2442 other_update
=db_nsr_update
,
2444 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2446 # read from db: operation
2447 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2448 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2449 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2450 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2451 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2453 ns_params
= db_nslcmop
.get("operationParams")
2454 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2455 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2458 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2459 self
.logger
.debug(logging_text
+ stage
[1])
2460 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2461 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2462 self
.logger
.debug(logging_text
+ stage
[1])
2463 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2464 self
.fs
.sync(db_nsr
["nsd-id"])
2466 # nsr_name = db_nsr["name"] # TODO short-name??
2468 # read from db: vnf's of this ns
2469 stage
[1] = "Getting vnfrs from db."
2470 self
.logger
.debug(logging_text
+ stage
[1])
2471 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2473 # read from db: vnfd's for every vnf
2474 db_vnfds
= [] # every vnfd data
2476 # for each vnf in ns, read vnfd
2477 for vnfr
in db_vnfrs_list
:
2478 if vnfr
.get("kdur"):
2480 for kdur
in vnfr
["kdur"]:
2481 if kdur
.get("additionalParams"):
2482 kdur
["additionalParams"] = json
.loads(
2483 kdur
["additionalParams"]
2485 kdur_list
.append(kdur
)
2486 vnfr
["kdur"] = kdur_list
2488 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2489 vnfd_id
= vnfr
["vnfd-id"]
2490 vnfd_ref
= vnfr
["vnfd-ref"]
2491 self
.fs
.sync(vnfd_id
)
2493 # if we haven't this vnfd, read it from db
2494 if vnfd_id
not in db_vnfds
:
2496 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2499 self
.logger
.debug(logging_text
+ stage
[1])
2500 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2503 db_vnfds
.append(vnfd
)
2505 # Get or generates the _admin.deployed.VCA list
2506 vca_deployed_list
= None
2507 if db_nsr
["_admin"].get("deployed"):
2508 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2509 if vca_deployed_list
is None:
2510 vca_deployed_list
= []
2511 configuration_status_list
= []
2512 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2513 db_nsr_update
["configurationStatus"] = configuration_status_list
2514 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2515 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2516 elif isinstance(vca_deployed_list
, dict):
2517 # maintain backward compatibility. Change a dict to list at database
2518 vca_deployed_list
= list(vca_deployed_list
.values())
2519 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2520 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2523 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2525 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2526 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2528 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2529 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2530 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2532 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2535 # n2vc_redesign STEP 2 Deploy Network Scenario
2536 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2537 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2539 stage
[1] = "Deploying KDUs."
2540 # self.logger.debug(logging_text + "Before deploy_kdus")
2541 # Call to deploy_kdus in case exists the "vdu:kdu" param
2542 await self
.deploy_kdus(
2543 logging_text
=logging_text
,
2545 nslcmop_id
=nslcmop_id
,
2548 task_instantiation_info
=tasks_dict_info
,
2551 stage
[1] = "Getting VCA public key."
2552 # n2vc_redesign STEP 1 Get VCA public ssh-key
2553 # feature 1429. Add n2vc public key to needed VMs
2554 n2vc_key
= self
.n2vc
.get_public_key()
2555 n2vc_key_list
= [n2vc_key
]
2556 if self
.vca_config
.public_key
:
2557 n2vc_key_list
.append(self
.vca_config
.public_key
)
2559 stage
[1] = "Deploying NS at VIM."
2560 task_ro
= asyncio
.ensure_future(
2561 self
.instantiate_RO(
2562 logging_text
=logging_text
,
2566 db_nslcmop
=db_nslcmop
,
2569 n2vc_key_list
=n2vc_key_list
,
2573 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2574 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2576 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2577 stage
[1] = "Deploying Execution Environments."
2578 self
.logger
.debug(logging_text
+ stage
[1])
2580 # create namespace and certificate if any helm based EE is present in the NS
2581 if check_helm_ee_in_ns(db_vnfds
):
2582 # TODO: create EE namespace
2583 # create TLS certificates
2584 await self
.vca_map
["helm-v3"].create_tls_certificate(
2585 secret_name
="ee-tls-{}".format(nsr_id
),
2588 usage
="server auth",
2591 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2592 for vnf_profile
in get_vnf_profiles(nsd
):
2593 vnfd_id
= vnf_profile
["vnfd-id"]
2594 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2595 member_vnf_index
= str(vnf_profile
["id"])
2596 db_vnfr
= db_vnfrs
[member_vnf_index
]
2597 base_folder
= vnfd
["_admin"]["storage"]
2604 # Get additional parameters
2605 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2606 if db_vnfr
.get("additionalParamsForVnf"):
2607 deploy_params
.update(
2608 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2611 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2612 if descriptor_config
:
2614 logging_text
=logging_text
2615 + "member_vnf_index={} ".format(member_vnf_index
),
2618 nslcmop_id
=nslcmop_id
,
2624 member_vnf_index
=member_vnf_index
,
2625 vdu_index
=vdu_index
,
2626 kdu_index
=kdu_index
,
2628 deploy_params
=deploy_params
,
2629 descriptor_config
=descriptor_config
,
2630 base_folder
=base_folder
,
2631 task_instantiation_info
=tasks_dict_info
,
2635 # Deploy charms for each VDU that supports one.
2636 for vdud
in get_vdu_list(vnfd
):
2638 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2639 vdur
= find_in_list(
2640 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2643 if vdur
.get("additionalParams"):
2644 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2646 deploy_params_vdu
= deploy_params
2647 deploy_params_vdu
["OSM"] = get_osm_params(
2648 db_vnfr
, vdu_id
, vdu_count_index
=0
2650 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2652 self
.logger
.debug("VDUD > {}".format(vdud
))
2654 "Descriptor config > {}".format(descriptor_config
)
2656 if descriptor_config
:
2660 for vdu_index
in range(vdud_count
):
2661 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2663 logging_text
=logging_text
2664 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2665 member_vnf_index
, vdu_id
, vdu_index
2669 nslcmop_id
=nslcmop_id
,
2675 kdu_index
=kdu_index
,
2676 member_vnf_index
=member_vnf_index
,
2677 vdu_index
=vdu_index
,
2679 deploy_params
=deploy_params_vdu
,
2680 descriptor_config
=descriptor_config
,
2681 base_folder
=base_folder
,
2682 task_instantiation_info
=tasks_dict_info
,
2685 for kdud
in get_kdu_list(vnfd
):
2686 kdu_name
= kdud
["name"]
2687 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2688 if descriptor_config
:
2692 kdu_index
, kdur
= next(
2694 for x
in enumerate(db_vnfr
["kdur"])
2695 if x
[1]["kdu-name"] == kdu_name
2697 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2698 if kdur
.get("additionalParams"):
2699 deploy_params_kdu
.update(
2700 parse_yaml_strings(kdur
["additionalParams"].copy())
2704 logging_text
=logging_text
,
2707 nslcmop_id
=nslcmop_id
,
2713 member_vnf_index
=member_vnf_index
,
2714 vdu_index
=vdu_index
,
2715 kdu_index
=kdu_index
,
2717 deploy_params
=deploy_params_kdu
,
2718 descriptor_config
=descriptor_config
,
2719 base_folder
=base_folder
,
2720 task_instantiation_info
=tasks_dict_info
,
2724 # Check if this NS has a charm configuration
2725 descriptor_config
= nsd
.get("ns-configuration")
2726 if descriptor_config
and descriptor_config
.get("juju"):
2729 member_vnf_index
= None
2736 # Get additional parameters
2737 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2738 if db_nsr
.get("additionalParamsForNs"):
2739 deploy_params
.update(
2740 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2742 base_folder
= nsd
["_admin"]["storage"]
2744 logging_text
=logging_text
,
2747 nslcmop_id
=nslcmop_id
,
2753 member_vnf_index
=member_vnf_index
,
2754 vdu_index
=vdu_index
,
2755 kdu_index
=kdu_index
,
2757 deploy_params
=deploy_params
,
2758 descriptor_config
=descriptor_config
,
2759 base_folder
=base_folder
,
2760 task_instantiation_info
=tasks_dict_info
,
2764 # rest of staff will be done at finally
2767 ROclient
.ROClientException
,
2773 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2776 except asyncio
.CancelledError
:
2778 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2780 exc
= "Operation was cancelled"
2781 except Exception as e
:
2782 exc
= traceback
.format_exc()
2783 self
.logger
.critical(
2784 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2789 error_list
.append(str(exc
))
2791 # wait for pending tasks
2793 stage
[1] = "Waiting for instantiate pending tasks."
2794 self
.logger
.debug(logging_text
+ stage
[1])
2795 error_list
+= await self
._wait
_for
_tasks
(
2803 stage
[1] = stage
[2] = ""
2804 except asyncio
.CancelledError
:
2805 error_list
.append("Cancelled")
2806 # TODO cancel all tasks
2807 except Exception as exc
:
2808 error_list
.append(str(exc
))
2810 # update operation-status
2811 db_nsr_update
["operational-status"] = "running"
2812 # let's begin with VCA 'configured' status (later we can change it)
2813 db_nsr_update
["config-status"] = "configured"
2814 for task
, task_name
in tasks_dict_info
.items():
2815 if not task
.done() or task
.cancelled() or task
.exception():
2816 if task_name
.startswith(self
.task_name_deploy_vca
):
2817 # A N2VC task is pending
2818 db_nsr_update
["config-status"] = "failed"
2820 # RO or KDU task is pending
2821 db_nsr_update
["operational-status"] = "failed"
2823 # update status at database
2825 error_detail
= ". ".join(error_list
)
2826 self
.logger
.error(logging_text
+ error_detail
)
2827 error_description_nslcmop
= "{} Detail: {}".format(
2828 stage
[0], error_detail
2830 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2831 nslcmop_id
, stage
[0]
2834 db_nsr_update
["detailed-status"] = (
2835 error_description_nsr
+ " Detail: " + error_detail
2837 db_nslcmop_update
["detailed-status"] = error_detail
2838 nslcmop_operation_state
= "FAILED"
2842 error_description_nsr
= error_description_nslcmop
= None
2844 db_nsr_update
["detailed-status"] = "Done"
2845 db_nslcmop_update
["detailed-status"] = "Done"
2846 nslcmop_operation_state
= "COMPLETED"
2849 self
._write
_ns
_status
(
2852 current_operation
="IDLE",
2853 current_operation_id
=None,
2854 error_description
=error_description_nsr
,
2855 error_detail
=error_detail
,
2856 other_update
=db_nsr_update
,
2858 self
._write
_op
_status
(
2861 error_message
=error_description_nslcmop
,
2862 operation_state
=nslcmop_operation_state
,
2863 other_update
=db_nslcmop_update
,
2866 if nslcmop_operation_state
:
2868 await self
.msg
.aiowrite(
2873 "nslcmop_id": nslcmop_id
,
2874 "operationState": nslcmop_operation_state
,
2878 except Exception as e
:
2880 logging_text
+ "kafka_write notification Exception {}".format(e
)
2883 self
.logger
.debug(logging_text
+ "Exit")
2884 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2886 def _get_vnfd(self
, vnfd_id
: str, projects_read
: str, cached_vnfds
: Dict
[str, Any
]):
2887 if vnfd_id
not in cached_vnfds
:
2888 cached_vnfds
[vnfd_id
] = self
.db
.get_one(
2889 "vnfds", {"id": vnfd_id
, "_admin.projects_read": projects_read
}
2891 return cached_vnfds
[vnfd_id
]
2893 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2894 if vnf_profile_id
not in cached_vnfrs
:
2895 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2898 "member-vnf-index-ref": vnf_profile_id
,
2899 "nsr-id-ref": nsr_id
,
2902 return cached_vnfrs
[vnf_profile_id
]
2904 def _is_deployed_vca_in_relation(
2905 self
, vca
: DeployedVCA
, relation
: Relation
2908 for endpoint
in (relation
.provider
, relation
.requirer
):
2909 if endpoint
["kdu-resource-profile-id"]:
2912 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2913 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2914 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2920 def _update_ee_relation_data_with_implicit_data(
2921 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2923 ee_relation_data
= safe_get_ee_relation(
2924 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2926 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2927 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2928 "execution-environment-ref"
2930 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2931 vnfd_id
= vnf_profile
["vnfd-id"]
2932 project
= nsd
["_admin"]["projects_read"][0]
2933 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
2936 if ee_relation_level
== EELevel
.VNF
2937 else ee_relation_data
["vdu-profile-id"]
2939 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2942 f
"not execution environments found for ee_relation {ee_relation_data}"
2944 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2945 return ee_relation_data
2947 def _get_ns_relations(
2950 nsd
: Dict
[str, Any
],
2952 cached_vnfds
: Dict
[str, Any
],
2953 ) -> List
[Relation
]:
2955 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2956 for r
in db_ns_relations
:
2957 provider_dict
= None
2958 requirer_dict
= None
2959 if all(key
in r
for key
in ("provider", "requirer")):
2960 provider_dict
= r
["provider"]
2961 requirer_dict
= r
["requirer"]
2962 elif "entities" in r
:
2963 provider_id
= r
["entities"][0]["id"]
2966 "endpoint": r
["entities"][0]["endpoint"],
2968 if provider_id
!= nsd
["id"]:
2969 provider_dict
["vnf-profile-id"] = provider_id
2970 requirer_id
= r
["entities"][1]["id"]
2973 "endpoint": r
["entities"][1]["endpoint"],
2975 if requirer_id
!= nsd
["id"]:
2976 requirer_dict
["vnf-profile-id"] = requirer_id
2979 "provider/requirer or entities must be included in the relation."
2981 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2982 nsr_id
, nsd
, provider_dict
, cached_vnfds
2984 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2985 nsr_id
, nsd
, requirer_dict
, cached_vnfds
2987 provider
= EERelation(relation_provider
)
2988 requirer
= EERelation(relation_requirer
)
2989 relation
= Relation(r
["name"], provider
, requirer
)
2990 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2992 relations
.append(relation
)
2995 def _get_vnf_relations(
2998 nsd
: Dict
[str, Any
],
3000 cached_vnfds
: Dict
[str, Any
],
3001 ) -> List
[Relation
]:
3003 if vca
.target_element
== "ns":
3004 self
.logger
.debug("VCA is a NS charm, not a VNF.")
3006 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
3007 vnf_profile_id
= vnf_profile
["id"]
3008 vnfd_id
= vnf_profile
["vnfd-id"]
3009 project
= nsd
["_admin"]["projects_read"][0]
3010 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
3011 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
3012 for r
in db_vnf_relations
:
3013 provider_dict
= None
3014 requirer_dict
= None
3015 if all(key
in r
for key
in ("provider", "requirer")):
3016 provider_dict
= r
["provider"]
3017 requirer_dict
= r
["requirer"]
3018 elif "entities" in r
:
3019 provider_id
= r
["entities"][0]["id"]
3022 "vnf-profile-id": vnf_profile_id
,
3023 "endpoint": r
["entities"][0]["endpoint"],
3025 if provider_id
!= vnfd_id
:
3026 provider_dict
["vdu-profile-id"] = provider_id
3027 requirer_id
= r
["entities"][1]["id"]
3030 "vnf-profile-id": vnf_profile_id
,
3031 "endpoint": r
["entities"][1]["endpoint"],
3033 if requirer_id
!= vnfd_id
:
3034 requirer_dict
["vdu-profile-id"] = requirer_id
3037 "provider/requirer or entities must be included in the relation."
3039 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3040 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3042 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3043 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3045 provider
= EERelation(relation_provider
)
3046 requirer
= EERelation(relation_requirer
)
3047 relation
= Relation(r
["name"], provider
, requirer
)
3048 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3050 relations
.append(relation
)
3053 def _get_kdu_resource_data(
3055 ee_relation
: EERelation
,
3056 db_nsr
: Dict
[str, Any
],
3057 cached_vnfds
: Dict
[str, Any
],
3058 ) -> DeployedK8sResource
:
3059 nsd
= get_nsd(db_nsr
)
3060 vnf_profiles
= get_vnf_profiles(nsd
)
3061 vnfd_id
= find_in_list(
3063 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
3065 project
= nsd
["_admin"]["projects_read"][0]
3066 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
3067 kdu_resource_profile
= get_kdu_resource_profile(
3068 db_vnfd
, ee_relation
.kdu_resource_profile_id
3070 kdu_name
= kdu_resource_profile
["kdu-name"]
3071 deployed_kdu
, _
= get_deployed_kdu(
3072 db_nsr
.get("_admin", ()).get("deployed", ()),
3074 ee_relation
.vnf_profile_id
,
3076 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
3079 def _get_deployed_component(
3081 ee_relation
: EERelation
,
3082 db_nsr
: Dict
[str, Any
],
3083 cached_vnfds
: Dict
[str, Any
],
3084 ) -> DeployedComponent
:
3085 nsr_id
= db_nsr
["_id"]
3086 deployed_component
= None
3087 ee_level
= EELevel
.get_level(ee_relation
)
3088 if ee_level
== EELevel
.NS
:
3089 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
3091 deployed_component
= DeployedVCA(nsr_id
, vca
)
3092 elif ee_level
== EELevel
.VNF
:
3093 vca
= get_deployed_vca(
3097 "member-vnf-index": ee_relation
.vnf_profile_id
,
3098 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3102 deployed_component
= DeployedVCA(nsr_id
, vca
)
3103 elif ee_level
== EELevel
.VDU
:
3104 vca
= get_deployed_vca(
3107 "vdu_id": ee_relation
.vdu_profile_id
,
3108 "member-vnf-index": ee_relation
.vnf_profile_id
,
3109 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3113 deployed_component
= DeployedVCA(nsr_id
, vca
)
3114 elif ee_level
== EELevel
.KDU
:
3115 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
3116 ee_relation
, db_nsr
, cached_vnfds
3118 if kdu_resource_data
:
3119 deployed_component
= DeployedK8sResource(kdu_resource_data
)
3120 return deployed_component
3122 async def _add_relation(
3126 db_nsr
: Dict
[str, Any
],
3127 cached_vnfds
: Dict
[str, Any
],
3128 cached_vnfrs
: Dict
[str, Any
],
3130 deployed_provider
= self
._get
_deployed
_component
(
3131 relation
.provider
, db_nsr
, cached_vnfds
3133 deployed_requirer
= self
._get
_deployed
_component
(
3134 relation
.requirer
, db_nsr
, cached_vnfds
3138 and deployed_requirer
3139 and deployed_provider
.config_sw_installed
3140 and deployed_requirer
.config_sw_installed
3142 provider_db_vnfr
= (
3144 relation
.provider
.nsr_id
,
3145 relation
.provider
.vnf_profile_id
,
3148 if relation
.provider
.vnf_profile_id
3151 requirer_db_vnfr
= (
3153 relation
.requirer
.nsr_id
,
3154 relation
.requirer
.vnf_profile_id
,
3157 if relation
.requirer
.vnf_profile_id
3160 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3161 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3162 provider_relation_endpoint
= RelationEndpoint(
3163 deployed_provider
.ee_id
,
3165 relation
.provider
.endpoint
,
3167 requirer_relation_endpoint
= RelationEndpoint(
3168 deployed_requirer
.ee_id
,
3170 relation
.requirer
.endpoint
,
3173 await self
.vca_map
[vca_type
].add_relation(
3174 provider
=provider_relation_endpoint
,
3175 requirer
=requirer_relation_endpoint
,
3177 except N2VCException
as exception
:
3178 self
.logger
.error(exception
)
3179 raise LcmException(exception
)
3183 async def _add_vca_relations(
3189 timeout
: int = 3600,
3193 # 1. find all relations for this VCA
3194 # 2. wait for other peers related
3198 # STEP 1: find all relations for this VCA
3201 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3202 nsd
= get_nsd(db_nsr
)
3205 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3206 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3211 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3212 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3214 # if no relations, terminate
3216 self
.logger
.debug(logging_text
+ " No relations")
3219 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3226 if now
- start
>= timeout
:
3227 self
.logger
.error(logging_text
+ " : timeout adding relations")
3230 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3231 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3233 # for each relation, find the VCA's related
3234 for relation
in relations
.copy():
3235 added
= await self
._add
_relation
(
3243 relations
.remove(relation
)
3246 self
.logger
.debug("Relations added")
3248 await asyncio
.sleep(5.0)
3252 except Exception as e
:
3253 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3256 async def _install_kdu(
3264 k8s_instance_info
: dict,
3265 k8params
: dict = None,
3271 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3274 "collection": "nsrs",
3275 "filter": {"_id": nsr_id
},
3276 "path": nsr_db_path
,
3279 if k8s_instance_info
.get("kdu-deployment-name"):
3280 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3282 kdu_instance
= self
.k8scluster_map
[
3284 ].generate_kdu_instance_name(
3285 db_dict
=db_dict_install
,
3286 kdu_model
=k8s_instance_info
["kdu-model"],
3287 kdu_name
=k8s_instance_info
["kdu-name"],
3290 # Update the nsrs table with the kdu-instance value
3294 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3297 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3298 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3299 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3300 # namespace, this first verification could be removed, and the next step would be done for any kind
3302 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3303 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3304 if k8sclustertype
in ("juju", "juju-bundle"):
3305 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3306 # that the user passed a namespace which he wants its KDU to be deployed in)
3312 "_admin.projects_write": k8s_instance_info
["namespace"],
3313 "_admin.projects_read": k8s_instance_info
["namespace"],
3319 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3324 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3326 k8s_instance_info
["namespace"] = kdu_instance
3328 await self
.k8scluster_map
[k8sclustertype
].install(
3329 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3330 kdu_model
=k8s_instance_info
["kdu-model"],
3333 db_dict
=db_dict_install
,
3335 kdu_name
=k8s_instance_info
["kdu-name"],
3336 namespace
=k8s_instance_info
["namespace"],
3337 kdu_instance
=kdu_instance
,
3341 # Obtain services to obtain management service ip
3342 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3343 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3344 kdu_instance
=kdu_instance
,
3345 namespace
=k8s_instance_info
["namespace"],
3348 # Obtain management service info (if exists)
3349 vnfr_update_dict
= {}
3350 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3352 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3357 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3360 for service
in kdud
.get("service", [])
3361 if service
.get("mgmt-service")
3363 for mgmt_service
in mgmt_services
:
3364 for service
in services
:
3365 if service
["name"].startswith(mgmt_service
["name"]):
3366 # Mgmt service found, Obtain service ip
3367 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3368 if isinstance(ip
, list) and len(ip
) == 1:
3372 "kdur.{}.ip-address".format(kdu_index
)
3375 # Check if must update also mgmt ip at the vnf
3376 service_external_cp
= mgmt_service
.get(
3377 "external-connection-point-ref"
3379 if service_external_cp
:
3381 deep_get(vnfd
, ("mgmt-interface", "cp"))
3382 == service_external_cp
3384 vnfr_update_dict
["ip-address"] = ip
3389 "external-connection-point-ref", ""
3391 == service_external_cp
,
3394 "kdur.{}.ip-address".format(kdu_index
)
3399 "Mgmt service name: {} not found".format(
3400 mgmt_service
["name"]
3404 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3405 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3407 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3410 and kdu_config
.get("initial-config-primitive")
3411 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3413 initial_config_primitive_list
= kdu_config
.get(
3414 "initial-config-primitive"
3416 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3418 for initial_config_primitive
in initial_config_primitive_list
:
3419 primitive_params_
= self
._map
_primitive
_params
(
3420 initial_config_primitive
, {}, {}
3423 await asyncio
.wait_for(
3424 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3425 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3426 kdu_instance
=kdu_instance
,
3427 primitive_name
=initial_config_primitive
["name"],
3428 params
=primitive_params_
,
3429 db_dict
=db_dict_install
,
3435 except Exception as e
:
3436 # Prepare update db with error and raise exception
3439 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3443 vnfr_data
.get("_id"),
3444 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3447 # ignore to keep original exception
3449 # reraise original error
3454 async def deploy_kdus(
3461 task_instantiation_info
,
3463 # Launch kdus if present in the descriptor
3465 k8scluster_id_2_uuic
= {
3466 "helm-chart-v3": {},
3471 async def _get_cluster_id(cluster_id
, cluster_type
):
3472 nonlocal k8scluster_id_2_uuic
3473 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3474 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3476 # check if K8scluster is creating and wait look if previous tasks in process
3477 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3478 "k8scluster", cluster_id
3481 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3482 task_name
, cluster_id
3484 self
.logger
.debug(logging_text
+ text
)
3485 await asyncio
.wait(task_dependency
, timeout
=3600)
3487 db_k8scluster
= self
.db
.get_one(
3488 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3490 if not db_k8scluster
:
3491 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3493 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3495 if cluster_type
== "helm-chart-v3":
3497 # backward compatibility for existing clusters that have not been initialized for helm v3
3498 k8s_credentials
= yaml
.safe_dump(
3499 db_k8scluster
.get("credentials")
3501 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3502 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3504 db_k8scluster_update
= {}
3505 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3506 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3507 db_k8scluster_update
[
3508 "_admin.helm-chart-v3.created"
3510 db_k8scluster_update
[
3511 "_admin.helm-chart-v3.operationalState"
3514 "k8sclusters", cluster_id
, db_k8scluster_update
3516 except Exception as e
:
3519 + "error initializing helm-v3 cluster: {}".format(str(e
))
3522 "K8s cluster '{}' has not been initialized for '{}'".format(
3523 cluster_id
, cluster_type
3528 "K8s cluster '{}' has not been initialized for '{}'".format(
3529 cluster_id
, cluster_type
3532 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3535 logging_text
+= "Deploy kdus: "
3538 db_nsr_update
= {"_admin.deployed.K8s": []}
3539 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3542 updated_cluster_list
= []
3543 updated_v3_cluster_list
= []
3545 for vnfr_data
in db_vnfrs
.values():
3546 vca_id
= self
.get_vca_id(vnfr_data
, {})
3547 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3548 # Step 0: Prepare and set parameters
3549 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3550 vnfd_id
= vnfr_data
.get("vnfd-id")
3551 vnfd_with_id
= find_in_list(
3552 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3556 for kdud
in vnfd_with_id
["kdu"]
3557 if kdud
["name"] == kdur
["kdu-name"]
3559 namespace
= kdur
.get("k8s-namespace")
3560 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3561 if kdur
.get("helm-chart"):
3562 kdumodel
= kdur
["helm-chart"]
3563 # Default version: helm3, if helm-version is v2 assign v2
3564 k8sclustertype
= "helm-chart-v3"
3565 self
.logger
.debug("kdur: {}".format(kdur
))
3567 kdur
.get("helm-version")
3568 and kdur
.get("helm-version") == "v2"
3570 k8sclustertype
= "helm-chart"
3571 elif kdur
.get("juju-bundle"):
3572 kdumodel
= kdur
["juju-bundle"]
3573 k8sclustertype
= "juju-bundle"
3576 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3577 "juju-bundle. Maybe an old NBI version is running".format(
3578 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3581 # check if kdumodel is a file and exists
3583 vnfd_with_id
= find_in_list(
3584 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3586 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3587 if storage
: # may be not present if vnfd has not artifacts
3588 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3589 if storage
["pkg-dir"]:
3590 filename
= "{}/{}/{}s/{}".format(
3597 filename
= "{}/Scripts/{}s/{}".format(
3602 if self
.fs
.file_exists(
3603 filename
, mode
="file"
3604 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3605 kdumodel
= self
.fs
.path
+ filename
3606 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3608 except Exception: # it is not a file
3611 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3612 step
= "Synchronize repos for k8s cluster '{}'".format(
3615 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3619 k8sclustertype
== "helm-chart"
3620 and cluster_uuid
not in updated_cluster_list
3622 k8sclustertype
== "helm-chart-v3"
3623 and cluster_uuid
not in updated_v3_cluster_list
3625 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3626 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3627 cluster_uuid
=cluster_uuid
3630 if del_repo_list
or added_repo_dict
:
3631 if k8sclustertype
== "helm-chart":
3633 "_admin.helm_charts_added." + item
: None
3634 for item
in del_repo_list
3637 "_admin.helm_charts_added." + item
: name
3638 for item
, name
in added_repo_dict
.items()
3640 updated_cluster_list
.append(cluster_uuid
)
3641 elif k8sclustertype
== "helm-chart-v3":
3643 "_admin.helm_charts_v3_added." + item
: None
3644 for item
in del_repo_list
3647 "_admin.helm_charts_v3_added." + item
: name
3648 for item
, name
in added_repo_dict
.items()
3650 updated_v3_cluster_list
.append(cluster_uuid
)
3652 logging_text
+ "repos synchronized on k8s cluster "
3653 "'{}' to_delete: {}, to_add: {}".format(
3654 k8s_cluster_id
, del_repo_list
, added_repo_dict
3659 {"_id": k8s_cluster_id
},
3665 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3666 vnfr_data
["member-vnf-index-ref"],
3670 k8s_instance_info
= {
3671 "kdu-instance": None,
3672 "k8scluster-uuid": cluster_uuid
,
3673 "k8scluster-type": k8sclustertype
,
3674 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3675 "kdu-name": kdur
["kdu-name"],
3676 "kdu-model": kdumodel
,
3677 "namespace": namespace
,
3678 "kdu-deployment-name": kdu_deployment_name
,
3680 db_path
= "_admin.deployed.K8s.{}".format(index
)
3681 db_nsr_update
[db_path
] = k8s_instance_info
3682 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3683 vnfd_with_id
= find_in_list(
3684 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3686 task
= asyncio
.ensure_future(
3695 k8params
=desc_params
,
3700 self
.lcm_tasks
.register(
3704 "instantiate_KDU-{}".format(index
),
3707 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3713 except (LcmException
, asyncio
.CancelledError
):
3715 except Exception as e
:
3716 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3717 if isinstance(e
, (N2VCException
, DbException
)):
3718 self
.logger
.error(logging_text
+ msg
)
3720 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3721 raise LcmException(msg
)
3724 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3744 task_instantiation_info
,
3747 # launch instantiate_N2VC in a asyncio task and register task object
3748 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3749 # if not found, create one entry and update database
3750 # fill db_nsr._admin.deployed.VCA.<index>
3753 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3757 get_charm_name
= False
3758 if "execution-environment-list" in descriptor_config
:
3759 ee_list
= descriptor_config
.get("execution-environment-list", [])
3760 elif "juju" in descriptor_config
:
3761 ee_list
= [descriptor_config
] # ns charms
3762 if "execution-environment-list" not in descriptor_config
:
3763 # charm name is only required for ns charms
3764 get_charm_name
= True
3765 else: # other types as script are not supported
3768 for ee_item
in ee_list
:
3771 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3772 ee_item
.get("juju"), ee_item
.get("helm-chart")
3775 ee_descriptor_id
= ee_item
.get("id")
3776 if ee_item
.get("juju"):
3777 vca_name
= ee_item
["juju"].get("charm")
3779 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
3782 if ee_item
["juju"].get("charm") is not None
3785 if ee_item
["juju"].get("cloud") == "k8s":
3786 vca_type
= "k8s_proxy_charm"
3787 elif ee_item
["juju"].get("proxy") is False:
3788 vca_type
= "native_charm"
3789 elif ee_item
.get("helm-chart"):
3790 vca_name
= ee_item
["helm-chart"]
3791 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3794 vca_type
= "helm-v3"
3797 logging_text
+ "skipping non juju neither charm configuration"
3802 for vca_index
, vca_deployed
in enumerate(
3803 db_nsr
["_admin"]["deployed"]["VCA"]
3805 if not vca_deployed
:
3808 vca_deployed
.get("member-vnf-index") == member_vnf_index
3809 and vca_deployed
.get("vdu_id") == vdu_id
3810 and vca_deployed
.get("kdu_name") == kdu_name
3811 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3812 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3816 # not found, create one.
3818 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3821 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3823 target
+= "/kdu/{}".format(kdu_name
)
3825 "target_element": target
,
3826 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3827 "member-vnf-index": member_vnf_index
,
3829 "kdu_name": kdu_name
,
3830 "vdu_count_index": vdu_index
,
3831 "operational-status": "init", # TODO revise
3832 "detailed-status": "", # TODO revise
3833 "step": "initial-deploy", # TODO revise
3835 "vdu_name": vdu_name
,
3837 "ee_descriptor_id": ee_descriptor_id
,
3838 "charm_name": charm_name
,
3842 # create VCA and configurationStatus in db
3844 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3845 "configurationStatus.{}".format(vca_index
): dict(),
3847 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3849 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3851 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3852 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3853 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3856 task_n2vc
= asyncio
.ensure_future(
3857 self
.instantiate_N2VC(
3858 logging_text
=logging_text
,
3859 vca_index
=vca_index
,
3865 vdu_index
=vdu_index
,
3866 kdu_index
=kdu_index
,
3867 deploy_params
=deploy_params
,
3868 config_descriptor
=descriptor_config
,
3869 base_folder
=base_folder
,
3870 nslcmop_id
=nslcmop_id
,
3874 ee_config_descriptor
=ee_item
,
3877 self
.lcm_tasks
.register(
3881 "instantiate_N2VC-{}".format(vca_index
),
3884 task_instantiation_info
[
3886 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3887 member_vnf_index
or "", vdu_id
or ""
3891 def _create_nslcmop(nsr_id
, operation
, params
):
3893 Creates a ns-lcm-opp content to be stored at database.
3894 :param nsr_id: internal id of the instance
3895 :param operation: instantiate, terminate, scale, action, ...
3896 :param params: user parameters for the operation
3897 :return: dictionary following SOL005 format
3899 # Raise exception if invalid arguments
3900 if not (nsr_id
and operation
and params
):
3902 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3909 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3910 "operationState": "PROCESSING",
3911 "statusEnteredTime": now
,
3912 "nsInstanceId": nsr_id
,
3913 "lcmOperationType": operation
,
3915 "isAutomaticInvocation": False,
3916 "operationParams": params
,
3917 "isCancelPending": False,
3919 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3920 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3925 def _format_additional_params(self
, params
):
3926 params
= params
or {}
3927 for key
, value
in params
.items():
3928 if str(value
).startswith("!!yaml "):
3929 params
[key
] = yaml
.safe_load(value
[7:])
3932 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3933 primitive
= seq
.get("name")
3934 primitive_params
= {}
3936 "member_vnf_index": vnf_index
,
3937 "primitive": primitive
,
3938 "primitive_params": primitive_params
,
3941 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
3945 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
3946 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
3947 if op
.get("operationState") == "COMPLETED":
3948 # b. Skip sub-operation
3949 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3950 return self
.SUBOPERATION_STATUS_SKIP
3952 # c. retry executing sub-operation
3953 # The sub-operation exists, and operationState != 'COMPLETED'
3954 # Update operationState = 'PROCESSING' to indicate a retry.
3955 operationState
= "PROCESSING"
3956 detailed_status
= "In progress"
3957 self
._update
_suboperation
_status
(
3958 db_nslcmop
, op_index
, operationState
, detailed_status
3960 # Return the sub-operation index
3961 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3962 # with arguments extracted from the sub-operation
3965 # Find a sub-operation where all keys in a matching dictionary must match
3966 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3967 def _find_suboperation(self
, db_nslcmop
, match
):
3968 if db_nslcmop
and match
:
3969 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3970 for i
, op
in enumerate(op_list
):
3971 if all(op
.get(k
) == match
[k
] for k
in match
):
3973 return self
.SUBOPERATION_STATUS_NOT_FOUND
3975 # Update status for a sub-operation given its index
3976 def _update_suboperation_status(
3977 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3979 # Update DB for HA tasks
3980 q_filter
= {"_id": db_nslcmop
["_id"]}
3982 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3983 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3986 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3989 # Add sub-operation, return the index of the added sub-operation
3990 # Optionally, set operationState, detailed-status, and operationType
3991 # Status and type are currently set for 'scale' sub-operations:
3992 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3993 # 'detailed-status' : status message
3994 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3995 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3996 def _add_suboperation(
4004 mapped_primitive_params
,
4005 operationState
=None,
4006 detailed_status
=None,
4009 RO_scaling_info
=None,
4012 return self
.SUBOPERATION_STATUS_NOT_FOUND
4013 # Get the "_admin.operations" list, if it exists
4014 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
4015 op_list
= db_nslcmop_admin
.get("operations")
4016 # Create or append to the "_admin.operations" list
4018 "member_vnf_index": vnf_index
,
4020 "vdu_count_index": vdu_count_index
,
4021 "primitive": primitive
,
4022 "primitive_params": mapped_primitive_params
,
4025 new_op
["operationState"] = operationState
4027 new_op
["detailed-status"] = detailed_status
4029 new_op
["lcmOperationType"] = operationType
4031 new_op
["RO_nsr_id"] = RO_nsr_id
4033 new_op
["RO_scaling_info"] = RO_scaling_info
4035 # No existing operations, create key 'operations' with current operation as first list element
4036 db_nslcmop_admin
.update({"operations": [new_op
]})
4037 op_list
= db_nslcmop_admin
.get("operations")
4039 # Existing operations, append operation to list
4040 op_list
.append(new_op
)
4042 db_nslcmop_update
= {"_admin.operations": op_list
}
4043 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4044 op_index
= len(op_list
) - 1
4047 # Helper methods for scale() sub-operations
4049 # pre-scale/post-scale:
4050 # Check for 3 different cases:
4051 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4052 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4053 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4054 def _check_or_add_scale_suboperation(
4058 vnf_config_primitive
,
4062 RO_scaling_info
=None,
4064 # Find this sub-operation
4065 if RO_nsr_id
and RO_scaling_info
:
4066 operationType
= "SCALE-RO"
4068 "member_vnf_index": vnf_index
,
4069 "RO_nsr_id": RO_nsr_id
,
4070 "RO_scaling_info": RO_scaling_info
,
4074 "member_vnf_index": vnf_index
,
4075 "primitive": vnf_config_primitive
,
4076 "primitive_params": primitive_params
,
4077 "lcmOperationType": operationType
,
4079 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4080 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4081 # a. New sub-operation
4082 # The sub-operation does not exist, add it.
4083 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4084 # The following parameters are set to None for all kind of scaling:
4086 vdu_count_index
= None
4088 if RO_nsr_id
and RO_scaling_info
:
4089 vnf_config_primitive
= None
4090 primitive_params
= None
4093 RO_scaling_info
= None
4094 # Initial status for sub-operation
4095 operationState
= "PROCESSING"
4096 detailed_status
= "In progress"
4097 # Add sub-operation for pre/post-scaling (zero or more operations)
4098 self
._add
_suboperation
(
4104 vnf_config_primitive
,
4112 return self
.SUBOPERATION_STATUS_NEW
4114 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4115 # or op_index (operationState != 'COMPLETED')
4116 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4118 # Function to return execution_environment id
4120 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4121 # TODO vdu_index_count
4122 for vca
in vca_deployed_list
:
4123 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
4126 async def destroy_N2VC(
4134 exec_primitives
=True,
4139 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4140 :param logging_text:
4142 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4143 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4144 :param vca_index: index in the database _admin.deployed.VCA
4145 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4146 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4147 not executed properly
4148 :param scaling_in: True destroys the application, False destroys the model
4149 :return: None or exception
4154 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4155 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4159 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4161 # execute terminate_primitives
4163 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4164 config_descriptor
.get("terminate-config-primitive"),
4165 vca_deployed
.get("ee_descriptor_id"),
4167 vdu_id
= vca_deployed
.get("vdu_id")
4168 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4169 vdu_name
= vca_deployed
.get("vdu_name")
4170 vnf_index
= vca_deployed
.get("member-vnf-index")
4171 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4172 for seq
in terminate_primitives
:
4173 # For each sequence in list, get primitive and call _ns_execute_primitive()
4174 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4175 vnf_index
, seq
.get("name")
4177 self
.logger
.debug(logging_text
+ step
)
4178 # Create the primitive for each sequence, i.e. "primitive": "touch"
4179 primitive
= seq
.get("name")
4180 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4185 self
._add
_suboperation
(
4192 mapped_primitive_params
,
4194 # Sub-operations: Call _ns_execute_primitive() instead of action()
4196 result
, result_detail
= await self
._ns
_execute
_primitive
(
4197 vca_deployed
["ee_id"],
4199 mapped_primitive_params
,
4203 except LcmException
:
4204 # this happens when VCA is not deployed. In this case it is not needed to terminate
4206 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4207 if result
not in result_ok
:
4209 "terminate_primitive {} for vnf_member_index={} fails with "
4210 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4212 # set that this VCA do not need terminated
4213 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4217 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4220 # Delete Prometheus Jobs if any
4221 # This uses NSR_ID, so it will destroy any jobs under this index
4222 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4225 await self
.vca_map
[vca_type
].delete_execution_environment(
4226 vca_deployed
["ee_id"],
4227 scaling_in
=scaling_in
,
4232 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4233 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4234 namespace
= "." + db_nsr
["_id"]
4236 await self
.n2vc
.delete_namespace(
4237 namespace
=namespace
,
4238 total_timeout
=self
.timeout
.charm_delete
,
4241 except N2VCNotFound
: # already deleted. Skip
4243 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4245 async def terminate(self
, nsr_id
, nslcmop_id
):
4246 # Try to lock HA task here
4247 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4248 if not task_is_locked_by_me
:
4251 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4252 self
.logger
.debug(logging_text
+ "Enter")
4253 timeout_ns_terminate
= self
.timeout
.ns_terminate
4256 operation_params
= None
4258 error_list
= [] # annotates all failed error messages
4259 db_nslcmop_update
= {}
4260 autoremove
= False # autoremove after terminated
4261 tasks_dict_info
= {}
4264 "Stage 1/3: Preparing task.",
4265 "Waiting for previous operations to terminate.",
4268 # ^ contains [stage, step, VIM-status]
4270 # wait for any previous tasks in process
4271 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4273 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4274 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4275 operation_params
= db_nslcmop
.get("operationParams") or {}
4276 if operation_params
.get("timeout_ns_terminate"):
4277 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4278 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4279 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4281 db_nsr_update
["operational-status"] = "terminating"
4282 db_nsr_update
["config-status"] = "terminating"
4283 self
._write
_ns
_status
(
4285 ns_state
="TERMINATING",
4286 current_operation
="TERMINATING",
4287 current_operation_id
=nslcmop_id
,
4288 other_update
=db_nsr_update
,
4290 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4291 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4292 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4295 stage
[1] = "Getting vnf descriptors from db."
4296 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4298 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4300 db_vnfds_from_id
= {}
4301 db_vnfds_from_member_index
= {}
4303 for vnfr
in db_vnfrs_list
:
4304 vnfd_id
= vnfr
["vnfd-id"]
4305 if vnfd_id
not in db_vnfds_from_id
:
4306 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4307 db_vnfds_from_id
[vnfd_id
] = vnfd
4308 db_vnfds_from_member_index
[
4309 vnfr
["member-vnf-index-ref"]
4310 ] = db_vnfds_from_id
[vnfd_id
]
4312 # Destroy individual execution environments when there are terminating primitives.
4313 # Rest of EE will be deleted at once
4314 # TODO - check before calling _destroy_N2VC
4315 # if not operation_params.get("skip_terminate_primitives"):#
4316 # or not vca.get("needed_terminate"):
4317 stage
[0] = "Stage 2/3 execute terminating primitives."
4318 self
.logger
.debug(logging_text
+ stage
[0])
4319 stage
[1] = "Looking execution environment that needs terminate."
4320 self
.logger
.debug(logging_text
+ stage
[1])
4322 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4323 config_descriptor
= None
4324 vca_member_vnf_index
= vca
.get("member-vnf-index")
4325 vca_id
= self
.get_vca_id(
4326 db_vnfrs_dict
.get(vca_member_vnf_index
)
4327 if vca_member_vnf_index
4331 if not vca
or not vca
.get("ee_id"):
4333 if not vca
.get("member-vnf-index"):
4335 config_descriptor
= db_nsr
.get("ns-configuration")
4336 elif vca
.get("vdu_id"):
4337 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4338 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4339 elif vca
.get("kdu_name"):
4340 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4341 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4343 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4344 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4345 vca_type
= vca
.get("type")
4346 exec_terminate_primitives
= not operation_params
.get(
4347 "skip_terminate_primitives"
4348 ) and vca
.get("needed_terminate")
4349 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4350 # pending native charms
4352 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4354 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4355 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4356 task
= asyncio
.ensure_future(
4364 exec_terminate_primitives
,
4368 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4370 # wait for pending tasks of terminate primitives
4374 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4376 error_list
= await self
._wait
_for
_tasks
(
4379 min(self
.timeout
.charm_delete
, timeout_ns_terminate
),
4383 tasks_dict_info
.clear()
4385 return # raise LcmException("; ".join(error_list))
4387 # remove All execution environments at once
4388 stage
[0] = "Stage 3/3 delete all."
4390 if nsr_deployed
.get("VCA"):
4391 stage
[1] = "Deleting all execution environments."
4392 self
.logger
.debug(logging_text
+ stage
[1])
4393 vca_id
= self
.get_vca_id({}, db_nsr
)
4394 task_delete_ee
= asyncio
.ensure_future(
4396 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4397 timeout
=self
.timeout
.charm_delete
,
4400 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4401 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4403 # Delete Namespace and Certificates if necessary
4404 if check_helm_ee_in_ns(list(db_vnfds_from_member_index
.values())):
4405 await self
.vca_map
["helm-v3"].delete_tls_certificate(
4406 certificate_name
=db_nslcmop
["nsInstanceId"],
4408 # TODO: Delete namespace
4410 # Delete from k8scluster
4411 stage
[1] = "Deleting KDUs."
4412 self
.logger
.debug(logging_text
+ stage
[1])
4413 # print(nsr_deployed)
4414 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4415 if not kdu
or not kdu
.get("kdu-instance"):
4417 kdu_instance
= kdu
.get("kdu-instance")
4418 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4419 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4420 vca_id
= self
.get_vca_id({}, db_nsr
)
4421 task_delete_kdu_instance
= asyncio
.ensure_future(
4422 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4423 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4424 kdu_instance
=kdu_instance
,
4426 namespace
=kdu
.get("namespace"),
4432 + "Unknown k8s deployment type {}".format(
4433 kdu
.get("k8scluster-type")
4438 task_delete_kdu_instance
4439 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4442 stage
[1] = "Deleting ns from VIM."
4443 if self
.ro_config
.ng
:
4444 task_delete_ro
= asyncio
.ensure_future(
4445 self
._terminate
_ng
_ro
(
4446 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4449 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4451 # rest of staff will be done at finally
4454 ROclient
.ROClientException
,
4459 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4461 except asyncio
.CancelledError
:
4463 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4465 exc
= "Operation was cancelled"
4466 except Exception as e
:
4467 exc
= traceback
.format_exc()
4468 self
.logger
.critical(
4469 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4474 error_list
.append(str(exc
))
4476 # wait for pending tasks
4478 stage
[1] = "Waiting for terminate pending tasks."
4479 self
.logger
.debug(logging_text
+ stage
[1])
4480 error_list
+= await self
._wait
_for
_tasks
(
4483 timeout_ns_terminate
,
4487 stage
[1] = stage
[2] = ""
4488 except asyncio
.CancelledError
:
4489 error_list
.append("Cancelled")
4490 # TODO cancell all tasks
4491 except Exception as exc
:
4492 error_list
.append(str(exc
))
4493 # update status at database
4495 error_detail
= "; ".join(error_list
)
4496 # self.logger.error(logging_text + error_detail)
4497 error_description_nslcmop
= "{} Detail: {}".format(
4498 stage
[0], error_detail
4500 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4501 nslcmop_id
, stage
[0]
4504 db_nsr_update
["operational-status"] = "failed"
4505 db_nsr_update
["detailed-status"] = (
4506 error_description_nsr
+ " Detail: " + error_detail
4508 db_nslcmop_update
["detailed-status"] = error_detail
4509 nslcmop_operation_state
= "FAILED"
4513 error_description_nsr
= error_description_nslcmop
= None
4514 ns_state
= "NOT_INSTANTIATED"
4515 db_nsr_update
["operational-status"] = "terminated"
4516 db_nsr_update
["detailed-status"] = "Done"
4517 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4518 db_nslcmop_update
["detailed-status"] = "Done"
4519 nslcmop_operation_state
= "COMPLETED"
4522 self
._write
_ns
_status
(
4525 current_operation
="IDLE",
4526 current_operation_id
=None,
4527 error_description
=error_description_nsr
,
4528 error_detail
=error_detail
,
4529 other_update
=db_nsr_update
,
4531 self
._write
_op
_status
(
4534 error_message
=error_description_nslcmop
,
4535 operation_state
=nslcmop_operation_state
,
4536 other_update
=db_nslcmop_update
,
4538 if ns_state
== "NOT_INSTANTIATED":
4542 {"nsr-id-ref": nsr_id
},
4543 {"_admin.nsState": "NOT_INSTANTIATED"},
4545 except DbException
as e
:
4548 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4552 if operation_params
:
4553 autoremove
= operation_params
.get("autoremove", False)
4554 if nslcmop_operation_state
:
4556 await self
.msg
.aiowrite(
4561 "nslcmop_id": nslcmop_id
,
4562 "operationState": nslcmop_operation_state
,
4563 "autoremove": autoremove
,
4567 except Exception as e
:
4569 logging_text
+ "kafka_write notification Exception {}".format(e
)
4572 self
.logger
.debug(logging_text
+ "Exit")
4573 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4575 async def _wait_for_tasks(
4576 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4579 error_detail_list
= []
4581 pending_tasks
= list(created_tasks_info
.keys())
4582 num_tasks
= len(pending_tasks
)
4584 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4585 self
._write
_op
_status
(nslcmop_id
, stage
)
4586 while pending_tasks
:
4588 _timeout
= timeout
+ time_start
- time()
4589 done
, pending_tasks
= await asyncio
.wait(
4590 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4592 num_done
+= len(done
)
4593 if not done
: # Timeout
4594 for task
in pending_tasks
:
4595 new_error
= created_tasks_info
[task
] + ": Timeout"
4596 error_detail_list
.append(new_error
)
4597 error_list
.append(new_error
)
4600 if task
.cancelled():
4603 exc
= task
.exception()
4605 if isinstance(exc
, asyncio
.TimeoutError
):
4607 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4608 error_list
.append(created_tasks_info
[task
])
4609 error_detail_list
.append(new_error
)
4616 ROclient
.ROClientException
,
4622 self
.logger
.error(logging_text
+ new_error
)
4624 exc_traceback
= "".join(
4625 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4629 + created_tasks_info
[task
]
4635 logging_text
+ created_tasks_info
[task
] + ": Done"
4637 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4639 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4640 if nsr_id
: # update also nsr
4645 "errorDescription": "Error at: " + ", ".join(error_list
),
4646 "errorDetail": ". ".join(error_detail_list
),
4649 self
._write
_op
_status
(nslcmop_id
, stage
)
4650 return error_detail_list
4653 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4655 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4656 The default-value is used. If it is between < > it look for a value at instantiation_params
4657 :param primitive_desc: portion of VNFD/NSD that describes primitive
4658 :param params: Params provided by user
4659 :param instantiation_params: Instantiation params provided by user
4660 :return: a dictionary with the calculated params
4662 calculated_params
= {}
4663 for parameter
in primitive_desc
.get("parameter", ()):
4664 param_name
= parameter
["name"]
4665 if param_name
in params
:
4666 calculated_params
[param_name
] = params
[param_name
]
4667 elif "default-value" in parameter
or "value" in parameter
:
4668 if "value" in parameter
:
4669 calculated_params
[param_name
] = parameter
["value"]
4671 calculated_params
[param_name
] = parameter
["default-value"]
4673 isinstance(calculated_params
[param_name
], str)
4674 and calculated_params
[param_name
].startswith("<")
4675 and calculated_params
[param_name
].endswith(">")
4677 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4678 calculated_params
[param_name
] = instantiation_params
[
4679 calculated_params
[param_name
][1:-1]
4683 "Parameter {} needed to execute primitive {} not provided".format(
4684 calculated_params
[param_name
], primitive_desc
["name"]
4689 "Parameter {} needed to execute primitive {} not provided".format(
4690 param_name
, primitive_desc
["name"]
4694 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4695 calculated_params
[param_name
] = yaml
.safe_dump(
4696 calculated_params
[param_name
], default_flow_style
=True, width
=256
4698 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4700 ].startswith("!!yaml "):
4701 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4702 if parameter
.get("data-type") == "INTEGER":
4704 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4705 except ValueError: # error converting string to int
4707 "Parameter {} of primitive {} must be integer".format(
4708 param_name
, primitive_desc
["name"]
4711 elif parameter
.get("data-type") == "BOOLEAN":
4712 calculated_params
[param_name
] = not (
4713 (str(calculated_params
[param_name
])).lower() == "false"
4716 # add always ns_config_info if primitive name is config
4717 if primitive_desc
["name"] == "config":
4718 if "ns_config_info" in instantiation_params
:
4719 calculated_params
["ns_config_info"] = instantiation_params
[
4722 return calculated_params
4724 def _look_for_deployed_vca(
4731 ee_descriptor_id
=None,
4733 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4734 for vca
in deployed_vca
:
4737 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4740 vdu_count_index
is not None
4741 and vdu_count_index
!= vca
["vdu_count_index"]
4744 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4746 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4750 # vca_deployed not found
4752 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4753 " is not deployed".format(
4762 ee_id
= vca
.get("ee_id")
4764 "type", "lxc_proxy_charm"
4765 ) # default value for backward compatibility - proxy charm
4768 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4769 "execution environment".format(
4770 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4773 return ee_id
, vca_type
4775 async def _ns_execute_primitive(
4781 retries_interval
=30,
4788 if primitive
== "config":
4789 primitive_params
= {"params": primitive_params
}
4791 vca_type
= vca_type
or "lxc_proxy_charm"
4795 output
= await asyncio
.wait_for(
4796 self
.vca_map
[vca_type
].exec_primitive(
4798 primitive_name
=primitive
,
4799 params_dict
=primitive_params
,
4800 progress_timeout
=self
.timeout
.progress_primitive
,
4801 total_timeout
=self
.timeout
.primitive
,
4806 timeout
=timeout
or self
.timeout
.primitive
,
4810 except asyncio
.CancelledError
:
4812 except Exception as e
:
4816 "Error executing action {} on {} -> {}".format(
4821 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
4823 if isinstance(e
, asyncio
.TimeoutError
):
4825 message
="Timed out waiting for action to complete"
4827 return "FAILED", getattr(e
, "message", repr(e
))
4829 return "COMPLETED", output
4831 except (LcmException
, asyncio
.CancelledError
):
4833 except Exception as e
:
4834 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
4836 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
4838 Updating the vca_status with latest juju information in nsrs record
4839 :param: nsr_id: Id of the nsr
4840 :param: nslcmop_id: Id of the nslcmop
4844 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
4845 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4846 vca_id
= self
.get_vca_id({}, db_nsr
)
4847 if db_nsr
["_admin"]["deployed"]["K8s"]:
4848 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
4849 cluster_uuid
, kdu_instance
, cluster_type
= (
4850 k8s
["k8scluster-uuid"],
4851 k8s
["kdu-instance"],
4852 k8s
["k8scluster-type"],
4854 await self
._on
_update
_k
8s
_db
(
4855 cluster_uuid
=cluster_uuid
,
4856 kdu_instance
=kdu_instance
,
4857 filter={"_id": nsr_id
},
4859 cluster_type
=cluster_type
,
4862 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
4863 table
, filter = "nsrs", {"_id": nsr_id
}
4864 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
4865 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
4867 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
4868 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
4870 async def action(self
, nsr_id
, nslcmop_id
):
4871 # Try to lock HA task here
4872 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4873 if not task_is_locked_by_me
:
4876 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
4877 self
.logger
.debug(logging_text
+ "Enter")
4878 # get all needed from database
4882 db_nslcmop_update
= {}
4883 nslcmop_operation_state
= None
4884 error_description_nslcmop
= None
4888 # wait for any previous tasks in process
4889 step
= "Waiting for previous operations to terminate"
4890 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4892 self
._write
_ns
_status
(
4895 current_operation
="RUNNING ACTION",
4896 current_operation_id
=nslcmop_id
,
4899 step
= "Getting information from database"
4900 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4901 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4902 if db_nslcmop
["operationParams"].get("primitive_params"):
4903 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
4904 db_nslcmop
["operationParams"]["primitive_params"]
4907 nsr_deployed
= db_nsr
["_admin"].get("deployed")
4908 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
4909 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
4910 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
4911 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
4912 primitive
= db_nslcmop
["operationParams"]["primitive"]
4913 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
4914 timeout_ns_action
= db_nslcmop
["operationParams"].get(
4915 "timeout_ns_action", self
.timeout
.primitive
4919 step
= "Getting vnfr from database"
4920 db_vnfr
= self
.db
.get_one(
4921 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
4923 if db_vnfr
.get("kdur"):
4925 for kdur
in db_vnfr
["kdur"]:
4926 if kdur
.get("additionalParams"):
4927 kdur
["additionalParams"] = json
.loads(
4928 kdur
["additionalParams"]
4930 kdur_list
.append(kdur
)
4931 db_vnfr
["kdur"] = kdur_list
4932 step
= "Getting vnfd from database"
4933 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
4935 # Sync filesystem before running a primitive
4936 self
.fs
.sync(db_vnfr
["vnfd-id"])
4938 step
= "Getting nsd from database"
4939 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
4941 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
4942 # for backward compatibility
4943 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
4944 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
4945 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
4946 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4948 # look for primitive
4949 config_primitive_desc
= descriptor_configuration
= None
4951 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
4953 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
4955 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
4957 descriptor_configuration
= db_nsd
.get("ns-configuration")
4959 if descriptor_configuration
and descriptor_configuration
.get(
4962 for config_primitive
in descriptor_configuration
["config-primitive"]:
4963 if config_primitive
["name"] == primitive
:
4964 config_primitive_desc
= config_primitive
4967 if not config_primitive_desc
:
4968 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
4970 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4974 primitive_name
= primitive
4975 ee_descriptor_id
= None
4977 primitive_name
= config_primitive_desc
.get(
4978 "execution-environment-primitive", primitive
4980 ee_descriptor_id
= config_primitive_desc
.get(
4981 "execution-environment-ref"
4987 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
4989 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
4992 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
4994 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
4996 desc_params
= parse_yaml_strings(
4997 db_vnfr
.get("additionalParamsForVnf")
5000 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5001 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5002 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5004 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5005 actions
.add(primitive
["name"])
5006 for primitive
in kdu_configuration
.get("config-primitive", []):
5007 actions
.add(primitive
["name"])
5009 nsr_deployed
["K8s"],
5010 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5011 and kdu
["member-vnf-index"] == vnf_index
,
5015 if primitive_name
in actions
5016 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5020 # TODO check if ns is in a proper status
5022 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5024 # kdur and desc_params already set from before
5025 if primitive_params
:
5026 desc_params
.update(primitive_params
)
5027 # TODO Check if we will need something at vnf level
5028 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5030 kdu_name
== kdu
["kdu-name"]
5031 and kdu
["member-vnf-index"] == vnf_index
5036 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5039 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5040 msg
= "unknown k8scluster-type '{}'".format(
5041 kdu
.get("k8scluster-type")
5043 raise LcmException(msg
)
5046 "collection": "nsrs",
5047 "filter": {"_id": nsr_id
},
5048 "path": "_admin.deployed.K8s.{}".format(index
),
5052 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5054 step
= "Executing kdu {}".format(primitive_name
)
5055 if primitive_name
== "upgrade":
5056 if desc_params
.get("kdu_model"):
5057 kdu_model
= desc_params
.get("kdu_model")
5058 del desc_params
["kdu_model"]
5060 kdu_model
= kdu
.get("kdu-model")
5061 parts
= kdu_model
.split(sep
=":")
5063 kdu_model
= parts
[0]
5064 if desc_params
.get("kdu_atomic_upgrade"):
5065 atomic_upgrade
= desc_params
.get(
5066 "kdu_atomic_upgrade"
5067 ).lower() in ("yes", "true", "1")
5068 del desc_params
["kdu_atomic_upgrade"]
5070 atomic_upgrade
= True
5072 detailed_status
= await asyncio
.wait_for(
5073 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5074 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5075 kdu_instance
=kdu
.get("kdu-instance"),
5076 atomic
=atomic_upgrade
,
5077 kdu_model
=kdu_model
,
5080 timeout
=timeout_ns_action
,
5082 timeout
=timeout_ns_action
+ 10,
5085 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5087 elif primitive_name
== "rollback":
5088 detailed_status
= await asyncio
.wait_for(
5089 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5090 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5091 kdu_instance
=kdu
.get("kdu-instance"),
5094 timeout
=timeout_ns_action
,
5096 elif primitive_name
== "status":
5097 detailed_status
= await asyncio
.wait_for(
5098 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5099 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5100 kdu_instance
=kdu
.get("kdu-instance"),
5103 timeout
=timeout_ns_action
,
5106 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5107 kdu
["kdu-name"], nsr_id
5109 params
= self
._map
_primitive
_params
(
5110 config_primitive_desc
, primitive_params
, desc_params
5113 detailed_status
= await asyncio
.wait_for(
5114 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5115 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5116 kdu_instance
=kdu_instance
,
5117 primitive_name
=primitive_name
,
5120 timeout
=timeout_ns_action
,
5123 timeout
=timeout_ns_action
,
5127 nslcmop_operation_state
= "COMPLETED"
5129 detailed_status
= ""
5130 nslcmop_operation_state
= "FAILED"
5132 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5133 nsr_deployed
["VCA"],
5134 member_vnf_index
=vnf_index
,
5136 vdu_count_index
=vdu_count_index
,
5137 ee_descriptor_id
=ee_descriptor_id
,
5139 for vca_index
, vca_deployed
in enumerate(
5140 db_nsr
["_admin"]["deployed"]["VCA"]
5142 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5144 "collection": "nsrs",
5145 "filter": {"_id": nsr_id
},
5146 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5150 nslcmop_operation_state
,
5152 ) = await self
._ns
_execute
_primitive
(
5154 primitive
=primitive_name
,
5155 primitive_params
=self
._map
_primitive
_params
(
5156 config_primitive_desc
, primitive_params
, desc_params
5158 timeout
=timeout_ns_action
,
5164 db_nslcmop_update
["detailed-status"] = detailed_status
5165 error_description_nslcmop
= (
5166 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5170 + "Done with result {} {}".format(
5171 nslcmop_operation_state
, detailed_status
5174 return # database update is called inside finally
5176 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5177 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5179 except asyncio
.CancelledError
:
5181 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5183 exc
= "Operation was cancelled"
5184 except asyncio
.TimeoutError
:
5185 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5187 except Exception as e
:
5188 exc
= traceback
.format_exc()
5189 self
.logger
.critical(
5190 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5199 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5200 nslcmop_operation_state
= "FAILED"
5202 self
._write
_ns
_status
(
5206 ], # TODO check if degraded. For the moment use previous status
5207 current_operation
="IDLE",
5208 current_operation_id
=None,
5209 # error_description=error_description_nsr,
5210 # error_detail=error_detail,
5211 other_update
=db_nsr_update
,
5214 self
._write
_op
_status
(
5217 error_message
=error_description_nslcmop
,
5218 operation_state
=nslcmop_operation_state
,
5219 other_update
=db_nslcmop_update
,
5222 if nslcmop_operation_state
:
5224 await self
.msg
.aiowrite(
5229 "nslcmop_id": nslcmop_id
,
5230 "operationState": nslcmop_operation_state
,
5234 except Exception as e
:
5236 logging_text
+ "kafka_write notification Exception {}".format(e
)
5238 self
.logger
.debug(logging_text
+ "Exit")
5239 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5240 return nslcmop_operation_state
, detailed_status
5242 async def terminate_vdus(
5243 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5245 """This method terminates VDUs
5248 db_vnfr: VNF instance record
5249 member_vnf_index: VNF index to identify the VDUs to be removed
5250 db_nsr: NS instance record
5251 update_db_nslcmops: Nslcmop update record
5253 vca_scaling_info
= []
5254 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5255 scaling_info
["scaling_direction"] = "IN"
5256 scaling_info
["vdu-delete"] = {}
5257 scaling_info
["kdu-delete"] = {}
5258 db_vdur
= db_vnfr
.get("vdur")
5259 vdur_list
= copy(db_vdur
)
5261 for index
, vdu
in enumerate(vdur_list
):
5262 vca_scaling_info
.append(
5264 "osm_vdu_id": vdu
["vdu-id-ref"],
5265 "member-vnf-index": member_vnf_index
,
5267 "vdu_index": count_index
,
5270 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5271 scaling_info
["vdu"].append(
5273 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5274 "vdu_id": vdu
["vdu-id-ref"],
5278 for interface
in vdu
["interfaces"]:
5279 scaling_info
["vdu"][index
]["interface"].append(
5281 "name": interface
["name"],
5282 "ip_address": interface
["ip-address"],
5283 "mac_address": interface
.get("mac-address"),
5286 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5287 stage
[2] = "Terminating VDUs"
5288 if scaling_info
.get("vdu-delete"):
5289 # scale_process = "RO"
5290 if self
.ro_config
.ng
:
5291 await self
._scale
_ng
_ro
(
5300 async def remove_vnf(self
, nsr_id
, nslcmop_id
, vnf_instance_id
):
5301 """This method is to Remove VNF instances from NS.
5304 nsr_id: NS instance id
5305 nslcmop_id: nslcmop id of update
5306 vnf_instance_id: id of the VNF instance to be removed
5309 result: (str, str) COMPLETED/FAILED, details
5313 logging_text
= "Task ns={} update ".format(nsr_id
)
5314 check_vnfr_count
= len(self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}))
5315 self
.logger
.info("check_vnfr_count {}".format(check_vnfr_count
))
5316 if check_vnfr_count
> 1:
5317 stage
= ["", "", ""]
5318 step
= "Getting nslcmop from database"
5320 step
+ " after having waited for previous tasks to be completed"
5322 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5323 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5324 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5325 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5326 """ db_vnfr = self.db.get_one(
5327 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5329 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5330 await self
.terminate_vdus(
5339 constituent_vnfr
= db_nsr
.get("constituent-vnfr-ref")
5340 constituent_vnfr
.remove(db_vnfr
.get("_id"))
5341 db_nsr_update
["constituent-vnfr-ref"] = db_nsr
.get(
5342 "constituent-vnfr-ref"
5344 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5345 self
.db
.del_one("vnfrs", {"_id": db_vnfr
.get("_id")})
5346 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5347 return "COMPLETED", "Done"
5349 step
= "Terminate VNF Failed with"
5351 "{} Cannot terminate the last VNF in this NS.".format(
5355 except (LcmException
, asyncio
.CancelledError
):
5357 except Exception as e
:
5358 self
.logger
.debug("Error removing VNF {}".format(e
))
5359 return "FAILED", "Error removing VNF {}".format(e
)
5361 async def _ns_redeploy_vnf(
5369 """This method updates and redeploys VNF instances
5372 nsr_id: NS instance id
5373 nslcmop_id: nslcmop id
5374 db_vnfd: VNF descriptor
5375 db_vnfr: VNF instance record
5376 db_nsr: NS instance record
5379 result: (str, str) COMPLETED/FAILED, details
5383 stage
= ["", "", ""]
5384 logging_text
= "Task ns={} update ".format(nsr_id
)
5385 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5386 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5388 # Terminate old VNF resources
5389 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5390 await self
.terminate_vdus(
5399 # old_vnfd_id = db_vnfr["vnfd-id"]
5400 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5401 new_db_vnfd
= db_vnfd
5402 # new_vnfd_ref = new_db_vnfd["id"]
5403 # new_vnfd_id = vnfd_id
5407 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5409 "name": cp
.get("id"),
5410 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5411 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5414 new_vnfr_cp
.append(vnf_cp
)
5415 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5416 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5417 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5419 "revision": latest_vnfd_revision
,
5420 "connection-point": new_vnfr_cp
,
5424 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5425 updated_db_vnfr
= self
.db
.get_one(
5427 {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
},
5430 # Instantiate new VNF resources
5431 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5432 vca_scaling_info
= []
5433 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5434 scaling_info
["scaling_direction"] = "OUT"
5435 scaling_info
["vdu-create"] = {}
5436 scaling_info
["kdu-create"] = {}
5437 vdud_instantiate_list
= db_vnfd
["vdu"]
5438 for index
, vdud
in enumerate(vdud_instantiate_list
):
5439 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
5441 additional_params
= (
5442 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5445 cloud_init_list
= []
5447 # TODO Information of its own ip is not available because db_vnfr is not updated.
5448 additional_params
["OSM"] = get_osm_params(
5449 updated_db_vnfr
, vdud
["id"], 1
5451 cloud_init_list
.append(
5452 self
._parse
_cloud
_init
(
5459 vca_scaling_info
.append(
5461 "osm_vdu_id": vdud
["id"],
5462 "member-vnf-index": member_vnf_index
,
5464 "vdu_index": count_index
,
5467 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5468 if self
.ro_config
.ng
:
5470 "New Resources to be deployed: {}".format(scaling_info
)
5472 await self
._scale
_ng
_ro
(
5480 return "COMPLETED", "Done"
5481 except (LcmException
, asyncio
.CancelledError
):
5483 except Exception as e
:
5484 self
.logger
.debug("Error updating VNF {}".format(e
))
5485 return "FAILED", "Error updating VNF {}".format(e
)
5487 async def _ns_charm_upgrade(
5493 timeout
: float = None,
5495 """This method upgrade charms in VNF instances
5498 ee_id: Execution environment id
5499 path: Local path to the charm
5501 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5502 timeout: (Float) Timeout for the ns update operation
5505 result: (str, str) COMPLETED/FAILED, details
5508 charm_type
= charm_type
or "lxc_proxy_charm"
5509 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5513 charm_type
=charm_type
,
5514 timeout
=timeout
or self
.timeout
.ns_update
,
5518 return "COMPLETED", output
5520 except (LcmException
, asyncio
.CancelledError
):
5523 except Exception as e
:
5525 self
.logger
.debug("Error upgrading charm {}".format(path
))
5527 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5529 async def update(self
, nsr_id
, nslcmop_id
):
5530 """Update NS according to different update types
5532 This method performs upgrade of VNF instances then updates the revision
5533 number in VNF record
5536 nsr_id: Network service will be updated
5537 nslcmop_id: ns lcm operation id
5540 It may raise DbException, LcmException, N2VCException, K8sException
5543 # Try to lock HA task here
5544 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5545 if not task_is_locked_by_me
:
5548 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5549 self
.logger
.debug(logging_text
+ "Enter")
5551 # Set the required variables to be filled up later
5553 db_nslcmop_update
= {}
5555 nslcmop_operation_state
= None
5557 error_description_nslcmop
= ""
5559 change_type
= "updated"
5560 detailed_status
= ""
5561 member_vnf_index
= None
5564 # wait for any previous tasks in process
5565 step
= "Waiting for previous operations to terminate"
5566 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5567 self
._write
_ns
_status
(
5570 current_operation
="UPDATING",
5571 current_operation_id
=nslcmop_id
,
5574 step
= "Getting nslcmop from database"
5575 db_nslcmop
= self
.db
.get_one(
5576 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5578 update_type
= db_nslcmop
["operationParams"]["updateType"]
5580 step
= "Getting nsr from database"
5581 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5582 old_operational_status
= db_nsr
["operational-status"]
5583 db_nsr_update
["operational-status"] = "updating"
5584 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5585 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5587 if update_type
== "CHANGE_VNFPKG":
5589 # Get the input parameters given through update request
5590 vnf_instance_id
= db_nslcmop
["operationParams"][
5591 "changeVnfPackageData"
5592 ].get("vnfInstanceId")
5594 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5597 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5599 step
= "Getting vnfr from database"
5600 db_vnfr
= self
.db
.get_one(
5601 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5604 step
= "Getting vnfds from database"
5606 latest_vnfd
= self
.db
.get_one(
5607 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5609 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5612 current_vnf_revision
= db_vnfr
.get("revision", 1)
5613 current_vnfd
= self
.db
.get_one(
5615 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5616 fail_on_empty
=False,
5618 # Charm artifact paths will be filled up later
5620 current_charm_artifact_path
,
5621 target_charm_artifact_path
,
5622 charm_artifact_paths
,
5624 ) = ([], [], [], [])
5626 step
= "Checking if revision has changed in VNFD"
5627 if current_vnf_revision
!= latest_vnfd_revision
:
5629 change_type
= "policy_updated"
5631 # There is new revision of VNFD, update operation is required
5632 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5633 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5635 step
= "Removing the VNFD packages if they exist in the local path"
5636 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5637 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5639 step
= "Get the VNFD packages from FSMongo"
5640 self
.fs
.sync(from_path
=latest_vnfd_path
)
5641 self
.fs
.sync(from_path
=current_vnfd_path
)
5644 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5646 current_base_folder
= current_vnfd
["_admin"]["storage"]
5647 latest_base_folder
= latest_vnfd
["_admin"]["storage"]
5649 for vca_index
, vca_deployed
in enumerate(
5650 get_iterable(nsr_deployed
, "VCA")
5652 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5654 # Getting charm-id and charm-type
5655 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5656 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5657 vca_type
= vca_deployed
.get("type")
5658 vdu_count_index
= vca_deployed
.get("vdu_count_index")
5661 ee_id
= vca_deployed
.get("ee_id")
5663 step
= "Getting descriptor config"
5664 if current_vnfd
.get("kdu"):
5666 search_key
= "kdu_name"
5668 search_key
= "vnfd_id"
5670 entity_id
= vca_deployed
.get(search_key
)
5672 descriptor_config
= get_configuration(
5673 current_vnfd
, entity_id
5676 if "execution-environment-list" in descriptor_config
:
5677 ee_list
= descriptor_config
.get(
5678 "execution-environment-list", []
5683 # There could be several charm used in the same VNF
5684 for ee_item
in ee_list
:
5685 if ee_item
.get("juju"):
5687 step
= "Getting charm name"
5688 charm_name
= ee_item
["juju"].get("charm")
5690 step
= "Setting Charm artifact paths"
5691 current_charm_artifact_path
.append(
5692 get_charm_artifact_path(
5693 current_base_folder
,
5696 current_vnf_revision
,
5699 target_charm_artifact_path
.append(
5700 get_charm_artifact_path(
5704 latest_vnfd_revision
,
5707 elif ee_item
.get("helm-chart"):
5708 # add chart to list and all parameters
5709 step
= "Getting helm chart name"
5710 chart_name
= ee_item
.get("helm-chart")
5712 ee_item
.get("helm-version")
5713 and ee_item
.get("helm-version") == "v2"
5717 vca_type
= "helm-v3"
5718 step
= "Setting Helm chart artifact paths"
5720 helm_artifacts
.append(
5722 "current_artifact_path": get_charm_artifact_path(
5723 current_base_folder
,
5726 current_vnf_revision
,
5728 "target_artifact_path": get_charm_artifact_path(
5732 latest_vnfd_revision
,
5735 "vca_index": vca_index
,
5736 "vdu_index": vdu_count_index
,
5740 charm_artifact_paths
= zip(
5741 current_charm_artifact_path
, target_charm_artifact_path
5744 step
= "Checking if software version has changed in VNFD"
5745 if find_software_version(current_vnfd
) != find_software_version(
5749 step
= "Checking if existing VNF has charm"
5750 for current_charm_path
, target_charm_path
in list(
5751 charm_artifact_paths
5753 if current_charm_path
:
5755 "Software version change is not supported as VNF instance {} has charm.".format(
5760 # There is no change in the charm package, then redeploy the VNF
5761 # based on new descriptor
5762 step
= "Redeploying VNF"
5763 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5764 (result
, detailed_status
) = await self
._ns
_redeploy
_vnf
(
5765 nsr_id
, nslcmop_id
, latest_vnfd
, db_vnfr
, db_nsr
5767 if result
== "FAILED":
5768 nslcmop_operation_state
= result
5769 error_description_nslcmop
= detailed_status
5770 db_nslcmop_update
["detailed-status"] = detailed_status
5773 + " step {} Done with result {} {}".format(
5774 step
, nslcmop_operation_state
, detailed_status
5779 step
= "Checking if any charm package has changed or not"
5780 for current_charm_path
, target_charm_path
in list(
5781 charm_artifact_paths
5785 and target_charm_path
5786 and self
.check_charm_hash_changed(
5787 current_charm_path
, target_charm_path
5791 step
= "Checking whether VNF uses juju bundle"
5792 if check_juju_bundle_existence(current_vnfd
):
5795 "Charm upgrade is not supported for the instance which"
5796 " uses juju-bundle: {}".format(
5797 check_juju_bundle_existence(current_vnfd
)
5801 step
= "Upgrading Charm"
5805 ) = await self
._ns
_charm
_upgrade
(
5808 charm_type
=vca_type
,
5809 path
=self
.fs
.path
+ target_charm_path
,
5810 timeout
=timeout_seconds
,
5813 if result
== "FAILED":
5814 nslcmop_operation_state
= result
5815 error_description_nslcmop
= detailed_status
5817 db_nslcmop_update
["detailed-status"] = detailed_status
5820 + " step {} Done with result {} {}".format(
5821 step
, nslcmop_operation_state
, detailed_status
5825 step
= "Updating policies"
5826 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5827 result
= "COMPLETED"
5828 detailed_status
= "Done"
5829 db_nslcmop_update
["detailed-status"] = "Done"
5832 for item
in helm_artifacts
:
5834 item
["current_artifact_path"]
5835 and item
["target_artifact_path"]
5836 and self
.check_charm_hash_changed(
5837 item
["current_artifact_path"],
5838 item
["target_artifact_path"],
5842 db_update_entry
= "_admin.deployed.VCA.{}.".format(
5845 vnfr_id
= db_vnfr
["_id"]
5846 osm_config
= {"osm": {"ns_id": nsr_id
, "vnf_id": vnfr_id
}}
5848 "collection": "nsrs",
5849 "filter": {"_id": nsr_id
},
5850 "path": db_update_entry
,
5852 vca_type
, namespace
, helm_id
= get_ee_id_parts(item
["ee_id"])
5853 await self
.vca_map
[vca_type
].upgrade_execution_environment(
5854 namespace
=namespace
,
5858 artifact_path
=item
["target_artifact_path"],
5861 vnf_id
= db_vnfr
.get("vnfd-ref")
5862 config_descriptor
= get_configuration(latest_vnfd
, vnf_id
)
5863 self
.logger
.debug("get ssh key block")
5867 ("config-access", "ssh-access", "required"),
5869 # Needed to inject a ssh key
5872 ("config-access", "ssh-access", "default-user"),
5875 "Install configuration Software, getting public ssh key"
5877 pub_key
= await self
.vca_map
[
5879 ].get_ee_ssh_public__key(
5880 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
5884 "Insert public key into VM user={} ssh_key={}".format(
5888 self
.logger
.debug(logging_text
+ step
)
5890 # wait for RO (ip-address) Insert pub_key into VM
5891 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
5901 initial_config_primitive_list
= config_descriptor
.get(
5902 "initial-config-primitive"
5904 config_primitive
= next(
5907 for p
in initial_config_primitive_list
5908 if p
["name"] == "config"
5912 if not config_primitive
:
5915 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
5917 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
5918 if db_vnfr
.get("additionalParamsForVnf"):
5919 deploy_params
.update(
5921 db_vnfr
["additionalParamsForVnf"].copy()
5924 primitive_params_
= self
._map
_primitive
_params
(
5925 config_primitive
, {}, deploy_params
5928 step
= "execute primitive '{}' params '{}'".format(
5929 config_primitive
["name"], primitive_params_
5931 self
.logger
.debug(logging_text
+ step
)
5932 await self
.vca_map
[vca_type
].exec_primitive(
5934 primitive_name
=config_primitive
["name"],
5935 params_dict
=primitive_params_
,
5941 step
= "Updating policies"
5942 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5943 detailed_status
= "Done"
5944 db_nslcmop_update
["detailed-status"] = "Done"
5946 # If nslcmop_operation_state is None, so any operation is not failed.
5947 if not nslcmop_operation_state
:
5948 nslcmop_operation_state
= "COMPLETED"
5950 # If update CHANGE_VNFPKG nslcmop_operation is successful
5951 # vnf revision need to be updated
5952 vnfr_update
["revision"] = latest_vnfd_revision
5953 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
5957 + " task Done with result {} {}".format(
5958 nslcmop_operation_state
, detailed_status
5961 elif update_type
== "REMOVE_VNF":
5962 # This part is included in https://osm.etsi.org/gerrit/11876
5963 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
5964 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5965 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5966 step
= "Removing VNF"
5967 (result
, detailed_status
) = await self
.remove_vnf(
5968 nsr_id
, nslcmop_id
, vnf_instance_id
5970 if result
== "FAILED":
5971 nslcmop_operation_state
= result
5972 error_description_nslcmop
= detailed_status
5973 db_nslcmop_update
["detailed-status"] = detailed_status
5974 change_type
= "vnf_terminated"
5975 if not nslcmop_operation_state
:
5976 nslcmop_operation_state
= "COMPLETED"
5979 + " task Done with result {} {}".format(
5980 nslcmop_operation_state
, detailed_status
5984 elif update_type
== "OPERATE_VNF":
5985 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"][
5988 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"][
5991 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"][
5994 (result
, detailed_status
) = await self
.rebuild_start_stop(
5995 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
5997 if result
== "FAILED":
5998 nslcmop_operation_state
= result
5999 error_description_nslcmop
= detailed_status
6000 db_nslcmop_update
["detailed-status"] = detailed_status
6001 if not nslcmop_operation_state
:
6002 nslcmop_operation_state
= "COMPLETED"
6005 + " task Done with result {} {}".format(
6006 nslcmop_operation_state
, detailed_status
6010 # If nslcmop_operation_state is None, so any operation is not failed.
6011 # All operations are executed in overall.
6012 if not nslcmop_operation_state
:
6013 nslcmop_operation_state
= "COMPLETED"
6014 db_nsr_update
["operational-status"] = old_operational_status
6016 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
6017 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6019 except asyncio
.CancelledError
:
6021 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6023 exc
= "Operation was cancelled"
6024 except asyncio
.TimeoutError
:
6025 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
6027 except Exception as e
:
6028 exc
= traceback
.format_exc()
6029 self
.logger
.critical(
6030 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6039 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6040 nslcmop_operation_state
= "FAILED"
6041 db_nsr_update
["operational-status"] = old_operational_status
6043 self
._write
_ns
_status
(
6045 ns_state
=db_nsr
["nsState"],
6046 current_operation
="IDLE",
6047 current_operation_id
=None,
6048 other_update
=db_nsr_update
,
6051 self
._write
_op
_status
(
6054 error_message
=error_description_nslcmop
,
6055 operation_state
=nslcmop_operation_state
,
6056 other_update
=db_nslcmop_update
,
6059 if nslcmop_operation_state
:
6063 "nslcmop_id": nslcmop_id
,
6064 "operationState": nslcmop_operation_state
,
6067 change_type
in ("vnf_terminated", "policy_updated")
6068 and member_vnf_index
6070 msg
.update({"vnf_member_index": member_vnf_index
})
6071 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6072 except Exception as e
:
6074 logging_text
+ "kafka_write notification Exception {}".format(e
)
6076 self
.logger
.debug(logging_text
+ "Exit")
6077 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6078 return nslcmop_operation_state
, detailed_status
6080 async def scale(self
, nsr_id
, nslcmop_id
):
6081 # Try to lock HA task here
6082 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6083 if not task_is_locked_by_me
:
6086 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6087 stage
= ["", "", ""]
6088 tasks_dict_info
= {}
6089 # ^ stage, step, VIM progress
6090 self
.logger
.debug(logging_text
+ "Enter")
6091 # get all needed from database
6093 db_nslcmop_update
= {}
6096 # in case of error, indicates what part of scale was failed to put nsr at error status
6097 scale_process
= None
6098 old_operational_status
= ""
6099 old_config_status
= ""
6102 # wait for any previous tasks in process
6103 step
= "Waiting for previous operations to terminate"
6104 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6105 self
._write
_ns
_status
(
6108 current_operation
="SCALING",
6109 current_operation_id
=nslcmop_id
,
6112 step
= "Getting nslcmop from database"
6114 step
+ " after having waited for previous tasks to be completed"
6116 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6118 step
= "Getting nsr from database"
6119 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6120 old_operational_status
= db_nsr
["operational-status"]
6121 old_config_status
= db_nsr
["config-status"]
6123 step
= "Parsing scaling parameters"
6124 db_nsr_update
["operational-status"] = "scaling"
6125 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6126 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6128 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6130 ]["member-vnf-index"]
6131 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6133 ]["scaling-group-descriptor"]
6134 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6135 # for backward compatibility
6136 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6137 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6138 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6139 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6141 step
= "Getting vnfr from database"
6142 db_vnfr
= self
.db
.get_one(
6143 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6146 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6148 step
= "Getting vnfd from database"
6149 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6151 base_folder
= db_vnfd
["_admin"]["storage"]
6153 step
= "Getting scaling-group-descriptor"
6154 scaling_descriptor
= find_in_list(
6155 get_scaling_aspect(db_vnfd
),
6156 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6158 if not scaling_descriptor
:
6160 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6161 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6164 step
= "Sending scale order to VIM"
6165 # TODO check if ns is in a proper status
6167 if not db_nsr
["_admin"].get("scaling-group"):
6172 "_admin.scaling-group": [
6173 {"name": scaling_group
, "nb-scale-op": 0}
6177 admin_scale_index
= 0
6179 for admin_scale_index
, admin_scale_info
in enumerate(
6180 db_nsr
["_admin"]["scaling-group"]
6182 if admin_scale_info
["name"] == scaling_group
:
6183 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6185 else: # not found, set index one plus last element and add new entry with the name
6186 admin_scale_index
+= 1
6188 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6191 vca_scaling_info
= []
6192 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6193 if scaling_type
== "SCALE_OUT":
6194 if "aspect-delta-details" not in scaling_descriptor
:
6196 "Aspect delta details not fount in scaling descriptor {}".format(
6197 scaling_descriptor
["name"]
6200 # count if max-instance-count is reached
6201 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6203 scaling_info
["scaling_direction"] = "OUT"
6204 scaling_info
["vdu-create"] = {}
6205 scaling_info
["kdu-create"] = {}
6206 for delta
in deltas
:
6207 for vdu_delta
in delta
.get("vdu-delta", {}):
6208 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6209 # vdu_index also provides the number of instance of the targeted vdu
6210 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6211 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6215 additional_params
= (
6216 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6219 cloud_init_list
= []
6221 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6222 max_instance_count
= 10
6223 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6224 max_instance_count
= vdu_profile
.get(
6225 "max-number-of-instances", 10
6228 default_instance_num
= get_number_of_instances(
6231 instances_number
= vdu_delta
.get("number-of-instances", 1)
6232 nb_scale_op
+= instances_number
6234 new_instance_count
= nb_scale_op
+ default_instance_num
6235 # Control if new count is over max and vdu count is less than max.
6236 # Then assign new instance count
6237 if new_instance_count
> max_instance_count
> vdu_count
:
6238 instances_number
= new_instance_count
- max_instance_count
6240 instances_number
= instances_number
6242 if new_instance_count
> max_instance_count
:
6244 "reached the limit of {} (max-instance-count) "
6245 "scaling-out operations for the "
6246 "scaling-group-descriptor '{}'".format(
6247 nb_scale_op
, scaling_group
6250 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6252 # TODO Information of its own ip is not available because db_vnfr is not updated.
6253 additional_params
["OSM"] = get_osm_params(
6254 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6256 cloud_init_list
.append(
6257 self
._parse
_cloud
_init
(
6264 vca_scaling_info
.append(
6266 "osm_vdu_id": vdu_delta
["id"],
6267 "member-vnf-index": vnf_index
,
6269 "vdu_index": vdu_index
+ x
,
6272 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6273 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6274 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6275 kdu_name
= kdu_profile
["kdu-name"]
6276 resource_name
= kdu_profile
.get("resource-name", "")
6278 # Might have different kdus in the same delta
6279 # Should have list for each kdu
6280 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6281 scaling_info
["kdu-create"][kdu_name
] = []
6283 kdur
= get_kdur(db_vnfr
, kdu_name
)
6284 if kdur
.get("helm-chart"):
6285 k8s_cluster_type
= "helm-chart-v3"
6286 self
.logger
.debug("kdur: {}".format(kdur
))
6288 kdur
.get("helm-version")
6289 and kdur
.get("helm-version") == "v2"
6291 k8s_cluster_type
= "helm-chart"
6292 elif kdur
.get("juju-bundle"):
6293 k8s_cluster_type
= "juju-bundle"
6296 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6297 "juju-bundle. Maybe an old NBI version is running".format(
6298 db_vnfr
["member-vnf-index-ref"], kdu_name
6302 max_instance_count
= 10
6303 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6304 max_instance_count
= kdu_profile
.get(
6305 "max-number-of-instances", 10
6308 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6309 deployed_kdu
, _
= get_deployed_kdu(
6310 nsr_deployed
, kdu_name
, vnf_index
6312 if deployed_kdu
is None:
6314 "KDU '{}' for vnf '{}' not deployed".format(
6318 kdu_instance
= deployed_kdu
.get("kdu-instance")
6319 instance_num
= await self
.k8scluster_map
[
6325 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6326 kdu_model
=deployed_kdu
.get("kdu-model"),
6328 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6329 "number-of-instances", 1
6332 # Control if new count is over max and instance_num is less than max.
6333 # Then assign max instance number to kdu replica count
6334 if kdu_replica_count
> max_instance_count
> instance_num
:
6335 kdu_replica_count
= max_instance_count
6336 if kdu_replica_count
> max_instance_count
:
6338 "reached the limit of {} (max-instance-count) "
6339 "scaling-out operations for the "
6340 "scaling-group-descriptor '{}'".format(
6341 instance_num
, scaling_group
6345 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6346 vca_scaling_info
.append(
6348 "osm_kdu_id": kdu_name
,
6349 "member-vnf-index": vnf_index
,
6351 "kdu_index": instance_num
+ x
- 1,
6354 scaling_info
["kdu-create"][kdu_name
].append(
6356 "member-vnf-index": vnf_index
,
6358 "k8s-cluster-type": k8s_cluster_type
,
6359 "resource-name": resource_name
,
6360 "scale": kdu_replica_count
,
6363 elif scaling_type
== "SCALE_IN":
6364 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6366 scaling_info
["scaling_direction"] = "IN"
6367 scaling_info
["vdu-delete"] = {}
6368 scaling_info
["kdu-delete"] = {}
6370 for delta
in deltas
:
6371 for vdu_delta
in delta
.get("vdu-delta", {}):
6372 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6373 min_instance_count
= 0
6374 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6375 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6376 min_instance_count
= vdu_profile
["min-number-of-instances"]
6378 default_instance_num
= get_number_of_instances(
6379 db_vnfd
, vdu_delta
["id"]
6381 instance_num
= vdu_delta
.get("number-of-instances", 1)
6382 nb_scale_op
-= instance_num
6384 new_instance_count
= nb_scale_op
+ default_instance_num
6386 if new_instance_count
< min_instance_count
< vdu_count
:
6387 instances_number
= min_instance_count
- new_instance_count
6389 instances_number
= instance_num
6391 if new_instance_count
< min_instance_count
:
6393 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6394 "scaling-group-descriptor '{}'".format(
6395 nb_scale_op
, scaling_group
6398 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6399 vca_scaling_info
.append(
6401 "osm_vdu_id": vdu_delta
["id"],
6402 "member-vnf-index": vnf_index
,
6404 "vdu_index": vdu_index
- 1 - x
,
6407 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6408 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6409 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6410 kdu_name
= kdu_profile
["kdu-name"]
6411 resource_name
= kdu_profile
.get("resource-name", "")
6413 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6414 scaling_info
["kdu-delete"][kdu_name
] = []
6416 kdur
= get_kdur(db_vnfr
, kdu_name
)
6417 if kdur
.get("helm-chart"):
6418 k8s_cluster_type
= "helm-chart-v3"
6419 self
.logger
.debug("kdur: {}".format(kdur
))
6421 kdur
.get("helm-version")
6422 and kdur
.get("helm-version") == "v2"
6424 k8s_cluster_type
= "helm-chart"
6425 elif kdur
.get("juju-bundle"):
6426 k8s_cluster_type
= "juju-bundle"
6429 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6430 "juju-bundle. Maybe an old NBI version is running".format(
6431 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6435 min_instance_count
= 0
6436 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6437 min_instance_count
= kdu_profile
["min-number-of-instances"]
6439 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6440 deployed_kdu
, _
= get_deployed_kdu(
6441 nsr_deployed
, kdu_name
, vnf_index
6443 if deployed_kdu
is None:
6445 "KDU '{}' for vnf '{}' not deployed".format(
6449 kdu_instance
= deployed_kdu
.get("kdu-instance")
6450 instance_num
= await self
.k8scluster_map
[
6456 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6457 kdu_model
=deployed_kdu
.get("kdu-model"),
6459 kdu_replica_count
= instance_num
- kdu_delta
.get(
6460 "number-of-instances", 1
6463 if kdu_replica_count
< min_instance_count
< instance_num
:
6464 kdu_replica_count
= min_instance_count
6465 if kdu_replica_count
< min_instance_count
:
6467 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6468 "scaling-group-descriptor '{}'".format(
6469 instance_num
, scaling_group
6473 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6474 vca_scaling_info
.append(
6476 "osm_kdu_id": kdu_name
,
6477 "member-vnf-index": vnf_index
,
6479 "kdu_index": instance_num
- x
- 1,
6482 scaling_info
["kdu-delete"][kdu_name
].append(
6484 "member-vnf-index": vnf_index
,
6486 "k8s-cluster-type": k8s_cluster_type
,
6487 "resource-name": resource_name
,
6488 "scale": kdu_replica_count
,
6492 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6493 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6494 if scaling_info
["scaling_direction"] == "IN":
6495 for vdur
in reversed(db_vnfr
["vdur"]):
6496 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6497 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6498 scaling_info
["vdu"].append(
6500 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6501 "vdu_id": vdur
["vdu-id-ref"],
6505 for interface
in vdur
["interfaces"]:
6506 scaling_info
["vdu"][-1]["interface"].append(
6508 "name": interface
["name"],
6509 "ip_address": interface
["ip-address"],
6510 "mac_address": interface
.get("mac-address"),
6513 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6516 step
= "Executing pre-scale vnf-config-primitive"
6517 if scaling_descriptor
.get("scaling-config-action"):
6518 for scaling_config_action
in scaling_descriptor
[
6519 "scaling-config-action"
6522 scaling_config_action
.get("trigger") == "pre-scale-in"
6523 and scaling_type
== "SCALE_IN"
6525 scaling_config_action
.get("trigger") == "pre-scale-out"
6526 and scaling_type
== "SCALE_OUT"
6528 vnf_config_primitive
= scaling_config_action
[
6529 "vnf-config-primitive-name-ref"
6531 step
= db_nslcmop_update
[
6533 ] = "executing pre-scale scaling-config-action '{}'".format(
6534 vnf_config_primitive
6537 # look for primitive
6538 for config_primitive
in (
6539 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6540 ).get("config-primitive", ()):
6541 if config_primitive
["name"] == vnf_config_primitive
:
6545 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6546 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6547 "primitive".format(scaling_group
, vnf_config_primitive
)
6550 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6551 if db_vnfr
.get("additionalParamsForVnf"):
6552 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6554 scale_process
= "VCA"
6555 db_nsr_update
["config-status"] = "configuring pre-scaling"
6556 primitive_params
= self
._map
_primitive
_params
(
6557 config_primitive
, {}, vnfr_params
6560 # Pre-scale retry check: Check if this sub-operation has been executed before
6561 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6564 vnf_config_primitive
,
6568 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6569 # Skip sub-operation
6570 result
= "COMPLETED"
6571 result_detail
= "Done"
6574 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6575 vnf_config_primitive
, result
, result_detail
6579 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6580 # New sub-operation: Get index of this sub-operation
6582 len(db_nslcmop
.get("_admin", {}).get("operations"))
6587 + "vnf_config_primitive={} New sub-operation".format(
6588 vnf_config_primitive
6592 # retry: Get registered params for this existing sub-operation
6593 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6596 vnf_index
= op
.get("member_vnf_index")
6597 vnf_config_primitive
= op
.get("primitive")
6598 primitive_params
= op
.get("primitive_params")
6601 + "vnf_config_primitive={} Sub-operation retry".format(
6602 vnf_config_primitive
6605 # Execute the primitive, either with new (first-time) or registered (reintent) args
6606 ee_descriptor_id
= config_primitive
.get(
6607 "execution-environment-ref"
6609 primitive_name
= config_primitive
.get(
6610 "execution-environment-primitive", vnf_config_primitive
6612 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6613 nsr_deployed
["VCA"],
6614 member_vnf_index
=vnf_index
,
6616 vdu_count_index
=None,
6617 ee_descriptor_id
=ee_descriptor_id
,
6619 result
, result_detail
= await self
._ns
_execute
_primitive
(
6628 + "vnf_config_primitive={} Done with result {} {}".format(
6629 vnf_config_primitive
, result
, result_detail
6632 # Update operationState = COMPLETED | FAILED
6633 self
._update
_suboperation
_status
(
6634 db_nslcmop
, op_index
, result
, result_detail
6637 if result
== "FAILED":
6638 raise LcmException(result_detail
)
6639 db_nsr_update
["config-status"] = old_config_status
6640 scale_process
= None
6644 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6647 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6650 # SCALE-IN VCA - BEGIN
6651 if vca_scaling_info
:
6652 step
= db_nslcmop_update
[
6654 ] = "Deleting the execution environments"
6655 scale_process
= "VCA"
6656 for vca_info
in vca_scaling_info
:
6657 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6658 member_vnf_index
= str(vca_info
["member-vnf-index"])
6660 logging_text
+ "vdu info: {}".format(vca_info
)
6662 if vca_info
.get("osm_vdu_id"):
6663 vdu_id
= vca_info
["osm_vdu_id"]
6664 vdu_index
= int(vca_info
["vdu_index"])
6667 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6668 member_vnf_index
, vdu_id
, vdu_index
6670 stage
[2] = step
= "Scaling in VCA"
6671 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6672 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6673 config_update
= db_nsr
["configurationStatus"]
6674 for vca_index
, vca
in enumerate(vca_update
):
6676 (vca
or vca
.get("ee_id"))
6677 and vca
["member-vnf-index"] == member_vnf_index
6678 and vca
["vdu_count_index"] == vdu_index
6680 if vca
.get("vdu_id"):
6681 config_descriptor
= get_configuration(
6682 db_vnfd
, vca
.get("vdu_id")
6684 elif vca
.get("kdu_name"):
6685 config_descriptor
= get_configuration(
6686 db_vnfd
, vca
.get("kdu_name")
6689 config_descriptor
= get_configuration(
6690 db_vnfd
, db_vnfd
["id"]
6692 operation_params
= (
6693 db_nslcmop
.get("operationParams") or {}
6695 exec_terminate_primitives
= not operation_params
.get(
6696 "skip_terminate_primitives"
6697 ) and vca
.get("needed_terminate")
6698 task
= asyncio
.ensure_future(
6707 exec_primitives
=exec_terminate_primitives
,
6711 timeout
=self
.timeout
.charm_delete
,
6714 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6717 del vca_update
[vca_index
]
6718 del config_update
[vca_index
]
6719 # wait for pending tasks of terminate primitives
6723 + "Waiting for tasks {}".format(
6724 list(tasks_dict_info
.keys())
6727 error_list
= await self
._wait
_for
_tasks
(
6731 self
.timeout
.charm_delete
, self
.timeout
.ns_terminate
6736 tasks_dict_info
.clear()
6738 raise LcmException("; ".join(error_list
))
6740 db_vca_and_config_update
= {
6741 "_admin.deployed.VCA": vca_update
,
6742 "configurationStatus": config_update
,
6745 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6747 scale_process
= None
6748 # SCALE-IN VCA - END
6751 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6752 scale_process
= "RO"
6753 if self
.ro_config
.ng
:
6754 await self
._scale
_ng
_ro
(
6755 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6757 scaling_info
.pop("vdu-create", None)
6758 scaling_info
.pop("vdu-delete", None)
6760 scale_process
= None
6764 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6765 scale_process
= "KDU"
6766 await self
._scale
_kdu
(
6767 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6769 scaling_info
.pop("kdu-create", None)
6770 scaling_info
.pop("kdu-delete", None)
6772 scale_process
= None
6776 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6778 # SCALE-UP VCA - BEGIN
6779 if vca_scaling_info
:
6780 step
= db_nslcmop_update
[
6782 ] = "Creating new execution environments"
6783 scale_process
= "VCA"
6784 for vca_info
in vca_scaling_info
:
6785 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6786 member_vnf_index
= str(vca_info
["member-vnf-index"])
6788 logging_text
+ "vdu info: {}".format(vca_info
)
6790 vnfd_id
= db_vnfr
["vnfd-ref"]
6791 if vca_info
.get("osm_vdu_id"):
6792 vdu_index
= int(vca_info
["vdu_index"])
6793 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6794 if db_vnfr
.get("additionalParamsForVnf"):
6795 deploy_params
.update(
6797 db_vnfr
["additionalParamsForVnf"].copy()
6800 descriptor_config
= get_configuration(
6801 db_vnfd
, db_vnfd
["id"]
6803 if descriptor_config
:
6809 logging_text
=logging_text
6810 + "member_vnf_index={} ".format(member_vnf_index
),
6813 nslcmop_id
=nslcmop_id
,
6819 kdu_index
=kdu_index
,
6820 member_vnf_index
=member_vnf_index
,
6821 vdu_index
=vdu_index
,
6823 deploy_params
=deploy_params
,
6824 descriptor_config
=descriptor_config
,
6825 base_folder
=base_folder
,
6826 task_instantiation_info
=tasks_dict_info
,
6829 vdu_id
= vca_info
["osm_vdu_id"]
6830 vdur
= find_in_list(
6831 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6833 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6834 if vdur
.get("additionalParams"):
6835 deploy_params_vdu
= parse_yaml_strings(
6836 vdur
["additionalParams"]
6839 deploy_params_vdu
= deploy_params
6840 deploy_params_vdu
["OSM"] = get_osm_params(
6841 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6843 if descriptor_config
:
6849 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6850 member_vnf_index
, vdu_id
, vdu_index
6852 stage
[2] = step
= "Scaling out VCA"
6853 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6855 logging_text
=logging_text
6856 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6857 member_vnf_index
, vdu_id
, vdu_index
6861 nslcmop_id
=nslcmop_id
,
6867 member_vnf_index
=member_vnf_index
,
6868 vdu_index
=vdu_index
,
6869 kdu_index
=kdu_index
,
6871 deploy_params
=deploy_params_vdu
,
6872 descriptor_config
=descriptor_config
,
6873 base_folder
=base_folder
,
6874 task_instantiation_info
=tasks_dict_info
,
6877 # SCALE-UP VCA - END
6878 scale_process
= None
6881 # execute primitive service POST-SCALING
6882 step
= "Executing post-scale vnf-config-primitive"
6883 if scaling_descriptor
.get("scaling-config-action"):
6884 for scaling_config_action
in scaling_descriptor
[
6885 "scaling-config-action"
6888 scaling_config_action
.get("trigger") == "post-scale-in"
6889 and scaling_type
== "SCALE_IN"
6891 scaling_config_action
.get("trigger") == "post-scale-out"
6892 and scaling_type
== "SCALE_OUT"
6894 vnf_config_primitive
= scaling_config_action
[
6895 "vnf-config-primitive-name-ref"
6897 step
= db_nslcmop_update
[
6899 ] = "executing post-scale scaling-config-action '{}'".format(
6900 vnf_config_primitive
6903 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6904 if db_vnfr
.get("additionalParamsForVnf"):
6905 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6907 # look for primitive
6908 for config_primitive
in (
6909 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6910 ).get("config-primitive", ()):
6911 if config_primitive
["name"] == vnf_config_primitive
:
6915 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6916 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6917 "config-primitive".format(
6918 scaling_group
, vnf_config_primitive
6921 scale_process
= "VCA"
6922 db_nsr_update
["config-status"] = "configuring post-scaling"
6923 primitive_params
= self
._map
_primitive
_params
(
6924 config_primitive
, {}, vnfr_params
6927 # Post-scale retry check: Check if this sub-operation has been executed before
6928 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6931 vnf_config_primitive
,
6935 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6936 # Skip sub-operation
6937 result
= "COMPLETED"
6938 result_detail
= "Done"
6941 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6942 vnf_config_primitive
, result
, result_detail
6946 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6947 # New sub-operation: Get index of this sub-operation
6949 len(db_nslcmop
.get("_admin", {}).get("operations"))
6954 + "vnf_config_primitive={} New sub-operation".format(
6955 vnf_config_primitive
6959 # retry: Get registered params for this existing sub-operation
6960 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6963 vnf_index
= op
.get("member_vnf_index")
6964 vnf_config_primitive
= op
.get("primitive")
6965 primitive_params
= op
.get("primitive_params")
6968 + "vnf_config_primitive={} Sub-operation retry".format(
6969 vnf_config_primitive
6972 # Execute the primitive, either with new (first-time) or registered (reintent) args
6973 ee_descriptor_id
= config_primitive
.get(
6974 "execution-environment-ref"
6976 primitive_name
= config_primitive
.get(
6977 "execution-environment-primitive", vnf_config_primitive
6979 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6980 nsr_deployed
["VCA"],
6981 member_vnf_index
=vnf_index
,
6983 vdu_count_index
=None,
6984 ee_descriptor_id
=ee_descriptor_id
,
6986 result
, result_detail
= await self
._ns
_execute
_primitive
(
6995 + "vnf_config_primitive={} Done with result {} {}".format(
6996 vnf_config_primitive
, result
, result_detail
6999 # Update operationState = COMPLETED | FAILED
7000 self
._update
_suboperation
_status
(
7001 db_nslcmop
, op_index
, result
, result_detail
7004 if result
== "FAILED":
7005 raise LcmException(result_detail
)
7006 db_nsr_update
["config-status"] = old_config_status
7007 scale_process
= None
7012 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
7013 db_nsr_update
["operational-status"] = (
7015 if old_operational_status
== "failed"
7016 else old_operational_status
7018 db_nsr_update
["config-status"] = old_config_status
7021 ROclient
.ROClientException
,
7026 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7028 except asyncio
.CancelledError
:
7030 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7032 exc
= "Operation was cancelled"
7033 except Exception as e
:
7034 exc
= traceback
.format_exc()
7035 self
.logger
.critical(
7036 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7040 self
._write
_ns
_status
(
7043 current_operation
="IDLE",
7044 current_operation_id
=None,
7047 stage
[1] = "Waiting for instantiate pending tasks."
7048 self
.logger
.debug(logging_text
+ stage
[1])
7049 exc
= await self
._wait
_for
_tasks
(
7052 self
.timeout
.ns_deploy
,
7060 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7061 nslcmop_operation_state
= "FAILED"
7063 db_nsr_update
["operational-status"] = old_operational_status
7064 db_nsr_update
["config-status"] = old_config_status
7065 db_nsr_update
["detailed-status"] = ""
7067 if "VCA" in scale_process
:
7068 db_nsr_update
["config-status"] = "failed"
7069 if "RO" in scale_process
:
7070 db_nsr_update
["operational-status"] = "failed"
7073 ] = "FAILED scaling nslcmop={} {}: {}".format(
7074 nslcmop_id
, step
, exc
7077 error_description_nslcmop
= None
7078 nslcmop_operation_state
= "COMPLETED"
7079 db_nslcmop_update
["detailed-status"] = "Done"
7081 self
._write
_op
_status
(
7084 error_message
=error_description_nslcmop
,
7085 operation_state
=nslcmop_operation_state
,
7086 other_update
=db_nslcmop_update
,
7089 self
._write
_ns
_status
(
7092 current_operation
="IDLE",
7093 current_operation_id
=None,
7094 other_update
=db_nsr_update
,
7097 if nslcmop_operation_state
:
7101 "nslcmop_id": nslcmop_id
,
7102 "operationState": nslcmop_operation_state
,
7104 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7105 except Exception as e
:
7107 logging_text
+ "kafka_write notification Exception {}".format(e
)
7109 self
.logger
.debug(logging_text
+ "Exit")
7110 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7112 async def _scale_kdu(
7113 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7115 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7116 for kdu_name
in _scaling_info
:
7117 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7118 deployed_kdu
, index
= get_deployed_kdu(
7119 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7121 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7122 kdu_instance
= deployed_kdu
["kdu-instance"]
7123 kdu_model
= deployed_kdu
.get("kdu-model")
7124 scale
= int(kdu_scaling_info
["scale"])
7125 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7128 "collection": "nsrs",
7129 "filter": {"_id": nsr_id
},
7130 "path": "_admin.deployed.K8s.{}".format(index
),
7133 step
= "scaling application {}".format(
7134 kdu_scaling_info
["resource-name"]
7136 self
.logger
.debug(logging_text
+ step
)
7138 if kdu_scaling_info
["type"] == "delete":
7139 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7142 and kdu_config
.get("terminate-config-primitive")
7143 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7145 terminate_config_primitive_list
= kdu_config
.get(
7146 "terminate-config-primitive"
7148 terminate_config_primitive_list
.sort(
7149 key
=lambda val
: int(val
["seq"])
7153 terminate_config_primitive
7154 ) in terminate_config_primitive_list
:
7155 primitive_params_
= self
._map
_primitive
_params
(
7156 terminate_config_primitive
, {}, {}
7158 step
= "execute terminate config primitive"
7159 self
.logger
.debug(logging_text
+ step
)
7160 await asyncio
.wait_for(
7161 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7162 cluster_uuid
=cluster_uuid
,
7163 kdu_instance
=kdu_instance
,
7164 primitive_name
=terminate_config_primitive
["name"],
7165 params
=primitive_params_
,
7167 total_timeout
=self
.timeout
.primitive
,
7170 timeout
=self
.timeout
.primitive
7171 * self
.timeout
.primitive_outer_factor
,
7174 await asyncio
.wait_for(
7175 self
.k8scluster_map
[k8s_cluster_type
].scale(
7176 kdu_instance
=kdu_instance
,
7178 resource_name
=kdu_scaling_info
["resource-name"],
7179 total_timeout
=self
.timeout
.scale_on_error
,
7181 cluster_uuid
=cluster_uuid
,
7182 kdu_model
=kdu_model
,
7186 timeout
=self
.timeout
.scale_on_error
7187 * self
.timeout
.scale_on_error_outer_factor
,
7190 if kdu_scaling_info
["type"] == "create":
7191 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7194 and kdu_config
.get("initial-config-primitive")
7195 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7197 initial_config_primitive_list
= kdu_config
.get(
7198 "initial-config-primitive"
7200 initial_config_primitive_list
.sort(
7201 key
=lambda val
: int(val
["seq"])
7204 for initial_config_primitive
in initial_config_primitive_list
:
7205 primitive_params_
= self
._map
_primitive
_params
(
7206 initial_config_primitive
, {}, {}
7208 step
= "execute initial config primitive"
7209 self
.logger
.debug(logging_text
+ step
)
7210 await asyncio
.wait_for(
7211 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7212 cluster_uuid
=cluster_uuid
,
7213 kdu_instance
=kdu_instance
,
7214 primitive_name
=initial_config_primitive
["name"],
7215 params
=primitive_params_
,
7222 async def _scale_ng_ro(
7223 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7225 nsr_id
= db_nslcmop
["nsInstanceId"]
7226 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7229 # read from db: vnfd's for every vnf
7232 # for each vnf in ns, read vnfd
7233 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7234 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7235 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7236 # if we haven't this vnfd, read it from db
7237 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7239 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7240 db_vnfds
.append(vnfd
)
7241 n2vc_key
= self
.n2vc
.get_public_key()
7242 n2vc_key_list
= [n2vc_key
]
7245 vdu_scaling_info
.get("vdu-create"),
7246 vdu_scaling_info
.get("vdu-delete"),
7249 # db_vnfr has been updated, update db_vnfrs to use it
7250 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7251 await self
._instantiate
_ng
_ro
(
7261 start_deploy
=time(),
7262 timeout_ns_deploy
=self
.timeout
.ns_deploy
,
7264 if vdu_scaling_info
.get("vdu-delete"):
7266 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7269 async def extract_prometheus_scrape_jobs(
7273 ee_config_descriptor
: dict,
7278 vnf_member_index
: str = "",
7280 vdu_index
: int = None,
7282 kdu_index
: int = None,
7284 """Method to extract prometheus scrape jobs from EE's Prometheus template job file
7285 This method will wait until the corresponding VDU or KDU is fully instantiated
7288 ee_id (str): Execution Environment ID
7289 artifact_path (str): Path where the EE's content is (including the Prometheus template file)
7290 ee_config_descriptor (dict): Execution Environment's configuration descriptor
7291 vnfr_id (str): VNFR ID where this EE applies
7292 nsr_id (str): NSR ID where this EE applies
7293 target_ip (str): VDU/KDU instance IP address
7294 element_type (str): NS or VNF or VDU or KDU
7295 vnf_member_index (str, optional): VNF index where this EE applies. Defaults to "".
7296 vdu_id (str, optional): VDU ID where this EE applies. Defaults to "".
7297 vdu_index (int, optional): VDU index where this EE applies. Defaults to None.
7298 kdu_name (str, optional): KDU name where this EE applies. Defaults to "".
7299 kdu_index (int, optional): KDU index where this EE applies. Defaults to None.
7302 LcmException: When the VDU or KDU instance was not found in an hour
7305 _type_: Prometheus jobs
7307 # default the vdur and kdur names to an empty string, to avoid any later
7308 # problem with Prometheus when the element type is not VDU or KDU
7312 # look if exist a file called 'prometheus*.j2' and
7313 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7317 for f
in artifact_content
7318 if f
.startswith("prometheus") and f
.endswith(".j2")
7324 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7327 # obtain the VDUR or KDUR, if the element type is VDU or KDU
7328 if element_type
in ("VDU", "KDU"):
7329 for _
in range(360):
7330 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
7331 if vdu_id
and vdu_index
is not None:
7335 for x
in get_iterable(db_vnfr
, "vdur")
7337 x
.get("vdu-id-ref") == vdu_id
7338 and x
.get("count-index") == vdu_index
7343 if vdur
.get("name"):
7344 vdur_name
= vdur
.get("name")
7346 if kdu_name
and kdu_index
is not None:
7350 for x
in get_iterable(db_vnfr
, "kdur")
7352 x
.get("kdu-name") == kdu_name
7353 and x
.get("count-index") == kdu_index
7358 if kdur
.get("name"):
7359 kdur_name
= kdur
.get("name")
7362 await asyncio
.sleep(10, loop
=self
.loop
)
7364 if vdu_id
and vdu_index
is not None:
7366 f
"Timeout waiting VDU with name={vdu_id} and index={vdu_index} to be intantiated"
7368 if kdu_name
and kdu_index
is not None:
7370 f
"Timeout waiting KDU with name={kdu_name} and index={kdu_index} to be intantiated"
7374 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7375 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7377 vnfr_id
= vnfr_id
.replace("-", "")
7379 "JOB_NAME": vnfr_id
,
7380 "TARGET_IP": target_ip
,
7381 "EXPORTER_POD_IP": host_name
,
7382 "EXPORTER_POD_PORT": host_port
,
7384 "VNF_MEMBER_INDEX": vnf_member_index
,
7385 "VDUR_NAME": vdur_name
,
7386 "KDUR_NAME": kdur_name
,
7387 "ELEMENT_TYPE": element_type
,
7389 job_list
= parse_job(job_data
, variables
)
7390 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7391 for job
in job_list
:
7393 not isinstance(job
.get("job_name"), str)
7394 or vnfr_id
not in job
["job_name"]
7396 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
7397 job
["nsr_id"] = nsr_id
7398 job
["vnfr_id"] = vnfr_id
7401 async def rebuild_start_stop(
7402 self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
7404 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7405 self
.logger
.info(logging_text
+ "Enter")
7406 stage
= ["Preparing the environment", ""]
7407 # database nsrs record
7411 # in case of error, indicates what part of scale was failed to put nsr at error status
7412 start_deploy
= time()
7414 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7415 vim_account_id
= db_vnfr
.get("vim-account-id")
7416 vim_info_key
= "vim:" + vim_account_id
7417 vdu_id
= additional_param
["vdu_id"]
7418 vdurs
= [item
for item
in db_vnfr
["vdur"] if item
["vdu-id-ref"] == vdu_id
]
7419 vdur
= find_in_list(
7420 vdurs
, lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7423 vdu_vim_name
= vdur
["name"]
7424 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7425 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7427 raise LcmException("Target vdu is not found")
7428 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7429 # wait for any previous tasks in process
7430 stage
[1] = "Waiting for previous operations to terminate"
7431 self
.logger
.info(stage
[1])
7432 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7434 stage
[1] = "Reading from database."
7435 self
.logger
.info(stage
[1])
7436 self
._write
_ns
_status
(
7439 current_operation
=operation_type
.upper(),
7440 current_operation_id
=nslcmop_id
,
7442 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7445 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7446 db_nsr_update
["operational-status"] = operation_type
7447 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7451 "vim_vm_id": vim_vm_id
,
7453 "vdu_index": additional_param
["count-index"],
7454 "vdu_id": vdur
["id"],
7455 "target_vim": target_vim
,
7456 "vim_account_id": vim_account_id
,
7459 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7460 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7461 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7462 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7463 self
.logger
.info("response from RO: {}".format(result_dict
))
7464 action_id
= result_dict
["action_id"]
7465 await self
._wait
_ng
_ro
(
7470 self
.timeout
.operate
,
7472 "start_stop_rebuild",
7474 return "COMPLETED", "Done"
7475 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7476 self
.logger
.error("Exit Exception {}".format(e
))
7478 except asyncio
.CancelledError
:
7479 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7480 exc
= "Operation was cancelled"
7481 except Exception as e
:
7482 exc
= traceback
.format_exc()
7483 self
.logger
.critical(
7484 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7486 return "FAILED", "Error in operate VNF {}".format(exc
)
7488 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7490 Get VCA Cloud and VCA Cloud Credentials for the VIM account
7492 :param: vim_account_id: VIM Account ID
7494 :return: (cloud_name, cloud_credential)
7496 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7497 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
7499 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7501 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
7503 :param: vim_account_id: VIM Account ID
7505 :return: (cloud_name, cloud_credential)
7507 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7508 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")
7510 async def migrate(self
, nsr_id
, nslcmop_id
):
7512 Migrate VNFs and VDUs instances in a NS
7514 :param: nsr_id: NS Instance ID
7515 :param: nslcmop_id: nslcmop ID of migrate
7518 # Try to lock HA task here
7519 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7520 if not task_is_locked_by_me
:
7522 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7523 self
.logger
.debug(logging_text
+ "Enter")
7524 # get all needed from database
7526 db_nslcmop_update
= {}
7527 nslcmop_operation_state
= None
7531 # in case of error, indicates what part of scale was failed to put nsr at error status
7532 start_deploy
= time()
7535 # wait for any previous tasks in process
7536 step
= "Waiting for previous operations to terminate"
7537 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7539 self
._write
_ns
_status
(
7542 current_operation
="MIGRATING",
7543 current_operation_id
=nslcmop_id
,
7545 step
= "Getting nslcmop from database"
7547 step
+ " after having waited for previous tasks to be completed"
7549 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7550 migrate_params
= db_nslcmop
.get("operationParams")
7553 target
.update(migrate_params
)
7554 desc
= await self
.RO
.migrate(nsr_id
, target
)
7555 self
.logger
.debug("RO return > {}".format(desc
))
7556 action_id
= desc
["action_id"]
7557 await self
._wait
_ng
_ro
(
7562 self
.timeout
.migrate
,
7563 operation
="migrate",
7565 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7566 self
.logger
.error("Exit Exception {}".format(e
))
7568 except asyncio
.CancelledError
:
7569 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7570 exc
= "Operation was cancelled"
7571 except Exception as e
:
7572 exc
= traceback
.format_exc()
7573 self
.logger
.critical(
7574 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7577 self
._write
_ns
_status
(
7580 current_operation
="IDLE",
7581 current_operation_id
=None,
7584 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7585 nslcmop_operation_state
= "FAILED"
7587 nslcmop_operation_state
= "COMPLETED"
7588 db_nslcmop_update
["detailed-status"] = "Done"
7589 db_nsr_update
["detailed-status"] = "Done"
7591 self
._write
_op
_status
(
7595 operation_state
=nslcmop_operation_state
,
7596 other_update
=db_nslcmop_update
,
7598 if nslcmop_operation_state
:
7602 "nslcmop_id": nslcmop_id
,
7603 "operationState": nslcmop_operation_state
,
7605 await self
.msg
.aiowrite("ns", "migrated", msg
, loop
=self
.loop
)
7606 except Exception as e
:
7608 logging_text
+ "kafka_write notification Exception {}".format(e
)
7610 self
.logger
.debug(logging_text
+ "Exit")
7611 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
7613 async def heal(self
, nsr_id
, nslcmop_id
):
7617 :param nsr_id: ns instance to heal
7618 :param nslcmop_id: operation to run
7622 # Try to lock HA task here
7623 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7624 if not task_is_locked_by_me
:
7627 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7628 stage
= ["", "", ""]
7629 tasks_dict_info
= {}
7630 # ^ stage, step, VIM progress
7631 self
.logger
.debug(logging_text
+ "Enter")
7632 # get all needed from database
7634 db_nslcmop_update
= {}
7636 db_vnfrs
= {} # vnf's info indexed by _id
7638 old_operational_status
= ""
7639 old_config_status
= ""
7642 # wait for any previous tasks in process
7643 step
= "Waiting for previous operations to terminate"
7644 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7645 self
._write
_ns
_status
(
7648 current_operation
="HEALING",
7649 current_operation_id
=nslcmop_id
,
7652 step
= "Getting nslcmop from database"
7654 step
+ " after having waited for previous tasks to be completed"
7656 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7658 step
= "Getting nsr from database"
7659 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7660 old_operational_status
= db_nsr
["operational-status"]
7661 old_config_status
= db_nsr
["config-status"]
7664 "_admin.deployed.RO.operational-status": "healing",
7666 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7668 step
= "Sending heal order to VIM"
7670 logging_text
=logging_text
,
7672 db_nslcmop
=db_nslcmop
,
7677 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7678 self
.logger
.debug(logging_text
+ stage
[1])
7679 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7680 self
.fs
.sync(db_nsr
["nsd-id"])
7682 # read from db: vnfr's of this ns
7683 step
= "Getting vnfrs from db"
7684 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7685 for vnfr
in db_vnfrs_list
:
7686 db_vnfrs
[vnfr
["_id"]] = vnfr
7687 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7689 # Check for each target VNF
7690 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7691 for target_vnf
in target_list
:
7692 # Find this VNF in the list from DB
7693 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7695 db_vnfr
= db_vnfrs
[vnfr_id
]
7696 vnfd_id
= db_vnfr
.get("vnfd-id")
7697 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7698 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7699 base_folder
= vnfd
["_admin"]["storage"]
7704 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7705 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7707 # Check each target VDU and deploy N2VC
7708 target_vdu_list
= target_vnf
.get("additionalParams", {}).get(
7711 if not target_vdu_list
:
7712 # Codigo nuevo para crear diccionario
7713 target_vdu_list
= []
7714 for existing_vdu
in db_vnfr
.get("vdur"):
7715 vdu_name
= existing_vdu
.get("vdu-name", None)
7716 vdu_index
= existing_vdu
.get("count-index", 0)
7717 vdu_run_day1
= target_vnf
.get("additionalParams", {}).get(
7720 vdu_to_be_healed
= {
7722 "count-index": vdu_index
,
7723 "run-day1": vdu_run_day1
,
7725 target_vdu_list
.append(vdu_to_be_healed
)
7726 for target_vdu
in target_vdu_list
:
7727 deploy_params_vdu
= target_vdu
7728 # Set run-day1 vnf level value if not vdu level value exists
7729 if not deploy_params_vdu
.get("run-day1") and target_vnf
[
7732 deploy_params_vdu
["run-day1"] = target_vnf
[
7735 vdu_name
= target_vdu
.get("vdu-id", None)
7736 # TODO: Get vdu_id from vdud.
7738 # For multi instance VDU count-index is mandatory
7739 # For single session VDU count-indes is 0
7740 vdu_index
= target_vdu
.get("count-index", 0)
7742 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7743 stage
[1] = "Deploying Execution Environments."
7744 self
.logger
.debug(logging_text
+ stage
[1])
7746 # VNF Level charm. Normal case when proxy charms.
7747 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7748 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7749 if descriptor_config
:
7750 # Continue if healed machine is management machine
7751 vnf_ip_address
= db_vnfr
.get("ip-address")
7752 target_instance
= None
7753 for instance
in db_vnfr
.get("vdur", None):
7755 instance
["vdu-name"] == vdu_name
7756 and instance
["count-index"] == vdu_index
7758 target_instance
= instance
7760 if vnf_ip_address
== target_instance
.get("ip-address"):
7762 logging_text
=logging_text
7763 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7764 member_vnf_index
, vdu_name
, vdu_index
7768 nslcmop_id
=nslcmop_id
,
7774 member_vnf_index
=member_vnf_index
,
7777 deploy_params
=deploy_params_vdu
,
7778 descriptor_config
=descriptor_config
,
7779 base_folder
=base_folder
,
7780 task_instantiation_info
=tasks_dict_info
,
7784 # VDU Level charm. Normal case with native charms.
7785 descriptor_config
= get_configuration(vnfd
, vdu_name
)
7786 if descriptor_config
:
7788 logging_text
=logging_text
7789 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7790 member_vnf_index
, vdu_name
, vdu_index
7794 nslcmop_id
=nslcmop_id
,
7800 member_vnf_index
=member_vnf_index
,
7801 vdu_index
=vdu_index
,
7803 deploy_params
=deploy_params_vdu
,
7804 descriptor_config
=descriptor_config
,
7805 base_folder
=base_folder
,
7806 task_instantiation_info
=tasks_dict_info
,
7811 ROclient
.ROClientException
,
7816 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7818 except asyncio
.CancelledError
:
7820 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7822 exc
= "Operation was cancelled"
7823 except Exception as e
:
7824 exc
= traceback
.format_exc()
7825 self
.logger
.critical(
7826 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7831 stage
[1] = "Waiting for healing pending tasks."
7832 self
.logger
.debug(logging_text
+ stage
[1])
7833 exc
= await self
._wait
_for
_tasks
(
7836 self
.timeout
.ns_deploy
,
7844 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7845 nslcmop_operation_state
= "FAILED"
7847 db_nsr_update
["operational-status"] = old_operational_status
7848 db_nsr_update
["config-status"] = old_config_status
7851 ] = "FAILED healing nslcmop={} {}: {}".format(nslcmop_id
, step
, exc
)
7852 for task
, task_name
in tasks_dict_info
.items():
7853 if not task
.done() or task
.cancelled() or task
.exception():
7854 if task_name
.startswith(self
.task_name_deploy_vca
):
7855 # A N2VC task is pending
7856 db_nsr_update
["config-status"] = "failed"
7858 # RO task is pending
7859 db_nsr_update
["operational-status"] = "failed"
7861 error_description_nslcmop
= None
7862 nslcmop_operation_state
= "COMPLETED"
7863 db_nslcmop_update
["detailed-status"] = "Done"
7864 db_nsr_update
["detailed-status"] = "Done"
7865 db_nsr_update
["operational-status"] = "running"
7866 db_nsr_update
["config-status"] = "configured"
7868 self
._write
_op
_status
(
7871 error_message
=error_description_nslcmop
,
7872 operation_state
=nslcmop_operation_state
,
7873 other_update
=db_nslcmop_update
,
7876 self
._write
_ns
_status
(
7879 current_operation
="IDLE",
7880 current_operation_id
=None,
7881 other_update
=db_nsr_update
,
7884 if nslcmop_operation_state
:
7888 "nslcmop_id": nslcmop_id
,
7889 "operationState": nslcmop_operation_state
,
7891 await self
.msg
.aiowrite("ns", "healed", msg
, loop
=self
.loop
)
7892 except Exception as e
:
7894 logging_text
+ "kafka_write notification Exception {}".format(e
)
7896 self
.logger
.debug(logging_text
+ "Exit")
7897 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
7908 :param logging_text: preffix text to use at logging
7909 :param nsr_id: nsr identity
7910 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
7911 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
7912 :return: None or exception
7915 def get_vim_account(vim_account_id
):
7917 if vim_account_id
in db_vims
:
7918 return db_vims
[vim_account_id
]
7919 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
7920 db_vims
[vim_account_id
] = db_vim
7925 ns_params
= db_nslcmop
.get("operationParams")
7926 if ns_params
and ns_params
.get("timeout_ns_heal"):
7927 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
7929 timeout_ns_heal
= self
.timeout
.ns_heal
7933 nslcmop_id
= db_nslcmop
["_id"]
7935 "action_id": nslcmop_id
,
7937 self
.logger
.warning(
7938 "db_nslcmop={} and timeout_ns_heal={}".format(
7939 db_nslcmop
, timeout_ns_heal
7942 target
.update(db_nslcmop
.get("operationParams", {}))
7944 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
7945 desc
= await self
.RO
.recreate(nsr_id
, target
)
7946 self
.logger
.debug("RO return > {}".format(desc
))
7947 action_id
= desc
["action_id"]
7948 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
7949 await self
._wait
_ng
_ro
(
7956 operation
="healing",
7961 "_admin.deployed.RO.operational-status": "running",
7962 "detailed-status": " ".join(stage
),
7964 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7965 self
._write
_op
_status
(nslcmop_id
, stage
)
7967 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
7970 except Exception as e
:
7971 stage
[2] = "ERROR healing at VIM"
7972 # self.set_vnfr_at_error(db_vnfrs, str(e))
7974 "Error healing at VIM {}".format(e
),
7975 exc_info
=not isinstance(
7978 ROclient
.ROClientException
,
8004 task_instantiation_info
,
8007 # launch instantiate_N2VC in a asyncio task and register task object
8008 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
8009 # if not found, create one entry and update database
8010 # fill db_nsr._admin.deployed.VCA.<index>
8013 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
8017 get_charm_name
= False
8018 if "execution-environment-list" in descriptor_config
:
8019 ee_list
= descriptor_config
.get("execution-environment-list", [])
8020 elif "juju" in descriptor_config
:
8021 ee_list
= [descriptor_config
] # ns charms
8022 if "execution-environment-list" not in descriptor_config
:
8023 # charm name is only required for ns charms
8024 get_charm_name
= True
8025 else: # other types as script are not supported
8028 for ee_item
in ee_list
:
8031 + "_deploy_n2vc ee_item juju={}, helm={}".format(
8032 ee_item
.get("juju"), ee_item
.get("helm-chart")
8035 ee_descriptor_id
= ee_item
.get("id")
8036 if ee_item
.get("juju"):
8037 vca_name
= ee_item
["juju"].get("charm")
8039 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
8042 if ee_item
["juju"].get("charm") is not None
8045 if ee_item
["juju"].get("cloud") == "k8s":
8046 vca_type
= "k8s_proxy_charm"
8047 elif ee_item
["juju"].get("proxy") is False:
8048 vca_type
= "native_charm"
8049 elif ee_item
.get("helm-chart"):
8050 vca_name
= ee_item
["helm-chart"]
8051 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
8054 vca_type
= "helm-v3"
8057 logging_text
+ "skipping non juju neither charm configuration"
8062 for vca_index
, vca_deployed
in enumerate(
8063 db_nsr
["_admin"]["deployed"]["VCA"]
8065 if not vca_deployed
:
8068 vca_deployed
.get("member-vnf-index") == member_vnf_index
8069 and vca_deployed
.get("vdu_id") == vdu_id
8070 and vca_deployed
.get("kdu_name") == kdu_name
8071 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
8072 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
8076 # not found, create one.
8078 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
8081 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
8083 target
+= "/kdu/{}".format(kdu_name
)
8085 "target_element": target
,
8086 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
8087 "member-vnf-index": member_vnf_index
,
8089 "kdu_name": kdu_name
,
8090 "vdu_count_index": vdu_index
,
8091 "operational-status": "init", # TODO revise
8092 "detailed-status": "", # TODO revise
8093 "step": "initial-deploy", # TODO revise
8095 "vdu_name": vdu_name
,
8097 "ee_descriptor_id": ee_descriptor_id
,
8098 "charm_name": charm_name
,
8102 # create VCA and configurationStatus in db
8104 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
8105 "configurationStatus.{}".format(vca_index
): dict(),
8107 self
.update_db_2("nsrs", nsr_id
, db_dict
)
8109 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
8111 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
8112 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
8113 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
8116 task_n2vc
= asyncio
.ensure_future(
8118 logging_text
=logging_text
,
8119 vca_index
=vca_index
,
8125 vdu_index
=vdu_index
,
8126 deploy_params
=deploy_params
,
8127 config_descriptor
=descriptor_config
,
8128 base_folder
=base_folder
,
8129 nslcmop_id
=nslcmop_id
,
8133 ee_config_descriptor
=ee_item
,
8136 self
.lcm_tasks
.register(
8140 "instantiate_N2VC-{}".format(vca_index
),
8143 task_instantiation_info
[
8145 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
8146 member_vnf_index
or "", vdu_id
or ""
8149 async def heal_N2VC(
8166 ee_config_descriptor
,
8168 nsr_id
= db_nsr
["_id"]
8169 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
8170 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
8171 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
8172 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
8174 "collection": "nsrs",
8175 "filter": {"_id": nsr_id
},
8176 "path": db_update_entry
,
8182 element_under_configuration
= nsr_id
8186 vnfr_id
= db_vnfr
["_id"]
8187 osm_config
["osm"]["vnf_id"] = vnfr_id
8189 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8191 if vca_type
== "native_charm":
8194 index_number
= vdu_index
or 0
8197 element_type
= "VNF"
8198 element_under_configuration
= vnfr_id
8199 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8201 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8202 element_type
= "VDU"
8203 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8204 osm_config
["osm"]["vdu_id"] = vdu_id
8206 namespace
+= ".{}".format(kdu_name
)
8207 element_type
= "KDU"
8208 element_under_configuration
= kdu_name
8209 osm_config
["osm"]["kdu_name"] = kdu_name
8212 if base_folder
["pkg-dir"]:
8213 artifact_path
= "{}/{}/{}/{}".format(
8214 base_folder
["folder"],
8215 base_folder
["pkg-dir"],
8218 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8223 artifact_path
= "{}/Scripts/{}/{}/".format(
8224 base_folder
["folder"],
8227 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8232 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8234 # get initial_config_primitive_list that applies to this element
8235 initial_config_primitive_list
= config_descriptor
.get(
8236 "initial-config-primitive"
8240 "Initial config primitive list > {}".format(
8241 initial_config_primitive_list
8245 # add config if not present for NS charm
8246 ee_descriptor_id
= ee_config_descriptor
.get("id")
8247 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8248 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8249 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8253 "Initial config primitive list #2 > {}".format(
8254 initial_config_primitive_list
8257 # n2vc_redesign STEP 3.1
8258 # find old ee_id if exists
8259 ee_id
= vca_deployed
.get("ee_id")
8261 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8262 # create or register execution environment in VCA. Only for native charms when healing
8263 if vca_type
== "native_charm":
8264 step
= "Waiting to VM being up and getting IP address"
8265 self
.logger
.debug(logging_text
+ step
)
8266 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8275 credentials
= {"hostname": rw_mgmt_ip
}
8277 username
= deep_get(
8278 config_descriptor
, ("config-access", "ssh-access", "default-user")
8280 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8281 # merged. Meanwhile let's get username from initial-config-primitive
8282 if not username
and initial_config_primitive_list
:
8283 for config_primitive
in initial_config_primitive_list
:
8284 for param
in config_primitive
.get("parameter", ()):
8285 if param
["name"] == "ssh-username":
8286 username
= param
["value"]
8290 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8291 "'config-access.ssh-access.default-user'"
8293 credentials
["username"] = username
8295 # n2vc_redesign STEP 3.2
8296 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8297 self
._write
_configuration
_status
(
8299 vca_index
=vca_index
,
8300 status
="REGISTERING",
8301 element_under_configuration
=element_under_configuration
,
8302 element_type
=element_type
,
8305 step
= "register execution environment {}".format(credentials
)
8306 self
.logger
.debug(logging_text
+ step
)
8307 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8308 credentials
=credentials
,
8309 namespace
=namespace
,
8314 # update ee_id en db
8316 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8318 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8320 # for compatibility with MON/POL modules, the need model and application name at database
8321 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8322 # Not sure if this need to be done when healing
8324 ee_id_parts = ee_id.split(".")
8325 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8326 if len(ee_id_parts) >= 2:
8327 model_name = ee_id_parts[0]
8328 application_name = ee_id_parts[1]
8329 db_nsr_update[db_update_entry + "model"] = model_name
8330 db_nsr_update[db_update_entry + "application"] = application_name
8333 # n2vc_redesign STEP 3.3
8334 # Install configuration software. Only for native charms.
8335 step
= "Install configuration Software"
8337 self
._write
_configuration
_status
(
8339 vca_index
=vca_index
,
8340 status
="INSTALLING SW",
8341 element_under_configuration
=element_under_configuration
,
8342 element_type
=element_type
,
8343 # other_update=db_nsr_update,
8347 # TODO check if already done
8348 self
.logger
.debug(logging_text
+ step
)
8350 if vca_type
== "native_charm":
8351 config_primitive
= next(
8352 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8355 if config_primitive
:
8356 config
= self
._map
_primitive
_params
(
8357 config_primitive
, {}, deploy_params
8359 await self
.vca_map
[vca_type
].install_configuration_sw(
8361 artifact_path
=artifact_path
,
8369 # write in db flag of configuration_sw already installed
8371 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8374 # Not sure if this need to be done when healing
8376 # add relations for this VCA (wait for other peers related with this VCA)
8377 await self._add_vca_relations(
8378 logging_text=logging_text,
8381 vca_index=vca_index,
8385 # if SSH access is required, then get execution environment SSH public
8386 # if native charm we have waited already to VM be UP
8387 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8390 # self.logger.debug("get ssh key block")
8392 config_descriptor
, ("config-access", "ssh-access", "required")
8394 # self.logger.debug("ssh key needed")
8395 # Needed to inject a ssh key
8398 ("config-access", "ssh-access", "default-user"),
8400 step
= "Install configuration Software, getting public ssh key"
8401 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8402 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8405 step
= "Insert public key into VM user={} ssh_key={}".format(
8409 # self.logger.debug("no need to get ssh key")
8410 step
= "Waiting to VM being up and getting IP address"
8411 self
.logger
.debug(logging_text
+ step
)
8413 # n2vc_redesign STEP 5.1
8414 # wait for RO (ip-address) Insert pub_key into VM
8415 # IMPORTANT: We need do wait for RO to complete healing operation.
8416 await self
._wait
_heal
_ro
(nsr_id
, self
.timeout
.ns_heal
)
8419 rw_mgmt_ip
= await self
.wait_kdu_up(
8420 logging_text
, nsr_id
, vnfr_id
, kdu_name
8423 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8433 rw_mgmt_ip
= None # This is for a NS configuration
8435 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8437 # store rw_mgmt_ip in deploy params for later replacement
8438 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8441 # get run-day1 operation parameter
8442 runDay1
= deploy_params
.get("run-day1", False)
8444 "Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
, vdu_id
, runDay1
)
8447 # n2vc_redesign STEP 6 Execute initial config primitive
8448 step
= "execute initial config primitive"
8450 # wait for dependent primitives execution (NS -> VNF -> VDU)
8451 if initial_config_primitive_list
:
8452 await self
._wait
_dependent
_n
2vc
(
8453 nsr_id
, vca_deployed_list
, vca_index
8456 # stage, in function of element type: vdu, kdu, vnf or ns
8457 my_vca
= vca_deployed_list
[vca_index
]
8458 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8460 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8461 elif my_vca
.get("member-vnf-index"):
8463 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8466 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8468 self
._write
_configuration
_status
(
8469 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8472 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8474 check_if_terminated_needed
= True
8475 for initial_config_primitive
in initial_config_primitive_list
:
8476 # adding information on the vca_deployed if it is a NS execution environment
8477 if not vca_deployed
["member-vnf-index"]:
8478 deploy_params
["ns_config_info"] = json
.dumps(
8479 self
._get
_ns
_config
_info
(nsr_id
)
8481 # TODO check if already done
8482 primitive_params_
= self
._map
_primitive
_params
(
8483 initial_config_primitive
, {}, deploy_params
8486 step
= "execute primitive '{}' params '{}'".format(
8487 initial_config_primitive
["name"], primitive_params_
8489 self
.logger
.debug(logging_text
+ step
)
8490 await self
.vca_map
[vca_type
].exec_primitive(
8492 primitive_name
=initial_config_primitive
["name"],
8493 params_dict
=primitive_params_
,
8498 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8499 if check_if_terminated_needed
:
8500 if config_descriptor
.get("terminate-config-primitive"):
8504 {db_update_entry
+ "needed_terminate": True},
8506 check_if_terminated_needed
= False
8508 # TODO register in database that primitive is done
8510 # STEP 7 Configure metrics
8511 # Not sure if this need to be done when healing
8513 if vca_type == "helm" or vca_type == "helm-v3":
8514 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8516 artifact_path=artifact_path,
8517 ee_config_descriptor=ee_config_descriptor,
8520 target_ip=rw_mgmt_ip,
8526 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8529 for job in prometheus_jobs:
8532 {"job_name": job["job_name"]},
8535 fail_on_empty=False,
8539 step
= "instantiated at VCA"
8540 self
.logger
.debug(logging_text
+ step
)
8542 self
._write
_configuration
_status
(
8543 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8546 except Exception as e
: # TODO not use Exception but N2VC exception
8547 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8549 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8552 "Exception while {} : {}".format(step
, e
), exc_info
=True
8554 self
._write
_configuration
_status
(
8555 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8557 raise LcmException("{} {}".format(step
, e
)) from e
8559 async def _wait_heal_ro(
8565 while time() <= start_time
+ timeout
:
8566 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
8567 operational_status_ro
= db_nsr
["_admin"]["deployed"]["RO"][
8568 "operational-status"
8570 self
.logger
.debug("Wait Heal RO > {}".format(operational_status_ro
))
8571 if operational_status_ro
!= "healing":
8573 await asyncio
.sleep(15, loop
=self
.loop
)
8574 else: # timeout_ns_deploy
8575 raise NgRoException("Timeout waiting ns to deploy")
8577 async def vertical_scale(self
, nsr_id
, nslcmop_id
):
8579 Vertical Scale the VDUs in a NS
8581 :param: nsr_id: NS Instance ID
8582 :param: nslcmop_id: nslcmop ID of migrate
8585 # Try to lock HA task here
8586 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
8587 if not task_is_locked_by_me
:
8589 logging_text
= "Task ns={} vertical scale ".format(nsr_id
)
8590 self
.logger
.debug(logging_text
+ "Enter")
8591 # get all needed from database
8593 db_nslcmop_update
= {}
8594 nslcmop_operation_state
= None
8598 # in case of error, indicates what part of scale was failed to put nsr at error status
8599 start_deploy
= time()
8602 # wait for any previous tasks in process
8603 step
= "Waiting for previous operations to terminate"
8604 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
8606 self
._write
_ns
_status
(
8609 current_operation
="VerticalScale",
8610 current_operation_id
=nslcmop_id
,
8612 step
= "Getting nslcmop from database"
8614 step
+ " after having waited for previous tasks to be completed"
8616 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
8617 operationParams
= db_nslcmop
.get("operationParams")
8619 target
.update(operationParams
)
8620 desc
= await self
.RO
.vertical_scale(nsr_id
, target
)
8621 self
.logger
.debug("RO return > {}".format(desc
))
8622 action_id
= desc
["action_id"]
8623 await self
._wait
_ng
_ro
(
8628 self
.timeout
.verticalscale
,
8629 operation
="verticalscale",
8631 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
8632 self
.logger
.error("Exit Exception {}".format(e
))
8634 except asyncio
.CancelledError
:
8635 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
8636 exc
= "Operation was cancelled"
8637 except Exception as e
:
8638 exc
= traceback
.format_exc()
8639 self
.logger
.critical(
8640 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
8643 self
._write
_ns
_status
(
8646 current_operation
="IDLE",
8647 current_operation_id
=None,
8650 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
8651 nslcmop_operation_state
= "FAILED"
8653 nslcmop_operation_state
= "COMPLETED"
8654 db_nslcmop_update
["detailed-status"] = "Done"
8655 db_nsr_update
["detailed-status"] = "Done"
8657 self
._write
_op
_status
(
8661 operation_state
=nslcmop_operation_state
,
8662 other_update
=db_nslcmop_update
,
8664 if nslcmop_operation_state
:
8668 "nslcmop_id": nslcmop_id
,
8669 "operationState": nslcmop_operation_state
,
8671 await self
.msg
.aiowrite("ns", "verticalscaled", msg
, loop
=self
.loop
)
8672 except Exception as e
:
8674 logging_text
+ "kafka_write notification Exception {}".format(e
)
8676 self
.logger
.debug(logging_text
+ "Exit")
8677 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_verticalscale")