1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
36 from osm_lcm
import ROclient
37 from osm_lcm
.data_utils
.lcm_config
import LcmCfg
38 from osm_lcm
.data_utils
.nsr
import (
41 get_deployed_vca_list
,
44 from osm_lcm
.data_utils
.vca
import (
53 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
54 from osm_lcm
.lcm_utils
import (
61 check_juju_bundle_existence
,
62 get_charm_artifact_path
,
65 from osm_lcm
.data_utils
.nsd
import (
66 get_ns_configuration_relation_list
,
70 from osm_lcm
.data_utils
.vnfd
import (
76 get_ee_sorted_initial_config_primitive_list
,
77 get_ee_sorted_terminate_config_primitive_list
,
79 get_virtual_link_profiles
,
84 get_number_of_instances
,
86 get_kdu_resource_profile
,
87 find_software_version
,
90 from osm_lcm
.data_utils
.list_utils
import find_in_list
91 from osm_lcm
.data_utils
.vnfr
import (
95 get_volumes_from_instantiation_params
,
97 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
98 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
99 from n2vc
.definitions
import RelationEndpoint
100 from n2vc
.k8s_helm_conn
import K8sHelmConnector
101 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
102 from n2vc
.k8s_juju_conn
import K8sJujuConnector
104 from osm_common
.dbbase
import DbException
105 from osm_common
.fsbase
import FsException
107 from osm_lcm
.data_utils
.database
.database
import Database
108 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
109 from osm_lcm
.data_utils
.wim
import (
111 get_target_wim_attrs
,
112 select_feasible_wim_account
,
115 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
116 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
118 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
119 from osm_lcm
.osm_config
import OsmConfigBuilder
120 from osm_lcm
.prometheus
import parse_job
122 from copy
import copy
, deepcopy
123 from time
import time
124 from uuid
import uuid4
126 from random
import randint
128 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
131 class NsLcm(LcmBase
):
132 SUBOPERATION_STATUS_NOT_FOUND
= -1
133 SUBOPERATION_STATUS_NEW
= -2
134 SUBOPERATION_STATUS_SKIP
= -3
135 task_name_deploy_vca
= "Deploying VCA"
137 def __init__(self
, msg
, lcm_tasks
, config
: LcmCfg
, loop
):
139 Init, Connect to database, filesystem storage, and messaging
140 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
143 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
145 self
.db
= Database().instance
.db
146 self
.fs
= Filesystem().instance
.fs
148 self
.lcm_tasks
= lcm_tasks
149 self
.timeout
= config
.timeout
150 self
.ro_config
= config
.RO
151 self
.vca_config
= config
.VCA
153 # create N2VC connector
154 self
.n2vc
= N2VCJujuConnector(
157 on_update_db
=self
._on
_update
_n
2vc
_db
,
162 self
.conn_helm_ee
= LCMHelmConn(
165 vca_config
=self
.vca_config
,
166 on_update_db
=self
._on
_update
_n
2vc
_db
,
169 self
.k8sclusterhelm2
= K8sHelmConnector(
170 kubectl_command
=self
.vca_config
.kubectlpath
,
171 helm_command
=self
.vca_config
.helmpath
,
178 self
.k8sclusterhelm3
= K8sHelm3Connector(
179 kubectl_command
=self
.vca_config
.kubectlpath
,
180 helm_command
=self
.vca_config
.helm3path
,
187 self
.k8sclusterjuju
= K8sJujuConnector(
188 kubectl_command
=self
.vca_config
.kubectlpath
,
189 juju_command
=self
.vca_config
.jujupath
,
192 on_update_db
=self
._on
_update
_k
8s
_db
,
197 self
.k8scluster_map
= {
198 "helm-chart": self
.k8sclusterhelm2
,
199 "helm-chart-v3": self
.k8sclusterhelm3
,
200 "chart": self
.k8sclusterhelm3
,
201 "juju-bundle": self
.k8sclusterjuju
,
202 "juju": self
.k8sclusterjuju
,
206 "lxc_proxy_charm": self
.n2vc
,
207 "native_charm": self
.n2vc
,
208 "k8s_proxy_charm": self
.n2vc
,
209 "helm": self
.conn_helm_ee
,
210 "helm-v3": self
.conn_helm_ee
,
214 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
.to_dict())
216 self
.op_status_map
= {
217 "instantiation": self
.RO
.status
,
218 "termination": self
.RO
.status
,
219 "migrate": self
.RO
.status
,
220 "healing": self
.RO
.recreate_status
,
221 "verticalscale": self
.RO
.status
,
222 "start_stop_rebuild": self
.RO
.status
,
def increment_ip_mac(ip_mac, vm_index=1):
    """
    Return ip_mac with its last field increased by vm_index.

    The last field is the text after the final "." (IPv4, read as decimal) or, when
    there is no usable dot, after the final ":" (IPv6 or MAC, read as hexadecimal and
    written back zero-padded to the width of the original field).

    :param ip_mac: address as text; any value that is not a str is returned untouched
    :param vm_index: amount added to the last field
    :return: the incremented address, the original value when it is not a str, or
        None when no separator is found or the last field is not a number

    NOTE(review): scale_vnfr calls this as self.increment_ip_mac(value, count), so it
    has to be a @staticmethod; the decorator line is not visible here - confirm.
    """
    if not isinstance(ip_mac, str):
        return ip_mac
    try:
        # try with ipv4 look for last dot
        i = ip_mac.rfind(".")
        if i > 0:
            i += 1  # step over the separator so the slice holds only the number
            return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
        # try with ipv6 or mac look for last colon. Operate in hex
        i = ip_mac.rfind(":")
        if i > 0:
            i += 1
            # format in hex, len can be 2 for mac or 4 for ipv6
            return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
                ip_mac[:i], int(ip_mac[i:], 16) + vm_index
            )
    except ValueError:
        # the last field is not a number in the expected base
        pass
    return None
247 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
249 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
252 # TODO filter RO descriptor fields...
256 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
257 db_dict
["deploymentStatus"] = ro_descriptor
258 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
260 except Exception as e
:
262 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
265 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
267 # remove last dot from path (if exists)
268 if path
.endswith("."):
271 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
272 # .format(table, filter, path, updated_data))
275 nsr_id
= filter.get("_id")
277 # read ns record from database
278 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
279 current_ns_status
= nsr
.get("nsState")
281 # get vca status for NS
282 status_dict
= await self
.n2vc
.get_status(
283 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
288 db_dict
["vcaStatus"] = status_dict
290 # update configurationStatus for this VCA
292 vca_index
= int(path
[path
.rfind(".") + 1 :])
295 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
297 vca_status
= vca_list
[vca_index
].get("status")
299 configuration_status_list
= nsr
.get("configurationStatus")
300 config_status
= configuration_status_list
[vca_index
].get("status")
302 if config_status
== "BROKEN" and vca_status
!= "failed":
303 db_dict
["configurationStatus"][vca_index
] = "READY"
304 elif config_status
!= "BROKEN" and vca_status
== "failed":
305 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
306 except Exception as e
:
307 # not update configurationStatus
308 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
310 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
311 # if nsState = 'DEGRADED' check if all is OK
313 if current_ns_status
in ("READY", "DEGRADED"):
314 error_description
= ""
316 if status_dict
.get("machines"):
317 for machine_id
in status_dict
.get("machines"):
318 machine
= status_dict
.get("machines").get(machine_id
)
319 # check machine agent-status
320 if machine
.get("agent-status"):
321 s
= machine
.get("agent-status").get("status")
324 error_description
+= (
325 "machine {} agent-status={} ; ".format(
329 # check machine instance status
330 if machine
.get("instance-status"):
331 s
= machine
.get("instance-status").get("status")
334 error_description
+= (
335 "machine {} instance-status={} ; ".format(
340 if status_dict
.get("applications"):
341 for app_id
in status_dict
.get("applications"):
342 app
= status_dict
.get("applications").get(app_id
)
343 # check application status
344 if app
.get("status"):
345 s
= app
.get("status").get("status")
348 error_description
+= (
349 "application {} status={} ; ".format(app_id
, s
)
352 if error_description
:
353 db_dict
["errorDescription"] = error_description
354 if current_ns_status
== "READY" and is_degraded
:
355 db_dict
["nsState"] = "DEGRADED"
356 if current_ns_status
== "DEGRADED" and not is_degraded
:
357 db_dict
["nsState"] = "READY"
360 self
.update_db_2("nsrs", nsr_id
, db_dict
)
362 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
364 except Exception as e
:
365 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
367 async def _on_update_k8s_db(
368 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
371 Updating vca status in NSR record
372 :param cluster_uuid: UUID of a k8s cluster
373 :param kdu_instance: The unique name of the KDU instance
374 :param filter: To get nsr_id
375 :cluster_type: The cluster type (juju, k8s)
379 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
380 # .format(cluster_uuid, kdu_instance, filter))
382 nsr_id
= filter.get("_id")
384 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
385 cluster_uuid
=cluster_uuid
,
386 kdu_instance
=kdu_instance
,
388 complete_status
=True,
394 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
397 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
401 self
.update_db_2("nsrs", nsr_id
, db_dict
)
402 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
404 except Exception as e
:
405 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
    """
    Render a cloud-init text as a Jinja2 template.

    :param cloud_init_text: template text
    :param additional_params: variables for the template; None is treated as {}
    :param vnfd_id: vnfd id, used only in error messages
    :param vdu_id: vdu id, used only in error messages
    :return: the rendered text
    :raises LcmException: when a template variable is undefined or the template
        cannot be parsed

    NOTE(review): takes no self, so presumably a @staticmethod; the decorator line is
    not visible here - confirm.
    """
    try:
        # StrictUndefined makes a missing variable an error instead of rendering ""
        env = Environment(
            undefined=StrictUndefined,
            autoescape=select_autoescape(default_for_string=True, default=True),
        )
        template = env.from_string(cloud_init_text)
        return template.render(additional_params or {})
    except UndefinedError as e:
        raise LcmException(
            "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
            "file, must be provided in the instantiation parameters inside the "
            "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
        ) from e
    except (TemplateError, TemplateNotFound) as e:
        raise LcmException(
            "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
                vnfd_id, vdu_id, e
            )
        ) from e
429 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
430 cloud_init_content
= cloud_init_file
= None
432 if vdu
.get("cloud-init-file"):
433 base_folder
= vnfd
["_admin"]["storage"]
434 if base_folder
["pkg-dir"]:
435 cloud_init_file
= "{}/{}/cloud_init/{}".format(
436 base_folder
["folder"],
437 base_folder
["pkg-dir"],
438 vdu
["cloud-init-file"],
441 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
442 base_folder
["folder"],
443 vdu
["cloud-init-file"],
445 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
446 cloud_init_content
= ci_file
.read()
447 elif vdu
.get("cloud-init"):
448 cloud_init_content
= vdu
["cloud-init"]
450 return cloud_init_content
451 except FsException
as e
:
453 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
454 vnfd
["id"], vdu
["id"], cloud_init_file
, e
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
    """
    Return the additional params stored for a vdu in a vnfr.

    :param db_vnfr: vnfr record; its "vdur" list is searched
    :param vdu_id: value matched against each vdur "vdu-id-ref"
    :return: parse_yaml_strings() applied to the "additionalParams" of the first
        matching vdur; when nothing matches, parse_yaml_strings(None)
    """
    # a vnfr without "vdur" (or with it set to None) is treated as having no vdurs
    vdur_list = db_vnfr.get("vdur") or ()
    vdur = next(
        (vdur for vdur in vdur_list if vdu_id == vdur["vdu-id-ref"]),
        {},
    )
    additional_params = vdur.get("additionalParams")
    return parse_yaml_strings(additional_params)
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
    :param vnfd: input vnfd; it is not modified
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided (not used here)
    :param nsrId: Id of the NSR (not used here)
    :return: copy of vnfd
    """
    vnfd_RO = deepcopy(vnfd)
    # remove unused by RO configuration, monitoring, scaling and internal keys
    for unused_key in (
        "_id",
        "_admin",
        "monitoring-param",
        "scaling-group-descriptor",
        "kdu",
        "k8s-cluster",
    ):
        vnfd_RO.pop(unused_key, None)
    if new_id:
        vnfd_RO["id"] = new_id

    # cloud-init and cloud-init-file are dropped from every vdu of the copy
    for vdu in get_iterable(vnfd_RO, "vdu"):
        vdu.pop("cloud-init-file", None)
        vdu.pop("cloud-init", None)
    return vnfd_RO
def ip_profile_2_RO(ip_profile):
    """
    Return a copy of an ip-profile with its keys renamed as follows.

    - "dns-server" becomes "dns-address": a list of entries is reduced to the list of
      their "address" values, any other value is moved unchanged
    - "ip-version" values "ipv4" / "ipv6" become "IPv4" / "IPv6"
    - "dhcp-params" becomes "dhcp"

    :param ip_profile: input dictionary; it is not modified
    :return: the converted copy

    NOTE(review): takes no self, so presumably a @staticmethod; the decorator line is
    not visible here - confirm.
    """
    RO_ip_profile = deepcopy(ip_profile)
    if "dns-server" in RO_ip_profile:
        dns_server = RO_ip_profile.pop("dns-server")
        if isinstance(dns_server, list):
            RO_ip_profile["dns-address"] = [ds["address"] for ds in dns_server]
        else:
            RO_ip_profile["dns-address"] = dns_server
    if RO_ip_profile.get("ip-version") == "ipv4":
        RO_ip_profile["ip-version"] = "IPv4"
    if RO_ip_profile.get("ip-version") == "ipv6":
        RO_ip_profile["ip-version"] = "IPv6"
    if "dhcp-params" in RO_ip_profile:
        RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
    return RO_ip_profile
509 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
510 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
511 if db_vim
["_admin"]["operationalState"] != "ENABLED":
513 "VIM={} is not available. operationalState={}".format(
514 vim_account
, db_vim
["_admin"]["operationalState"]
517 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
def get_ro_wim_id_for_wim_account(self, wim_account):
    """
    Return the RO identifier stored for a WIM account.

    :param wim_account: _id of the record in the "wim_accounts" collection; a value
        that is not a str is returned unchanged without any database access
    :return: the value at ["_admin"]["deployed"]["RO-account"] of that record, or
        wim_account itself when it is not a str
    :raises LcmException: when the record's operationalState is not "ENABLED"
    """
    if not isinstance(wim_account, str):
        return wim_account
    db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
    if db_wim["_admin"]["operationalState"] != "ENABLED":
        raise LcmException(
            "WIM={} is not available. operationalState={}".format(
                wim_account, db_wim["_admin"]["operationalState"]
            )
        )
    RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
    return RO_wim_id
534 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
536 db_vdu_push_list
= []
538 db_update
= {"_admin.modified": time()}
540 for vdu_id
, vdu_count
in vdu_create
.items():
544 for vdur
in reversed(db_vnfr
["vdur"])
545 if vdur
["vdu-id-ref"] == vdu_id
550 # Read the template saved in the db:
552 "No vdur in the database. Using the vdur-template to scale"
554 vdur_template
= db_vnfr
.get("vdur-template")
555 if not vdur_template
:
557 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
561 vdur
= vdur_template
[0]
562 # Delete a template from the database after using it
565 {"_id": db_vnfr
["_id"]},
567 pull
={"vdur-template": {"_id": vdur
["_id"]}},
569 for count
in range(vdu_count
):
570 vdur_copy
= deepcopy(vdur
)
571 vdur_copy
["status"] = "BUILD"
572 vdur_copy
["status-detailed"] = None
573 vdur_copy
["ip-address"] = None
574 vdur_copy
["_id"] = str(uuid4())
575 vdur_copy
["count-index"] += count
+ 1
576 vdur_copy
["id"] = "{}-{}".format(
577 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
579 vdur_copy
.pop("vim_info", None)
580 for iface
in vdur_copy
["interfaces"]:
581 if iface
.get("fixed-ip"):
582 iface
["ip-address"] = self
.increment_ip_mac(
583 iface
["ip-address"], count
+ 1
586 iface
.pop("ip-address", None)
587 if iface
.get("fixed-mac"):
588 iface
["mac-address"] = self
.increment_ip_mac(
589 iface
["mac-address"], count
+ 1
592 iface
.pop("mac-address", None)
596 ) # only first vdu can be managment of vnf
597 db_vdu_push_list
.append(vdur_copy
)
598 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
600 if len(db_vnfr
["vdur"]) == 1:
601 # The scale will move to 0 instances
603 "Scaling to 0 !, creating the template with the last vdur"
605 template_vdur
= [db_vnfr
["vdur"][0]]
606 for vdu_id
, vdu_count
in vdu_delete
.items():
608 indexes_to_delete
= [
610 for iv
in enumerate(db_vnfr
["vdur"])
611 if iv
[1]["vdu-id-ref"] == vdu_id
615 "vdur.{}.status".format(i
): "DELETING"
616 for i
in indexes_to_delete
[-vdu_count
:]
620 # it must be deleted one by one because common.db does not allow otherwise
623 for v
in reversed(db_vnfr
["vdur"])
624 if v
["vdu-id-ref"] == vdu_id
626 for vdu
in vdus_to_delete
[:vdu_count
]:
629 {"_id": db_vnfr
["_id"]},
631 pull
={"vdur": {"_id": vdu
["_id"]}},
635 db_push
["vdur"] = db_vdu_push_list
637 db_push
["vdur-template"] = template_vdur
640 db_vnfr
["vdur-template"] = template_vdur
641 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
642 # modify passed dictionary db_vnfr
643 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
644 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Updates database nsr with the RO info for the created vld
    :param ns_update_nsr: dictionary to be filled with the updated info; receives one
        "vld.<index>" entry per vld of db_nsr
    :param db_nsr: content of db_nsr. This is also modified
    :param nsr_desc_RO: nsr descriptor from RO; its "nets" are matched to each vld by
        "ns_net_osm_id"
    :return: Nothing, LcmException is raised on errors
    """
    for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
        for net_RO in get_iterable(nsr_desc_RO, "nets"):
            if vld["id"] != net_RO.get("ns_net_osm_id"):
                continue
            vld["vim-id"] = net_RO.get("vim_net_id")
            vld["name"] = net_RO.get("vim_name")
            vld["status"] = net_RO.get("status")
            vld["status-detailed"] = net_RO.get("error_msg")
            ns_update_nsr["vld.{}".format(vld_index)] = vld
            break
        else:
            # no RO net matched this vld
            raise LcmException(
                "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
            )
def set_vnfr_at_error(self, db_vnfrs, error_text):
    """
    Mark every vnfr as "ERROR", in memory and in the "vnfrs" collection.

    Each vdur that has no "status" yet is also set to "ERROR" and, when error_text is
    given, receives it as "status-detailed". A vdur that already has a status is left
    untouched. Database errors are logged and not propagated.

    :param db_vnfrs: dictionary whose values are vnfr records. They are modified
    :param error_text: error description; skipped when empty or None
    :return: None
    """
    try:
        for db_vnfr in db_vnfrs.values():
            vnfr_update = {"status": "ERROR"}
            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                if "status" not in vdur:
                    vdur["status"] = "ERROR"
                    vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
                    if error_text:
                        # keep the database value identical to the in-memory one
                        detail = str(error_text)
                        vdur["status-detailed"] = detail
                        vnfr_update[
                            "vdur.{}.status-detailed".format(vdu_index)
                        ] = detail
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
    except DbException as e:
        self.logger.error("Cannot update vnf. {}".format(e))
687 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
689 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
690 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
691 :param nsr_desc_RO: nsr descriptor from RO
692 :return: Nothing, LcmException is raised on errors
694 for vnf_index
, db_vnfr
in db_vnfrs
.items():
695 for vnf_RO
in nsr_desc_RO
["vnfs"]:
696 if vnf_RO
["member_vnf_index"] != vnf_index
:
699 if vnf_RO
.get("ip_address"):
700 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
703 elif not db_vnfr
.get("ip-address"):
704 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
705 raise LcmExceptionNoMgmtIP(
706 "ns member_vnf_index '{}' has no IP address".format(
711 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
712 vdur_RO_count_index
= 0
713 if vdur
.get("pdu-type"):
715 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
716 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
718 if vdur
["count-index"] != vdur_RO_count_index
:
719 vdur_RO_count_index
+= 1
721 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
722 if vdur_RO
.get("ip_address"):
723 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
725 vdur
["ip-address"] = None
726 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
727 vdur
["name"] = vdur_RO
.get("vim_name")
728 vdur
["status"] = vdur_RO
.get("status")
729 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
730 for ifacer
in get_iterable(vdur
, "interfaces"):
731 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
732 if ifacer
["name"] == interface_RO
.get("internal_name"):
733 ifacer
["ip-address"] = interface_RO
.get(
736 ifacer
["mac-address"] = interface_RO
.get(
742 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
743 "from VIM info".format(
744 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
747 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
751 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
753 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
757 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
758 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
759 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
761 vld
["vim-id"] = net_RO
.get("vim_net_id")
762 vld
["name"] = net_RO
.get("vim_name")
763 vld
["status"] = net_RO
.get("status")
764 vld
["status-detailed"] = net_RO
.get("error_msg")
765 vnfr_update
["vld.{}".format(vld_index
)] = vld
769 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
774 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
779 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
784 def _get_ns_config_info(self
, nsr_id
):
786 Generates a mapping between vnf,vdu elements and the N2VC id
787 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
788 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
789 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
790 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
792 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
793 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
795 ns_config_info
= {"osm-config-mapping": mapping
}
796 for vca
in vca_deployed_list
:
797 if not vca
["member-vnf-index"]:
799 if not vca
["vdu_id"]:
800 mapping
[vca
["member-vnf-index"]] = vca
["application"]
804 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
806 ] = vca
["application"]
807 return ns_config_info
809 async def _instantiate_ng_ro(
826 def get_vim_account(vim_account_id
):
828 if vim_account_id
in db_vims
:
829 return db_vims
[vim_account_id
]
830 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
831 db_vims
[vim_account_id
] = db_vim
834 # modify target_vld info with instantiation parameters
835 def parse_vld_instantiation_params(
836 target_vim
, target_vld
, vld_params
, target_sdn
838 if vld_params
.get("ip-profile"):
839 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
842 if vld_params
.get("provider-network"):
843 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
846 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
847 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
851 # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
852 # if wim_account_id is specified in vld_params, validate if it is feasible.
853 wim_account_id
, db_wim
= select_feasible_wim_account(
854 db_nsr
, db_vnfrs
, target_vld
, vld_params
, self
.logger
858 # WIM is needed and a feasible one was found, populate WIM target and SDN ports
859 self
.logger
.info("WIM selected: {:s}".format(str(wim_account_id
)))
860 # update vld_params with correct WIM account Id
861 vld_params
["wimAccountId"] = wim_account_id
863 target_wim
= "wim:{}".format(wim_account_id
)
864 target_wim_attrs
= get_target_wim_attrs(nsr_id
, target_vld
, vld_params
)
865 sdn_ports
= get_sdn_ports(vld_params
, db_wim
)
866 if len(sdn_ports
) > 0:
867 target_vld
["vim_info"][target_wim
] = target_wim_attrs
868 target_vld
["vim_info"][target_wim
]["sdn-ports"] = sdn_ports
871 "Target VLD with WIM data: {:s}".format(str(target_vld
))
874 for param
in ("vim-network-name", "vim-network-id"):
875 if vld_params
.get(param
):
876 if isinstance(vld_params
[param
], dict):
877 for vim
, vim_net
in vld_params
[param
].items():
878 other_target_vim
= "vim:" + vim
880 target_vld
["vim_info"],
881 (other_target_vim
, param
.replace("-", "_")),
884 else: # isinstance str
885 target_vld
["vim_info"][target_vim
][
886 param
.replace("-", "_")
887 ] = vld_params
[param
]
888 if vld_params
.get("common_id"):
889 target_vld
["common_id"] = vld_params
.get("common_id")
891 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
892 def update_ns_vld_target(target
, ns_params
):
893 for vnf_params
in ns_params
.get("vnf", ()):
894 if vnf_params
.get("vimAccountId"):
898 for vnfr
in db_vnfrs
.values()
899 if vnf_params
["member-vnf-index"]
900 == vnfr
["member-vnf-index-ref"]
904 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
907 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
908 target_vld
= find_in_list(
909 get_iterable(vdur
, "interfaces"),
910 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
913 vld_params
= find_in_list(
914 get_iterable(ns_params
, "vld"),
915 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
919 if vnf_params
.get("vimAccountId") not in a_vld
.get(
922 target_vim_network_list
= [
923 v
for _
, v
in a_vld
.get("vim_info").items()
925 target_vim_network_name
= next(
927 item
.get("vim_network_name", "")
928 for item
in target_vim_network_list
933 target
["ns"]["vld"][a_index
].get("vim_info").update(
935 "vim:{}".format(vnf_params
["vimAccountId"]): {
936 "vim_network_name": target_vim_network_name
,
942 for param
in ("vim-network-name", "vim-network-id"):
943 if vld_params
.get(param
) and isinstance(
944 vld_params
[param
], dict
946 for vim
, vim_net
in vld_params
[
949 other_target_vim
= "vim:" + vim
951 target
["ns"]["vld"][a_index
].get(
956 param
.replace("-", "_"),
961 nslcmop_id
= db_nslcmop
["_id"]
963 "name": db_nsr
["name"],
966 "image": deepcopy(db_nsr
["image"]),
967 "flavor": deepcopy(db_nsr
["flavor"]),
968 "action_id": nslcmop_id
,
969 "cloud_init_content": {},
971 for image
in target
["image"]:
972 image
["vim_info"] = {}
973 for flavor
in target
["flavor"]:
974 flavor
["vim_info"] = {}
975 if db_nsr
.get("affinity-or-anti-affinity-group"):
976 target
["affinity-or-anti-affinity-group"] = deepcopy(
977 db_nsr
["affinity-or-anti-affinity-group"]
979 for affinity_or_anti_affinity_group
in target
[
980 "affinity-or-anti-affinity-group"
982 affinity_or_anti_affinity_group
["vim_info"] = {}
984 if db_nslcmop
.get("lcmOperationType") != "instantiate":
985 # get parameters of instantiation:
986 db_nslcmop_instantiate
= self
.db
.get_list(
989 "nsInstanceId": db_nslcmop
["nsInstanceId"],
990 "lcmOperationType": "instantiate",
993 ns_params
= db_nslcmop_instantiate
.get("operationParams")
995 ns_params
= db_nslcmop
.get("operationParams")
996 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
997 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
1000 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
1001 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
1004 "name": vld
["name"],
1005 "mgmt-network": vld
.get("mgmt-network", False),
1006 "type": vld
.get("type"),
1009 "vim_network_name": vld
.get("vim-network-name"),
1010 "vim_account_id": ns_params
["vimAccountId"],
1014 # check if this network needs SDN assist
1015 if vld
.get("pci-interfaces"):
1016 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1017 sdnc_id
= db_vim
["config"].get("sdn-controller")
1019 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1020 target_sdn
= "sdn:{}".format(sdnc_id
)
1021 target_vld
["vim_info"][target_sdn
] = {
1023 "target_vim": target_vim
,
1025 "type": vld
.get("type"),
1028 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1029 for nsd_vnf_profile
in nsd_vnf_profiles
:
1030 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1031 if cp
["virtual-link-profile-id"] == vld
["id"]:
1033 "member_vnf:{}.{}".format(
1034 cp
["constituent-cpd-id"][0][
1035 "constituent-base-element-id"
1037 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1039 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1041 # check at nsd descriptor, if there is an ip-profile
1043 nsd_vlp
= find_in_list(
1044 get_virtual_link_profiles(nsd
),
1045 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1050 and nsd_vlp
.get("virtual-link-protocol-data")
1051 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1053 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1056 ip_profile_dest_data
= {}
1057 if "ip-version" in ip_profile_source_data
:
1058 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1061 if "cidr" in ip_profile_source_data
:
1062 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1065 if "gateway-ip" in ip_profile_source_data
:
1066 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1069 if "dhcp-enabled" in ip_profile_source_data
:
1070 ip_profile_dest_data
["dhcp-params"] = {
1071 "enabled": ip_profile_source_data
["dhcp-enabled"]
1073 vld_params
["ip-profile"] = ip_profile_dest_data
1075 # update vld_params with instantiation params
1076 vld_instantiation_params
= find_in_list(
1077 get_iterable(ns_params
, "vld"),
1078 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1080 if vld_instantiation_params
:
1081 vld_params
.update(vld_instantiation_params
)
1082 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1083 target
["ns"]["vld"].append(target_vld
)
1084 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1085 update_ns_vld_target(target
, ns_params
)
1087 for vnfr
in db_vnfrs
.values():
1088 vnfd
= find_in_list(
1089 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1091 vnf_params
= find_in_list(
1092 get_iterable(ns_params
, "vnf"),
1093 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1095 target_vnf
= deepcopy(vnfr
)
1096 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1097 for vld
in target_vnf
.get("vld", ()):
1098 # check if connected to a ns.vld, to fill target'
1099 vnf_cp
= find_in_list(
1100 vnfd
.get("int-virtual-link-desc", ()),
1101 lambda cpd
: cpd
.get("id") == vld
["id"],
1104 ns_cp
= "member_vnf:{}.{}".format(
1105 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1107 if cp2target
.get(ns_cp
):
1108 vld
["target"] = cp2target
[ns_cp
]
1111 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1113 # check if this network needs SDN assist
1115 if vld
.get("pci-interfaces"):
1116 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1117 sdnc_id
= db_vim
["config"].get("sdn-controller")
1119 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1120 target_sdn
= "sdn:{}".format(sdnc_id
)
1121 vld
["vim_info"][target_sdn
] = {
1123 "target_vim": target_vim
,
1125 "type": vld
.get("type"),
1128 # check at vnfd descriptor, if there is an ip-profile
1130 vnfd_vlp
= find_in_list(
1131 get_virtual_link_profiles(vnfd
),
1132 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1136 and vnfd_vlp
.get("virtual-link-protocol-data")
1137 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1139 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1142 ip_profile_dest_data
= {}
1143 if "ip-version" in ip_profile_source_data
:
1144 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1147 if "cidr" in ip_profile_source_data
:
1148 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1151 if "gateway-ip" in ip_profile_source_data
:
1152 ip_profile_dest_data
[
1154 ] = ip_profile_source_data
["gateway-ip"]
1155 if "dhcp-enabled" in ip_profile_source_data
:
1156 ip_profile_dest_data
["dhcp-params"] = {
1157 "enabled": ip_profile_source_data
["dhcp-enabled"]
1160 vld_params
["ip-profile"] = ip_profile_dest_data
1161 # update vld_params with instantiation params
1163 vld_instantiation_params
= find_in_list(
1164 get_iterable(vnf_params
, "internal-vld"),
1165 lambda i_vld
: i_vld
["name"] == vld
["id"],
1167 if vld_instantiation_params
:
1168 vld_params
.update(vld_instantiation_params
)
1169 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1172 for vdur
in target_vnf
.get("vdur", ()):
1173 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1174 continue # This vdu must not be created
1175 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1177 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1180 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1181 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1184 and vdu_configuration
.get("config-access")
1185 and vdu_configuration
.get("config-access").get("ssh-access")
1187 vdur
["ssh-keys"] = ssh_keys_all
1188 vdur
["ssh-access-required"] = vdu_configuration
[
1190 ]["ssh-access"]["required"]
1193 and vnf_configuration
.get("config-access")
1194 and vnf_configuration
.get("config-access").get("ssh-access")
1195 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1197 vdur
["ssh-keys"] = ssh_keys_all
1198 vdur
["ssh-access-required"] = vnf_configuration
[
1200 ]["ssh-access"]["required"]
1201 elif ssh_keys_instantiation
and find_in_list(
1202 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1204 vdur
["ssh-keys"] = ssh_keys_instantiation
1206 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1208 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1210 if vdud
.get("cloud-init-file"):
1211 vdur
["cloud-init"] = "{}:file:{}".format(
1212 vnfd
["_id"], vdud
.get("cloud-init-file")
1214 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1215 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1216 base_folder
= vnfd
["_admin"]["storage"]
1217 if base_folder
["pkg-dir"]:
1218 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1219 base_folder
["folder"],
1220 base_folder
["pkg-dir"],
1221 vdud
.get("cloud-init-file"),
1224 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1225 base_folder
["folder"],
1226 vdud
.get("cloud-init-file"),
1228 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1229 target
["cloud_init_content"][
1232 elif vdud
.get("cloud-init"):
1233 vdur
["cloud-init"] = "{}:vdu:{}".format(
1234 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1236 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1237 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1240 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1241 deploy_params_vdu
= self
._format
_additional
_params
(
1242 vdur
.get("additionalParams") or {}
1244 deploy_params_vdu
["OSM"] = get_osm_params(
1245 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1247 vdur
["additionalParams"] = deploy_params_vdu
1250 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1251 if target_vim
not in ns_flavor
["vim_info"]:
1252 ns_flavor
["vim_info"][target_vim
] = {}
1255 # in case alternative images are provided we must check if they should be applied
1256 # for the vim_type, modify the vim_type taking into account
1257 ns_image_id
= int(vdur
["ns-image-id"])
1258 if vdur
.get("alt-image-ids"):
1259 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1260 vim_type
= db_vim
["vim_type"]
1261 for alt_image_id
in vdur
.get("alt-image-ids"):
1262 ns_alt_image
= target
["image"][int(alt_image_id
)]
1263 if vim_type
== ns_alt_image
.get("vim-type"):
1264 # must use alternative image
1266 "use alternative image id: {}".format(alt_image_id
)
1268 ns_image_id
= alt_image_id
1269 vdur
["ns-image-id"] = ns_image_id
1271 ns_image
= target
["image"][int(ns_image_id
)]
1272 if target_vim
not in ns_image
["vim_info"]:
1273 ns_image
["vim_info"][target_vim
] = {}
1276 if vdur
.get("affinity-or-anti-affinity-group-id"):
1277 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1278 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1279 if target_vim
not in ns_ags
["vim_info"]:
1280 ns_ags
["vim_info"][target_vim
] = {}
1282 vdur
["vim_info"] = {target_vim
: {}}
1283 # instantiation parameters
1285 vdu_instantiation_params
= find_in_list(
1286 get_iterable(vnf_params
, "vdu"),
1287 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1289 if vdu_instantiation_params
:
1290 # Parse the vdu_volumes from the instantiation params
1291 vdu_volumes
= get_volumes_from_instantiation_params(
1292 vdu_instantiation_params
, vdud
1294 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1295 vdur_list
.append(vdur
)
1296 target_vnf
["vdur"] = vdur_list
1297 target
["vnf"].append(target_vnf
)
1299 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1300 desc
= await self
.RO
.deploy(nsr_id
, target
)
1301 self
.logger
.debug("RO return > {}".format(desc
))
1302 action_id
= desc
["action_id"]
1303 await self
._wait
_ng
_ro
(
1310 operation
="instantiation",
1315 "_admin.deployed.RO.operational-status": "running",
1316 "detailed-status": " ".join(stage
),
1318 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1319 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1320 self
._write
_op
_status
(nslcmop_id
, stage
)
1322 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
async def _wait_ng_ro(
    self,
    nsr_id,
    action_id,
    nslcmop_id=None,
    start_time=None,
    timeout=600,
    stage=None,
    operation=None,
):
    """
    Poll the status of a RO action until it is done, it fails or the timeout expires.

    :param nsr_id: ns record id; passed to the status getter and used to update "nsrs"
    :param action_id: RO action to poll
    :param nslcmop_id: operation id. Progress is written to database only when both this and stage are set
    :param start_time: epoch seconds the timeout is counted from. Defaults to the current time
    :param timeout: seconds allowed, counted from start_time
    :param stage: optional list; item [2] is overwritten with the VIM progress text
    :param operation: key of self.op_status_map that selects the status coroutine to call
    :return: None
    :raises NgRoException: when RO reports "FAILED" or when the timeout expires
    :raises AssertionError: when RO reports a status that is not FAILED, BUILD or DONE
    """
    detailed_status_old = None
    db_nsr_update = {}
    start_time = start_time or time()
    while time() <= start_time + timeout:
        desc_status = await self.op_status_map[operation](nsr_id, action_id)
        self.logger.debug("Wait NG RO > {}".format(desc_status))
        if desc_status["status"] == "FAILED":
            raise NgRoException(desc_status["details"])
        elif desc_status["status"] == "BUILD":
            if stage:
                stage[2] = "VIM: ({})".format(desc_status["details"])
        elif desc_status["status"] == "DONE":
            if stage:
                stage[2] = "Deployed at VIM"
            break
        else:
            # Raised explicitly (same exception type as before) so the check
            # is not stripped when running with "python -O"
            raise AssertionError(
                "ROclient.check_ns_status returns unknown {}".format(
                    desc_status["status"]
                )
            )
        # write progress only when the VIM text has changed since last poll
        if stage and nslcmop_id and stage[2] != detailed_status_old:
            detailed_status_old = stage[2]
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
        # "loop" argument of asyncio.sleep was removed in Python 3.10
        await asyncio.sleep(15)
    else:  # timeout_ns_deploy
        raise NgRoException("Timeout waiting ns to deploy")
async def _terminate_ng_ro(
    self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
):
    """
    Ask RO to remove everything deployed for this ns and wait until it is deleted.

    :param logging_text: prefix used at logging
    :param nsr_deployed: not read by this method
    :param nsr_id: ns record id
    :param nslcmop_id: operation id; sent to RO as "action_id"
    :param stage: list; item [2] is overwritten with the final VIM result text
    :return: None
    :raises LcmException: when the deletion fails for a reason other than RO answering 404
    """
    db_nsr_update = {}
    failed_detail = []
    action_id = None
    start_deploy = time()
    try:
        # an empty target tells RO that nothing must remain deployed
        empty_target = {
            "ns": {"vld": []},
            "vnf": [],
            "image": [],
            "flavor": [],
            "action_id": nslcmop_id,
        }
        ro_answer = await self.RO.deploy(nsr_id, empty_target)
        action_id = ro_answer["action_id"]
        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
        self.logger.debug(
            logging_text
            + "ns terminate action at RO. action_id={}".format(action_id)
        )

        # wait until done
        delete_timeout = 20 * 60  # 20 minutes
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            delete_timeout,
            stage,
            operation="termination",
        )

        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        # delete all nsr
        await self.RO.delete(nsr_id)
    except Exception as e:
        is_ro_error = isinstance(e, NgRoException)
        if is_ro_error and e.http_code == 404:  # not found
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(
                logging_text + "RO_action_id={} already deleted".format(action_id)
            )
        elif is_ro_error and e.http_code == 409:  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(
                logging_text
                + "RO_action_id={} delete conflict: {}".format(action_id, e)
            )
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(
                logging_text
                + "RO_action_id={} delete error: {}".format(action_id, e)
            )

    stage[2] = "Error deleting from VIM" if failed_detail else "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
async def instantiate_RO(
    self,
    logging_text,
    nsr_id,
    nsd,
    db_nsr,
    db_nslcmop,
    db_vnfrs,
    db_vnfds,
    n2vc_key_list,
    stage,
):
    """
    Instantiate at RO
    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: database content of ns descriptor
    :param db_nsr: database content of ns record
    :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
    :param db_vnfrs: database content of vnfrs; only its values' "vim-account-id" is read here
    :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
    :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: None or exception
    """
    try:
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.ns_deploy

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
            # in case of placement, change ns_params["vimAccountId"] if not present at any vnfrs
            for vnfr in db_vnfrs.values():
                if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
                    break
            else:
                # This was a no-op comparison ("==") before; assign the vim of
                # the last vnfr iterated. Guarded because with no vnfrs the
                # loop variable is never bound.
                if db_vnfrs:
                    ns_params["vimAccountId"] = vnfr["vim-account-id"]

        return await self._instantiate_ng_ro(
            logging_text,
            nsr_id,
            nsd,
            db_nsr,
            db_nslcmop,
            db_vnfrs,
            db_vnfds,
            n2vc_key_list,
            stage,
            start_deploy,
            timeout_ns_deploy,
        )
    except Exception as e:
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        # traceback is logged only for exceptions that are not of a known type
        self.logger.error(
            "Error deploying at VIM {}".format(e),
            exc_info=not isinstance(
                e,
                (
                    ROclient.ROClientException,
                    LcmException,
                    DbException,
                    NgRoException,
                ),
            ),
        )
        raise
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address
    :param logging_text: prefix use for logging
    :param nsr_id: not read by this method
    :param vnfr_id: _id of the vnf record that is read from "vnfrs" on every try
    :param kdu_name: "kdu-name" of the kdur entry to wait for
    :return: IP address, K8s services
    :raises LcmException: if the kdur is not found, it has a status other than
        READY/ENABLED, or it has no status after 360 tries
    """

    # self.logger.debug(logging_text + "Starting wait_kdu_up")
    nb_tries = 0

    while nb_tries < 360:
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next(
            (
                x
                for x in get_iterable(db_vnfr, "kdur")
                if x.get("kdu-name") == kdu_name
            ),
            None,
        )
        if not kdur:
            raise LcmException(
                "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
            )
        if kdur.get("status"):
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address"), kdur.get("services")
            else:
                raise LcmException(
                    "target KDU={} is in error state".format(kdu_name)
                )

        # "loop" argument of asyncio.sleep was removed in Python 3.10
        await asyncio.sleep(10)
        nb_tries += 1
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
async def wait_vm_up_insert_key_ro(
    self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
):
    """
    Wait for ip address at RO, and optionally, insert public key in virtual machine
    :param logging_text: prefix use for logging
    :param nsr_id: ns record id
    :param vnfr_id: _id of the vnf record that holds the target vdur
    :param vdu_id: "vdu-id-ref" of the target vdu; falsy to target the vnf management ip-address
    :param vdu_index: "count-index" of the target vdu
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    :raises LcmException: if the target is in error state, it is not found, or the
        maximum number of tries is reached
    """

    self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0
    target_vdu_id = None
    ro_retries = 0

    while True:

        ro_retries += 1
        if ro_retries >= 360:  # 1 hour
            raise LcmException(
                "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
            )

        # "loop" argument of asyncio.sleep was removed in Python 3.10
        await asyncio.sleep(10)

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                if db_vnfr.get("status") == "ERROR":
                    raise LcmException(
                        "Cannot inject ssh-key because target VNF is in error state"
                    )
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next(
                    (
                        x
                        for x in get_iterable(db_vnfr, "vdur")
                        if x.get("ip-address") == ip_address
                    ),
                    None,
                )
            else:  # VDU case
                vdur = next(
                    (
                        x
                        for x in get_iterable(db_vnfr, "vdur")
                        if x.get("vdu-id-ref") == vdu_id
                        and x.get("count-index") == vdu_index
                    ),
                    None,
                )

            if (
                not vdur and len(db_vnfr.get("vdur", ())) == 1
            ):  # If only one, this should be the target vdu
                vdur = db_vnfr["vdur"][0]
            if not vdur:
                raise LcmException(
                    "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                        vnfr_id, vdu_id, vdu_index
                    )
                )
            # New generation RO stores information at "vim_info"
            ng_ro_status = None
            target_vim = None
            if vdur.get("vim_info"):
                target_vim = next(
                    t for t in vdur["vim_info"]
                )  # there should be only one key
                ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
            if (
                vdur.get("pdu-type")
                or vdur.get("status") == "ACTIVE"
                or ng_ro_status == "ACTIVE"
            ):
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
                raise LcmException(
                    "Cannot inject ssh-key because target VM is in error state"
                )

        if not target_vdu_id:
            continue

        # inject public key into machine
        if pub_key and user:
            self.logger.debug(logging_text + "Inserting RO key")
            self.logger.debug("SSH > PubKey > {}".format(pub_key))
            if vdur.get("pdu-type"):
                self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                return ip_address
            try:
                ro_vm_id = "{}-{}".format(
                    db_vnfr["member-vnf-index-ref"], target_vdu_id
                )  # TODO add vdu_index
                if self.ro_config.ng:
                    target = {
                        "action": {
                            "action": "inject_ssh_key",
                            "key": pub_key,
                            "user": user,
                        },
                        "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
                    }
                    desc = await self.RO.deploy(nsr_id, target)
                    action_id = desc["action_id"]
                    await self._wait_ng_ro(
                        nsr_id, action_id, timeout=600, operation="instantiation"
                    )
                    break
                else:
                    # wait until NS is deployed at RO
                    if not ro_nsr_id:
                        db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                        ro_nsr_id = deep_get(
                            db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
                        )
                    if not ro_nsr_id:
                        continue
                    result_dict = await self.RO.create_action(
                        item="ns",
                        item_id_name=ro_nsr_id,
                        descriptor={
                            "add_public_key": pub_key,
                            "vms": [ro_vm_id],
                            "user": user,
                        },
                    )
                    # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                    if not result_dict or not isinstance(result_dict, dict):
                        raise LcmException(
                            "Unknown response from RO when injecting key"
                        )
                    # only the first entry decides: success leaves the loop,
                    # anything else is raised so the injection is retried
                    for result in result_dict.values():
                        if result.get("vim_result") == 200:
                            break
                        else:
                            raise ROclient.ROClientException(
                                "error injecting key: {}".format(
                                    result.get("description")
                                )
                            )
                    break
            except NgRoException as e:
                raise LcmException(
                    "Reaching max tries injecting key. Error: {}".format(e)
                ) from e
            except ROclient.ROClientException as e:
                if not nb_tries:
                    self.logger.debug(
                        logging_text
                        + "error injecting key: {}. Retrying until {} seconds".format(
                            e, 20 * 10
                        )
                    )
                nb_tries += 1
                if nb_tries >= 20:
                    raise LcmException(
                        "Reaching max tries injecting key. Error: {}".format(e)
                    ) from e
        else:
            break

    return ip_address
async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
    """
    Block until the VCA deployments this one depends on are READY.
    A NS waits for VNFs and VDUs; a VNF waits for its VDUs; a VDU/KDU waits for nothing.

    :param nsr_id: ns record id, read from "nsrs" on every poll
    :param vca_deployed_list: list of deployed VCAs; only the entry at vca_index is read
    :param vca_index: position of the VCA that is waiting
    :return: None
    :raises LcmException: if a dependency is BROKEN or the wait runs out of tries
    """
    own_vca = vca_deployed_list[vca_index]
    if own_vca.get("vdu_id") or own_vca.get("kdu_name"):
        # vdu or kdu: no dependencies
        return

    remaining = 300
    while remaining >= 0:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # looked up but not used afterwards, kept as in the previous code
        vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
        status_entries = db_nsr["configurationStatus"]
        own_member_index = own_vca.get("member-vnf-index")

        still_pending = False
        for position, entry in enumerate(status_entries):
            if position == vca_index:
                # myself
                continue
            if own_member_index and entry.get("member-vnf-index") != own_member_index:
                # belongs to another vnf: not a dependency
                continue
            dependency_status = entry.get("status")
            if dependency_status == "BROKEN":
                raise LcmException(
                    "Configuration aborted because dependent charm/s has failed"
                )
            if dependency_status != "READY":
                still_pending = True
                break

        if not still_pending:
            # no dependencies, return
            return
        await asyncio.sleep(10)
        remaining -= 1

    raise LcmException("Configuration aborted because dependent charm/s timeout")
def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
    """
    Obtain the VCA id to use.

    :param db_vnfr: vnf record; when truthy, its "vca-id" is returned
    :param db_nsr: ns record; used only when db_vnfr is falsy. The "vca" of the vim
        account referenced at "instantiate_params.vimAccountId" is returned
    :return: the vca id, or None if both records are falsy
    """
    if db_vnfr:
        return deep_get(db_vnfr, ("vca-id",))
    if db_nsr:
        vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
        vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
        return vim_account.get("vca")
    return None
async def instantiate_N2VC(
    self,
    logging_text,
    vca_index,
    nsi_id,
    db_nsr,
    db_vnfr,
    vdu_id,
    kdu_name,
    vdu_index,
    kdu_index,
    config_descriptor,
    deploy_params,
    base_folder,
    nslcmop_id,
    stage,
    vca_type,
    vca_name,
    ee_config_descriptor,
):
    """
    Create or register the execution environment of one VCA, install its configuration
    software and execute its initial config primitives.

    :param logging_text: prefix text to use at logging
    :param vca_index: position of this VCA at db_nsr "_admin.deployed.VCA" and "configurationStatus"
    :param nsi_id: slice instance id, first part of the namespace; may be empty
    :param db_nsr: database content of ns record
    :param db_vnfr: database content of vnf record; falsy when the element is a NS
    :param vdu_id: vdu id when the element is a VDU
    :param kdu_name: kdu name when the element is a KDU
    :param vdu_index: vdu count index, used at the namespace (ignored for native charms)
    :param kdu_index: passed through to the prometheus jobs extraction
    :param config_descriptor: configuration descriptor; "initial-config-primitive", "config-access" and
        "terminate-config-primitive" are read
    :param deploy_params: params for primitive mapping; "rw_mgmt_ip" and "ns_config_info" are written here
    :param base_folder: dict with "folder" and "pkg-dir", used to compose the artifact path
    :param nslcmop_id: operation id where the stage is written
    :param stage: list; item [0] is overwritten with the Day-1 stage text
    :param vca_type: "native_charm", "lxc_proxy_charm", "k8s_proxy_charm", "helm" or "helm-v3"
    :param vca_name: last part of the artifact path; also the chart model for helm
    :param ee_config_descriptor: execution environment descriptor; its "id" selects the initial primitives
    :return: None
    :raises LcmException: on any failure, after writing the configuration status "BROKEN"
    """
    nsr_id = db_nsr["_id"]
    db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
    osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
    db_dict = {
        "collection": "nsrs",
        "filter": {"_id": nsr_id},
        "path": db_update_entry,
    }
    step = ""
    try:

        element_type = "NS"
        element_under_configuration = nsr_id

        vnfr_id = None
        if db_vnfr:
            vnfr_id = db_vnfr["_id"]
            osm_config["osm"]["vnf_id"] = vnfr_id

        namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)

        if vca_type == "native_charm":
            index_number = 0
        else:
            index_number = vdu_index or 0

        if vnfr_id:
            element_type = "VNF"
            element_under_configuration = vnfr_id
            namespace += ".{}-{}".format(vnfr_id, index_number)
            if vdu_id:
                namespace += ".{}-{}".format(vdu_id, index_number)
                element_type = "VDU"
                element_under_configuration = "{}-{}".format(vdu_id, index_number)
                osm_config["osm"]["vdu_id"] = vdu_id
            elif kdu_name:
                namespace += ".{}".format(kdu_name)
                element_type = "KDU"
                element_under_configuration = kdu_name
                osm_config["osm"]["kdu_name"] = kdu_name

        # Get artifact path
        if base_folder["pkg-dir"]:
            artifact_path = "{}/{}/{}/{}".format(
                base_folder["folder"],
                base_folder["pkg-dir"],
                "charms"
                if vca_type
                in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
                else "helm-charts",
                vca_name,
            )
        else:
            artifact_path = "{}/Scripts/{}/{}/".format(
                base_folder["folder"],
                "charms"
                if vca_type
                in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
                else "helm-charts",
                vca_name,
            )

        self.logger.debug("Artifact path > {}".format(artifact_path))

        # get initial_config_primitive_list that applies to this element
        initial_config_primitive_list = config_descriptor.get(
            "initial-config-primitive"
        )

        self.logger.debug(
            "Initial config primitive list > {}".format(
                initial_config_primitive_list
            )
        )

        # add config if not present for NS charm
        ee_descriptor_id = ee_config_descriptor.get("id")
        self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
        initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
            initial_config_primitive_list, vca_deployed, ee_descriptor_id
        )

        self.logger.debug(
            "Initial config primitive list #2 > {}".format(
                initial_config_primitive_list
            )
        )
        # n2vc_redesign STEP 3.1
        # find old ee_id if exists
        ee_id = vca_deployed.get("ee_id")

        vca_id = self.get_vca_id(db_vnfr, db_nsr)
        # create or register execution environment in VCA
        if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status="CREATING",
                element_under_configuration=element_under_configuration,
                element_type=element_type,
            )

            step = "create execution environment"
            self.logger.debug(logging_text + step)

            # NOTE(review): ee_id is reset here, so reuse_ee_id below is
            # always None. Confirm whether reuse of the old ee_id is wanted.
            ee_id = None
            credentials = None
            if vca_type == "k8s_proxy_charm":
                ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
                    charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
                    namespace=namespace,
                    artifact_path=artifact_path,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )
            elif vca_type == "helm" or vca_type == "helm-v3":
                ee_id, credentials = await self.vca_map[
                    vca_type
                ].create_execution_environment(
                    namespace=namespace,
                    reuse_ee_id=ee_id,
                    db_dict=db_dict,
                    config=osm_config,
                    artifact_path=artifact_path,
                    chart_model=vca_name,
                    vca_type=vca_type,
                )
            else:
                ee_id, credentials = await self.vca_map[
                    vca_type
                ].create_execution_environment(
                    namespace=namespace,
                    reuse_ee_id=ee_id,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )

        elif vca_type == "native_charm":
            step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)
            rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                logging_text,
                nsr_id,
                vnfr_id,
                vdu_id,
                vdu_index,
                user=None,
                pub_key=None,
            )
            credentials = {"hostname": rw_mgmt_ip}
            # get username
            username = deep_get(
                config_descriptor, ("config-access", "ssh-access", "default-user")
            )
            # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
            #  merged. Meanwhile let's get username from initial-config-primitive
            if not username and initial_config_primitive_list:
                for config_primitive in initial_config_primitive_list:
                    for param in config_primitive.get("parameter", ()):
                        if param["name"] == "ssh-username":
                            username = param["value"]
                            break
            if not username:
                raise LcmException(
                    "Cannot determine the username neither with 'initial-config-primitive' nor with "
                    "'config-access.ssh-access.default-user'"
                )
            credentials["username"] = username
            # n2vc_redesign STEP 3.2

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status="REGISTERING",
                element_under_configuration=element_under_configuration,
                element_type=element_type,
            )

            step = "register execution environment {}".format(credentials)
            self.logger.debug(logging_text + step)
            ee_id = await self.vca_map[vca_type].register_execution_environment(
                credentials=credentials,
                namespace=namespace,
                db_dict=db_dict,
                vca_id=vca_id,
            )

        # for compatibility with MON/POL modules, the need model and application name at database
        # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
        ee_id_parts = ee_id.split(".")
        db_nsr_update = {db_update_entry + "ee_id": ee_id}
        if len(ee_id_parts) >= 2:
            model_name = ee_id_parts[0]
            application_name = ee_id_parts[1]
            db_nsr_update[db_update_entry + "model"] = model_name
            db_nsr_update[db_update_entry + "application"] = application_name

        # n2vc_redesign STEP 3.3
        step = "Install configuration Software"

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status="INSTALLING SW",
            element_under_configuration=element_under_configuration,
            element_type=element_type,
            other_update=db_nsr_update,
        )

        # TODO check if already done
        self.logger.debug(logging_text + step)
        config = None
        if vca_type == "native_charm":
            config_primitive = next(
                (p for p in initial_config_primitive_list if p["name"] == "config"),
                None,
            )
            if config_primitive:
                config = self._map_primitive_params(
                    config_primitive, {}, deploy_params
                )
        num_units = 1
        if vca_type == "lxc_proxy_charm":
            if element_type == "NS":
                num_units = db_nsr.get("config-units") or 1
            elif element_type == "VNF":
                num_units = db_vnfr.get("config-units") or 1
            elif element_type == "VDU":
                for v in db_vnfr["vdur"]:
                    if vdu_id == v["vdu-id-ref"]:
                        num_units = v.get("config-units") or 1
                        break
        if vca_type != "k8s_proxy_charm":
            await self.vca_map[vca_type].install_configuration_sw(
                ee_id=ee_id,
                artifact_path=artifact_path,
                db_dict=db_dict,
                config=config,
                num_units=num_units,
                vca_id=vca_id,
                vca_type=vca_type,
            )

        # write in db flag of configuration_sw already installed
        self.update_db_2(
            "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
        )

        # add relations for this VCA (wait for other peers related with this VCA)
        await self._add_vca_relations(
            logging_text=logging_text,
            nsr_id=nsr_id,
            vca_type=vca_type,
            vca_index=vca_index,
        )

        # if SSH access is required, then get execution environment SSH public
        # if native charm we have waited already to VM be UP
        if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
            pub_key = None
            user = None
            # self.logger.debug("get ssh key block")
            if deep_get(
                config_descriptor, ("config-access", "ssh-access", "required")
            ):
                # self.logger.debug("ssh key needed")
                # Needed to inject a ssh key
                user = deep_get(
                    config_descriptor,
                    ("config-access", "ssh-access", "default-user"),
                )
                step = "Install configuration Software, getting public ssh key"
                pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
                    ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
                )

                step = "Insert public key into VM user={} ssh_key={}".format(
                    user, pub_key
                )
            else:
                # self.logger.debug("no need to get ssh key")
                step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)

            # default rw_mgmt_ip to None, avoiding the non definition of the variable
            rw_mgmt_ip = None

            # n2vc_redesign STEP 5.1
            # wait for RO (ip-address) Insert pub_key into VM
            if vnfr_id:
                if kdu_name:
                    rw_mgmt_ip, services = await self.wait_kdu_up(
                        logging_text, nsr_id, vnfr_id, kdu_name
                    )
                    vnfd = self.db.get_one(
                        "vnfds_revisions",
                        {"_id": f'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
                    )
                    kdu = get_kdu(vnfd, kdu_name)
                    kdu_services = [
                        service["name"] for service in get_kdu_services(kdu)
                    ]
                    # keep only the services whose name contains a name declared at the kdu
                    exposed_services = []
                    for service in services:
                        if any(s in service["name"] for s in kdu_services):
                            exposed_services.append(service)
                    await self.vca_map[vca_type].exec_primitive(
                        ee_id=ee_id,
                        primitive_name="config",
                        params_dict={
                            "osm-config": json.dumps(
                                OsmConfigBuilder(
                                    k8s={"services": exposed_services}
                                ).build()
                            )
                        },
                        vca_id=vca_id,
                    )

                # This verification is needed in order to avoid trying to add a public key
                # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
                # for a KNF and not for its KDUs, the previous verification gives False, and the code
                # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
                # or a KNF)
                elif db_vnfr.get("vdur"):
                    rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                        logging_text,
                        nsr_id,
                        vnfr_id,
                        vdu_id,
                        vdu_index,
                        user=user,
                        pub_key=pub_key,
                    )

            self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))

            # store rw_mgmt_ip in deploy params for later replacement
            deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

        # n2vc_redesign STEP 6 Execute initial config primitive
        step = "execute initial config primitive"

        # wait for dependent primitives execution (NS -> VNF -> VDU)
        if initial_config_primitive_list:
            await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

        # stage, in function of element type: vdu, kdu, vnf or ns
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # VDU or KDU
            stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
        elif my_vca.get("member-vnf-index"):
            # VNF
            stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
        else:
            # NS
            stage[0] = "Stage 5/5: running Day-1 primitives for NS."

        self._write_configuration_status(
            nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
        )

        self._write_op_status(op_id=nslcmop_id, stage=stage)

        check_if_terminated_needed = True
        for initial_config_primitive in initial_config_primitive_list:
            # adding information on the vca_deployed if it is a NS execution environment
            if not vca_deployed["member-vnf-index"]:
                deploy_params["ns_config_info"] = json.dumps(
                    self._get_ns_config_info(nsr_id)
                )
            # TODO check if already done
            primitive_params_ = self._map_primitive_params(
                initial_config_primitive, {}, deploy_params
            )

            step = "execute primitive '{}' params '{}'".format(
                initial_config_primitive["name"], primitive_params_
            )
            self.logger.debug(logging_text + step)
            await self.vca_map[vca_type].exec_primitive(
                ee_id=ee_id,
                primitive_name=initial_config_primitive["name"],
                params_dict=primitive_params_,
                db_dict=db_dict,
                vca_id=vca_id,
                vca_type=vca_type,
            )
            # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
            if check_if_terminated_needed:
                if config_descriptor.get("terminate-config-primitive"):
                    self.update_db_2(
                        "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
                    )
                check_if_terminated_needed = False

            # TODO register in database that primitive is done

        # STEP 7 Configure metrics
        if vca_type == "helm" or vca_type == "helm-v3":
            # TODO: review for those cases where the helm chart is a reference and
            # is not part of the NF package
            prometheus_jobs = await self.extract_prometheus_scrape_jobs(
                ee_id=ee_id,
                artifact_path=artifact_path,
                ee_config_descriptor=ee_config_descriptor,
                vnfr_id=vnfr_id,
                nsr_id=nsr_id,
                target_ip=rw_mgmt_ip,
                # db_vnfr is falsy for a NS-level element; avoid AttributeError
                vnf_member_index=(db_vnfr or {}).get("member-vnf-index-ref", ""),
                vdu_id=vdu_id,
                vdu_index=vdu_index,
                kdu_name=kdu_name,
                kdu_index=kdu_index,
            )
            if prometheus_jobs:
                self.update_db_2(
                    "nsrs",
                    nsr_id,
                    {db_update_entry + "prometheus_jobs": prometheus_jobs},
                )

                for job in prometheus_jobs:
                    self.db.set_one(
                        "prometheus_jobs",
                        {"job_name": job["job_name"]},
                        job,
                        upsert=True,
                        fail_on_empty=False,
                    )

        step = "instantiated at VCA"
        self.logger.debug(logging_text + step)

        self._write_configuration_status(
            nsr_id=nsr_id, vca_index=vca_index, status="READY"
        )

    except Exception as e:  # TODO not use Exception but N2VC exception
        # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
        if not isinstance(
            e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
        ):
            self.logger.error(
                "Exception while {} : {}".format(step, e), exc_info=True
            )
        self._write_configuration_status(
            nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
        )
        raise LcmException("{} {}".format(step, e)) from e
def _write_ns_status(
    self,
    nsr_id: str,
    ns_state: str,
    current_operation: str,
    current_operation_id: str,
    error_description: str = None,
    error_detail: str = None,
    other_update: dict = None,
):
    """
    Update db_nsr fields.
    :param nsr_id: _id of the ns record to update
    :param ns_state: value for "nsState"; not written when falsy
    :param current_operation: value for "currentOperation"; "IDLE" stores None at "_admin.operation-type"
    :param current_operation_id: value for "currentOperationID" and "_admin.current-operation"
    :param error_description: value for "errorDescription"
    :param error_detail: value for "errorDetail"
    :param other_update: Other required changes at database if provided, will be cleared
    :return: None. A DbException is logged, not raised
    """
    try:
        # other_update is extended in place (not copied) when provided
        db_dict = other_update or {}
        db_dict[
            "_admin.nslcmop"
        ] = current_operation_id  # for backward compatibility
        db_dict["_admin.current-operation"] = current_operation_id
        db_dict["_admin.operation-type"] = (
            current_operation if current_operation != "IDLE" else None
        )
        db_dict["currentOperation"] = current_operation
        db_dict["currentOperationID"] = current_operation_id
        db_dict["errorDescription"] = error_description
        db_dict["errorDetail"] = error_detail

        if ns_state:
            db_dict["nsState"] = ns_state
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning("Error writing NS status, ns={}: {}".format(nsr_id, e))
def _write_op_status(
    self,
    op_id: str,
    stage: list = None,
    error_message: str = None,
    queuePosition: int = 0,
    operation_state: str = None,
    other_update: dict = None,
):
    """
    Update the fields of an operation record at "nslcmops".
    :param op_id: _id of the operation record to update
    :param stage: a list writes its item [0] to "stage" and all items joined by space to
        "detailed-status"; any other non-None value is written to "stage" as text
    :param error_message: value for "errorMessage"; not written when None
    :param queuePosition: value for "queuePosition"
    :param operation_state: value for "operationState"; when given, "statusEnteredTime" is set to now
    :param other_update: other changes to write; extended in place when provided
    :return: None. A DbException is logged, not raised
    """
    try:
        db_dict = other_update or {}
        db_dict["queuePosition"] = queuePosition
        if isinstance(stage, list):
            db_dict["stage"] = stage[0]
            db_dict["detailed-status"] = " ".join(stage)
        elif stage is not None:
            db_dict["stage"] = str(stage)

        if error_message is not None:
            db_dict["errorMessage"] = error_message
        if operation_state is not None:
            db_dict["operationState"] = operation_state
            db_dict["statusEnteredTime"] = time()
        self.update_db_2("nslcmops", op_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning(
            "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
        )
def _write_all_config_status(self, db_nsr: dict, status: str):
    """
    Write the same status to every non-empty entry of the ns "configurationStatus".
    :param db_nsr: database content of ns record; "_id" and "configurationStatus" are read
    :param status: value to write at "configurationStatus.<index>.status"
    :return: None. A DbException is logged, not raised
    """
    try:
        nsr_id = db_nsr["_id"]
        # configurationStatus
        config_status = db_nsr.get("configurationStatus")
        if config_status:
            # falsy entries are skipped, the index of the others is preserved
            db_nsr_update = {
                "configurationStatus.{}.status".format(index): status
                for index, v in enumerate(config_status)
                if v
            }
            # update status
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

    except DbException as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning(
            "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
        )
def _write_configuration_status(
    self,
    nsr_id: str,
    vca_index: int,
    status: str = None,
    element_under_configuration: str = None,
    element_type: str = None,
    other_update: dict = None,
):
    """
    Update the "configurationStatus.<vca_index>" entry of a ns record.
    :param nsr_id: _id of the ns record to update
    :param vca_index: position of the entry at "configurationStatus"
    :param status: value for "status"; not written when falsy
    :param element_under_configuration: value for "elementUnderConfiguration"; not written when falsy
    :param element_type: value for "elementType"; not written when falsy
    :param other_update: other changes to write; extended in place when provided
    :return: None. A DbException is logged, not raised
    """

    # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
    #                   .format(vca_index, status))

    try:
        db_path = "configurationStatus.{}.".format(vca_index)
        db_dict = other_update or {}
        if status:
            db_dict[db_path + "status"] = status
        if element_under_configuration:
            db_dict[
                db_path + "elementUnderConfiguration"
            ] = element_under_configuration
        if element_type:
            db_dict[db_path + "elementType"] = element_type
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning(
            "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
                status, nsr_id, vca_index, e
            )
        )
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Check and compute the placement (vim account where to deploy). When it is decided by an
    external tool, the request is sent via kafka and the result is awaited at database
    (nslcmops _admin.pla). Database is used because the result can be obtained from a
    different LCM worker in case of HA.
    :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
    :param db_nslcmop: database content of nslcmop
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
    :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs
        with the computed 'vim-account-id'
    :raises LcmException: if no placement result is found at database in time
    """
    nslcmop_id = db_nslcmop["_id"]
    if deep_get(db_nslcmop, ("operationParams", "placement-engine")) != "PLA":
        # no external placement requested: nothing is changed
        return False

    self.logger.debug(
        logging_text + "Invoke and wait for placement optimization"
    )
    await self.msg.aiowrite(
        "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
    )

    # poll database until the result is there or the time budget is spent
    db_poll_interval = 5
    wait = db_poll_interval * 10
    pla_result = None
    while not pla_result and wait >= 0:
        await asyncio.sleep(db_poll_interval)
        wait -= db_poll_interval
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        pla_result = deep_get(db_nslcmop, ("_admin", "pla"))

    if not pla_result:
        raise LcmException(
            "Placement timeout for nslcmopId={}".format(nslcmop_id)
        )

    changed = False
    for placed_vnf in pla_result["vnf"]:
        target_vnfr = db_vnfrs.get(placed_vnf["member-vnf-index"])
        if placed_vnf.get("vimAccountId") and target_vnfr:
            changed = True
            new_vim = {"vim-account-id": placed_vnf["vimAccountId"]}
            self.db.set_one("vnfrs", {"_id": target_vnfr["_id"]}, new_vim)
            # Modifies db_vnfrs
            target_vnfr["vim-account-id"] = placed_vnf["vimAccountId"]
    return changed
def update_nsrs_with_pla_result(self, params):
    """
    Store a received placement result in the nslcmops record.

    The operation id is read from params["placement"]["nslcmopId"] and the
    whole "placement" entry is written under "_admin.pla" of that nslcmop.
    Failures are logged as warnings and never propagated (best effort).

    :param params: received content; expected to hold a "placement" entry
    :return: None
    """
    # Bound before the try block so the handler below can always format it,
    # even when the identifier lookup itself is what failed.
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
        self.update_db_2(
            "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
        )
    except Exception as e:
        # Logger.warn is a deprecated alias; warning() is the supported call.
        self.logger.warning(
            "Update failed for nslcmop_id={}:{}".format(nslcmop_id, e)
        )
2429 async def instantiate(self
, nsr_id
, nslcmop_id
):
2432 :param nsr_id: ns instance to deploy
2433 :param nslcmop_id: operation to run
2437 # Try to lock HA task here
2438 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2439 if not task_is_locked_by_me
:
2441 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2445 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2446 self
.logger
.debug(logging_text
+ "Enter")
2448 # get all needed from database
2450 # database nsrs record
2453 # database nslcmops record
2456 # update operation on nsrs
2458 # update operation on nslcmops
2459 db_nslcmop_update
= {}
2461 nslcmop_operation_state
= None
2462 db_vnfrs
= {} # vnf's info indexed by member-index
2464 tasks_dict_info
= {} # from task to info text
2468 "Stage 1/5: preparation of the environment.",
2469 "Waiting for previous operations to terminate.",
2472 # ^ stage, step, VIM progress
2474 # wait for any previous tasks in process
2475 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2477 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2478 stage
[1] = "Reading from database."
2479 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2480 db_nsr_update
["detailed-status"] = "creating"
2481 db_nsr_update
["operational-status"] = "init"
2482 self
._write
_ns
_status
(
2484 ns_state
="BUILDING",
2485 current_operation
="INSTANTIATING",
2486 current_operation_id
=nslcmop_id
,
2487 other_update
=db_nsr_update
,
2489 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2491 # read from db: operation
2492 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2493 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2494 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2495 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2496 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2498 ns_params
= db_nslcmop
.get("operationParams")
2499 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2500 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2502 timeout_ns_deploy
= self
.timeout
.ns_deploy
2505 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2506 self
.logger
.debug(logging_text
+ stage
[1])
2507 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2508 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2509 self
.logger
.debug(logging_text
+ stage
[1])
2510 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2511 self
.fs
.sync(db_nsr
["nsd-id"])
2513 # nsr_name = db_nsr["name"] # TODO short-name??
2515 # read from db: vnf's of this ns
2516 stage
[1] = "Getting vnfrs from db."
2517 self
.logger
.debug(logging_text
+ stage
[1])
2518 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2520 # read from db: vnfd's for every vnf
2521 db_vnfds
= [] # every vnfd data
2523 # for each vnf in ns, read vnfd
2524 for vnfr
in db_vnfrs_list
:
2525 if vnfr
.get("kdur"):
2527 for kdur
in vnfr
["kdur"]:
2528 if kdur
.get("additionalParams"):
2529 kdur
["additionalParams"] = json
.loads(
2530 kdur
["additionalParams"]
2532 kdur_list
.append(kdur
)
2533 vnfr
["kdur"] = kdur_list
2535 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2536 vnfd_id
= vnfr
["vnfd-id"]
2537 vnfd_ref
= vnfr
["vnfd-ref"]
2538 self
.fs
.sync(vnfd_id
)
2540 # if we haven't this vnfd, read it from db
2541 if vnfd_id
not in db_vnfds
:
2543 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2546 self
.logger
.debug(logging_text
+ stage
[1])
2547 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2550 db_vnfds
.append(vnfd
)
2552 # Get or generates the _admin.deployed.VCA list
2553 vca_deployed_list
= None
2554 if db_nsr
["_admin"].get("deployed"):
2555 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2556 if vca_deployed_list
is None:
2557 vca_deployed_list
= []
2558 configuration_status_list
= []
2559 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2560 db_nsr_update
["configurationStatus"] = configuration_status_list
2561 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2562 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2563 elif isinstance(vca_deployed_list
, dict):
2564 # maintain backward compatibility. Change a dict to list at database
2565 vca_deployed_list
= list(vca_deployed_list
.values())
2566 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2567 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2570 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2572 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2573 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2575 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2576 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2577 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2579 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2582 # n2vc_redesign STEP 2 Deploy Network Scenario
2583 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2584 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2586 stage
[1] = "Deploying KDUs."
2587 # self.logger.debug(logging_text + "Before deploy_kdus")
2588 # Call to deploy_kdus in case exists the "vdu:kdu" param
2589 await self
.deploy_kdus(
2590 logging_text
=logging_text
,
2592 nslcmop_id
=nslcmop_id
,
2595 task_instantiation_info
=tasks_dict_info
,
2598 stage
[1] = "Getting VCA public key."
2599 # n2vc_redesign STEP 1 Get VCA public ssh-key
2600 # feature 1429. Add n2vc public key to needed VMs
2601 n2vc_key
= self
.n2vc
.get_public_key()
2602 n2vc_key_list
= [n2vc_key
]
2603 if self
.vca_config
.public_key
:
2604 n2vc_key_list
.append(self
.vca_config
.public_key
)
2606 stage
[1] = "Deploying NS at VIM."
2607 task_ro
= asyncio
.ensure_future(
2608 self
.instantiate_RO(
2609 logging_text
=logging_text
,
2613 db_nslcmop
=db_nslcmop
,
2616 n2vc_key_list
=n2vc_key_list
,
2620 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2621 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2623 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2624 stage
[1] = "Deploying Execution Environments."
2625 self
.logger
.debug(logging_text
+ stage
[1])
2627 # create namespace and certificate if any helm based EE is present in the NS
2628 if check_helm_ee_in_ns(db_vnfds
):
2629 # TODO: create EE namespace
2630 # create TLS certificates
2631 await self
.vca_map
["helm-v3"].create_tls_certificate(
2632 secret_name
="ee-tls-{}".format(nsr_id
),
2635 usage
="server auth",
2638 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2639 for vnf_profile
in get_vnf_profiles(nsd
):
2640 vnfd_id
= vnf_profile
["vnfd-id"]
2641 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2642 member_vnf_index
= str(vnf_profile
["id"])
2643 db_vnfr
= db_vnfrs
[member_vnf_index
]
2644 base_folder
= vnfd
["_admin"]["storage"]
2651 # Get additional parameters
2652 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2653 if db_vnfr
.get("additionalParamsForVnf"):
2654 deploy_params
.update(
2655 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2658 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2659 if descriptor_config
:
2661 logging_text
=logging_text
2662 + "member_vnf_index={} ".format(member_vnf_index
),
2665 nslcmop_id
=nslcmop_id
,
2671 member_vnf_index
=member_vnf_index
,
2672 vdu_index
=vdu_index
,
2673 kdu_index
=kdu_index
,
2675 deploy_params
=deploy_params
,
2676 descriptor_config
=descriptor_config
,
2677 base_folder
=base_folder
,
2678 task_instantiation_info
=tasks_dict_info
,
2682 # Deploy charms for each VDU that supports one.
2683 for vdud
in get_vdu_list(vnfd
):
2685 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2686 vdur
= find_in_list(
2687 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2690 if vdur
.get("additionalParams"):
2691 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2693 deploy_params_vdu
= deploy_params
2694 deploy_params_vdu
["OSM"] = get_osm_params(
2695 db_vnfr
, vdu_id
, vdu_count_index
=0
2697 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2699 self
.logger
.debug("VDUD > {}".format(vdud
))
2701 "Descriptor config > {}".format(descriptor_config
)
2703 if descriptor_config
:
2707 for vdu_index
in range(vdud_count
):
2708 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2710 logging_text
=logging_text
2711 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2712 member_vnf_index
, vdu_id
, vdu_index
2716 nslcmop_id
=nslcmop_id
,
2722 kdu_index
=kdu_index
,
2723 member_vnf_index
=member_vnf_index
,
2724 vdu_index
=vdu_index
,
2726 deploy_params
=deploy_params_vdu
,
2727 descriptor_config
=descriptor_config
,
2728 base_folder
=base_folder
,
2729 task_instantiation_info
=tasks_dict_info
,
2732 for kdud
in get_kdu_list(vnfd
):
2733 kdu_name
= kdud
["name"]
2734 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2735 if descriptor_config
:
2739 kdu_index
, kdur
= next(
2741 for x
in enumerate(db_vnfr
["kdur"])
2742 if x
[1]["kdu-name"] == kdu_name
2744 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2745 if kdur
.get("additionalParams"):
2746 deploy_params_kdu
.update(
2747 parse_yaml_strings(kdur
["additionalParams"].copy())
2751 logging_text
=logging_text
,
2754 nslcmop_id
=nslcmop_id
,
2760 member_vnf_index
=member_vnf_index
,
2761 vdu_index
=vdu_index
,
2762 kdu_index
=kdu_index
,
2764 deploy_params
=deploy_params_kdu
,
2765 descriptor_config
=descriptor_config
,
2766 base_folder
=base_folder
,
2767 task_instantiation_info
=tasks_dict_info
,
2771 # Check if this NS has a charm configuration
2772 descriptor_config
= nsd
.get("ns-configuration")
2773 if descriptor_config
and descriptor_config
.get("juju"):
2776 member_vnf_index
= None
2783 # Get additional parameters
2784 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2785 if db_nsr
.get("additionalParamsForNs"):
2786 deploy_params
.update(
2787 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2789 base_folder
= nsd
["_admin"]["storage"]
2791 logging_text
=logging_text
,
2794 nslcmop_id
=nslcmop_id
,
2800 member_vnf_index
=member_vnf_index
,
2801 vdu_index
=vdu_index
,
2802 kdu_index
=kdu_index
,
2804 deploy_params
=deploy_params
,
2805 descriptor_config
=descriptor_config
,
2806 base_folder
=base_folder
,
2807 task_instantiation_info
=tasks_dict_info
,
2811 # rest of staff will be done at finally
2814 ROclient
.ROClientException
,
2820 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2823 except asyncio
.CancelledError
:
2825 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2827 exc
= "Operation was cancelled"
2828 except Exception as e
:
2829 exc
= traceback
.format_exc()
2830 self
.logger
.critical(
2831 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2836 error_list
.append(str(exc
))
2838 # wait for pending tasks
2840 stage
[1] = "Waiting for instantiate pending tasks."
2841 self
.logger
.debug(logging_text
+ stage
[1])
2842 error_list
+= await self
._wait
_for
_tasks
(
2850 stage
[1] = stage
[2] = ""
2851 except asyncio
.CancelledError
:
2852 error_list
.append("Cancelled")
2853 # TODO cancel all tasks
2854 except Exception as exc
:
2855 error_list
.append(str(exc
))
2857 # update operation-status
2858 db_nsr_update
["operational-status"] = "running"
2859 # let's begin with VCA 'configured' status (later we can change it)
2860 db_nsr_update
["config-status"] = "configured"
2861 for task
, task_name
in tasks_dict_info
.items():
2862 if not task
.done() or task
.cancelled() or task
.exception():
2863 if task_name
.startswith(self
.task_name_deploy_vca
):
2864 # A N2VC task is pending
2865 db_nsr_update
["config-status"] = "failed"
2867 # RO or KDU task is pending
2868 db_nsr_update
["operational-status"] = "failed"
2870 # update status at database
2872 error_detail
= ". ".join(error_list
)
2873 self
.logger
.error(logging_text
+ error_detail
)
2874 error_description_nslcmop
= "{} Detail: {}".format(
2875 stage
[0], error_detail
2877 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2878 nslcmop_id
, stage
[0]
2881 db_nsr_update
["detailed-status"] = (
2882 error_description_nsr
+ " Detail: " + error_detail
2884 db_nslcmop_update
["detailed-status"] = error_detail
2885 nslcmop_operation_state
= "FAILED"
2889 error_description_nsr
= error_description_nslcmop
= None
2891 db_nsr_update
["detailed-status"] = "Done"
2892 db_nslcmop_update
["detailed-status"] = "Done"
2893 nslcmop_operation_state
= "COMPLETED"
2896 self
._write
_ns
_status
(
2899 current_operation
="IDLE",
2900 current_operation_id
=None,
2901 error_description
=error_description_nsr
,
2902 error_detail
=error_detail
,
2903 other_update
=db_nsr_update
,
2905 self
._write
_op
_status
(
2908 error_message
=error_description_nslcmop
,
2909 operation_state
=nslcmop_operation_state
,
2910 other_update
=db_nslcmop_update
,
2913 if nslcmop_operation_state
:
2915 await self
.msg
.aiowrite(
2920 "nslcmop_id": nslcmop_id
,
2921 "operationState": nslcmop_operation_state
,
2925 except Exception as e
:
2927 logging_text
+ "kafka_write notification Exception {}".format(e
)
2930 self
.logger
.debug(logging_text
+ "Exit")
2931 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2933 def _get_vnfd(self
, vnfd_id
: str, projects_read
: str, cached_vnfds
: Dict
[str, Any
]):
2934 if vnfd_id
not in cached_vnfds
:
2935 cached_vnfds
[vnfd_id
] = self
.db
.get_one(
2936 "vnfds", {"id": vnfd_id
, "_admin.projects_read": projects_read
}
2938 return cached_vnfds
[vnfd_id
]
2940 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2941 if vnf_profile_id
not in cached_vnfrs
:
2942 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2945 "member-vnf-index-ref": vnf_profile_id
,
2946 "nsr-id-ref": nsr_id
,
2949 return cached_vnfrs
[vnf_profile_id
]
def _is_deployed_vca_in_relation(
    self, vca: DeployedVCA, relation: Relation
) -> bool:
    """
    Tell whether the deployed VCA is one of the two endpoints of a relation.

    Endpoints carrying a "kdu-resource-profile-id" are ignored. Any other
    endpoint matches when its vnf profile id, vdu profile id and execution
    environment reference all equal those of the VCA.

    :param vca: deployed VCA to look for
    :param relation: relation whose provider and requirer are inspected
    :return: True if the provider or the requirer matches the VCA
    """
    for side in (relation.provider, relation.requirer):
        if side["kdu-resource-profile-id"]:
            continue
        if (
            vca.vnf_profile_id == side.vnf_profile_id
            and vca.vdu_profile_id == side.vdu_profile_id
            and vca.execution_environment_ref == side.execution_environment_ref
        ):
            return True
    return False
def _update_ee_relation_data_with_implicit_data(
    self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
):
    """
    Complete the data of one relation endpoint with values left implicit.

    The endpoint is first normalised with safe_get_ee_relation. For VNF and
    VDU level endpoints without an "execution-environment-ref", the juju
    execution environment of the vnfd (VNF level) or of the vdu profile (VDU
    level) is looked up and its id is stored in the endpoint data.

    :param nsr_id: ns record identifier
    :param nsd: ns descriptor; gives the vnf profiles and the project used to
        read the vnfd
    :param ee_relation_data: endpoint data as written in the descriptor
    :param cached_vnfds: vnfd cache handed to _get_vnfd
    :param vnf_profile_id: forwarded to safe_get_ee_relation
    :return: the completed endpoint data
    :raises Exception: if no execution environment is found for the endpoint
    """
    ee_relation_data = safe_get_ee_relation(
        nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
    )
    level = EELevel.get_level(ee_relation_data)
    needs_ee_ref = (
        level in (EELevel.VNF, EELevel.VDU)
        and not ee_relation_data["execution-environment-ref"]
    )
    if not needs_ee_ref:
        return ee_relation_data

    vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
    vnfd_id = vnf_profile["vnfd-id"]
    project = nsd["_admin"]["projects_read"][0]
    db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)

    if level == EELevel.VNF:
        entity_id = vnfd_id
    else:
        entity_id = ee_relation_data["vdu-profile-id"]

    ee = get_juju_ee_ref(db_vnfd, entity_id)
    if not ee:
        raise Exception(
            f"not execution environments found for ee_relation {ee_relation_data}"
        )
    ee_relation_data["execution-environment-ref"] = ee["id"]
    return ee_relation_data
def _get_ns_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """
    Collect the ns-level relations in which the given deployed VCA takes part.

    Each relation of the ns configuration is given either as explicit
    "provider"/"requirer" entries or as a two-item "entities" list, where the
    first item is taken as provider and the second as requirer. An entity
    whose id differs from the nsd id is treated as a vnf profile.

    :param nsr_id: ns record identifier
    :param nsd: ns descriptor
    :param vca: deployed VCA that must be one of the relation endpoints
    :param cached_vnfds: vnfd cache shared with the helper calls
    :return: list of Relation objects that involve the VCA
    :raises Exception: if a relation has neither provider/requirer nor entities
    """
    selected = []
    for rel in get_ns_configuration_relation_list(nsd):
        if "provider" in rel and "requirer" in rel:
            provider_data, requirer_data = rel["provider"], rel["requirer"]
        elif "entities" in rel:
            sides = []
            for position in (0, 1):
                entity = rel["entities"][position]
                entity_id = entity["id"]
                side = {
                    "nsr-id": nsr_id,
                    "endpoint": entity["endpoint"],
                }
                if entity_id != nsd["id"]:
                    side["vnf-profile-id"] = entity_id
                sides.append(side)
            provider_data, requirer_data = sides
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )

        completed_provider = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, provider_data, cached_vnfds
        )
        completed_requirer = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, requirer_data, cached_vnfds
        )
        candidate = Relation(
            rel["name"],
            EERelation(completed_provider),
            EERelation(completed_requirer),
        )
        if self._is_deployed_vca_in_relation(vca, candidate):
            selected.append(candidate)
    return selected
def _get_vnf_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """
    Collect the vnf-level relations in which the given deployed VCA takes part.

    The relations are read from the vnfd of the vnf profile the VCA belongs
    to. Each one is given either as explicit "provider"/"requirer" entries or
    as a two-item "entities" list, where the first item is taken as provider
    and the second as requirer. An entity whose id differs from the vnfd id
    is treated as a vdu profile.

    :param nsr_id: ns record identifier
    :param nsd: ns descriptor
    :param vca: deployed VCA that must be one of the relation endpoints
    :param cached_vnfds: vnfd cache shared with the helper calls
    :return: list of Relation objects that involve the VCA
    :raises Exception: if a relation has neither provider/requirer nor entities
    """
    selected = []
    vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
    vnf_profile_id = vnf_profile["id"]
    vnfd_id = vnf_profile["vnfd-id"]
    project = nsd["_admin"]["projects_read"][0]
    db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)

    for rel in get_relation_list(db_vnfd, vnfd_id):
        if "provider" in rel and "requirer" in rel:
            provider_data, requirer_data = rel["provider"], rel["requirer"]
        elif "entities" in rel:
            sides = []
            for position in (0, 1):
                entity = rel["entities"][position]
                entity_id = entity["id"]
                side = {
                    "nsr-id": nsr_id,
                    "vnf-profile-id": vnf_profile_id,
                    "endpoint": entity["endpoint"],
                }
                if entity_id != vnfd_id:
                    side["vdu-profile-id"] = entity_id
                sides.append(side)
            provider_data, requirer_data = sides
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )

        completed_provider = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, provider_data, cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        completed_requirer = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, requirer_data, cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        candidate = Relation(
            rel["name"],
            EERelation(completed_provider),
            EERelation(completed_requirer),
        )
        if self._is_deployed_vca_in_relation(vca, candidate):
            selected.append(candidate)
    return selected
def _get_kdu_resource_data(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedK8sResource:
    """
    Build the deployed-KDU data a KDU level relation endpoint refers to.

    The vnfd of the endpoint's vnf profile gives the kdu resource profile,
    which names the KDU and the resource. The matching deployed KDU entry of
    the ns record is then returned with "resource-name" added to it.

    :param ee_relation: relation endpoint (vnf profile id and kdu resource
        profile id are read from it)
    :param db_nsr: ns record; "_admin.deployed" is searched for the KDU
    :param cached_vnfds: vnfd cache handed to _get_vnfd
    :return: the deployed KDU data updated in place, or None when
        get_deployed_kdu reports no deployed KDU
    """
    nsd = get_nsd(db_nsr)
    vnf_profiles = get_vnf_profiles(nsd)
    vnfd_id = find_in_list(
        vnf_profiles,
        lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
    )["vnfd-id"]
    project = nsd["_admin"]["projects_read"][0]
    db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)
    kdu_resource_profile = get_kdu_resource_profile(
        db_vnfd, ee_relation.kdu_resource_profile_id
    )
    kdu_name = kdu_resource_profile["kdu-name"]
    # The default for "_admin" must be a mapping: the previous tuple default
    # made the chained .get() raise AttributeError when "_admin" was absent.
    deployed_kdu, _ = get_deployed_kdu(
        db_nsr.get("_admin", {}).get("deployed", ()),
        kdu_name,
        ee_relation.vnf_profile_id,
    )
    if deployed_kdu is None:
        # NOTE(review): assumes get_deployed_kdu signals "not found" with
        # None; the caller in this file already tests the result for
        # truthiness before using it - confirm against data_utils.nsr.
        return None
    deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
    return deployed_kdu
def _get_deployed_component(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedComponent:
    """
    Find the deployed component behind a relation endpoint.

    NS, VNF and VDU level endpoints are resolved to a DeployedVCA by looking
    up the deployed VCA entry of the ns record that matches the endpoint;
    KDU level endpoints are resolved to a DeployedK8sResource.

    :param ee_relation: relation endpoint to resolve
    :param db_nsr: ns record
    :param cached_vnfds: vnfd cache, used for KDU level endpoints
    :return: the deployed component, or None if nothing matching is deployed
        or the endpoint level is none of NS, VNF, VDU, KDU
    """
    nsr_id = db_nsr["_id"]
    level = EELevel.get_level(ee_relation)

    if level == EELevel.NS:
        vca_filter = {"vdu_id": None, "member-vnf-index": None}
    elif level == EELevel.VNF:
        vca_filter = {
            "vdu_id": None,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif level == EELevel.VDU:
        vca_filter = {
            "vdu_id": ee_relation.vdu_profile_id,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif level == EELevel.KDU:
        kdu_data = self._get_kdu_resource_data(ee_relation, db_nsr, cached_vnfds)
        return DeployedK8sResource(kdu_data) if kdu_data else None
    else:
        return None

    # NS, VNF and VDU levels share the same lookup and wrapping
    vca = get_deployed_vca(db_nsr, vca_filter)
    return DeployedVCA(nsr_id, vca) if vca else None
async def _add_relation(
    self,
    relation: Relation,
    vca_type: str,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
    cached_vnfrs: Dict[str, Any],
) -> bool:
    """
    Add one relation, provided both of its endpoints are ready.

    Both endpoints must resolve to a deployed component whose
    config_sw_installed flag is set; otherwise nothing is done.

    :param relation: relation to add
    :param vca_type: key of self.vca_map selecting the connector to use
    :param db_nsr: ns record
    :param cached_vnfds: vnfd cache
    :param cached_vnfrs: vnfr cache
    :return: True if add_relation was invoked, False if an endpoint is not
        ready yet
    """
    deployed_provider = self._get_deployed_component(
        relation.provider, db_nsr, cached_vnfds
    )
    deployed_requirer = self._get_deployed_component(
        relation.requirer, db_nsr, cached_vnfds
    )
    both_ready = (
        deployed_provider
        and deployed_requirer
        and deployed_provider.config_sw_installed
        and deployed_requirer.config_sw_installed
    )
    if not both_ready:
        return False

    def vnfr_of(endpoint):
        # ns level endpoints have no vnf profile, hence no vnfr
        if not endpoint.vnf_profile_id:
            return None
        return self._get_vnfr(endpoint.nsr_id, endpoint.vnf_profile_id, cached_vnfrs)

    provider_db_vnfr = vnfr_of(relation.provider)
    requirer_db_vnfr = vnfr_of(relation.requirer)
    provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
    requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)

    provider_relation_endpoint = RelationEndpoint(
        deployed_provider.ee_id,
        provider_vca_id,
        relation.provider.endpoint,
    )
    requirer_relation_endpoint = RelationEndpoint(
        deployed_requirer.ee_id,
        requirer_vca_id,
        relation.requirer.endpoint,
    )
    await self.vca_map[vca_type].add_relation(
        provider=provider_relation_endpoint,
        requirer=requirer_relation_endpoint,
    )
    # the caller removes the entry from its relations list on True
    return True
async def _add_vca_relations(
    self,
    logging_text,
    nsr_id,
    vca_type: str,
    vca_index: int,
    timeout: int = 3600,
) -> bool:
    """
    Add every relation in which the VCA at vca_index takes part.

    Steps:
      1. find all ns-level and vnf-level relations for this VCA
      2. wait for the related peers: the ns record is re-read and every
         pending relation retried every 5 seconds
      3. add each relation as soon as both of its endpoints are ready

    :param logging_text: prefix for logging
    :param nsr_id: ns record identifier
    :param vca_type: key of self.vca_map selecting the connector to use
    :param vca_index: position of this VCA in the deployed VCA list of the nsr
    :param timeout: maximum time in seconds to keep retrying
    :return: True if there was nothing to add or all relations were added;
        False on timeout or on any error (which is logged, not raised)
    """
    try:
        # STEP 1: find all relations for this VCA
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = get_nsd(db_nsr)

        # this VCA data
        deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
        my_vca = DeployedVCA(nsr_id, deployed_vca_dict)

        cached_vnfds = {}
        cached_vnfrs = {}
        relations = []
        relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
        relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))

        # if no relations, terminate
        if not relations:
            self.logger.debug(logging_text + " No relations")
            return True

        self.logger.debug(logging_text + " adding relations {}".format(relations))

        # STEPS 2 and 3: retry until every relation is added or time runs out
        start = time()
        while True:
            now = time()
            if now - start >= timeout:
                self.logger.error(logging_text + " : timeout adding relations")
                return False

            # reload nsr from database (we need to update record: _admin.deployed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            # iterate over a copy because added relations are removed on the fly
            for relation in relations.copy():
                added = await self._add_relation(
                    relation,
                    vca_type,
                    db_nsr,
                    cached_vnfds,
                    cached_vnfrs,
                )
                if added:
                    relations.remove(relation)

            if not relations:
                self.logger.debug("Relations added")
                break
            await asyncio.sleep(5.0)

        return True

    except Exception as e:
        # Logger.warn is a deprecated alias; warning() is the supported call,
        # so the original error is reported rather than masked on
        # interpreters that no longer provide the alias.
        self.logger.warning(logging_text + " ERROR adding relations: {}".format(e))
        return False
3297 async def _install_kdu(
3305 k8s_instance_info
: dict,
3306 k8params
: dict = None,
3312 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3315 "collection": "nsrs",
3316 "filter": {"_id": nsr_id
},
3317 "path": nsr_db_path
,
3320 if k8s_instance_info
.get("kdu-deployment-name"):
3321 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3323 kdu_instance
= self
.k8scluster_map
[
3325 ].generate_kdu_instance_name(
3326 db_dict
=db_dict_install
,
3327 kdu_model
=k8s_instance_info
["kdu-model"],
3328 kdu_name
=k8s_instance_info
["kdu-name"],
3331 # Update the nsrs table with the kdu-instance value
3335 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3338 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3339 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3340 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3341 # namespace, this first verification could be removed, and the next step would be done for any kind
3343 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3344 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3345 if k8sclustertype
in ("juju", "juju-bundle"):
3346 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3347 # that the user passed a namespace which he wants its KDU to be deployed in)
3353 "_admin.projects_write": k8s_instance_info
["namespace"],
3354 "_admin.projects_read": k8s_instance_info
["namespace"],
3360 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3365 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3367 k8s_instance_info
["namespace"] = kdu_instance
3369 await self
.k8scluster_map
[k8sclustertype
].install(
3370 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3371 kdu_model
=k8s_instance_info
["kdu-model"],
3374 db_dict
=db_dict_install
,
3376 kdu_name
=k8s_instance_info
["kdu-name"],
3377 namespace
=k8s_instance_info
["namespace"],
3378 kdu_instance
=kdu_instance
,
3382 # Obtain services to obtain management service ip
3383 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3384 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3385 kdu_instance
=kdu_instance
,
3386 namespace
=k8s_instance_info
["namespace"],
3389 # Obtain management service info (if exists)
3390 vnfr_update_dict
= {}
3391 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3393 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3398 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3401 for service
in kdud
.get("service", [])
3402 if service
.get("mgmt-service")
3404 for mgmt_service
in mgmt_services
:
3405 for service
in services
:
3406 if service
["name"].startswith(mgmt_service
["name"]):
3407 # Mgmt service found, Obtain service ip
3408 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3409 if isinstance(ip
, list) and len(ip
) == 1:
3413 "kdur.{}.ip-address".format(kdu_index
)
3416 # Check if must update also mgmt ip at the vnf
3417 service_external_cp
= mgmt_service
.get(
3418 "external-connection-point-ref"
3420 if service_external_cp
:
3422 deep_get(vnfd
, ("mgmt-interface", "cp"))
3423 == service_external_cp
3425 vnfr_update_dict
["ip-address"] = ip
3430 "external-connection-point-ref", ""
3432 == service_external_cp
,
3435 "kdur.{}.ip-address".format(kdu_index
)
3440 "Mgmt service name: {} not found".format(
3441 mgmt_service
["name"]
3445 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3446 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3448 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3451 and kdu_config
.get("initial-config-primitive")
3452 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3454 initial_config_primitive_list
= kdu_config
.get(
3455 "initial-config-primitive"
3457 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3459 for initial_config_primitive
in initial_config_primitive_list
:
3460 primitive_params_
= self
._map
_primitive
_params
(
3461 initial_config_primitive
, {}, {}
3464 await asyncio
.wait_for(
3465 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3466 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3467 kdu_instance
=kdu_instance
,
3468 primitive_name
=initial_config_primitive
["name"],
3469 params
=primitive_params_
,
3470 db_dict
=db_dict_install
,
3476 except Exception as e
:
3477 # Prepare update db with error and raise exception
3480 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3484 vnfr_data
.get("_id"),
3485 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3488 # ignore to keep original exception
3490 # reraise original error
3495 async def deploy_kdus(
3502 task_instantiation_info
,
3504 # Launch kdus if present in the descriptor
3506 k8scluster_id_2_uuic
= {
3507 "helm-chart-v3": {},
3512 async def _get_cluster_id(cluster_id
, cluster_type
):
3513 nonlocal k8scluster_id_2_uuic
3514 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3515 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3517 # check if K8scluster is creating and wait look if previous tasks in process
3518 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3519 "k8scluster", cluster_id
3522 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3523 task_name
, cluster_id
3525 self
.logger
.debug(logging_text
+ text
)
3526 await asyncio
.wait(task_dependency
, timeout
=3600)
3528 db_k8scluster
= self
.db
.get_one(
3529 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3531 if not db_k8scluster
:
3532 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3534 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3536 if cluster_type
== "helm-chart-v3":
3538 # backward compatibility for existing clusters that have not been initialized for helm v3
3539 k8s_credentials
= yaml
.safe_dump(
3540 db_k8scluster
.get("credentials")
3542 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3543 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3545 db_k8scluster_update
= {}
3546 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3547 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3548 db_k8scluster_update
[
3549 "_admin.helm-chart-v3.created"
3551 db_k8scluster_update
[
3552 "_admin.helm-chart-v3.operationalState"
3555 "k8sclusters", cluster_id
, db_k8scluster_update
3557 except Exception as e
:
3560 + "error initializing helm-v3 cluster: {}".format(str(e
))
3563 "K8s cluster '{}' has not been initialized for '{}'".format(
3564 cluster_id
, cluster_type
3569 "K8s cluster '{}' has not been initialized for '{}'".format(
3570 cluster_id
, cluster_type
3573 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3576 logging_text
+= "Deploy kdus: "
3579 db_nsr_update
= {"_admin.deployed.K8s": []}
3580 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3583 updated_cluster_list
= []
3584 updated_v3_cluster_list
= []
3586 for vnfr_data
in db_vnfrs
.values():
3587 vca_id
= self
.get_vca_id(vnfr_data
, {})
3588 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3589 # Step 0: Prepare and set parameters
3590 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3591 vnfd_id
= vnfr_data
.get("vnfd-id")
3592 vnfd_with_id
= find_in_list(
3593 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3597 for kdud
in vnfd_with_id
["kdu"]
3598 if kdud
["name"] == kdur
["kdu-name"]
3600 namespace
= kdur
.get("k8s-namespace")
3601 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3602 if kdur
.get("helm-chart"):
3603 kdumodel
= kdur
["helm-chart"]
3604 # Default version: helm3, if helm-version is v2 assign v2
3605 k8sclustertype
= "helm-chart-v3"
3606 self
.logger
.debug("kdur: {}".format(kdur
))
3608 kdur
.get("helm-version")
3609 and kdur
.get("helm-version") == "v2"
3611 k8sclustertype
= "helm-chart"
3612 elif kdur
.get("juju-bundle"):
3613 kdumodel
= kdur
["juju-bundle"]
3614 k8sclustertype
= "juju-bundle"
3617 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3618 "juju-bundle. Maybe an old NBI version is running".format(
3619 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3622 # check if kdumodel is a file and exists
3624 vnfd_with_id
= find_in_list(
3625 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3627 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3628 if storage
: # may be not present if vnfd has not artifacts
3629 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3630 if storage
["pkg-dir"]:
3631 filename
= "{}/{}/{}s/{}".format(
3638 filename
= "{}/Scripts/{}s/{}".format(
3643 if self
.fs
.file_exists(
3644 filename
, mode
="file"
3645 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3646 kdumodel
= self
.fs
.path
+ filename
3647 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3649 except Exception: # it is not a file
3652 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3653 step
= "Synchronize repos for k8s cluster '{}'".format(
3656 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3660 k8sclustertype
== "helm-chart"
3661 and cluster_uuid
not in updated_cluster_list
3663 k8sclustertype
== "helm-chart-v3"
3664 and cluster_uuid
not in updated_v3_cluster_list
3666 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3667 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3668 cluster_uuid
=cluster_uuid
3671 if del_repo_list
or added_repo_dict
:
3672 if k8sclustertype
== "helm-chart":
3674 "_admin.helm_charts_added." + item
: None
3675 for item
in del_repo_list
3678 "_admin.helm_charts_added." + item
: name
3679 for item
, name
in added_repo_dict
.items()
3681 updated_cluster_list
.append(cluster_uuid
)
3682 elif k8sclustertype
== "helm-chart-v3":
3684 "_admin.helm_charts_v3_added." + item
: None
3685 for item
in del_repo_list
3688 "_admin.helm_charts_v3_added." + item
: name
3689 for item
, name
in added_repo_dict
.items()
3691 updated_v3_cluster_list
.append(cluster_uuid
)
3693 logging_text
+ "repos synchronized on k8s cluster "
3694 "'{}' to_delete: {}, to_add: {}".format(
3695 k8s_cluster_id
, del_repo_list
, added_repo_dict
3700 {"_id": k8s_cluster_id
},
3706 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3707 vnfr_data
["member-vnf-index-ref"],
3711 k8s_instance_info
= {
3712 "kdu-instance": None,
3713 "k8scluster-uuid": cluster_uuid
,
3714 "k8scluster-type": k8sclustertype
,
3715 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3716 "kdu-name": kdur
["kdu-name"],
3717 "kdu-model": kdumodel
,
3718 "namespace": namespace
,
3719 "kdu-deployment-name": kdu_deployment_name
,
3721 db_path
= "_admin.deployed.K8s.{}".format(index
)
3722 db_nsr_update
[db_path
] = k8s_instance_info
3723 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3724 vnfd_with_id
= find_in_list(
3725 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3727 task
= asyncio
.ensure_future(
3736 k8params
=desc_params
,
3741 self
.lcm_tasks
.register(
3745 "instantiate_KDU-{}".format(index
),
3748 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3754 except (LcmException
, asyncio
.CancelledError
):
3756 except Exception as e
:
3757 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3758 if isinstance(e
, (N2VCException
, DbException
)):
3759 self
.logger
.error(logging_text
+ msg
)
3761 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3762 raise LcmException(msg
)
3765 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3785 task_instantiation_info
,
3788 # launch instantiate_N2VC in a asyncio task and register task object
3789 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3790 # if not found, create one entry and update database
3791 # fill db_nsr._admin.deployed.VCA.<index>
3794 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3798 get_charm_name
= False
3799 if "execution-environment-list" in descriptor_config
:
3800 ee_list
= descriptor_config
.get("execution-environment-list", [])
3801 elif "juju" in descriptor_config
:
3802 ee_list
= [descriptor_config
] # ns charms
3803 if "execution-environment-list" not in descriptor_config
:
3804 # charm name is only required for ns charms
3805 get_charm_name
= True
3806 else: # other types as script are not supported
3809 for ee_item
in ee_list
:
3812 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3813 ee_item
.get("juju"), ee_item
.get("helm-chart")
3816 ee_descriptor_id
= ee_item
.get("id")
3817 if ee_item
.get("juju"):
3818 vca_name
= ee_item
["juju"].get("charm")
3820 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
3823 if ee_item
["juju"].get("charm") is not None
3826 if ee_item
["juju"].get("cloud") == "k8s":
3827 vca_type
= "k8s_proxy_charm"
3828 elif ee_item
["juju"].get("proxy") is False:
3829 vca_type
= "native_charm"
3830 elif ee_item
.get("helm-chart"):
3831 vca_name
= ee_item
["helm-chart"]
3832 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3835 vca_type
= "helm-v3"
3838 logging_text
+ "skipping non juju neither charm configuration"
3843 for vca_index
, vca_deployed
in enumerate(
3844 db_nsr
["_admin"]["deployed"]["VCA"]
3846 if not vca_deployed
:
3849 vca_deployed
.get("member-vnf-index") == member_vnf_index
3850 and vca_deployed
.get("vdu_id") == vdu_id
3851 and vca_deployed
.get("kdu_name") == kdu_name
3852 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3853 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3857 # not found, create one.
3859 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3862 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3864 target
+= "/kdu/{}".format(kdu_name
)
3866 "target_element": target
,
3867 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3868 "member-vnf-index": member_vnf_index
,
3870 "kdu_name": kdu_name
,
3871 "vdu_count_index": vdu_index
,
3872 "operational-status": "init", # TODO revise
3873 "detailed-status": "", # TODO revise
3874 "step": "initial-deploy", # TODO revise
3876 "vdu_name": vdu_name
,
3878 "ee_descriptor_id": ee_descriptor_id
,
3879 "charm_name": charm_name
,
3883 # create VCA and configurationStatus in db
3885 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3886 "configurationStatus.{}".format(vca_index
): dict(),
3888 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3890 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3892 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3893 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3894 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3897 task_n2vc
= asyncio
.ensure_future(
3898 self
.instantiate_N2VC(
3899 logging_text
=logging_text
,
3900 vca_index
=vca_index
,
3906 vdu_index
=vdu_index
,
3907 kdu_index
=kdu_index
,
3908 deploy_params
=deploy_params
,
3909 config_descriptor
=descriptor_config
,
3910 base_folder
=base_folder
,
3911 nslcmop_id
=nslcmop_id
,
3915 ee_config_descriptor
=ee_item
,
3918 self
.lcm_tasks
.register(
3922 "instantiate_N2VC-{}".format(vca_index
),
3925 task_instantiation_info
[
3927 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3928 member_vnf_index
or "", vdu_id
or ""
3932 def _create_nslcmop(nsr_id
, operation
, params
):
3934 Creates a ns-lcm-opp content to be stored at database.
3935 :param nsr_id: internal id of the instance
3936 :param operation: instantiate, terminate, scale, action, ...
3937 :param params: user parameters for the operation
3938 :return: dictionary following SOL005 format
3940 # Raise exception if invalid arguments
3941 if not (nsr_id
and operation
and params
):
3943 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3950 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3951 "operationState": "PROCESSING",
3952 "statusEnteredTime": now
,
3953 "nsInstanceId": nsr_id
,
3954 "lcmOperationType": operation
,
3956 "isAutomaticInvocation": False,
3957 "operationParams": params
,
3958 "isCancelPending": False,
3960 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3961 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3966 def _format_additional_params(self
, params
):
3967 params
= params
or {}
3968 for key
, value
in params
.items():
3969 if str(value
).startswith("!!yaml "):
3970 params
[key
] = yaml
.safe_load(value
[7:])
3973 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3974 primitive
= seq
.get("name")
3975 primitive_params
= {}
3977 "member_vnf_index": vnf_index
,
3978 "primitive": primitive
,
3979 "primitive_params": primitive_params
,
3982 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide whether an already registered sub-operation is skipped or retried.

    :param db_nslcmop: nslcmop record; sub-operations are read from _admin.operations
    :param op_index: index of the sub-operation inside _admin.operations
    :return: self.SUBOPERATION_STATUS_SKIP if the sub-operation is COMPLETED,
        otherwise op_index (after marking the sub-operation as PROCESSING)
    """
    op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
    if op.get("operationState") == "COMPLETED":
        # b. Skip sub-operation
        # _ns_execute_primitive() or RO.create_action() will NOT be executed
        return self.SUBOPERATION_STATUS_SKIP
    else:
        # c. retry executing sub-operation
        # The sub-operation exists, and operationState != 'COMPLETED'
        # Update operationState = 'PROCESSING' to indicate a retry.
        operationState = "PROCESSING"
        detailed_status = "In progress"
        self._update_suboperation_status(
            db_nslcmop, op_index, operationState, detailed_status
        )
        # Return the sub-operation index
        # _ns_execute_primitive() or RO.create_action() will be called from scale()
        # with arguments extracted from the sub-operation
        return op_index
4006 # Find a sub-operation where all keys in a matching dictionary must match
4007 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
4008 def _find_suboperation(self
, db_nslcmop
, match
):
4009 if db_nslcmop
and match
:
4010 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
4011 for i
, op
in enumerate(op_list
):
4012 if all(op
.get(k
) == match
[k
] for k
in match
):
4014 return self
.SUBOPERATION_STATUS_NOT_FOUND
4016 # Update status for a sub-operation given its index
4017 def _update_suboperation_status(
4018 self
, db_nslcmop
, op_index
, operationState
, detailed_status
4020 # Update DB for HA tasks
4021 q_filter
= {"_id": db_nslcmop
["_id"]}
4023 "_admin.operations.{}.operationState".format(op_index
): operationState
,
4024 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
4027 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
4030 # Add sub-operation, return the index of the added sub-operation
4031 # Optionally, set operationState, detailed-status, and operationType
4032 # Status and type are currently set for 'scale' sub-operations:
4033 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
4034 # 'detailed-status' : status message
4035 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
4036 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
4037 def _add_suboperation(
4045 mapped_primitive_params
,
4046 operationState
=None,
4047 detailed_status
=None,
4050 RO_scaling_info
=None,
4053 return self
.SUBOPERATION_STATUS_NOT_FOUND
4054 # Get the "_admin.operations" list, if it exists
4055 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
4056 op_list
= db_nslcmop_admin
.get("operations")
4057 # Create or append to the "_admin.operations" list
4059 "member_vnf_index": vnf_index
,
4061 "vdu_count_index": vdu_count_index
,
4062 "primitive": primitive
,
4063 "primitive_params": mapped_primitive_params
,
4066 new_op
["operationState"] = operationState
4068 new_op
["detailed-status"] = detailed_status
4070 new_op
["lcmOperationType"] = operationType
4072 new_op
["RO_nsr_id"] = RO_nsr_id
4074 new_op
["RO_scaling_info"] = RO_scaling_info
4076 # No existing operations, create key 'operations' with current operation as first list element
4077 db_nslcmop_admin
.update({"operations": [new_op
]})
4078 op_list
= db_nslcmop_admin
.get("operations")
4080 # Existing operations, append operation to list
4081 op_list
.append(new_op
)
4083 db_nslcmop_update
= {"_admin.operations": op_list
}
4084 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4085 op_index
= len(op_list
) - 1
4088 # Helper methods for scale() sub-operations
4090 # pre-scale/post-scale:
4091 # Check for 3 different cases:
4092 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4093 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4094 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4095 def _check_or_add_scale_suboperation(
4099 vnf_config_primitive
,
4103 RO_scaling_info
=None,
4105 # Find this sub-operation
4106 if RO_nsr_id
and RO_scaling_info
:
4107 operationType
= "SCALE-RO"
4109 "member_vnf_index": vnf_index
,
4110 "RO_nsr_id": RO_nsr_id
,
4111 "RO_scaling_info": RO_scaling_info
,
4115 "member_vnf_index": vnf_index
,
4116 "primitive": vnf_config_primitive
,
4117 "primitive_params": primitive_params
,
4118 "lcmOperationType": operationType
,
4120 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4121 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4122 # a. New sub-operation
4123 # The sub-operation does not exist, add it.
4124 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4125 # The following parameters are set to None for all kind of scaling:
4127 vdu_count_index
= None
4129 if RO_nsr_id
and RO_scaling_info
:
4130 vnf_config_primitive
= None
4131 primitive_params
= None
4134 RO_scaling_info
= None
4135 # Initial status for sub-operation
4136 operationState
= "PROCESSING"
4137 detailed_status
= "In progress"
4138 # Add sub-operation for pre/post-scaling (zero or more operations)
4139 self
._add
_suboperation
(
4145 vnf_config_primitive
,
4153 return self
.SUBOPERATION_STATUS_NEW
4155 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4156 # or op_index (operationState != 'COMPLETED')
4157 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4159 # Function to return execution_environment id
4161 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4162 # TODO vdu_index_count
4163 for vca
in vca_deployed_list
:
4164 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
async def destroy_N2VC(
    self,
    logging_text,
    db_nslcmop,
    vca_deployed,
    config_descriptor,
    vca_index,
    destroy_ee=True,
    exec_primitives=True,
    scaling_in=False,
    vca_id: str = None,
):
    """
    Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
    :param logging_text:
    :param db_nslcmop:
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to not destroy it here, because all of them will be destroyed at once
    :param exec_primitives: False to not execute terminate primitives, because the config is not completed or has
                        not executed properly
    :param scaling_in: True destroys the application, False destroys the model
    :param vca_id: passed through to the primitive execution and to the execution environment deletion
    :return: None or exception
    """

    self.logger.debug(
        logging_text
        + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
            vca_index, vca_deployed, config_descriptor, destroy_ee
        )
    )

    vca_type = vca_deployed.get("type", "lxc_proxy_charm")

    # execute terminate_primitives
    if exec_primitives:
        # the caller may pass no descriptor at all; treat it as an empty one
        terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
            (config_descriptor or {}).get("terminate-config-primitive"),
            vca_deployed.get("ee_descriptor_id"),
        )
        vdu_id = vca_deployed.get("vdu_id")
        vdu_count_index = vca_deployed.get("vdu_count_index")
        vdu_name = vca_deployed.get("vdu_name")
        vnf_index = vca_deployed.get("member-vnf-index")
        if terminate_primitives and vca_deployed.get("needed_terminate"):
            for seq in terminate_primitives:
                # For each sequence in list, get primitive and call _ns_execute_primitive()
                step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                    vnf_index, seq.get("name")
                )
                self.logger.debug(logging_text + step)
                # Create the primitive for each sequence, i.e. "primitive": "touch"
                primitive = seq.get("name")
                mapped_primitive_params = self._get_terminate_primitive_params(
                    seq, vnf_index
                )

                # Add sub-operation
                self._add_suboperation(
                    db_nslcmop,
                    vnf_index,
                    vdu_id,
                    vdu_count_index,
                    vdu_name,
                    primitive,
                    mapped_primitive_params,
                )
                # Sub-operations: Call _ns_execute_primitive() instead of action()
                try:
                    result, result_detail = await self._ns_execute_primitive(
                        vca_deployed["ee_id"],
                        primitive,
                        mapped_primitive_params,
                        vca_type=vca_type,
                        vca_id=vca_id,
                    )
                except LcmException:
                    # this happens when VCA is not deployed. In this case it is not needed to terminate
                    continue
                result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
                if result not in result_ok:
                    raise LcmException(
                        "terminate_primitive {} for vnf_member_index={} fails with "
                        "error {}".format(seq.get("name"), vnf_index, result_detail)
                    )
            # set that this VCA do not need terminated
            db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
                vca_index
            )
            self.update_db_2(
                "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
            )

    # Delete Prometheus Jobs if any
    # This uses NSR_ID, so it will destroy any jobs under this index
    self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})

    if destroy_ee:
        await self.vca_map[vca_type].delete_execution_environment(
            vca_deployed["ee_id"],
            scaling_in=scaling_in,
            vca_type=vca_type,
            vca_id=vca_id,
        )
4273 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4274 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4275 namespace
= "." + db_nsr
["_id"]
4277 await self
.n2vc
.delete_namespace(
4278 namespace
=namespace
,
4279 total_timeout
=self
.timeout
.charm_delete
,
4282 except N2VCNotFound
: # already deleted. Skip
4284 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
async def _terminate_RO(
    self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
):
    """
    Terminates a deployment from RO
    :param logging_text:
    :param nsr_deployed: db_nsr._admin.deployed
    :param nsr_id:
    :param nslcmop_id:
    :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
    :return: None
    :raises LcmException: with the concatenation of every deletion failure, if any
    """
    db_nsr_update = {}
    failed_detail = []
    ro_nsr_id = ro_delete_action = None
    if nsr_deployed and nsr_deployed.get("RO"):
        ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
        ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
    try:
        if ro_nsr_id:
            stage[2] = "Deleting ns from VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self._write_op_status(nslcmop_id, stage)
            self.logger.debug(logging_text + stage[2])
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            desc = await self.RO.delete("ns", ro_nsr_id)
            ro_delete_action = desc["action_id"]
            db_nsr_update[
                "_admin.deployed.RO.nsr_delete_action_id"
            ] = ro_delete_action
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        if ro_delete_action:
            # wait until NS is deleted from VIM
            stage[2] = "Waiting ns deleted from VIM."
            detailed_status_old = None
            self.logger.debug(
                logging_text
                + stage[2]
                + " RO_id={} ro_delete_action={}".format(
                    ro_nsr_id, ro_delete_action
                )
            )
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            delete_timeout = 20 * 60  # 20 minutes
            while delete_timeout > 0:
                desc = await self.RO.show(
                    "ns",
                    item_id_name=ro_nsr_id,
                    extra_item="action",
                    extra_item_id=ro_delete_action,
                )

                # deploymentStatus
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                ns_status, ns_status_info = self.RO.check_action_status(desc)
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "Deleting from VIM {}".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                    db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                    break
                else:
                    # explicit raise instead of "assert False": asserts are
                    # stripped under -O, which would leave the loop polling an
                    # unknown status until the timeout expires
                    raise LcmException(
                        "ROclient.check_action_status returns unknown {}".format(
                            ns_status
                        )
                    )
                if stage[2] != detailed_status_old:
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self._write_op_status(nslcmop_id, stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # no "loop" argument: asyncio.sleep stopped accepting it in
                # Python 3.10 and always uses the running loop
                await asyncio.sleep(5)
                delete_timeout -= 5
            else:  # delete_timeout <= 0:
                raise ROclient.ROClientException(
                    "Timeout waiting ns deleted from VIM"
                )

    except Exception as e:
        # persist whatever was annotated before the failure
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if (
            isinstance(e, ROclient.ROClientException) and e.http_code == 404
        ):  # not found
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(
                logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
            )
        elif (
            isinstance(e, ROclient.ROClientException) and e.http_code == 409
        ):  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(
                logging_text
                + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
            )
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(
                logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
            )

    # Delete nsd, only if the ns has been deleted without errors
    if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
        ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
        try:
            stage[2] = "Deleting nsd from RO."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            await self.RO.delete("nsd", ro_nsd_id)
            self.logger.debug(
                logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
            )
            db_nsr_update["_admin.deployed.RO.nsd_id"] = None
        except Exception as e:
            if (
                isinstance(e, ROclient.ROClientException) and e.http_code == 404
            ):  # not found
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                self.logger.debug(
                    logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
                )
            elif (
                isinstance(e, ROclient.ROClientException) and e.http_code == 409
            ):  # conflict
                failed_detail.append(
                    "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
                )
                self.logger.debug(logging_text + failed_detail[-1])
            else:
                failed_detail.append(
                    "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
                )
                self.logger.error(logging_text + failed_detail[-1])

    # Delete vnfds, only if everything before has been deleted without errors
    if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
        for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
            if not vnf_deployed or not vnf_deployed["id"]:
                continue
            try:
                ro_vnfd_id = vnf_deployed["id"]
                stage[
                    2
                ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                    vnf_deployed["member-vnf-index"], ro_vnfd_id
                )
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("vnfd", ro_vnfd_id)
                self.logger.debug(
                    logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
                )
                db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
            except Exception as e:
                if (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 404
                ):  # not found
                    db_nsr_update[
                        "_admin.deployed.RO.vnfd.{}.id".format(index)
                    ] = None
                    self.logger.debug(
                        logging_text
                        + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
                    )
                elif (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 409
                ):  # conflict
                    failed_detail.append(
                        "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
                    )
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append(
                        "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
                    )
                    self.logger.error(logging_text + failed_detail[-1])

    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
4486 async def terminate(self
, nsr_id
, nslcmop_id
):
4487 # Try to lock HA task here
4488 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4489 if not task_is_locked_by_me
:
4492 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4493 self
.logger
.debug(logging_text
+ "Enter")
4494 timeout_ns_terminate
= self
.timeout
.ns_terminate
4497 operation_params
= None
4499 error_list
= [] # annotates all failed error messages
4500 db_nslcmop_update
= {}
4501 autoremove
= False # autoremove after terminated
4502 tasks_dict_info
= {}
4505 "Stage 1/3: Preparing task.",
4506 "Waiting for previous operations to terminate.",
4509 # ^ contains [stage, step, VIM-status]
4511 # wait for any previous tasks in process
4512 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4514 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4515 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4516 operation_params
= db_nslcmop
.get("operationParams") or {}
4517 if operation_params
.get("timeout_ns_terminate"):
4518 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4519 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4520 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4522 db_nsr_update
["operational-status"] = "terminating"
4523 db_nsr_update
["config-status"] = "terminating"
4524 self
._write
_ns
_status
(
4526 ns_state
="TERMINATING",
4527 current_operation
="TERMINATING",
4528 current_operation_id
=nslcmop_id
,
4529 other_update
=db_nsr_update
,
4531 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4532 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4533 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4536 stage
[1] = "Getting vnf descriptors from db."
4537 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4539 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4541 db_vnfds_from_id
= {}
4542 db_vnfds_from_member_index
= {}
4544 for vnfr
in db_vnfrs_list
:
4545 vnfd_id
= vnfr
["vnfd-id"]
4546 if vnfd_id
not in db_vnfds_from_id
:
4547 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4548 db_vnfds_from_id
[vnfd_id
] = vnfd
4549 db_vnfds_from_member_index
[
4550 vnfr
["member-vnf-index-ref"]
4551 ] = db_vnfds_from_id
[vnfd_id
]
4553 # Destroy individual execution environments when there are terminating primitives.
4554 # Rest of EE will be deleted at once
4555 # TODO - check before calling _destroy_N2VC
4556 # if not operation_params.get("skip_terminate_primitives"):#
4557 # or not vca.get("needed_terminate"):
4558 stage
[0] = "Stage 2/3 execute terminating primitives."
4559 self
.logger
.debug(logging_text
+ stage
[0])
4560 stage
[1] = "Looking execution environment that needs terminate."
4561 self
.logger
.debug(logging_text
+ stage
[1])
4563 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4564 config_descriptor
= None
4565 vca_member_vnf_index
= vca
.get("member-vnf-index")
4566 vca_id
= self
.get_vca_id(
4567 db_vnfrs_dict
.get(vca_member_vnf_index
)
4568 if vca_member_vnf_index
4572 if not vca
or not vca
.get("ee_id"):
4574 if not vca
.get("member-vnf-index"):
4576 config_descriptor
= db_nsr
.get("ns-configuration")
4577 elif vca
.get("vdu_id"):
4578 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4579 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4580 elif vca
.get("kdu_name"):
4581 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4582 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4584 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4585 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4586 vca_type
= vca
.get("type")
4587 exec_terminate_primitives
= not operation_params
.get(
4588 "skip_terminate_primitives"
4589 ) and vca
.get("needed_terminate")
4590 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4591 # pending native charms
4593 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4595 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4596 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4597 task
= asyncio
.ensure_future(
4605 exec_terminate_primitives
,
4609 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4611 # wait for pending tasks of terminate primitives
4615 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4617 error_list
= await self
._wait
_for
_tasks
(
4620 min(self
.timeout
.charm_delete
, timeout_ns_terminate
),
4624 tasks_dict_info
.clear()
4626 return # raise LcmException("; ".join(error_list))
4628 # remove All execution environments at once
4629 stage
[0] = "Stage 3/3 delete all."
4631 if nsr_deployed
.get("VCA"):
4632 stage
[1] = "Deleting all execution environments."
4633 self
.logger
.debug(logging_text
+ stage
[1])
4634 vca_id
= self
.get_vca_id({}, db_nsr
)
4635 task_delete_ee
= asyncio
.ensure_future(
4637 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4638 timeout
=self
.timeout
.charm_delete
,
4641 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4642 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4644 # Delete Namespace and Certificates if necessary
4645 if check_helm_ee_in_ns(list(db_vnfds_from_member_index
.values())):
4646 await self
.vca_map
["helm-v3"].delete_tls_certificate(
4647 certificate_name
=db_nslcmop
["nsInstanceId"],
4649 # TODO: Delete namespace
4651 # Delete from k8scluster
4652 stage
[1] = "Deleting KDUs."
4653 self
.logger
.debug(logging_text
+ stage
[1])
4654 # print(nsr_deployed)
4655 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4656 if not kdu
or not kdu
.get("kdu-instance"):
4658 kdu_instance
= kdu
.get("kdu-instance")
4659 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4660 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4661 vca_id
= self
.get_vca_id({}, db_nsr
)
4662 task_delete_kdu_instance
= asyncio
.ensure_future(
4663 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4664 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4665 kdu_instance
=kdu_instance
,
4667 namespace
=kdu
.get("namespace"),
4673 + "Unknown k8s deployment type {}".format(
4674 kdu
.get("k8scluster-type")
4679 task_delete_kdu_instance
4680 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4683 stage
[1] = "Deleting ns from VIM."
4684 if self
.ro_config
.ng
:
4685 task_delete_ro
= asyncio
.ensure_future(
4686 self
._terminate
_ng
_ro
(
4687 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4691 task_delete_ro
= asyncio
.ensure_future(
4693 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4696 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4698 # rest of staff will be done at finally
4701 ROclient
.ROClientException
,
4706 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4708 except asyncio
.CancelledError
:
4710 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4712 exc
= "Operation was cancelled"
4713 except Exception as e
:
4714 exc
= traceback
.format_exc()
4715 self
.logger
.critical(
4716 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4721 error_list
.append(str(exc
))
4723 # wait for pending tasks
4725 stage
[1] = "Waiting for terminate pending tasks."
4726 self
.logger
.debug(logging_text
+ stage
[1])
4727 error_list
+= await self
._wait
_for
_tasks
(
4730 timeout_ns_terminate
,
4734 stage
[1] = stage
[2] = ""
4735 except asyncio
.CancelledError
:
4736 error_list
.append("Cancelled")
4737 # TODO cancell all tasks
4738 except Exception as exc
:
4739 error_list
.append(str(exc
))
4740 # update status at database
4742 error_detail
= "; ".join(error_list
)
4743 # self.logger.error(logging_text + error_detail)
4744 error_description_nslcmop
= "{} Detail: {}".format(
4745 stage
[0], error_detail
4747 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4748 nslcmop_id
, stage
[0]
4751 db_nsr_update
["operational-status"] = "failed"
4752 db_nsr_update
["detailed-status"] = (
4753 error_description_nsr
+ " Detail: " + error_detail
4755 db_nslcmop_update
["detailed-status"] = error_detail
4756 nslcmop_operation_state
= "FAILED"
4760 error_description_nsr
= error_description_nslcmop
= None
4761 ns_state
= "NOT_INSTANTIATED"
4762 db_nsr_update
["operational-status"] = "terminated"
4763 db_nsr_update
["detailed-status"] = "Done"
4764 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4765 db_nslcmop_update
["detailed-status"] = "Done"
4766 nslcmop_operation_state
= "COMPLETED"
4769 self
._write
_ns
_status
(
4772 current_operation
="IDLE",
4773 current_operation_id
=None,
4774 error_description
=error_description_nsr
,
4775 error_detail
=error_detail
,
4776 other_update
=db_nsr_update
,
4778 self
._write
_op
_status
(
4781 error_message
=error_description_nslcmop
,
4782 operation_state
=nslcmop_operation_state
,
4783 other_update
=db_nslcmop_update
,
4785 if ns_state
== "NOT_INSTANTIATED":
4789 {"nsr-id-ref": nsr_id
},
4790 {"_admin.nsState": "NOT_INSTANTIATED"},
4792 except DbException
as e
:
4795 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4799 if operation_params
:
4800 autoremove
= operation_params
.get("autoremove", False)
4801 if nslcmop_operation_state
:
4803 await self
.msg
.aiowrite(
4808 "nslcmop_id": nslcmop_id
,
4809 "operationState": nslcmop_operation_state
,
4810 "autoremove": autoremove
,
4814 except Exception as e
:
4816 logging_text
+ "kafka_write notification Exception {}".format(e
)
4819 self
.logger
.debug(logging_text
+ "Exit")
4820 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4822 async def _wait_for_tasks(
4823 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4826 error_detail_list
= []
4828 pending_tasks
= list(created_tasks_info
.keys())
4829 num_tasks
= len(pending_tasks
)
4831 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4832 self
._write
_op
_status
(nslcmop_id
, stage
)
4833 while pending_tasks
:
4835 _timeout
= timeout
+ time_start
- time()
4836 done
, pending_tasks
= await asyncio
.wait(
4837 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4839 num_done
+= len(done
)
4840 if not done
: # Timeout
4841 for task
in pending_tasks
:
4842 new_error
= created_tasks_info
[task
] + ": Timeout"
4843 error_detail_list
.append(new_error
)
4844 error_list
.append(new_error
)
4847 if task
.cancelled():
4850 exc
= task
.exception()
4852 if isinstance(exc
, asyncio
.TimeoutError
):
4854 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4855 error_list
.append(created_tasks_info
[task
])
4856 error_detail_list
.append(new_error
)
4863 ROclient
.ROClientException
,
4869 self
.logger
.error(logging_text
+ new_error
)
4871 exc_traceback
= "".join(
4872 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4876 + created_tasks_info
[task
]
4882 logging_text
+ created_tasks_info
[task
] + ": Done"
4884 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4886 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4887 if nsr_id
: # update also nsr
4892 "errorDescription": "Error at: " + ", ".join(error_list
),
4893 "errorDetail": ". ".join(error_detail_list
),
4896 self
._write
_op
_status
(nslcmop_id
, stage
)
4897 return error_detail_list
4900 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4902 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4903 The default-value is used. If it is between < > it look for a value at instantiation_params
4904 :param primitive_desc: portion of VNFD/NSD that describes primitive
4905 :param params: Params provided by user
4906 :param instantiation_params: Instantiation params provided by user
4907 :return: a dictionary with the calculated params
4909 calculated_params
= {}
4910 for parameter
in primitive_desc
.get("parameter", ()):
4911 param_name
= parameter
["name"]
4912 if param_name
in params
:
4913 calculated_params
[param_name
] = params
[param_name
]
4914 elif "default-value" in parameter
or "value" in parameter
:
4915 if "value" in parameter
:
4916 calculated_params
[param_name
] = parameter
["value"]
4918 calculated_params
[param_name
] = parameter
["default-value"]
4920 isinstance(calculated_params
[param_name
], str)
4921 and calculated_params
[param_name
].startswith("<")
4922 and calculated_params
[param_name
].endswith(">")
4924 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4925 calculated_params
[param_name
] = instantiation_params
[
4926 calculated_params
[param_name
][1:-1]
4930 "Parameter {} needed to execute primitive {} not provided".format(
4931 calculated_params
[param_name
], primitive_desc
["name"]
4936 "Parameter {} needed to execute primitive {} not provided".format(
4937 param_name
, primitive_desc
["name"]
4941 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4942 calculated_params
[param_name
] = yaml
.safe_dump(
4943 calculated_params
[param_name
], default_flow_style
=True, width
=256
4945 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4947 ].startswith("!!yaml "):
4948 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4949 if parameter
.get("data-type") == "INTEGER":
4951 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4952 except ValueError: # error converting string to int
4954 "Parameter {} of primitive {} must be integer".format(
4955 param_name
, primitive_desc
["name"]
4958 elif parameter
.get("data-type") == "BOOLEAN":
4959 calculated_params
[param_name
] = not (
4960 (str(calculated_params
[param_name
])).lower() == "false"
4963 # add always ns_config_info if primitive name is config
4964 if primitive_desc
["name"] == "config":
4965 if "ns_config_info" in instantiation_params
:
4966 calculated_params
["ns_config_info"] = instantiation_params
[
4969 return calculated_params
4971 def _look_for_deployed_vca(
4978 ee_descriptor_id
=None,
4980 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4981 for vca
in deployed_vca
:
4984 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4987 vdu_count_index
is not None
4988 and vdu_count_index
!= vca
["vdu_count_index"]
4991 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4993 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4997 # vca_deployed not found
4999 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
5000 " is not deployed".format(
5009 ee_id
= vca
.get("ee_id")
5011 "type", "lxc_proxy_charm"
5012 ) # default value for backward compatibility - proxy charm
5015 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
5016 "execution environment".format(
5017 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
5020 return ee_id
, vca_type
5022 async def _ns_execute_primitive(
5028 retries_interval
=30,
5035 if primitive
== "config":
5036 primitive_params
= {"params": primitive_params
}
5038 vca_type
= vca_type
or "lxc_proxy_charm"
5042 output
= await asyncio
.wait_for(
5043 self
.vca_map
[vca_type
].exec_primitive(
5045 primitive_name
=primitive
,
5046 params_dict
=primitive_params
,
5047 progress_timeout
=self
.timeout
.progress_primitive
,
5048 total_timeout
=self
.timeout
.primitive
,
5053 timeout
=timeout
or self
.timeout
.primitive
,
5057 except asyncio
.CancelledError
:
5059 except Exception as e
:
5063 "Error executing action {} on {} -> {}".format(
5068 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
5070 if isinstance(e
, asyncio
.TimeoutError
):
5072 message
="Timed out waiting for action to complete"
5074 return "FAILED", getattr(e
, "message", repr(e
))
5076 return "COMPLETED", output
5078 except (LcmException
, asyncio
.CancelledError
):
5080 except Exception as e
:
5081 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
async def vca_status_refresh(self, nsr_id, nslcmop_id):
    """
    Refresh the VCA status stored at the nsrs record.

    If the NS has deployed KDUs ("_admin.deployed.K8s" not empty) every KDU is
    refreshed; otherwise every entry of "_admin.deployed.VCA" is refreshed.
    :param: nsr_id: Id of the nsr
    :param: nslcmop_id: Id of the nslcmop
    :return: None
    """
    self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_id = self.get_vca_id({}, db_nsr)

    deployed = db_nsr["_admin"]["deployed"]
    k8s_items = deployed["K8s"]
    if k8s_items:
        for k8s_item in k8s_items:
            await self._on_update_k8s_db(
                cluster_uuid=k8s_item["k8scluster-uuid"],
                kdu_instance=k8s_item["kdu-instance"],
                filter={"_id": nsr_id},
                vca_id=vca_id,
                cluster_type=k8s_item["k8scluster-type"],
            )
    else:
        for vca_index, _ in enumerate(deployed["VCA"]):
            # only the position is needed to build the database path
            await self._on_update_n2vc_db(
                "nsrs",
                {"_id": nsr_id},
                "_admin.deployed.VCA.{}.".format(vca_index),
                {},
            )

    self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
    self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
async def action(self, nsr_id, nslcmop_id):
    """
    Execute a day-2 primitive over a NS, a VNF, a VDU or a KDU.

    The operation parameters are read from the "nslcmops" record. KDU primitives
    ("upgrade", "rollback", "status" or a primitive declared at the KDU configuration
    for a non helm cluster) are run through self.k8scluster_map; any other primitive
    is run at the deployed execution environment with _ns_execute_primitive.
    The result is written to the database and notified at the finally clause.

    :param nsr_id: id of the NS instance record
    :param nslcmop_id: id of the operation record
    :return: None if the HA lock is not obtained; otherwise the tuple
        (operation state, detailed status)
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    error_description_nslcmop = None
    exc = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id,
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        if db_nslcmop["operationParams"].get("primitive_params"):
            # primitive_params is stored as a json text
            db_nslcmop["operationParams"]["primitive_params"] = json.loads(
                db_nslcmop["operationParams"]["primitive_params"]
            )

        nsr_deployed = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        kdu_name = db_nslcmop["operationParams"].get("kdu_name")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]
        timeout_ns_action = db_nslcmop["operationParams"].get(
            "timeout_ns_action", self.timeout.primitive
        )

        # NS level actions have no vnfr; an empty one is used at get_vca_id below
        # instead of reading an unbound variable
        db_vnfr = {}
        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one(
                "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
            )
            if db_vnfr.get("kdur"):
                kdur_list = []
                for kdur in db_vnfr["kdur"]:
                    if kdur.get("additionalParams"):
                        # additionalParams is stored as a json text
                        kdur["additionalParams"] = json.loads(
                            kdur["additionalParams"]
                        )
                    kdur_list.append(kdur)
                db_vnfr["kdur"] = kdur_list
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

            # Sync filesystem before running a primitive
            self.fs.sync(db_vnfr["vnfd-id"])
        else:
            step = "Getting nsd from database"
            db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        vca_id = self.get_vca_id(db_vnfr, db_nsr)
        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # look for primitive
        config_primitive_desc = descriptor_configuration = None
        if vdu_id:
            descriptor_configuration = get_configuration(db_vnfd, vdu_id)
        elif kdu_name:
            descriptor_configuration = get_configuration(db_vnfd, kdu_name)
        elif vnf_index:
            descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
        else:
            descriptor_configuration = db_nsd.get("ns-configuration")

        if descriptor_configuration and descriptor_configuration.get(
            "config-primitive"
        ):
            for config_primitive in descriptor_configuration["config-primitive"]:
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            # only the built-in KDU primitives can be run without a descriptor entry
            if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
                raise LcmException(
                    "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
                        primitive
                    )
                )
            primitive_name = primitive
            ee_descriptor_id = None
        else:
            primitive_name = config_primitive_desc.get(
                "execution-environment-primitive", primitive
            )
            ee_descriptor_id = config_primitive_desc.get(
                "execution-environment-ref"
            )

        if vnf_index:
            if vdu_id:
                vdur = next(
                    (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
                )
                desc_params = parse_yaml_strings(vdur.get("additionalParams"))
            elif kdu_name:
                kdur = next(
                    (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
                )
                desc_params = parse_yaml_strings(kdur.get("additionalParams"))
            else:
                desc_params = parse_yaml_strings(
                    db_vnfr.get("additionalParamsForVnf")
                )
        else:
            desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))

        # Default when the KDU has no configuration at the descriptor; before it
        # was left unbound in that case
        kdu_action = False
        if kdu_name and get_configuration(db_vnfd, kdu_name):
            kdu_configuration = get_configuration(db_vnfd, kdu_name)
            # Names of every primitive declared for the KDU. A dedicated loop
            # variable is used so that "primitive" keeps the requested name
            actions = {
                declared["name"]
                for declared in kdu_configuration.get(
                    "initial-config-primitive", []
                )
            }
            actions.update(
                declared["name"]
                for declared in kdu_configuration.get("config-primitive", [])
            )
            kdu = find_in_list(
                nsr_deployed["K8s"],
                lambda item: kdu_name == item["kdu-name"]
                and item["member-vnf-index"] == vnf_index,
            )
            kdu_action = primitive_name in actions and kdu[
                "k8scluster-type"
            ] not in ("helm-chart", "helm-chart-v3")

        # TODO check if ns is in a proper status
        if kdu_name and (
            primitive_name in ("upgrade", "rollback", "status") or kdu_action
        ):
            # kdur and desc_params already set from before
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                if (
                    kdu_name == kdu["kdu-name"]
                    and kdu["member-vnf-index"] == vnf_index
                ):
                    break
            else:
                raise LcmException(
                    "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
                )

            if kdu.get("k8scluster-type") not in self.k8scluster_map:
                msg = "unknown k8scluster-type '{}'".format(
                    kdu.get("k8scluster-type")
                )
                raise LcmException(msg)

            db_dict = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": "_admin.deployed.K8s.{}".format(index),
            }
            self.logger.debug(
                logging_text
                + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
            )
            step = "Executing kdu {}".format(primitive_name)
            if primitive_name == "upgrade":
                if desc_params.get("kdu_model"):
                    kdu_model = desc_params.get("kdu_model")
                    del desc_params["kdu_model"]
                else:
                    kdu_model = kdu.get("kdu-model")
                    # "<model>:<suffix>" is reduced to "<model>"
                    parts = kdu_model.split(sep=":")
                    if len(parts) == 2:
                        kdu_model = parts[0]
                if desc_params.get("kdu_atomic_upgrade"):
                    atomic_upgrade = desc_params.get(
                        "kdu_atomic_upgrade"
                    ).lower() in ("yes", "true", "1")
                    del desc_params["kdu_atomic_upgrade"]
                else:
                    atomic_upgrade = True

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        atomic=atomic_upgrade,
                        kdu_model=kdu_model,
                        params=desc_params,
                        db_dict=db_dict,
                        timeout=timeout_ns_action,
                    ),
                    # extra margin over the timeout given to the connector
                    timeout=timeout_ns_action + 10,
                )
                self.logger.debug(
                    logging_text + " Upgrade of kdu {} done".format(detailed_status)
                )
            elif primitive_name == "rollback":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].rollback(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        db_dict=db_dict,
                    ),
                    timeout=timeout_ns_action,
                )
            elif primitive_name == "status":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        vca_id=vca_id,
                    ),
                    timeout=timeout_ns_action,
                )
            else:
                kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
                    kdu["kdu-name"], nsr_id
                )
                params = self._map_primitive_params(
                    config_primitive_desc, primitive_params, desc_params
                )

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        primitive_name=primitive_name,
                        params=params,
                        db_dict=db_dict,
                        timeout=timeout_ns_action,
                        vca_id=vca_id,
                    ),
                    timeout=timeout_ns_action,
                )

            if detailed_status:
                nslcmop_operation_state = "COMPLETED"
            else:
                detailed_status = ""
                nslcmop_operation_state = "FAILED"
        else:
            ee_id, vca_type = self._look_for_deployed_vca(
                nsr_deployed["VCA"],
                member_vnf_index=vnf_index,
                vdu_id=vdu_id,
                vdu_count_index=vdu_count_index,
                ee_descriptor_id=ee_descriptor_id,
            )
            # None when no deployed VCA belongs to this vnf index (it was left
            # unbound before); it is the default of _ns_execute_primitive
            db_dict = None
            for vca_index, vca_deployed in enumerate(
                db_nsr["_admin"]["deployed"]["VCA"]
            ):
                if vca_deployed.get("member-vnf-index") == vnf_index:
                    db_dict = {
                        "collection": "nsrs",
                        "filter": {"_id": nsr_id},
                        "path": "_admin.deployed.VCA.{}.".format(vca_index),
                    }
                    break
            (
                nslcmop_operation_state,
                detailed_status,
            ) = await self._ns_execute_primitive(
                ee_id,
                primitive=primitive_name,
                primitive_params=self._map_primitive_params(
                    config_primitive_desc, primitive_params, desc_params
                ),
                timeout=timeout_ns_action,
                vca_type=vca_type,
                db_dict=db_dict,
                vca_id=vca_id,
            )

        db_nslcmop_update["detailed-status"] = detailed_status
        error_description_nslcmop = (
            detailed_status if nslcmop_operation_state == "FAILED" else ""
        )
        self.logger.debug(
            logging_text
            + "Done with result {} {}".format(
                nslcmop_operation_state, detailed_status
            )
        )
        return  # database update is called inside finally

    except (DbException, LcmException, N2VCException, K8sException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(step)
        )
        exc = "Operation was cancelled"
    except asyncio.TimeoutError:
        self.logger.error(logging_text + "Timeout while '{}'".format(step))
        exc = "Timeout"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
            exc_info=True,
        )
    finally:
        if exc:
            detailed_status = error_description_nslcmop = "FAILED {}: {}".format(
                step, exc
            )
            db_nslcmop_update["detailed-status"] = detailed_status
            nslcmop_operation_state = "FAILED"
            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=db_nsr[
                        "nsState"
                    ],  # TODO check if degraded. For the moment use previous status
                    current_operation="IDLE",
                    current_operation_id=None,
                    # error_description=error_description_nsr,
                    # error_detail=error_detail,
                    other_update=db_nsr_update,
                )

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite(
                    "ns",
                    "actioned",
                    {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                    },
                    loop=self.loop,
                )
            except Exception as e:
                # the notification is best effort: only log the failure
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        # this return overrides the bare "return" of the try clause
        return nslcmop_operation_state, detailed_status
async def terminate_vdus(
    self, db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text
):
    """This method terminates VDUs

    It builds a scale-in ("IN") scaling_info from the "vdur" list of the VNF
    record and hands it to self._scale_ng_ro when self.ro_config.ng is set.

    Args:
        db_vnfr: VNF instance record
        member_vnf_index: VNF index to identify the VDUs to be removed
        db_nsr: NS instance record
        update_db_nslcmops: Nslcmop update record
        stage: list whose item 2 is set to "Terminating VDUs"
        logging_text: passed through to self._scale_ng_ro
    """
    # Filled below but not read again inside this method
    vca_scaling_info = []
    scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
    scaling_info["scaling_direction"] = "IN"
    scaling_info["vdu-delete"] = {}
    scaling_info["kdu-delete"] = {}
    db_vdur = db_vnfr.get("vdur")
    # Iterate over a copy of the vdur list of the record
    vdur_list = copy(db_vdur)
    # Never incremented: every VDU is registered with index 0
    count_index = 0
    for index, vdu in enumerate(vdur_list):
        vca_scaling_info.append(
            {
                "osm_vdu_id": vdu["vdu-id-ref"],
                "member-vnf-index": member_vnf_index,
                "type": "delete",
                "vdu_index": count_index,
            }
        )
        scaling_info["vdu-delete"][vdu["vdu-id-ref"]] = count_index
        scaling_info["vdu"].append(
            {
                "name": vdu.get("name") or vdu.get("vdu-name"),
                "vdu_id": vdu["vdu-id-ref"],
                "interface": [],
            }
        )
        # "index" is the position of the entry just appended to scaling_info["vdu"]
        for interface in vdu["interfaces"]:
            scaling_info["vdu"][index]["interface"].append(
                {
                    "name": interface["name"],
                    "ip_address": interface["ip-address"],
                    "mac_address": interface.get("mac-address"),
                }
            )
        # NOTE(review): nesting of the statements below (inside the loop) could not
        # be confirmed from the flattened listing - verify against the repository
        self.logger.info("NS update scaling info{}".format(scaling_info))
        stage[2] = "Terminating VDUs"
        if scaling_info.get("vdu-delete"):
            # scale_process = "RO"
            # Nothing is done here when self.ro_config.ng is not set
            if self.ro_config.ng:
                await self._scale_ng_ro(
                    logging_text,
                    db_nsr,
                    update_db_nslcmops,
                    db_vnfr,
                    scaling_info,
                    stage,
                )
async def remove_vnf(self, nsr_id, nslcmop_id, vnf_instance_id):
    """This method is to Remove VNF instances from NS.

    The VDUs of the VNF are terminated, the VNF record id is taken out of the
    "constituent-vnfr-ref" list of the NS record and the VNF record is deleted.
    The last VNF of a NS cannot be removed.

    Args:
        nsr_id: NS instance id
        nslcmop_id: nslcmop id of update
        vnf_instance_id: id of the VNF instance to be removed

    Returns:
        result: (str, str) COMPLETED/FAILED, details

    Raises:
        LcmException: if the NS has only one VNF record
    """
    try:
        logging_text = "Task ns={} update ".format(nsr_id)
        check_vnfr_count = len(self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}))
        self.logger.info("check_vnfr_count {}".format(check_vnfr_count))
        if check_vnfr_count <= 1:
            raise LcmException(
                "{} Cannot terminate the last VNF in this NS.".format(
                    vnf_instance_id
                )
            )

        stage = ["", "", ""]
        self.logger.debug(
            "Getting nslcmop from database"
            + " after having waited for previous tasks to be completed"
        )
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
        member_vnf_index = db_vnfr["member-vnf-index-ref"]

        update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        await self.terminate_vdus(
            db_vnfr,
            member_vnf_index,
            db_nsr,
            update_db_nslcmops,
            stage,
            logging_text,
        )

        # Detach the vnfr from the NS record first, then delete the vnfr itself.
        # The NS record is written once: the second identical write was redundant
        constituent_vnfr = db_nsr.get("constituent-vnfr-ref")
        constituent_vnfr.remove(db_vnfr.get("_id"))
        db_nsr_update = {"constituent-vnfr-ref": constituent_vnfr}
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self.db.del_one("vnfrs", {"_id": db_vnfr.get("_id")})
        return "COMPLETED", "Done"
    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        self.logger.debug("Error removing VNF {}".format(e))
        return "FAILED", "Error removing VNF {}".format(e)
async def _ns_redeploy_vnf(
    self,
    nsr_id,
    nslcmop_id,
    db_vnfd,
    db_vnfr,
    db_nsr,
):
    """This method updates and redeploys VNF instances

    The current VDUs are terminated, the VNF record is rewritten (revision,
    connection points, vdur and an empty ip-address) and a scale-out ("OUT")
    request with every VDU of the descriptor is passed to self._scale_ng_ro.

    Args:
        nsr_id: NS instance id
        nslcmop_id:   nslcmop id
        db_vnfd: VNF descriptor
        db_vnfr: VNF instance record
        db_nsr: NS instance record

    Returns:
        result: (str, str) COMPLETED/FAILED, details
    """
    try:
        # Never incremented: every VDU is registered with index 0
        count_index = 0
        stage = ["", "", ""]
        logging_text = "Task ns={} update ".format(nsr_id)
        latest_vnfd_revision = db_vnfd["_admin"].get("revision")
        member_vnf_index = db_vnfr["member-vnf-index-ref"]

        # Terminate old VNF resources
        update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        await self.terminate_vdus(
            db_vnfr,
            member_vnf_index,
            db_nsr,
            update_db_nslcmops,
            stage,
            logging_text,
        )

        # old_vnfd_id = db_vnfr["vnfd-id"]
        # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
        new_db_vnfd = db_vnfd
        # new_vnfd_ref = new_db_vnfd["id"]
        # new_vnfd_id = vnfd_id

        # Create VDUR
        # One connection point entry per external connection point of the descriptor
        new_vnfr_cp = []
        for cp in new_db_vnfd.get("ext-cpd", ()):
            vnf_cp = {
                "name": cp.get("id"),
                "connection-point-id": cp.get("int-cpd", {}).get("cpd"),
                "connection-point-vdu-id": cp.get("int-cpd", {}).get("vdu-id"),
                "id": cp.get("id"),
            }
            new_vnfr_cp.append(vnf_cp)
        # The new vdur list is taken from the operation parameters
        new_vdur = update_db_nslcmops["operationParams"]["newVdur"]
        # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
        # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
        new_vnfr_update = {
            "revision": latest_vnfd_revision,
            "connection-point": new_vnfr_cp,
            "vdur": new_vdur,
            "ip-address": "",
        }
        self.update_db_2("vnfrs", db_vnfr["_id"], new_vnfr_update)
        # Read the record back so that the steps below work with the updated content
        updated_db_vnfr = self.db.get_one(
            "vnfrs",
            {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id},
        )

        # Instantiate new VNF resources
        # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        # vca_scaling_info and cloud_init_list are filled below but not read again
        # inside this method
        vca_scaling_info = []
        scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
        scaling_info["scaling_direction"] = "OUT"
        scaling_info["vdu-create"] = {}
        scaling_info["kdu-create"] = {}
        vdud_instantiate_list = db_vnfd["vdu"]
        for index, vdud in enumerate(vdud_instantiate_list):
            cloud_init_text = self._get_vdu_cloud_init_content(vdud, db_vnfd)
            if cloud_init_text:
                additional_params = (
                    self._get_vdu_additional_params(updated_db_vnfr, vdud["id"])
                    or {}
                )
            cloud_init_list = []
            if cloud_init_text:
                # TODO Information of its own ip is not available because db_vnfr is not updated.
                additional_params["OSM"] = get_osm_params(
                    updated_db_vnfr, vdud["id"], 1
                )
                cloud_init_list.append(
                    self._parse_cloud_init(
                        cloud_init_text,
                        additional_params,
                        db_vnfd["id"],
                        vdud["id"],
                    )
                )
                # NOTE(review): nesting of this append (inside the cloud-init branch)
                # could not be confirmed from the flattened listing - verify
                vca_scaling_info.append(
                    {
                        "osm_vdu_id": vdud["id"],
                        "member-vnf-index": member_vnf_index,
                        "type": "create",
                        "vdu_index": count_index,
                    }
                )
            scaling_info["vdu-create"][vdud["id"]] = count_index
        if self.ro_config.ng:
            self.logger.debug(
                "New Resources to be deployed: {}".format(scaling_info)
            )
            await self._scale_ng_ro(
                logging_text,
                db_nsr,
                update_db_nslcmops,
                updated_db_vnfr,
                scaling_info,
                stage,
            )
            # NOTE(review): as laid out here nothing is returned when
            # self.ro_config.ng is not set - confirm the nesting of this return
            return "COMPLETED", "Done"
    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        self.logger.debug("Error updating VNF {}".format(e))
        return "FAILED", "Error updating VNF {}".format(e)
5733 async def _ns_charm_upgrade(
5739 timeout
: float = None,
5741 """This method upgrade charms in VNF instances
5744 ee_id: Execution environment id
5745 path: Local path to the charm
5747 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5748 timeout: (Float) Timeout for the ns update operation
5751 result: (str, str) COMPLETED/FAILED, details
5754 charm_type
= charm_type
or "lxc_proxy_charm"
5755 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5759 charm_type
=charm_type
,
5760 timeout
=timeout
or self
.timeout
.ns_update
,
5764 return "COMPLETED", output
5766 except (LcmException
, asyncio
.CancelledError
):
5769 except Exception as e
:
5771 self
.logger
.debug("Error upgrading charm {}".format(path
))
5773 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5775 async def update(self
, nsr_id
, nslcmop_id
):
5776 """Update NS according to different update types
5778 This method performs upgrade of VNF instances then updates the revision
5779 number in VNF record
5782 nsr_id: Network service will be updated
5783 nslcmop_id: ns lcm operation id
5786 It may raise DbException, LcmException, N2VCException, K8sException
5789 # Try to lock HA task here
5790 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5791 if not task_is_locked_by_me
:
5794 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5795 self
.logger
.debug(logging_text
+ "Enter")
5797 # Set the required variables to be filled up later
5799 db_nslcmop_update
= {}
5801 nslcmop_operation_state
= None
5803 error_description_nslcmop
= ""
5805 change_type
= "updated"
5806 detailed_status
= ""
5809 # wait for any previous tasks in process
5810 step
= "Waiting for previous operations to terminate"
5811 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5812 self
._write
_ns
_status
(
5815 current_operation
="UPDATING",
5816 current_operation_id
=nslcmop_id
,
5819 step
= "Getting nslcmop from database"
5820 db_nslcmop
= self
.db
.get_one(
5821 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5823 update_type
= db_nslcmop
["operationParams"]["updateType"]
5825 step
= "Getting nsr from database"
5826 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5827 old_operational_status
= db_nsr
["operational-status"]
5828 db_nsr_update
["operational-status"] = "updating"
5829 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5830 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5832 if update_type
== "CHANGE_VNFPKG":
5834 # Get the input parameters given through update request
5835 vnf_instance_id
= db_nslcmop
["operationParams"][
5836 "changeVnfPackageData"
5837 ].get("vnfInstanceId")
5839 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5842 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5844 step
= "Getting vnfr from database"
5845 db_vnfr
= self
.db
.get_one(
5846 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5849 step
= "Getting vnfds from database"
5851 latest_vnfd
= self
.db
.get_one(
5852 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5854 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5857 current_vnf_revision
= db_vnfr
.get("revision", 1)
5858 current_vnfd
= self
.db
.get_one(
5860 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5861 fail_on_empty
=False,
5863 # Charm artifact paths will be filled up later
5865 current_charm_artifact_path
,
5866 target_charm_artifact_path
,
5867 charm_artifact_paths
,
5869 ) = ([], [], [], [])
5871 step
= "Checking if revision has changed in VNFD"
5872 if current_vnf_revision
!= latest_vnfd_revision
:
5874 change_type
= "policy_updated"
5876 # There is new revision of VNFD, update operation is required
5877 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5878 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5880 step
= "Removing the VNFD packages if they exist in the local path"
5881 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5882 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5884 step
= "Get the VNFD packages from FSMongo"
5885 self
.fs
.sync(from_path
=latest_vnfd_path
)
5886 self
.fs
.sync(from_path
=current_vnfd_path
)
5889 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5891 current_base_folder
= current_vnfd
["_admin"]["storage"]
5892 latest_base_folder
= latest_vnfd
["_admin"]["storage"]
5894 for vca_index
, vca_deployed
in enumerate(
5895 get_iterable(nsr_deployed
, "VCA")
5897 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5899 # Getting charm-id and charm-type
5900 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5901 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5902 vca_type
= vca_deployed
.get("type")
5903 vdu_count_index
= vca_deployed
.get("vdu_count_index")
5906 ee_id
= vca_deployed
.get("ee_id")
5908 step
= "Getting descriptor config"
5909 if current_vnfd
.get("kdu"):
5911 search_key
= "kdu_name"
5913 search_key
= "vnfd_id"
5915 entity_id
= vca_deployed
.get(search_key
)
5917 descriptor_config
= get_configuration(
5918 current_vnfd
, entity_id
5921 if "execution-environment-list" in descriptor_config
:
5922 ee_list
= descriptor_config
.get(
5923 "execution-environment-list", []
5928 # There could be several charm used in the same VNF
5929 for ee_item
in ee_list
:
5930 if ee_item
.get("juju"):
5932 step
= "Getting charm name"
5933 charm_name
= ee_item
["juju"].get("charm")
5935 step
= "Setting Charm artifact paths"
5936 current_charm_artifact_path
.append(
5937 get_charm_artifact_path(
5938 current_base_folder
,
5941 current_vnf_revision
,
5944 target_charm_artifact_path
.append(
5945 get_charm_artifact_path(
5949 latest_vnfd_revision
,
5952 elif ee_item
.get("helm-chart"):
5953 # add chart to list and all parameters
5954 step
= "Getting helm chart name"
5955 chart_name
= ee_item
.get("helm-chart")
5957 ee_item
.get("helm-version")
5958 and ee_item
.get("helm-version") == "v2"
5962 vca_type
= "helm-v3"
5963 step
= "Setting Helm chart artifact paths"
5965 helm_artifacts
.append(
5967 "current_artifact_path": get_charm_artifact_path(
5968 current_base_folder
,
5971 current_vnf_revision
,
5973 "target_artifact_path": get_charm_artifact_path(
5977 latest_vnfd_revision
,
5980 "vca_index": vca_index
,
5981 "vdu_index": vdu_count_index
,
5985 charm_artifact_paths
= zip(
5986 current_charm_artifact_path
, target_charm_artifact_path
5989 step
= "Checking if software version has changed in VNFD"
5990 if find_software_version(current_vnfd
) != find_software_version(
5994 step
= "Checking if existing VNF has charm"
5995 for current_charm_path
, target_charm_path
in list(
5996 charm_artifact_paths
5998 if current_charm_path
:
6000 "Software version change is not supported as VNF instance {} has charm.".format(
6005 # There is no change in the charm package, then redeploy the VNF
6006 # based on new descriptor
6007 step
= "Redeploying VNF"
6008 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6009 (result
, detailed_status
) = await self
._ns
_redeploy
_vnf
(
6010 nsr_id
, nslcmop_id
, latest_vnfd
, db_vnfr
, db_nsr
6012 if result
== "FAILED":
6013 nslcmop_operation_state
= result
6014 error_description_nslcmop
= detailed_status
6015 db_nslcmop_update
["detailed-status"] = detailed_status
6018 + " step {} Done with result {} {}".format(
6019 step
, nslcmop_operation_state
, detailed_status
6024 step
= "Checking if any charm package has changed or not"
6025 for current_charm_path
, target_charm_path
in list(
6026 charm_artifact_paths
6030 and target_charm_path
6031 and self
.check_charm_hash_changed(
6032 current_charm_path
, target_charm_path
6036 step
= "Checking whether VNF uses juju bundle"
6037 if check_juju_bundle_existence(current_vnfd
):
6040 "Charm upgrade is not supported for the instance which"
6041 " uses juju-bundle: {}".format(
6042 check_juju_bundle_existence(current_vnfd
)
6046 step
= "Upgrading Charm"
6050 ) = await self
._ns
_charm
_upgrade
(
6053 charm_type
=vca_type
,
6054 path
=self
.fs
.path
+ target_charm_path
,
6055 timeout
=timeout_seconds
,
6058 if result
== "FAILED":
6059 nslcmop_operation_state
= result
6060 error_description_nslcmop
= detailed_status
6062 db_nslcmop_update
["detailed-status"] = detailed_status
6065 + " step {} Done with result {} {}".format(
6066 step
, nslcmop_operation_state
, detailed_status
6070 step
= "Updating policies"
6071 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6072 result
= "COMPLETED"
6073 detailed_status
= "Done"
6074 db_nslcmop_update
["detailed-status"] = "Done"
6077 for item
in helm_artifacts
:
6079 item
["current_artifact_path"]
6080 and item
["target_artifact_path"]
6081 and self
.check_charm_hash_changed(
6082 item
["current_artifact_path"],
6083 item
["target_artifact_path"],
6087 db_update_entry
= "_admin.deployed.VCA.{}.".format(
6090 vnfr_id
= db_vnfr
["_id"]
6091 osm_config
= {"osm": {"ns_id": nsr_id
, "vnf_id": vnfr_id
}}
6093 "collection": "nsrs",
6094 "filter": {"_id": nsr_id
},
6095 "path": db_update_entry
,
6097 vca_type
, namespace
, helm_id
= get_ee_id_parts(item
["ee_id"])
6098 await self
.vca_map
[vca_type
].upgrade_execution_environment(
6099 namespace
=namespace
,
6103 artifact_path
=item
["target_artifact_path"],
6106 vnf_id
= db_vnfr
.get("vnfd-ref")
6107 config_descriptor
= get_configuration(latest_vnfd
, vnf_id
)
6108 self
.logger
.debug("get ssh key block")
6112 ("config-access", "ssh-access", "required"),
6114 # Needed to inject a ssh key
6117 ("config-access", "ssh-access", "default-user"),
6120 "Install configuration Software, getting public ssh key"
6122 pub_key
= await self
.vca_map
[
6124 ].get_ee_ssh_public__key(
6125 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
6129 "Insert public key into VM user={} ssh_key={}".format(
6133 self
.logger
.debug(logging_text
+ step
)
6135 # wait for RO (ip-address) Insert pub_key into VM
6136 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
6146 initial_config_primitive_list
= config_descriptor
.get(
6147 "initial-config-primitive"
6149 config_primitive
= next(
6152 for p
in initial_config_primitive_list
6153 if p
["name"] == "config"
6157 if not config_primitive
:
6160 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6162 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
6163 if db_vnfr
.get("additionalParamsForVnf"):
6164 deploy_params
.update(
6166 db_vnfr
["additionalParamsForVnf"].copy()
6169 primitive_params_
= self
._map
_primitive
_params
(
6170 config_primitive
, {}, deploy_params
6173 step
= "execute primitive '{}' params '{}'".format(
6174 config_primitive
["name"], primitive_params_
6176 self
.logger
.debug(logging_text
+ step
)
6177 await self
.vca_map
[vca_type
].exec_primitive(
6179 primitive_name
=config_primitive
["name"],
6180 params_dict
=primitive_params_
,
6186 step
= "Updating policies"
6187 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6188 detailed_status
= "Done"
6189 db_nslcmop_update
["detailed-status"] = "Done"
6191 # If nslcmop_operation_state is None, so any operation is not failed.
6192 if not nslcmop_operation_state
:
6193 nslcmop_operation_state
= "COMPLETED"
6195 # If update CHANGE_VNFPKG nslcmop_operation is successful
6196 # vnf revision need to be updated
6197 vnfr_update
["revision"] = latest_vnfd_revision
6198 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
6202 + " task Done with result {} {}".format(
6203 nslcmop_operation_state
, detailed_status
6206 elif update_type
== "REMOVE_VNF":
6207 # This part is included in https://osm.etsi.org/gerrit/11876
6208 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
6209 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
6210 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6211 step
= "Removing VNF"
6212 (result
, detailed_status
) = await self
.remove_vnf(
6213 nsr_id
, nslcmop_id
, vnf_instance_id
6215 if result
== "FAILED":
6216 nslcmop_operation_state
= result
6217 error_description_nslcmop
= detailed_status
6218 db_nslcmop_update
["detailed-status"] = detailed_status
6219 change_type
= "vnf_terminated"
6220 if not nslcmop_operation_state
:
6221 nslcmop_operation_state
= "COMPLETED"
6224 + " task Done with result {} {}".format(
6225 nslcmop_operation_state
, detailed_status
6229 elif update_type
== "OPERATE_VNF":
6230 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"][
6233 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"][
6236 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"][
6239 (result
, detailed_status
) = await self
.rebuild_start_stop(
6240 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
6242 if result
== "FAILED":
6243 nslcmop_operation_state
= result
6244 error_description_nslcmop
= detailed_status
6245 db_nslcmop_update
["detailed-status"] = detailed_status
6246 if not nslcmop_operation_state
:
6247 nslcmop_operation_state
= "COMPLETED"
6250 + " task Done with result {} {}".format(
6251 nslcmop_operation_state
, detailed_status
6255 # If nslcmop_operation_state is None, so any operation is not failed.
6256 # All operations are executed in overall.
6257 if not nslcmop_operation_state
:
6258 nslcmop_operation_state
= "COMPLETED"
6259 db_nsr_update
["operational-status"] = old_operational_status
6261 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
6262 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6264 except asyncio
.CancelledError
:
6266 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6268 exc
= "Operation was cancelled"
6269 except asyncio
.TimeoutError
:
6270 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
6272 except Exception as e
:
6273 exc
= traceback
.format_exc()
6274 self
.logger
.critical(
6275 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6284 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6285 nslcmop_operation_state
= "FAILED"
6286 db_nsr_update
["operational-status"] = old_operational_status
6288 self
._write
_ns
_status
(
6290 ns_state
=db_nsr
["nsState"],
6291 current_operation
="IDLE",
6292 current_operation_id
=None,
6293 other_update
=db_nsr_update
,
6296 self
._write
_op
_status
(
6299 error_message
=error_description_nslcmop
,
6300 operation_state
=nslcmop_operation_state
,
6301 other_update
=db_nslcmop_update
,
6304 if nslcmop_operation_state
:
6308 "nslcmop_id": nslcmop_id
,
6309 "operationState": nslcmop_operation_state
,
6311 if change_type
in ("vnf_terminated", "policy_updated"):
6312 msg
.update({"vnf_member_index": member_vnf_index
})
6313 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6314 except Exception as e
:
6316 logging_text
+ "kafka_write notification Exception {}".format(e
)
6318 self
.logger
.debug(logging_text
+ "Exit")
6319 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6320 return nslcmop_operation_state
, detailed_status
6322 async def scale(self
, nsr_id
, nslcmop_id
):
6323 # Try to lock HA task here
6324 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6325 if not task_is_locked_by_me
:
6328 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6329 stage
= ["", "", ""]
6330 tasks_dict_info
= {}
6331 # ^ stage, step, VIM progress
6332 self
.logger
.debug(logging_text
+ "Enter")
6333 # get all needed from database
6335 db_nslcmop_update
= {}
6338 # in case of error, indicates what part of scale was failed to put nsr at error status
6339 scale_process
= None
6340 old_operational_status
= ""
6341 old_config_status
= ""
6344 # wait for any previous tasks in process
6345 step
= "Waiting for previous operations to terminate"
6346 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6347 self
._write
_ns
_status
(
6350 current_operation
="SCALING",
6351 current_operation_id
=nslcmop_id
,
6354 step
= "Getting nslcmop from database"
6356 step
+ " after having waited for previous tasks to be completed"
6358 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6360 step
= "Getting nsr from database"
6361 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6362 old_operational_status
= db_nsr
["operational-status"]
6363 old_config_status
= db_nsr
["config-status"]
6365 step
= "Parsing scaling parameters"
6366 db_nsr_update
["operational-status"] = "scaling"
6367 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6368 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6370 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6372 ]["member-vnf-index"]
6373 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6375 ]["scaling-group-descriptor"]
6376 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6377 # for backward compatibility
6378 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6379 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6380 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6381 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6383 step
= "Getting vnfr from database"
6384 db_vnfr
= self
.db
.get_one(
6385 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6388 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6390 step
= "Getting vnfd from database"
6391 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6393 base_folder
= db_vnfd
["_admin"]["storage"]
6395 step
= "Getting scaling-group-descriptor"
6396 scaling_descriptor
= find_in_list(
6397 get_scaling_aspect(db_vnfd
),
6398 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6400 if not scaling_descriptor
:
6402 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6403 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6406 step
= "Sending scale order to VIM"
6407 # TODO check if ns is in a proper status
6409 if not db_nsr
["_admin"].get("scaling-group"):
6414 "_admin.scaling-group": [
6415 {"name": scaling_group
, "nb-scale-op": 0}
6419 admin_scale_index
= 0
6421 for admin_scale_index
, admin_scale_info
in enumerate(
6422 db_nsr
["_admin"]["scaling-group"]
6424 if admin_scale_info
["name"] == scaling_group
:
6425 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6427 else: # not found, set index one plus last element and add new entry with the name
6428 admin_scale_index
+= 1
6430 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6433 vca_scaling_info
= []
6434 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6435 if scaling_type
== "SCALE_OUT":
6436 if "aspect-delta-details" not in scaling_descriptor
:
6438 "Aspect delta details not fount in scaling descriptor {}".format(
6439 scaling_descriptor
["name"]
6442 # count if max-instance-count is reached
6443 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6445 scaling_info
["scaling_direction"] = "OUT"
6446 scaling_info
["vdu-create"] = {}
6447 scaling_info
["kdu-create"] = {}
6448 for delta
in deltas
:
6449 for vdu_delta
in delta
.get("vdu-delta", {}):
6450 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6451 # vdu_index also provides the number of instance of the targeted vdu
6452 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6453 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6457 additional_params
= (
6458 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6461 cloud_init_list
= []
6463 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6464 max_instance_count
= 10
6465 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6466 max_instance_count
= vdu_profile
.get(
6467 "max-number-of-instances", 10
6470 default_instance_num
= get_number_of_instances(
6473 instances_number
= vdu_delta
.get("number-of-instances", 1)
6474 nb_scale_op
+= instances_number
6476 new_instance_count
= nb_scale_op
+ default_instance_num
6477 # Control if new count is over max and vdu count is less than max.
6478 # Then assign new instance count
6479 if new_instance_count
> max_instance_count
> vdu_count
:
6480 instances_number
= new_instance_count
- max_instance_count
6482 instances_number
= instances_number
6484 if new_instance_count
> max_instance_count
:
6486 "reached the limit of {} (max-instance-count) "
6487 "scaling-out operations for the "
6488 "scaling-group-descriptor '{}'".format(
6489 nb_scale_op
, scaling_group
6492 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6494 # TODO Information of its own ip is not available because db_vnfr is not updated.
6495 additional_params
["OSM"] = get_osm_params(
6496 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6498 cloud_init_list
.append(
6499 self
._parse
_cloud
_init
(
6506 vca_scaling_info
.append(
6508 "osm_vdu_id": vdu_delta
["id"],
6509 "member-vnf-index": vnf_index
,
6511 "vdu_index": vdu_index
+ x
,
6514 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6515 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6516 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6517 kdu_name
= kdu_profile
["kdu-name"]
6518 resource_name
= kdu_profile
.get("resource-name", "")
6520 # Might have different kdus in the same delta
6521 # Should have list for each kdu
6522 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6523 scaling_info
["kdu-create"][kdu_name
] = []
6525 kdur
= get_kdur(db_vnfr
, kdu_name
)
6526 if kdur
.get("helm-chart"):
6527 k8s_cluster_type
= "helm-chart-v3"
6528 self
.logger
.debug("kdur: {}".format(kdur
))
6530 kdur
.get("helm-version")
6531 and kdur
.get("helm-version") == "v2"
6533 k8s_cluster_type
= "helm-chart"
6534 elif kdur
.get("juju-bundle"):
6535 k8s_cluster_type
= "juju-bundle"
6538 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6539 "juju-bundle. Maybe an old NBI version is running".format(
6540 db_vnfr
["member-vnf-index-ref"], kdu_name
6544 max_instance_count
= 10
6545 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6546 max_instance_count
= kdu_profile
.get(
6547 "max-number-of-instances", 10
6550 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6551 deployed_kdu
, _
= get_deployed_kdu(
6552 nsr_deployed
, kdu_name
, vnf_index
6554 if deployed_kdu
is None:
6556 "KDU '{}' for vnf '{}' not deployed".format(
6560 kdu_instance
= deployed_kdu
.get("kdu-instance")
6561 instance_num
= await self
.k8scluster_map
[
6567 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6568 kdu_model
=deployed_kdu
.get("kdu-model"),
6570 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6571 "number-of-instances", 1
6574 # Control if new count is over max and instance_num is less than max.
6575 # Then assign max instance number to kdu replica count
6576 if kdu_replica_count
> max_instance_count
> instance_num
:
6577 kdu_replica_count
= max_instance_count
6578 if kdu_replica_count
> max_instance_count
:
6580 "reached the limit of {} (max-instance-count) "
6581 "scaling-out operations for the "
6582 "scaling-group-descriptor '{}'".format(
6583 instance_num
, scaling_group
6587 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6588 vca_scaling_info
.append(
6590 "osm_kdu_id": kdu_name
,
6591 "member-vnf-index": vnf_index
,
6593 "kdu_index": instance_num
+ x
- 1,
6596 scaling_info
["kdu-create"][kdu_name
].append(
6598 "member-vnf-index": vnf_index
,
6600 "k8s-cluster-type": k8s_cluster_type
,
6601 "resource-name": resource_name
,
6602 "scale": kdu_replica_count
,
6605 elif scaling_type
== "SCALE_IN":
6606 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6608 scaling_info
["scaling_direction"] = "IN"
6609 scaling_info
["vdu-delete"] = {}
6610 scaling_info
["kdu-delete"] = {}
6612 for delta
in deltas
:
6613 for vdu_delta
in delta
.get("vdu-delta", {}):
6614 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6615 min_instance_count
= 0
6616 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6617 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6618 min_instance_count
= vdu_profile
["min-number-of-instances"]
6620 default_instance_num
= get_number_of_instances(
6621 db_vnfd
, vdu_delta
["id"]
6623 instance_num
= vdu_delta
.get("number-of-instances", 1)
6624 nb_scale_op
-= instance_num
6626 new_instance_count
= nb_scale_op
+ default_instance_num
6628 if new_instance_count
< min_instance_count
< vdu_count
:
6629 instances_number
= min_instance_count
- new_instance_count
6631 instances_number
= instance_num
6633 if new_instance_count
< min_instance_count
:
6635 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6636 "scaling-group-descriptor '{}'".format(
6637 nb_scale_op
, scaling_group
6640 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6641 vca_scaling_info
.append(
6643 "osm_vdu_id": vdu_delta
["id"],
6644 "member-vnf-index": vnf_index
,
6646 "vdu_index": vdu_index
- 1 - x
,
6649 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6650 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6651 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6652 kdu_name
= kdu_profile
["kdu-name"]
6653 resource_name
= kdu_profile
.get("resource-name", "")
6655 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6656 scaling_info
["kdu-delete"][kdu_name
] = []
6658 kdur
= get_kdur(db_vnfr
, kdu_name
)
6659 if kdur
.get("helm-chart"):
6660 k8s_cluster_type
= "helm-chart-v3"
6661 self
.logger
.debug("kdur: {}".format(kdur
))
6663 kdur
.get("helm-version")
6664 and kdur
.get("helm-version") == "v2"
6666 k8s_cluster_type
= "helm-chart"
6667 elif kdur
.get("juju-bundle"):
6668 k8s_cluster_type
= "juju-bundle"
6671 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6672 "juju-bundle. Maybe an old NBI version is running".format(
6673 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6677 min_instance_count
= 0
6678 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6679 min_instance_count
= kdu_profile
["min-number-of-instances"]
6681 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6682 deployed_kdu
, _
= get_deployed_kdu(
6683 nsr_deployed
, kdu_name
, vnf_index
6685 if deployed_kdu
is None:
6687 "KDU '{}' for vnf '{}' not deployed".format(
6691 kdu_instance
= deployed_kdu
.get("kdu-instance")
6692 instance_num
= await self
.k8scluster_map
[
6698 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6699 kdu_model
=deployed_kdu
.get("kdu-model"),
6701 kdu_replica_count
= instance_num
- kdu_delta
.get(
6702 "number-of-instances", 1
6705 if kdu_replica_count
< min_instance_count
< instance_num
:
6706 kdu_replica_count
= min_instance_count
6707 if kdu_replica_count
< min_instance_count
:
6709 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6710 "scaling-group-descriptor '{}'".format(
6711 instance_num
, scaling_group
6715 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6716 vca_scaling_info
.append(
6718 "osm_kdu_id": kdu_name
,
6719 "member-vnf-index": vnf_index
,
6721 "kdu_index": instance_num
- x
- 1,
6724 scaling_info
["kdu-delete"][kdu_name
].append(
6726 "member-vnf-index": vnf_index
,
6728 "k8s-cluster-type": k8s_cluster_type
,
6729 "resource-name": resource_name
,
6730 "scale": kdu_replica_count
,
6734 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6735 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6736 if scaling_info
["scaling_direction"] == "IN":
6737 for vdur
in reversed(db_vnfr
["vdur"]):
6738 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6739 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6740 scaling_info
["vdu"].append(
6742 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6743 "vdu_id": vdur
["vdu-id-ref"],
6747 for interface
in vdur
["interfaces"]:
6748 scaling_info
["vdu"][-1]["interface"].append(
6750 "name": interface
["name"],
6751 "ip_address": interface
["ip-address"],
6752 "mac_address": interface
.get("mac-address"),
6755 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6758 step
= "Executing pre-scale vnf-config-primitive"
6759 if scaling_descriptor
.get("scaling-config-action"):
6760 for scaling_config_action
in scaling_descriptor
[
6761 "scaling-config-action"
6764 scaling_config_action
.get("trigger") == "pre-scale-in"
6765 and scaling_type
== "SCALE_IN"
6767 scaling_config_action
.get("trigger") == "pre-scale-out"
6768 and scaling_type
== "SCALE_OUT"
6770 vnf_config_primitive
= scaling_config_action
[
6771 "vnf-config-primitive-name-ref"
6773 step
= db_nslcmop_update
[
6775 ] = "executing pre-scale scaling-config-action '{}'".format(
6776 vnf_config_primitive
6779 # look for primitive
6780 for config_primitive
in (
6781 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6782 ).get("config-primitive", ()):
6783 if config_primitive
["name"] == vnf_config_primitive
:
6787 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6788 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6789 "primitive".format(scaling_group
, vnf_config_primitive
)
6792 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6793 if db_vnfr
.get("additionalParamsForVnf"):
6794 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6796 scale_process
= "VCA"
6797 db_nsr_update
["config-status"] = "configuring pre-scaling"
6798 primitive_params
= self
._map
_primitive
_params
(
6799 config_primitive
, {}, vnfr_params
6802 # Pre-scale retry check: Check if this sub-operation has been executed before
6803 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6806 vnf_config_primitive
,
6810 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6811 # Skip sub-operation
6812 result
= "COMPLETED"
6813 result_detail
= "Done"
6816 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6817 vnf_config_primitive
, result
, result_detail
6821 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6822 # New sub-operation: Get index of this sub-operation
6824 len(db_nslcmop
.get("_admin", {}).get("operations"))
6829 + "vnf_config_primitive={} New sub-operation".format(
6830 vnf_config_primitive
6834 # retry: Get registered params for this existing sub-operation
6835 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6838 vnf_index
= op
.get("member_vnf_index")
6839 vnf_config_primitive
= op
.get("primitive")
6840 primitive_params
= op
.get("primitive_params")
6843 + "vnf_config_primitive={} Sub-operation retry".format(
6844 vnf_config_primitive
6847 # Execute the primitive, either with new (first-time) or registered (reintent) args
6848 ee_descriptor_id
= config_primitive
.get(
6849 "execution-environment-ref"
6851 primitive_name
= config_primitive
.get(
6852 "execution-environment-primitive", vnf_config_primitive
6854 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6855 nsr_deployed
["VCA"],
6856 member_vnf_index
=vnf_index
,
6858 vdu_count_index
=None,
6859 ee_descriptor_id
=ee_descriptor_id
,
6861 result
, result_detail
= await self
._ns
_execute
_primitive
(
6870 + "vnf_config_primitive={} Done with result {} {}".format(
6871 vnf_config_primitive
, result
, result_detail
6874 # Update operationState = COMPLETED | FAILED
6875 self
._update
_suboperation
_status
(
6876 db_nslcmop
, op_index
, result
, result_detail
6879 if result
== "FAILED":
6880 raise LcmException(result_detail
)
6881 db_nsr_update
["config-status"] = old_config_status
6882 scale_process
= None
6886 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6889 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6892 # SCALE-IN VCA - BEGIN
6893 if vca_scaling_info
:
6894 step
= db_nslcmop_update
[
6896 ] = "Deleting the execution environments"
6897 scale_process
= "VCA"
6898 for vca_info
in vca_scaling_info
:
6899 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6900 member_vnf_index
= str(vca_info
["member-vnf-index"])
6902 logging_text
+ "vdu info: {}".format(vca_info
)
6904 if vca_info
.get("osm_vdu_id"):
6905 vdu_id
= vca_info
["osm_vdu_id"]
6906 vdu_index
= int(vca_info
["vdu_index"])
6909 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6910 member_vnf_index
, vdu_id
, vdu_index
6912 stage
[2] = step
= "Scaling in VCA"
6913 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6914 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6915 config_update
= db_nsr
["configurationStatus"]
6916 for vca_index
, vca
in enumerate(vca_update
):
6918 (vca
or vca
.get("ee_id"))
6919 and vca
["member-vnf-index"] == member_vnf_index
6920 and vca
["vdu_count_index"] == vdu_index
6922 if vca
.get("vdu_id"):
6923 config_descriptor
= get_configuration(
6924 db_vnfd
, vca
.get("vdu_id")
6926 elif vca
.get("kdu_name"):
6927 config_descriptor
= get_configuration(
6928 db_vnfd
, vca
.get("kdu_name")
6931 config_descriptor
= get_configuration(
6932 db_vnfd
, db_vnfd
["id"]
6934 operation_params
= (
6935 db_nslcmop
.get("operationParams") or {}
6937 exec_terminate_primitives
= not operation_params
.get(
6938 "skip_terminate_primitives"
6939 ) and vca
.get("needed_terminate")
6940 task
= asyncio
.ensure_future(
6949 exec_primitives
=exec_terminate_primitives
,
6953 timeout
=self
.timeout
.charm_delete
,
6956 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6959 del vca_update
[vca_index
]
6960 del config_update
[vca_index
]
6961 # wait for pending tasks of terminate primitives
6965 + "Waiting for tasks {}".format(
6966 list(tasks_dict_info
.keys())
6969 error_list
= await self
._wait
_for
_tasks
(
6973 self
.timeout
.charm_delete
, self
.timeout
.ns_terminate
6978 tasks_dict_info
.clear()
6980 raise LcmException("; ".join(error_list
))
6982 db_vca_and_config_update
= {
6983 "_admin.deployed.VCA": vca_update
,
6984 "configurationStatus": config_update
,
6987 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6989 scale_process
= None
6990 # SCALE-IN VCA - END
6993 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6994 scale_process
= "RO"
6995 if self
.ro_config
.ng
:
6996 await self
._scale
_ng
_ro
(
6997 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6999 scaling_info
.pop("vdu-create", None)
7000 scaling_info
.pop("vdu-delete", None)
7002 scale_process
= None
7006 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
7007 scale_process
= "KDU"
7008 await self
._scale
_kdu
(
7009 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7011 scaling_info
.pop("kdu-create", None)
7012 scaling_info
.pop("kdu-delete", None)
7014 scale_process
= None
7018 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7020 # SCALE-UP VCA - BEGIN
7021 if vca_scaling_info
:
7022 step
= db_nslcmop_update
[
7024 ] = "Creating new execution environments"
7025 scale_process
= "VCA"
7026 for vca_info
in vca_scaling_info
:
7027 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
7028 member_vnf_index
= str(vca_info
["member-vnf-index"])
7030 logging_text
+ "vdu info: {}".format(vca_info
)
7032 vnfd_id
= db_vnfr
["vnfd-ref"]
7033 if vca_info
.get("osm_vdu_id"):
7034 vdu_index
= int(vca_info
["vdu_index"])
7035 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
7036 if db_vnfr
.get("additionalParamsForVnf"):
7037 deploy_params
.update(
7039 db_vnfr
["additionalParamsForVnf"].copy()
7042 descriptor_config
= get_configuration(
7043 db_vnfd
, db_vnfd
["id"]
7045 if descriptor_config
:
7051 logging_text
=logging_text
7052 + "member_vnf_index={} ".format(member_vnf_index
),
7055 nslcmop_id
=nslcmop_id
,
7061 kdu_index
=kdu_index
,
7062 member_vnf_index
=member_vnf_index
,
7063 vdu_index
=vdu_index
,
7065 deploy_params
=deploy_params
,
7066 descriptor_config
=descriptor_config
,
7067 base_folder
=base_folder
,
7068 task_instantiation_info
=tasks_dict_info
,
7071 vdu_id
= vca_info
["osm_vdu_id"]
7072 vdur
= find_in_list(
7073 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
7075 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
7076 if vdur
.get("additionalParams"):
7077 deploy_params_vdu
= parse_yaml_strings(
7078 vdur
["additionalParams"]
7081 deploy_params_vdu
= deploy_params
7082 deploy_params_vdu
["OSM"] = get_osm_params(
7083 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
7085 if descriptor_config
:
7091 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
7092 member_vnf_index
, vdu_id
, vdu_index
7094 stage
[2] = step
= "Scaling out VCA"
7095 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
7097 logging_text
=logging_text
7098 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
7099 member_vnf_index
, vdu_id
, vdu_index
7103 nslcmop_id
=nslcmop_id
,
7109 member_vnf_index
=member_vnf_index
,
7110 vdu_index
=vdu_index
,
7111 kdu_index
=kdu_index
,
7113 deploy_params
=deploy_params_vdu
,
7114 descriptor_config
=descriptor_config
,
7115 base_folder
=base_folder
,
7116 task_instantiation_info
=tasks_dict_info
,
7119 # SCALE-UP VCA - END
7120 scale_process
= None
7123 # execute primitive service POST-SCALING
7124 step
= "Executing post-scale vnf-config-primitive"
7125 if scaling_descriptor
.get("scaling-config-action"):
7126 for scaling_config_action
in scaling_descriptor
[
7127 "scaling-config-action"
7130 scaling_config_action
.get("trigger") == "post-scale-in"
7131 and scaling_type
== "SCALE_IN"
7133 scaling_config_action
.get("trigger") == "post-scale-out"
7134 and scaling_type
== "SCALE_OUT"
7136 vnf_config_primitive
= scaling_config_action
[
7137 "vnf-config-primitive-name-ref"
7139 step
= db_nslcmop_update
[
7141 ] = "executing post-scale scaling-config-action '{}'".format(
7142 vnf_config_primitive
7145 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
7146 if db_vnfr
.get("additionalParamsForVnf"):
7147 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
7149 # look for primitive
7150 for config_primitive
in (
7151 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
7152 ).get("config-primitive", ()):
7153 if config_primitive
["name"] == vnf_config_primitive
:
7157 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
7158 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
7159 "config-primitive".format(
7160 scaling_group
, vnf_config_primitive
7163 scale_process
= "VCA"
7164 db_nsr_update
["config-status"] = "configuring post-scaling"
7165 primitive_params
= self
._map
_primitive
_params
(
7166 config_primitive
, {}, vnfr_params
7169 # Post-scale retry check: Check if this sub-operation has been executed before
7170 op_index
= self
._check
_or
_add
_scale
_suboperation
(
7173 vnf_config_primitive
,
7177 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
7178 # Skip sub-operation
7179 result
= "COMPLETED"
7180 result_detail
= "Done"
7183 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
7184 vnf_config_primitive
, result
, result_detail
7188 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
7189 # New sub-operation: Get index of this sub-operation
7191 len(db_nslcmop
.get("_admin", {}).get("operations"))
7196 + "vnf_config_primitive={} New sub-operation".format(
7197 vnf_config_primitive
7201 # retry: Get registered params for this existing sub-operation
7202 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
7205 vnf_index
= op
.get("member_vnf_index")
7206 vnf_config_primitive
= op
.get("primitive")
7207 primitive_params
= op
.get("primitive_params")
7210 + "vnf_config_primitive={} Sub-operation retry".format(
7211 vnf_config_primitive
7214 # Execute the primitive, either with new (first-time) or registered (retry) args
7215 ee_descriptor_id
= config_primitive
.get(
7216 "execution-environment-ref"
7218 primitive_name
= config_primitive
.get(
7219 "execution-environment-primitive", vnf_config_primitive
7221 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
7222 nsr_deployed
["VCA"],
7223 member_vnf_index
=vnf_index
,
7225 vdu_count_index
=None,
7226 ee_descriptor_id
=ee_descriptor_id
,
7228 result
, result_detail
= await self
._ns
_execute
_primitive
(
7237 + "vnf_config_primitive={} Done with result {} {}".format(
7238 vnf_config_primitive
, result
, result_detail
7241 # Update operationState = COMPLETED | FAILED
7242 self
._update
_suboperation
_status
(
7243 db_nslcmop
, op_index
, result
, result_detail
7246 if result
== "FAILED":
7247 raise LcmException(result_detail
)
7248 db_nsr_update
["config-status"] = old_config_status
7249 scale_process
= None
7254 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
7255 db_nsr_update
["operational-status"] = (
7257 if old_operational_status
== "failed"
7258 else old_operational_status
7260 db_nsr_update
["config-status"] = old_config_status
7263 ROclient
.ROClientException
,
7268 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7270 except asyncio
.CancelledError
:
7272 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7274 exc
= "Operation was cancelled"
7275 except Exception as e
:
7276 exc
= traceback
.format_exc()
7277 self
.logger
.critical(
7278 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7282 self
._write
_ns
_status
(
7285 current_operation
="IDLE",
7286 current_operation_id
=None,
7289 stage
[1] = "Waiting for instantiate pending tasks."
7290 self
.logger
.debug(logging_text
+ stage
[1])
7291 exc
= await self
._wait
_for
_tasks
(
7294 self
.timeout
.ns_deploy
,
7302 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7303 nslcmop_operation_state
= "FAILED"
7305 db_nsr_update
["operational-status"] = old_operational_status
7306 db_nsr_update
["config-status"] = old_config_status
7307 db_nsr_update
["detailed-status"] = ""
7309 if "VCA" in scale_process
:
7310 db_nsr_update
["config-status"] = "failed"
7311 if "RO" in scale_process
:
7312 db_nsr_update
["operational-status"] = "failed"
7315 ] = "FAILED scaling nslcmop={} {}: {}".format(
7316 nslcmop_id
, step
, exc
7319 error_description_nslcmop
= None
7320 nslcmop_operation_state
= "COMPLETED"
7321 db_nslcmop_update
["detailed-status"] = "Done"
7323 self
._write
_op
_status
(
7326 error_message
=error_description_nslcmop
,
7327 operation_state
=nslcmop_operation_state
,
7328 other_update
=db_nslcmop_update
,
7331 self
._write
_ns
_status
(
7334 current_operation
="IDLE",
7335 current_operation_id
=None,
7336 other_update
=db_nsr_update
,
7339 if nslcmop_operation_state
:
7343 "nslcmop_id": nslcmop_id
,
7344 "operationState": nslcmop_operation_state
,
7346 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7347 except Exception as e
:
7349 logging_text
+ "kafka_write notification Exception {}".format(e
)
7351 self
.logger
.debug(logging_text
+ "Exit")
7352 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7354 async def _scale_kdu(
7355 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7357 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7358 for kdu_name
in _scaling_info
:
7359 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7360 deployed_kdu
, index
= get_deployed_kdu(
7361 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7363 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7364 kdu_instance
= deployed_kdu
["kdu-instance"]
7365 kdu_model
= deployed_kdu
.get("kdu-model")
7366 scale
= int(kdu_scaling_info
["scale"])
7367 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7370 "collection": "nsrs",
7371 "filter": {"_id": nsr_id
},
7372 "path": "_admin.deployed.K8s.{}".format(index
),
7375 step
= "scaling application {}".format(
7376 kdu_scaling_info
["resource-name"]
7378 self
.logger
.debug(logging_text
+ step
)
7380 if kdu_scaling_info
["type"] == "delete":
7381 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7384 and kdu_config
.get("terminate-config-primitive")
7385 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7387 terminate_config_primitive_list
= kdu_config
.get(
7388 "terminate-config-primitive"
7390 terminate_config_primitive_list
.sort(
7391 key
=lambda val
: int(val
["seq"])
7395 terminate_config_primitive
7396 ) in terminate_config_primitive_list
:
7397 primitive_params_
= self
._map
_primitive
_params
(
7398 terminate_config_primitive
, {}, {}
7400 step
= "execute terminate config primitive"
7401 self
.logger
.debug(logging_text
+ step
)
7402 await asyncio
.wait_for(
7403 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7404 cluster_uuid
=cluster_uuid
,
7405 kdu_instance
=kdu_instance
,
7406 primitive_name
=terminate_config_primitive
["name"],
7407 params
=primitive_params_
,
7409 total_timeout
=self
.timeout
.primitive
,
7412 timeout
=self
.timeout
.primitive
7413 * self
.timeout
.primitive_outer_factor
,
7416 await asyncio
.wait_for(
7417 self
.k8scluster_map
[k8s_cluster_type
].scale(
7418 kdu_instance
=kdu_instance
,
7420 resource_name
=kdu_scaling_info
["resource-name"],
7421 total_timeout
=self
.timeout
.scale_on_error
,
7423 cluster_uuid
=cluster_uuid
,
7424 kdu_model
=kdu_model
,
7428 timeout
=self
.timeout
.scale_on_error
7429 * self
.timeout
.scale_on_error_outer_factor
,
7432 if kdu_scaling_info
["type"] == "create":
7433 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7436 and kdu_config
.get("initial-config-primitive")
7437 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7439 initial_config_primitive_list
= kdu_config
.get(
7440 "initial-config-primitive"
7442 initial_config_primitive_list
.sort(
7443 key
=lambda val
: int(val
["seq"])
7446 for initial_config_primitive
in initial_config_primitive_list
:
7447 primitive_params_
= self
._map
_primitive
_params
(
7448 initial_config_primitive
, {}, {}
7450 step
= "execute initial config primitive"
7451 self
.logger
.debug(logging_text
+ step
)
7452 await asyncio
.wait_for(
7453 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7454 cluster_uuid
=cluster_uuid
,
7455 kdu_instance
=kdu_instance
,
7456 primitive_name
=initial_config_primitive
["name"],
7457 params
=primitive_params_
,
7464 async def _scale_ng_ro(
7465 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7467 nsr_id
= db_nslcmop
["nsInstanceId"]
7468 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7471 # read from db: vnfd's for every vnf
7474 # for each vnf in ns, read vnfd
7475 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7476 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7477 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7478 # if we do not have this vnfd yet, read it from db
7479 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7481 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7482 db_vnfds
.append(vnfd
)
7483 n2vc_key
= self
.n2vc
.get_public_key()
7484 n2vc_key_list
= [n2vc_key
]
7487 vdu_scaling_info
.get("vdu-create"),
7488 vdu_scaling_info
.get("vdu-delete"),
7491 # db_vnfr has been updated, update db_vnfrs to use it
7492 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7493 await self
._instantiate
_ng
_ro
(
7503 start_deploy
=time(),
7504 timeout_ns_deploy
=self
.timeout
.ns_deploy
,
7506 if vdu_scaling_info
.get("vdu-delete"):
7508 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7511 async def extract_prometheus_scrape_jobs(
7515 ee_config_descriptor
: dict,
7519 vnf_member_index
: str = "",
7521 vdu_index
: int = None,
7523 kdu_index
: int = None,
7525 """Method to extract prometheus scrape jobs from EE's Prometheus template job file
7526 This method will wait until the corresponding VDU or KDU is fully instantiated
7529 ee_id (str): Execution Environment ID
7530 artifact_path (str): Path where the EE's content is (including the Prometheus template file)
7531 ee_config_descriptor (dict): Execution Environment's configuration descriptor
7532 vnfr_id (str): VNFR ID where this EE applies
7533 nsr_id (str): NSR ID where this EE applies
7534 target_ip (str): VDU/KDU instance IP address
7535 vnf_member_index (str, optional): VNF index where this EE applies. Defaults to "".
7536 vdu_id (str, optional): VDU ID where this EE applies. Defaults to "".
7537 vdu_index (int, optional): VDU index where this EE applies. Defaults to None.
7538 kdu_name (str, optional): KDU name where this EE applies. Defaults to "".
7539 kdu_index (int, optional): KDU index where this EE applies. Defaults to None.
7542 LcmException: When the VDU or KDU instance was not found in an hour
7545 _type_: Prometheus jobs
7547 self
.logger
.debug(f
"KDU: {kdu_name}; KDU INDEX: {kdu_index}")
7548 # look if exist a file called 'prometheus*.j2' and
7549 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7553 for f
in artifact_content
7554 if f
.startswith("prometheus") and f
.endswith(".j2")
7560 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7565 for r
in range(360):
7566 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
7567 if vdu_id
and vdu_index
is not None:
7571 for x
in get_iterable(db_vnfr
, "vdur")
7573 x
.get("vdu-id-ref") == vdu_id
7574 and x
.get("count-index") == vdu_index
7579 if vdur
.get("name"):
7580 vdur_name
= vdur
.get("name")
7582 if kdu_name
and kdu_index
is not None:
7586 for x
in get_iterable(db_vnfr
, "kdur")
7588 x
.get("kdu-name") == kdu_name
7589 and x
.get("count-index") == kdu_index
7594 if kdur
.get("name"):
7595 kdur_name
= kdur
.get("name")
7598 await asyncio
.sleep(10, loop
=self
.loop
)
7600 if vdu_id
and vdu_index
is not None:
7602 f
"Timeout waiting VDU with name={vdu_id} and index={vdu_index} to be intantiated"
7604 if kdu_name
and kdu_index
is not None:
7606 f
"Timeout waiting KDU with name={kdu_name} and index={kdu_index} to be intantiated"
7610 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7611 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7613 vnfr_id
= vnfr_id
.replace("-", "")
7615 "JOB_NAME": vnfr_id
,
7616 "TARGET_IP": target_ip
,
7617 "EXPORTER_POD_IP": host_name
,
7618 "EXPORTER_POD_PORT": host_port
,
7620 "VNF_MEMBER_INDEX": vnf_member_index
,
7621 "VDUR_NAME": vdur_name
,
7622 "KDUR_NAME": kdur_name
,
7624 job_list
= parse_job(job_data
, variables
)
7625 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7626 for job
in job_list
:
7628 not isinstance(job
.get("job_name"), str)
7629 or vnfr_id
not in job
["job_name"]
7631 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
7632 job
["nsr_id"] = nsr_id
7633 job
["vnfr_id"] = vnfr_id
7636 async def rebuild_start_stop(
7637 self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
7639 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7640 self
.logger
.info(logging_text
+ "Enter")
7641 stage
= ["Preparing the environment", ""]
7642 # database nsrs record
7646 # in case of error, indicates what part of scale was failed to put nsr at error status
7647 start_deploy
= time()
7649 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7650 vim_account_id
= db_vnfr
.get("vim-account-id")
7651 vim_info_key
= "vim:" + vim_account_id
7652 vdu_id
= additional_param
["vdu_id"]
7653 vdurs
= [item
for item
in db_vnfr
["vdur"] if item
["vdu-id-ref"] == vdu_id
]
7654 vdur
= find_in_list(
7655 vdurs
, lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7658 vdu_vim_name
= vdur
["name"]
7659 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7660 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7662 raise LcmException("Target vdu is not found")
7663 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7664 # wait for any previous tasks in process
7665 stage
[1] = "Waiting for previous operations to terminate"
7666 self
.logger
.info(stage
[1])
7667 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7669 stage
[1] = "Reading from database."
7670 self
.logger
.info(stage
[1])
7671 self
._write
_ns
_status
(
7674 current_operation
=operation_type
.upper(),
7675 current_operation_id
=nslcmop_id
,
7677 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7680 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7681 db_nsr_update
["operational-status"] = operation_type
7682 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7686 "vim_vm_id": vim_vm_id
,
7688 "vdu_index": additional_param
["count-index"],
7689 "vdu_id": vdur
["id"],
7690 "target_vim": target_vim
,
7691 "vim_account_id": vim_account_id
,
7694 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7695 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7696 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7697 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7698 self
.logger
.info("response from RO: {}".format(result_dict
))
7699 action_id
= result_dict
["action_id"]
7700 await self
._wait
_ng
_ro
(
7705 self
.timeout
.operate
,
7707 "start_stop_rebuild",
7709 return "COMPLETED", "Done"
7710 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7711 self
.logger
.error("Exit Exception {}".format(e
))
7713 except asyncio
.CancelledError
:
7714 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7715 exc
= "Operation was cancelled"
7716 except Exception as e
:
7717 exc
= traceback
.format_exc()
7718 self
.logger
.critical(
7719 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7721 return "FAILED", "Error in operate VNF {}".format(exc
)
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Get VCA Cloud and VCA Cloud Credentials for the VIM account

    Both values are read from the "config" section of the VIM account record
    returned by VimAccountDB.get_vim_account_with_id(). Each element of the
    returned pair is None when the corresponding key is not set.

    :param: vim_account_id:     VIM Account ID

    :return: (cloud_name, cloud_credential)
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    # "config" may be absent or explicitly stored as null; treat both as empty
    # so the lookups below return (None, None) instead of raising AttributeError
    config = vim_account.get("config") or {}
    return config.get("vca_cloud"), config.get("vca_cloud_credential")
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account

    Both values are read from the "config" section of the VIM account record
    returned by VimAccountDB.get_vim_account_with_id(). Each element of the
    returned pair is None when the corresponding key is not set.

    :param: vim_account_id:     VIM Account ID

    :return: (cloud_name, cloud_credential)
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    # "config" may be absent or explicitly stored as null; treat both as empty
    # so the lookups below return (None, None) instead of raising AttributeError
    config = vim_account.get("config") or {}
    return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")
7745 async def migrate(self
, nsr_id
, nslcmop_id
):
7747 Migrate VNFs and VDUs instances in a NS
7749 :param: nsr_id: NS Instance ID
7750 :param: nslcmop_id: nslcmop ID of migrate
7753 # Try to lock HA task here
7754 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7755 if not task_is_locked_by_me
:
7757 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7758 self
.logger
.debug(logging_text
+ "Enter")
7759 # get all needed from database
7761 db_nslcmop_update
= {}
7762 nslcmop_operation_state
= None
7766 # in case of error, indicates what part of scale was failed to put nsr at error status
7767 start_deploy
= time()
7770 # wait for any previous tasks in process
7771 step
= "Waiting for previous operations to terminate"
7772 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7774 self
._write
_ns
_status
(
7777 current_operation
="MIGRATING",
7778 current_operation_id
=nslcmop_id
,
7780 step
= "Getting nslcmop from database"
7782 step
+ " after having waited for previous tasks to be completed"
7784 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7785 migrate_params
= db_nslcmop
.get("operationParams")
7788 target
.update(migrate_params
)
7789 desc
= await self
.RO
.migrate(nsr_id
, target
)
7790 self
.logger
.debug("RO return > {}".format(desc
))
7791 action_id
= desc
["action_id"]
7792 await self
._wait
_ng
_ro
(
7797 self
.timeout
.migrate
,
7798 operation
="migrate",
7800 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7801 self
.logger
.error("Exit Exception {}".format(e
))
7803 except asyncio
.CancelledError
:
7804 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7805 exc
= "Operation was cancelled"
7806 except Exception as e
:
7807 exc
= traceback
.format_exc()
7808 self
.logger
.critical(
7809 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7812 self
._write
_ns
_status
(
7815 current_operation
="IDLE",
7816 current_operation_id
=None,
7819 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7820 nslcmop_operation_state
= "FAILED"
7822 nslcmop_operation_state
= "COMPLETED"
7823 db_nslcmop_update
["detailed-status"] = "Done"
7824 db_nsr_update
["detailed-status"] = "Done"
7826 self
._write
_op
_status
(
7830 operation_state
=nslcmop_operation_state
,
7831 other_update
=db_nslcmop_update
,
7833 if nslcmop_operation_state
:
7837 "nslcmop_id": nslcmop_id
,
7838 "operationState": nslcmop_operation_state
,
7840 await self
.msg
.aiowrite("ns", "migrated", msg
, loop
=self
.loop
)
7841 except Exception as e
:
7843 logging_text
+ "kafka_write notification Exception {}".format(e
)
7845 self
.logger
.debug(logging_text
+ "Exit")
7846 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
7848 async def heal(self
, nsr_id
, nslcmop_id
):
7852 :param nsr_id: ns instance to heal
7853 :param nslcmop_id: operation to run
7857 # Try to lock HA task here
7858 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7859 if not task_is_locked_by_me
:
7862 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7863 stage
= ["", "", ""]
7864 tasks_dict_info
= {}
7865 # ^ stage, step, VIM progress
7866 self
.logger
.debug(logging_text
+ "Enter")
7867 # get all needed from database
7869 db_nslcmop_update
= {}
7871 db_vnfrs
= {} # vnf's info indexed by _id
7873 old_operational_status
= ""
7874 old_config_status
= ""
7877 # wait for any previous tasks in process
7878 step
= "Waiting for previous operations to terminate"
7879 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7880 self
._write
_ns
_status
(
7883 current_operation
="HEALING",
7884 current_operation_id
=nslcmop_id
,
7887 step
= "Getting nslcmop from database"
7889 step
+ " after having waited for previous tasks to be completed"
7891 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7893 step
= "Getting nsr from database"
7894 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7895 old_operational_status
= db_nsr
["operational-status"]
7896 old_config_status
= db_nsr
["config-status"]
7899 "_admin.deployed.RO.operational-status": "healing",
7901 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7903 step
= "Sending heal order to VIM"
7905 logging_text
=logging_text
,
7907 db_nslcmop
=db_nslcmop
,
7912 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7913 self
.logger
.debug(logging_text
+ stage
[1])
7914 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7915 self
.fs
.sync(db_nsr
["nsd-id"])
7917 # read from db: vnfr's of this ns
7918 step
= "Getting vnfrs from db"
7919 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7920 for vnfr
in db_vnfrs_list
:
7921 db_vnfrs
[vnfr
["_id"]] = vnfr
7922 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7924 # Check for each target VNF
7925 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7926 for target_vnf
in target_list
:
7927 # Find this VNF in the list from DB
7928 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7930 db_vnfr
= db_vnfrs
[vnfr_id
]
7931 vnfd_id
= db_vnfr
.get("vnfd-id")
7932 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7933 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7934 base_folder
= vnfd
["_admin"]["storage"]
7939 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7940 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7942 # Check each target VDU and deploy N2VC
7943 target_vdu_list
= target_vnf
.get("additionalParams", {}).get(
7946 if not target_vdu_list
:
7947 # No explicit VDU list was given: build one entry per existing VDU of this VNF
7948 target_vdu_list
= []
7949 for existing_vdu
in db_vnfr
.get("vdur"):
7950 vdu_name
= existing_vdu
.get("vdu-name", None)
7951 vdu_index
= existing_vdu
.get("count-index", 0)
7952 vdu_run_day1
= target_vnf
.get("additionalParams", {}).get(
7955 vdu_to_be_healed
= {
7957 "count-index": vdu_index
,
7958 "run-day1": vdu_run_day1
,
7960 target_vdu_list
.append(vdu_to_be_healed
)
7961 for target_vdu
in target_vdu_list
:
7962 deploy_params_vdu
= target_vdu
7963 # Use the VNF-level run-day1 value when no VDU-level value exists
7964 if not deploy_params_vdu
.get("run-day1") and target_vnf
[
7967 deploy_params_vdu
["run-day1"] = target_vnf
[
7970 vdu_name
= target_vdu
.get("vdu-id", None)
7971 # TODO: Get vdu_id from vdud.
7973 # For multi instance VDU count-index is mandatory
7974 # For single instance VDU count-index is 0
7975 vdu_index
= target_vdu
.get("count-index", 0)
7977 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7978 stage
[1] = "Deploying Execution Environments."
7979 self
.logger
.debug(logging_text
+ stage
[1])
7981 # VNF Level charm. Normal case when proxy charms.
7982 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7983 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7984 if descriptor_config
:
7985 # Continue if healed machine is management machine
7986 vnf_ip_address
= db_vnfr
.get("ip-address")
7987 target_instance
= None
7988 for instance
in db_vnfr
.get("vdur", None):
7990 instance
["vdu-name"] == vdu_name
7991 and instance
["count-index"] == vdu_index
7993 target_instance
= instance
7995 if vnf_ip_address
== target_instance
.get("ip-address"):
7997 logging_text
=logging_text
7998 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7999 member_vnf_index
, vdu_name
, vdu_index
8003 nslcmop_id
=nslcmop_id
,
8009 member_vnf_index
=member_vnf_index
,
8012 deploy_params
=deploy_params_vdu
,
8013 descriptor_config
=descriptor_config
,
8014 base_folder
=base_folder
,
8015 task_instantiation_info
=tasks_dict_info
,
8019 # VDU Level charm. Normal case with native charms.
8020 descriptor_config
= get_configuration(vnfd
, vdu_name
)
8021 if descriptor_config
:
8023 logging_text
=logging_text
8024 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
8025 member_vnf_index
, vdu_name
, vdu_index
8029 nslcmop_id
=nslcmop_id
,
8035 member_vnf_index
=member_vnf_index
,
8036 vdu_index
=vdu_index
,
8038 deploy_params
=deploy_params_vdu
,
8039 descriptor_config
=descriptor_config
,
8040 base_folder
=base_folder
,
8041 task_instantiation_info
=tasks_dict_info
,
8046 ROclient
.ROClientException
,
8051 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
8053 except asyncio
.CancelledError
:
8055 logging_text
+ "Cancelled Exception while '{}'".format(step
)
8057 exc
= "Operation was cancelled"
8058 except Exception as e
:
8059 exc
= traceback
.format_exc()
8060 self
.logger
.critical(
8061 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
8066 stage
[1] = "Waiting for healing pending tasks."
8067 self
.logger
.debug(logging_text
+ stage
[1])
8068 exc
= await self
._wait
_for
_tasks
(
8071 self
.timeout
.ns_deploy
,
8079 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
8080 nslcmop_operation_state
= "FAILED"
8082 db_nsr_update
["operational-status"] = old_operational_status
8083 db_nsr_update
["config-status"] = old_config_status
8086 ] = "FAILED healing nslcmop={} {}: {}".format(nslcmop_id
, step
, exc
)
8087 for task
, task_name
in tasks_dict_info
.items():
8088 if not task
.done() or task
.cancelled() or task
.exception():
8089 if task_name
.startswith(self
.task_name_deploy_vca
):
8090 # A N2VC task is pending
8091 db_nsr_update
["config-status"] = "failed"
8093 # RO task is pending
8094 db_nsr_update
["operational-status"] = "failed"
8096 error_description_nslcmop
= None
8097 nslcmop_operation_state
= "COMPLETED"
8098 db_nslcmop_update
["detailed-status"] = "Done"
8099 db_nsr_update
["detailed-status"] = "Done"
8100 db_nsr_update
["operational-status"] = "running"
8101 db_nsr_update
["config-status"] = "configured"
8103 self
._write
_op
_status
(
8106 error_message
=error_description_nslcmop
,
8107 operation_state
=nslcmop_operation_state
,
8108 other_update
=db_nslcmop_update
,
8111 self
._write
_ns
_status
(
8114 current_operation
="IDLE",
8115 current_operation_id
=None,
8116 other_update
=db_nsr_update
,
8119 if nslcmop_operation_state
:
8123 "nslcmop_id": nslcmop_id
,
8124 "operationState": nslcmop_operation_state
,
8126 await self
.msg
.aiowrite("ns", "healed", msg
, loop
=self
.loop
)
8127 except Exception as e
:
8129 logging_text
+ "kafka_write notification Exception {}".format(e
)
8131 self
.logger
.debug(logging_text
+ "Exit")
8132 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
8143 :param logging_text: prefix text to use for logging
8144 :param nsr_id: nsr identity
8145 :param db_nslcmop: database content of ns operation, in this case, 'heal'
8146 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
8147 :return: None or exception
8150 def get_vim_account(vim_account_id
):
8152 if vim_account_id
in db_vims
:
8153 return db_vims
[vim_account_id
]
8154 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
8155 db_vims
[vim_account_id
] = db_vim
8160 ns_params
= db_nslcmop
.get("operationParams")
8161 if ns_params
and ns_params
.get("timeout_ns_heal"):
8162 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
8164 timeout_ns_heal
= self
.timeout
.ns_heal
8168 nslcmop_id
= db_nslcmop
["_id"]
8170 "action_id": nslcmop_id
,
8172 self
.logger
.warning(
8173 "db_nslcmop={} and timeout_ns_heal={}".format(
8174 db_nslcmop
, timeout_ns_heal
8177 target
.update(db_nslcmop
.get("operationParams", {}))
8179 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
8180 desc
= await self
.RO
.recreate(nsr_id
, target
)
8181 self
.logger
.debug("RO return > {}".format(desc
))
8182 action_id
= desc
["action_id"]
8183 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
8184 await self
._wait
_ng
_ro
(
8191 operation
="healing",
8196 "_admin.deployed.RO.operational-status": "running",
8197 "detailed-status": " ".join(stage
),
8199 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
8200 self
._write
_op
_status
(nslcmop_id
, stage
)
8202 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
8205 except Exception as e
:
8206 stage
[2] = "ERROR healing at VIM"
8207 # self.set_vnfr_at_error(db_vnfrs, str(e))
8209 "Error healing at VIM {}".format(e
),
8210 exc_info
=not isinstance(
8213 ROclient
.ROClientException
,
8239 task_instantiation_info
,
8242 # launch instantiate_N2VC in an asyncio task and register task object
8243 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
8244 # if not found, create one entry and update database
8245 # fill db_nsr._admin.deployed.VCA.<index>
8248 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
8252 get_charm_name
= False
8253 if "execution-environment-list" in descriptor_config
:
8254 ee_list
= descriptor_config
.get("execution-environment-list", [])
8255 elif "juju" in descriptor_config
:
8256 ee_list
= [descriptor_config
] # ns charms
8257 if "execution-environment-list" not in descriptor_config
:
8258 # charm name is only required for ns charms
8259 get_charm_name
= True
8260 else: # other types as script are not supported
8263 for ee_item
in ee_list
:
8266 + "_deploy_n2vc ee_item juju={}, helm={}".format(
8267 ee_item
.get("juju"), ee_item
.get("helm-chart")
8270 ee_descriptor_id
= ee_item
.get("id")
8271 if ee_item
.get("juju"):
8272 vca_name
= ee_item
["juju"].get("charm")
8274 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
8277 if ee_item
["juju"].get("charm") is not None
8280 if ee_item
["juju"].get("cloud") == "k8s":
8281 vca_type
= "k8s_proxy_charm"
8282 elif ee_item
["juju"].get("proxy") is False:
8283 vca_type
= "native_charm"
8284 elif ee_item
.get("helm-chart"):
8285 vca_name
= ee_item
["helm-chart"]
8286 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
8289 vca_type
= "helm-v3"
8292 logging_text
+ "skipping non juju neither charm configuration"
8297 for vca_index
, vca_deployed
in enumerate(
8298 db_nsr
["_admin"]["deployed"]["VCA"]
8300 if not vca_deployed
:
8303 vca_deployed
.get("member-vnf-index") == member_vnf_index
8304 and vca_deployed
.get("vdu_id") == vdu_id
8305 and vca_deployed
.get("kdu_name") == kdu_name
8306 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
8307 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
8311 # not found, create one.
8313 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
8316 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
8318 target
+= "/kdu/{}".format(kdu_name
)
8320 "target_element": target
,
8321 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
8322 "member-vnf-index": member_vnf_index
,
8324 "kdu_name": kdu_name
,
8325 "vdu_count_index": vdu_index
,
8326 "operational-status": "init", # TODO revise
8327 "detailed-status": "", # TODO revise
8328 "step": "initial-deploy", # TODO revise
8330 "vdu_name": vdu_name
,
8332 "ee_descriptor_id": ee_descriptor_id
,
8333 "charm_name": charm_name
,
8337 # create VCA and configurationStatus in db
8339 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
8340 "configurationStatus.{}".format(vca_index
): dict(),
8342 self
.update_db_2("nsrs", nsr_id
, db_dict
)
8344 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
8346 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
8347 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
8348 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
8351 task_n2vc
= asyncio
.ensure_future(
8353 logging_text
=logging_text
,
8354 vca_index
=vca_index
,
8360 vdu_index
=vdu_index
,
8361 deploy_params
=deploy_params
,
8362 config_descriptor
=descriptor_config
,
8363 base_folder
=base_folder
,
8364 nslcmop_id
=nslcmop_id
,
8368 ee_config_descriptor
=ee_item
,
8371 self
.lcm_tasks
.register(
8375 "instantiate_N2VC-{}".format(vca_index
),
8378 task_instantiation_info
[
8380 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
8381 member_vnf_index
or "", vdu_id
or ""
8384 async def heal_N2VC(
8401 ee_config_descriptor
,
8403 nsr_id
= db_nsr
["_id"]
8404 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
8405 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
8406 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
8407 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
8409 "collection": "nsrs",
8410 "filter": {"_id": nsr_id
},
8411 "path": db_update_entry
,
8417 element_under_configuration
= nsr_id
8421 vnfr_id
= db_vnfr
["_id"]
8422 osm_config
["osm"]["vnf_id"] = vnfr_id
8424 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8426 if vca_type
== "native_charm":
8429 index_number
= vdu_index
or 0
8432 element_type
= "VNF"
8433 element_under_configuration
= vnfr_id
8434 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8436 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8437 element_type
= "VDU"
8438 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8439 osm_config
["osm"]["vdu_id"] = vdu_id
8441 namespace
+= ".{}".format(kdu_name
)
8442 element_type
= "KDU"
8443 element_under_configuration
= kdu_name
8444 osm_config
["osm"]["kdu_name"] = kdu_name
8447 if base_folder
["pkg-dir"]:
8448 artifact_path
= "{}/{}/{}/{}".format(
8449 base_folder
["folder"],
8450 base_folder
["pkg-dir"],
8453 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8458 artifact_path
= "{}/Scripts/{}/{}/".format(
8459 base_folder
["folder"],
8462 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8467 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8469 # get initial_config_primitive_list that applies to this element
8470 initial_config_primitive_list
= config_descriptor
.get(
8471 "initial-config-primitive"
8475 "Initial config primitive list > {}".format(
8476 initial_config_primitive_list
8480 # add config if not present for NS charm
8481 ee_descriptor_id
= ee_config_descriptor
.get("id")
8482 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8483 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8484 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8488 "Initial config primitive list #2 > {}".format(
8489 initial_config_primitive_list
8492 # n2vc_redesign STEP 3.1
8493 # find old ee_id if exists
8494 ee_id
= vca_deployed
.get("ee_id")
8496 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8497 # create or register execution environment in VCA. Only for native charms when healing
8498 if vca_type
== "native_charm":
8499 step
= "Waiting to VM being up and getting IP address"
8500 self
.logger
.debug(logging_text
+ step
)
8501 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8510 credentials
= {"hostname": rw_mgmt_ip
}
8512 username
= deep_get(
8513 config_descriptor
, ("config-access", "ssh-access", "default-user")
8515 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8516 # merged. Meanwhile let's get username from initial-config-primitive
8517 if not username
and initial_config_primitive_list
:
8518 for config_primitive
in initial_config_primitive_list
:
8519 for param
in config_primitive
.get("parameter", ()):
8520 if param
["name"] == "ssh-username":
8521 username
= param
["value"]
8525 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8526 "'config-access.ssh-access.default-user'"
8528 credentials
["username"] = username
8530 # n2vc_redesign STEP 3.2
8531 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8532 self
._write
_configuration
_status
(
8534 vca_index
=vca_index
,
8535 status
="REGISTERING",
8536 element_under_configuration
=element_under_configuration
,
8537 element_type
=element_type
,
8540 step
= "register execution environment {}".format(credentials
)
8541 self
.logger
.debug(logging_text
+ step
)
8542 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8543 credentials
=credentials
,
8544 namespace
=namespace
,
8549 # update ee_id in db
8551 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8553 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8555 # for compatibility with MON/POL modules, which need the model and application name in the database
8556 # TODO ask MON/POL whether the "model_name.application_name" format still needs to be assumed
8557 # Not sure if this needs to be done when healing
8559 ee_id_parts = ee_id.split(".")
8560 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8561 if len(ee_id_parts) >= 2:
8562 model_name = ee_id_parts[0]
8563 application_name = ee_id_parts[1]
8564 db_nsr_update[db_update_entry + "model"] = model_name
8565 db_nsr_update[db_update_entry + "application"] = application_name
8568 # n2vc_redesign STEP 3.3
8569 # Install configuration software. Only for native charms.
8570 step
= "Install configuration Software"
8572 self
._write
_configuration
_status
(
8574 vca_index
=vca_index
,
8575 status
="INSTALLING SW",
8576 element_under_configuration
=element_under_configuration
,
8577 element_type
=element_type
,
8578 # other_update=db_nsr_update,
8582 # TODO check if already done
8583 self
.logger
.debug(logging_text
+ step
)
8585 if vca_type
== "native_charm":
8586 config_primitive
= next(
8587 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8590 if config_primitive
:
8591 config
= self
._map
_primitive
_params
(
8592 config_primitive
, {}, deploy_params
8594 await self
.vca_map
[vca_type
].install_configuration_sw(
8596 artifact_path
=artifact_path
,
8604 # write in db flag of configuration_sw already installed
8606 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8609 # Not sure if this needs to be done when healing
8611 # add relations for this VCA (wait for other peers related with this VCA)
8612 await self._add_vca_relations(
8613 logging_text=logging_text,
8616 vca_index=vca_index,
8620 # if SSH access is required, then get the execution environment SSH public key
8621 # for a native charm we have already waited for the VM to be up
8622 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8625 # self.logger.debug("get ssh key block")
8627 config_descriptor
, ("config-access", "ssh-access", "required")
8629 # self.logger.debug("ssh key needed")
8630 # Needed to inject a ssh key
8633 ("config-access", "ssh-access", "default-user"),
8635 step
= "Install configuration Software, getting public ssh key"
8636 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8637 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8640 step
= "Insert public key into VM user={} ssh_key={}".format(
8644 # self.logger.debug("no need to get ssh key")
8645 step
= "Waiting to VM being up and getting IP address"
8646 self
.logger
.debug(logging_text
+ step
)
8648 # n2vc_redesign STEP 5.1
8649 # wait for RO (ip-address) Insert pub_key into VM
8650 # IMPORTANT: We need to wait for RO to complete the healing operation.
8651 await self
._wait
_heal
_ro
(nsr_id
, self
.timeout
.ns_heal
)
8654 rw_mgmt_ip
= await self
.wait_kdu_up(
8655 logging_text
, nsr_id
, vnfr_id
, kdu_name
8658 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8668 rw_mgmt_ip
= None # This is for a NS configuration
8670 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8672 # store rw_mgmt_ip in deploy params for later replacement
8673 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8676 # get run-day1 operation parameter
8677 runDay1
= deploy_params
.get("run-day1", False)
8679 "Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
, vdu_id
, runDay1
)
8682 # n2vc_redesign STEP 6 Execute initial config primitive
8683 step
= "execute initial config primitive"
8685 # wait for dependent primitives execution (NS -> VNF -> VDU)
8686 if initial_config_primitive_list
:
8687 await self
._wait
_dependent
_n
2vc
(
8688 nsr_id
, vca_deployed_list
, vca_index
8691 # stage, in function of element type: vdu, kdu, vnf or ns
8692 my_vca
= vca_deployed_list
[vca_index
]
8693 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8695 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8696 elif my_vca
.get("member-vnf-index"):
8698 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8701 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8703 self
._write
_configuration
_status
(
8704 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8707 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8709 check_if_terminated_needed
= True
8710 for initial_config_primitive
in initial_config_primitive_list
:
8711 # adding information on the vca_deployed if it is a NS execution environment
8712 if not vca_deployed
["member-vnf-index"]:
8713 deploy_params
["ns_config_info"] = json
.dumps(
8714 self
._get
_ns
_config
_info
(nsr_id
)
8716 # TODO check if already done
8717 primitive_params_
= self
._map
_primitive
_params
(
8718 initial_config_primitive
, {}, deploy_params
8721 step
= "execute primitive '{}' params '{}'".format(
8722 initial_config_primitive
["name"], primitive_params_
8724 self
.logger
.debug(logging_text
+ step
)
8725 await self
.vca_map
[vca_type
].exec_primitive(
8727 primitive_name
=initial_config_primitive
["name"],
8728 params_dict
=primitive_params_
,
8733 # Once a primitive has been executed, check and record in the db whether terminate primitives will need to be executed
8734 if check_if_terminated_needed
:
8735 if config_descriptor
.get("terminate-config-primitive"):
8739 {db_update_entry
+ "needed_terminate": True},
8741 check_if_terminated_needed
= False
8743 # TODO register in database that primitive is done
8745 # STEP 7 Configure metrics
8746 # Not sure if this needs to be done when healing
8748 if vca_type == "helm" or vca_type == "helm-v3":
8749 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8751 artifact_path=artifact_path,
8752 ee_config_descriptor=ee_config_descriptor,
8755 target_ip=rw_mgmt_ip,
8761 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8764 for job in prometheus_jobs:
8767 {"job_name": job["job_name"]},
8770 fail_on_empty=False,
8774 step
= "instantiated at VCA"
8775 self
.logger
.debug(logging_text
+ step
)
8777 self
._write
_configuration
_status
(
8778 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8781 except Exception as e
: # TODO not use Exception but N2VC exception
8782 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8784 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8787 "Exception while {} : {}".format(step
, e
), exc_info
=True
8789 self
._write
_configuration
_status
(
8790 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8792 raise LcmException("{} {}".format(step
, e
)) from e
async def _wait_heal_ro(
    self,
    nsr_id,
    timeout=600,
    poll_interval=15,
):
    """Wait until RO stops reporting the NS as "healing", or time out.

    The NS record is re-read from the "nsrs" collection on every iteration
    and its ``_admin.deployed.RO.operational-status`` value is inspected.
    The method returns as soon as that value is anything other than
    "healing".

    :param nsr_id: ``_id`` of the NS record to poll
    :param timeout: maximum number of seconds to keep polling
    :param poll_interval: seconds to sleep between two consecutive polls
    :raises NgRoException: if the status is still "healing" once ``timeout``
        seconds have elapsed
    """
    # NOTE(review): the 600 s default is assumed; the only caller visible in
    # this part of the file always passes the timeout explicitly. Confirm.
    start_time = time()
    while time() <= start_time + timeout:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        operational_status_ro = db_nsr["_admin"]["deployed"]["RO"][
            "operational-status"
        ]
        self.logger.debug("Wait Heal RO > {}".format(operational_status_ro))
        if operational_status_ro != "healing":
            break
        # Do not pass loop=...: that argument was deprecated in Python 3.8 and
        # removed in 3.10, where it raises TypeError. Without it the sleep is
        # scheduled on the loop that is running this coroutine.
        await asyncio.sleep(poll_interval)
    else:  # the while condition became false: the timeout expired
        # Message text kept unchanged so existing log matching still works.
        raise NgRoException("Timeout waiting ns to deploy")
8812 async def vertical_scale(self
, nsr_id
, nslcmop_id
):
8814 Vertical Scale the VDUs in a NS
8816 :param: nsr_id: NS Instance ID
8817 :param: nslcmop_id: nslcmop ID of the vertical scale operation
8820 # Try to lock HA task here
8821 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
8822 if not task_is_locked_by_me
:
8824 logging_text
= "Task ns={} vertical scale ".format(nsr_id
)
8825 self
.logger
.debug(logging_text
+ "Enter")
8826 # get all needed from database
8828 db_nslcmop_update
= {}
8829 nslcmop_operation_state
= None
8833 # in case of error, indicates what part of scale was failed to put nsr at error status
8834 start_deploy
= time()
8837 # wait for any previous tasks in process
8838 step
= "Waiting for previous operations to terminate"
8839 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
8841 self
._write
_ns
_status
(
8844 current_operation
="VerticalScale",
8845 current_operation_id
=nslcmop_id
,
8847 step
= "Getting nslcmop from database"
8849 step
+ " after having waited for previous tasks to be completed"
8851 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
8852 operationParams
= db_nslcmop
.get("operationParams")
8854 target
.update(operationParams
)
8855 desc
= await self
.RO
.vertical_scale(nsr_id
, target
)
8856 self
.logger
.debug("RO return > {}".format(desc
))
8857 action_id
= desc
["action_id"]
8858 await self
._wait
_ng
_ro
(
8863 self
.timeout
.verticalscale
,
8864 operation
="verticalscale",
8866 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
8867 self
.logger
.error("Exit Exception {}".format(e
))
8869 except asyncio
.CancelledError
:
8870 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
8871 exc
= "Operation was cancelled"
8872 except Exception as e
:
8873 exc
= traceback
.format_exc()
8874 self
.logger
.critical(
8875 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
8878 self
._write
_ns
_status
(
8881 current_operation
="IDLE",
8882 current_operation_id
=None,
8885 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
8886 nslcmop_operation_state
= "FAILED"
8888 nslcmop_operation_state
= "COMPLETED"
8889 db_nslcmop_update
["detailed-status"] = "Done"
8890 db_nsr_update
["detailed-status"] = "Done"
8892 self
._write
_op
_status
(
8896 operation_state
=nslcmop_operation_state
,
8897 other_update
=db_nslcmop_update
,
8899 if nslcmop_operation_state
:
8903 "nslcmop_id": nslcmop_id
,
8904 "operationState": nslcmop_operation_state
,
8906 await self
.msg
.aiowrite("ns", "verticalscaled", msg
, loop
=self
.loop
)
8907 except Exception as e
:
8909 logging_text
+ "kafka_write notification Exception {}".format(e
)
8911 self
.logger
.debug(logging_text
+ "Exit")
8912 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_verticalscale")