1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
36 from osm_lcm
import ROclient
37 from osm_lcm
.data_utils
.lcm_config
import LcmCfg
38 from osm_lcm
.data_utils
.nsr
import (
41 get_deployed_vca_list
,
44 from osm_lcm
.data_utils
.vca
import (
53 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
54 from osm_lcm
.lcm_utils
import (
61 check_juju_bundle_existence
,
62 get_charm_artifact_path
,
65 from osm_lcm
.data_utils
.nsd
import (
66 get_ns_configuration_relation_list
,
70 from osm_lcm
.data_utils
.vnfd
import (
76 get_ee_sorted_initial_config_primitive_list
,
77 get_ee_sorted_terminate_config_primitive_list
,
79 get_virtual_link_profiles
,
84 get_number_of_instances
,
86 get_kdu_resource_profile
,
87 find_software_version
,
90 from osm_lcm
.data_utils
.list_utils
import find_in_list
91 from osm_lcm
.data_utils
.vnfr
import (
95 get_volumes_from_instantiation_params
,
97 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
98 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
99 from n2vc
.definitions
import RelationEndpoint
100 from n2vc
.k8s_helm_conn
import K8sHelmConnector
101 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
102 from n2vc
.k8s_juju_conn
import K8sJujuConnector
104 from osm_common
.dbbase
import DbException
105 from osm_common
.fsbase
import FsException
107 from osm_lcm
.data_utils
.database
.database
import Database
108 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
109 from osm_lcm
.data_utils
.wim
import (
111 get_target_wim_attrs
,
112 select_feasible_wim_account
,
115 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
116 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
118 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
119 from osm_lcm
.osm_config
import OsmConfigBuilder
120 from osm_lcm
.prometheus
import parse_job
122 from copy
import copy
, deepcopy
123 from time
import time
124 from uuid
import uuid4
126 from random
import randint
128 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
131 class NsLcm(LcmBase
):
132 SUBOPERATION_STATUS_NOT_FOUND
= -1
133 SUBOPERATION_STATUS_NEW
= -2
134 SUBOPERATION_STATUS_SKIP
= -3
135 task_name_deploy_vca
= "Deploying VCA"
137 def __init__(self
, msg
, lcm_tasks
, config
: LcmCfg
, loop
):
139 Init, Connect to database, filesystem storage, and messaging
140 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
143 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
145 self
.db
= Database().instance
.db
146 self
.fs
= Filesystem().instance
.fs
148 self
.lcm_tasks
= lcm_tasks
149 self
.timeout
= config
.timeout
150 self
.ro_config
= config
.RO
151 self
.vca_config
= config
.VCA
153 # create N2VC connector
154 self
.n2vc
= N2VCJujuConnector(
157 on_update_db
=self
._on
_update
_n
2vc
_db
,
162 self
.conn_helm_ee
= LCMHelmConn(
165 vca_config
=self
.vca_config
,
166 on_update_db
=self
._on
_update
_n
2vc
_db
,
169 self
.k8sclusterhelm2
= K8sHelmConnector(
170 kubectl_command
=self
.vca_config
.kubectlpath
,
171 helm_command
=self
.vca_config
.helmpath
,
178 self
.k8sclusterhelm3
= K8sHelm3Connector(
179 kubectl_command
=self
.vca_config
.kubectlpath
,
180 helm_command
=self
.vca_config
.helm3path
,
187 self
.k8sclusterjuju
= K8sJujuConnector(
188 kubectl_command
=self
.vca_config
.kubectlpath
,
189 juju_command
=self
.vca_config
.jujupath
,
192 on_update_db
=self
._on
_update
_k
8s
_db
,
197 self
.k8scluster_map
= {
198 "helm-chart": self
.k8sclusterhelm2
,
199 "helm-chart-v3": self
.k8sclusterhelm3
,
200 "chart": self
.k8sclusterhelm3
,
201 "juju-bundle": self
.k8sclusterjuju
,
202 "juju": self
.k8sclusterjuju
,
206 "lxc_proxy_charm": self
.n2vc
,
207 "native_charm": self
.n2vc
,
208 "k8s_proxy_charm": self
.n2vc
,
209 "helm": self
.conn_helm_ee
,
210 "helm-v3": self
.conn_helm_ee
,
214 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
.to_dict())
216 self
.op_status_map
= {
217 "instantiation": self
.RO
.status
,
218 "termination": self
.RO
.status
,
219 "migrate": self
.RO
.status
,
220 "healing": self
.RO
.recreate_status
,
221 "verticalscale": self
.RO
.status
,
222 "start_stop_rebuild": self
.RO
.status
,
226 def increment_ip_mac(ip_mac
, vm_index
=1):
227 if not isinstance(ip_mac
, str):
230 # try with ipv4 look for last dot
231 i
= ip_mac
.rfind(".")
234 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
235 # try with ipv6 or mac look for last colon. Operate in hex
236 i
= ip_mac
.rfind(":")
239 # format in hex, len can be 2 for mac or 4 for ipv6
240 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
241 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
247 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
249 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
252 # TODO filter RO descriptor fields...
256 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
257 db_dict
["deploymentStatus"] = ro_descriptor
258 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
260 except Exception as e
:
262 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
265 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
267 # remove last dot from path (if exists)
268 if path
.endswith("."):
271 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
272 # .format(table, filter, path, updated_data))
275 nsr_id
= filter.get("_id")
277 # read ns record from database
278 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
279 current_ns_status
= nsr
.get("nsState")
281 # get vca status for NS
282 status_dict
= await self
.n2vc
.get_status(
283 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
288 db_dict
["vcaStatus"] = status_dict
290 # update configurationStatus for this VCA
292 vca_index
= int(path
[path
.rfind(".") + 1 :])
295 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
297 vca_status
= vca_list
[vca_index
].get("status")
299 configuration_status_list
= nsr
.get("configurationStatus")
300 config_status
= configuration_status_list
[vca_index
].get("status")
302 if config_status
== "BROKEN" and vca_status
!= "failed":
303 db_dict
["configurationStatus"][vca_index
] = "READY"
304 elif config_status
!= "BROKEN" and vca_status
== "failed":
305 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
306 except Exception as e
:
307 # not update configurationStatus
308 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
310 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
311 # if nsState = 'DEGRADED' check if all is OK
313 if current_ns_status
in ("READY", "DEGRADED"):
314 error_description
= ""
316 if status_dict
.get("machines"):
317 for machine_id
in status_dict
.get("machines"):
318 machine
= status_dict
.get("machines").get(machine_id
)
319 # check machine agent-status
320 if machine
.get("agent-status"):
321 s
= machine
.get("agent-status").get("status")
324 error_description
+= (
325 "machine {} agent-status={} ; ".format(
329 # check machine instance status
330 if machine
.get("instance-status"):
331 s
= machine
.get("instance-status").get("status")
334 error_description
+= (
335 "machine {} instance-status={} ; ".format(
340 if status_dict
.get("applications"):
341 for app_id
in status_dict
.get("applications"):
342 app
= status_dict
.get("applications").get(app_id
)
343 # check application status
344 if app
.get("status"):
345 s
= app
.get("status").get("status")
348 error_description
+= (
349 "application {} status={} ; ".format(app_id
, s
)
352 if error_description
:
353 db_dict
["errorDescription"] = error_description
354 if current_ns_status
== "READY" and is_degraded
:
355 db_dict
["nsState"] = "DEGRADED"
356 if current_ns_status
== "DEGRADED" and not is_degraded
:
357 db_dict
["nsState"] = "READY"
360 self
.update_db_2("nsrs", nsr_id
, db_dict
)
362 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
364 except Exception as e
:
365 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
367 async def _on_update_k8s_db(
368 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
371 Updating vca status in NSR record
372 :param cluster_uuid: UUID of a k8s cluster
373 :param kdu_instance: The unique name of the KDU instance
374 :param filter: To get nsr_id
375 :cluster_type: The cluster type (juju, k8s)
379 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
380 # .format(cluster_uuid, kdu_instance, filter))
382 nsr_id
= filter.get("_id")
384 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
385 cluster_uuid
=cluster_uuid
,
386 kdu_instance
=kdu_instance
,
388 complete_status
=True,
394 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
397 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
401 self
.update_db_2("nsrs", nsr_id
, db_dict
)
402 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
404 except Exception as e
:
405 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
408 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
411 undefined
=StrictUndefined
,
412 autoescape
=select_autoescape(default_for_string
=True, default
=True),
414 template
= env
.from_string(cloud_init_text
)
415 return template
.render(additional_params
or {})
416 except UndefinedError
as e
:
418 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
419 "file, must be provided in the instantiation parameters inside the "
420 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
422 except (TemplateError
, TemplateNotFound
) as e
:
424 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
429 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
430 cloud_init_content
= cloud_init_file
= None
432 if vdu
.get("cloud-init-file"):
433 base_folder
= vnfd
["_admin"]["storage"]
434 if base_folder
["pkg-dir"]:
435 cloud_init_file
= "{}/{}/cloud_init/{}".format(
436 base_folder
["folder"],
437 base_folder
["pkg-dir"],
438 vdu
["cloud-init-file"],
441 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
442 base_folder
["folder"],
443 vdu
["cloud-init-file"],
445 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
446 cloud_init_content
= ci_file
.read()
447 elif vdu
.get("cloud-init"):
448 cloud_init_content
= vdu
["cloud-init"]
450 return cloud_init_content
451 except FsException
as e
:
453 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
454 vnfd
["id"], vdu
["id"], cloud_init_file
, e
458 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
460 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
462 additional_params
= vdur
.get("additionalParams")
463 return parse_yaml_strings(additional_params
)
465 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
467 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
468 :param vnfd: input vnfd
469 :param new_id: overrides vnf id if provided
470 :param additionalParams: Instantiation params for VNFs provided
471 :param nsrId: Id of the NSR
472 :return: copy of vnfd
474 vnfd_RO
= deepcopy(vnfd
)
475 # remove unused by RO configuration, monitoring, scaling and internal keys
476 vnfd_RO
.pop("_id", None)
477 vnfd_RO
.pop("_admin", None)
478 vnfd_RO
.pop("monitoring-param", None)
479 vnfd_RO
.pop("scaling-group-descriptor", None)
480 vnfd_RO
.pop("kdu", None)
481 vnfd_RO
.pop("k8s-cluster", None)
483 vnfd_RO
["id"] = new_id
485 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
486 for vdu
in get_iterable(vnfd_RO
, "vdu"):
487 vdu
.pop("cloud-init-file", None)
488 vdu
.pop("cloud-init", None)
492 def ip_profile_2_RO(ip_profile
):
493 RO_ip_profile
= deepcopy(ip_profile
)
494 if "dns-server" in RO_ip_profile
:
495 if isinstance(RO_ip_profile
["dns-server"], list):
496 RO_ip_profile
["dns-address"] = []
497 for ds
in RO_ip_profile
.pop("dns-server"):
498 RO_ip_profile
["dns-address"].append(ds
["address"])
500 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
501 if RO_ip_profile
.get("ip-version") == "ipv4":
502 RO_ip_profile
["ip-version"] = "IPv4"
503 if RO_ip_profile
.get("ip-version") == "ipv6":
504 RO_ip_profile
["ip-version"] = "IPv6"
505 if "dhcp-params" in RO_ip_profile
:
506 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
509 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
510 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
511 if db_vim
["_admin"]["operationalState"] != "ENABLED":
513 "VIM={} is not available. operationalState={}".format(
514 vim_account
, db_vim
["_admin"]["operationalState"]
517 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
520 def get_ro_wim_id_for_wim_account(self
, wim_account
):
521 if isinstance(wim_account
, str):
522 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
523 if db_wim
["_admin"]["operationalState"] != "ENABLED":
525 "WIM={} is not available. operationalState={}".format(
526 wim_account
, db_wim
["_admin"]["operationalState"]
529 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
534 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
536 db_vdu_push_list
= []
538 db_update
= {"_admin.modified": time()}
540 for vdu_id
, vdu_count
in vdu_create
.items():
544 for vdur
in reversed(db_vnfr
["vdur"])
545 if vdur
["vdu-id-ref"] == vdu_id
550 # Read the template saved in the db:
552 "No vdur in the database. Using the vdur-template to scale"
554 vdur_template
= db_vnfr
.get("vdur-template")
555 if not vdur_template
:
557 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
561 vdur
= vdur_template
[0]
562 # Delete a template from the database after using it
565 {"_id": db_vnfr
["_id"]},
567 pull
={"vdur-template": {"_id": vdur
["_id"]}},
569 for count
in range(vdu_count
):
570 vdur_copy
= deepcopy(vdur
)
571 vdur_copy
["status"] = "BUILD"
572 vdur_copy
["status-detailed"] = None
573 vdur_copy
["ip-address"] = None
574 vdur_copy
["_id"] = str(uuid4())
575 vdur_copy
["count-index"] += count
+ 1
576 vdur_copy
["id"] = "{}-{}".format(
577 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
579 vdur_copy
.pop("vim_info", None)
580 for iface
in vdur_copy
["interfaces"]:
581 if iface
.get("fixed-ip"):
582 iface
["ip-address"] = self
.increment_ip_mac(
583 iface
["ip-address"], count
+ 1
586 iface
.pop("ip-address", None)
587 if iface
.get("fixed-mac"):
588 iface
["mac-address"] = self
.increment_ip_mac(
589 iface
["mac-address"], count
+ 1
592 iface
.pop("mac-address", None)
596 ) # only first vdu can be managment of vnf
597 db_vdu_push_list
.append(vdur_copy
)
598 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
600 if len(db_vnfr
["vdur"]) == 1:
601 # The scale will move to 0 instances
603 "Scaling to 0 !, creating the template with the last vdur"
605 template_vdur
= [db_vnfr
["vdur"][0]]
606 for vdu_id
, vdu_count
in vdu_delete
.items():
608 indexes_to_delete
= [
610 for iv
in enumerate(db_vnfr
["vdur"])
611 if iv
[1]["vdu-id-ref"] == vdu_id
615 "vdur.{}.status".format(i
): "DELETING"
616 for i
in indexes_to_delete
[-vdu_count
:]
620 # it must be deleted one by one because common.db does not allow otherwise
623 for v
in reversed(db_vnfr
["vdur"])
624 if v
["vdu-id-ref"] == vdu_id
626 for vdu
in vdus_to_delete
[:vdu_count
]:
629 {"_id": db_vnfr
["_id"]},
631 pull
={"vdur": {"_id": vdu
["_id"]}},
635 db_push
["vdur"] = db_vdu_push_list
637 db_push
["vdur-template"] = template_vdur
640 db_vnfr
["vdur-template"] = template_vdur
641 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
642 # modify passed dictionary db_vnfr
643 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
644 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
646 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
648 Updates database nsr with the RO info for the created vld
649 :param ns_update_nsr: dictionary to be filled with the updated info
650 :param db_nsr: content of db_nsr. This is also modified
651 :param nsr_desc_RO: nsr descriptor from RO
652 :return: Nothing, LcmException is raised on errors
655 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
656 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
657 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
659 vld
["vim-id"] = net_RO
.get("vim_net_id")
660 vld
["name"] = net_RO
.get("vim_name")
661 vld
["status"] = net_RO
.get("status")
662 vld
["status-detailed"] = net_RO
.get("error_msg")
663 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
667 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
670 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
672 for db_vnfr
in db_vnfrs
.values():
673 vnfr_update
= {"status": "ERROR"}
674 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
675 if "status" not in vdur
:
676 vdur
["status"] = "ERROR"
677 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
679 vdur
["status-detailed"] = str(error_text
)
681 "vdur.{}.status-detailed".format(vdu_index
)
683 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
684 except DbException
as e
:
685 self
.logger
.error("Cannot update vnf. {}".format(e
))
687 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
689 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
690 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
691 :param nsr_desc_RO: nsr descriptor from RO
692 :return: Nothing, LcmException is raised on errors
694 for vnf_index
, db_vnfr
in db_vnfrs
.items():
695 for vnf_RO
in nsr_desc_RO
["vnfs"]:
696 if vnf_RO
["member_vnf_index"] != vnf_index
:
699 if vnf_RO
.get("ip_address"):
700 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
703 elif not db_vnfr
.get("ip-address"):
704 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
705 raise LcmExceptionNoMgmtIP(
706 "ns member_vnf_index '{}' has no IP address".format(
711 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
712 vdur_RO_count_index
= 0
713 if vdur
.get("pdu-type"):
715 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
716 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
718 if vdur
["count-index"] != vdur_RO_count_index
:
719 vdur_RO_count_index
+= 1
721 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
722 if vdur_RO
.get("ip_address"):
723 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
725 vdur
["ip-address"] = None
726 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
727 vdur
["name"] = vdur_RO
.get("vim_name")
728 vdur
["status"] = vdur_RO
.get("status")
729 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
730 for ifacer
in get_iterable(vdur
, "interfaces"):
731 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
732 if ifacer
["name"] == interface_RO
.get("internal_name"):
733 ifacer
["ip-address"] = interface_RO
.get(
736 ifacer
["mac-address"] = interface_RO
.get(
742 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
743 "from VIM info".format(
744 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
747 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
751 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
753 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
757 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
758 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
759 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
761 vld
["vim-id"] = net_RO
.get("vim_net_id")
762 vld
["name"] = net_RO
.get("vim_name")
763 vld
["status"] = net_RO
.get("status")
764 vld
["status-detailed"] = net_RO
.get("error_msg")
765 vnfr_update
["vld.{}".format(vld_index
)] = vld
769 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
774 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
779 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
784 def _get_ns_config_info(self
, nsr_id
):
786 Generates a mapping between vnf,vdu elements and the N2VC id
787 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
788 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
789 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
790 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
792 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
793 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
795 ns_config_info
= {"osm-config-mapping": mapping
}
796 for vca
in vca_deployed_list
:
797 if not vca
["member-vnf-index"]:
799 if not vca
["vdu_id"]:
800 mapping
[vca
["member-vnf-index"]] = vca
["application"]
804 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
806 ] = vca
["application"]
807 return ns_config_info
809 async def _instantiate_ng_ro(
826 def get_vim_account(vim_account_id
):
828 if vim_account_id
in db_vims
:
829 return db_vims
[vim_account_id
]
830 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
831 db_vims
[vim_account_id
] = db_vim
834 # modify target_vld info with instantiation parameters
835 def parse_vld_instantiation_params(
836 target_vim
, target_vld
, vld_params
, target_sdn
838 if vld_params
.get("ip-profile"):
839 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
842 if vld_params
.get("provider-network"):
843 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
846 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
847 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
851 # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
852 # if wim_account_id is specified in vld_params, validate if it is feasible.
853 wim_account_id
, db_wim
= select_feasible_wim_account(
854 db_nsr
, db_vnfrs
, target_vld
, vld_params
, self
.logger
858 # WIM is needed and a feasible one was found, populate WIM target and SDN ports
859 self
.logger
.info("WIM selected: {:s}".format(str(wim_account_id
)))
860 # update vld_params with correct WIM account Id
861 vld_params
["wimAccountId"] = wim_account_id
863 target_wim
= "wim:{}".format(wim_account_id
)
864 target_wim_attrs
= get_target_wim_attrs(nsr_id
, target_vld
, vld_params
)
865 sdn_ports
= get_sdn_ports(vld_params
, db_wim
)
866 if len(sdn_ports
) > 0:
867 target_vld
["vim_info"][target_wim
] = target_wim_attrs
868 target_vld
["vim_info"][target_wim
]["sdn-ports"] = sdn_ports
871 "Target VLD with WIM data: {:s}".format(str(target_vld
))
874 for param
in ("vim-network-name", "vim-network-id"):
875 if vld_params
.get(param
):
876 if isinstance(vld_params
[param
], dict):
877 for vim
, vim_net
in vld_params
[param
].items():
878 other_target_vim
= "vim:" + vim
880 target_vld
["vim_info"],
881 (other_target_vim
, param
.replace("-", "_")),
884 else: # isinstance str
885 target_vld
["vim_info"][target_vim
][
886 param
.replace("-", "_")
887 ] = vld_params
[param
]
888 if vld_params
.get("common_id"):
889 target_vld
["common_id"] = vld_params
.get("common_id")
891 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
892 def update_ns_vld_target(target
, ns_params
):
893 for vnf_params
in ns_params
.get("vnf", ()):
894 if vnf_params
.get("vimAccountId"):
898 for vnfr
in db_vnfrs
.values()
899 if vnf_params
["member-vnf-index"]
900 == vnfr
["member-vnf-index-ref"]
904 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
907 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
908 target_vld
= find_in_list(
909 get_iterable(vdur
, "interfaces"),
910 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
913 vld_params
= find_in_list(
914 get_iterable(ns_params
, "vld"),
915 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
919 if vnf_params
.get("vimAccountId") not in a_vld
.get(
922 target_vim_network_list
= [
923 v
for _
, v
in a_vld
.get("vim_info").items()
925 target_vim_network_name
= next(
927 item
.get("vim_network_name", "")
928 for item
in target_vim_network_list
933 target
["ns"]["vld"][a_index
].get("vim_info").update(
935 "vim:{}".format(vnf_params
["vimAccountId"]): {
936 "vim_network_name": target_vim_network_name
,
942 for param
in ("vim-network-name", "vim-network-id"):
943 if vld_params
.get(param
) and isinstance(
944 vld_params
[param
], dict
946 for vim
, vim_net
in vld_params
[
949 other_target_vim
= "vim:" + vim
951 target
["ns"]["vld"][a_index
].get(
956 param
.replace("-", "_"),
961 nslcmop_id
= db_nslcmop
["_id"]
963 "name": db_nsr
["name"],
966 "image": deepcopy(db_nsr
["image"]),
967 "flavor": deepcopy(db_nsr
["flavor"]),
968 "action_id": nslcmop_id
,
969 "cloud_init_content": {},
971 for image
in target
["image"]:
972 image
["vim_info"] = {}
973 for flavor
in target
["flavor"]:
974 flavor
["vim_info"] = {}
975 if db_nsr
.get("affinity-or-anti-affinity-group"):
976 target
["affinity-or-anti-affinity-group"] = deepcopy(
977 db_nsr
["affinity-or-anti-affinity-group"]
979 for affinity_or_anti_affinity_group
in target
[
980 "affinity-or-anti-affinity-group"
982 affinity_or_anti_affinity_group
["vim_info"] = {}
984 if db_nslcmop
.get("lcmOperationType") != "instantiate":
985 # get parameters of instantiation:
986 db_nslcmop_instantiate
= self
.db
.get_list(
989 "nsInstanceId": db_nslcmop
["nsInstanceId"],
990 "lcmOperationType": "instantiate",
993 ns_params
= db_nslcmop_instantiate
.get("operationParams")
995 ns_params
= db_nslcmop
.get("operationParams")
996 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
997 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
1000 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
1001 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
1004 "name": vld
["name"],
1005 "mgmt-network": vld
.get("mgmt-network", False),
1006 "type": vld
.get("type"),
1009 "vim_network_name": vld
.get("vim-network-name"),
1010 "vim_account_id": ns_params
["vimAccountId"],
1014 # check if this network needs SDN assist
1015 if vld
.get("pci-interfaces"):
1016 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1017 sdnc_id
= db_vim
["config"].get("sdn-controller")
1019 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1020 target_sdn
= "sdn:{}".format(sdnc_id
)
1021 target_vld
["vim_info"][target_sdn
] = {
1023 "target_vim": target_vim
,
1025 "type": vld
.get("type"),
1028 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1029 for nsd_vnf_profile
in nsd_vnf_profiles
:
1030 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1031 if cp
["virtual-link-profile-id"] == vld
["id"]:
1033 "member_vnf:{}.{}".format(
1034 cp
["constituent-cpd-id"][0][
1035 "constituent-base-element-id"
1037 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1039 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1041 # check at nsd descriptor, if there is an ip-profile
1043 nsd_vlp
= find_in_list(
1044 get_virtual_link_profiles(nsd
),
1045 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1050 and nsd_vlp
.get("virtual-link-protocol-data")
1051 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1053 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1056 ip_profile_dest_data
= {}
1057 if "ip-version" in ip_profile_source_data
:
1058 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1061 if "cidr" in ip_profile_source_data
:
1062 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1065 if "gateway-ip" in ip_profile_source_data
:
1066 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1069 if "dhcp-enabled" in ip_profile_source_data
:
1070 ip_profile_dest_data
["dhcp-params"] = {
1071 "enabled": ip_profile_source_data
["dhcp-enabled"]
1073 vld_params
["ip-profile"] = ip_profile_dest_data
1075 # update vld_params with instantiation params
1076 vld_instantiation_params
= find_in_list(
1077 get_iterable(ns_params
, "vld"),
1078 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1080 if vld_instantiation_params
:
1081 vld_params
.update(vld_instantiation_params
)
1082 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1083 target
["ns"]["vld"].append(target_vld
)
1084 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1085 update_ns_vld_target(target
, ns_params
)
1087 for vnfr
in db_vnfrs
.values():
1088 vnfd
= find_in_list(
1089 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1091 vnf_params
= find_in_list(
1092 get_iterable(ns_params
, "vnf"),
1093 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1095 target_vnf
= deepcopy(vnfr
)
1096 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1097 for vld
in target_vnf
.get("vld", ()):
1098 # check if connected to a ns.vld, to fill target'
1099 vnf_cp
= find_in_list(
1100 vnfd
.get("int-virtual-link-desc", ()),
1101 lambda cpd
: cpd
.get("id") == vld
["id"],
1104 ns_cp
= "member_vnf:{}.{}".format(
1105 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1107 if cp2target
.get(ns_cp
):
1108 vld
["target"] = cp2target
[ns_cp
]
1111 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1113 # check if this network needs SDN assist
1115 if vld
.get("pci-interfaces"):
1116 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1117 sdnc_id
= db_vim
["config"].get("sdn-controller")
1119 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1120 target_sdn
= "sdn:{}".format(sdnc_id
)
1121 vld
["vim_info"][target_sdn
] = {
1123 "target_vim": target_vim
,
1125 "type": vld
.get("type"),
1128 # check at vnfd descriptor, if there is an ip-profile
1130 vnfd_vlp
= find_in_list(
1131 get_virtual_link_profiles(vnfd
),
1132 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1136 and vnfd_vlp
.get("virtual-link-protocol-data")
1137 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1139 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1142 ip_profile_dest_data
= {}
1143 if "ip-version" in ip_profile_source_data
:
1144 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1147 if "cidr" in ip_profile_source_data
:
1148 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1151 if "gateway-ip" in ip_profile_source_data
:
1152 ip_profile_dest_data
[
1154 ] = ip_profile_source_data
["gateway-ip"]
1155 if "dhcp-enabled" in ip_profile_source_data
:
1156 ip_profile_dest_data
["dhcp-params"] = {
1157 "enabled": ip_profile_source_data
["dhcp-enabled"]
1160 vld_params
["ip-profile"] = ip_profile_dest_data
1161 # update vld_params with instantiation params
1163 vld_instantiation_params
= find_in_list(
1164 get_iterable(vnf_params
, "internal-vld"),
1165 lambda i_vld
: i_vld
["name"] == vld
["id"],
1167 if vld_instantiation_params
:
1168 vld_params
.update(vld_instantiation_params
)
1169 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1172 for vdur
in target_vnf
.get("vdur", ()):
1173 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1174 continue # This vdu must not be created
1175 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1177 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1180 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1181 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1184 and vdu_configuration
.get("config-access")
1185 and vdu_configuration
.get("config-access").get("ssh-access")
1187 vdur
["ssh-keys"] = ssh_keys_all
1188 vdur
["ssh-access-required"] = vdu_configuration
[
1190 ]["ssh-access"]["required"]
1193 and vnf_configuration
.get("config-access")
1194 and vnf_configuration
.get("config-access").get("ssh-access")
1195 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1197 vdur
["ssh-keys"] = ssh_keys_all
1198 vdur
["ssh-access-required"] = vnf_configuration
[
1200 ]["ssh-access"]["required"]
1201 elif ssh_keys_instantiation
and find_in_list(
1202 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1204 vdur
["ssh-keys"] = ssh_keys_instantiation
1206 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1208 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1210 if vdud
.get("cloud-init-file"):
1211 vdur
["cloud-init"] = "{}:file:{}".format(
1212 vnfd
["_id"], vdud
.get("cloud-init-file")
1214 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1215 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1216 base_folder
= vnfd
["_admin"]["storage"]
1217 if base_folder
["pkg-dir"]:
1218 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1219 base_folder
["folder"],
1220 base_folder
["pkg-dir"],
1221 vdud
.get("cloud-init-file"),
1224 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1225 base_folder
["folder"],
1226 vdud
.get("cloud-init-file"),
1228 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1229 target
["cloud_init_content"][
1232 elif vdud
.get("cloud-init"):
1233 vdur
["cloud-init"] = "{}:vdu:{}".format(
1234 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1236 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1237 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1240 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1241 deploy_params_vdu
= self
._format
_additional
_params
(
1242 vdur
.get("additionalParams") or {}
1244 deploy_params_vdu
["OSM"] = get_osm_params(
1245 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1247 vdur
["additionalParams"] = deploy_params_vdu
1250 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1251 if target_vim
not in ns_flavor
["vim_info"]:
1252 ns_flavor
["vim_info"][target_vim
] = {}
1255 # in case alternative images are provided we must check if they should be applied
1256 # for the vim_type, modify the vim_type taking into account
1257 ns_image_id
= int(vdur
["ns-image-id"])
1258 if vdur
.get("alt-image-ids"):
1259 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1260 vim_type
= db_vim
["vim_type"]
1261 for alt_image_id
in vdur
.get("alt-image-ids"):
1262 ns_alt_image
= target
["image"][int(alt_image_id
)]
1263 if vim_type
== ns_alt_image
.get("vim-type"):
1264 # must use alternative image
1266 "use alternative image id: {}".format(alt_image_id
)
1268 ns_image_id
= alt_image_id
1269 vdur
["ns-image-id"] = ns_image_id
1271 ns_image
= target
["image"][int(ns_image_id
)]
1272 if target_vim
not in ns_image
["vim_info"]:
1273 ns_image
["vim_info"][target_vim
] = {}
1276 if vdur
.get("affinity-or-anti-affinity-group-id"):
1277 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1278 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1279 if target_vim
not in ns_ags
["vim_info"]:
1280 ns_ags
["vim_info"][target_vim
] = {}
1282 vdur
["vim_info"] = {target_vim
: {}}
1283 # instantiation parameters
1285 vdu_instantiation_params
= find_in_list(
1286 get_iterable(vnf_params
, "vdu"),
1287 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1289 if vdu_instantiation_params
:
1290 # Parse the vdu_volumes from the instantiation params
1291 vdu_volumes
= get_volumes_from_instantiation_params(
1292 vdu_instantiation_params
, vdud
1294 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1295 vdur_list
.append(vdur
)
1296 target_vnf
["vdur"] = vdur_list
1297 target
["vnf"].append(target_vnf
)
1299 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1300 desc
= await self
.RO
.deploy(nsr_id
, target
)
1301 self
.logger
.debug("RO return > {}".format(desc
))
1302 action_id
= desc
["action_id"]
1303 await self
._wait
_ng
_ro
(
1310 operation
="instantiation",
1315 "_admin.deployed.RO.operational-status": "running",
1316 "detailed-status": " ".join(stage
),
1318 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1319 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1320 self
._write
_op
_status
(nslcmop_id
, stage
)
1322 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1326 async def _wait_ng_ro(
1336 detailed_status_old
= None
1338 start_time
= start_time
or time()
1339 while time() <= start_time
+ timeout
:
1340 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1341 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1342 if desc_status
["status"] == "FAILED":
1343 raise NgRoException(desc_status
["details"])
1344 elif desc_status
["status"] == "BUILD":
1346 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1347 elif desc_status
["status"] == "DONE":
1349 stage
[2] = "Deployed at VIM"
1352 assert False, "ROclient.check_ns_status returns unknown {}".format(
1353 desc_status
["status"]
1355 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1356 detailed_status_old
= stage
[2]
1357 db_nsr_update
["detailed-status"] = " ".join(stage
)
1358 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1359 self
._write
_op
_status
(nslcmop_id
, stage
)
1360 await asyncio
.sleep(15, loop
=self
.loop
)
1361 else: # timeout_ns_deploy
1362 raise NgRoException("Timeout waiting ns to deploy")
1364 async def _terminate_ng_ro(
1365 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1370 start_deploy
= time()
1377 "action_id": nslcmop_id
,
1379 desc
= await self
.RO
.deploy(nsr_id
, target
)
1380 action_id
= desc
["action_id"]
1381 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1382 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1385 + "ns terminate action at RO. action_id={}".format(action_id
)
1389 delete_timeout
= 20 * 60 # 20 minutes
1390 await self
._wait
_ng
_ro
(
1397 operation
="termination",
1400 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1401 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1403 await self
.RO
.delete(nsr_id
)
1404 except Exception as e
:
1405 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1406 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1407 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1408 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1410 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1412 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1413 failed_detail
.append("delete conflict: {}".format(e
))
1416 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1419 failed_detail
.append("delete error: {}".format(e
))
1422 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1426 stage
[2] = "Error deleting from VIM"
1428 stage
[2] = "Deleted from VIM"
1429 db_nsr_update
["detailed-status"] = " ".join(stage
)
1430 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1431 self
._write
_op
_status
(nslcmop_id
, stage
)
1434 raise LcmException("; ".join(failed_detail
))
async def instantiate_RO(
    self,
    logging_text,
    nsr_id,
    nsd,
    db_nsr,
    db_nslcmop,
    db_vnfrs,
    db_vnfds,
    n2vc_key_list,
    stage,
):
    """
    Instantiate at RO
    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: database content of ns descriptor
    :param db_nsr: database content of ns record
    :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
    :param db_vnfrs: database content of vnfrs. Its values are read for 'vim-account-id'
    :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
    :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: what _instantiate_ng_ro returns. Any exception is logged, vnfrs are set at error and it is re-raised
    """
    try:
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.ns_deploy

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
            # in case of placement change ns_params[vimAccountId] if not present at any vnfrs
            for vnfr in db_vnfrs.values():
                if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
                    break
            else:
                # assignment, not comparison: a bare '==' here discarded the placement result
                ns_params["vimAccountId"] = vnfr["vim-account-id"]

        return await self._instantiate_ng_ro(
            logging_text,
            nsr_id,
            nsd,
            db_nsr,
            db_nslcmop,
            db_vnfrs,
            db_vnfds,
            n2vc_key_list,
            stage,
            start_deploy,
            timeout_ns_deploy,
        )
    except Exception as e:
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        # the traceback is logged only for exceptions that are not one of the known types below
        self.logger.error(
            "Error deploying at VIM {}".format(e),
            exc_info=not isinstance(
                e,
                (
                    ROclient.ROClientException,
                    LcmException,
                    DbException,
                    NgRoException,
                ),
            ),
        )
        raise
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address
    :param logging_text: prefix use for logging
    :param nsr_id: not read by this method
    :param vnfr_id: vnf record that contains the kdu
    :param kdu_name: 'kdu-name' of the kdur entry to wait for
    :return: IP address, K8s services
    Raises LcmException if the kdur is not found, if it reports a status other than READY or ENABLED,
    or after 360 polls of 10 seconds without any status
    """

    # self.logger.debug(logging_text + "Starting wait_kdu_up")
    nb_tries = 0

    while nb_tries < 360:
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next(
            (
                x
                for x in get_iterable(db_vnfr, "kdur")
                if x.get("kdu-name") == kdu_name
            ),
            None,
        )
        if not kdur:
            raise LcmException(
                "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
            )
        if kdur.get("status"):
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address"), kdur.get("services")
            # any other non-empty status is handled as an error
            raise LcmException("target KDU={} is in error state".format(kdu_name))

        # no 'loop' argument: it was removed from asyncio.sleep in python 3.10
        await asyncio.sleep(10)
        nb_tries += 1
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
async def wait_vm_up_insert_key_ro(
    self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
):
    """
    Wait for ip address at RO, and optionally, insert public key in virtual machine
    :param logging_text: prefix use for logging
    :param nsr_id: nsr identity
    :param vnfr_id: vnf record that contains the target vdu
    :param vdu_id: 'vdu-id-ref' of the target vdu. If empty, the vdur holding the vnfr 'ip-address' is used
    :param vdu_index: 'count-index' of the target vdu
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    Raises LcmException if the target is in error state, is not found, or the key cannot be injected
    """

    self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0
    target_vdu_id = None
    ro_retries = 0

    while True:

        ro_retries += 1
        if ro_retries >= 360:  # 1 hour
            raise LcmException(
                "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
            )

        # no 'loop' argument: it was removed from asyncio.sleep in python 3.10
        await asyncio.sleep(10)

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                if db_vnfr.get("status") == "ERROR":
                    raise LcmException(
                        "Cannot inject ssh-key because target VNF is in error state"
                    )
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next(
                    (
                        x
                        for x in get_iterable(db_vnfr, "vdur")
                        if x.get("ip-address") == ip_address
                    ),
                    None,
                )
            else:  # VDU case
                vdur = next(
                    (
                        x
                        for x in get_iterable(db_vnfr, "vdur")
                        if x.get("vdu-id-ref") == vdu_id
                        and x.get("count-index") == vdu_index
                    ),
                    None,
                )

            if (
                not vdur and len(db_vnfr.get("vdur", ())) == 1
            ):  # If only one, this should be the target vdu
                vdur = db_vnfr["vdur"][0]
            if not vdur:
                raise LcmException(
                    "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                        vnfr_id, vdu_id, vdu_index
                    )
                )
            # New generation RO stores information at "vim_info"
            ng_ro_status = None
            target_vim = None
            if vdur.get("vim_info"):
                target_vim = next(
                    iter(vdur["vim_info"])
                )  # there should be only one key
                ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
            if (
                vdur.get("pdu-type")
                or vdur.get("status") == "ACTIVE"
                or ng_ro_status == "ACTIVE"
            ):
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
                raise LcmException(
                    "Cannot inject ssh-key because target VM is in error state"
                )

        if not target_vdu_id:
            continue

        # inject public key into machine
        if pub_key and user:
            self.logger.debug(logging_text + "Inserting RO key")
            self.logger.debug("SSH > PubKey > {}".format(pub_key))
            if vdur.get("pdu-type"):
                self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                return ip_address
            try:
                ro_vm_id = "{}-{}".format(
                    db_vnfr["member-vnf-index-ref"], target_vdu_id
                )  # TODO add vdu_index
                if self.ro_config.ng:
                    target = {
                        "action": {
                            "action": "inject_ssh_key",
                            "key": pub_key,
                            "user": user,
                        },
                        "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
                    }
                    desc = await self.RO.deploy(nsr_id, target)
                    action_id = desc["action_id"]
                    await self._wait_ng_ro(
                        nsr_id, action_id, timeout=600, operation="instantiation"
                    )
                    break
                else:
                    # wait until NS is deployed at RO
                    if not ro_nsr_id:
                        db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                        ro_nsr_id = deep_get(
                            db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
                        )
                    if not ro_nsr_id:
                        continue
                    result_dict = await self.RO.create_action(
                        item="ns",
                        item_id_name=ro_nsr_id,
                        descriptor={
                            "add_public_key": pub_key,
                            "vms": [ro_vm_id],
                            "user": user,
                        },
                    )
                    # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                    if not result_dict or not isinstance(result_dict, dict):
                        raise LcmException(
                            "Unknown response from RO when injecting key"
                        )
                    # only the first entry is inspected: it either succeeds or raises
                    for result in result_dict.values():
                        if result.get("vim_result") == 200:
                            break
                        else:
                            raise ROclient.ROClientException(
                                "error injecting key: {}".format(
                                    result.get("description")
                                )
                            )
                    break
            except NgRoException as e:
                raise LcmException(
                    "Reaching max tries injecting key. Error: {}".format(e)
                ) from e
            except ROclient.ROClientException as e:
                # retried on the next 10 seconds poll, up to 20 times
                if not nb_tries:
                    self.logger.debug(
                        logging_text
                        + "error injecting key: {}. Retrying until {} seconds".format(
                            e, 20 * 10
                        )
                    )
                nb_tries += 1
                if nb_tries >= 20:
                    raise LcmException(
                        "Reaching max tries injecting key. Error: {}".format(e)
                    ) from e
        else:
            break

    return ip_address
1728 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1730 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1732 my_vca
= vca_deployed_list
[vca_index
]
1733 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1734 # vdu or kdu: no dependencies
1738 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1739 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1740 configuration_status_list
= db_nsr
["configurationStatus"]
1741 for index
, vca_deployed
in enumerate(configuration_status_list
):
1742 if index
== vca_index
:
1745 if not my_vca
.get("member-vnf-index") or (
1746 vca_deployed
.get("member-vnf-index")
1747 == my_vca
.get("member-vnf-index")
1749 internal_status
= configuration_status_list
[index
].get("status")
1750 if internal_status
== "READY":
1752 elif internal_status
== "BROKEN":
1754 "Configuration aborted because dependent charm/s has failed"
1759 # no dependencies, return
1761 await asyncio
.sleep(10)
1764 raise LcmException("Configuration aborted because dependent charm/s timeout")
def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
    """
    Obtain the vca identity to use for an element.

    :param db_vnfr: vnf record. When provided, its 'vca-id' is the answer
    :param db_nsr: ns record. Only looked at when no vnf record is provided: the 'vca' of the vim account
        at instantiate_params.vimAccountId is the answer
    :return: the vca id, or None
    """
    if db_vnfr:
        return deep_get(db_vnfr, ("vca-id",))
    if db_nsr:
        vim_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
        vim_account = VimAccountDB.get_vim_account_with_id(vim_id)
        return vim_account.get("vca")
    return None
1775 async def instantiate_N2VC(
1792 ee_config_descriptor
,
1794 nsr_id
= db_nsr
["_id"]
1795 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1796 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1797 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1798 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1800 "collection": "nsrs",
1801 "filter": {"_id": nsr_id
},
1802 "path": db_update_entry
,
1808 element_under_configuration
= nsr_id
1812 vnfr_id
= db_vnfr
["_id"]
1813 osm_config
["osm"]["vnf_id"] = vnfr_id
1815 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1817 if vca_type
== "native_charm":
1820 index_number
= vdu_index
or 0
1823 element_type
= "VNF"
1824 element_under_configuration
= vnfr_id
1825 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1827 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1828 element_type
= "VDU"
1829 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1830 osm_config
["osm"]["vdu_id"] = vdu_id
1832 namespace
+= ".{}".format(kdu_name
)
1833 element_type
= "KDU"
1834 element_under_configuration
= kdu_name
1835 osm_config
["osm"]["kdu_name"] = kdu_name
1838 if base_folder
["pkg-dir"]:
1839 artifact_path
= "{}/{}/{}/{}".format(
1840 base_folder
["folder"],
1841 base_folder
["pkg-dir"],
1844 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1849 artifact_path
= "{}/Scripts/{}/{}/".format(
1850 base_folder
["folder"],
1853 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1858 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1860 # get initial_config_primitive_list that applies to this element
1861 initial_config_primitive_list
= config_descriptor
.get(
1862 "initial-config-primitive"
1866 "Initial config primitive list > {}".format(
1867 initial_config_primitive_list
1871 # add config if not present for NS charm
1872 ee_descriptor_id
= ee_config_descriptor
.get("id")
1873 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1874 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1875 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1879 "Initial config primitive list #2 > {}".format(
1880 initial_config_primitive_list
1883 # n2vc_redesign STEP 3.1
1884 # find old ee_id if exists
1885 ee_id
= vca_deployed
.get("ee_id")
1887 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1888 # create or register execution environment in VCA
1889 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1891 self
._write
_configuration
_status
(
1893 vca_index
=vca_index
,
1895 element_under_configuration
=element_under_configuration
,
1896 element_type
=element_type
,
1899 step
= "create execution environment"
1900 self
.logger
.debug(logging_text
+ step
)
1904 if vca_type
== "k8s_proxy_charm":
1905 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1906 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1907 namespace
=namespace
,
1908 artifact_path
=artifact_path
,
1912 elif vca_type
== "helm" or vca_type
== "helm-v3":
1913 ee_id
, credentials
= await self
.vca_map
[
1915 ].create_execution_environment(
1916 namespace
=namespace
,
1920 artifact_path
=artifact_path
,
1921 chart_model
=vca_name
,
1925 ee_id
, credentials
= await self
.vca_map
[
1927 ].create_execution_environment(
1928 namespace
=namespace
,
1934 elif vca_type
== "native_charm":
1935 step
= "Waiting to VM being up and getting IP address"
1936 self
.logger
.debug(logging_text
+ step
)
1937 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1946 credentials
= {"hostname": rw_mgmt_ip
}
1948 username
= deep_get(
1949 config_descriptor
, ("config-access", "ssh-access", "default-user")
1951 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1952 # merged. Meanwhile let's get username from initial-config-primitive
1953 if not username
and initial_config_primitive_list
:
1954 for config_primitive
in initial_config_primitive_list
:
1955 for param
in config_primitive
.get("parameter", ()):
1956 if param
["name"] == "ssh-username":
1957 username
= param
["value"]
1961 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1962 "'config-access.ssh-access.default-user'"
1964 credentials
["username"] = username
1965 # n2vc_redesign STEP 3.2
1967 self
._write
_configuration
_status
(
1969 vca_index
=vca_index
,
1970 status
="REGISTERING",
1971 element_under_configuration
=element_under_configuration
,
1972 element_type
=element_type
,
1975 step
= "register execution environment {}".format(credentials
)
1976 self
.logger
.debug(logging_text
+ step
)
1977 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1978 credentials
=credentials
,
1979 namespace
=namespace
,
1984 # for compatibility with MON/POL modules, the need model and application name at database
1985 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1986 ee_id_parts
= ee_id
.split(".")
1987 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1988 if len(ee_id_parts
) >= 2:
1989 model_name
= ee_id_parts
[0]
1990 application_name
= ee_id_parts
[1]
1991 db_nsr_update
[db_update_entry
+ "model"] = model_name
1992 db_nsr_update
[db_update_entry
+ "application"] = application_name
1994 # n2vc_redesign STEP 3.3
1995 step
= "Install configuration Software"
1997 self
._write
_configuration
_status
(
1999 vca_index
=vca_index
,
2000 status
="INSTALLING SW",
2001 element_under_configuration
=element_under_configuration
,
2002 element_type
=element_type
,
2003 other_update
=db_nsr_update
,
2006 # TODO check if already done
2007 self
.logger
.debug(logging_text
+ step
)
2009 if vca_type
== "native_charm":
2010 config_primitive
= next(
2011 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
2014 if config_primitive
:
2015 config
= self
._map
_primitive
_params
(
2016 config_primitive
, {}, deploy_params
2019 if vca_type
== "lxc_proxy_charm":
2020 if element_type
== "NS":
2021 num_units
= db_nsr
.get("config-units") or 1
2022 elif element_type
== "VNF":
2023 num_units
= db_vnfr
.get("config-units") or 1
2024 elif element_type
== "VDU":
2025 for v
in db_vnfr
["vdur"]:
2026 if vdu_id
== v
["vdu-id-ref"]:
2027 num_units
= v
.get("config-units") or 1
2029 if vca_type
!= "k8s_proxy_charm":
2030 await self
.vca_map
[vca_type
].install_configuration_sw(
2032 artifact_path
=artifact_path
,
2035 num_units
=num_units
,
2040 # write in db flag of configuration_sw already installed
2042 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
2045 # add relations for this VCA (wait for other peers related with this VCA)
2046 await self
._add
_vca
_relations
(
2047 logging_text
=logging_text
,
2050 vca_index
=vca_index
,
2053 # if SSH access is required, then get execution environment SSH public
2054 # if native charm we have waited already to VM be UP
2055 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
2058 # self.logger.debug("get ssh key block")
2060 config_descriptor
, ("config-access", "ssh-access", "required")
2062 # self.logger.debug("ssh key needed")
2063 # Needed to inject a ssh key
2066 ("config-access", "ssh-access", "default-user"),
2068 step
= "Install configuration Software, getting public ssh key"
2069 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
2070 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
2073 step
= "Insert public key into VM user={} ssh_key={}".format(
2077 # self.logger.debug("no need to get ssh key")
2078 step
= "Waiting to VM being up and getting IP address"
2079 self
.logger
.debug(logging_text
+ step
)
2081 # default rw_mgmt_ip to None, avoiding the non definition of the variable
2084 # n2vc_redesign STEP 5.1
2085 # wait for RO (ip-address) Insert pub_key into VM
2088 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
2089 logging_text
, nsr_id
, vnfr_id
, kdu_name
2091 vnfd
= self
.db
.get_one(
2093 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2095 kdu
= get_kdu(vnfd
, kdu_name
)
2097 service
["name"] for service
in get_kdu_services(kdu
)
2099 exposed_services
= []
2100 for service
in services
:
2101 if any(s
in service
["name"] for s
in kdu_services
):
2102 exposed_services
.append(service
)
2103 await self
.vca_map
[vca_type
].exec_primitive(
2105 primitive_name
="config",
2107 "osm-config": json
.dumps(
2109 k8s
={"services": exposed_services
}
2116 # This verification is needed in order to avoid trying to add a public key
2117 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2118 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2119 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2121 elif db_vnfr
.get("vdur"):
2122 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2132 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2134 # store rw_mgmt_ip in deploy params for later replacement
2135 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2137 # n2vc_redesign STEP 6 Execute initial config primitive
2138 step
= "execute initial config primitive"
2140 # wait for dependent primitives execution (NS -> VNF -> VDU)
2141 if initial_config_primitive_list
:
2142 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2144 # stage, in function of element type: vdu, kdu, vnf or ns
2145 my_vca
= vca_deployed_list
[vca_index
]
2146 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2148 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2149 elif my_vca
.get("member-vnf-index"):
2151 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2154 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2156 self
._write
_configuration
_status
(
2157 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2160 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2162 check_if_terminated_needed
= True
2163 for initial_config_primitive
in initial_config_primitive_list
:
2164 # adding information on the vca_deployed if it is a NS execution environment
2165 if not vca_deployed
["member-vnf-index"]:
2166 deploy_params
["ns_config_info"] = json
.dumps(
2167 self
._get
_ns
_config
_info
(nsr_id
)
2169 # TODO check if already done
2170 primitive_params_
= self
._map
_primitive
_params
(
2171 initial_config_primitive
, {}, deploy_params
2174 step
= "execute primitive '{}' params '{}'".format(
2175 initial_config_primitive
["name"], primitive_params_
2177 self
.logger
.debug(logging_text
+ step
)
2178 await self
.vca_map
[vca_type
].exec_primitive(
2180 primitive_name
=initial_config_primitive
["name"],
2181 params_dict
=primitive_params_
,
2186 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2187 if check_if_terminated_needed
:
2188 if config_descriptor
.get("terminate-config-primitive"):
2190 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2192 check_if_terminated_needed
= False
2194 # TODO register in database that primitive is done
2196 # STEP 7 Configure metrics
2197 if vca_type
== "helm" or vca_type
== "helm-v3":
2198 # TODO: review for those cases where the helm chart is a reference and
2199 # is not part of the NF package
2200 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2202 artifact_path
=artifact_path
,
2203 ee_config_descriptor
=ee_config_descriptor
,
2206 target_ip
=rw_mgmt_ip
,
2212 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2215 for job
in prometheus_jobs
:
2218 {"job_name": job
["job_name"]},
2221 fail_on_empty
=False,
2224 step
= "instantiated at VCA"
2225 self
.logger
.debug(logging_text
+ step
)
2227 self
._write
_configuration
_status
(
2228 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2231 except Exception as e
: # TODO not use Exception but N2VC exception
2232 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2234 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2237 "Exception while {} : {}".format(step
, e
), exc_info
=True
2239 self
._write
_configuration
_status
(
2240 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2242 raise LcmException("{} {}".format(step
, e
)) from e
2244 def _write_ns_status(
2248 current_operation
: str,
2249 current_operation_id
: str,
2250 error_description
: str = None,
2251 error_detail
: str = None,
2252 other_update
: dict = None,
2255 Update db_nsr fields.
2258 :param current_operation:
2259 :param current_operation_id:
2260 :param error_description:
2261 :param error_detail:
2262 :param other_update: Other required changes at database if provided, will be cleared
2266 db_dict
= other_update
or {}
2269 ] = current_operation_id
# for backward compatibility
2270 db_dict
["_admin.current-operation"] = current_operation_id
2271 db_dict
["_admin.operation-type"] = (
2272 current_operation
if current_operation
!= "IDLE" else None
2274 db_dict
["currentOperation"] = current_operation
2275 db_dict
["currentOperationID"] = current_operation_id
2276 db_dict
["errorDescription"] = error_description
2277 db_dict
["errorDetail"] = error_detail
2280 db_dict
["nsState"] = ns_state
2281 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2282 except DbException
as e
:
2283 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2285 def _write_op_status(
2289 error_message
: str = None,
2290 queuePosition
: int = 0,
2291 operation_state
: str = None,
2292 other_update
: dict = None,
2295 db_dict
= other_update
or {}
2296 db_dict
["queuePosition"] = queuePosition
2297 if isinstance(stage
, list):
2298 db_dict
["stage"] = stage
[0]
2299 db_dict
["detailed-status"] = " ".join(stage
)
2300 elif stage
is not None:
2301 db_dict
["stage"] = str(stage
)
2303 if error_message
is not None:
2304 db_dict
["errorMessage"] = error_message
2305 if operation_state
is not None:
2306 db_dict
["operationState"] = operation_state
2307 db_dict
["statusEnteredTime"] = time()
2308 self
.update_db_2("nslcmops", op_id
, db_dict
)
2309 except DbException
as e
:
2311 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2314 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2316 nsr_id
= db_nsr
["_id"]
2317 # configurationStatus
2318 config_status
= db_nsr
.get("configurationStatus")
2321 "configurationStatus.{}.status".format(index
): status
2322 for index
, v
in enumerate(config_status
)
2326 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2328 except DbException
as e
:
2330 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2333 def _write_configuration_status(
2338 element_under_configuration
: str = None,
2339 element_type
: str = None,
2340 other_update
: dict = None,
2343 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2344 # .format(vca_index, status))
2347 db_path
= "configurationStatus.{}.".format(vca_index
)
2348 db_dict
= other_update
or {}
2350 db_dict
[db_path
+ "status"] = status
2351 if element_under_configuration
:
2353 db_path
+ "elementUnderConfiguration"
2354 ] = element_under_configuration
2356 db_dict
[db_path
+ "elementType"] = element_type
2357 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2358 except DbException
as e
:
2360 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2361 status
, nsr_id
, vca_index
, e
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Compute the placement (vim account where each vnf is deployed) when an external engine is requested.
    For the "PLA" engine the request is sent via kafka and the answer is polled from database
    (nslcmops _admin.pla). Database is used because the answer can be written by a different LCM
    worker in case of HA.
    :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
    :param db_nslcmop: database content of nslcmop
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
    :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
        computed 'vim-account-id'. Raises LcmException if no answer arrives in time
    """
    changed = False
    op_id = db_nslcmop["_id"]
    engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
    if engine != "PLA":
        return changed

    self.logger.debug(logging_text + "Invoke and wait for placement optimization")
    request = {"nslcmopId": op_id}
    await self.msg.aiowrite("pla", "get_placement", request, loop=self.loop)

    poll_seconds = 5
    remaining = poll_seconds * 10
    placement = None
    while remaining >= 0 and not placement:
        await asyncio.sleep(poll_seconds)
        remaining -= poll_seconds
        db_nslcmop = self.db.get_one("nslcmops", {"_id": op_id})
        placement = deep_get(db_nslcmop, ("_admin", "pla"))

    if not placement:
        raise LcmException("Placement timeout for nslcmopId={}".format(op_id))

    for placed_vnf in placement["vnf"]:
        vnfr = db_vnfrs.get(placed_vnf["member-vnf-index"])
        if vnfr and placed_vnf.get("vimAccountId"):
            changed = True
            self.db.set_one(
                "vnfrs",
                {"_id": vnfr["_id"]},
                {"vim-account-id": placed_vnf["vimAccountId"]},
            )
            # Modifies db_vnfrs
            vnfr["vim-account-id"] = placed_vnf["vimAccountId"]
    return changed
def update_nsrs_with_pla_result(self, params):
    """
    Store the placement result received from PLA at the nslcmops record ("_admin.pla").

    This is a best-effort update: any failure is logged as a warning and not propagated.
    :param params: received message content; the "placement" entry is stored as is and its
        "nslcmopId" identifies the nslcmop record to update
    :return: None
    """
    # bound before the try block so that the except handler can always format the message,
    # even when the failure happens while extracting the identifier
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
        self.update_db_2(
            "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
        )
    except Exception as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning(
            "Update failed for nslcmop_id={}:{}".format(nslcmop_id, e)
        )
2423 async def instantiate(self
, nsr_id
, nslcmop_id
):
2426 :param nsr_id: ns instance to deploy
2427 :param nslcmop_id: operation to run
2431 # Try to lock HA task here
2432 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2433 if not task_is_locked_by_me
:
2435 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2439 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2440 self
.logger
.debug(logging_text
+ "Enter")
2442 # get all needed from database
2444 # database nsrs record
2447 # database nslcmops record
2450 # update operation on nsrs
2452 # update operation on nslcmops
2453 db_nslcmop_update
= {}
2455 nslcmop_operation_state
= None
2456 db_vnfrs
= {} # vnf's info indexed by member-index
2458 tasks_dict_info
= {} # from task to info text
2462 "Stage 1/5: preparation of the environment.",
2463 "Waiting for previous operations to terminate.",
2466 # ^ stage, step, VIM progress
2468 # wait for any previous tasks in process
2469 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2471 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2472 stage
[1] = "Reading from database."
2473 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2474 db_nsr_update
["detailed-status"] = "creating"
2475 db_nsr_update
["operational-status"] = "init"
2476 self
._write
_ns
_status
(
2478 ns_state
="BUILDING",
2479 current_operation
="INSTANTIATING",
2480 current_operation_id
=nslcmop_id
,
2481 other_update
=db_nsr_update
,
2483 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2485 # read from db: operation
2486 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2487 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2488 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2489 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2490 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2492 ns_params
= db_nslcmop
.get("operationParams")
2493 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2494 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2496 timeout_ns_deploy
= self
.timeout
.ns_deploy
2499 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2500 self
.logger
.debug(logging_text
+ stage
[1])
2501 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2502 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2503 self
.logger
.debug(logging_text
+ stage
[1])
2504 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2505 self
.fs
.sync(db_nsr
["nsd-id"])
2507 # nsr_name = db_nsr["name"] # TODO short-name??
2509 # read from db: vnf's of this ns
2510 stage
[1] = "Getting vnfrs from db."
2511 self
.logger
.debug(logging_text
+ stage
[1])
2512 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2514 # read from db: vnfd's for every vnf
2515 db_vnfds
= [] # every vnfd data
2517 # for each vnf in ns, read vnfd
2518 for vnfr
in db_vnfrs_list
:
2519 if vnfr
.get("kdur"):
2521 for kdur
in vnfr
["kdur"]:
2522 if kdur
.get("additionalParams"):
2523 kdur
["additionalParams"] = json
.loads(
2524 kdur
["additionalParams"]
2526 kdur_list
.append(kdur
)
2527 vnfr
["kdur"] = kdur_list
2529 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2530 vnfd_id
= vnfr
["vnfd-id"]
2531 vnfd_ref
= vnfr
["vnfd-ref"]
2532 self
.fs
.sync(vnfd_id
)
2534 # if we haven't this vnfd, read it from db
2535 if vnfd_id
not in db_vnfds
:
2537 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2540 self
.logger
.debug(logging_text
+ stage
[1])
2541 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2544 db_vnfds
.append(vnfd
)
2546 # Get or generates the _admin.deployed.VCA list
2547 vca_deployed_list
= None
2548 if db_nsr
["_admin"].get("deployed"):
2549 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2550 if vca_deployed_list
is None:
2551 vca_deployed_list
= []
2552 configuration_status_list
= []
2553 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2554 db_nsr_update
["configurationStatus"] = configuration_status_list
2555 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2556 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2557 elif isinstance(vca_deployed_list
, dict):
2558 # maintain backward compatibility. Change a dict to list at database
2559 vca_deployed_list
= list(vca_deployed_list
.values())
2560 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2561 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2564 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2566 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2567 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2569 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2570 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2571 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2573 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2576 # n2vc_redesign STEP 2 Deploy Network Scenario
2577 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2578 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2580 stage
[1] = "Deploying KDUs."
2581 # self.logger.debug(logging_text + "Before deploy_kdus")
2582 # Call to deploy_kdus in case exists the "vdu:kdu" param
2583 await self
.deploy_kdus(
2584 logging_text
=logging_text
,
2586 nslcmop_id
=nslcmop_id
,
2589 task_instantiation_info
=tasks_dict_info
,
2592 stage
[1] = "Getting VCA public key."
2593 # n2vc_redesign STEP 1 Get VCA public ssh-key
2594 # feature 1429. Add n2vc public key to needed VMs
2595 n2vc_key
= self
.n2vc
.get_public_key()
2596 n2vc_key_list
= [n2vc_key
]
2597 if self
.vca_config
.public_key
:
2598 n2vc_key_list
.append(self
.vca_config
.public_key
)
2600 stage
[1] = "Deploying NS at VIM."
2601 task_ro
= asyncio
.ensure_future(
2602 self
.instantiate_RO(
2603 logging_text
=logging_text
,
2607 db_nslcmop
=db_nslcmop
,
2610 n2vc_key_list
=n2vc_key_list
,
2614 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2615 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2617 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2618 stage
[1] = "Deploying Execution Environments."
2619 self
.logger
.debug(logging_text
+ stage
[1])
2621 # create namespace and certificate if any helm based EE is present in the NS
2622 if check_helm_ee_in_ns(db_vnfds
):
2623 # TODO: create EE namespace
2624 # create TLS certificates
2625 await self
.vca_map
["helm-v3"].create_tls_certificate(
2626 secret_name
="ee-tls-{}".format(nsr_id
),
2629 usage
="server auth",
2632 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2633 for vnf_profile
in get_vnf_profiles(nsd
):
2634 vnfd_id
= vnf_profile
["vnfd-id"]
2635 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2636 member_vnf_index
= str(vnf_profile
["id"])
2637 db_vnfr
= db_vnfrs
[member_vnf_index
]
2638 base_folder
= vnfd
["_admin"]["storage"]
2644 # Get additional parameters
2645 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2646 if db_vnfr
.get("additionalParamsForVnf"):
2647 deploy_params
.update(
2648 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2651 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2652 if descriptor_config
:
2654 logging_text
=logging_text
2655 + "member_vnf_index={} ".format(member_vnf_index
),
2658 nslcmop_id
=nslcmop_id
,
2664 member_vnf_index
=member_vnf_index
,
2665 vdu_index
=vdu_index
,
2667 deploy_params
=deploy_params
,
2668 descriptor_config
=descriptor_config
,
2669 base_folder
=base_folder
,
2670 task_instantiation_info
=tasks_dict_info
,
2674 # Deploy charms for each VDU that supports one.
2675 for vdud
in get_vdu_list(vnfd
):
2677 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2678 vdur
= find_in_list(
2679 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2682 if vdur
.get("additionalParams"):
2683 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2685 deploy_params_vdu
= deploy_params
2686 deploy_params_vdu
["OSM"] = get_osm_params(
2687 db_vnfr
, vdu_id
, vdu_count_index
=0
2689 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2691 self
.logger
.debug("VDUD > {}".format(vdud
))
2693 "Descriptor config > {}".format(descriptor_config
)
2695 if descriptor_config
:
2698 for vdu_index
in range(vdud_count
):
2699 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2701 logging_text
=logging_text
2702 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2703 member_vnf_index
, vdu_id
, vdu_index
2707 nslcmop_id
=nslcmop_id
,
2713 member_vnf_index
=member_vnf_index
,
2714 vdu_index
=vdu_index
,
2716 deploy_params
=deploy_params_vdu
,
2717 descriptor_config
=descriptor_config
,
2718 base_folder
=base_folder
,
2719 task_instantiation_info
=tasks_dict_info
,
2722 for kdud
in get_kdu_list(vnfd
):
2723 kdu_name
= kdud
["name"]
2724 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2725 if descriptor_config
:
2730 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2732 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2733 if kdur
.get("additionalParams"):
2734 deploy_params_kdu
.update(
2735 parse_yaml_strings(kdur
["additionalParams"].copy())
2739 logging_text
=logging_text
,
2742 nslcmop_id
=nslcmop_id
,
2748 member_vnf_index
=member_vnf_index
,
2749 vdu_index
=vdu_index
,
2751 deploy_params
=deploy_params_kdu
,
2752 descriptor_config
=descriptor_config
,
2753 base_folder
=base_folder
,
2754 task_instantiation_info
=tasks_dict_info
,
2758 # Check if this NS has a charm configuration
2759 descriptor_config
= nsd
.get("ns-configuration")
2760 if descriptor_config
and descriptor_config
.get("juju"):
2763 member_vnf_index
= None
2769 # Get additional parameters
2770 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2771 if db_nsr
.get("additionalParamsForNs"):
2772 deploy_params
.update(
2773 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2775 base_folder
= nsd
["_admin"]["storage"]
2777 logging_text
=logging_text
,
2780 nslcmop_id
=nslcmop_id
,
2786 member_vnf_index
=member_vnf_index
,
2787 vdu_index
=vdu_index
,
2789 deploy_params
=deploy_params
,
2790 descriptor_config
=descriptor_config
,
2791 base_folder
=base_folder
,
2792 task_instantiation_info
=tasks_dict_info
,
2796 # rest of staff will be done at finally
2799 ROclient
.ROClientException
,
2805 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2808 except asyncio
.CancelledError
:
2810 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2812 exc
= "Operation was cancelled"
2813 except Exception as e
:
2814 exc
= traceback
.format_exc()
2815 self
.logger
.critical(
2816 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2821 error_list
.append(str(exc
))
2823 # wait for pending tasks
2825 stage
[1] = "Waiting for instantiate pending tasks."
2826 self
.logger
.debug(logging_text
+ stage
[1])
2827 error_list
+= await self
._wait
_for
_tasks
(
2835 stage
[1] = stage
[2] = ""
2836 except asyncio
.CancelledError
:
2837 error_list
.append("Cancelled")
2838 # TODO cancel all tasks
2839 except Exception as exc
:
2840 error_list
.append(str(exc
))
2842 # update operation-status
2843 db_nsr_update
["operational-status"] = "running"
2844 # let's begin with VCA 'configured' status (later we can change it)
2845 db_nsr_update
["config-status"] = "configured"
2846 for task
, task_name
in tasks_dict_info
.items():
2847 if not task
.done() or task
.cancelled() or task
.exception():
2848 if task_name
.startswith(self
.task_name_deploy_vca
):
2849 # A N2VC task is pending
2850 db_nsr_update
["config-status"] = "failed"
2852 # RO or KDU task is pending
2853 db_nsr_update
["operational-status"] = "failed"
2855 # update status at database
2857 error_detail
= ". ".join(error_list
)
2858 self
.logger
.error(logging_text
+ error_detail
)
2859 error_description_nslcmop
= "{} Detail: {}".format(
2860 stage
[0], error_detail
2862 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2863 nslcmop_id
, stage
[0]
2866 db_nsr_update
["detailed-status"] = (
2867 error_description_nsr
+ " Detail: " + error_detail
2869 db_nslcmop_update
["detailed-status"] = error_detail
2870 nslcmop_operation_state
= "FAILED"
2874 error_description_nsr
= error_description_nslcmop
= None
2876 db_nsr_update
["detailed-status"] = "Done"
2877 db_nslcmop_update
["detailed-status"] = "Done"
2878 nslcmop_operation_state
= "COMPLETED"
2881 self
._write
_ns
_status
(
2884 current_operation
="IDLE",
2885 current_operation_id
=None,
2886 error_description
=error_description_nsr
,
2887 error_detail
=error_detail
,
2888 other_update
=db_nsr_update
,
2890 self
._write
_op
_status
(
2893 error_message
=error_description_nslcmop
,
2894 operation_state
=nslcmop_operation_state
,
2895 other_update
=db_nslcmop_update
,
2898 if nslcmop_operation_state
:
2900 await self
.msg
.aiowrite(
2905 "nslcmop_id": nslcmop_id
,
2906 "operationState": nslcmop_operation_state
,
2910 except Exception as e
:
2912 logging_text
+ "kafka_write notification Exception {}".format(e
)
2915 self
.logger
.debug(logging_text
+ "Exit")
2916 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
def _get_vnfd(self, vnfd_id: str, projects_read: str, cached_vnfds: Dict[str, Any]):
    """
    Return the vnfd with the given id, reading it from database only the first time.

    :param vnfd_id: value of the vnfd "id" field
    :param projects_read: project used to filter by "_admin.projects_read"
    :param cached_vnfds: cache indexed by vnfd_id. It is filled in by this method
    :return: the vnfd database content
    """
    if vnfd_id in cached_vnfds:
        return cached_vnfds[vnfd_id]

    # cache miss: read from database and remember it for next calls
    db_vnfd = self.db.get_one(
        "vnfds", {"id": vnfd_id, "_admin.projects_read": projects_read}
    )
    cached_vnfds[vnfd_id] = db_vnfd
    return db_vnfd
def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
    """
    Return the vnfr of a vnf profile of this ns, reading it from database only the first time.

    :param nsr_id: ns instance identifier, matched against "nsr-id-ref"
    :param vnf_profile_id: vnf profile identifier, matched against "member-vnf-index-ref"
    :param cached_vnfrs: cache indexed by vnf_profile_id. It is filled in by this method
    :return: the vnfr database content
    """
    if vnf_profile_id in cached_vnfrs:
        return cached_vnfrs[vnf_profile_id]

    # cache miss: read from database and remember it for next calls
    vnfr_filter = {
        "member-vnf-index-ref": vnf_profile_id,
        "nsr-id-ref": nsr_id,
    }
    db_vnfr = self.db.get_one("vnfrs", vnfr_filter)
    cached_vnfrs[vnf_profile_id] = db_vnfr
    return db_vnfr
def _is_deployed_vca_in_relation(
    self, vca: DeployedVCA, relation: Relation
) -> bool:
    """
    Tell whether a deployed VCA is one of the two endpoints of a relation.

    :param vca: deployed VCA to look for
    :param relation: relation whose provider and requirer are inspected
    :return: True if the vnf profile, vdu profile and execution environment reference of the
        VCA match the ones of the provider or the requirer
    """
    for endpoint in (relation.provider, relation.requirer):
        # endpoints with a kdu resource profile are never compared against a VCA
        if endpoint["kdu-resource-profile-id"]:
            continue
        if (
            vca.vnf_profile_id == endpoint.vnf_profile_id
            and vca.vdu_profile_id == endpoint.vdu_profile_id
            and vca.execution_environment_ref == endpoint.execution_environment_ref
        ):
            return True
    return False
def _update_ee_relation_data_with_implicit_data(
    self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
):
    """
    Complete the data of a relation endpoint with the information that is implicit.

    For VNF and VDU level endpoints without "execution-environment-ref", the reference is
    taken from the juju execution environment found at the vnfd for that vnf/vdu.
    :param nsr_id: ns instance identifier
    :param nsd: ns descriptor
    :param ee_relation_data: endpoint data as provided at the descriptor
    :param cached_vnfds: cache of vnfds, indexed by vnfd id
    :param vnf_profile_id: vnf profile of the endpoint, if any
    :return: the endpoint data, normalized by safe_get_ee_relation and completed
    :raises Exception: if the execution environment cannot be found at the vnfd
    """
    endpoint_data = safe_get_ee_relation(
        nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
    )
    level = EELevel.get_level(endpoint_data)
    if level not in (EELevel.VNF, EELevel.VDU) or endpoint_data[
        "execution-environment-ref"
    ]:
        # nothing is implicit: not a vnf/vdu endpoint or the reference is already provided
        return endpoint_data

    vnfd_id = get_vnf_profile(nsd, endpoint_data["vnf-profile-id"])["vnfd-id"]
    project = nsd["_admin"]["projects_read"][0]
    db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)
    if level == EELevel.VNF:
        entity_id = vnfd_id
    else:
        entity_id = endpoint_data["vdu-profile-id"]
    ee = get_juju_ee_ref(db_vnfd, entity_id)
    if not ee:
        raise Exception(
            f"not execution environments found for ee_relation {endpoint_data}"
        )
    endpoint_data["execution-environment-ref"] = ee["id"]
    return endpoint_data
def _get_ns_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """
    Obtain the relations declared at the ns configuration in which the given VCA takes part.

    Two descriptor formats are accepted: explicit "provider"/"requirer" entries, or a legacy
    list of two "entities" (the first one is taken as provider, the second one as requirer).
    :param nsr_id: ns instance identifier
    :param nsd: ns descriptor
    :param vca: deployed VCA that must be one of the endpoints
    :param cached_vnfds: cache of vnfds, indexed by vnfd id
    :return: list of relations where the VCA is provider or requirer
    :raises Exception: if a relation contains neither provider/requirer nor entities
    """

    def endpoint_from_entity(entity):
        # an entity whose id is not the nsd id refers to a vnf profile
        entity_id = entity["id"]
        endpoint_data = {
            "nsr-id": nsr_id,
            "endpoint": entity["endpoint"],
        }
        if entity_id != nsd["id"]:
            endpoint_data["vnf-profile-id"] = entity_id
        return endpoint_data

    vca_relations = []
    for relation_desc in get_ns_configuration_relation_list(nsd):
        if "provider" in relation_desc and "requirer" in relation_desc:
            provider_data = relation_desc["provider"]
            requirer_data = relation_desc["requirer"]
        elif "entities" in relation_desc:
            provider_data = endpoint_from_entity(relation_desc["entities"][0])
            requirer_data = endpoint_from_entity(relation_desc["entities"][1])
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )
        provider = EERelation(
            self._update_ee_relation_data_with_implicit_data(
                nsr_id, nsd, provider_data, cached_vnfds
            )
        ) if False else None
        relation_provider = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, provider_data, cached_vnfds
        )
        relation_requirer = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, requirer_data, cached_vnfds
        )
        provider = EERelation(relation_provider)
        requirer = EERelation(relation_requirer)
        relation = Relation(relation_desc["name"], provider, requirer)
        if self._is_deployed_vca_in_relation(vca, relation):
            vca_relations.append(relation)
    return vca_relations
def _get_vnf_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """
    Obtain the relations declared at the vnfd of the VCA in which the given VCA takes part.

    Two descriptor formats are accepted: explicit "provider"/"requirer" entries, or a legacy
    list of two "entities" (the first one is taken as provider, the second one as requirer).
    :param nsr_id: ns instance identifier
    :param nsd: ns descriptor
    :param vca: deployed VCA that must be one of the endpoints. Its vnf profile selects the vnfd
    :param cached_vnfds: cache of vnfds, indexed by vnfd id
    :return: list of relations where the VCA is provider or requirer
    :raises Exception: if a relation contains neither provider/requirer nor entities
    """
    vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
    vnf_profile_id = vnf_profile["id"]
    vnfd_id = vnf_profile["vnfd-id"]
    project = nsd["_admin"]["projects_read"][0]
    db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)

    def endpoint_from_entity(entity):
        # an entity whose id is not the vnfd id refers to a vdu profile
        entity_id = entity["id"]
        endpoint_data = {
            "nsr-id": nsr_id,
            "vnf-profile-id": vnf_profile_id,
            "endpoint": entity["endpoint"],
        }
        if entity_id != vnfd_id:
            endpoint_data["vdu-profile-id"] = entity_id
        return endpoint_data

    vca_relations = []
    for relation_desc in get_relation_list(db_vnfd, vnfd_id):
        if "provider" in relation_desc and "requirer" in relation_desc:
            provider_data = relation_desc["provider"]
            requirer_data = relation_desc["requirer"]
        elif "entities" in relation_desc:
            provider_data = endpoint_from_entity(relation_desc["entities"][0])
            requirer_data = endpoint_from_entity(relation_desc["entities"][1])
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )
        relation_provider = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, provider_data, cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        relation_requirer = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, requirer_data, cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        provider = EERelation(relation_provider)
        requirer = EERelation(relation_requirer)
        relation = Relation(relation_desc["name"], provider, requirer)
        if self._is_deployed_vca_in_relation(vca, relation):
            vca_relations.append(relation)
    return vca_relations
def _get_kdu_resource_data(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedK8sResource:
    """
    Obtain the deployed kdu data of the kdu resource referenced by a relation endpoint.

    :param ee_relation: relation endpoint. Its vnf_profile_id and kdu_resource_profile_id
        are used to locate the kdu resource profile at the vnfd
    :param db_nsr: database content of the nsr
    :param cached_vnfds: cache of vnfds, indexed by vnfd id
    :return: the deployed kdu entry, extended with the "resource-name" of the kdu resource
        profile; None when there is no deployed kdu for it
    """
    nsd = get_nsd(db_nsr)
    vnf_profiles = get_vnf_profiles(nsd)
    vnfd_id = find_in_list(
        vnf_profiles,
        lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
    )["vnfd-id"]
    project = nsd["_admin"]["projects_read"][0]
    db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)
    kdu_resource_profile = get_kdu_resource_profile(
        db_vnfd, ee_relation.kdu_resource_profile_id
    )
    kdu_name = kdu_resource_profile["kdu-name"]
    # Dict fallbacks: the previous tuple defaults made the chained .get() raise
    # AttributeError when "_admin" was missing
    nsr_deployed = (db_nsr.get("_admin") or {}).get("deployed") or {}
    deployed_kdu, _ = get_deployed_kdu(
        nsr_deployed,
        kdu_name,
        ee_relation.vnf_profile_id,
    )
    if not deployed_kdu:
        # presumably the kdu is not deployed yet (TODO confirm get_deployed_kdu contract);
        # the caller already checks for a falsy result
        return None
    deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
    return deployed_kdu
def _get_deployed_component(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedComponent:
    """
    Obtain the deployed component (VCA or K8s resource) behind a relation endpoint.

    :param ee_relation: relation endpoint. Its level (NS, VNF, VDU or KDU) selects the lookup
    :param db_nsr: database content of the nsr
    :param cached_vnfds: cache of vnfds, indexed by vnfd id. Only used for KDU endpoints
    :return: a DeployedVCA for NS/VNF/VDU endpoints, a DeployedK8sResource for KDU endpoints,
        or None if the component is not found
    """
    nsr_id = db_nsr["_id"]
    level = EELevel.get_level(ee_relation)

    if level == EELevel.NS:
        vca_filter = {"vdu_id": None, "member-vnf-index": None}
    elif level == EELevel.VNF:
        vca_filter = {
            "vdu_id": None,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif level == EELevel.VDU:
        vca_filter = {
            "vdu_id": ee_relation.vdu_profile_id,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif level == EELevel.KDU:
        kdu_resource_data = self._get_kdu_resource_data(
            ee_relation, db_nsr, cached_vnfds
        )
        return DeployedK8sResource(kdu_resource_data) if kdu_resource_data else None
    else:
        return None

    # NS, VNF and VDU endpoints are all backed by an entry of the deployed VCA list
    vca = get_deployed_vca(db_nsr, vca_filter)
    return DeployedVCA(nsr_id, vca) if vca else None
async def _add_relation(
    self,
    relation: Relation,
    vca_type: str,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
    cached_vnfrs: Dict[str, Any],
) -> bool:
    """
    Add a relation between its provider and requirer, if both of them are ready.

    :param relation: relation to add
    :param vca_type: key of self.vca_map selecting the connector that adds the relation
    :param db_nsr: database content of the nsr
    :param cached_vnfds: cache of vnfds, indexed by vnfd id
    :param cached_vnfrs: cache of vnfrs, indexed by vnf profile id
    :return: True if the relation has been added; False if some endpoint is not deployed or
        has not its configuration software installed yet
    """
    provider_component = self._get_deployed_component(
        relation.provider, db_nsr, cached_vnfds
    )
    requirer_component = self._get_deployed_component(
        relation.requirer, db_nsr, cached_vnfds
    )
    if not (
        provider_component
        and requirer_component
        and provider_component.config_sw_installed
        and requirer_component.config_sw_installed
    ):
        # some peer is not ready yet
        return False

    # ns level endpoints have no vnf profile, hence no vnfr
    provider_db_vnfr = None
    if relation.provider.vnf_profile_id:
        provider_db_vnfr = self._get_vnfr(
            relation.provider.nsr_id,
            relation.provider.vnf_profile_id,
            cached_vnfrs,
        )
    requirer_db_vnfr = None
    if relation.requirer.vnf_profile_id:
        requirer_db_vnfr = self._get_vnfr(
            relation.requirer.nsr_id,
            relation.requirer.vnf_profile_id,
            cached_vnfrs,
        )
    provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
    requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
    provider_endpoint = RelationEndpoint(
        provider_component.ee_id,
        provider_vca_id,
        relation.provider.endpoint,
    )
    requirer_endpoint = RelationEndpoint(
        requirer_component.ee_id,
        requirer_vca_id,
        relation.requirer.endpoint,
    )
    await self.vca_map[vca_type].add_relation(
        provider=provider_endpoint,
        requirer=requirer_endpoint,
    )
    return True
async def _add_vca_relations(
    self,
    logging_text,
    nsr_id,
    vca_type: str,
    vca_index: int,
    timeout: int = 3600,
) -> bool:
    """
    Add all the relations (ns and vnf level) in which a deployed VCA takes part.

    Steps:
    1. find all relations for this VCA
    2. wait for other peers related
    3. add relations
    :param logging_text: prefix for logging
    :param nsr_id: ns instance identifier
    :param vca_type: key of self.vca_map selecting the connector that adds the relations
    :param vca_index: index of this VCA at the deployed VCA list of the nsr
    :param timeout: maximum time, in seconds, waiting for the peers to be ready
    :return: True if there are no relations or all of them have been added; False on timeout
        or on any error (errors are logged, not raised)
    """
    # Function-scope import: a monotonic clock is used so that the timeout is not affected
    # by adjustments of the system time
    from time import monotonic

    try:
        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = get_nsd(db_nsr)

        # this VCA data
        deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
        my_vca = DeployedVCA(nsr_id, deployed_vca_dict)

        cached_vnfds = {}
        cached_vnfrs = {}
        relations = []
        relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
        relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))

        # if no relations, terminate
        if not relations:
            self.logger.debug(logging_text + " No relations")
            return True

        self.logger.debug(logging_text + " adding relations {}".format(relations))

        # add all relations
        start = monotonic()
        while True:
            # check timeout
            if monotonic() - start >= timeout:
                self.logger.error(logging_text + " : timeout adding relations")
                return False

            # reload nsr from database (we need to update record: _admin.deployed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            # for each relation, find the VCA's related.
            # Iterate over a copy because added relations are removed from the list
            for relation in relations.copy():
                added = await self._add_relation(
                    relation,
                    vca_type,
                    db_nsr,
                    cached_vnfds,
                    cached_vnfrs,
                )
                if added:
                    relations.remove(relation)

            if not relations:
                self.logger.debug("Relations added")
                break
            await asyncio.sleep(5.0)

        return True

    except Exception as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning(logging_text + " ERROR adding relations: {}".format(e))
        return False
3282 async def _install_kdu(
3290 k8s_instance_info
: dict,
3291 k8params
: dict = None,
3297 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3300 "collection": "nsrs",
3301 "filter": {"_id": nsr_id
},
3302 "path": nsr_db_path
,
3305 if k8s_instance_info
.get("kdu-deployment-name"):
3306 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3308 kdu_instance
= self
.k8scluster_map
[
3310 ].generate_kdu_instance_name(
3311 db_dict
=db_dict_install
,
3312 kdu_model
=k8s_instance_info
["kdu-model"],
3313 kdu_name
=k8s_instance_info
["kdu-name"],
3316 # Update the nsrs table with the kdu-instance value
3320 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3323 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3324 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3325 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3326 # namespace, this first verification could be removed, and the next step would be done for any kind
3328 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3329 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3330 if k8sclustertype
in ("juju", "juju-bundle"):
3331 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3332 # that the user passed a namespace which he wants its KDU to be deployed in)
3338 "_admin.projects_write": k8s_instance_info
["namespace"],
3339 "_admin.projects_read": k8s_instance_info
["namespace"],
3345 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3350 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3352 k8s_instance_info
["namespace"] = kdu_instance
3354 await self
.k8scluster_map
[k8sclustertype
].install(
3355 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3356 kdu_model
=k8s_instance_info
["kdu-model"],
3359 db_dict
=db_dict_install
,
3361 kdu_name
=k8s_instance_info
["kdu-name"],
3362 namespace
=k8s_instance_info
["namespace"],
3363 kdu_instance
=kdu_instance
,
3367 # Obtain services to obtain management service ip
3368 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3369 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3370 kdu_instance
=kdu_instance
,
3371 namespace
=k8s_instance_info
["namespace"],
3374 # Obtain management service info (if exists)
3375 vnfr_update_dict
= {}
3376 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3378 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3383 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3386 for service
in kdud
.get("service", [])
3387 if service
.get("mgmt-service")
3389 for mgmt_service
in mgmt_services
:
3390 for service
in services
:
3391 if service
["name"].startswith(mgmt_service
["name"]):
3392 # Mgmt service found, Obtain service ip
3393 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3394 if isinstance(ip
, list) and len(ip
) == 1:
3398 "kdur.{}.ip-address".format(kdu_index
)
3401 # Check if must update also mgmt ip at the vnf
3402 service_external_cp
= mgmt_service
.get(
3403 "external-connection-point-ref"
3405 if service_external_cp
:
3407 deep_get(vnfd
, ("mgmt-interface", "cp"))
3408 == service_external_cp
3410 vnfr_update_dict
["ip-address"] = ip
3415 "external-connection-point-ref", ""
3417 == service_external_cp
,
3420 "kdur.{}.ip-address".format(kdu_index
)
3425 "Mgmt service name: {} not found".format(
3426 mgmt_service
["name"]
3430 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3431 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3433 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3436 and kdu_config
.get("initial-config-primitive")
3437 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3439 initial_config_primitive_list
= kdu_config
.get(
3440 "initial-config-primitive"
3442 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3444 for initial_config_primitive
in initial_config_primitive_list
:
3445 primitive_params_
= self
._map
_primitive
_params
(
3446 initial_config_primitive
, {}, {}
3449 await asyncio
.wait_for(
3450 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3451 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3452 kdu_instance
=kdu_instance
,
3453 primitive_name
=initial_config_primitive
["name"],
3454 params
=primitive_params_
,
3455 db_dict
=db_dict_install
,
3461 except Exception as e
:
3462 # Prepare update db with error and raise exception
3465 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3469 vnfr_data
.get("_id"),
3470 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3473 # ignore to keep original exception
3475 # reraise original error
3480 async def deploy_kdus(
3487 task_instantiation_info
,
3489 # Launch kdus if present in the descriptor
3491 k8scluster_id_2_uuic
= {
3492 "helm-chart-v3": {},
3497 async def _get_cluster_id(cluster_id
, cluster_type
):
3498 nonlocal k8scluster_id_2_uuic
3499 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3500 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3502 # check if K8scluster is creating and wait look if previous tasks in process
3503 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3504 "k8scluster", cluster_id
3507 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3508 task_name
, cluster_id
3510 self
.logger
.debug(logging_text
+ text
)
3511 await asyncio
.wait(task_dependency
, timeout
=3600)
3513 db_k8scluster
= self
.db
.get_one(
3514 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3516 if not db_k8scluster
:
3517 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3519 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3521 if cluster_type
== "helm-chart-v3":
3523 # backward compatibility for existing clusters that have not been initialized for helm v3
3524 k8s_credentials
= yaml
.safe_dump(
3525 db_k8scluster
.get("credentials")
3527 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3528 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3530 db_k8scluster_update
= {}
3531 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3532 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3533 db_k8scluster_update
[
3534 "_admin.helm-chart-v3.created"
3536 db_k8scluster_update
[
3537 "_admin.helm-chart-v3.operationalState"
3540 "k8sclusters", cluster_id
, db_k8scluster_update
3542 except Exception as e
:
3545 + "error initializing helm-v3 cluster: {}".format(str(e
))
3548 "K8s cluster '{}' has not been initialized for '{}'".format(
3549 cluster_id
, cluster_type
3554 "K8s cluster '{}' has not been initialized for '{}'".format(
3555 cluster_id
, cluster_type
3558 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3561 logging_text
+= "Deploy kdus: "
3564 db_nsr_update
= {"_admin.deployed.K8s": []}
3565 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3568 updated_cluster_list
= []
3569 updated_v3_cluster_list
= []
3571 for vnfr_data
in db_vnfrs
.values():
3572 vca_id
= self
.get_vca_id(vnfr_data
, {})
3573 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3574 # Step 0: Prepare and set parameters
3575 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3576 vnfd_id
= vnfr_data
.get("vnfd-id")
3577 vnfd_with_id
= find_in_list(
3578 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3582 for kdud
in vnfd_with_id
["kdu"]
3583 if kdud
["name"] == kdur
["kdu-name"]
3585 namespace
= kdur
.get("k8s-namespace")
3586 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3587 if kdur
.get("helm-chart"):
3588 kdumodel
= kdur
["helm-chart"]
3589 # Default version: helm3, if helm-version is v2 assign v2
3590 k8sclustertype
= "helm-chart-v3"
3591 self
.logger
.debug("kdur: {}".format(kdur
))
3593 kdur
.get("helm-version")
3594 and kdur
.get("helm-version") == "v2"
3596 k8sclustertype
= "helm-chart"
3597 elif kdur
.get("juju-bundle"):
3598 kdumodel
= kdur
["juju-bundle"]
3599 k8sclustertype
= "juju-bundle"
3602 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3603 "juju-bundle. Maybe an old NBI version is running".format(
3604 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3607 # check if kdumodel is a file and exists
3609 vnfd_with_id
= find_in_list(
3610 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3612 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3613 if storage
: # may be not present if vnfd has not artifacts
3614 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3615 if storage
["pkg-dir"]:
3616 filename
= "{}/{}/{}s/{}".format(
3623 filename
= "{}/Scripts/{}s/{}".format(
3628 if self
.fs
.file_exists(
3629 filename
, mode
="file"
3630 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3631 kdumodel
= self
.fs
.path
+ filename
3632 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3634 except Exception: # it is not a file
3637 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3638 step
= "Synchronize repos for k8s cluster '{}'".format(
3641 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3645 k8sclustertype
== "helm-chart"
3646 and cluster_uuid
not in updated_cluster_list
3648 k8sclustertype
== "helm-chart-v3"
3649 and cluster_uuid
not in updated_v3_cluster_list
3651 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3652 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3653 cluster_uuid
=cluster_uuid
3656 if del_repo_list
or added_repo_dict
:
3657 if k8sclustertype
== "helm-chart":
3659 "_admin.helm_charts_added." + item
: None
3660 for item
in del_repo_list
3663 "_admin.helm_charts_added." + item
: name
3664 for item
, name
in added_repo_dict
.items()
3666 updated_cluster_list
.append(cluster_uuid
)
3667 elif k8sclustertype
== "helm-chart-v3":
3669 "_admin.helm_charts_v3_added." + item
: None
3670 for item
in del_repo_list
3673 "_admin.helm_charts_v3_added." + item
: name
3674 for item
, name
in added_repo_dict
.items()
3676 updated_v3_cluster_list
.append(cluster_uuid
)
3678 logging_text
+ "repos synchronized on k8s cluster "
3679 "'{}' to_delete: {}, to_add: {}".format(
3680 k8s_cluster_id
, del_repo_list
, added_repo_dict
3685 {"_id": k8s_cluster_id
},
3691 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3692 vnfr_data
["member-vnf-index-ref"],
3696 k8s_instance_info
= {
3697 "kdu-instance": None,
3698 "k8scluster-uuid": cluster_uuid
,
3699 "k8scluster-type": k8sclustertype
,
3700 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3701 "kdu-name": kdur
["kdu-name"],
3702 "kdu-model": kdumodel
,
3703 "namespace": namespace
,
3704 "kdu-deployment-name": kdu_deployment_name
,
3706 db_path
= "_admin.deployed.K8s.{}".format(index
)
3707 db_nsr_update
[db_path
] = k8s_instance_info
3708 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3709 vnfd_with_id
= find_in_list(
3710 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3712 task
= asyncio
.ensure_future(
3721 k8params
=desc_params
,
3726 self
.lcm_tasks
.register(
3730 "instantiate_KDU-{}".format(index
),
3733 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3739 except (LcmException
, asyncio
.CancelledError
):
3741 except Exception as e
:
3742 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3743 if isinstance(e
, (N2VCException
, DbException
)):
3744 self
.logger
.error(logging_text
+ msg
)
3746 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3747 raise LcmException(msg
)
3750 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3769 task_instantiation_info
,
3772 # launch instantiate_N2VC in a asyncio task and register task object
3773 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3774 # if not found, create one entry and update database
3775 # fill db_nsr._admin.deployed.VCA.<index>
3778 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3782 get_charm_name
= False
3783 if "execution-environment-list" in descriptor_config
:
3784 ee_list
= descriptor_config
.get("execution-environment-list", [])
3785 elif "juju" in descriptor_config
:
3786 ee_list
= [descriptor_config
] # ns charms
3787 if "execution-environment-list" not in descriptor_config
:
3788 # charm name is only required for ns charms
3789 get_charm_name
= True
3790 else: # other types as script are not supported
3793 for ee_item
in ee_list
:
3796 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3797 ee_item
.get("juju"), ee_item
.get("helm-chart")
3800 ee_descriptor_id
= ee_item
.get("id")
3801 if ee_item
.get("juju"):
3802 vca_name
= ee_item
["juju"].get("charm")
3804 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
3807 if ee_item
["juju"].get("charm") is not None
3810 if ee_item
["juju"].get("cloud") == "k8s":
3811 vca_type
= "k8s_proxy_charm"
3812 elif ee_item
["juju"].get("proxy") is False:
3813 vca_type
= "native_charm"
3814 elif ee_item
.get("helm-chart"):
3815 vca_name
= ee_item
["helm-chart"]
3816 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3819 vca_type
= "helm-v3"
3822 logging_text
+ "skipping non juju neither charm configuration"
3827 for vca_index
, vca_deployed
in enumerate(
3828 db_nsr
["_admin"]["deployed"]["VCA"]
3830 if not vca_deployed
:
3833 vca_deployed
.get("member-vnf-index") == member_vnf_index
3834 and vca_deployed
.get("vdu_id") == vdu_id
3835 and vca_deployed
.get("kdu_name") == kdu_name
3836 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3837 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3841 # not found, create one.
3843 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3846 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3848 target
+= "/kdu/{}".format(kdu_name
)
3850 "target_element": target
,
3851 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3852 "member-vnf-index": member_vnf_index
,
3854 "kdu_name": kdu_name
,
3855 "vdu_count_index": vdu_index
,
3856 "operational-status": "init", # TODO revise
3857 "detailed-status": "", # TODO revise
3858 "step": "initial-deploy", # TODO revise
3860 "vdu_name": vdu_name
,
3862 "ee_descriptor_id": ee_descriptor_id
,
3863 "charm_name": charm_name
,
3867 # create VCA and configurationStatus in db
3869 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3870 "configurationStatus.{}".format(vca_index
): dict(),
3872 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3874 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3876 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3877 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3878 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3881 task_n2vc
= asyncio
.ensure_future(
3882 self
.instantiate_N2VC(
3883 logging_text
=logging_text
,
3884 vca_index
=vca_index
,
3890 vdu_index
=vdu_index
,
3891 deploy_params
=deploy_params
,
3892 config_descriptor
=descriptor_config
,
3893 base_folder
=base_folder
,
3894 nslcmop_id
=nslcmop_id
,
3898 ee_config_descriptor
=ee_item
,
3901 self
.lcm_tasks
.register(
3905 "instantiate_N2VC-{}".format(vca_index
),
3908 task_instantiation_info
[
3910 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3911 member_vnf_index
or "", vdu_id
or ""
3915 def _create_nslcmop(nsr_id
, operation
, params
):
3917 Creates a ns-lcm-opp content to be stored at database.
3918 :param nsr_id: internal id of the instance
3919 :param operation: instantiate, terminate, scale, action, ...
3920 :param params: user parameters for the operation
3921 :return: dictionary following SOL005 format
3923 # Raise exception if invalid arguments
3924 if not (nsr_id
and operation
and params
):
3926 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3933 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3934 "operationState": "PROCESSING",
3935 "statusEnteredTime": now
,
3936 "nsInstanceId": nsr_id
,
3937 "lcmOperationType": operation
,
3939 "isAutomaticInvocation": False,
3940 "operationParams": params
,
3941 "isCancelPending": False,
3943 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3944 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
def _format_additional_params(self, params):
    """
    Decode the "!!yaml "-prefixed string values of an additional-params dictionary.
    The dictionary is updated in place: every string value that starts with "!!yaml " is replaced by the
    result of yaml.safe_load() applied to the text that follows the prefix. Other values are untouched.
    :param params: dictionary of additional parameters; a falsy value is treated as an empty dictionary
    :return: the received dictionary (a new empty one if params was falsy)
    """
    yaml_prefix = "!!yaml "
    params = params or {}
    for key, value in params.items():
        # Only existing keys are reassigned, so the dictionary size does not change while iterating.
        # Checking the type first guarantees that the slice below is applied to a real string.
        if isinstance(value, str) and value.startswith(yaml_prefix):
            params[key] = yaml.safe_load(value[len(yaml_prefix):])
    return params
def _get_terminate_primitive_params(self, seq, vnf_index):
    """
    Build the parameters of a terminate primitive and map them with _map_primitive_params().
    :param seq: terminate primitive entry; its "name" is used as the primitive name
    :param vnf_index: member vnf index stored under "member_vnf_index"
    :return: whatever self._map_primitive_params(seq, params, desc_params) returns
    """
    params = {
        "member_vnf_index": vnf_index,
        "primitive": seq.get("name"),
        # no user supplied values are provided for the primitive
        "primitive_params": {},
    }
    desc_params = {}
    return self._map_primitive_params(seq, params, desc_params)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide whether an already registered sub-operation is skipped or executed again.
    :param db_nslcmop: nslcmop database content; sub-operations are read from _admin.operations
    :param op_index: position of the sub-operation inside _admin.operations
    :return: SUBOPERATION_STATUS_SKIP when the sub-operation is 'COMPLETED'; otherwise op_index, after
        its status has been set back to 'PROCESSING' / 'In progress'
    """
    op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
    if op.get("operationState") == "COMPLETED":
        # b. Skip sub-operation
        # _ns_execute_primitive() or RO.create_action() will NOT be executed
        return self.SUBOPERATION_STATUS_SKIP

    # c. retry executing sub-operation
    # The sub-operation exists, and operationState != 'COMPLETED'
    # Update operationState = 'PROCESSING' to indicate a retry.
    self._update_suboperation_status(db_nslcmop, op_index, "PROCESSING", "In progress")
    # Return the sub-operation index
    # _ns_execute_primitive() or RO.create_action() will be called from scale()
    # with arguments extracted from the sub-operation
    return op_index
def _find_suboperation(self, db_nslcmop, match):
    """
    Find a sub-operation where all keys in a matching dictionary must match.
    :param db_nslcmop: nslcmop database content; sub-operations are read from _admin.operations
    :param match: dictionary of key/value pairs that the sub-operation must contain
    :return: index of the first matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if there is no
        match or any of the arguments is empty
    """
    if not (db_nslcmop and match):
        return self.SUBOPERATION_STATUS_NOT_FOUND
    op_list = db_nslcmop.get("_admin", {}).get("operations", [])
    for op_index, op in enumerate(op_list):
        if all(op.get(key) == expected for key, expected in match.items()):
            return op_index
    return self.SUBOPERATION_STATUS_NOT_FOUND
def _update_suboperation_status(
    self, db_nslcmop, op_index, operationState, detailed_status
):
    """
    Update status for a sub-operation given its index.
    :param db_nslcmop: nslcmop database content; only its "_id" is used, to select the record
    :param op_index: position of the sub-operation inside _admin.operations
    :param operationState: value written at _admin.operations.<op_index>.operationState
    :param detailed_status: value written at _admin.operations.<op_index>.detailed-status
    :return: None
    """
    # Update DB for HA tasks
    op_prefix = "_admin.operations.{}".format(op_index)
    self.db.set_one(
        "nslcmops",
        q_filter={"_id": db_nslcmop["_id"]},
        update_dict={
            op_prefix + ".operationState": operationState,
            op_prefix + ".detailed-status": detailed_status,
        },
        fail_on_empty=False,
    )
4013 # Add sub-operation, return the index of the added sub-operation
4014 # Optionally, set operationState, detailed-status, and operationType
4015 # Status and type are currently set for 'scale' sub-operations:
4016 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
4017 # 'detailed-status' : status message
4018 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
4019 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
4020 def _add_suboperation(
4028 mapped_primitive_params
,
4029 operationState
=None,
4030 detailed_status
=None,
4033 RO_scaling_info
=None,
4036 return self
.SUBOPERATION_STATUS_NOT_FOUND
4037 # Get the "_admin.operations" list, if it exists
4038 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
4039 op_list
= db_nslcmop_admin
.get("operations")
4040 # Create or append to the "_admin.operations" list
4042 "member_vnf_index": vnf_index
,
4044 "vdu_count_index": vdu_count_index
,
4045 "primitive": primitive
,
4046 "primitive_params": mapped_primitive_params
,
4049 new_op
["operationState"] = operationState
4051 new_op
["detailed-status"] = detailed_status
4053 new_op
["lcmOperationType"] = operationType
4055 new_op
["RO_nsr_id"] = RO_nsr_id
4057 new_op
["RO_scaling_info"] = RO_scaling_info
4059 # No existing operations, create key 'operations' with current operation as first list element
4060 db_nslcmop_admin
.update({"operations": [new_op
]})
4061 op_list
= db_nslcmop_admin
.get("operations")
4063 # Existing operations, append operation to list
4064 op_list
.append(new_op
)
4066 db_nslcmop_update
= {"_admin.operations": op_list
}
4067 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4068 op_index
= len(op_list
) - 1
4071 # Helper methods for scale() sub-operations
4073 # pre-scale/post-scale:
4074 # Check for 3 different cases:
4075 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4076 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4077 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4078 def _check_or_add_scale_suboperation(
4082 vnf_config_primitive
,
4086 RO_scaling_info
=None,
4088 # Find this sub-operation
4089 if RO_nsr_id
and RO_scaling_info
:
4090 operationType
= "SCALE-RO"
4092 "member_vnf_index": vnf_index
,
4093 "RO_nsr_id": RO_nsr_id
,
4094 "RO_scaling_info": RO_scaling_info
,
4098 "member_vnf_index": vnf_index
,
4099 "primitive": vnf_config_primitive
,
4100 "primitive_params": primitive_params
,
4101 "lcmOperationType": operationType
,
4103 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4104 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4105 # a. New sub-operation
4106 # The sub-operation does not exist, add it.
4107 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4108 # The following parameters are set to None for all kind of scaling:
4110 vdu_count_index
= None
4112 if RO_nsr_id
and RO_scaling_info
:
4113 vnf_config_primitive
= None
4114 primitive_params
= None
4117 RO_scaling_info
= None
4118 # Initial status for sub-operation
4119 operationState
= "PROCESSING"
4120 detailed_status
= "In progress"
4121 # Add sub-operation for pre/post-scaling (zero or more operations)
4122 self
._add
_suboperation
(
4128 vnf_config_primitive
,
4136 return self
.SUBOPERATION_STATUS_NEW
4138 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4139 # or op_index (operationState != 'COMPLETED')
4140 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
    """
    Function to return execution_environment id.
    :param vnf_index: value compared with the "member-vnf-index" of every deployed VCA
    :param vdu_id: value compared with the "vdu_id" of every deployed VCA
    :param vca_deployed_list: list of deployed VCA dictionaries
    :return: "ee_id" of the first VCA matching both values, or None if no VCA matches
    """
    # TODO vdu_index_count
    for vca in vca_deployed_list:
        if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
            return vca.get("ee_id")
    return None
4150 async def destroy_N2VC(
4158 exec_primitives
=True,
4163 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4164 :param logging_text:
4166 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4167 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4168 :param vca_index: index in the database _admin.deployed.VCA
4169 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4170 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4171 not executed properly
4172 :param scaling_in: True destroys the application, False destroys the model
4173 :return: None or exception
4178 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4179 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4183 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4185 # execute terminate_primitives
4187 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4188 config_descriptor
.get("terminate-config-primitive"),
4189 vca_deployed
.get("ee_descriptor_id"),
4191 vdu_id
= vca_deployed
.get("vdu_id")
4192 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4193 vdu_name
= vca_deployed
.get("vdu_name")
4194 vnf_index
= vca_deployed
.get("member-vnf-index")
4195 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4196 for seq
in terminate_primitives
:
4197 # For each sequence in list, get primitive and call _ns_execute_primitive()
4198 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4199 vnf_index
, seq
.get("name")
4201 self
.logger
.debug(logging_text
+ step
)
4202 # Create the primitive for each sequence, i.e. "primitive": "touch"
4203 primitive
= seq
.get("name")
4204 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4209 self
._add
_suboperation
(
4216 mapped_primitive_params
,
4218 # Sub-operations: Call _ns_execute_primitive() instead of action()
4220 result
, result_detail
= await self
._ns
_execute
_primitive
(
4221 vca_deployed
["ee_id"],
4223 mapped_primitive_params
,
4227 except LcmException
:
4228 # this happens when VCA is not deployed. In this case it is not needed to terminate
4230 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4231 if result
not in result_ok
:
4233 "terminate_primitive {} for vnf_member_index={} fails with "
4234 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4236 # set that this VCA do not need terminated
4237 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4241 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4244 # Delete Prometheus Jobs if any
4245 # This uses NSR_ID, so it will destroy any jobs under this index
4246 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4249 await self
.vca_map
[vca_type
].delete_execution_environment(
4250 vca_deployed
["ee_id"],
4251 scaling_in
=scaling_in
,
async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
    """
    Delete the whole N2VC namespace of a ns instance, keeping its configuration status updated.
    The configuration status is written as "TERMINATING" before the deletion and as "DELETED" after it.
    :param db_nsr: nsr database content; its "_id" is used to build the namespace "." + _id
    :param vca_id: VCA identifier passed through to n2vc.delete_namespace()
    :return: None
    """
    self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
    namespace = "." + db_nsr["_id"]
    try:
        await self.n2vc.delete_namespace(
            namespace=namespace,
            total_timeout=self.timeout.charm_delete,
            vca_id=vca_id,
        )
    except N2VCNotFound:  # already deleted. Skip
        pass
    self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
4269 async def _terminate_RO(
4270 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4273 Terminates a deployment from RO
4274 :param logging_text:
4275 :param nsr_deployed: db_nsr._admin.deployed
4278 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4279 this method will update only the index 2, but it will write on database the concatenated content of the list
4284 ro_nsr_id
= ro_delete_action
= None
4285 if nsr_deployed
and nsr_deployed
.get("RO"):
4286 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4287 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4290 stage
[2] = "Deleting ns from VIM."
4291 db_nsr_update
["detailed-status"] = " ".join(stage
)
4292 self
._write
_op
_status
(nslcmop_id
, stage
)
4293 self
.logger
.debug(logging_text
+ stage
[2])
4294 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4295 self
._write
_op
_status
(nslcmop_id
, stage
)
4296 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4297 ro_delete_action
= desc
["action_id"]
4299 "_admin.deployed.RO.nsr_delete_action_id"
4300 ] = ro_delete_action
4301 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4302 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4303 if ro_delete_action
:
4304 # wait until NS is deleted from VIM
4305 stage
[2] = "Waiting ns deleted from VIM."
4306 detailed_status_old
= None
4310 + " RO_id={} ro_delete_action={}".format(
4311 ro_nsr_id
, ro_delete_action
4314 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4315 self
._write
_op
_status
(nslcmop_id
, stage
)
4317 delete_timeout
= 20 * 60 # 20 minutes
4318 while delete_timeout
> 0:
4319 desc
= await self
.RO
.show(
4321 item_id_name
=ro_nsr_id
,
4322 extra_item
="action",
4323 extra_item_id
=ro_delete_action
,
4327 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4329 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4330 if ns_status
== "ERROR":
4331 raise ROclient
.ROClientException(ns_status_info
)
4332 elif ns_status
== "BUILD":
4333 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4334 elif ns_status
== "ACTIVE":
4335 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4336 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4341 ), "ROclient.check_action_status returns unknown {}".format(
4344 if stage
[2] != detailed_status_old
:
4345 detailed_status_old
= stage
[2]
4346 db_nsr_update
["detailed-status"] = " ".join(stage
)
4347 self
._write
_op
_status
(nslcmop_id
, stage
)
4348 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4349 await asyncio
.sleep(5, loop
=self
.loop
)
4351 else: # delete_timeout <= 0:
4352 raise ROclient
.ROClientException(
4353 "Timeout waiting ns deleted from VIM"
4356 except Exception as e
:
4357 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4359 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4361 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4362 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4363 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4365 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4368 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4370 failed_detail
.append("delete conflict: {}".format(e
))
4373 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4376 failed_detail
.append("delete error: {}".format(e
))
4378 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4382 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4383 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4385 stage
[2] = "Deleting nsd from RO."
4386 db_nsr_update
["detailed-status"] = " ".join(stage
)
4387 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4388 self
._write
_op
_status
(nslcmop_id
, stage
)
4389 await self
.RO
.delete("nsd", ro_nsd_id
)
4391 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4393 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4394 except Exception as e
:
4396 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4398 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4400 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4403 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4405 failed_detail
.append(
4406 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4408 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4410 failed_detail
.append(
4411 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4413 self
.logger
.error(logging_text
+ failed_detail
[-1])
4415 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4416 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4417 if not vnf_deployed
or not vnf_deployed
["id"]:
4420 ro_vnfd_id
= vnf_deployed
["id"]
4423 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4424 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4426 db_nsr_update
["detailed-status"] = " ".join(stage
)
4427 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4428 self
._write
_op
_status
(nslcmop_id
, stage
)
4429 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4431 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4433 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4434 except Exception as e
:
4436 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4439 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4443 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4446 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4448 failed_detail
.append(
4449 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4451 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4453 failed_detail
.append(
4454 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4456 self
.logger
.error(logging_text
+ failed_detail
[-1])
4459 stage
[2] = "Error deleting from VIM"
4461 stage
[2] = "Deleted from VIM"
4462 db_nsr_update
["detailed-status"] = " ".join(stage
)
4463 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4464 self
._write
_op
_status
(nslcmop_id
, stage
)
4467 raise LcmException("; ".join(failed_detail
))
4469 async def terminate(self
, nsr_id
, nslcmop_id
):
4470 # Try to lock HA task here
4471 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4472 if not task_is_locked_by_me
:
4475 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4476 self
.logger
.debug(logging_text
+ "Enter")
4477 timeout_ns_terminate
= self
.timeout
.ns_terminate
4480 operation_params
= None
4482 error_list
= [] # annotates all failed error messages
4483 db_nslcmop_update
= {}
4484 autoremove
= False # autoremove after terminated
4485 tasks_dict_info
= {}
4488 "Stage 1/3: Preparing task.",
4489 "Waiting for previous operations to terminate.",
4492 # ^ contains [stage, step, VIM-status]
4494 # wait for any previous tasks in process
4495 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4497 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4498 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4499 operation_params
= db_nslcmop
.get("operationParams") or {}
4500 if operation_params
.get("timeout_ns_terminate"):
4501 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4502 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4503 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4505 db_nsr_update
["operational-status"] = "terminating"
4506 db_nsr_update
["config-status"] = "terminating"
4507 self
._write
_ns
_status
(
4509 ns_state
="TERMINATING",
4510 current_operation
="TERMINATING",
4511 current_operation_id
=nslcmop_id
,
4512 other_update
=db_nsr_update
,
4514 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4515 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4516 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4519 stage
[1] = "Getting vnf descriptors from db."
4520 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4522 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4524 db_vnfds_from_id
= {}
4525 db_vnfds_from_member_index
= {}
4527 for vnfr
in db_vnfrs_list
:
4528 vnfd_id
= vnfr
["vnfd-id"]
4529 if vnfd_id
not in db_vnfds_from_id
:
4530 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4531 db_vnfds_from_id
[vnfd_id
] = vnfd
4532 db_vnfds_from_member_index
[
4533 vnfr
["member-vnf-index-ref"]
4534 ] = db_vnfds_from_id
[vnfd_id
]
4536 # Destroy individual execution environments when there are terminating primitives.
4537 # Rest of EE will be deleted at once
4538 # TODO - check before calling _destroy_N2VC
4539 # if not operation_params.get("skip_terminate_primitives"):#
4540 # or not vca.get("needed_terminate"):
4541 stage
[0] = "Stage 2/3 execute terminating primitives."
4542 self
.logger
.debug(logging_text
+ stage
[0])
4543 stage
[1] = "Looking execution environment that needs terminate."
4544 self
.logger
.debug(logging_text
+ stage
[1])
4546 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4547 config_descriptor
= None
4548 vca_member_vnf_index
= vca
.get("member-vnf-index")
4549 vca_id
= self
.get_vca_id(
4550 db_vnfrs_dict
.get(vca_member_vnf_index
)
4551 if vca_member_vnf_index
4555 if not vca
or not vca
.get("ee_id"):
4557 if not vca
.get("member-vnf-index"):
4559 config_descriptor
= db_nsr
.get("ns-configuration")
4560 elif vca
.get("vdu_id"):
4561 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4562 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4563 elif vca
.get("kdu_name"):
4564 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4565 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4567 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4568 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4569 vca_type
= vca
.get("type")
4570 exec_terminate_primitives
= not operation_params
.get(
4571 "skip_terminate_primitives"
4572 ) and vca
.get("needed_terminate")
4573 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4574 # pending native charms
4576 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4578 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4579 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4580 task
= asyncio
.ensure_future(
4588 exec_terminate_primitives
,
4592 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4594 # wait for pending tasks of terminate primitives
4598 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4600 error_list
= await self
._wait
_for
_tasks
(
4603 min(self
.timeout
.charm_delete
, timeout_ns_terminate
),
4607 tasks_dict_info
.clear()
4609 return # raise LcmException("; ".join(error_list))
4611 # remove All execution environments at once
4612 stage
[0] = "Stage 3/3 delete all."
4614 if nsr_deployed
.get("VCA"):
4615 stage
[1] = "Deleting all execution environments."
4616 self
.logger
.debug(logging_text
+ stage
[1])
4617 vca_id
= self
.get_vca_id({}, db_nsr
)
4618 task_delete_ee
= asyncio
.ensure_future(
4620 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4621 timeout
=self
.timeout
.charm_delete
,
4624 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4625 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4627 # Delete Namespace and Certificates if necessary
4628 if check_helm_ee_in_ns(list(db_vnfds_from_member_index
.values())):
4629 await self
.vca_map
["helm-v3"].delete_tls_certificate(
4630 certificate_name
=db_nslcmop
["nsInstanceId"],
4632 # TODO: Delete namespace
4634 # Delete from k8scluster
4635 stage
[1] = "Deleting KDUs."
4636 self
.logger
.debug(logging_text
+ stage
[1])
4637 # print(nsr_deployed)
4638 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4639 if not kdu
or not kdu
.get("kdu-instance"):
4641 kdu_instance
= kdu
.get("kdu-instance")
4642 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4643 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4644 vca_id
= self
.get_vca_id({}, db_nsr
)
4645 task_delete_kdu_instance
= asyncio
.ensure_future(
4646 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4647 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4648 kdu_instance
=kdu_instance
,
4650 namespace
=kdu
.get("namespace"),
4656 + "Unknown k8s deployment type {}".format(
4657 kdu
.get("k8scluster-type")
4662 task_delete_kdu_instance
4663 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4666 stage
[1] = "Deleting ns from VIM."
4667 if self
.ro_config
.ng
:
4668 task_delete_ro
= asyncio
.ensure_future(
4669 self
._terminate
_ng
_ro
(
4670 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4674 task_delete_ro
= asyncio
.ensure_future(
4676 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4679 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4681 # rest of staff will be done at finally
4684 ROclient
.ROClientException
,
4689 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4691 except asyncio
.CancelledError
:
4693 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4695 exc
= "Operation was cancelled"
4696 except Exception as e
:
4697 exc
= traceback
.format_exc()
4698 self
.logger
.critical(
4699 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4704 error_list
.append(str(exc
))
4706 # wait for pending tasks
4708 stage
[1] = "Waiting for terminate pending tasks."
4709 self
.logger
.debug(logging_text
+ stage
[1])
4710 error_list
+= await self
._wait
_for
_tasks
(
4713 timeout_ns_terminate
,
4717 stage
[1] = stage
[2] = ""
4718 except asyncio
.CancelledError
:
4719 error_list
.append("Cancelled")
4720 # TODO cancell all tasks
4721 except Exception as exc
:
4722 error_list
.append(str(exc
))
4723 # update status at database
4725 error_detail
= "; ".join(error_list
)
4726 # self.logger.error(logging_text + error_detail)
4727 error_description_nslcmop
= "{} Detail: {}".format(
4728 stage
[0], error_detail
4730 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4731 nslcmop_id
, stage
[0]
4734 db_nsr_update
["operational-status"] = "failed"
4735 db_nsr_update
["detailed-status"] = (
4736 error_description_nsr
+ " Detail: " + error_detail
4738 db_nslcmop_update
["detailed-status"] = error_detail
4739 nslcmop_operation_state
= "FAILED"
4743 error_description_nsr
= error_description_nslcmop
= None
4744 ns_state
= "NOT_INSTANTIATED"
4745 db_nsr_update
["operational-status"] = "terminated"
4746 db_nsr_update
["detailed-status"] = "Done"
4747 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4748 db_nslcmop_update
["detailed-status"] = "Done"
4749 nslcmop_operation_state
= "COMPLETED"
4752 self
._write
_ns
_status
(
4755 current_operation
="IDLE",
4756 current_operation_id
=None,
4757 error_description
=error_description_nsr
,
4758 error_detail
=error_detail
,
4759 other_update
=db_nsr_update
,
4761 self
._write
_op
_status
(
4764 error_message
=error_description_nslcmop
,
4765 operation_state
=nslcmop_operation_state
,
4766 other_update
=db_nslcmop_update
,
4768 if ns_state
== "NOT_INSTANTIATED":
4772 {"nsr-id-ref": nsr_id
},
4773 {"_admin.nsState": "NOT_INSTANTIATED"},
4775 except DbException
as e
:
4778 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4782 if operation_params
:
4783 autoremove
= operation_params
.get("autoremove", False)
4784 if nslcmop_operation_state
:
4786 await self
.msg
.aiowrite(
4791 "nslcmop_id": nslcmop_id
,
4792 "operationState": nslcmop_operation_state
,
4793 "autoremove": autoremove
,
4797 except Exception as e
:
4799 logging_text
+ "kafka_write notification Exception {}".format(e
)
4802 self
.logger
.debug(logging_text
+ "Exit")
4803 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4805 async def _wait_for_tasks(
4806 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4809 error_detail_list
= []
4811 pending_tasks
= list(created_tasks_info
.keys())
4812 num_tasks
= len(pending_tasks
)
4814 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4815 self
._write
_op
_status
(nslcmop_id
, stage
)
4816 while pending_tasks
:
4818 _timeout
= timeout
+ time_start
- time()
4819 done
, pending_tasks
= await asyncio
.wait(
4820 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4822 num_done
+= len(done
)
4823 if not done
: # Timeout
4824 for task
in pending_tasks
:
4825 new_error
= created_tasks_info
[task
] + ": Timeout"
4826 error_detail_list
.append(new_error
)
4827 error_list
.append(new_error
)
4830 if task
.cancelled():
4833 exc
= task
.exception()
4835 if isinstance(exc
, asyncio
.TimeoutError
):
4837 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4838 error_list
.append(created_tasks_info
[task
])
4839 error_detail_list
.append(new_error
)
4846 ROclient
.ROClientException
,
4852 self
.logger
.error(logging_text
+ new_error
)
4854 exc_traceback
= "".join(
4855 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4859 + created_tasks_info
[task
]
4865 logging_text
+ created_tasks_info
[task
] + ": Done"
4867 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4869 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4870 if nsr_id
: # update also nsr
4875 "errorDescription": "Error at: " + ", ".join(error_list
),
4876 "errorDetail": ". ".join(error_detail_list
),
4879 self
._write
_op
_status
(nslcmop_id
, stage
)
4880 return error_detail_list
4883 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4885 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4886 The default-value is used. If it is between < > it look for a value at instantiation_params
4887 :param primitive_desc: portion of VNFD/NSD that describes primitive
4888 :param params: Params provided by user
4889 :param instantiation_params: Instantiation params provided by user
4890 :return: a dictionary with the calculated params
4892 calculated_params
= {}
4893 for parameter
in primitive_desc
.get("parameter", ()):
4894 param_name
= parameter
["name"]
4895 if param_name
in params
:
4896 calculated_params
[param_name
] = params
[param_name
]
4897 elif "default-value" in parameter
or "value" in parameter
:
4898 if "value" in parameter
:
4899 calculated_params
[param_name
] = parameter
["value"]
4901 calculated_params
[param_name
] = parameter
["default-value"]
4903 isinstance(calculated_params
[param_name
], str)
4904 and calculated_params
[param_name
].startswith("<")
4905 and calculated_params
[param_name
].endswith(">")
4907 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4908 calculated_params
[param_name
] = instantiation_params
[
4909 calculated_params
[param_name
][1:-1]
4913 "Parameter {} needed to execute primitive {} not provided".format(
4914 calculated_params
[param_name
], primitive_desc
["name"]
4919 "Parameter {} needed to execute primitive {} not provided".format(
4920 param_name
, primitive_desc
["name"]
4924 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4925 calculated_params
[param_name
] = yaml
.safe_dump(
4926 calculated_params
[param_name
], default_flow_style
=True, width
=256
4928 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4930 ].startswith("!!yaml "):
4931 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4932 if parameter
.get("data-type") == "INTEGER":
4934 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4935 except ValueError: # error converting string to int
4937 "Parameter {} of primitive {} must be integer".format(
4938 param_name
, primitive_desc
["name"]
4941 elif parameter
.get("data-type") == "BOOLEAN":
4942 calculated_params
[param_name
] = not (
4943 (str(calculated_params
[param_name
])).lower() == "false"
4946 # add always ns_config_info if primitive name is config
4947 if primitive_desc
["name"] == "config":
4948 if "ns_config_info" in instantiation_params
:
4949 calculated_params
["ns_config_info"] = instantiation_params
[
4952 return calculated_params
4954 def _look_for_deployed_vca(
4961 ee_descriptor_id
=None,
4963 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4964 for vca
in deployed_vca
:
4967 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4970 vdu_count_index
is not None
4971 and vdu_count_index
!= vca
["vdu_count_index"]
4974 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4976 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4980 # vca_deployed not found
4982 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4983 " is not deployed".format(
4992 ee_id
= vca
.get("ee_id")
4994 "type", "lxc_proxy_charm"
4995 ) # default value for backward compatibility - proxy charm
4998 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4999 "execution environment".format(
5000 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
5003 return ee_id
, vca_type
5005 async def _ns_execute_primitive(
5011 retries_interval
=30,
5018 if primitive
== "config":
5019 primitive_params
= {"params": primitive_params
}
5021 vca_type
= vca_type
or "lxc_proxy_charm"
5025 output
= await asyncio
.wait_for(
5026 self
.vca_map
[vca_type
].exec_primitive(
5028 primitive_name
=primitive
,
5029 params_dict
=primitive_params
,
5030 progress_timeout
=self
.timeout
.progress_primitive
,
5031 total_timeout
=self
.timeout
.primitive
,
5036 timeout
=timeout
or self
.timeout
.primitive
,
5040 except asyncio
.CancelledError
:
5042 except Exception as e
:
5046 "Error executing action {} on {} -> {}".format(
5051 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
5053 if isinstance(e
, asyncio
.TimeoutError
):
5055 message
="Timed out waiting for action to complete"
5057 return "FAILED", getattr(e
, "message", repr(e
))
5059 return "COMPLETED", output
5061 except (LcmException
, asyncio
.CancelledError
):
5063 except Exception as e
:
5064 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
5066 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
5068 Updating the vca_status with latest juju information in nsrs record
5069 :param: nsr_id: Id of the nsr
5070 :param: nslcmop_id: Id of the nslcmop
5074 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
5075 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5076 vca_id
= self
.get_vca_id({}, db_nsr
)
5077 if db_nsr
["_admin"]["deployed"]["K8s"]:
5078 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
5079 cluster_uuid
, kdu_instance
, cluster_type
= (
5080 k8s
["k8scluster-uuid"],
5081 k8s
["kdu-instance"],
5082 k8s
["k8scluster-type"],
5084 await self
._on
_update
_k
8s
_db
(
5085 cluster_uuid
=cluster_uuid
,
5086 kdu_instance
=kdu_instance
,
5087 filter={"_id": nsr_id
},
5089 cluster_type
=cluster_type
,
5092 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
5093 table
, filter = "nsrs", {"_id": nsr_id
}
5094 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
5095 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
5097 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
5098 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
5100 async def action(self
, nsr_id
, nslcmop_id
):
5101 # Try to lock HA task here
5102 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5103 if not task_is_locked_by_me
:
5106 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
5107 self
.logger
.debug(logging_text
+ "Enter")
5108 # get all needed from database
5112 db_nslcmop_update
= {}
5113 nslcmop_operation_state
= None
5114 error_description_nslcmop
= None
5117 # wait for any previous tasks in process
5118 step
= "Waiting for previous operations to terminate"
5119 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5121 self
._write
_ns
_status
(
5124 current_operation
="RUNNING ACTION",
5125 current_operation_id
=nslcmop_id
,
5128 step
= "Getting information from database"
5129 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5130 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5131 if db_nslcmop
["operationParams"].get("primitive_params"):
5132 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
5133 db_nslcmop
["operationParams"]["primitive_params"]
5136 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5137 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
5138 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
5139 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
5140 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
5141 primitive
= db_nslcmop
["operationParams"]["primitive"]
5142 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
5143 timeout_ns_action
= db_nslcmop
["operationParams"].get(
5144 "timeout_ns_action", self
.timeout
.primitive
5148 step
= "Getting vnfr from database"
5149 db_vnfr
= self
.db
.get_one(
5150 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5152 if db_vnfr
.get("kdur"):
5154 for kdur
in db_vnfr
["kdur"]:
5155 if kdur
.get("additionalParams"):
5156 kdur
["additionalParams"] = json
.loads(
5157 kdur
["additionalParams"]
5159 kdur_list
.append(kdur
)
5160 db_vnfr
["kdur"] = kdur_list
5161 step
= "Getting vnfd from database"
5162 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5164 # Sync filesystem before running a primitive
5165 self
.fs
.sync(db_vnfr
["vnfd-id"])
5167 step
= "Getting nsd from database"
5168 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5170 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5171 # for backward compatibility
5172 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5173 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5174 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5175 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5177 # look for primitive
5178 config_primitive_desc
= descriptor_configuration
= None
5180 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5182 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5184 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5186 descriptor_configuration
= db_nsd
.get("ns-configuration")
5188 if descriptor_configuration
and descriptor_configuration
.get(
5191 for config_primitive
in descriptor_configuration
["config-primitive"]:
5192 if config_primitive
["name"] == primitive
:
5193 config_primitive_desc
= config_primitive
5196 if not config_primitive_desc
:
5197 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5199 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5203 primitive_name
= primitive
5204 ee_descriptor_id
= None
5206 primitive_name
= config_primitive_desc
.get(
5207 "execution-environment-primitive", primitive
5209 ee_descriptor_id
= config_primitive_desc
.get(
5210 "execution-environment-ref"
5216 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5218 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5221 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5223 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5225 desc_params
= parse_yaml_strings(
5226 db_vnfr
.get("additionalParamsForVnf")
5229 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5230 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5231 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5233 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5234 actions
.add(primitive
["name"])
5235 for primitive
in kdu_configuration
.get("config-primitive", []):
5236 actions
.add(primitive
["name"])
5238 nsr_deployed
["K8s"],
5239 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5240 and kdu
["member-vnf-index"] == vnf_index
,
5244 if primitive_name
in actions
5245 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5249 # TODO check if ns is in a proper status
5251 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5253 # kdur and desc_params already set from before
5254 if primitive_params
:
5255 desc_params
.update(primitive_params
)
5256 # TODO Check if we will need something at vnf level
5257 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5259 kdu_name
== kdu
["kdu-name"]
5260 and kdu
["member-vnf-index"] == vnf_index
5265 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5268 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5269 msg
= "unknown k8scluster-type '{}'".format(
5270 kdu
.get("k8scluster-type")
5272 raise LcmException(msg
)
5275 "collection": "nsrs",
5276 "filter": {"_id": nsr_id
},
5277 "path": "_admin.deployed.K8s.{}".format(index
),
5281 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5283 step
= "Executing kdu {}".format(primitive_name
)
5284 if primitive_name
== "upgrade":
5285 if desc_params
.get("kdu_model"):
5286 kdu_model
= desc_params
.get("kdu_model")
5287 del desc_params
["kdu_model"]
5289 kdu_model
= kdu
.get("kdu-model")
5290 parts
= kdu_model
.split(sep
=":")
5292 kdu_model
= parts
[0]
5293 if desc_params
.get("kdu_atomic_upgrade"):
5294 atomic_upgrade
= desc_params
.get(
5295 "kdu_atomic_upgrade"
5296 ).lower() in ("yes", "true", "1")
5297 del desc_params
["kdu_atomic_upgrade"]
5299 atomic_upgrade
= True
5301 detailed_status
= await asyncio
.wait_for(
5302 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5303 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5304 kdu_instance
=kdu
.get("kdu-instance"),
5305 atomic
=atomic_upgrade
,
5306 kdu_model
=kdu_model
,
5309 timeout
=timeout_ns_action
,
5311 timeout
=timeout_ns_action
+ 10,
5314 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5316 elif primitive_name
== "rollback":
5317 detailed_status
= await asyncio
.wait_for(
5318 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5319 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5320 kdu_instance
=kdu
.get("kdu-instance"),
5323 timeout
=timeout_ns_action
,
5325 elif primitive_name
== "status":
5326 detailed_status
= await asyncio
.wait_for(
5327 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5328 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5329 kdu_instance
=kdu
.get("kdu-instance"),
5332 timeout
=timeout_ns_action
,
5335 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5336 kdu
["kdu-name"], nsr_id
5338 params
= self
._map
_primitive
_params
(
5339 config_primitive_desc
, primitive_params
, desc_params
5342 detailed_status
= await asyncio
.wait_for(
5343 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5344 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5345 kdu_instance
=kdu_instance
,
5346 primitive_name
=primitive_name
,
5349 timeout
=timeout_ns_action
,
5352 timeout
=timeout_ns_action
,
5356 nslcmop_operation_state
= "COMPLETED"
5358 detailed_status
= ""
5359 nslcmop_operation_state
= "FAILED"
5361 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5362 nsr_deployed
["VCA"],
5363 member_vnf_index
=vnf_index
,
5365 vdu_count_index
=vdu_count_index
,
5366 ee_descriptor_id
=ee_descriptor_id
,
5368 for vca_index
, vca_deployed
in enumerate(
5369 db_nsr
["_admin"]["deployed"]["VCA"]
5371 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5373 "collection": "nsrs",
5374 "filter": {"_id": nsr_id
},
5375 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5379 nslcmop_operation_state
,
5381 ) = await self
._ns
_execute
_primitive
(
5383 primitive
=primitive_name
,
5384 primitive_params
=self
._map
_primitive
_params
(
5385 config_primitive_desc
, primitive_params
, desc_params
5387 timeout
=timeout_ns_action
,
5393 db_nslcmop_update
["detailed-status"] = detailed_status
5394 error_description_nslcmop
= (
5395 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5399 + "Done with result {} {}".format(
5400 nslcmop_operation_state
, detailed_status
5403 return # database update is called inside finally
5405 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5406 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5408 except asyncio
.CancelledError
:
5410 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5412 exc
= "Operation was cancelled"
5413 except asyncio
.TimeoutError
:
5414 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5416 except Exception as e
:
5417 exc
= traceback
.format_exc()
5418 self
.logger
.critical(
5419 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5428 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5429 nslcmop_operation_state
= "FAILED"
5431 self
._write
_ns
_status
(
5435 ], # TODO check if degraded. For the moment use previous status
5436 current_operation
="IDLE",
5437 current_operation_id
=None,
5438 # error_description=error_description_nsr,
5439 # error_detail=error_detail,
5440 other_update
=db_nsr_update
,
5443 self
._write
_op
_status
(
5446 error_message
=error_description_nslcmop
,
5447 operation_state
=nslcmop_operation_state
,
5448 other_update
=db_nslcmop_update
,
5451 if nslcmop_operation_state
:
5453 await self
.msg
.aiowrite(
5458 "nslcmop_id": nslcmop_id
,
5459 "operationState": nslcmop_operation_state
,
5463 except Exception as e
:
5465 logging_text
+ "kafka_write notification Exception {}".format(e
)
5467 self
.logger
.debug(logging_text
+ "Exit")
5468 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5469 return nslcmop_operation_state
, detailed_status
5471 async def terminate_vdus(
5472 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5474 """This method terminates VDUs
5477 db_vnfr: VNF instance record
5478 member_vnf_index: VNF index to identify the VDUs to be removed
5479 db_nsr: NS instance record
5480 update_db_nslcmops: Nslcmop update record
5482 vca_scaling_info
= []
5483 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5484 scaling_info
["scaling_direction"] = "IN"
5485 scaling_info
["vdu-delete"] = {}
5486 scaling_info
["kdu-delete"] = {}
5487 db_vdur
= db_vnfr
.get("vdur")
5488 vdur_list
= copy(db_vdur
)
5490 for index
, vdu
in enumerate(vdur_list
):
5491 vca_scaling_info
.append(
5493 "osm_vdu_id": vdu
["vdu-id-ref"],
5494 "member-vnf-index": member_vnf_index
,
5496 "vdu_index": count_index
,
5499 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5500 scaling_info
["vdu"].append(
5502 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5503 "vdu_id": vdu
["vdu-id-ref"],
5507 for interface
in vdu
["interfaces"]:
5508 scaling_info
["vdu"][index
]["interface"].append(
5510 "name": interface
["name"],
5511 "ip_address": interface
["ip-address"],
5512 "mac_address": interface
.get("mac-address"),
5515 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5516 stage
[2] = "Terminating VDUs"
5517 if scaling_info
.get("vdu-delete"):
5518 # scale_process = "RO"
5519 if self
.ro_config
.ng
:
5520 await self
._scale
_ng
_ro
(
5529 async def remove_vnf(self
, nsr_id
, nslcmop_id
, vnf_instance_id
):
5530 """This method is to Remove VNF instances from NS.
5533 nsr_id: NS instance id
5534 nslcmop_id: nslcmop id of update
5535 vnf_instance_id: id of the VNF instance to be removed
5538 result: (str, str) COMPLETED/FAILED, details
5542 logging_text
= "Task ns={} update ".format(nsr_id
)
5543 check_vnfr_count
= len(self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}))
5544 self
.logger
.info("check_vnfr_count {}".format(check_vnfr_count
))
5545 if check_vnfr_count
> 1:
5546 stage
= ["", "", ""]
5547 step
= "Getting nslcmop from database"
5549 step
+ " after having waited for previous tasks to be completed"
5551 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5552 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5553 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5554 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5555 """ db_vnfr = self.db.get_one(
5556 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5558 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5559 await self
.terminate_vdus(
5568 constituent_vnfr
= db_nsr
.get("constituent-vnfr-ref")
5569 constituent_vnfr
.remove(db_vnfr
.get("_id"))
5570 db_nsr_update
["constituent-vnfr-ref"] = db_nsr
.get(
5571 "constituent-vnfr-ref"
5573 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5574 self
.db
.del_one("vnfrs", {"_id": db_vnfr
.get("_id")})
5575 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5576 return "COMPLETED", "Done"
5578 step
= "Terminate VNF Failed with"
5580 "{} Cannot terminate the last VNF in this NS.".format(
5584 except (LcmException
, asyncio
.CancelledError
):
5586 except Exception as e
:
5587 self
.logger
.debug("Error removing VNF {}".format(e
))
5588 return "FAILED", "Error removing VNF {}".format(e
)
5590 async def _ns_redeploy_vnf(
5598 """This method updates and redeploys VNF instances
5601 nsr_id: NS instance id
5602 nslcmop_id: nslcmop id
5603 db_vnfd: VNF descriptor
5604 db_vnfr: VNF instance record
5605 db_nsr: NS instance record
5608 result: (str, str) COMPLETED/FAILED, details
5612 stage
= ["", "", ""]
5613 logging_text
= "Task ns={} update ".format(nsr_id
)
5614 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5615 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5617 # Terminate old VNF resources
5618 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5619 await self
.terminate_vdus(
5628 # old_vnfd_id = db_vnfr["vnfd-id"]
5629 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5630 new_db_vnfd
= db_vnfd
5631 # new_vnfd_ref = new_db_vnfd["id"]
5632 # new_vnfd_id = vnfd_id
5636 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5638 "name": cp
.get("id"),
5639 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5640 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5643 new_vnfr_cp
.append(vnf_cp
)
5644 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5645 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5646 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5648 "revision": latest_vnfd_revision
,
5649 "connection-point": new_vnfr_cp
,
5653 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5654 updated_db_vnfr
= self
.db
.get_one(
5656 {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
},
5659 # Instantiate new VNF resources
5660 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5661 vca_scaling_info
= []
5662 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5663 scaling_info
["scaling_direction"] = "OUT"
5664 scaling_info
["vdu-create"] = {}
5665 scaling_info
["kdu-create"] = {}
5666 vdud_instantiate_list
= db_vnfd
["vdu"]
5667 for index
, vdud
in enumerate(vdud_instantiate_list
):
5668 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
5670 additional_params
= (
5671 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5674 cloud_init_list
= []
5676 # TODO Information of its own ip is not available because db_vnfr is not updated.
5677 additional_params
["OSM"] = get_osm_params(
5678 updated_db_vnfr
, vdud
["id"], 1
5680 cloud_init_list
.append(
5681 self
._parse
_cloud
_init
(
5688 vca_scaling_info
.append(
5690 "osm_vdu_id": vdud
["id"],
5691 "member-vnf-index": member_vnf_index
,
5693 "vdu_index": count_index
,
5696 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5697 if self
.ro_config
.ng
:
5699 "New Resources to be deployed: {}".format(scaling_info
)
5701 await self
._scale
_ng
_ro
(
5709 return "COMPLETED", "Done"
5710 except (LcmException
, asyncio
.CancelledError
):
5712 except Exception as e
:
5713 self
.logger
.debug("Error updating VNF {}".format(e
))
5714 return "FAILED", "Error updating VNF {}".format(e
)
5716 async def _ns_charm_upgrade(
5722 timeout
: float = None,
5724 """This method upgrade charms in VNF instances
5727 ee_id: Execution environment id
5728 path: Local path to the charm
5730 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5731 timeout: (Float) Timeout for the ns update operation
5734 result: (str, str) COMPLETED/FAILED, details
5737 charm_type
= charm_type
or "lxc_proxy_charm"
5738 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5742 charm_type
=charm_type
,
5743 timeout
=timeout
or self
.timeout
.ns_update
,
5747 return "COMPLETED", output
5749 except (LcmException
, asyncio
.CancelledError
):
5752 except Exception as e
:
5754 self
.logger
.debug("Error upgrading charm {}".format(path
))
5756 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5758 async def update(self
, nsr_id
, nslcmop_id
):
5759 """Update NS according to different update types
5761 This method performs upgrade of VNF instances then updates the revision
5762 number in VNF record
5765 nsr_id: Network service will be updated
5766 nslcmop_id: ns lcm operation id
5769 It may raise DbException, LcmException, N2VCException, K8sException
5772 # Try to lock HA task here
5773 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5774 if not task_is_locked_by_me
:
5777 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5778 self
.logger
.debug(logging_text
+ "Enter")
5780 # Set the required variables to be filled up later
5782 db_nslcmop_update
= {}
5784 nslcmop_operation_state
= None
5786 error_description_nslcmop
= ""
5788 change_type
= "updated"
5789 detailed_status
= ""
5792 # wait for any previous tasks in process
5793 step
= "Waiting for previous operations to terminate"
5794 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5795 self
._write
_ns
_status
(
5798 current_operation
="UPDATING",
5799 current_operation_id
=nslcmop_id
,
5802 step
= "Getting nslcmop from database"
5803 db_nslcmop
= self
.db
.get_one(
5804 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5806 update_type
= db_nslcmop
["operationParams"]["updateType"]
5808 step
= "Getting nsr from database"
5809 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5810 old_operational_status
= db_nsr
["operational-status"]
5811 db_nsr_update
["operational-status"] = "updating"
5812 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5813 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5815 if update_type
== "CHANGE_VNFPKG":
5817 # Get the input parameters given through update request
5818 vnf_instance_id
= db_nslcmop
["operationParams"][
5819 "changeVnfPackageData"
5820 ].get("vnfInstanceId")
5822 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5825 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5827 step
= "Getting vnfr from database"
5828 db_vnfr
= self
.db
.get_one(
5829 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5832 step
= "Getting vnfds from database"
5834 latest_vnfd
= self
.db
.get_one(
5835 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5837 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5840 current_vnf_revision
= db_vnfr
.get("revision", 1)
5841 current_vnfd
= self
.db
.get_one(
5843 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5844 fail_on_empty
=False,
5846 # Charm artifact paths will be filled up later
5848 current_charm_artifact_path
,
5849 target_charm_artifact_path
,
5850 charm_artifact_paths
,
5852 ) = ([], [], [], [])
5854 step
= "Checking if revision has changed in VNFD"
5855 if current_vnf_revision
!= latest_vnfd_revision
:
5857 change_type
= "policy_updated"
5859 # There is new revision of VNFD, update operation is required
5860 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5861 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5863 step
= "Removing the VNFD packages if they exist in the local path"
5864 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5865 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5867 step
= "Get the VNFD packages from FSMongo"
5868 self
.fs
.sync(from_path
=latest_vnfd_path
)
5869 self
.fs
.sync(from_path
=current_vnfd_path
)
5872 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5874 current_base_folder
= current_vnfd
["_admin"]["storage"]
5875 latest_base_folder
= latest_vnfd
["_admin"]["storage"]
5877 for vca_index
, vca_deployed
in enumerate(
5878 get_iterable(nsr_deployed
, "VCA")
5880 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5882 # Getting charm-id and charm-type
5883 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5884 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5885 vca_type
= vca_deployed
.get("type")
5886 vdu_count_index
= vca_deployed
.get("vdu_count_index")
5889 ee_id
= vca_deployed
.get("ee_id")
5891 step
= "Getting descriptor config"
5892 if current_vnfd
.get("kdu"):
5894 search_key
= "kdu_name"
5896 search_key
= "vnfd_id"
5898 entity_id
= vca_deployed
.get(search_key
)
5900 descriptor_config
= get_configuration(
5901 current_vnfd
, entity_id
5904 if "execution-environment-list" in descriptor_config
:
5905 ee_list
= descriptor_config
.get(
5906 "execution-environment-list", []
5911 # There could be several charm used in the same VNF
5912 for ee_item
in ee_list
:
5913 if ee_item
.get("juju"):
5915 step
= "Getting charm name"
5916 charm_name
= ee_item
["juju"].get("charm")
5918 step
= "Setting Charm artifact paths"
5919 current_charm_artifact_path
.append(
5920 get_charm_artifact_path(
5921 current_base_folder
,
5924 current_vnf_revision
,
5927 target_charm_artifact_path
.append(
5928 get_charm_artifact_path(
5932 latest_vnfd_revision
,
5935 elif ee_item
.get("helm-chart"):
5936 # add chart to list and all parameters
5937 step
= "Getting helm chart name"
5938 chart_name
= ee_item
.get("helm-chart")
5940 ee_item
.get("helm-version")
5941 and ee_item
.get("helm-version") == "v2"
5945 vca_type
= "helm-v3"
5946 step
= "Setting Helm chart artifact paths"
5948 helm_artifacts
.append(
5950 "current_artifact_path": get_charm_artifact_path(
5951 current_base_folder
,
5954 current_vnf_revision
,
5956 "target_artifact_path": get_charm_artifact_path(
5960 latest_vnfd_revision
,
5963 "vca_index": vca_index
,
5964 "vdu_index": vdu_count_index
,
5968 charm_artifact_paths
= zip(
5969 current_charm_artifact_path
, target_charm_artifact_path
5972 step
= "Checking if software version has changed in VNFD"
5973 if find_software_version(current_vnfd
) != find_software_version(
5977 step
= "Checking if existing VNF has charm"
5978 for current_charm_path
, target_charm_path
in list(
5979 charm_artifact_paths
5981 if current_charm_path
:
5983 "Software version change is not supported as VNF instance {} has charm.".format(
5988 # There is no change in the charm package, then redeploy the VNF
5989 # based on new descriptor
5990 step
= "Redeploying VNF"
5991 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5992 (result
, detailed_status
) = await self
._ns
_redeploy
_vnf
(
5993 nsr_id
, nslcmop_id
, latest_vnfd
, db_vnfr
, db_nsr
5995 if result
== "FAILED":
5996 nslcmop_operation_state
= result
5997 error_description_nslcmop
= detailed_status
5998 db_nslcmop_update
["detailed-status"] = detailed_status
6001 + " step {} Done with result {} {}".format(
6002 step
, nslcmop_operation_state
, detailed_status
6007 step
= "Checking if any charm package has changed or not"
6008 for current_charm_path
, target_charm_path
in list(
6009 charm_artifact_paths
6013 and target_charm_path
6014 and self
.check_charm_hash_changed(
6015 current_charm_path
, target_charm_path
6019 step
= "Checking whether VNF uses juju bundle"
6020 if check_juju_bundle_existence(current_vnfd
):
6023 "Charm upgrade is not supported for the instance which"
6024 " uses juju-bundle: {}".format(
6025 check_juju_bundle_existence(current_vnfd
)
6029 step
= "Upgrading Charm"
6033 ) = await self
._ns
_charm
_upgrade
(
6036 charm_type
=vca_type
,
6037 path
=self
.fs
.path
+ target_charm_path
,
6038 timeout
=timeout_seconds
,
6041 if result
== "FAILED":
6042 nslcmop_operation_state
= result
6043 error_description_nslcmop
= detailed_status
6045 db_nslcmop_update
["detailed-status"] = detailed_status
6048 + " step {} Done with result {} {}".format(
6049 step
, nslcmop_operation_state
, detailed_status
6053 step
= "Updating policies"
6054 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6055 result
= "COMPLETED"
6056 detailed_status
= "Done"
6057 db_nslcmop_update
["detailed-status"] = "Done"
6060 for item
in helm_artifacts
:
6062 item
["current_artifact_path"]
6063 and item
["target_artifact_path"]
6064 and self
.check_charm_hash_changed(
6065 item
["current_artifact_path"],
6066 item
["target_artifact_path"],
6070 db_update_entry
= "_admin.deployed.VCA.{}.".format(
6073 vnfr_id
= db_vnfr
["_id"]
6074 osm_config
= {"osm": {"ns_id": nsr_id
, "vnf_id": vnfr_id
}}
6076 "collection": "nsrs",
6077 "filter": {"_id": nsr_id
},
6078 "path": db_update_entry
,
6080 vca_type
, namespace
, helm_id
= get_ee_id_parts(item
["ee_id"])
6081 await self
.vca_map
[vca_type
].upgrade_execution_environment(
6082 namespace
=namespace
,
6086 artifact_path
=item
["target_artifact_path"],
6089 vnf_id
= db_vnfr
.get("vnfd-ref")
6090 config_descriptor
= get_configuration(latest_vnfd
, vnf_id
)
6091 self
.logger
.debug("get ssh key block")
6095 ("config-access", "ssh-access", "required"),
6097 # Needed to inject a ssh key
6100 ("config-access", "ssh-access", "default-user"),
6103 "Install configuration Software, getting public ssh key"
6105 pub_key
= await self
.vca_map
[
6107 ].get_ee_ssh_public__key(
6108 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
6112 "Insert public key into VM user={} ssh_key={}".format(
6116 self
.logger
.debug(logging_text
+ step
)
6118 # wait for RO (ip-address) Insert pub_key into VM
6119 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
6129 initial_config_primitive_list
= config_descriptor
.get(
6130 "initial-config-primitive"
6132 config_primitive
= next(
6135 for p
in initial_config_primitive_list
6136 if p
["name"] == "config"
6140 if not config_primitive
:
6143 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6145 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
6146 if db_vnfr
.get("additionalParamsForVnf"):
6147 deploy_params
.update(
6149 db_vnfr
["additionalParamsForVnf"].copy()
6152 primitive_params_
= self
._map
_primitive
_params
(
6153 config_primitive
, {}, deploy_params
6156 step
= "execute primitive '{}' params '{}'".format(
6157 config_primitive
["name"], primitive_params_
6159 self
.logger
.debug(logging_text
+ step
)
6160 await self
.vca_map
[vca_type
].exec_primitive(
6162 primitive_name
=config_primitive
["name"],
6163 params_dict
=primitive_params_
,
6169 step
= "Updating policies"
6170 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6171 detailed_status
= "Done"
6172 db_nslcmop_update
["detailed-status"] = "Done"
6174 # If nslcmop_operation_state is None, no operation has failed.
6175 if not nslcmop_operation_state
:
6176 nslcmop_operation_state
= "COMPLETED"
6178 # If the CHANGE_VNFPKG update operation is successful,
6179 # the vnf revision needs to be updated
6180 vnfr_update
["revision"] = latest_vnfd_revision
6181 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
6185 + " task Done with result {} {}".format(
6186 nslcmop_operation_state
, detailed_status
6189 elif update_type
== "REMOVE_VNF":
6190 # This part is included in https://osm.etsi.org/gerrit/11876
6191 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
6192 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
6193 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6194 step
= "Removing VNF"
6195 (result
, detailed_status
) = await self
.remove_vnf(
6196 nsr_id
, nslcmop_id
, vnf_instance_id
6198 if result
== "FAILED":
6199 nslcmop_operation_state
= result
6200 error_description_nslcmop
= detailed_status
6201 db_nslcmop_update
["detailed-status"] = detailed_status
6202 change_type
= "vnf_terminated"
6203 if not nslcmop_operation_state
:
6204 nslcmop_operation_state
= "COMPLETED"
6207 + " task Done with result {} {}".format(
6208 nslcmop_operation_state
, detailed_status
6212 elif update_type
== "OPERATE_VNF":
6213 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"][
6216 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"][
6219 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"][
6222 (result
, detailed_status
) = await self
.rebuild_start_stop(
6223 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
6225 if result
== "FAILED":
6226 nslcmop_operation_state
= result
6227 error_description_nslcmop
= detailed_status
6228 db_nslcmop_update
["detailed-status"] = detailed_status
6229 if not nslcmop_operation_state
:
6230 nslcmop_operation_state
= "COMPLETED"
6233 + " task Done with result {} {}".format(
6234 nslcmop_operation_state
, detailed_status
6238 # If nslcmop_operation_state is None, no operation has failed.
6239 # All operations were executed without failure.
6240 if not nslcmop_operation_state
:
6241 nslcmop_operation_state
= "COMPLETED"
6242 db_nsr_update
["operational-status"] = old_operational_status
6244 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
6245 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6247 except asyncio
.CancelledError
:
6249 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6251 exc
= "Operation was cancelled"
6252 except asyncio
.TimeoutError
:
6253 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
6255 except Exception as e
:
6256 exc
= traceback
.format_exc()
6257 self
.logger
.critical(
6258 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6267 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6268 nslcmop_operation_state
= "FAILED"
6269 db_nsr_update
["operational-status"] = old_operational_status
6271 self
._write
_ns
_status
(
6273 ns_state
=db_nsr
["nsState"],
6274 current_operation
="IDLE",
6275 current_operation_id
=None,
6276 other_update
=db_nsr_update
,
6279 self
._write
_op
_status
(
6282 error_message
=error_description_nslcmop
,
6283 operation_state
=nslcmop_operation_state
,
6284 other_update
=db_nslcmop_update
,
6287 if nslcmop_operation_state
:
6291 "nslcmop_id": nslcmop_id
,
6292 "operationState": nslcmop_operation_state
,
6294 if change_type
in ("vnf_terminated", "policy_updated"):
6295 msg
.update({"vnf_member_index": member_vnf_index
})
6296 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6297 except Exception as e
:
6299 logging_text
+ "kafka_write notification Exception {}".format(e
)
6301 self
.logger
.debug(logging_text
+ "Exit")
6302 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6303 return nslcmop_operation_state
, detailed_status
6305 async def scale(self
, nsr_id
, nslcmop_id
):
6306 # Try to lock HA task here
6307 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6308 if not task_is_locked_by_me
:
6311 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6312 stage
= ["", "", ""]
6313 tasks_dict_info
= {}
6314 # ^ stage, step, VIM progress
6315 self
.logger
.debug(logging_text
+ "Enter")
6316 # get all needed from database
6318 db_nslcmop_update
= {}
6321 # in case of error, indicates which part of the scale failed, so the nsr can be put in error status
6322 scale_process
= None
6323 old_operational_status
= ""
6324 old_config_status
= ""
6327 # wait for any previous tasks in process
6328 step
= "Waiting for previous operations to terminate"
6329 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6330 self
._write
_ns
_status
(
6333 current_operation
="SCALING",
6334 current_operation_id
=nslcmop_id
,
6337 step
= "Getting nslcmop from database"
6339 step
+ " after having waited for previous tasks to be completed"
6341 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6343 step
= "Getting nsr from database"
6344 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6345 old_operational_status
= db_nsr
["operational-status"]
6346 old_config_status
= db_nsr
["config-status"]
6348 step
= "Parsing scaling parameters"
6349 db_nsr_update
["operational-status"] = "scaling"
6350 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6351 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6353 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6355 ]["member-vnf-index"]
6356 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6358 ]["scaling-group-descriptor"]
6359 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6360 # for backward compatibility
6361 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6362 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6363 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6364 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6366 step
= "Getting vnfr from database"
6367 db_vnfr
= self
.db
.get_one(
6368 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6371 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6373 step
= "Getting vnfd from database"
6374 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6376 base_folder
= db_vnfd
["_admin"]["storage"]
6378 step
= "Getting scaling-group-descriptor"
6379 scaling_descriptor
= find_in_list(
6380 get_scaling_aspect(db_vnfd
),
6381 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6383 if not scaling_descriptor
:
6385 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6386 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6389 step
= "Sending scale order to VIM"
6390 # TODO check if ns is in a proper status
6392 if not db_nsr
["_admin"].get("scaling-group"):
6397 "_admin.scaling-group": [
6398 {"name": scaling_group
, "nb-scale-op": 0}
6402 admin_scale_index
= 0
6404 for admin_scale_index
, admin_scale_info
in enumerate(
6405 db_nsr
["_admin"]["scaling-group"]
6407 if admin_scale_info
["name"] == scaling_group
:
6408 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6410 else: # not found, set index one plus last element and add new entry with the name
6411 admin_scale_index
+= 1
6413 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6416 vca_scaling_info
= []
6417 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6418 if scaling_type
== "SCALE_OUT":
6419 if "aspect-delta-details" not in scaling_descriptor
:
6421 "Aspect delta details not fount in scaling descriptor {}".format(
6422 scaling_descriptor
["name"]
6425 # count if max-instance-count is reached
6426 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6428 scaling_info
["scaling_direction"] = "OUT"
6429 scaling_info
["vdu-create"] = {}
6430 scaling_info
["kdu-create"] = {}
6431 for delta
in deltas
:
6432 for vdu_delta
in delta
.get("vdu-delta", {}):
6433 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6434 # vdu_index also provides the number of instances of the targeted vdu
6435 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6436 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6440 additional_params
= (
6441 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6444 cloud_init_list
= []
6446 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6447 max_instance_count
= 10
6448 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6449 max_instance_count
= vdu_profile
.get(
6450 "max-number-of-instances", 10
6453 default_instance_num
= get_number_of_instances(
6456 instances_number
= vdu_delta
.get("number-of-instances", 1)
6457 nb_scale_op
+= instances_number
6459 new_instance_count
= nb_scale_op
+ default_instance_num
6460 # Control if new count is over max and vdu count is less than max.
6461 # Then assign new instance count
6462 if new_instance_count
> max_instance_count
> vdu_count
:
6463 instances_number
= new_instance_count
- max_instance_count
6465 instances_number
= instances_number
6467 if new_instance_count
> max_instance_count
:
6469 "reached the limit of {} (max-instance-count) "
6470 "scaling-out operations for the "
6471 "scaling-group-descriptor '{}'".format(
6472 nb_scale_op
, scaling_group
6475 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6477 # TODO Information of its own ip is not available because db_vnfr is not updated.
6478 additional_params
["OSM"] = get_osm_params(
6479 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6481 cloud_init_list
.append(
6482 self
._parse
_cloud
_init
(
6489 vca_scaling_info
.append(
6491 "osm_vdu_id": vdu_delta
["id"],
6492 "member-vnf-index": vnf_index
,
6494 "vdu_index": vdu_index
+ x
,
6497 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6498 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6499 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6500 kdu_name
= kdu_profile
["kdu-name"]
6501 resource_name
= kdu_profile
.get("resource-name", "")
6503 # Might have different kdus in the same delta
6504 # Should have list for each kdu
6505 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6506 scaling_info
["kdu-create"][kdu_name
] = []
6508 kdur
= get_kdur(db_vnfr
, kdu_name
)
6509 if kdur
.get("helm-chart"):
6510 k8s_cluster_type
= "helm-chart-v3"
6511 self
.logger
.debug("kdur: {}".format(kdur
))
6513 kdur
.get("helm-version")
6514 and kdur
.get("helm-version") == "v2"
6516 k8s_cluster_type
= "helm-chart"
6517 elif kdur
.get("juju-bundle"):
6518 k8s_cluster_type
= "juju-bundle"
6521 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6522 "juju-bundle. Maybe an old NBI version is running".format(
6523 db_vnfr
["member-vnf-index-ref"], kdu_name
6527 max_instance_count
= 10
6528 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6529 max_instance_count
= kdu_profile
.get(
6530 "max-number-of-instances", 10
6533 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6534 deployed_kdu
, _
= get_deployed_kdu(
6535 nsr_deployed
, kdu_name
, vnf_index
6537 if deployed_kdu
is None:
6539 "KDU '{}' for vnf '{}' not deployed".format(
6543 kdu_instance
= deployed_kdu
.get("kdu-instance")
6544 instance_num
= await self
.k8scluster_map
[
6550 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6551 kdu_model
=deployed_kdu
.get("kdu-model"),
6553 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6554 "number-of-instances", 1
6557 # Control if new count is over max and instance_num is less than max.
6558 # Then assign max instance number to kdu replica count
6559 if kdu_replica_count
> max_instance_count
> instance_num
:
6560 kdu_replica_count
= max_instance_count
6561 if kdu_replica_count
> max_instance_count
:
6563 "reached the limit of {} (max-instance-count) "
6564 "scaling-out operations for the "
6565 "scaling-group-descriptor '{}'".format(
6566 instance_num
, scaling_group
6570 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6571 vca_scaling_info
.append(
6573 "osm_kdu_id": kdu_name
,
6574 "member-vnf-index": vnf_index
,
6576 "kdu_index": instance_num
+ x
- 1,
6579 scaling_info
["kdu-create"][kdu_name
].append(
6581 "member-vnf-index": vnf_index
,
6583 "k8s-cluster-type": k8s_cluster_type
,
6584 "resource-name": resource_name
,
6585 "scale": kdu_replica_count
,
6588 elif scaling_type
== "SCALE_IN":
6589 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6591 scaling_info
["scaling_direction"] = "IN"
6592 scaling_info
["vdu-delete"] = {}
6593 scaling_info
["kdu-delete"] = {}
6595 for delta
in deltas
:
6596 for vdu_delta
in delta
.get("vdu-delta", {}):
6597 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6598 min_instance_count
= 0
6599 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6600 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6601 min_instance_count
= vdu_profile
["min-number-of-instances"]
6603 default_instance_num
= get_number_of_instances(
6604 db_vnfd
, vdu_delta
["id"]
6606 instance_num
= vdu_delta
.get("number-of-instances", 1)
6607 nb_scale_op
-= instance_num
6609 new_instance_count
= nb_scale_op
+ default_instance_num
6611 if new_instance_count
< min_instance_count
< vdu_count
:
6612 instances_number
= min_instance_count
- new_instance_count
6614 instances_number
= instance_num
6616 if new_instance_count
< min_instance_count
:
6618 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6619 "scaling-group-descriptor '{}'".format(
6620 nb_scale_op
, scaling_group
6623 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6624 vca_scaling_info
.append(
6626 "osm_vdu_id": vdu_delta
["id"],
6627 "member-vnf-index": vnf_index
,
6629 "vdu_index": vdu_index
- 1 - x
,
6632 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6633 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6634 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6635 kdu_name
= kdu_profile
["kdu-name"]
6636 resource_name
= kdu_profile
.get("resource-name", "")
6638 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6639 scaling_info
["kdu-delete"][kdu_name
] = []
6641 kdur
= get_kdur(db_vnfr
, kdu_name
)
6642 if kdur
.get("helm-chart"):
6643 k8s_cluster_type
= "helm-chart-v3"
6644 self
.logger
.debug("kdur: {}".format(kdur
))
6646 kdur
.get("helm-version")
6647 and kdur
.get("helm-version") == "v2"
6649 k8s_cluster_type
= "helm-chart"
6650 elif kdur
.get("juju-bundle"):
6651 k8s_cluster_type
= "juju-bundle"
6654 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6655 "juju-bundle. Maybe an old NBI version is running".format(
6656 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6660 min_instance_count
= 0
6661 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6662 min_instance_count
= kdu_profile
["min-number-of-instances"]
6664 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6665 deployed_kdu
, _
= get_deployed_kdu(
6666 nsr_deployed
, kdu_name
, vnf_index
6668 if deployed_kdu
is None:
6670 "KDU '{}' for vnf '{}' not deployed".format(
6674 kdu_instance
= deployed_kdu
.get("kdu-instance")
6675 instance_num
= await self
.k8scluster_map
[
6681 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6682 kdu_model
=deployed_kdu
.get("kdu-model"),
6684 kdu_replica_count
= instance_num
- kdu_delta
.get(
6685 "number-of-instances", 1
6688 if kdu_replica_count
< min_instance_count
< instance_num
:
6689 kdu_replica_count
= min_instance_count
6690 if kdu_replica_count
< min_instance_count
:
6692 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6693 "scaling-group-descriptor '{}'".format(
6694 instance_num
, scaling_group
6698 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6699 vca_scaling_info
.append(
6701 "osm_kdu_id": kdu_name
,
6702 "member-vnf-index": vnf_index
,
6704 "kdu_index": instance_num
- x
- 1,
6707 scaling_info
["kdu-delete"][kdu_name
].append(
6709 "member-vnf-index": vnf_index
,
6711 "k8s-cluster-type": k8s_cluster_type
,
6712 "resource-name": resource_name
,
6713 "scale": kdu_replica_count
,
6717 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6718 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6719 if scaling_info
["scaling_direction"] == "IN":
6720 for vdur
in reversed(db_vnfr
["vdur"]):
6721 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6722 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6723 scaling_info
["vdu"].append(
6725 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6726 "vdu_id": vdur
["vdu-id-ref"],
6730 for interface
in vdur
["interfaces"]:
6731 scaling_info
["vdu"][-1]["interface"].append(
6733 "name": interface
["name"],
6734 "ip_address": interface
["ip-address"],
6735 "mac_address": interface
.get("mac-address"),
6738 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6741 step
= "Executing pre-scale vnf-config-primitive"
6742 if scaling_descriptor
.get("scaling-config-action"):
6743 for scaling_config_action
in scaling_descriptor
[
6744 "scaling-config-action"
6747 scaling_config_action
.get("trigger") == "pre-scale-in"
6748 and scaling_type
== "SCALE_IN"
6750 scaling_config_action
.get("trigger") == "pre-scale-out"
6751 and scaling_type
== "SCALE_OUT"
6753 vnf_config_primitive
= scaling_config_action
[
6754 "vnf-config-primitive-name-ref"
6756 step
= db_nslcmop_update
[
6758 ] = "executing pre-scale scaling-config-action '{}'".format(
6759 vnf_config_primitive
6762 # look for primitive
6763 for config_primitive
in (
6764 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6765 ).get("config-primitive", ()):
6766 if config_primitive
["name"] == vnf_config_primitive
:
6770 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6771 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6772 "primitive".format(scaling_group
, vnf_config_primitive
)
6775 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6776 if db_vnfr
.get("additionalParamsForVnf"):
6777 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6779 scale_process
= "VCA"
6780 db_nsr_update
["config-status"] = "configuring pre-scaling"
6781 primitive_params
= self
._map
_primitive
_params
(
6782 config_primitive
, {}, vnfr_params
6785 # Pre-scale retry check: Check if this sub-operation has been executed before
6786 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6789 vnf_config_primitive
,
6793 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6794 # Skip sub-operation
6795 result
= "COMPLETED"
6796 result_detail
= "Done"
6799 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6800 vnf_config_primitive
, result
, result_detail
6804 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6805 # New sub-operation: Get index of this sub-operation
6807 len(db_nslcmop
.get("_admin", {}).get("operations"))
6812 + "vnf_config_primitive={} New sub-operation".format(
6813 vnf_config_primitive
6817 # retry: Get registered params for this existing sub-operation
6818 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6821 vnf_index
= op
.get("member_vnf_index")
6822 vnf_config_primitive
= op
.get("primitive")
6823 primitive_params
= op
.get("primitive_params")
6826 + "vnf_config_primitive={} Sub-operation retry".format(
6827 vnf_config_primitive
6830 # Execute the primitive, either with new (first-time) or registered (retry) args
6831 ee_descriptor_id
= config_primitive
.get(
6832 "execution-environment-ref"
6834 primitive_name
= config_primitive
.get(
6835 "execution-environment-primitive", vnf_config_primitive
6837 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6838 nsr_deployed
["VCA"],
6839 member_vnf_index
=vnf_index
,
6841 vdu_count_index
=None,
6842 ee_descriptor_id
=ee_descriptor_id
,
6844 result
, result_detail
= await self
._ns
_execute
_primitive
(
6853 + "vnf_config_primitive={} Done with result {} {}".format(
6854 vnf_config_primitive
, result
, result_detail
6857 # Update operationState = COMPLETED | FAILED
6858 self
._update
_suboperation
_status
(
6859 db_nslcmop
, op_index
, result
, result_detail
6862 if result
== "FAILED":
6863 raise LcmException(result_detail
)
6864 db_nsr_update
["config-status"] = old_config_status
6865 scale_process
= None
6869 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6872 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6875 # SCALE-IN VCA - BEGIN
6876 if vca_scaling_info
:
6877 step
= db_nslcmop_update
[
6879 ] = "Deleting the execution environments"
6880 scale_process
= "VCA"
6881 for vca_info
in vca_scaling_info
:
6882 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6883 member_vnf_index
= str(vca_info
["member-vnf-index"])
6885 logging_text
+ "vdu info: {}".format(vca_info
)
6887 if vca_info
.get("osm_vdu_id"):
6888 vdu_id
= vca_info
["osm_vdu_id"]
6889 vdu_index
= int(vca_info
["vdu_index"])
6892 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6893 member_vnf_index
, vdu_id
, vdu_index
6895 stage
[2] = step
= "Scaling in VCA"
6896 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6897 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6898 config_update
= db_nsr
["configurationStatus"]
6899 for vca_index
, vca
in enumerate(vca_update
):
6901 (vca
or vca
.get("ee_id"))
6902 and vca
["member-vnf-index"] == member_vnf_index
6903 and vca
["vdu_count_index"] == vdu_index
6905 if vca
.get("vdu_id"):
6906 config_descriptor
= get_configuration(
6907 db_vnfd
, vca
.get("vdu_id")
6909 elif vca
.get("kdu_name"):
6910 config_descriptor
= get_configuration(
6911 db_vnfd
, vca
.get("kdu_name")
6914 config_descriptor
= get_configuration(
6915 db_vnfd
, db_vnfd
["id"]
6917 operation_params
= (
6918 db_nslcmop
.get("operationParams") or {}
6920 exec_terminate_primitives
= not operation_params
.get(
6921 "skip_terminate_primitives"
6922 ) and vca
.get("needed_terminate")
6923 task
= asyncio
.ensure_future(
6932 exec_primitives
=exec_terminate_primitives
,
6936 timeout
=self
.timeout
.charm_delete
,
6939 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6942 del vca_update
[vca_index
]
6943 del config_update
[vca_index
]
6944 # wait for pending tasks of terminate primitives
6948 + "Waiting for tasks {}".format(
6949 list(tasks_dict_info
.keys())
6952 error_list
= await self
._wait
_for
_tasks
(
6956 self
.timeout
.charm_delete
, self
.timeout
.ns_terminate
6961 tasks_dict_info
.clear()
6963 raise LcmException("; ".join(error_list
))
6965 db_vca_and_config_update
= {
6966 "_admin.deployed.VCA": vca_update
,
6967 "configurationStatus": config_update
,
6970 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6972 scale_process
= None
6973 # SCALE-IN VCA - END
6976 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6977 scale_process
= "RO"
6978 if self
.ro_config
.ng
:
6979 await self
._scale
_ng
_ro
(
6980 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6982 scaling_info
.pop("vdu-create", None)
6983 scaling_info
.pop("vdu-delete", None)
6985 scale_process
= None
6989 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6990 scale_process
= "KDU"
6991 await self
._scale
_kdu
(
6992 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6994 scaling_info
.pop("kdu-create", None)
6995 scaling_info
.pop("kdu-delete", None)
6997 scale_process
= None
7001 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7003 # SCALE-UP VCA - BEGIN
7004 if vca_scaling_info
:
7005 step
= db_nslcmop_update
[
7007 ] = "Creating new execution environments"
7008 scale_process
= "VCA"
7009 for vca_info
in vca_scaling_info
:
7010 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
7011 member_vnf_index
= str(vca_info
["member-vnf-index"])
7013 logging_text
+ "vdu info: {}".format(vca_info
)
7015 vnfd_id
= db_vnfr
["vnfd-ref"]
7016 if vca_info
.get("osm_vdu_id"):
7017 vdu_index
= int(vca_info
["vdu_index"])
7018 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
7019 if db_vnfr
.get("additionalParamsForVnf"):
7020 deploy_params
.update(
7022 db_vnfr
["additionalParamsForVnf"].copy()
7025 descriptor_config
= get_configuration(
7026 db_vnfd
, db_vnfd
["id"]
7028 if descriptor_config
:
7033 logging_text
=logging_text
7034 + "member_vnf_index={} ".format(member_vnf_index
),
7037 nslcmop_id
=nslcmop_id
,
7043 member_vnf_index
=member_vnf_index
,
7044 vdu_index
=vdu_index
,
7046 deploy_params
=deploy_params
,
7047 descriptor_config
=descriptor_config
,
7048 base_folder
=base_folder
,
7049 task_instantiation_info
=tasks_dict_info
,
7052 vdu_id
= vca_info
["osm_vdu_id"]
7053 vdur
= find_in_list(
7054 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
7056 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
7057 if vdur
.get("additionalParams"):
7058 deploy_params_vdu
= parse_yaml_strings(
7059 vdur
["additionalParams"]
7062 deploy_params_vdu
= deploy_params
7063 deploy_params_vdu
["OSM"] = get_osm_params(
7064 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
7066 if descriptor_config
:
7071 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
7072 member_vnf_index
, vdu_id
, vdu_index
7074 stage
[2] = step
= "Scaling out VCA"
7075 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
7077 logging_text
=logging_text
7078 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
7079 member_vnf_index
, vdu_id
, vdu_index
7083 nslcmop_id
=nslcmop_id
,
7089 member_vnf_index
=member_vnf_index
,
7090 vdu_index
=vdu_index
,
7092 deploy_params
=deploy_params_vdu
,
7093 descriptor_config
=descriptor_config
,
7094 base_folder
=base_folder
,
7095 task_instantiation_info
=tasks_dict_info
,
7098 # SCALE-UP VCA - END
7099 scale_process
= None
7102 # execute primitive service POST-SCALING
7103 step
= "Executing post-scale vnf-config-primitive"
7104 if scaling_descriptor
.get("scaling-config-action"):
7105 for scaling_config_action
in scaling_descriptor
[
7106 "scaling-config-action"
7109 scaling_config_action
.get("trigger") == "post-scale-in"
7110 and scaling_type
== "SCALE_IN"
7112 scaling_config_action
.get("trigger") == "post-scale-out"
7113 and scaling_type
== "SCALE_OUT"
7115 vnf_config_primitive
= scaling_config_action
[
7116 "vnf-config-primitive-name-ref"
7118 step
= db_nslcmop_update
[
7120 ] = "executing post-scale scaling-config-action '{}'".format(
7121 vnf_config_primitive
7124 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
7125 if db_vnfr
.get("additionalParamsForVnf"):
7126 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
7128 # look for primitive
7129 for config_primitive
in (
7130 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
7131 ).get("config-primitive", ()):
7132 if config_primitive
["name"] == vnf_config_primitive
:
7136 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
7137 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
7138 "config-primitive".format(
7139 scaling_group
, vnf_config_primitive
7142 scale_process
= "VCA"
7143 db_nsr_update
["config-status"] = "configuring post-scaling"
7144 primitive_params
= self
._map
_primitive
_params
(
7145 config_primitive
, {}, vnfr_params
7148 # Post-scale retry check: Check if this sub-operation has been executed before
7149 op_index
= self
._check
_or
_add
_scale
_suboperation
(
7152 vnf_config_primitive
,
7156 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
7157 # Skip sub-operation
7158 result
= "COMPLETED"
7159 result_detail
= "Done"
7162 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
7163 vnf_config_primitive
, result
, result_detail
7167 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
7168 # New sub-operation: Get index of this sub-operation
7170 len(db_nslcmop
.get("_admin", {}).get("operations"))
7175 + "vnf_config_primitive={} New sub-operation".format(
7176 vnf_config_primitive
7180 # retry: Get registered params for this existing sub-operation
7181 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
7184 vnf_index
= op
.get("member_vnf_index")
7185 vnf_config_primitive
= op
.get("primitive")
7186 primitive_params
= op
.get("primitive_params")
7189 + "vnf_config_primitive={} Sub-operation retry".format(
7190 vnf_config_primitive
7193 # Execute the primitive, either with new (first-time) or registered (reintent) args
7194 ee_descriptor_id
= config_primitive
.get(
7195 "execution-environment-ref"
7197 primitive_name
= config_primitive
.get(
7198 "execution-environment-primitive", vnf_config_primitive
7200 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
7201 nsr_deployed
["VCA"],
7202 member_vnf_index
=vnf_index
,
7204 vdu_count_index
=None,
7205 ee_descriptor_id
=ee_descriptor_id
,
7207 result
, result_detail
= await self
._ns
_execute
_primitive
(
7216 + "vnf_config_primitive={} Done with result {} {}".format(
7217 vnf_config_primitive
, result
, result_detail
7220 # Update operationState = COMPLETED | FAILED
7221 self
._update
_suboperation
_status
(
7222 db_nslcmop
, op_index
, result
, result_detail
7225 if result
== "FAILED":
7226 raise LcmException(result_detail
)
7227 db_nsr_update
["config-status"] = old_config_status
7228 scale_process
= None
7233 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
7234 db_nsr_update
["operational-status"] = (
7236 if old_operational_status
== "failed"
7237 else old_operational_status
7239 db_nsr_update
["config-status"] = old_config_status
7242 ROclient
.ROClientException
,
7247 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7249 except asyncio
.CancelledError
:
7251 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7253 exc
= "Operation was cancelled"
7254 except Exception as e
:
7255 exc
= traceback
.format_exc()
7256 self
.logger
.critical(
7257 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7261 self
._write
_ns
_status
(
7264 current_operation
="IDLE",
7265 current_operation_id
=None,
7268 stage
[1] = "Waiting for instantiate pending tasks."
7269 self
.logger
.debug(logging_text
+ stage
[1])
7270 exc
= await self
._wait
_for
_tasks
(
7273 self
.timeout
.ns_deploy
,
7281 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7282 nslcmop_operation_state
= "FAILED"
7284 db_nsr_update
["operational-status"] = old_operational_status
7285 db_nsr_update
["config-status"] = old_config_status
7286 db_nsr_update
["detailed-status"] = ""
7288 if "VCA" in scale_process
:
7289 db_nsr_update
["config-status"] = "failed"
7290 if "RO" in scale_process
:
7291 db_nsr_update
["operational-status"] = "failed"
7294 ] = "FAILED scaling nslcmop={} {}: {}".format(
7295 nslcmop_id
, step
, exc
7298 error_description_nslcmop
= None
7299 nslcmop_operation_state
= "COMPLETED"
7300 db_nslcmop_update
["detailed-status"] = "Done"
7302 self
._write
_op
_status
(
7305 error_message
=error_description_nslcmop
,
7306 operation_state
=nslcmop_operation_state
,
7307 other_update
=db_nslcmop_update
,
7310 self
._write
_ns
_status
(
7313 current_operation
="IDLE",
7314 current_operation_id
=None,
7315 other_update
=db_nsr_update
,
7318 if nslcmop_operation_state
:
7322 "nslcmop_id": nslcmop_id
,
7323 "operationState": nslcmop_operation_state
,
7325 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7326 except Exception as e
:
7328 logging_text
+ "kafka_write notification Exception {}".format(e
)
7330 self
.logger
.debug(logging_text
+ "Exit")
7331 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7333 async def _scale_kdu(
7334 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7336 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7337 for kdu_name
in _scaling_info
:
7338 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7339 deployed_kdu
, index
= get_deployed_kdu(
7340 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7342 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7343 kdu_instance
= deployed_kdu
["kdu-instance"]
7344 kdu_model
= deployed_kdu
.get("kdu-model")
7345 scale
= int(kdu_scaling_info
["scale"])
7346 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7349 "collection": "nsrs",
7350 "filter": {"_id": nsr_id
},
7351 "path": "_admin.deployed.K8s.{}".format(index
),
7354 step
= "scaling application {}".format(
7355 kdu_scaling_info
["resource-name"]
7357 self
.logger
.debug(logging_text
+ step
)
7359 if kdu_scaling_info
["type"] == "delete":
7360 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7363 and kdu_config
.get("terminate-config-primitive")
7364 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7366 terminate_config_primitive_list
= kdu_config
.get(
7367 "terminate-config-primitive"
7369 terminate_config_primitive_list
.sort(
7370 key
=lambda val
: int(val
["seq"])
7374 terminate_config_primitive
7375 ) in terminate_config_primitive_list
:
7376 primitive_params_
= self
._map
_primitive
_params
(
7377 terminate_config_primitive
, {}, {}
7379 step
= "execute terminate config primitive"
7380 self
.logger
.debug(logging_text
+ step
)
7381 await asyncio
.wait_for(
7382 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7383 cluster_uuid
=cluster_uuid
,
7384 kdu_instance
=kdu_instance
,
7385 primitive_name
=terminate_config_primitive
["name"],
7386 params
=primitive_params_
,
7388 total_timeout
=self
.timeout
.primitive
,
7391 timeout
=self
.timeout
.primitive
7392 * self
.timeout
.primitive_outer_factor
,
7395 await asyncio
.wait_for(
7396 self
.k8scluster_map
[k8s_cluster_type
].scale(
7397 kdu_instance
=kdu_instance
,
7399 resource_name
=kdu_scaling_info
["resource-name"],
7400 total_timeout
=self
.timeout
.scale_on_error
,
7402 cluster_uuid
=cluster_uuid
,
7403 kdu_model
=kdu_model
,
7407 timeout
=self
.timeout
.scale_on_error
7408 * self
.timeout
.scale_on_error_outer_factor
,
7411 if kdu_scaling_info
["type"] == "create":
7412 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7415 and kdu_config
.get("initial-config-primitive")
7416 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7418 initial_config_primitive_list
= kdu_config
.get(
7419 "initial-config-primitive"
7421 initial_config_primitive_list
.sort(
7422 key
=lambda val
: int(val
["seq"])
7425 for initial_config_primitive
in initial_config_primitive_list
:
7426 primitive_params_
= self
._map
_primitive
_params
(
7427 initial_config_primitive
, {}, {}
7429 step
= "execute initial config primitive"
7430 self
.logger
.debug(logging_text
+ step
)
7431 await asyncio
.wait_for(
7432 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7433 cluster_uuid
=cluster_uuid
,
7434 kdu_instance
=kdu_instance
,
7435 primitive_name
=initial_config_primitive
["name"],
7436 params
=primitive_params_
,
7443 async def _scale_ng_ro(
7444 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7446 nsr_id
= db_nslcmop
["nsInstanceId"]
7447 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7450 # read from db: vnfd's for every vnf
7453 # for each vnf in ns, read vnfd
7454 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7455 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7456 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7457 # if we haven't this vnfd, read it from db
7458 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7460 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7461 db_vnfds
.append(vnfd
)
7462 n2vc_key
= self
.n2vc
.get_public_key()
7463 n2vc_key_list
= [n2vc_key
]
7466 vdu_scaling_info
.get("vdu-create"),
7467 vdu_scaling_info
.get("vdu-delete"),
7470 # db_vnfr has been updated, update db_vnfrs to use it
7471 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7472 await self
._instantiate
_ng
_ro
(
7482 start_deploy
=time(),
7483 timeout_ns_deploy
=self
.timeout
.ns_deploy
,
7485 if vdu_scaling_info
.get("vdu-delete"):
7487 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7490 async def extract_prometheus_scrape_jobs(
7491 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
7493 # look if exist a file called 'prometheus*.j2' and
7494 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7498 for f
in artifact_content
7499 if f
.startswith("prometheus") and f
.endswith(".j2")
7505 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7509 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7510 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7512 vnfr_id
= vnfr_id
.replace("-", "")
7514 "JOB_NAME": vnfr_id
,
7515 "TARGET_IP": target_ip
,
7516 "EXPORTER_POD_IP": host_name
,
7517 "EXPORTER_POD_PORT": host_port
,
7519 job_list
= parse_job(job_data
, variables
)
7520 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7521 for job
in job_list
:
7523 not isinstance(job
.get("job_name"), str)
7524 or vnfr_id
not in job
["job_name"]
7526 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
7527 job
["nsr_id"] = nsr_id
7528 job
["vnfr_id"] = vnfr_id
7531 async def rebuild_start_stop(
7532 self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
7534 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7535 self
.logger
.info(logging_text
+ "Enter")
7536 stage
= ["Preparing the environment", ""]
7537 # database nsrs record
7541 # in case of error, indicates what part of scale was failed to put nsr at error status
7542 start_deploy
= time()
7544 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7545 vim_account_id
= db_vnfr
.get("vim-account-id")
7546 vim_info_key
= "vim:" + vim_account_id
7547 vdu_id
= additional_param
["vdu_id"]
7548 vdurs
= [item
for item
in db_vnfr
["vdur"] if item
["vdu-id-ref"] == vdu_id
]
7549 vdur
= find_in_list(
7550 vdurs
, lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7553 vdu_vim_name
= vdur
["name"]
7554 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7555 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7557 raise LcmException("Target vdu is not found")
7558 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7559 # wait for any previous tasks in process
7560 stage
[1] = "Waiting for previous operations to terminate"
7561 self
.logger
.info(stage
[1])
7562 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7564 stage
[1] = "Reading from database."
7565 self
.logger
.info(stage
[1])
7566 self
._write
_ns
_status
(
7569 current_operation
=operation_type
.upper(),
7570 current_operation_id
=nslcmop_id
,
7572 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7575 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7576 db_nsr_update
["operational-status"] = operation_type
7577 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7581 "vim_vm_id": vim_vm_id
,
7583 "vdu_index": additional_param
["count-index"],
7584 "vdu_id": vdur
["id"],
7585 "target_vim": target_vim
,
7586 "vim_account_id": vim_account_id
,
7589 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7590 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7591 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7592 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7593 self
.logger
.info("response from RO: {}".format(result_dict
))
7594 action_id
= result_dict
["action_id"]
7595 await self
._wait
_ng
_ro
(
7600 self
.timeout
.operate
,
7602 "start_stop_rebuild",
7604 return "COMPLETED", "Done"
7605 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7606 self
.logger
.error("Exit Exception {}".format(e
))
7608 except asyncio
.CancelledError
:
7609 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7610 exc
= "Operation was cancelled"
7611 except Exception as e
:
7612 exc
= traceback
.format_exc()
7613 self
.logger
.critical(
7614 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7616 return "FAILED", "Error in operate VNF {}".format(exc
)
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Get VCA Cloud and VCA Cloud Credentials for the VIM account

    Both values are read from the "config" section of the VIM account
    record; each one is None when its key is not present there.

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential)
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    # "config" may be missing or stored as null; `.get("config", {})` would
    # hand back None in the second case and the lookups below would raise.
    config = vim_account.get("config") or {}
    return config.get("vca_cloud"), config.get("vca_cloud_credential")
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account

    Both values are read from the "config" section of the VIM account
    record; each one is None when its key is not present there.

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential)
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    # "config" may be missing or stored as null; `.get("config", {})` would
    # hand back None in the second case and the lookups below would raise.
    config = vim_account.get("config") or {}
    return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")
7640 async def migrate(self
, nsr_id
, nslcmop_id
):
7642 Migrate VNFs and VDUs instances in a NS
7644 :param: nsr_id: NS Instance ID
7645 :param: nslcmop_id: nslcmop ID of migrate
7648 # Try to lock HA task here
7649 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7650 if not task_is_locked_by_me
:
7652 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7653 self
.logger
.debug(logging_text
+ "Enter")
7654 # get all needed from database
7656 db_nslcmop_update
= {}
7657 nslcmop_operation_state
= None
7661 # in case of error, indicates what part of scale was failed to put nsr at error status
7662 start_deploy
= time()
7665 # wait for any previous tasks in process
7666 step
= "Waiting for previous operations to terminate"
7667 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7669 self
._write
_ns
_status
(
7672 current_operation
="MIGRATING",
7673 current_operation_id
=nslcmop_id
,
7675 step
= "Getting nslcmop from database"
7677 step
+ " after having waited for previous tasks to be completed"
7679 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7680 migrate_params
= db_nslcmop
.get("operationParams")
7683 target
.update(migrate_params
)
7684 desc
= await self
.RO
.migrate(nsr_id
, target
)
7685 self
.logger
.debug("RO return > {}".format(desc
))
7686 action_id
= desc
["action_id"]
7687 await self
._wait
_ng
_ro
(
7692 self
.timeout
.migrate
,
7693 operation
="migrate",
7695 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7696 self
.logger
.error("Exit Exception {}".format(e
))
7698 except asyncio
.CancelledError
:
7699 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7700 exc
= "Operation was cancelled"
7701 except Exception as e
:
7702 exc
= traceback
.format_exc()
7703 self
.logger
.critical(
7704 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7707 self
._write
_ns
_status
(
7710 current_operation
="IDLE",
7711 current_operation_id
=None,
7714 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7715 nslcmop_operation_state
= "FAILED"
7717 nslcmop_operation_state
= "COMPLETED"
7718 db_nslcmop_update
["detailed-status"] = "Done"
7719 db_nsr_update
["detailed-status"] = "Done"
7721 self
._write
_op
_status
(
7725 operation_state
=nslcmop_operation_state
,
7726 other_update
=db_nslcmop_update
,
7728 if nslcmop_operation_state
:
7732 "nslcmop_id": nslcmop_id
,
7733 "operationState": nslcmop_operation_state
,
7735 await self
.msg
.aiowrite("ns", "migrated", msg
, loop
=self
.loop
)
7736 except Exception as e
:
7738 logging_text
+ "kafka_write notification Exception {}".format(e
)
7740 self
.logger
.debug(logging_text
+ "Exit")
7741 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
7743 async def heal(self
, nsr_id
, nslcmop_id
):
7747 :param nsr_id: ns instance to heal
7748 :param nslcmop_id: operation to run
7752 # Try to lock HA task here
7753 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7754 if not task_is_locked_by_me
:
7757 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7758 stage
= ["", "", ""]
7759 tasks_dict_info
= {}
7760 # ^ stage, step, VIM progress
7761 self
.logger
.debug(logging_text
+ "Enter")
7762 # get all needed from database
7764 db_nslcmop_update
= {}
7766 db_vnfrs
= {} # vnf's info indexed by _id
7768 old_operational_status
= ""
7769 old_config_status
= ""
7772 # wait for any previous tasks in process
7773 step
= "Waiting for previous operations to terminate"
7774 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7775 self
._write
_ns
_status
(
7778 current_operation
="HEALING",
7779 current_operation_id
=nslcmop_id
,
7782 step
= "Getting nslcmop from database"
7784 step
+ " after having waited for previous tasks to be completed"
7786 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7788 step
= "Getting nsr from database"
7789 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7790 old_operational_status
= db_nsr
["operational-status"]
7791 old_config_status
= db_nsr
["config-status"]
7794 "_admin.deployed.RO.operational-status": "healing",
7796 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7798 step
= "Sending heal order to VIM"
7800 logging_text
=logging_text
,
7802 db_nslcmop
=db_nslcmop
,
7807 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7808 self
.logger
.debug(logging_text
+ stage
[1])
7809 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7810 self
.fs
.sync(db_nsr
["nsd-id"])
7812 # read from db: vnfr's of this ns
7813 step
= "Getting vnfrs from db"
7814 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7815 for vnfr
in db_vnfrs_list
:
7816 db_vnfrs
[vnfr
["_id"]] = vnfr
7817 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7819 # Check for each target VNF
7820 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7821 for target_vnf
in target_list
:
7822 # Find this VNF in the list from DB
7823 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7825 db_vnfr
= db_vnfrs
[vnfr_id
]
7826 vnfd_id
= db_vnfr
.get("vnfd-id")
7827 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7828 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7829 base_folder
= vnfd
["_admin"]["storage"]
7834 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7835 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7837 # Check each target VDU and deploy N2VC
7838 target_vdu_list
= target_vnf
.get("additionalParams", {}).get(
7841 if not target_vdu_list
:
7842 # Codigo nuevo para crear diccionario
7843 target_vdu_list
= []
7844 for existing_vdu
in db_vnfr
.get("vdur"):
7845 vdu_name
= existing_vdu
.get("vdu-name", None)
7846 vdu_index
= existing_vdu
.get("count-index", 0)
7847 vdu_run_day1
= target_vnf
.get("additionalParams", {}).get(
7850 vdu_to_be_healed
= {
7852 "count-index": vdu_index
,
7853 "run-day1": vdu_run_day1
,
7855 target_vdu_list
.append(vdu_to_be_healed
)
7856 for target_vdu
in target_vdu_list
:
7857 deploy_params_vdu
= target_vdu
7858 # Set run-day1 vnf level value if not vdu level value exists
7859 if not deploy_params_vdu
.get("run-day1") and target_vnf
[
7862 deploy_params_vdu
["run-day1"] = target_vnf
[
7865 vdu_name
= target_vdu
.get("vdu-id", None)
7866 # TODO: Get vdu_id from vdud.
7868 # For multi instance VDU count-index is mandatory
7869 # For single session VDU count-indes is 0
7870 vdu_index
= target_vdu
.get("count-index", 0)
7872 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7873 stage
[1] = "Deploying Execution Environments."
7874 self
.logger
.debug(logging_text
+ stage
[1])
7876 # VNF Level charm. Normal case when proxy charms.
7877 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7878 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7879 if descriptor_config
:
7880 # Continue if healed machine is management machine
7881 vnf_ip_address
= db_vnfr
.get("ip-address")
7882 target_instance
= None
7883 for instance
in db_vnfr
.get("vdur", None):
7885 instance
["vdu-name"] == vdu_name
7886 and instance
["count-index"] == vdu_index
7888 target_instance
= instance
7890 if vnf_ip_address
== target_instance
.get("ip-address"):
7892 logging_text
=logging_text
7893 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7894 member_vnf_index
, vdu_name
, vdu_index
7898 nslcmop_id
=nslcmop_id
,
7904 member_vnf_index
=member_vnf_index
,
7907 deploy_params
=deploy_params_vdu
,
7908 descriptor_config
=descriptor_config
,
7909 base_folder
=base_folder
,
7910 task_instantiation_info
=tasks_dict_info
,
7914 # VDU Level charm. Normal case with native charms.
7915 descriptor_config
= get_configuration(vnfd
, vdu_name
)
7916 if descriptor_config
:
7918 logging_text
=logging_text
7919 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7920 member_vnf_index
, vdu_name
, vdu_index
7924 nslcmop_id
=nslcmop_id
,
7930 member_vnf_index
=member_vnf_index
,
7931 vdu_index
=vdu_index
,
7933 deploy_params
=deploy_params_vdu
,
7934 descriptor_config
=descriptor_config
,
7935 base_folder
=base_folder
,
7936 task_instantiation_info
=tasks_dict_info
,
7941 ROclient
.ROClientException
,
7946 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7948 except asyncio
.CancelledError
:
7950 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7952 exc
= "Operation was cancelled"
7953 except Exception as e
:
7954 exc
= traceback
.format_exc()
7955 self
.logger
.critical(
7956 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7961 stage
[1] = "Waiting for healing pending tasks."
7962 self
.logger
.debug(logging_text
+ stage
[1])
7963 exc
= await self
._wait
_for
_tasks
(
7966 self
.timeout
.ns_deploy
,
7974 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7975 nslcmop_operation_state
= "FAILED"
7977 db_nsr_update
["operational-status"] = old_operational_status
7978 db_nsr_update
["config-status"] = old_config_status
7981 ] = "FAILED healing nslcmop={} {}: {}".format(nslcmop_id
, step
, exc
)
7982 for task
, task_name
in tasks_dict_info
.items():
7983 if not task
.done() or task
.cancelled() or task
.exception():
7984 if task_name
.startswith(self
.task_name_deploy_vca
):
7985 # A N2VC task is pending
7986 db_nsr_update
["config-status"] = "failed"
7988 # RO task is pending
7989 db_nsr_update
["operational-status"] = "failed"
7991 error_description_nslcmop
= None
7992 nslcmop_operation_state
= "COMPLETED"
7993 db_nslcmop_update
["detailed-status"] = "Done"
7994 db_nsr_update
["detailed-status"] = "Done"
7995 db_nsr_update
["operational-status"] = "running"
7996 db_nsr_update
["config-status"] = "configured"
7998 self
._write
_op
_status
(
8001 error_message
=error_description_nslcmop
,
8002 operation_state
=nslcmop_operation_state
,
8003 other_update
=db_nslcmop_update
,
8006 self
._write
_ns
_status
(
8009 current_operation
="IDLE",
8010 current_operation_id
=None,
8011 other_update
=db_nsr_update
,
8014 if nslcmop_operation_state
:
8018 "nslcmop_id": nslcmop_id
,
8019 "operationState": nslcmop_operation_state
,
8021 await self
.msg
.aiowrite("ns", "healed", msg
, loop
=self
.loop
)
8022 except Exception as e
:
8024 logging_text
+ "kafka_write notification Exception {}".format(e
)
8026 self
.logger
.debug(logging_text
+ "Exit")
8027 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
8038 :param logging_text: preffix text to use at logging
8039 :param nsr_id: nsr identity
8040 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
8041 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
8042 :return: None or exception
8045 def get_vim_account(vim_account_id
):
8047 if vim_account_id
in db_vims
:
8048 return db_vims
[vim_account_id
]
8049 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
8050 db_vims
[vim_account_id
] = db_vim
8055 ns_params
= db_nslcmop
.get("operationParams")
8056 if ns_params
and ns_params
.get("timeout_ns_heal"):
8057 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
8059 timeout_ns_heal
= self
.timeout
.ns_heal
8063 nslcmop_id
= db_nslcmop
["_id"]
8065 "action_id": nslcmop_id
,
8067 self
.logger
.warning(
8068 "db_nslcmop={} and timeout_ns_heal={}".format(
8069 db_nslcmop
, timeout_ns_heal
8072 target
.update(db_nslcmop
.get("operationParams", {}))
8074 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
8075 desc
= await self
.RO
.recreate(nsr_id
, target
)
8076 self
.logger
.debug("RO return > {}".format(desc
))
8077 action_id
= desc
["action_id"]
8078 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
8079 await self
._wait
_ng
_ro
(
8086 operation
="healing",
8091 "_admin.deployed.RO.operational-status": "running",
8092 "detailed-status": " ".join(stage
),
8094 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
8095 self
._write
_op
_status
(nslcmop_id
, stage
)
8097 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
8100 except Exception as e
:
8101 stage
[2] = "ERROR healing at VIM"
8102 # self.set_vnfr_at_error(db_vnfrs, str(e))
8104 "Error healing at VIM {}".format(e
),
8105 exc_info
=not isinstance(
8108 ROclient
.ROClientException
,
8134 task_instantiation_info
,
8137 # launch instantiate_N2VC in a asyncio task and register task object
8138 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
8139 # if not found, create one entry and update database
8140 # fill db_nsr._admin.deployed.VCA.<index>
8143 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
8147 get_charm_name
= False
8148 if "execution-environment-list" in descriptor_config
:
8149 ee_list
= descriptor_config
.get("execution-environment-list", [])
8150 elif "juju" in descriptor_config
:
8151 ee_list
= [descriptor_config
] # ns charms
8152 if "execution-environment-list" not in descriptor_config
:
8153 # charm name is only required for ns charms
8154 get_charm_name
= True
8155 else: # other types as script are not supported
8158 for ee_item
in ee_list
:
8161 + "_deploy_n2vc ee_item juju={}, helm={}".format(
8162 ee_item
.get("juju"), ee_item
.get("helm-chart")
8165 ee_descriptor_id
= ee_item
.get("id")
8166 if ee_item
.get("juju"):
8167 vca_name
= ee_item
["juju"].get("charm")
8169 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
8172 if ee_item
["juju"].get("charm") is not None
8175 if ee_item
["juju"].get("cloud") == "k8s":
8176 vca_type
= "k8s_proxy_charm"
8177 elif ee_item
["juju"].get("proxy") is False:
8178 vca_type
= "native_charm"
8179 elif ee_item
.get("helm-chart"):
8180 vca_name
= ee_item
["helm-chart"]
8181 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
8184 vca_type
= "helm-v3"
8187 logging_text
+ "skipping non juju neither charm configuration"
8192 for vca_index
, vca_deployed
in enumerate(
8193 db_nsr
["_admin"]["deployed"]["VCA"]
8195 if not vca_deployed
:
8198 vca_deployed
.get("member-vnf-index") == member_vnf_index
8199 and vca_deployed
.get("vdu_id") == vdu_id
8200 and vca_deployed
.get("kdu_name") == kdu_name
8201 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
8202 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
8206 # not found, create one.
8208 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
8211 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
8213 target
+= "/kdu/{}".format(kdu_name
)
8215 "target_element": target
,
8216 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
8217 "member-vnf-index": member_vnf_index
,
8219 "kdu_name": kdu_name
,
8220 "vdu_count_index": vdu_index
,
8221 "operational-status": "init", # TODO revise
8222 "detailed-status": "", # TODO revise
8223 "step": "initial-deploy", # TODO revise
8225 "vdu_name": vdu_name
,
8227 "ee_descriptor_id": ee_descriptor_id
,
8228 "charm_name": charm_name
,
8232 # create VCA and configurationStatus in db
8234 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
8235 "configurationStatus.{}".format(vca_index
): dict(),
8237 self
.update_db_2("nsrs", nsr_id
, db_dict
)
8239 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
8241 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
8242 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
8243 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
8246 task_n2vc
= asyncio
.ensure_future(
8248 logging_text
=logging_text
,
8249 vca_index
=vca_index
,
8255 vdu_index
=vdu_index
,
8256 deploy_params
=deploy_params
,
8257 config_descriptor
=descriptor_config
,
8258 base_folder
=base_folder
,
8259 nslcmop_id
=nslcmop_id
,
8263 ee_config_descriptor
=ee_item
,
8266 self
.lcm_tasks
.register(
8270 "instantiate_N2VC-{}".format(vca_index
),
8273 task_instantiation_info
[
8275 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
8276 member_vnf_index
or "", vdu_id
or ""
8279 async def heal_N2VC(
8296 ee_config_descriptor
,
8298 nsr_id
= db_nsr
["_id"]
8299 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
8300 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
8301 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
8302 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
8304 "collection": "nsrs",
8305 "filter": {"_id": nsr_id
},
8306 "path": db_update_entry
,
8312 element_under_configuration
= nsr_id
8316 vnfr_id
= db_vnfr
["_id"]
8317 osm_config
["osm"]["vnf_id"] = vnfr_id
8319 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8321 if vca_type
== "native_charm":
8324 index_number
= vdu_index
or 0
8327 element_type
= "VNF"
8328 element_under_configuration
= vnfr_id
8329 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8331 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8332 element_type
= "VDU"
8333 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8334 osm_config
["osm"]["vdu_id"] = vdu_id
8336 namespace
+= ".{}".format(kdu_name
)
8337 element_type
= "KDU"
8338 element_under_configuration
= kdu_name
8339 osm_config
["osm"]["kdu_name"] = kdu_name
8342 if base_folder
["pkg-dir"]:
8343 artifact_path
= "{}/{}/{}/{}".format(
8344 base_folder
["folder"],
8345 base_folder
["pkg-dir"],
8348 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8353 artifact_path
= "{}/Scripts/{}/{}/".format(
8354 base_folder
["folder"],
8357 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8362 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8364 # get initial_config_primitive_list that applies to this element
8365 initial_config_primitive_list
= config_descriptor
.get(
8366 "initial-config-primitive"
8370 "Initial config primitive list > {}".format(
8371 initial_config_primitive_list
8375 # add config if not present for NS charm
8376 ee_descriptor_id
= ee_config_descriptor
.get("id")
8377 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8378 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8379 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8383 "Initial config primitive list #2 > {}".format(
8384 initial_config_primitive_list
8387 # n2vc_redesign STEP 3.1
8388 # find old ee_id if exists
8389 ee_id
= vca_deployed
.get("ee_id")
8391 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8392 # create or register execution environment in VCA. Only for native charms when healing
8393 if vca_type
== "native_charm":
8394 step
= "Waiting to VM being up and getting IP address"
8395 self
.logger
.debug(logging_text
+ step
)
8396 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8405 credentials
= {"hostname": rw_mgmt_ip
}
8407 username
= deep_get(
8408 config_descriptor
, ("config-access", "ssh-access", "default-user")
8410 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8411 # merged. Meanwhile let's get username from initial-config-primitive
8412 if not username
and initial_config_primitive_list
:
8413 for config_primitive
in initial_config_primitive_list
:
8414 for param
in config_primitive
.get("parameter", ()):
8415 if param
["name"] == "ssh-username":
8416 username
= param
["value"]
8420 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8421 "'config-access.ssh-access.default-user'"
8423 credentials
["username"] = username
8425 # n2vc_redesign STEP 3.2
8426 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8427 self
._write
_configuration
_status
(
8429 vca_index
=vca_index
,
8430 status
="REGISTERING",
8431 element_under_configuration
=element_under_configuration
,
8432 element_type
=element_type
,
8435 step
= "register execution environment {}".format(credentials
)
8436 self
.logger
.debug(logging_text
+ step
)
8437 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8438 credentials
=credentials
,
8439 namespace
=namespace
,
8444 # update ee_id en db
8446 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8448 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8450 # for compatibility with MON/POL modules, the need model and application name at database
8451 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8452 # Not sure if this need to be done when healing
8454 ee_id_parts = ee_id.split(".")
8455 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8456 if len(ee_id_parts) >= 2:
8457 model_name = ee_id_parts[0]
8458 application_name = ee_id_parts[1]
8459 db_nsr_update[db_update_entry + "model"] = model_name
8460 db_nsr_update[db_update_entry + "application"] = application_name
8463 # n2vc_redesign STEP 3.3
8464 # Install configuration software. Only for native charms.
8465 step
= "Install configuration Software"
8467 self
._write
_configuration
_status
(
8469 vca_index
=vca_index
,
8470 status
="INSTALLING SW",
8471 element_under_configuration
=element_under_configuration
,
8472 element_type
=element_type
,
8473 # other_update=db_nsr_update,
8477 # TODO check if already done
8478 self
.logger
.debug(logging_text
+ step
)
8480 if vca_type
== "native_charm":
8481 config_primitive
= next(
8482 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8485 if config_primitive
:
8486 config
= self
._map
_primitive
_params
(
8487 config_primitive
, {}, deploy_params
8489 await self
.vca_map
[vca_type
].install_configuration_sw(
8491 artifact_path
=artifact_path
,
8499 # write in db flag of configuration_sw already installed
8501 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8504 # Not sure if this need to be done when healing
8506 # add relations for this VCA (wait for other peers related with this VCA)
8507 await self._add_vca_relations(
8508 logging_text=logging_text,
8511 vca_index=vca_index,
8515 # if SSH access is required, then get execution environment SSH public
8516 # if native charm we have waited already to VM be UP
8517 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8520 # self.logger.debug("get ssh key block")
8522 config_descriptor
, ("config-access", "ssh-access", "required")
8524 # self.logger.debug("ssh key needed")
8525 # Needed to inject a ssh key
8528 ("config-access", "ssh-access", "default-user"),
8530 step
= "Install configuration Software, getting public ssh key"
8531 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8532 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8535 step
= "Insert public key into VM user={} ssh_key={}".format(
8539 # self.logger.debug("no need to get ssh key")
8540 step
= "Waiting to VM being up and getting IP address"
8541 self
.logger
.debug(logging_text
+ step
)
8543 # n2vc_redesign STEP 5.1
8544 # wait for RO (ip-address) Insert pub_key into VM
8545 # IMPORTANT: We need do wait for RO to complete healing operation.
8546 await self
._wait
_heal
_ro
(nsr_id
, self
.timeout
.ns_heal
)
8549 rw_mgmt_ip
= await self
.wait_kdu_up(
8550 logging_text
, nsr_id
, vnfr_id
, kdu_name
8553 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8563 rw_mgmt_ip
= None # This is for a NS configuration
8565 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8567 # store rw_mgmt_ip in deploy params for later replacement
8568 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8571 # get run-day1 operation parameter
8572 runDay1
= deploy_params
.get("run-day1", False)
8574 "Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
, vdu_id
, runDay1
)
8577 # n2vc_redesign STEP 6 Execute initial config primitive
8578 step
= "execute initial config primitive"
8580 # wait for dependent primitives execution (NS -> VNF -> VDU)
8581 if initial_config_primitive_list
:
8582 await self
._wait
_dependent
_n
2vc
(
8583 nsr_id
, vca_deployed_list
, vca_index
8586 # stage, in function of element type: vdu, kdu, vnf or ns
8587 my_vca
= vca_deployed_list
[vca_index
]
8588 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8590 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8591 elif my_vca
.get("member-vnf-index"):
8593 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8596 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8598 self
._write
_configuration
_status
(
8599 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8602 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8604 check_if_terminated_needed
= True
8605 for initial_config_primitive
in initial_config_primitive_list
:
8606 # adding information on the vca_deployed if it is a NS execution environment
8607 if not vca_deployed
["member-vnf-index"]:
8608 deploy_params
["ns_config_info"] = json
.dumps(
8609 self
._get
_ns
_config
_info
(nsr_id
)
8611 # TODO check if already done
8612 primitive_params_
= self
._map
_primitive
_params
(
8613 initial_config_primitive
, {}, deploy_params
8616 step
= "execute primitive '{}' params '{}'".format(
8617 initial_config_primitive
["name"], primitive_params_
8619 self
.logger
.debug(logging_text
+ step
)
8620 await self
.vca_map
[vca_type
].exec_primitive(
8622 primitive_name
=initial_config_primitive
["name"],
8623 params_dict
=primitive_params_
,
8628 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8629 if check_if_terminated_needed
:
8630 if config_descriptor
.get("terminate-config-primitive"):
8634 {db_update_entry
+ "needed_terminate": True},
8636 check_if_terminated_needed
= False
8638 # TODO register in database that primitive is done
8640 # STEP 7 Configure metrics
8641 # Not sure if this need to be done when healing
8643 if vca_type == "helm" or vca_type == "helm-v3":
8644 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8646 artifact_path=artifact_path,
8647 ee_config_descriptor=ee_config_descriptor,
8650 target_ip=rw_mgmt_ip,
8656 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8659 for job in prometheus_jobs:
8662 {"job_name": job["job_name"]},
8665 fail_on_empty=False,
8669 step
= "instantiated at VCA"
8670 self
.logger
.debug(logging_text
+ step
)
8672 self
._write
_configuration
_status
(
8673 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8676 except Exception as e
: # TODO not use Exception but N2VC exception
8677 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8679 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8682 "Exception while {} : {}".format(step
, e
), exc_info
=True
8684 self
._write
_configuration
_status
(
8685 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8687 raise LcmException("{} {}".format(step
, e
)) from e
async def _wait_heal_ro(
    self,
    nsr_id,
    timeout=600,
):
    """
    Wait until RO stops reporting a heal in progress for the given NS.

    The "nsrs" record is re-read on every iteration and the value stored at
    _admin.deployed.RO.operational-status is inspected; the wait finishes as
    soon as that value is anything other than "healing".

    :param nsr_id: _id of the NS record to poll
    :param timeout: maximum time to wait; it is compared against time(),
        so presumably seconds (confirm against the module's `time` import)
    :return: None
    :raises NgRoException: if the status is still "healing" once the
        timeout has elapsed
    """
    deadline = time() + timeout
    while time() <= deadline:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        operational_status_ro = db_nsr["_admin"]["deployed"]["RO"][
            "operational-status"
        ]
        self.logger.debug("Wait Heal RO > {}".format(operational_status_ro))
        if operational_status_ro != "healing":
            return
        # The explicit `loop` argument of asyncio.sleep() was removed in
        # Python 3.10; the coroutine already runs on the current loop.
        await asyncio.sleep(15)
    # Message text left unchanged so existing logs/alerts keep matching.
    raise NgRoException("Timeout waiting ns to deploy")
async def vertical_scale(self, nsr_id, nslcmop_id):
    """
    Run a vertical-scale operation for an NS through RO.

    After obtaining the HA lock for the operation, the "operationParams"
    stored in the nslcmop record are sent to RO, the resulting RO action is
    awaited, the outcome is written to the operation record and a
    "verticalscaled" message is published on the "ns" topic.

    :param nsr_id: NS instance ID
    :param nslcmop_id: ID of the nslcmop record describing this operation
    :return: None
    """
    # lock_HA returned a falsy value: do not process this operation here.
    if not self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id):
        return

    logging_text = f"Task ns={nsr_id} vertical scale "
    self.logger.debug(logging_text + "Enter")

    op_update = {}
    # NOTE(review): filled in on success but never written to the database
    # inside this function.
    nsr_update = {}
    final_state = None
    failure = None
    # Handed to _wait_ng_ro together with the vertical-scale timeout.
    started_at = time()

    try:
        # Let any previous related operation finish first.
        current_step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="VerticalScale",
            current_operation_id=nslcmop_id,
        )

        current_step = "Getting nslcmop from database"
        self.logger.debug(
            current_step + " after having waited for previous tasks to be completed"
        )
        nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        requested_params = nslcmop.get("operationParams")
        target = {}
        target.update(requested_params)

        ro_reply = await self.RO.vertical_scale(nsr_id, target)
        self.logger.debug(f"RO return > {ro_reply}")
        action_id = ro_reply["action_id"]
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            started_at,
            self.timeout.verticalscale,
            operation="verticalscale",
        )
    except (ROclient.ROClientException, DbException, LcmException) as known_error:
        self.logger.error(f"Exit Exception {known_error}")
        failure = known_error
    except asyncio.CancelledError:
        self.logger.error(f"Cancelled Exception while '{current_step}'")
        failure = "Operation was cancelled"
    except Exception as unexpected:
        # Keep the full traceback as the failure detail for unknown errors.
        failure = traceback.format_exc()
        self.logger.critical(
            f"Exit Exception {type(unexpected).__name__} {unexpected}",
            exc_info=True,
        )
    finally:
        # The NS always goes back to IDLE, whatever the outcome.
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )

        if not failure:
            final_state = "COMPLETED"
            op_update["detailed-status"] = "Done"
            nsr_update["detailed-status"] = "Done"
        else:
            op_update["detailed-status"] = f"FAILED {current_step}: {failure}"
            final_state = "FAILED"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message="",
            operation_state=final_state,
            other_update=op_update,
        )

        if final_state:
            notification = {
                "nsr_id": nsr_id,
                "nslcmop_id": nslcmop_id,
                "operationState": final_state,
            }
            try:
                await self.msg.aiowrite(
                    "ns", "verticalscaled", notification, loop=self.loop
                )
            except Exception as notify_error:
                # A failed notification is logged but does not fail the task.
                self.logger.error(
                    f"{logging_text}kafka_write notification Exception {notify_error}"
                )

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_verticalscale")