1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
36 from osm_lcm
import ROclient
37 from osm_lcm
.data_utils
.lcm_config
import LcmCfg
38 from osm_lcm
.data_utils
.nsr
import (
41 get_deployed_vca_list
,
44 from osm_lcm
.data_utils
.vca
import (
53 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
54 from osm_lcm
.lcm_utils
import (
61 check_juju_bundle_existence
,
62 get_charm_artifact_path
,
65 from osm_lcm
.data_utils
.nsd
import (
66 get_ns_configuration_relation_list
,
70 from osm_lcm
.data_utils
.vnfd
import (
76 get_ee_sorted_initial_config_primitive_list
,
77 get_ee_sorted_terminate_config_primitive_list
,
79 get_virtual_link_profiles
,
84 get_number_of_instances
,
86 get_kdu_resource_profile
,
87 find_software_version
,
90 from osm_lcm
.data_utils
.list_utils
import find_in_list
91 from osm_lcm
.data_utils
.vnfr
import (
95 get_volumes_from_instantiation_params
,
97 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
98 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
99 from n2vc
.definitions
import RelationEndpoint
100 from n2vc
.k8s_helm_conn
import K8sHelmConnector
101 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
102 from n2vc
.k8s_juju_conn
import K8sJujuConnector
104 from osm_common
.dbbase
import DbException
105 from osm_common
.fsbase
import FsException
107 from osm_lcm
.data_utils
.database
.database
import Database
108 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
109 from osm_lcm
.data_utils
.wim
import (
111 get_target_wim_attrs
,
112 select_feasible_wim_account
,
115 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
116 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
118 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
119 from osm_lcm
.osm_config
import OsmConfigBuilder
120 from osm_lcm
.prometheus
import parse_job
122 from copy
import copy
, deepcopy
123 from time
import time
124 from uuid
import uuid4
126 from random
import randint
128 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
131 class NsLcm(LcmBase
):
132 SUBOPERATION_STATUS_NOT_FOUND
= -1
133 SUBOPERATION_STATUS_NEW
= -2
134 SUBOPERATION_STATUS_SKIP
= -3
135 task_name_deploy_vca
= "Deploying VCA"
137 def __init__(self
, msg
, lcm_tasks
, config
: LcmCfg
, loop
):
139 Init, Connect to database, filesystem storage, and messaging
140 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
143 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
145 self
.db
= Database().instance
.db
146 self
.fs
= Filesystem().instance
.fs
148 self
.lcm_tasks
= lcm_tasks
149 self
.timeout
= config
.timeout
150 self
.ro_config
= config
.RO
151 self
.vca_config
= config
.VCA
153 # create N2VC connector
154 self
.n2vc
= N2VCJujuConnector(
157 on_update_db
=self
._on
_update
_n
2vc
_db
,
162 self
.conn_helm_ee
= LCMHelmConn(
165 vca_config
=self
.vca_config
,
166 on_update_db
=self
._on
_update
_n
2vc
_db
,
169 self
.k8sclusterhelm2
= K8sHelmConnector(
170 kubectl_command
=self
.vca_config
.kubectlpath
,
171 helm_command
=self
.vca_config
.helmpath
,
178 self
.k8sclusterhelm3
= K8sHelm3Connector(
179 kubectl_command
=self
.vca_config
.kubectlpath
,
180 helm_command
=self
.vca_config
.helm3path
,
187 self
.k8sclusterjuju
= K8sJujuConnector(
188 kubectl_command
=self
.vca_config
.kubectlpath
,
189 juju_command
=self
.vca_config
.jujupath
,
192 on_update_db
=self
._on
_update
_k
8s
_db
,
197 self
.k8scluster_map
= {
198 "helm-chart": self
.k8sclusterhelm2
,
199 "helm-chart-v3": self
.k8sclusterhelm3
,
200 "chart": self
.k8sclusterhelm3
,
201 "juju-bundle": self
.k8sclusterjuju
,
202 "juju": self
.k8sclusterjuju
,
206 "lxc_proxy_charm": self
.n2vc
,
207 "native_charm": self
.n2vc
,
208 "k8s_proxy_charm": self
.n2vc
,
209 "helm": self
.conn_helm_ee
,
210 "helm-v3": self
.conn_helm_ee
,
214 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
.to_dict())
216 self
.op_status_map
= {
217 "instantiation": self
.RO
.status
,
218 "termination": self
.RO
.status
,
219 "migrate": self
.RO
.status
,
220 "healing": self
.RO
.recreate_status
,
221 "verticalscale": self
.RO
.status
,
222 "start_stop_rebuild": self
.RO
.status
,
226 def increment_ip_mac(ip_mac
, vm_index
=1):
227 if not isinstance(ip_mac
, str):
230 # try with ipv4 look for last dot
231 i
= ip_mac
.rfind(".")
234 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
235 # try with ipv6 or mac look for last colon. Operate in hex
236 i
= ip_mac
.rfind(":")
239 # format in hex, len can be 2 for mac or 4 for ipv6
240 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
241 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
247 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
249 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
252 # TODO filter RO descriptor fields...
256 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
257 db_dict
["deploymentStatus"] = ro_descriptor
258 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
260 except Exception as e
:
262 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
265 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
267 # remove last dot from path (if exists)
268 if path
.endswith("."):
271 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
272 # .format(table, filter, path, updated_data))
275 nsr_id
= filter.get("_id")
277 # read ns record from database
278 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
279 current_ns_status
= nsr
.get("nsState")
281 # get vca status for NS
282 status_dict
= await self
.n2vc
.get_status(
283 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
288 db_dict
["vcaStatus"] = status_dict
290 # update configurationStatus for this VCA
292 vca_index
= int(path
[path
.rfind(".") + 1 :])
295 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
297 vca_status
= vca_list
[vca_index
].get("status")
299 configuration_status_list
= nsr
.get("configurationStatus")
300 config_status
= configuration_status_list
[vca_index
].get("status")
302 if config_status
== "BROKEN" and vca_status
!= "failed":
303 db_dict
["configurationStatus"][vca_index
] = "READY"
304 elif config_status
!= "BROKEN" and vca_status
== "failed":
305 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
306 except Exception as e
:
307 # not update configurationStatus
308 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
310 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
311 # if nsState = 'DEGRADED' check if all is OK
313 if current_ns_status
in ("READY", "DEGRADED"):
314 error_description
= ""
316 if status_dict
.get("machines"):
317 for machine_id
in status_dict
.get("machines"):
318 machine
= status_dict
.get("machines").get(machine_id
)
319 # check machine agent-status
320 if machine
.get("agent-status"):
321 s
= machine
.get("agent-status").get("status")
324 error_description
+= (
325 "machine {} agent-status={} ; ".format(
329 # check machine instance status
330 if machine
.get("instance-status"):
331 s
= machine
.get("instance-status").get("status")
334 error_description
+= (
335 "machine {} instance-status={} ; ".format(
340 if status_dict
.get("applications"):
341 for app_id
in status_dict
.get("applications"):
342 app
= status_dict
.get("applications").get(app_id
)
343 # check application status
344 if app
.get("status"):
345 s
= app
.get("status").get("status")
348 error_description
+= (
349 "application {} status={} ; ".format(app_id
, s
)
352 if error_description
:
353 db_dict
["errorDescription"] = error_description
354 if current_ns_status
== "READY" and is_degraded
:
355 db_dict
["nsState"] = "DEGRADED"
356 if current_ns_status
== "DEGRADED" and not is_degraded
:
357 db_dict
["nsState"] = "READY"
360 self
.update_db_2("nsrs", nsr_id
, db_dict
)
362 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
364 except Exception as e
:
365 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
367 async def _on_update_k8s_db(
368 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
371 Updating vca status in NSR record
372 :param cluster_uuid: UUID of a k8s cluster
373 :param kdu_instance: The unique name of the KDU instance
374 :param filter: To get nsr_id
375 :cluster_type: The cluster type (juju, k8s)
379 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
380 # .format(cluster_uuid, kdu_instance, filter))
382 nsr_id
= filter.get("_id")
384 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
385 cluster_uuid
=cluster_uuid
,
386 kdu_instance
=kdu_instance
,
388 complete_status
=True,
394 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
397 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
401 self
.update_db_2("nsrs", nsr_id
, db_dict
)
402 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
404 except Exception as e
:
405 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
408 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
411 undefined
=StrictUndefined
,
412 autoescape
=select_autoescape(default_for_string
=True, default
=True),
414 template
= env
.from_string(cloud_init_text
)
415 return template
.render(additional_params
or {})
416 except UndefinedError
as e
:
418 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
419 "file, must be provided in the instantiation parameters inside the "
420 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
422 except (TemplateError
, TemplateNotFound
) as e
:
424 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
429 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
430 cloud_init_content
= cloud_init_file
= None
432 if vdu
.get("cloud-init-file"):
433 base_folder
= vnfd
["_admin"]["storage"]
434 if base_folder
["pkg-dir"]:
435 cloud_init_file
= "{}/{}/cloud_init/{}".format(
436 base_folder
["folder"],
437 base_folder
["pkg-dir"],
438 vdu
["cloud-init-file"],
441 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
442 base_folder
["folder"],
443 vdu
["cloud-init-file"],
445 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
446 cloud_init_content
= ci_file
.read()
447 elif vdu
.get("cloud-init"):
448 cloud_init_content
= vdu
["cloud-init"]
450 return cloud_init_content
451 except FsException
as e
:
453 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
454 vnfd
["id"], vdu
["id"], cloud_init_file
, e
458 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
460 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
462 additional_params
= vdur
.get("additionalParams")
463 return parse_yaml_strings(additional_params
)
465 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
467 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
468 :param vnfd: input vnfd
469 :param new_id: overrides vnf id if provided
470 :param additionalParams: Instantiation params for VNFs provided
471 :param nsrId: Id of the NSR
472 :return: copy of vnfd
474 vnfd_RO
= deepcopy(vnfd
)
475 # remove unused by RO configuration, monitoring, scaling and internal keys
476 vnfd_RO
.pop("_id", None)
477 vnfd_RO
.pop("_admin", None)
478 vnfd_RO
.pop("monitoring-param", None)
479 vnfd_RO
.pop("scaling-group-descriptor", None)
480 vnfd_RO
.pop("kdu", None)
481 vnfd_RO
.pop("k8s-cluster", None)
483 vnfd_RO
["id"] = new_id
485 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
486 for vdu
in get_iterable(vnfd_RO
, "vdu"):
487 vdu
.pop("cloud-init-file", None)
488 vdu
.pop("cloud-init", None)
492 def ip_profile_2_RO(ip_profile
):
493 RO_ip_profile
= deepcopy(ip_profile
)
494 if "dns-server" in RO_ip_profile
:
495 if isinstance(RO_ip_profile
["dns-server"], list):
496 RO_ip_profile
["dns-address"] = []
497 for ds
in RO_ip_profile
.pop("dns-server"):
498 RO_ip_profile
["dns-address"].append(ds
["address"])
500 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
501 if RO_ip_profile
.get("ip-version") == "ipv4":
502 RO_ip_profile
["ip-version"] = "IPv4"
503 if RO_ip_profile
.get("ip-version") == "ipv6":
504 RO_ip_profile
["ip-version"] = "IPv6"
505 if "dhcp-params" in RO_ip_profile
:
506 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
509 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
510 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
511 if db_vim
["_admin"]["operationalState"] != "ENABLED":
513 "VIM={} is not available. operationalState={}".format(
514 vim_account
, db_vim
["_admin"]["operationalState"]
517 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
520 def get_ro_wim_id_for_wim_account(self
, wim_account
):
521 if isinstance(wim_account
, str):
522 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
523 if db_wim
["_admin"]["operationalState"] != "ENABLED":
525 "WIM={} is not available. operationalState={}".format(
526 wim_account
, db_wim
["_admin"]["operationalState"]
529 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
534 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
536 db_vdu_push_list
= []
538 db_update
= {"_admin.modified": time()}
540 for vdu_id
, vdu_count
in vdu_create
.items():
544 for vdur
in reversed(db_vnfr
["vdur"])
545 if vdur
["vdu-id-ref"] == vdu_id
550 # Read the template saved in the db:
552 "No vdur in the database. Using the vdur-template to scale"
554 vdur_template
= db_vnfr
.get("vdur-template")
555 if not vdur_template
:
557 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
561 vdur
= vdur_template
[0]
562 # Delete a template from the database after using it
565 {"_id": db_vnfr
["_id"]},
567 pull
={"vdur-template": {"_id": vdur
["_id"]}},
569 for count
in range(vdu_count
):
570 vdur_copy
= deepcopy(vdur
)
571 vdur_copy
["status"] = "BUILD"
572 vdur_copy
["status-detailed"] = None
573 vdur_copy
["ip-address"] = None
574 vdur_copy
["_id"] = str(uuid4())
575 vdur_copy
["count-index"] += count
+ 1
576 vdur_copy
["id"] = "{}-{}".format(
577 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
579 vdur_copy
.pop("vim_info", None)
580 for iface
in vdur_copy
["interfaces"]:
581 if iface
.get("fixed-ip"):
582 iface
["ip-address"] = self
.increment_ip_mac(
583 iface
["ip-address"], count
+ 1
586 iface
.pop("ip-address", None)
587 if iface
.get("fixed-mac"):
588 iface
["mac-address"] = self
.increment_ip_mac(
589 iface
["mac-address"], count
+ 1
592 iface
.pop("mac-address", None)
596 ) # only first vdu can be managment of vnf
597 db_vdu_push_list
.append(vdur_copy
)
598 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
600 if len(db_vnfr
["vdur"]) == 1:
601 # The scale will move to 0 instances
603 "Scaling to 0 !, creating the template with the last vdur"
605 template_vdur
= [db_vnfr
["vdur"][0]]
606 for vdu_id
, vdu_count
in vdu_delete
.items():
608 indexes_to_delete
= [
610 for iv
in enumerate(db_vnfr
["vdur"])
611 if iv
[1]["vdu-id-ref"] == vdu_id
615 "vdur.{}.status".format(i
): "DELETING"
616 for i
in indexes_to_delete
[-vdu_count
:]
620 # it must be deleted one by one because common.db does not allow otherwise
623 for v
in reversed(db_vnfr
["vdur"])
624 if v
["vdu-id-ref"] == vdu_id
626 for vdu
in vdus_to_delete
[:vdu_count
]:
629 {"_id": db_vnfr
["_id"]},
631 pull
={"vdur": {"_id": vdu
["_id"]}},
635 db_push
["vdur"] = db_vdu_push_list
637 db_push
["vdur-template"] = template_vdur
640 db_vnfr
["vdur-template"] = template_vdur
641 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
642 # modify passed dictionary db_vnfr
643 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
644 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
646 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
648 Updates database nsr with the RO info for the created vld
649 :param ns_update_nsr: dictionary to be filled with the updated info
650 :param db_nsr: content of db_nsr. This is also modified
651 :param nsr_desc_RO: nsr descriptor from RO
652 :return: Nothing, LcmException is raised on errors
655 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
656 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
657 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
659 vld
["vim-id"] = net_RO
.get("vim_net_id")
660 vld
["name"] = net_RO
.get("vim_name")
661 vld
["status"] = net_RO
.get("status")
662 vld
["status-detailed"] = net_RO
.get("error_msg")
663 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
667 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
670 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
672 for db_vnfr
in db_vnfrs
.values():
673 vnfr_update
= {"status": "ERROR"}
674 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
675 if "status" not in vdur
:
676 vdur
["status"] = "ERROR"
677 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
679 vdur
["status-detailed"] = str(error_text
)
681 "vdur.{}.status-detailed".format(vdu_index
)
683 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
684 except DbException
as e
:
685 self
.logger
.error("Cannot update vnf. {}".format(e
))
687 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
689 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
690 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
691 :param nsr_desc_RO: nsr descriptor from RO
692 :return: Nothing, LcmException is raised on errors
694 for vnf_index
, db_vnfr
in db_vnfrs
.items():
695 for vnf_RO
in nsr_desc_RO
["vnfs"]:
696 if vnf_RO
["member_vnf_index"] != vnf_index
:
699 if vnf_RO
.get("ip_address"):
700 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
703 elif not db_vnfr
.get("ip-address"):
704 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
705 raise LcmExceptionNoMgmtIP(
706 "ns member_vnf_index '{}' has no IP address".format(
711 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
712 vdur_RO_count_index
= 0
713 if vdur
.get("pdu-type"):
715 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
716 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
718 if vdur
["count-index"] != vdur_RO_count_index
:
719 vdur_RO_count_index
+= 1
721 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
722 if vdur_RO
.get("ip_address"):
723 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
725 vdur
["ip-address"] = None
726 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
727 vdur
["name"] = vdur_RO
.get("vim_name")
728 vdur
["status"] = vdur_RO
.get("status")
729 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
730 for ifacer
in get_iterable(vdur
, "interfaces"):
731 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
732 if ifacer
["name"] == interface_RO
.get("internal_name"):
733 ifacer
["ip-address"] = interface_RO
.get(
736 ifacer
["mac-address"] = interface_RO
.get(
742 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
743 "from VIM info".format(
744 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
747 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
751 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
753 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
757 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
758 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
759 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
761 vld
["vim-id"] = net_RO
.get("vim_net_id")
762 vld
["name"] = net_RO
.get("vim_name")
763 vld
["status"] = net_RO
.get("status")
764 vld
["status-detailed"] = net_RO
.get("error_msg")
765 vnfr_update
["vld.{}".format(vld_index
)] = vld
769 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
774 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
779 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
784 def _get_ns_config_info(self
, nsr_id
):
786 Generates a mapping between vnf,vdu elements and the N2VC id
787 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
788 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
789 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
790 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
792 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
793 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
795 ns_config_info
= {"osm-config-mapping": mapping
}
796 for vca
in vca_deployed_list
:
797 if not vca
["member-vnf-index"]:
799 if not vca
["vdu_id"]:
800 mapping
[vca
["member-vnf-index"]] = vca
["application"]
804 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
806 ] = vca
["application"]
807 return ns_config_info
809 async def _instantiate_ng_ro(
826 def get_vim_account(vim_account_id
):
828 if vim_account_id
in db_vims
:
829 return db_vims
[vim_account_id
]
830 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
831 db_vims
[vim_account_id
] = db_vim
834 # modify target_vld info with instantiation parameters
835 def parse_vld_instantiation_params(
836 target_vim
, target_vld
, vld_params
, target_sdn
838 if vld_params
.get("ip-profile"):
839 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
842 if vld_params
.get("provider-network"):
843 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
846 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
847 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
851 # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
852 # if wim_account_id is specified in vld_params, validate if it is feasible.
853 wim_account_id
, db_wim
= select_feasible_wim_account(
854 db_nsr
, db_vnfrs
, target_vld
, vld_params
, self
.logger
858 # WIM is needed and a feasible one was found, populate WIM target and SDN ports
859 self
.logger
.info("WIM selected: {:s}".format(str(wim_account_id
)))
860 # update vld_params with correct WIM account Id
861 vld_params
["wimAccountId"] = wim_account_id
863 target_wim
= "wim:{}".format(wim_account_id
)
864 target_wim_attrs
= get_target_wim_attrs(nsr_id
, target_vld
, vld_params
)
865 sdn_ports
= get_sdn_ports(vld_params
, db_wim
)
866 if len(sdn_ports
) > 0:
867 target_vld
["vim_info"][target_wim
] = target_wim_attrs
868 target_vld
["vim_info"][target_wim
]["sdn-ports"] = sdn_ports
871 "Target VLD with WIM data: {:s}".format(str(target_vld
))
874 for param
in ("vim-network-name", "vim-network-id"):
875 if vld_params
.get(param
):
876 if isinstance(vld_params
[param
], dict):
877 for vim
, vim_net
in vld_params
[param
].items():
878 other_target_vim
= "vim:" + vim
880 target_vld
["vim_info"],
881 (other_target_vim
, param
.replace("-", "_")),
884 else: # isinstance str
885 target_vld
["vim_info"][target_vim
][
886 param
.replace("-", "_")
887 ] = vld_params
[param
]
888 if vld_params
.get("common_id"):
889 target_vld
["common_id"] = vld_params
.get("common_id")
891 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
892 def update_ns_vld_target(target
, ns_params
):
893 for vnf_params
in ns_params
.get("vnf", ()):
894 if vnf_params
.get("vimAccountId"):
898 for vnfr
in db_vnfrs
.values()
899 if vnf_params
["member-vnf-index"]
900 == vnfr
["member-vnf-index-ref"]
904 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
907 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
908 target_vld
= find_in_list(
909 get_iterable(vdur
, "interfaces"),
910 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
913 vld_params
= find_in_list(
914 get_iterable(ns_params
, "vld"),
915 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
919 if vnf_params
.get("vimAccountId") not in a_vld
.get(
922 target_vim_network_list
= [
923 v
for _
, v
in a_vld
.get("vim_info").items()
925 target_vim_network_name
= next(
927 item
.get("vim_network_name", "")
928 for item
in target_vim_network_list
933 target
["ns"]["vld"][a_index
].get("vim_info").update(
935 "vim:{}".format(vnf_params
["vimAccountId"]): {
936 "vim_network_name": target_vim_network_name
,
942 for param
in ("vim-network-name", "vim-network-id"):
943 if vld_params
.get(param
) and isinstance(
944 vld_params
[param
], dict
946 for vim
, vim_net
in vld_params
[
949 other_target_vim
= "vim:" + vim
951 target
["ns"]["vld"][a_index
].get(
956 param
.replace("-", "_"),
961 nslcmop_id
= db_nslcmop
["_id"]
963 "name": db_nsr
["name"],
966 "image": deepcopy(db_nsr
["image"]),
967 "flavor": deepcopy(db_nsr
["flavor"]),
968 "action_id": nslcmop_id
,
969 "cloud_init_content": {},
971 for image
in target
["image"]:
972 image
["vim_info"] = {}
973 for flavor
in target
["flavor"]:
974 flavor
["vim_info"] = {}
975 if db_nsr
.get("affinity-or-anti-affinity-group"):
976 target
["affinity-or-anti-affinity-group"] = deepcopy(
977 db_nsr
["affinity-or-anti-affinity-group"]
979 for affinity_or_anti_affinity_group
in target
[
980 "affinity-or-anti-affinity-group"
982 affinity_or_anti_affinity_group
["vim_info"] = {}
984 if db_nslcmop
.get("lcmOperationType") != "instantiate":
985 # get parameters of instantiation:
986 db_nslcmop_instantiate
= self
.db
.get_list(
989 "nsInstanceId": db_nslcmop
["nsInstanceId"],
990 "lcmOperationType": "instantiate",
993 ns_params
= db_nslcmop_instantiate
.get("operationParams")
995 ns_params
= db_nslcmop
.get("operationParams")
996 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
997 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
1000 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
1001 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
1004 "name": vld
["name"],
1005 "mgmt-network": vld
.get("mgmt-network", False),
1006 "type": vld
.get("type"),
1009 "vim_network_name": vld
.get("vim-network-name"),
1010 "vim_account_id": ns_params
["vimAccountId"],
1014 # check if this network needs SDN assist
1015 if vld
.get("pci-interfaces"):
1016 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1017 sdnc_id
= db_vim
["config"].get("sdn-controller")
1019 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1020 target_sdn
= "sdn:{}".format(sdnc_id
)
1021 target_vld
["vim_info"][target_sdn
] = {
1023 "target_vim": target_vim
,
1025 "type": vld
.get("type"),
1028 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1029 for nsd_vnf_profile
in nsd_vnf_profiles
:
1030 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1031 if cp
["virtual-link-profile-id"] == vld
["id"]:
1033 "member_vnf:{}.{}".format(
1034 cp
["constituent-cpd-id"][0][
1035 "constituent-base-element-id"
1037 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1039 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1041 # check at nsd descriptor, if there is an ip-profile
1043 nsd_vlp
= find_in_list(
1044 get_virtual_link_profiles(nsd
),
1045 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1050 and nsd_vlp
.get("virtual-link-protocol-data")
1051 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1053 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1056 ip_profile_dest_data
= {}
1057 if "ip-version" in ip_profile_source_data
:
1058 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1061 if "cidr" in ip_profile_source_data
:
1062 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1065 if "gateway-ip" in ip_profile_source_data
:
1066 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1069 if "dhcp-enabled" in ip_profile_source_data
:
1070 ip_profile_dest_data
["dhcp-params"] = {
1071 "enabled": ip_profile_source_data
["dhcp-enabled"]
1073 vld_params
["ip-profile"] = ip_profile_dest_data
1075 # update vld_params with instantiation params
1076 vld_instantiation_params
= find_in_list(
1077 get_iterable(ns_params
, "vld"),
1078 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1080 if vld_instantiation_params
:
1081 vld_params
.update(vld_instantiation_params
)
1082 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1083 target
["ns"]["vld"].append(target_vld
)
1084 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1085 update_ns_vld_target(target
, ns_params
)
1087 for vnfr
in db_vnfrs
.values():
1088 vnfd
= find_in_list(
1089 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1091 vnf_params
= find_in_list(
1092 get_iterable(ns_params
, "vnf"),
1093 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1095 target_vnf
= deepcopy(vnfr
)
1096 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1097 for vld
in target_vnf
.get("vld", ()):
1098 # check if connected to a ns.vld, to fill target'
1099 vnf_cp
= find_in_list(
1100 vnfd
.get("int-virtual-link-desc", ()),
1101 lambda cpd
: cpd
.get("id") == vld
["id"],
1104 ns_cp
= "member_vnf:{}.{}".format(
1105 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1107 if cp2target
.get(ns_cp
):
1108 vld
["target"] = cp2target
[ns_cp
]
1111 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1113 # check if this network needs SDN assist
1115 if vld
.get("pci-interfaces"):
1116 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1117 sdnc_id
= db_vim
["config"].get("sdn-controller")
1119 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1120 target_sdn
= "sdn:{}".format(sdnc_id
)
1121 vld
["vim_info"][target_sdn
] = {
1123 "target_vim": target_vim
,
1125 "type": vld
.get("type"),
1128 # check at vnfd descriptor, if there is an ip-profile
1130 vnfd_vlp
= find_in_list(
1131 get_virtual_link_profiles(vnfd
),
1132 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1136 and vnfd_vlp
.get("virtual-link-protocol-data")
1137 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1139 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1142 ip_profile_dest_data
= {}
1143 if "ip-version" in ip_profile_source_data
:
1144 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1147 if "cidr" in ip_profile_source_data
:
1148 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1151 if "gateway-ip" in ip_profile_source_data
:
1152 ip_profile_dest_data
[
1154 ] = ip_profile_source_data
["gateway-ip"]
1155 if "dhcp-enabled" in ip_profile_source_data
:
1156 ip_profile_dest_data
["dhcp-params"] = {
1157 "enabled": ip_profile_source_data
["dhcp-enabled"]
1160 vld_params
["ip-profile"] = ip_profile_dest_data
1161 # update vld_params with instantiation params
1163 vld_instantiation_params
= find_in_list(
1164 get_iterable(vnf_params
, "internal-vld"),
1165 lambda i_vld
: i_vld
["name"] == vld
["id"],
1167 if vld_instantiation_params
:
1168 vld_params
.update(vld_instantiation_params
)
1169 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1172 for vdur
in target_vnf
.get("vdur", ()):
1173 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1174 continue # This vdu must not be created
1175 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1177 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1180 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1181 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1184 and vdu_configuration
.get("config-access")
1185 and vdu_configuration
.get("config-access").get("ssh-access")
1187 vdur
["ssh-keys"] = ssh_keys_all
1188 vdur
["ssh-access-required"] = vdu_configuration
[
1190 ]["ssh-access"]["required"]
1193 and vnf_configuration
.get("config-access")
1194 and vnf_configuration
.get("config-access").get("ssh-access")
1195 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1197 vdur
["ssh-keys"] = ssh_keys_all
1198 vdur
["ssh-access-required"] = vnf_configuration
[
1200 ]["ssh-access"]["required"]
1201 elif ssh_keys_instantiation
and find_in_list(
1202 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1204 vdur
["ssh-keys"] = ssh_keys_instantiation
1206 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1208 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1210 if vdud
.get("cloud-init-file"):
1211 vdur
["cloud-init"] = "{}:file:{}".format(
1212 vnfd
["_id"], vdud
.get("cloud-init-file")
1214 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1215 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1216 base_folder
= vnfd
["_admin"]["storage"]
1217 if base_folder
["pkg-dir"]:
1218 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1219 base_folder
["folder"],
1220 base_folder
["pkg-dir"],
1221 vdud
.get("cloud-init-file"),
1224 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1225 base_folder
["folder"],
1226 vdud
.get("cloud-init-file"),
1228 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1229 target
["cloud_init_content"][
1232 elif vdud
.get("cloud-init"):
1233 vdur
["cloud-init"] = "{}:vdu:{}".format(
1234 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1236 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1237 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1240 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1241 deploy_params_vdu
= self
._format
_additional
_params
(
1242 vdur
.get("additionalParams") or {}
1244 deploy_params_vdu
["OSM"] = get_osm_params(
1245 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1247 vdur
["additionalParams"] = deploy_params_vdu
1250 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1251 if target_vim
not in ns_flavor
["vim_info"]:
1252 ns_flavor
["vim_info"][target_vim
] = {}
1255 # in case alternative images are provided we must check if they should be applied
1256 # for the vim_type, modify the vim_type taking into account
1257 ns_image_id
= int(vdur
["ns-image-id"])
1258 if vdur
.get("alt-image-ids"):
1259 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1260 vim_type
= db_vim
["vim_type"]
1261 for alt_image_id
in vdur
.get("alt-image-ids"):
1262 ns_alt_image
= target
["image"][int(alt_image_id
)]
1263 if vim_type
== ns_alt_image
.get("vim-type"):
1264 # must use alternative image
1266 "use alternative image id: {}".format(alt_image_id
)
1268 ns_image_id
= alt_image_id
1269 vdur
["ns-image-id"] = ns_image_id
1271 ns_image
= target
["image"][int(ns_image_id
)]
1272 if target_vim
not in ns_image
["vim_info"]:
1273 ns_image
["vim_info"][target_vim
] = {}
1276 if vdur
.get("affinity-or-anti-affinity-group-id"):
1277 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1278 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1279 if target_vim
not in ns_ags
["vim_info"]:
1280 ns_ags
["vim_info"][target_vim
] = {}
1282 vdur
["vim_info"] = {target_vim
: {}}
1283 # instantiation parameters
1285 vdu_instantiation_params
= find_in_list(
1286 get_iterable(vnf_params
, "vdu"),
1287 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1289 if vdu_instantiation_params
:
1290 # Parse the vdu_volumes from the instantiation params
1291 vdu_volumes
= get_volumes_from_instantiation_params(
1292 vdu_instantiation_params
, vdud
1294 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1295 vdur_list
.append(vdur
)
1296 target_vnf
["vdur"] = vdur_list
1297 target
["vnf"].append(target_vnf
)
1299 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1300 desc
= await self
.RO
.deploy(nsr_id
, target
)
1301 self
.logger
.debug("RO return > {}".format(desc
))
1302 action_id
= desc
["action_id"]
1303 await self
._wait
_ng
_ro
(
1310 operation
="instantiation",
1315 "_admin.deployed.RO.operational-status": "running",
1316 "detailed-status": " ".join(stage
),
1318 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1319 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1320 self
._write
_op
_status
(nslcmop_id
, stage
)
1322 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1326 async def _wait_ng_ro(
1336 detailed_status_old
= None
1338 start_time
= start_time
or time()
1339 while time() <= start_time
+ timeout
:
1340 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1341 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1342 if desc_status
["status"] == "FAILED":
1343 raise NgRoException(desc_status
["details"])
1344 elif desc_status
["status"] == "BUILD":
1346 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1347 elif desc_status
["status"] == "DONE":
1349 stage
[2] = "Deployed at VIM"
1352 assert False, "ROclient.check_ns_status returns unknown {}".format(
1353 desc_status
["status"]
1355 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1356 detailed_status_old
= stage
[2]
1357 db_nsr_update
["detailed-status"] = " ".join(stage
)
1358 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1359 self
._write
_op
_status
(nslcmop_id
, stage
)
1360 await asyncio
.sleep(15, loop
=self
.loop
)
1361 else: # timeout_ns_deploy
1362 raise NgRoException("Timeout waiting ns to deploy")
1364 async def _terminate_ng_ro(
1365 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1370 start_deploy
= time()
1377 "action_id": nslcmop_id
,
1379 desc
= await self
.RO
.deploy(nsr_id
, target
)
1380 action_id
= desc
["action_id"]
1381 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1382 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1385 + "ns terminate action at RO. action_id={}".format(action_id
)
1389 delete_timeout
= 20 * 60 # 20 minutes
1390 await self
._wait
_ng
_ro
(
1397 operation
="termination",
1400 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1401 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1403 await self
.RO
.delete(nsr_id
)
1404 except Exception as e
:
1405 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1406 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1407 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1408 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1410 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1412 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1413 failed_detail
.append("delete conflict: {}".format(e
))
1416 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1419 failed_detail
.append("delete error: {}".format(e
))
1422 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1426 stage
[2] = "Error deleting from VIM"
1428 stage
[2] = "Deleted from VIM"
1429 db_nsr_update
["detailed-status"] = " ".join(stage
)
1430 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1431 self
._write
_op
_status
(nslcmop_id
, stage
)
1434 raise LcmException("; ".join(failed_detail
))
1437 async def instantiate_RO(
1451 :param logging_text: preffix text to use at logging
1452 :param nsr_id: nsr identity
1453 :param nsd: database content of ns descriptor
1454 :param db_nsr: database content of ns record
1455 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1457 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1458 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1459 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1460 :return: None or exception
1463 start_deploy
= time()
1464 ns_params
= db_nslcmop
.get("operationParams")
1465 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1466 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1468 timeout_ns_deploy
= self
.timeout
.ns_deploy
1470 # Check for and optionally request placement optimization. Database will be updated if placement activated
1471 stage
[2] = "Waiting for Placement."
1472 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1473 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1474 for vnfr
in db_vnfrs
.values():
1475 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1478 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1480 return await self
._instantiate
_ng
_ro
(
1493 except Exception as e
:
1494 stage
[2] = "ERROR deploying at VIM"
1495 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1497 "Error deploying at VIM {}".format(e
),
1498 exc_info
=not isinstance(
1501 ROclient
.ROClientException
,
1510 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1512 Wait for kdu to be up, get ip address
1513 :param logging_text: prefix use for logging
1517 :return: IP address, K8s services
1520 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1523 while nb_tries
< 360:
1524 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1528 for x
in get_iterable(db_vnfr
, "kdur")
1529 if x
.get("kdu-name") == kdu_name
1535 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1537 if kdur
.get("status"):
1538 if kdur
["status"] in ("READY", "ENABLED"):
1539 return kdur
.get("ip-address"), kdur
.get("services")
1542 "target KDU={} is in error state".format(kdu_name
)
1545 await asyncio
.sleep(10, loop
=self
.loop
)
1547 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1549 async def wait_vm_up_insert_key_ro(
1550 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1553 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1554 :param logging_text: prefix use for logging
1559 :param pub_key: public ssh key to inject, None to skip
1560 :param user: user to apply the public ssh key
1564 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1568 target_vdu_id
= None
1574 if ro_retries
>= 360: # 1 hour
1576 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1579 await asyncio
.sleep(10, loop
=self
.loop
)
1582 if not target_vdu_id
:
1583 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1585 if not vdu_id
: # for the VNF case
1586 if db_vnfr
.get("status") == "ERROR":
1588 "Cannot inject ssh-key because target VNF is in error state"
1590 ip_address
= db_vnfr
.get("ip-address")
1596 for x
in get_iterable(db_vnfr
, "vdur")
1597 if x
.get("ip-address") == ip_address
1605 for x
in get_iterable(db_vnfr
, "vdur")
1606 if x
.get("vdu-id-ref") == vdu_id
1607 and x
.get("count-index") == vdu_index
1613 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1614 ): # If only one, this should be the target vdu
1615 vdur
= db_vnfr
["vdur"][0]
1618 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1619 vnfr_id
, vdu_id
, vdu_index
1622 # New generation RO stores information at "vim_info"
1625 if vdur
.get("vim_info"):
1627 t
for t
in vdur
["vim_info"]
1628 ) # there should be only one key
1629 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1631 vdur
.get("pdu-type")
1632 or vdur
.get("status") == "ACTIVE"
1633 or ng_ro_status
== "ACTIVE"
1635 ip_address
= vdur
.get("ip-address")
1638 target_vdu_id
= vdur
["vdu-id-ref"]
1639 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1641 "Cannot inject ssh-key because target VM is in error state"
1644 if not target_vdu_id
:
1647 # inject public key into machine
1648 if pub_key
and user
:
1649 self
.logger
.debug(logging_text
+ "Inserting RO key")
1650 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1651 if vdur
.get("pdu-type"):
1652 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1655 ro_vm_id
= "{}-{}".format(
1656 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1657 ) # TODO add vdu_index
1658 if self
.ro_config
.ng
:
1661 "action": "inject_ssh_key",
1665 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1667 desc
= await self
.RO
.deploy(nsr_id
, target
)
1668 action_id
= desc
["action_id"]
1669 await self
._wait
_ng
_ro
(
1670 nsr_id
, action_id
, timeout
=600, operation
="instantiation"
1674 # wait until NS is deployed at RO
1676 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1677 ro_nsr_id
= deep_get(
1678 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1682 result_dict
= await self
.RO
.create_action(
1684 item_id_name
=ro_nsr_id
,
1686 "add_public_key": pub_key
,
1691 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1692 if not result_dict
or not isinstance(result_dict
, dict):
1694 "Unknown response from RO when injecting key"
1696 for result
in result_dict
.values():
1697 if result
.get("vim_result") == 200:
1700 raise ROclient
.ROClientException(
1701 "error injecting key: {}".format(
1702 result
.get("description")
1706 except NgRoException
as e
:
1708 "Reaching max tries injecting key. Error: {}".format(e
)
1710 except ROclient
.ROClientException
as e
:
1714 + "error injecting key: {}. Retrying until {} seconds".format(
1721 "Reaching max tries injecting key. Error: {}".format(e
)
1728 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1730 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1732 my_vca
= vca_deployed_list
[vca_index
]
1733 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1734 # vdu or kdu: no dependencies
1738 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1739 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1740 configuration_status_list
= db_nsr
["configurationStatus"]
1741 for index
, vca_deployed
in enumerate(configuration_status_list
):
1742 if index
== vca_index
:
1745 if not my_vca
.get("member-vnf-index") or (
1746 vca_deployed
.get("member-vnf-index")
1747 == my_vca
.get("member-vnf-index")
1749 internal_status
= configuration_status_list
[index
].get("status")
1750 if internal_status
== "READY":
1752 elif internal_status
== "BROKEN":
1754 "Configuration aborted because dependent charm/s has failed"
1759 # no dependencies, return
1761 await asyncio
.sleep(10)
1764 raise LcmException("Configuration aborted because dependent charm/s timeout")
1766 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1769 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1771 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1772 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1775 async def instantiate_N2VC(
1792 ee_config_descriptor
,
1794 nsr_id
= db_nsr
["_id"]
1795 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1796 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1797 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1798 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1800 "collection": "nsrs",
1801 "filter": {"_id": nsr_id
},
1802 "path": db_update_entry
,
1808 element_under_configuration
= nsr_id
1812 vnfr_id
= db_vnfr
["_id"]
1813 osm_config
["osm"]["vnf_id"] = vnfr_id
1815 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1817 if vca_type
== "native_charm":
1820 index_number
= vdu_index
or 0
1823 element_type
= "VNF"
1824 element_under_configuration
= vnfr_id
1825 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1827 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1828 element_type
= "VDU"
1829 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1830 osm_config
["osm"]["vdu_id"] = vdu_id
1832 namespace
+= ".{}".format(kdu_name
)
1833 element_type
= "KDU"
1834 element_under_configuration
= kdu_name
1835 osm_config
["osm"]["kdu_name"] = kdu_name
1838 if base_folder
["pkg-dir"]:
1839 artifact_path
= "{}/{}/{}/{}".format(
1840 base_folder
["folder"],
1841 base_folder
["pkg-dir"],
1844 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1849 artifact_path
= "{}/Scripts/{}/{}/".format(
1850 base_folder
["folder"],
1853 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1858 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1860 # get initial_config_primitive_list that applies to this element
1861 initial_config_primitive_list
= config_descriptor
.get(
1862 "initial-config-primitive"
1866 "Initial config primitive list > {}".format(
1867 initial_config_primitive_list
1871 # add config if not present for NS charm
1872 ee_descriptor_id
= ee_config_descriptor
.get("id")
1873 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1874 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1875 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1879 "Initial config primitive list #2 > {}".format(
1880 initial_config_primitive_list
1883 # n2vc_redesign STEP 3.1
1884 # find old ee_id if exists
1885 ee_id
= vca_deployed
.get("ee_id")
1887 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1888 # create or register execution environment in VCA
1889 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1891 self
._write
_configuration
_status
(
1893 vca_index
=vca_index
,
1895 element_under_configuration
=element_under_configuration
,
1896 element_type
=element_type
,
1899 step
= "create execution environment"
1900 self
.logger
.debug(logging_text
+ step
)
1904 if vca_type
== "k8s_proxy_charm":
1905 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1906 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1907 namespace
=namespace
,
1908 artifact_path
=artifact_path
,
1912 elif vca_type
== "helm" or vca_type
== "helm-v3":
1913 ee_id
, credentials
= await self
.vca_map
[
1915 ].create_execution_environment(
1916 namespace
=namespace
,
1920 artifact_path
=artifact_path
,
1921 chart_model
=vca_name
,
1925 ee_id
, credentials
= await self
.vca_map
[
1927 ].create_execution_environment(
1928 namespace
=namespace
,
1934 elif vca_type
== "native_charm":
1935 step
= "Waiting to VM being up and getting IP address"
1936 self
.logger
.debug(logging_text
+ step
)
1937 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1946 credentials
= {"hostname": rw_mgmt_ip
}
1948 username
= deep_get(
1949 config_descriptor
, ("config-access", "ssh-access", "default-user")
1951 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1952 # merged. Meanwhile let's get username from initial-config-primitive
1953 if not username
and initial_config_primitive_list
:
1954 for config_primitive
in initial_config_primitive_list
:
1955 for param
in config_primitive
.get("parameter", ()):
1956 if param
["name"] == "ssh-username":
1957 username
= param
["value"]
1961 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1962 "'config-access.ssh-access.default-user'"
1964 credentials
["username"] = username
1965 # n2vc_redesign STEP 3.2
1967 self
._write
_configuration
_status
(
1969 vca_index
=vca_index
,
1970 status
="REGISTERING",
1971 element_under_configuration
=element_under_configuration
,
1972 element_type
=element_type
,
1975 step
= "register execution environment {}".format(credentials
)
1976 self
.logger
.debug(logging_text
+ step
)
1977 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1978 credentials
=credentials
,
1979 namespace
=namespace
,
1984 # for compatibility with MON/POL modules, the need model and application name at database
1985 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1986 ee_id_parts
= ee_id
.split(".")
1987 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1988 if len(ee_id_parts
) >= 2:
1989 model_name
= ee_id_parts
[0]
1990 application_name
= ee_id_parts
[1]
1991 db_nsr_update
[db_update_entry
+ "model"] = model_name
1992 db_nsr_update
[db_update_entry
+ "application"] = application_name
1994 # n2vc_redesign STEP 3.3
1995 step
= "Install configuration Software"
1997 self
._write
_configuration
_status
(
1999 vca_index
=vca_index
,
2000 status
="INSTALLING SW",
2001 element_under_configuration
=element_under_configuration
,
2002 element_type
=element_type
,
2003 other_update
=db_nsr_update
,
2006 # TODO check if already done
2007 self
.logger
.debug(logging_text
+ step
)
2009 if vca_type
== "native_charm":
2010 config_primitive
= next(
2011 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
2014 if config_primitive
:
2015 config
= self
._map
_primitive
_params
(
2016 config_primitive
, {}, deploy_params
2019 if vca_type
== "lxc_proxy_charm":
2020 if element_type
== "NS":
2021 num_units
= db_nsr
.get("config-units") or 1
2022 elif element_type
== "VNF":
2023 num_units
= db_vnfr
.get("config-units") or 1
2024 elif element_type
== "VDU":
2025 for v
in db_vnfr
["vdur"]:
2026 if vdu_id
== v
["vdu-id-ref"]:
2027 num_units
= v
.get("config-units") or 1
2029 if vca_type
!= "k8s_proxy_charm":
2030 await self
.vca_map
[vca_type
].install_configuration_sw(
2032 artifact_path
=artifact_path
,
2035 num_units
=num_units
,
2040 # write in db flag of configuration_sw already installed
2042 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
2045 # add relations for this VCA (wait for other peers related with this VCA)
2046 await self
._add
_vca
_relations
(
2047 logging_text
=logging_text
,
2050 vca_index
=vca_index
,
2053 # if SSH access is required, then get execution environment SSH public
2054 # if native charm we have waited already to VM be UP
2055 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
2058 # self.logger.debug("get ssh key block")
2060 config_descriptor
, ("config-access", "ssh-access", "required")
2062 # self.logger.debug("ssh key needed")
2063 # Needed to inject a ssh key
2066 ("config-access", "ssh-access", "default-user"),
2068 step
= "Install configuration Software, getting public ssh key"
2069 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
2070 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
2073 step
= "Insert public key into VM user={} ssh_key={}".format(
2077 # self.logger.debug("no need to get ssh key")
2078 step
= "Waiting to VM being up and getting IP address"
2079 self
.logger
.debug(logging_text
+ step
)
2081 # default rw_mgmt_ip to None, avoiding the non definition of the variable
2084 # n2vc_redesign STEP 5.1
2085 # wait for RO (ip-address) Insert pub_key into VM
2088 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
2089 logging_text
, nsr_id
, vnfr_id
, kdu_name
2091 vnfd
= self
.db
.get_one(
2093 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2095 kdu
= get_kdu(vnfd
, kdu_name
)
2097 service
["name"] for service
in get_kdu_services(kdu
)
2099 exposed_services
= []
2100 for service
in services
:
2101 if any(s
in service
["name"] for s
in kdu_services
):
2102 exposed_services
.append(service
)
2103 await self
.vca_map
[vca_type
].exec_primitive(
2105 primitive_name
="config",
2107 "osm-config": json
.dumps(
2109 k8s
={"services": exposed_services
}
2116 # This verification is needed in order to avoid trying to add a public key
2117 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2118 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2119 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2121 elif db_vnfr
.get("vdur"):
2122 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2132 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2134 # store rw_mgmt_ip in deploy params for later replacement
2135 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2137 # n2vc_redesign STEP 6 Execute initial config primitive
2138 step
= "execute initial config primitive"
2140 # wait for dependent primitives execution (NS -> VNF -> VDU)
2141 if initial_config_primitive_list
:
2142 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2144 # stage, in function of element type: vdu, kdu, vnf or ns
2145 my_vca
= vca_deployed_list
[vca_index
]
2146 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2148 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2149 elif my_vca
.get("member-vnf-index"):
2151 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2154 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2156 self
._write
_configuration
_status
(
2157 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2160 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2162 check_if_terminated_needed
= True
2163 for initial_config_primitive
in initial_config_primitive_list
:
2164 # adding information on the vca_deployed if it is a NS execution environment
2165 if not vca_deployed
["member-vnf-index"]:
2166 deploy_params
["ns_config_info"] = json
.dumps(
2167 self
._get
_ns
_config
_info
(nsr_id
)
2169 # TODO check if already done
2170 primitive_params_
= self
._map
_primitive
_params
(
2171 initial_config_primitive
, {}, deploy_params
2174 step
= "execute primitive '{}' params '{}'".format(
2175 initial_config_primitive
["name"], primitive_params_
2177 self
.logger
.debug(logging_text
+ step
)
2178 await self
.vca_map
[vca_type
].exec_primitive(
2180 primitive_name
=initial_config_primitive
["name"],
2181 params_dict
=primitive_params_
,
2186 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2187 if check_if_terminated_needed
:
2188 if config_descriptor
.get("terminate-config-primitive"):
2190 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2192 check_if_terminated_needed
= False
2194 # TODO register in database that primitive is done
2196 # STEP 7 Configure metrics
2197 if vca_type
== "helm" or vca_type
== "helm-v3":
2198 # TODO: review for those cases where the helm chart is a reference and
2199 # is not part of the NF package
2200 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2202 artifact_path
=artifact_path
,
2203 ee_config_descriptor
=ee_config_descriptor
,
2206 target_ip
=rw_mgmt_ip
,
2212 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2215 for job
in prometheus_jobs
:
2218 {"job_name": job
["job_name"]},
2221 fail_on_empty
=False,
2224 step
= "instantiated at VCA"
2225 self
.logger
.debug(logging_text
+ step
)
2227 self
._write
_configuration
_status
(
2228 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2231 except Exception as e
: # TODO not use Exception but N2VC exception
2232 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2234 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2237 "Exception while {} : {}".format(step
, e
), exc_info
=True
2239 self
._write
_configuration
_status
(
2240 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2242 raise LcmException("{} {}".format(step
, e
)) from e
2244 def _write_ns_status(
2248 current_operation
: str,
2249 current_operation_id
: str,
2250 error_description
: str = None,
2251 error_detail
: str = None,
2252 other_update
: dict = None,
2255 Update db_nsr fields.
2258 :param current_operation:
2259 :param current_operation_id:
2260 :param error_description:
2261 :param error_detail:
2262 :param other_update: Other required changes at database if provided, will be cleared
2266 db_dict
= other_update
or {}
2269 ] = current_operation_id
# for backward compatibility
2270 db_dict
["_admin.current-operation"] = current_operation_id
2271 db_dict
["_admin.operation-type"] = (
2272 current_operation
if current_operation
!= "IDLE" else None
2274 db_dict
["currentOperation"] = current_operation
2275 db_dict
["currentOperationID"] = current_operation_id
2276 db_dict
["errorDescription"] = error_description
2277 db_dict
["errorDetail"] = error_detail
2280 db_dict
["nsState"] = ns_state
2281 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2282 except DbException
as e
:
2283 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2285 def _write_op_status(
2289 error_message
: str = None,
2290 queuePosition
: int = 0,
2291 operation_state
: str = None,
2292 other_update
: dict = None,
2295 db_dict
= other_update
or {}
2296 db_dict
["queuePosition"] = queuePosition
2297 if isinstance(stage
, list):
2298 db_dict
["stage"] = stage
[0]
2299 db_dict
["detailed-status"] = " ".join(stage
)
2300 elif stage
is not None:
2301 db_dict
["stage"] = str(stage
)
2303 if error_message
is not None:
2304 db_dict
["errorMessage"] = error_message
2305 if operation_state
is not None:
2306 db_dict
["operationState"] = operation_state
2307 db_dict
["statusEnteredTime"] = time()
2308 self
.update_db_2("nslcmops", op_id
, db_dict
)
2309 except DbException
as e
:
2311 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2314 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2316 nsr_id
= db_nsr
["_id"]
2317 # configurationStatus
2318 config_status
= db_nsr
.get("configurationStatus")
2321 "configurationStatus.{}.status".format(index
): status
2322 for index
, v
in enumerate(config_status
)
2326 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2328 except DbException
as e
:
2330 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2333 def _write_configuration_status(
2338 element_under_configuration
: str = None,
2339 element_type
: str = None,
2340 other_update
: dict = None,
2343 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2344 # .format(vca_index, status))
2347 db_path
= "configurationStatus.{}.".format(vca_index
)
2348 db_dict
= other_update
or {}
2350 db_dict
[db_path
+ "status"] = status
2351 if element_under_configuration
:
2353 db_path
+ "elementUnderConfiguration"
2354 ] = element_under_configuration
2356 db_dict
[db_path
+ "elementType"] = element_type
2357 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2358 except DbException
as e
:
2360 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2361 status
, nsr_id
, vca_index
, e
2365 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2367 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2368 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2369 Database is used because the result can be obtained from a different LCM worker in case of HA.
2370 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2371 :param db_nslcmop: database content of nslcmop
2372 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2373 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2374 computed 'vim-account-id'
2377 nslcmop_id
= db_nslcmop
["_id"]
2378 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2379 if placement_engine
== "PLA":
2381 logging_text
+ "Invoke and wait for placement optimization"
2383 await self
.msg
.aiowrite(
2384 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2386 db_poll_interval
= 5
2387 wait
= db_poll_interval
* 10
2389 while not pla_result
and wait
>= 0:
2390 await asyncio
.sleep(db_poll_interval
)
2391 wait
-= db_poll_interval
2392 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2393 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2397 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2400 for pla_vnf
in pla_result
["vnf"]:
2401 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2402 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2407 {"_id": vnfr
["_id"]},
2408 {"vim-account-id": pla_vnf
["vimAccountId"]},
2411 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2414 def update_nsrs_with_pla_result(self
, params
):
2416 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2418 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2420 except Exception as e
:
2421 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2423 async def instantiate(self
, nsr_id
, nslcmop_id
):
2426 :param nsr_id: ns instance to deploy
2427 :param nslcmop_id: operation to run
2431 # Try to lock HA task here
2432 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2433 if not task_is_locked_by_me
:
2435 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2439 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2440 self
.logger
.debug(logging_text
+ "Enter")
2442 # get all needed from database
2444 # database nsrs record
2447 # database nslcmops record
2450 # update operation on nsrs
2452 # update operation on nslcmops
2453 db_nslcmop_update
= {}
2455 nslcmop_operation_state
= None
2456 db_vnfrs
= {} # vnf's info indexed by member-index
2458 tasks_dict_info
= {} # from task to info text
2462 "Stage 1/5: preparation of the environment.",
2463 "Waiting for previous operations to terminate.",
2466 # ^ stage, step, VIM progress
2468 # wait for any previous tasks in process
2469 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2471 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2472 stage
[1] = "Reading from database."
2473 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2474 db_nsr_update
["detailed-status"] = "creating"
2475 db_nsr_update
["operational-status"] = "init"
2476 self
._write
_ns
_status
(
2478 ns_state
="BUILDING",
2479 current_operation
="INSTANTIATING",
2480 current_operation_id
=nslcmop_id
,
2481 other_update
=db_nsr_update
,
2483 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2485 # read from db: operation
2486 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2487 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2488 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2489 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2490 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2492 ns_params
= db_nslcmop
.get("operationParams")
2493 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2494 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2496 timeout_ns_deploy
= self
.timeout
.ns_deploy
2499 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2500 self
.logger
.debug(logging_text
+ stage
[1])
2501 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2502 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2503 self
.logger
.debug(logging_text
+ stage
[1])
2504 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2505 self
.fs
.sync(db_nsr
["nsd-id"])
2507 # nsr_name = db_nsr["name"] # TODO short-name??
2509 # read from db: vnf's of this ns
2510 stage
[1] = "Getting vnfrs from db."
2511 self
.logger
.debug(logging_text
+ stage
[1])
2512 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2514 # read from db: vnfd's for every vnf
2515 db_vnfds
= [] # every vnfd data
2517 # for each vnf in ns, read vnfd
2518 for vnfr
in db_vnfrs_list
:
2519 if vnfr
.get("kdur"):
2521 for kdur
in vnfr
["kdur"]:
2522 if kdur
.get("additionalParams"):
2523 kdur
["additionalParams"] = json
.loads(
2524 kdur
["additionalParams"]
2526 kdur_list
.append(kdur
)
2527 vnfr
["kdur"] = kdur_list
2529 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2530 vnfd_id
= vnfr
["vnfd-id"]
2531 vnfd_ref
= vnfr
["vnfd-ref"]
2532 self
.fs
.sync(vnfd_id
)
2534 # if we haven't this vnfd, read it from db
2535 if vnfd_id
not in db_vnfds
:
2537 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2540 self
.logger
.debug(logging_text
+ stage
[1])
2541 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2544 db_vnfds
.append(vnfd
)
2546 # Get or generates the _admin.deployed.VCA list
2547 vca_deployed_list
= None
2548 if db_nsr
["_admin"].get("deployed"):
2549 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2550 if vca_deployed_list
is None:
2551 vca_deployed_list
= []
2552 configuration_status_list
= []
2553 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2554 db_nsr_update
["configurationStatus"] = configuration_status_list
2555 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2556 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2557 elif isinstance(vca_deployed_list
, dict):
2558 # maintain backward compatibility. Change a dict to list at database
2559 vca_deployed_list
= list(vca_deployed_list
.values())
2560 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2561 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2564 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2566 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2567 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2569 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2570 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2571 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2573 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2576 # n2vc_redesign STEP 2 Deploy Network Scenario
2577 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2578 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2580 stage
[1] = "Deploying KDUs."
2581 # self.logger.debug(logging_text + "Before deploy_kdus")
2582 # Call to deploy_kdus in case exists the "vdu:kdu" param
2583 await self
.deploy_kdus(
2584 logging_text
=logging_text
,
2586 nslcmop_id
=nslcmop_id
,
2589 task_instantiation_info
=tasks_dict_info
,
2592 stage
[1] = "Getting VCA public key."
2593 # n2vc_redesign STEP 1 Get VCA public ssh-key
2594 # feature 1429. Add n2vc public key to needed VMs
2595 n2vc_key
= self
.n2vc
.get_public_key()
2596 n2vc_key_list
= [n2vc_key
]
2597 if self
.vca_config
.public_key
:
2598 n2vc_key_list
.append(self
.vca_config
.public_key
)
2600 stage
[1] = "Deploying NS at VIM."
2601 task_ro
= asyncio
.ensure_future(
2602 self
.instantiate_RO(
2603 logging_text
=logging_text
,
2607 db_nslcmop
=db_nslcmop
,
2610 n2vc_key_list
=n2vc_key_list
,
2614 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2615 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2617 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2618 stage
[1] = "Deploying Execution Environments."
2619 self
.logger
.debug(logging_text
+ stage
[1])
2621 # create namespace and certificate if any helm based EE is present in the NS
2622 if check_helm_ee_in_ns(db_vnfds
):
2623 # TODO: create EE namespace
2624 # create TLS certificates
2625 await self
.vca_map
["helm-v3"].create_tls_certificate(
2626 secret_name
="ee-tls-{}".format(nsr_id
),
2629 usage
="server auth",
2632 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2633 for vnf_profile
in get_vnf_profiles(nsd
):
2634 vnfd_id
= vnf_profile
["vnfd-id"]
2635 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2636 member_vnf_index
= str(vnf_profile
["id"])
2637 db_vnfr
= db_vnfrs
[member_vnf_index
]
2638 base_folder
= vnfd
["_admin"]["storage"]
2644 # Get additional parameters
2645 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2646 if db_vnfr
.get("additionalParamsForVnf"):
2647 deploy_params
.update(
2648 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2651 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2652 if descriptor_config
:
2654 logging_text
=logging_text
2655 + "member_vnf_index={} ".format(member_vnf_index
),
2658 nslcmop_id
=nslcmop_id
,
2664 member_vnf_index
=member_vnf_index
,
2665 vdu_index
=vdu_index
,
2667 deploy_params
=deploy_params
,
2668 descriptor_config
=descriptor_config
,
2669 base_folder
=base_folder
,
2670 task_instantiation_info
=tasks_dict_info
,
2674 # Deploy charms for each VDU that supports one.
2675 for vdud
in get_vdu_list(vnfd
):
2677 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2678 vdur
= find_in_list(
2679 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2682 if vdur
.get("additionalParams"):
2683 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2685 deploy_params_vdu
= deploy_params
2686 deploy_params_vdu
["OSM"] = get_osm_params(
2687 db_vnfr
, vdu_id
, vdu_count_index
=0
2689 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2691 self
.logger
.debug("VDUD > {}".format(vdud
))
2693 "Descriptor config > {}".format(descriptor_config
)
2695 if descriptor_config
:
2698 for vdu_index
in range(vdud_count
):
2699 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2701 logging_text
=logging_text
2702 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2703 member_vnf_index
, vdu_id
, vdu_index
2707 nslcmop_id
=nslcmop_id
,
2713 member_vnf_index
=member_vnf_index
,
2714 vdu_index
=vdu_index
,
2716 deploy_params
=deploy_params_vdu
,
2717 descriptor_config
=descriptor_config
,
2718 base_folder
=base_folder
,
2719 task_instantiation_info
=tasks_dict_info
,
2722 for kdud
in get_kdu_list(vnfd
):
2723 kdu_name
= kdud
["name"]
2724 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2725 if descriptor_config
:
2730 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2732 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2733 if kdur
.get("additionalParams"):
2734 deploy_params_kdu
.update(
2735 parse_yaml_strings(kdur
["additionalParams"].copy())
2739 logging_text
=logging_text
,
2742 nslcmop_id
=nslcmop_id
,
2748 member_vnf_index
=member_vnf_index
,
2749 vdu_index
=vdu_index
,
2751 deploy_params
=deploy_params_kdu
,
2752 descriptor_config
=descriptor_config
,
2753 base_folder
=base_folder
,
2754 task_instantiation_info
=tasks_dict_info
,
2758 # Check if this NS has a charm configuration
2759 descriptor_config
= nsd
.get("ns-configuration")
2760 if descriptor_config
and descriptor_config
.get("juju"):
2763 member_vnf_index
= None
2769 # Get additional parameters
2770 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2771 if db_nsr
.get("additionalParamsForNs"):
2772 deploy_params
.update(
2773 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2775 base_folder
= nsd
["_admin"]["storage"]
2777 logging_text
=logging_text
,
2780 nslcmop_id
=nslcmop_id
,
2786 member_vnf_index
=member_vnf_index
,
2787 vdu_index
=vdu_index
,
2789 deploy_params
=deploy_params
,
2790 descriptor_config
=descriptor_config
,
2791 base_folder
=base_folder
,
2792 task_instantiation_info
=tasks_dict_info
,
2796 # rest of staff will be done at finally
2799 ROclient
.ROClientException
,
2805 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2808 except asyncio
.CancelledError
:
2810 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2812 exc
= "Operation was cancelled"
2813 except Exception as e
:
2814 exc
= traceback
.format_exc()
2815 self
.logger
.critical(
2816 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2821 error_list
.append(str(exc
))
2823 # wait for pending tasks
2825 stage
[1] = "Waiting for instantiate pending tasks."
2826 self
.logger
.debug(logging_text
+ stage
[1])
2827 error_list
+= await self
._wait
_for
_tasks
(
2835 stage
[1] = stage
[2] = ""
2836 except asyncio
.CancelledError
:
2837 error_list
.append("Cancelled")
2838 # TODO cancel all tasks
2839 except Exception as exc
:
2840 error_list
.append(str(exc
))
2842 # update operation-status
2843 db_nsr_update
["operational-status"] = "running"
2844 # let's begin with VCA 'configured' status (later we can change it)
2845 db_nsr_update
["config-status"] = "configured"
2846 for task
, task_name
in tasks_dict_info
.items():
2847 if not task
.done() or task
.cancelled() or task
.exception():
2848 if task_name
.startswith(self
.task_name_deploy_vca
):
2849 # A N2VC task is pending
2850 db_nsr_update
["config-status"] = "failed"
2852 # RO or KDU task is pending
2853 db_nsr_update
["operational-status"] = "failed"
2855 # update status at database
2857 error_detail
= ". ".join(error_list
)
2858 self
.logger
.error(logging_text
+ error_detail
)
2859 error_description_nslcmop
= "{} Detail: {}".format(
2860 stage
[0], error_detail
2862 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2863 nslcmop_id
, stage
[0]
2866 db_nsr_update
["detailed-status"] = (
2867 error_description_nsr
+ " Detail: " + error_detail
2869 db_nslcmop_update
["detailed-status"] = error_detail
2870 nslcmop_operation_state
= "FAILED"
2874 error_description_nsr
= error_description_nslcmop
= None
2876 db_nsr_update
["detailed-status"] = "Done"
2877 db_nslcmop_update
["detailed-status"] = "Done"
2878 nslcmop_operation_state
= "COMPLETED"
2881 self
._write
_ns
_status
(
2884 current_operation
="IDLE",
2885 current_operation_id
=None,
2886 error_description
=error_description_nsr
,
2887 error_detail
=error_detail
,
2888 other_update
=db_nsr_update
,
2890 self
._write
_op
_status
(
2893 error_message
=error_description_nslcmop
,
2894 operation_state
=nslcmop_operation_state
,
2895 other_update
=db_nslcmop_update
,
2898 if nslcmop_operation_state
:
2900 await self
.msg
.aiowrite(
2905 "nslcmop_id": nslcmop_id
,
2906 "operationState": nslcmop_operation_state
,
2910 except Exception as e
:
2912 logging_text
+ "kafka_write notification Exception {}".format(e
)
2915 self
.logger
.debug(logging_text
+ "Exit")
2916 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2918 def _get_vnfd(self
, vnfd_id
: str, cached_vnfds
: Dict
[str, Any
]):
2919 if vnfd_id
not in cached_vnfds
:
2920 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
2921 return cached_vnfds
[vnfd_id
]
2923 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2924 if vnf_profile_id
not in cached_vnfrs
:
2925 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2928 "member-vnf-index-ref": vnf_profile_id
,
2929 "nsr-id-ref": nsr_id
,
2932 return cached_vnfrs
[vnf_profile_id
]
2934 def _is_deployed_vca_in_relation(
2935 self
, vca
: DeployedVCA
, relation
: Relation
2938 for endpoint
in (relation
.provider
, relation
.requirer
):
2939 if endpoint
["kdu-resource-profile-id"]:
2942 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2943 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2944 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2950 def _update_ee_relation_data_with_implicit_data(
2951 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2953 ee_relation_data
= safe_get_ee_relation(
2954 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2956 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2957 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2958 "execution-environment-ref"
2960 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2961 vnfd_id
= vnf_profile
["vnfd-id"]
2962 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2965 if ee_relation_level
== EELevel
.VNF
2966 else ee_relation_data
["vdu-profile-id"]
2968 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2971 f
"not execution environments found for ee_relation {ee_relation_data}"
2973 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2974 return ee_relation_data
2976 def _get_ns_relations(
2979 nsd
: Dict
[str, Any
],
2981 cached_vnfds
: Dict
[str, Any
],
2982 ) -> List
[Relation
]:
2984 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2985 for r
in db_ns_relations
:
2986 provider_dict
= None
2987 requirer_dict
= None
2988 if all(key
in r
for key
in ("provider", "requirer")):
2989 provider_dict
= r
["provider"]
2990 requirer_dict
= r
["requirer"]
2991 elif "entities" in r
:
2992 provider_id
= r
["entities"][0]["id"]
2995 "endpoint": r
["entities"][0]["endpoint"],
2997 if provider_id
!= nsd
["id"]:
2998 provider_dict
["vnf-profile-id"] = provider_id
2999 requirer_id
= r
["entities"][1]["id"]
3002 "endpoint": r
["entities"][1]["endpoint"],
3004 if requirer_id
!= nsd
["id"]:
3005 requirer_dict
["vnf-profile-id"] = requirer_id
3008 "provider/requirer or entities must be included in the relation."
3010 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3011 nsr_id
, nsd
, provider_dict
, cached_vnfds
3013 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3014 nsr_id
, nsd
, requirer_dict
, cached_vnfds
3016 provider
= EERelation(relation_provider
)
3017 requirer
= EERelation(relation_requirer
)
3018 relation
= Relation(r
["name"], provider
, requirer
)
3019 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3021 relations
.append(relation
)
3024 def _get_vnf_relations(
3027 nsd
: Dict
[str, Any
],
3029 cached_vnfds
: Dict
[str, Any
],
3030 ) -> List
[Relation
]:
3032 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
3033 vnf_profile_id
= vnf_profile
["id"]
3034 vnfd_id
= vnf_profile
["vnfd-id"]
3035 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
3036 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
3037 for r
in db_vnf_relations
:
3038 provider_dict
= None
3039 requirer_dict
= None
3040 if all(key
in r
for key
in ("provider", "requirer")):
3041 provider_dict
= r
["provider"]
3042 requirer_dict
= r
["requirer"]
3043 elif "entities" in r
:
3044 provider_id
= r
["entities"][0]["id"]
3047 "vnf-profile-id": vnf_profile_id
,
3048 "endpoint": r
["entities"][0]["endpoint"],
3050 if provider_id
!= vnfd_id
:
3051 provider_dict
["vdu-profile-id"] = provider_id
3052 requirer_id
= r
["entities"][1]["id"]
3055 "vnf-profile-id": vnf_profile_id
,
3056 "endpoint": r
["entities"][1]["endpoint"],
3058 if requirer_id
!= vnfd_id
:
3059 requirer_dict
["vdu-profile-id"] = requirer_id
3062 "provider/requirer or entities must be included in the relation."
3064 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3065 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3067 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3068 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3070 provider
= EERelation(relation_provider
)
3071 requirer
= EERelation(relation_requirer
)
3072 relation
= Relation(r
["name"], provider
, requirer
)
3073 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3075 relations
.append(relation
)
3078 def _get_kdu_resource_data(
3080 ee_relation
: EERelation
,
3081 db_nsr
: Dict
[str, Any
],
3082 cached_vnfds
: Dict
[str, Any
],
3083 ) -> DeployedK8sResource
:
3084 nsd
= get_nsd(db_nsr
)
3085 vnf_profiles
= get_vnf_profiles(nsd
)
3086 vnfd_id
= find_in_list(
3088 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
3090 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
3091 kdu_resource_profile
= get_kdu_resource_profile(
3092 db_vnfd
, ee_relation
.kdu_resource_profile_id
3094 kdu_name
= kdu_resource_profile
["kdu-name"]
3095 deployed_kdu
, _
= get_deployed_kdu(
3096 db_nsr
.get("_admin", ()).get("deployed", ()),
3098 ee_relation
.vnf_profile_id
,
3100 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
3103 def _get_deployed_component(
3105 ee_relation
: EERelation
,
3106 db_nsr
: Dict
[str, Any
],
3107 cached_vnfds
: Dict
[str, Any
],
3108 ) -> DeployedComponent
:
3109 nsr_id
= db_nsr
["_id"]
3110 deployed_component
= None
3111 ee_level
= EELevel
.get_level(ee_relation
)
3112 if ee_level
== EELevel
.NS
:
3113 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
3115 deployed_component
= DeployedVCA(nsr_id
, vca
)
3116 elif ee_level
== EELevel
.VNF
:
3117 vca
= get_deployed_vca(
3121 "member-vnf-index": ee_relation
.vnf_profile_id
,
3122 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3126 deployed_component
= DeployedVCA(nsr_id
, vca
)
3127 elif ee_level
== EELevel
.VDU
:
3128 vca
= get_deployed_vca(
3131 "vdu_id": ee_relation
.vdu_profile_id
,
3132 "member-vnf-index": ee_relation
.vnf_profile_id
,
3133 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3137 deployed_component
= DeployedVCA(nsr_id
, vca
)
3138 elif ee_level
== EELevel
.KDU
:
3139 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
3140 ee_relation
, db_nsr
, cached_vnfds
3142 if kdu_resource_data
:
3143 deployed_component
= DeployedK8sResource(kdu_resource_data
)
3144 return deployed_component
3146 async def _add_relation(
3150 db_nsr
: Dict
[str, Any
],
3151 cached_vnfds
: Dict
[str, Any
],
3152 cached_vnfrs
: Dict
[str, Any
],
3154 deployed_provider
= self
._get
_deployed
_component
(
3155 relation
.provider
, db_nsr
, cached_vnfds
3157 deployed_requirer
= self
._get
_deployed
_component
(
3158 relation
.requirer
, db_nsr
, cached_vnfds
3162 and deployed_requirer
3163 and deployed_provider
.config_sw_installed
3164 and deployed_requirer
.config_sw_installed
3166 provider_db_vnfr
= (
3168 relation
.provider
.nsr_id
,
3169 relation
.provider
.vnf_profile_id
,
3172 if relation
.provider
.vnf_profile_id
3175 requirer_db_vnfr
= (
3177 relation
.requirer
.nsr_id
,
3178 relation
.requirer
.vnf_profile_id
,
3181 if relation
.requirer
.vnf_profile_id
3184 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3185 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3186 provider_relation_endpoint
= RelationEndpoint(
3187 deployed_provider
.ee_id
,
3189 relation
.provider
.endpoint
,
3191 requirer_relation_endpoint
= RelationEndpoint(
3192 deployed_requirer
.ee_id
,
3194 relation
.requirer
.endpoint
,
3196 await self
.vca_map
[vca_type
].add_relation(
3197 provider
=provider_relation_endpoint
,
3198 requirer
=requirer_relation_endpoint
,
3200 # remove entry from relations list
3204 async def _add_vca_relations(
3210 timeout
: int = 3600,
3214 # 1. find all relations for this VCA
3215 # 2. wait for other peers related
3219 # STEP 1: find all relations for this VCA
3222 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3223 nsd
= get_nsd(db_nsr
)
3226 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3227 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3232 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3233 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3235 # if no relations, terminate
3237 self
.logger
.debug(logging_text
+ " No relations")
3240 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3247 if now
- start
>= timeout
:
3248 self
.logger
.error(logging_text
+ " : timeout adding relations")
3251 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3252 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3254 # for each relation, find the VCA's related
3255 for relation
in relations
.copy():
3256 added
= await self
._add
_relation
(
3264 relations
.remove(relation
)
3267 self
.logger
.debug("Relations added")
3269 await asyncio
.sleep(5.0)
3273 except Exception as e
:
3274 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3277 async def _install_kdu(
3285 k8s_instance_info
: dict,
3286 k8params
: dict = None,
3292 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3295 "collection": "nsrs",
3296 "filter": {"_id": nsr_id
},
3297 "path": nsr_db_path
,
3300 if k8s_instance_info
.get("kdu-deployment-name"):
3301 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3303 kdu_instance
= self
.k8scluster_map
[
3305 ].generate_kdu_instance_name(
3306 db_dict
=db_dict_install
,
3307 kdu_model
=k8s_instance_info
["kdu-model"],
3308 kdu_name
=k8s_instance_info
["kdu-name"],
3311 # Update the nsrs table with the kdu-instance value
3315 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3318 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3319 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3320 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3321 # namespace, this first verification could be removed, and the next step would be done for any kind
3323 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3324 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3325 if k8sclustertype
in ("juju", "juju-bundle"):
3326 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3327 # that the user passed a namespace which he wants its KDU to be deployed in)
3333 "_admin.projects_write": k8s_instance_info
["namespace"],
3334 "_admin.projects_read": k8s_instance_info
["namespace"],
3340 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3345 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3347 k8s_instance_info
["namespace"] = kdu_instance
3349 await self
.k8scluster_map
[k8sclustertype
].install(
3350 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3351 kdu_model
=k8s_instance_info
["kdu-model"],
3354 db_dict
=db_dict_install
,
3356 kdu_name
=k8s_instance_info
["kdu-name"],
3357 namespace
=k8s_instance_info
["namespace"],
3358 kdu_instance
=kdu_instance
,
3362 # Obtain services to obtain management service ip
3363 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3364 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3365 kdu_instance
=kdu_instance
,
3366 namespace
=k8s_instance_info
["namespace"],
3369 # Obtain management service info (if exists)
3370 vnfr_update_dict
= {}
3371 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3373 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3378 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3381 for service
in kdud
.get("service", [])
3382 if service
.get("mgmt-service")
3384 for mgmt_service
in mgmt_services
:
3385 for service
in services
:
3386 if service
["name"].startswith(mgmt_service
["name"]):
3387 # Mgmt service found, Obtain service ip
3388 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3389 if isinstance(ip
, list) and len(ip
) == 1:
3393 "kdur.{}.ip-address".format(kdu_index
)
3396 # Check if must update also mgmt ip at the vnf
3397 service_external_cp
= mgmt_service
.get(
3398 "external-connection-point-ref"
3400 if service_external_cp
:
3402 deep_get(vnfd
, ("mgmt-interface", "cp"))
3403 == service_external_cp
3405 vnfr_update_dict
["ip-address"] = ip
3410 "external-connection-point-ref", ""
3412 == service_external_cp
,
3415 "kdur.{}.ip-address".format(kdu_index
)
3420 "Mgmt service name: {} not found".format(
3421 mgmt_service
["name"]
3425 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3426 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3428 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3431 and kdu_config
.get("initial-config-primitive")
3432 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3434 initial_config_primitive_list
= kdu_config
.get(
3435 "initial-config-primitive"
3437 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3439 for initial_config_primitive
in initial_config_primitive_list
:
3440 primitive_params_
= self
._map
_primitive
_params
(
3441 initial_config_primitive
, {}, {}
3444 await asyncio
.wait_for(
3445 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3446 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3447 kdu_instance
=kdu_instance
,
3448 primitive_name
=initial_config_primitive
["name"],
3449 params
=primitive_params_
,
3450 db_dict
=db_dict_install
,
3456 except Exception as e
:
3457 # Prepare update db with error and raise exception
3460 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3464 vnfr_data
.get("_id"),
3465 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3468 # ignore to keep original exception
3470 # reraise original error
3475 async def deploy_kdus(
3482 task_instantiation_info
,
3484 # Launch kdus if present in the descriptor
3486 k8scluster_id_2_uuic
= {
3487 "helm-chart-v3": {},
3492 async def _get_cluster_id(cluster_id
, cluster_type
):
3493 nonlocal k8scluster_id_2_uuic
3494 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3495 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3497 # check if K8scluster is creating and wait look if previous tasks in process
3498 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3499 "k8scluster", cluster_id
3502 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3503 task_name
, cluster_id
3505 self
.logger
.debug(logging_text
+ text
)
3506 await asyncio
.wait(task_dependency
, timeout
=3600)
3508 db_k8scluster
= self
.db
.get_one(
3509 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3511 if not db_k8scluster
:
3512 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3514 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3516 if cluster_type
== "helm-chart-v3":
3518 # backward compatibility for existing clusters that have not been initialized for helm v3
3519 k8s_credentials
= yaml
.safe_dump(
3520 db_k8scluster
.get("credentials")
3522 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3523 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3525 db_k8scluster_update
= {}
3526 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3527 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3528 db_k8scluster_update
[
3529 "_admin.helm-chart-v3.created"
3531 db_k8scluster_update
[
3532 "_admin.helm-chart-v3.operationalState"
3535 "k8sclusters", cluster_id
, db_k8scluster_update
3537 except Exception as e
:
3540 + "error initializing helm-v3 cluster: {}".format(str(e
))
3543 "K8s cluster '{}' has not been initialized for '{}'".format(
3544 cluster_id
, cluster_type
3549 "K8s cluster '{}' has not been initialized for '{}'".format(
3550 cluster_id
, cluster_type
3553 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3556 logging_text
+= "Deploy kdus: "
3559 db_nsr_update
= {"_admin.deployed.K8s": []}
3560 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3563 updated_cluster_list
= []
3564 updated_v3_cluster_list
= []
3566 for vnfr_data
in db_vnfrs
.values():
3567 vca_id
= self
.get_vca_id(vnfr_data
, {})
3568 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3569 # Step 0: Prepare and set parameters
3570 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3571 vnfd_id
= vnfr_data
.get("vnfd-id")
3572 vnfd_with_id
= find_in_list(
3573 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3577 for kdud
in vnfd_with_id
["kdu"]
3578 if kdud
["name"] == kdur
["kdu-name"]
3580 namespace
= kdur
.get("k8s-namespace")
3581 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3582 if kdur
.get("helm-chart"):
3583 kdumodel
= kdur
["helm-chart"]
3584 # Default version: helm3, if helm-version is v2 assign v2
3585 k8sclustertype
= "helm-chart-v3"
3586 self
.logger
.debug("kdur: {}".format(kdur
))
3588 kdur
.get("helm-version")
3589 and kdur
.get("helm-version") == "v2"
3591 k8sclustertype
= "helm-chart"
3592 elif kdur
.get("juju-bundle"):
3593 kdumodel
= kdur
["juju-bundle"]
3594 k8sclustertype
= "juju-bundle"
3597 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3598 "juju-bundle. Maybe an old NBI version is running".format(
3599 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3602 # check if kdumodel is a file and exists
3604 vnfd_with_id
= find_in_list(
3605 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3607 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3608 if storage
: # may be not present if vnfd has not artifacts
3609 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3610 if storage
["pkg-dir"]:
3611 filename
= "{}/{}/{}s/{}".format(
3618 filename
= "{}/Scripts/{}s/{}".format(
3623 if self
.fs
.file_exists(
3624 filename
, mode
="file"
3625 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3626 kdumodel
= self
.fs
.path
+ filename
3627 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3629 except Exception: # it is not a file
3632 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3633 step
= "Synchronize repos for k8s cluster '{}'".format(
3636 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3640 k8sclustertype
== "helm-chart"
3641 and cluster_uuid
not in updated_cluster_list
3643 k8sclustertype
== "helm-chart-v3"
3644 and cluster_uuid
not in updated_v3_cluster_list
3646 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3647 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3648 cluster_uuid
=cluster_uuid
3651 if del_repo_list
or added_repo_dict
:
3652 if k8sclustertype
== "helm-chart":
3654 "_admin.helm_charts_added." + item
: None
3655 for item
in del_repo_list
3658 "_admin.helm_charts_added." + item
: name
3659 for item
, name
in added_repo_dict
.items()
3661 updated_cluster_list
.append(cluster_uuid
)
3662 elif k8sclustertype
== "helm-chart-v3":
3664 "_admin.helm_charts_v3_added." + item
: None
3665 for item
in del_repo_list
3668 "_admin.helm_charts_v3_added." + item
: name
3669 for item
, name
in added_repo_dict
.items()
3671 updated_v3_cluster_list
.append(cluster_uuid
)
3673 logging_text
+ "repos synchronized on k8s cluster "
3674 "'{}' to_delete: {}, to_add: {}".format(
3675 k8s_cluster_id
, del_repo_list
, added_repo_dict
3680 {"_id": k8s_cluster_id
},
3686 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3687 vnfr_data
["member-vnf-index-ref"],
3691 k8s_instance_info
= {
3692 "kdu-instance": None,
3693 "k8scluster-uuid": cluster_uuid
,
3694 "k8scluster-type": k8sclustertype
,
3695 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3696 "kdu-name": kdur
["kdu-name"],
3697 "kdu-model": kdumodel
,
3698 "namespace": namespace
,
3699 "kdu-deployment-name": kdu_deployment_name
,
3701 db_path
= "_admin.deployed.K8s.{}".format(index
)
3702 db_nsr_update
[db_path
] = k8s_instance_info
3703 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3704 vnfd_with_id
= find_in_list(
3705 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3707 task
= asyncio
.ensure_future(
3716 k8params
=desc_params
,
3721 self
.lcm_tasks
.register(
3725 "instantiate_KDU-{}".format(index
),
3728 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3734 except (LcmException
, asyncio
.CancelledError
):
3736 except Exception as e
:
3737 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3738 if isinstance(e
, (N2VCException
, DbException
)):
3739 self
.logger
.error(logging_text
+ msg
)
3741 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3742 raise LcmException(msg
)
3745 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3764 task_instantiation_info
,
3767 # launch instantiate_N2VC in a asyncio task and register task object
3768 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3769 # if not found, create one entry and update database
3770 # fill db_nsr._admin.deployed.VCA.<index>
3773 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3777 get_charm_name
= False
3778 if "execution-environment-list" in descriptor_config
:
3779 ee_list
= descriptor_config
.get("execution-environment-list", [])
3780 elif "juju" in descriptor_config
:
3781 ee_list
= [descriptor_config
] # ns charms
3782 if "execution-environment-list" not in descriptor_config
:
3783 # charm name is only required for ns charms
3784 get_charm_name
= True
3785 else: # other types as script are not supported
3788 for ee_item
in ee_list
:
3791 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3792 ee_item
.get("juju"), ee_item
.get("helm-chart")
3795 ee_descriptor_id
= ee_item
.get("id")
3796 if ee_item
.get("juju"):
3797 vca_name
= ee_item
["juju"].get("charm")
3799 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
3802 if ee_item
["juju"].get("charm") is not None
3805 if ee_item
["juju"].get("cloud") == "k8s":
3806 vca_type
= "k8s_proxy_charm"
3807 elif ee_item
["juju"].get("proxy") is False:
3808 vca_type
= "native_charm"
3809 elif ee_item
.get("helm-chart"):
3810 vca_name
= ee_item
["helm-chart"]
3811 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3814 vca_type
= "helm-v3"
3817 logging_text
+ "skipping non juju neither charm configuration"
3822 for vca_index
, vca_deployed
in enumerate(
3823 db_nsr
["_admin"]["deployed"]["VCA"]
3825 if not vca_deployed
:
3828 vca_deployed
.get("member-vnf-index") == member_vnf_index
3829 and vca_deployed
.get("vdu_id") == vdu_id
3830 and vca_deployed
.get("kdu_name") == kdu_name
3831 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3832 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3836 # not found, create one.
3838 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3841 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3843 target
+= "/kdu/{}".format(kdu_name
)
3845 "target_element": target
,
3846 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3847 "member-vnf-index": member_vnf_index
,
3849 "kdu_name": kdu_name
,
3850 "vdu_count_index": vdu_index
,
3851 "operational-status": "init", # TODO revise
3852 "detailed-status": "", # TODO revise
3853 "step": "initial-deploy", # TODO revise
3855 "vdu_name": vdu_name
,
3857 "ee_descriptor_id": ee_descriptor_id
,
3858 "charm_name": charm_name
,
3862 # create VCA and configurationStatus in db
3864 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3865 "configurationStatus.{}".format(vca_index
): dict(),
3867 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3869 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3871 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3872 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3873 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3876 task_n2vc
= asyncio
.ensure_future(
3877 self
.instantiate_N2VC(
3878 logging_text
=logging_text
,
3879 vca_index
=vca_index
,
3885 vdu_index
=vdu_index
,
3886 deploy_params
=deploy_params
,
3887 config_descriptor
=descriptor_config
,
3888 base_folder
=base_folder
,
3889 nslcmop_id
=nslcmop_id
,
3893 ee_config_descriptor
=ee_item
,
3896 self
.lcm_tasks
.register(
3900 "instantiate_N2VC-{}".format(vca_index
),
3903 task_instantiation_info
[
3905 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3906 member_vnf_index
or "", vdu_id
or ""
3910 def _create_nslcmop(nsr_id
, operation
, params
):
3912 Creates a ns-lcm-opp content to be stored at database.
3913 :param nsr_id: internal id of the instance
3914 :param operation: instantiate, terminate, scale, action, ...
3915 :param params: user parameters for the operation
3916 :return: dictionary following SOL005 format
3918 # Raise exception if invalid arguments
3919 if not (nsr_id
and operation
and params
):
3921 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3928 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3929 "operationState": "PROCESSING",
3930 "statusEnteredTime": now
,
3931 "nsInstanceId": nsr_id
,
3932 "lcmOperationType": operation
,
3934 "isAutomaticInvocation": False,
3935 "operationParams": params
,
3936 "isCancelPending": False,
3938 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3939 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3944 def _format_additional_params(self
, params
):
3945 params
= params
or {}
3946 for key
, value
in params
.items():
3947 if str(value
).startswith("!!yaml "):
3948 params
[key
] = yaml
.safe_load(value
[7:])
3951 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3952 primitive
= seq
.get("name")
3953 primitive_params
= {}
3955 "member_vnf_index": vnf_index
,
3956 "primitive": primitive
,
3957 "primitive_params": primitive_params
,
3960 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
3964 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
3965 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
3966 if op
.get("operationState") == "COMPLETED":
3967 # b. Skip sub-operation
3968 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3969 return self
.SUBOPERATION_STATUS_SKIP
3971 # c. retry executing sub-operation
3972 # The sub-operation exists, and operationState != 'COMPLETED'
3973 # Update operationState = 'PROCESSING' to indicate a retry.
3974 operationState
= "PROCESSING"
3975 detailed_status
= "In progress"
3976 self
._update
_suboperation
_status
(
3977 db_nslcmop
, op_index
, operationState
, detailed_status
3979 # Return the sub-operation index
3980 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3981 # with arguments extracted from the sub-operation
3984 # Find a sub-operation where all keys in a matching dictionary must match
3985 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3986 def _find_suboperation(self
, db_nslcmop
, match
):
3987 if db_nslcmop
and match
:
3988 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3989 for i
, op
in enumerate(op_list
):
3990 if all(op
.get(k
) == match
[k
] for k
in match
):
3992 return self
.SUBOPERATION_STATUS_NOT_FOUND
3994 # Update status for a sub-operation given its index
3995 def _update_suboperation_status(
3996 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3998 # Update DB for HA tasks
3999 q_filter
= {"_id": db_nslcmop
["_id"]}
4001 "_admin.operations.{}.operationState".format(op_index
): operationState
,
4002 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
4005 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
4008 # Add sub-operation, return the index of the added sub-operation
4009 # Optionally, set operationState, detailed-status, and operationType
4010 # Status and type are currently set for 'scale' sub-operations:
4011 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
4012 # 'detailed-status' : status message
4013 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
4014 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
4015 def _add_suboperation(
4023 mapped_primitive_params
,
4024 operationState
=None,
4025 detailed_status
=None,
4028 RO_scaling_info
=None,
4031 return self
.SUBOPERATION_STATUS_NOT_FOUND
4032 # Get the "_admin.operations" list, if it exists
4033 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
4034 op_list
= db_nslcmop_admin
.get("operations")
4035 # Create or append to the "_admin.operations" list
4037 "member_vnf_index": vnf_index
,
4039 "vdu_count_index": vdu_count_index
,
4040 "primitive": primitive
,
4041 "primitive_params": mapped_primitive_params
,
4044 new_op
["operationState"] = operationState
4046 new_op
["detailed-status"] = detailed_status
4048 new_op
["lcmOperationType"] = operationType
4050 new_op
["RO_nsr_id"] = RO_nsr_id
4052 new_op
["RO_scaling_info"] = RO_scaling_info
4054 # No existing operations, create key 'operations' with current operation as first list element
4055 db_nslcmop_admin
.update({"operations": [new_op
]})
4056 op_list
= db_nslcmop_admin
.get("operations")
4058 # Existing operations, append operation to list
4059 op_list
.append(new_op
)
4061 db_nslcmop_update
= {"_admin.operations": op_list
}
4062 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4063 op_index
= len(op_list
) - 1
4066 # Helper methods for scale() sub-operations
4068 # pre-scale/post-scale:
4069 # Check for 3 different cases:
4070 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4071 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4072 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4073 def _check_or_add_scale_suboperation(
4077 vnf_config_primitive
,
4081 RO_scaling_info
=None,
4083 # Find this sub-operation
4084 if RO_nsr_id
and RO_scaling_info
:
4085 operationType
= "SCALE-RO"
4087 "member_vnf_index": vnf_index
,
4088 "RO_nsr_id": RO_nsr_id
,
4089 "RO_scaling_info": RO_scaling_info
,
4093 "member_vnf_index": vnf_index
,
4094 "primitive": vnf_config_primitive
,
4095 "primitive_params": primitive_params
,
4096 "lcmOperationType": operationType
,
4098 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4099 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4100 # a. New sub-operation
4101 # The sub-operation does not exist, add it.
4102 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4103 # The following parameters are set to None for all kind of scaling:
4105 vdu_count_index
= None
4107 if RO_nsr_id
and RO_scaling_info
:
4108 vnf_config_primitive
= None
4109 primitive_params
= None
4112 RO_scaling_info
= None
4113 # Initial status for sub-operation
4114 operationState
= "PROCESSING"
4115 detailed_status
= "In progress"
4116 # Add sub-operation for pre/post-scaling (zero or more operations)
4117 self
._add
_suboperation
(
4123 vnf_config_primitive
,
4131 return self
.SUBOPERATION_STATUS_NEW
4133 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4134 # or op_index (operationState != 'COMPLETED')
4135 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4137 # Function to return execution_environment id
4139 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4140 # TODO vdu_index_count
4141 for vca
in vca_deployed_list
:
4142 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
4145 async def destroy_N2VC(
4153 exec_primitives
=True,
4158 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4159 :param logging_text:
4161 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4162 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4163 :param vca_index: index in the database _admin.deployed.VCA
4164 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4165 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4166 not executed properly
4167 :param scaling_in: True destroys the application, False destroys the model
4168 :return: None or exception
4173 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4174 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4178 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4180 # execute terminate_primitives
4182 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4183 config_descriptor
.get("terminate-config-primitive"),
4184 vca_deployed
.get("ee_descriptor_id"),
4186 vdu_id
= vca_deployed
.get("vdu_id")
4187 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4188 vdu_name
= vca_deployed
.get("vdu_name")
4189 vnf_index
= vca_deployed
.get("member-vnf-index")
4190 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4191 for seq
in terminate_primitives
:
4192 # For each sequence in list, get primitive and call _ns_execute_primitive()
4193 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4194 vnf_index
, seq
.get("name")
4196 self
.logger
.debug(logging_text
+ step
)
4197 # Create the primitive for each sequence, i.e. "primitive": "touch"
4198 primitive
= seq
.get("name")
4199 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4204 self
._add
_suboperation
(
4211 mapped_primitive_params
,
4213 # Sub-operations: Call _ns_execute_primitive() instead of action()
4215 result
, result_detail
= await self
._ns
_execute
_primitive
(
4216 vca_deployed
["ee_id"],
4218 mapped_primitive_params
,
4222 except LcmException
:
4223 # this happens when VCA is not deployed. In this case it is not needed to terminate
4225 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4226 if result
not in result_ok
:
4228 "terminate_primitive {} for vnf_member_index={} fails with "
4229 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4231 # set that this VCA do not need terminated
4232 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4236 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4239 # Delete Prometheus Jobs if any
4240 # This uses NSR_ID, so it will destroy any jobs under this index
4241 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4244 await self
.vca_map
[vca_type
].delete_execution_environment(
4245 vca_deployed
["ee_id"],
4246 scaling_in
=scaling_in
,
4251 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4252 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4253 namespace
= "." + db_nsr
["_id"]
4255 await self
.n2vc
.delete_namespace(
4256 namespace
=namespace
,
4257 total_timeout
=self
.timeout
.charm_delete
,
4260 except N2VCNotFound
: # already deleted. Skip
4262 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4264 async def _terminate_RO(
4265 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4268 Terminates a deployment from RO
4269 :param logging_text:
4270 :param nsr_deployed: db_nsr._admin.deployed
4273 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4274 this method will update only the index 2, but it will write on database the concatenated content of the list
4279 ro_nsr_id
= ro_delete_action
= None
4280 if nsr_deployed
and nsr_deployed
.get("RO"):
4281 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4282 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4285 stage
[2] = "Deleting ns from VIM."
4286 db_nsr_update
["detailed-status"] = " ".join(stage
)
4287 self
._write
_op
_status
(nslcmop_id
, stage
)
4288 self
.logger
.debug(logging_text
+ stage
[2])
4289 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4290 self
._write
_op
_status
(nslcmop_id
, stage
)
4291 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4292 ro_delete_action
= desc
["action_id"]
4294 "_admin.deployed.RO.nsr_delete_action_id"
4295 ] = ro_delete_action
4296 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4297 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4298 if ro_delete_action
:
4299 # wait until NS is deleted from VIM
4300 stage
[2] = "Waiting ns deleted from VIM."
4301 detailed_status_old
= None
4305 + " RO_id={} ro_delete_action={}".format(
4306 ro_nsr_id
, ro_delete_action
4309 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4310 self
._write
_op
_status
(nslcmop_id
, stage
)
4312 delete_timeout
= 20 * 60 # 20 minutes
4313 while delete_timeout
> 0:
4314 desc
= await self
.RO
.show(
4316 item_id_name
=ro_nsr_id
,
4317 extra_item
="action",
4318 extra_item_id
=ro_delete_action
,
4322 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4324 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4325 if ns_status
== "ERROR":
4326 raise ROclient
.ROClientException(ns_status_info
)
4327 elif ns_status
== "BUILD":
4328 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4329 elif ns_status
== "ACTIVE":
4330 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4331 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4336 ), "ROclient.check_action_status returns unknown {}".format(
4339 if stage
[2] != detailed_status_old
:
4340 detailed_status_old
= stage
[2]
4341 db_nsr_update
["detailed-status"] = " ".join(stage
)
4342 self
._write
_op
_status
(nslcmop_id
, stage
)
4343 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4344 await asyncio
.sleep(5, loop
=self
.loop
)
4346 else: # delete_timeout <= 0:
4347 raise ROclient
.ROClientException(
4348 "Timeout waiting ns deleted from VIM"
4351 except Exception as e
:
4352 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4354 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4356 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4357 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4358 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4360 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4363 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4365 failed_detail
.append("delete conflict: {}".format(e
))
4368 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4371 failed_detail
.append("delete error: {}".format(e
))
4373 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4377 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4378 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4380 stage
[2] = "Deleting nsd from RO."
4381 db_nsr_update
["detailed-status"] = " ".join(stage
)
4382 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4383 self
._write
_op
_status
(nslcmop_id
, stage
)
4384 await self
.RO
.delete("nsd", ro_nsd_id
)
4386 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4388 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4389 except Exception as e
:
4391 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4393 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4395 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4398 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4400 failed_detail
.append(
4401 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4403 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4405 failed_detail
.append(
4406 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4408 self
.logger
.error(logging_text
+ failed_detail
[-1])
4410 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4411 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4412 if not vnf_deployed
or not vnf_deployed
["id"]:
4415 ro_vnfd_id
= vnf_deployed
["id"]
4418 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4419 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4421 db_nsr_update
["detailed-status"] = " ".join(stage
)
4422 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4423 self
._write
_op
_status
(nslcmop_id
, stage
)
4424 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4426 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4428 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4429 except Exception as e
:
4431 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4434 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4438 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4441 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4443 failed_detail
.append(
4444 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4446 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4448 failed_detail
.append(
4449 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4451 self
.logger
.error(logging_text
+ failed_detail
[-1])
4454 stage
[2] = "Error deleting from VIM"
4456 stage
[2] = "Deleted from VIM"
4457 db_nsr_update
["detailed-status"] = " ".join(stage
)
4458 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4459 self
._write
_op
_status
(nslcmop_id
, stage
)
4462 raise LcmException("; ".join(failed_detail
))
4464 async def terminate(self
, nsr_id
, nslcmop_id
):
4465 # Try to lock HA task here
4466 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4467 if not task_is_locked_by_me
:
4470 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4471 self
.logger
.debug(logging_text
+ "Enter")
4472 timeout_ns_terminate
= self
.timeout
.ns_terminate
4475 operation_params
= None
4477 error_list
= [] # annotates all failed error messages
4478 db_nslcmop_update
= {}
4479 autoremove
= False # autoremove after terminated
4480 tasks_dict_info
= {}
4483 "Stage 1/3: Preparing task.",
4484 "Waiting for previous operations to terminate.",
4487 # ^ contains [stage, step, VIM-status]
4489 # wait for any previous tasks in process
4490 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4492 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4493 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4494 operation_params
= db_nslcmop
.get("operationParams") or {}
4495 if operation_params
.get("timeout_ns_terminate"):
4496 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4497 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4498 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4500 db_nsr_update
["operational-status"] = "terminating"
4501 db_nsr_update
["config-status"] = "terminating"
4502 self
._write
_ns
_status
(
4504 ns_state
="TERMINATING",
4505 current_operation
="TERMINATING",
4506 current_operation_id
=nslcmop_id
,
4507 other_update
=db_nsr_update
,
4509 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4510 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4511 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4514 stage
[1] = "Getting vnf descriptors from db."
4515 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4517 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4519 db_vnfds_from_id
= {}
4520 db_vnfds_from_member_index
= {}
4522 for vnfr
in db_vnfrs_list
:
4523 vnfd_id
= vnfr
["vnfd-id"]
4524 if vnfd_id
not in db_vnfds_from_id
:
4525 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4526 db_vnfds_from_id
[vnfd_id
] = vnfd
4527 db_vnfds_from_member_index
[
4528 vnfr
["member-vnf-index-ref"]
4529 ] = db_vnfds_from_id
[vnfd_id
]
4531 # Destroy individual execution environments when there are terminating primitives.
4532 # Rest of EE will be deleted at once
4533 # TODO - check before calling _destroy_N2VC
4534 # if not operation_params.get("skip_terminate_primitives"):#
4535 # or not vca.get("needed_terminate"):
4536 stage
[0] = "Stage 2/3 execute terminating primitives."
4537 self
.logger
.debug(logging_text
+ stage
[0])
4538 stage
[1] = "Looking execution environment that needs terminate."
4539 self
.logger
.debug(logging_text
+ stage
[1])
4541 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4542 config_descriptor
= None
4543 vca_member_vnf_index
= vca
.get("member-vnf-index")
4544 vca_id
= self
.get_vca_id(
4545 db_vnfrs_dict
.get(vca_member_vnf_index
)
4546 if vca_member_vnf_index
4550 if not vca
or not vca
.get("ee_id"):
4552 if not vca
.get("member-vnf-index"):
4554 config_descriptor
= db_nsr
.get("ns-configuration")
4555 elif vca
.get("vdu_id"):
4556 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4557 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4558 elif vca
.get("kdu_name"):
4559 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4560 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4562 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4563 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4564 vca_type
= vca
.get("type")
4565 exec_terminate_primitives
= not operation_params
.get(
4566 "skip_terminate_primitives"
4567 ) and vca
.get("needed_terminate")
4568 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4569 # pending native charms
4571 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4573 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4574 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4575 task
= asyncio
.ensure_future(
4583 exec_terminate_primitives
,
4587 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4589 # wait for pending tasks of terminate primitives
4593 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4595 error_list
= await self
._wait
_for
_tasks
(
4598 min(self
.timeout
.charm_delete
, timeout_ns_terminate
),
4602 tasks_dict_info
.clear()
4604 return # raise LcmException("; ".join(error_list))
4606 # remove All execution environments at once
4607 stage
[0] = "Stage 3/3 delete all."
4609 if nsr_deployed
.get("VCA"):
4610 stage
[1] = "Deleting all execution environments."
4611 self
.logger
.debug(logging_text
+ stage
[1])
4612 vca_id
= self
.get_vca_id({}, db_nsr
)
4613 task_delete_ee
= asyncio
.ensure_future(
4615 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4616 timeout
=self
.timeout
.charm_delete
,
4619 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4620 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4622 # Delete Namespace and Certificates if necessary
4623 if check_helm_ee_in_ns(list(db_vnfds_from_member_index
.values())):
4624 await self
.vca_map
["helm-v3"].delete_tls_certificate(
4625 certificate_name
=db_nslcmop
["nsInstanceId"],
4627 # TODO: Delete namespace
4629 # Delete from k8scluster
4630 stage
[1] = "Deleting KDUs."
4631 self
.logger
.debug(logging_text
+ stage
[1])
4632 # print(nsr_deployed)
4633 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4634 if not kdu
or not kdu
.get("kdu-instance"):
4636 kdu_instance
= kdu
.get("kdu-instance")
4637 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4638 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4639 vca_id
= self
.get_vca_id({}, db_nsr
)
4640 task_delete_kdu_instance
= asyncio
.ensure_future(
4641 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4642 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4643 kdu_instance
=kdu_instance
,
4645 namespace
=kdu
.get("namespace"),
4651 + "Unknown k8s deployment type {}".format(
4652 kdu
.get("k8scluster-type")
4657 task_delete_kdu_instance
4658 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4661 stage
[1] = "Deleting ns from VIM."
4662 if self
.ro_config
.ng
:
4663 task_delete_ro
= asyncio
.ensure_future(
4664 self
._terminate
_ng
_ro
(
4665 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4669 task_delete_ro
= asyncio
.ensure_future(
4671 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4674 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4676 # rest of staff will be done at finally
4679 ROclient
.ROClientException
,
4684 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4686 except asyncio
.CancelledError
:
4688 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4690 exc
= "Operation was cancelled"
4691 except Exception as e
:
4692 exc
= traceback
.format_exc()
4693 self
.logger
.critical(
4694 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4699 error_list
.append(str(exc
))
4701 # wait for pending tasks
4703 stage
[1] = "Waiting for terminate pending tasks."
4704 self
.logger
.debug(logging_text
+ stage
[1])
4705 error_list
+= await self
._wait
_for
_tasks
(
4708 timeout_ns_terminate
,
4712 stage
[1] = stage
[2] = ""
4713 except asyncio
.CancelledError
:
4714 error_list
.append("Cancelled")
4715 # TODO cancell all tasks
4716 except Exception as exc
:
4717 error_list
.append(str(exc
))
4718 # update status at database
4720 error_detail
= "; ".join(error_list
)
4721 # self.logger.error(logging_text + error_detail)
4722 error_description_nslcmop
= "{} Detail: {}".format(
4723 stage
[0], error_detail
4725 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4726 nslcmop_id
, stage
[0]
4729 db_nsr_update
["operational-status"] = "failed"
4730 db_nsr_update
["detailed-status"] = (
4731 error_description_nsr
+ " Detail: " + error_detail
4733 db_nslcmop_update
["detailed-status"] = error_detail
4734 nslcmop_operation_state
= "FAILED"
4738 error_description_nsr
= error_description_nslcmop
= None
4739 ns_state
= "NOT_INSTANTIATED"
4740 db_nsr_update
["operational-status"] = "terminated"
4741 db_nsr_update
["detailed-status"] = "Done"
4742 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4743 db_nslcmop_update
["detailed-status"] = "Done"
4744 nslcmop_operation_state
= "COMPLETED"
4747 self
._write
_ns
_status
(
4750 current_operation
="IDLE",
4751 current_operation_id
=None,
4752 error_description
=error_description_nsr
,
4753 error_detail
=error_detail
,
4754 other_update
=db_nsr_update
,
4756 self
._write
_op
_status
(
4759 error_message
=error_description_nslcmop
,
4760 operation_state
=nslcmop_operation_state
,
4761 other_update
=db_nslcmop_update
,
4763 if ns_state
== "NOT_INSTANTIATED":
4767 {"nsr-id-ref": nsr_id
},
4768 {"_admin.nsState": "NOT_INSTANTIATED"},
4770 except DbException
as e
:
4773 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4777 if operation_params
:
4778 autoremove
= operation_params
.get("autoremove", False)
4779 if nslcmop_operation_state
:
4781 await self
.msg
.aiowrite(
4786 "nslcmop_id": nslcmop_id
,
4787 "operationState": nslcmop_operation_state
,
4788 "autoremove": autoremove
,
4792 except Exception as e
:
4794 logging_text
+ "kafka_write notification Exception {}".format(e
)
4797 self
.logger
.debug(logging_text
+ "Exit")
4798 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4800 async def _wait_for_tasks(
4801 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4804 error_detail_list
= []
4806 pending_tasks
= list(created_tasks_info
.keys())
4807 num_tasks
= len(pending_tasks
)
4809 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4810 self
._write
_op
_status
(nslcmop_id
, stage
)
4811 while pending_tasks
:
4813 _timeout
= timeout
+ time_start
- time()
4814 done
, pending_tasks
= await asyncio
.wait(
4815 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4817 num_done
+= len(done
)
4818 if not done
: # Timeout
4819 for task
in pending_tasks
:
4820 new_error
= created_tasks_info
[task
] + ": Timeout"
4821 error_detail_list
.append(new_error
)
4822 error_list
.append(new_error
)
4825 if task
.cancelled():
4828 exc
= task
.exception()
4830 if isinstance(exc
, asyncio
.TimeoutError
):
4832 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4833 error_list
.append(created_tasks_info
[task
])
4834 error_detail_list
.append(new_error
)
4841 ROclient
.ROClientException
,
4847 self
.logger
.error(logging_text
+ new_error
)
4849 exc_traceback
= "".join(
4850 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4854 + created_tasks_info
[task
]
4860 logging_text
+ created_tasks_info
[task
] + ": Done"
4862 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4864 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4865 if nsr_id
: # update also nsr
4870 "errorDescription": "Error at: " + ", ".join(error_list
),
4871 "errorDetail": ". ".join(error_detail_list
),
4874 self
._write
_op
_status
(nslcmop_id
, stage
)
4875 return error_detail_list
4878 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4880 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4881 The default-value is used. If it is between < > it look for a value at instantiation_params
4882 :param primitive_desc: portion of VNFD/NSD that describes primitive
4883 :param params: Params provided by user
4884 :param instantiation_params: Instantiation params provided by user
4885 :return: a dictionary with the calculated params
4887 calculated_params
= {}
4888 for parameter
in primitive_desc
.get("parameter", ()):
4889 param_name
= parameter
["name"]
4890 if param_name
in params
:
4891 calculated_params
[param_name
] = params
[param_name
]
4892 elif "default-value" in parameter
or "value" in parameter
:
4893 if "value" in parameter
:
4894 calculated_params
[param_name
] = parameter
["value"]
4896 calculated_params
[param_name
] = parameter
["default-value"]
4898 isinstance(calculated_params
[param_name
], str)
4899 and calculated_params
[param_name
].startswith("<")
4900 and calculated_params
[param_name
].endswith(">")
4902 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4903 calculated_params
[param_name
] = instantiation_params
[
4904 calculated_params
[param_name
][1:-1]
4908 "Parameter {} needed to execute primitive {} not provided".format(
4909 calculated_params
[param_name
], primitive_desc
["name"]
4914 "Parameter {} needed to execute primitive {} not provided".format(
4915 param_name
, primitive_desc
["name"]
4919 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4920 calculated_params
[param_name
] = yaml
.safe_dump(
4921 calculated_params
[param_name
], default_flow_style
=True, width
=256
4923 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4925 ].startswith("!!yaml "):
4926 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4927 if parameter
.get("data-type") == "INTEGER":
4929 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4930 except ValueError: # error converting string to int
4932 "Parameter {} of primitive {} must be integer".format(
4933 param_name
, primitive_desc
["name"]
4936 elif parameter
.get("data-type") == "BOOLEAN":
4937 calculated_params
[param_name
] = not (
4938 (str(calculated_params
[param_name
])).lower() == "false"
4941 # add always ns_config_info if primitive name is config
4942 if primitive_desc
["name"] == "config":
4943 if "ns_config_info" in instantiation_params
:
4944 calculated_params
["ns_config_info"] = instantiation_params
[
4947 return calculated_params
4949 def _look_for_deployed_vca(
4956 ee_descriptor_id
=None,
4958 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4959 for vca
in deployed_vca
:
4962 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4965 vdu_count_index
is not None
4966 and vdu_count_index
!= vca
["vdu_count_index"]
4969 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4971 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4975 # vca_deployed not found
4977 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4978 " is not deployed".format(
4987 ee_id
= vca
.get("ee_id")
4989 "type", "lxc_proxy_charm"
4990 ) # default value for backward compatibility - proxy charm
4993 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4994 "execution environment".format(
4995 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4998 return ee_id
, vca_type
5000 async def _ns_execute_primitive(
5006 retries_interval
=30,
5013 if primitive
== "config":
5014 primitive_params
= {"params": primitive_params
}
5016 vca_type
= vca_type
or "lxc_proxy_charm"
5020 output
= await asyncio
.wait_for(
5021 self
.vca_map
[vca_type
].exec_primitive(
5023 primitive_name
=primitive
,
5024 params_dict
=primitive_params
,
5025 progress_timeout
=self
.timeout
.progress_primitive
,
5026 total_timeout
=self
.timeout
.primitive
,
5031 timeout
=timeout
or self
.timeout
.primitive
,
5035 except asyncio
.CancelledError
:
5037 except Exception as e
:
5041 "Error executing action {} on {} -> {}".format(
5046 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
5048 if isinstance(e
, asyncio
.TimeoutError
):
5050 message
="Timed out waiting for action to complete"
5052 return "FAILED", getattr(e
, "message", repr(e
))
5054 return "COMPLETED", output
5056 except (LcmException
, asyncio
.CancelledError
):
5058 except Exception as e
:
5059 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
5061 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
5063 Updating the vca_status with latest juju information in nsrs record
5064 :param: nsr_id: Id of the nsr
5065 :param: nslcmop_id: Id of the nslcmop
5069 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
5070 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5071 vca_id
= self
.get_vca_id({}, db_nsr
)
5072 if db_nsr
["_admin"]["deployed"]["K8s"]:
5073 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
5074 cluster_uuid
, kdu_instance
, cluster_type
= (
5075 k8s
["k8scluster-uuid"],
5076 k8s
["kdu-instance"],
5077 k8s
["k8scluster-type"],
5079 await self
._on
_update
_k
8s
_db
(
5080 cluster_uuid
=cluster_uuid
,
5081 kdu_instance
=kdu_instance
,
5082 filter={"_id": nsr_id
},
5084 cluster_type
=cluster_type
,
5087 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
5088 table
, filter = "nsrs", {"_id": nsr_id
}
5089 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
5090 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
5092 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
5093 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
5095 async def action(self
, nsr_id
, nslcmop_id
):
5096 # Try to lock HA task here
5097 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5098 if not task_is_locked_by_me
:
5101 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
5102 self
.logger
.debug(logging_text
+ "Enter")
5103 # get all needed from database
5107 db_nslcmop_update
= {}
5108 nslcmop_operation_state
= None
5109 error_description_nslcmop
= None
5112 # wait for any previous tasks in process
5113 step
= "Waiting for previous operations to terminate"
5114 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5116 self
._write
_ns
_status
(
5119 current_operation
="RUNNING ACTION",
5120 current_operation_id
=nslcmop_id
,
5123 step
= "Getting information from database"
5124 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5125 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5126 if db_nslcmop
["operationParams"].get("primitive_params"):
5127 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
5128 db_nslcmop
["operationParams"]["primitive_params"]
5131 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5132 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
5133 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
5134 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
5135 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
5136 primitive
= db_nslcmop
["operationParams"]["primitive"]
5137 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
5138 timeout_ns_action
= db_nslcmop
["operationParams"].get(
5139 "timeout_ns_action", self
.timeout
.primitive
5143 step
= "Getting vnfr from database"
5144 db_vnfr
= self
.db
.get_one(
5145 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5147 if db_vnfr
.get("kdur"):
5149 for kdur
in db_vnfr
["kdur"]:
5150 if kdur
.get("additionalParams"):
5151 kdur
["additionalParams"] = json
.loads(
5152 kdur
["additionalParams"]
5154 kdur_list
.append(kdur
)
5155 db_vnfr
["kdur"] = kdur_list
5156 step
= "Getting vnfd from database"
5157 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5159 # Sync filesystem before running a primitive
5160 self
.fs
.sync(db_vnfr
["vnfd-id"])
5162 step
= "Getting nsd from database"
5163 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5165 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5166 # for backward compatibility
5167 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5168 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5169 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5170 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5172 # look for primitive
5173 config_primitive_desc
= descriptor_configuration
= None
5175 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5177 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5179 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5181 descriptor_configuration
= db_nsd
.get("ns-configuration")
5183 if descriptor_configuration
and descriptor_configuration
.get(
5186 for config_primitive
in descriptor_configuration
["config-primitive"]:
5187 if config_primitive
["name"] == primitive
:
5188 config_primitive_desc
= config_primitive
5191 if not config_primitive_desc
:
5192 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5194 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5198 primitive_name
= primitive
5199 ee_descriptor_id
= None
5201 primitive_name
= config_primitive_desc
.get(
5202 "execution-environment-primitive", primitive
5204 ee_descriptor_id
= config_primitive_desc
.get(
5205 "execution-environment-ref"
5211 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5213 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5216 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5218 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5220 desc_params
= parse_yaml_strings(
5221 db_vnfr
.get("additionalParamsForVnf")
5224 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5225 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5226 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5228 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5229 actions
.add(primitive
["name"])
5230 for primitive
in kdu_configuration
.get("config-primitive", []):
5231 actions
.add(primitive
["name"])
5233 nsr_deployed
["K8s"],
5234 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5235 and kdu
["member-vnf-index"] == vnf_index
,
5239 if primitive_name
in actions
5240 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5244 # TODO check if ns is in a proper status
5246 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5248 # kdur and desc_params already set from before
5249 if primitive_params
:
5250 desc_params
.update(primitive_params
)
5251 # TODO Check if we will need something at vnf level
5252 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5254 kdu_name
== kdu
["kdu-name"]
5255 and kdu
["member-vnf-index"] == vnf_index
5260 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5263 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5264 msg
= "unknown k8scluster-type '{}'".format(
5265 kdu
.get("k8scluster-type")
5267 raise LcmException(msg
)
5270 "collection": "nsrs",
5271 "filter": {"_id": nsr_id
},
5272 "path": "_admin.deployed.K8s.{}".format(index
),
5276 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5278 step
= "Executing kdu {}".format(primitive_name
)
5279 if primitive_name
== "upgrade":
5280 if desc_params
.get("kdu_model"):
5281 kdu_model
= desc_params
.get("kdu_model")
5282 del desc_params
["kdu_model"]
5284 kdu_model
= kdu
.get("kdu-model")
5285 parts
= kdu_model
.split(sep
=":")
5287 kdu_model
= parts
[0]
5288 if desc_params
.get("kdu_atomic_upgrade"):
5289 atomic_upgrade
= desc_params
.get(
5290 "kdu_atomic_upgrade"
5291 ).lower() in ("yes", "true", "1")
5292 del desc_params
["kdu_atomic_upgrade"]
5294 atomic_upgrade
= True
5296 detailed_status
= await asyncio
.wait_for(
5297 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5298 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5299 kdu_instance
=kdu
.get("kdu-instance"),
5300 atomic
=atomic_upgrade
,
5301 kdu_model
=kdu_model
,
5304 timeout
=timeout_ns_action
,
5306 timeout
=timeout_ns_action
+ 10,
5309 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5311 elif primitive_name
== "rollback":
5312 detailed_status
= await asyncio
.wait_for(
5313 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5314 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5315 kdu_instance
=kdu
.get("kdu-instance"),
5318 timeout
=timeout_ns_action
,
5320 elif primitive_name
== "status":
5321 detailed_status
= await asyncio
.wait_for(
5322 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5323 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5324 kdu_instance
=kdu
.get("kdu-instance"),
5327 timeout
=timeout_ns_action
,
5330 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5331 kdu
["kdu-name"], nsr_id
5333 params
= self
._map
_primitive
_params
(
5334 config_primitive_desc
, primitive_params
, desc_params
5337 detailed_status
= await asyncio
.wait_for(
5338 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5339 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5340 kdu_instance
=kdu_instance
,
5341 primitive_name
=primitive_name
,
5344 timeout
=timeout_ns_action
,
5347 timeout
=timeout_ns_action
,
5351 nslcmop_operation_state
= "COMPLETED"
5353 detailed_status
= ""
5354 nslcmop_operation_state
= "FAILED"
5356 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5357 nsr_deployed
["VCA"],
5358 member_vnf_index
=vnf_index
,
5360 vdu_count_index
=vdu_count_index
,
5361 ee_descriptor_id
=ee_descriptor_id
,
5363 for vca_index
, vca_deployed
in enumerate(
5364 db_nsr
["_admin"]["deployed"]["VCA"]
5366 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5368 "collection": "nsrs",
5369 "filter": {"_id": nsr_id
},
5370 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5374 nslcmop_operation_state
,
5376 ) = await self
._ns
_execute
_primitive
(
5378 primitive
=primitive_name
,
5379 primitive_params
=self
._map
_primitive
_params
(
5380 config_primitive_desc
, primitive_params
, desc_params
5382 timeout
=timeout_ns_action
,
5388 db_nslcmop_update
["detailed-status"] = detailed_status
5389 error_description_nslcmop
= (
5390 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5394 + "Done with result {} {}".format(
5395 nslcmop_operation_state
, detailed_status
5398 return # database update is called inside finally
5400 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5401 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5403 except asyncio
.CancelledError
:
5405 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5407 exc
= "Operation was cancelled"
5408 except asyncio
.TimeoutError
:
5409 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5411 except Exception as e
:
5412 exc
= traceback
.format_exc()
5413 self
.logger
.critical(
5414 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5423 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5424 nslcmop_operation_state
= "FAILED"
5426 self
._write
_ns
_status
(
5430 ], # TODO check if degraded. For the moment use previous status
5431 current_operation
="IDLE",
5432 current_operation_id
=None,
5433 # error_description=error_description_nsr,
5434 # error_detail=error_detail,
5435 other_update
=db_nsr_update
,
5438 self
._write
_op
_status
(
5441 error_message
=error_description_nslcmop
,
5442 operation_state
=nslcmop_operation_state
,
5443 other_update
=db_nslcmop_update
,
5446 if nslcmop_operation_state
:
5448 await self
.msg
.aiowrite(
5453 "nslcmop_id": nslcmop_id
,
5454 "operationState": nslcmop_operation_state
,
5458 except Exception as e
:
5460 logging_text
+ "kafka_write notification Exception {}".format(e
)
5462 self
.logger
.debug(logging_text
+ "Exit")
5463 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5464 return nslcmop_operation_state
, detailed_status
5466 async def terminate_vdus(
5467 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5469 """This method terminates VDUs
5472 db_vnfr: VNF instance record
5473 member_vnf_index: VNF index to identify the VDUs to be removed
5474 db_nsr: NS instance record
5475 update_db_nslcmops: Nslcmop update record
5477 vca_scaling_info
= []
5478 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5479 scaling_info
["scaling_direction"] = "IN"
5480 scaling_info
["vdu-delete"] = {}
5481 scaling_info
["kdu-delete"] = {}
5482 db_vdur
= db_vnfr
.get("vdur")
5483 vdur_list
= copy(db_vdur
)
5485 for index
, vdu
in enumerate(vdur_list
):
5486 vca_scaling_info
.append(
5488 "osm_vdu_id": vdu
["vdu-id-ref"],
5489 "member-vnf-index": member_vnf_index
,
5491 "vdu_index": count_index
,
5494 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5495 scaling_info
["vdu"].append(
5497 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5498 "vdu_id": vdu
["vdu-id-ref"],
5502 for interface
in vdu
["interfaces"]:
5503 scaling_info
["vdu"][index
]["interface"].append(
5505 "name": interface
["name"],
5506 "ip_address": interface
["ip-address"],
5507 "mac_address": interface
.get("mac-address"),
5510 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5511 stage
[2] = "Terminating VDUs"
5512 if scaling_info
.get("vdu-delete"):
5513 # scale_process = "RO"
5514 if self
.ro_config
.ng
:
5515 await self
._scale
_ng
_ro
(
5524 async def remove_vnf(self
, nsr_id
, nslcmop_id
, vnf_instance_id
):
5525 """This method is to Remove VNF instances from NS.
5528 nsr_id: NS instance id
5529 nslcmop_id: nslcmop id of update
5530 vnf_instance_id: id of the VNF instance to be removed
5533 result: (str, str) COMPLETED/FAILED, details
5537 logging_text
= "Task ns={} update ".format(nsr_id
)
5538 check_vnfr_count
= len(self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}))
5539 self
.logger
.info("check_vnfr_count {}".format(check_vnfr_count
))
5540 if check_vnfr_count
> 1:
5541 stage
= ["", "", ""]
5542 step
= "Getting nslcmop from database"
5544 step
+ " after having waited for previous tasks to be completed"
5546 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5547 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5548 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5549 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5550 """ db_vnfr = self.db.get_one(
5551 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5553 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5554 await self
.terminate_vdus(
5563 constituent_vnfr
= db_nsr
.get("constituent-vnfr-ref")
5564 constituent_vnfr
.remove(db_vnfr
.get("_id"))
5565 db_nsr_update
["constituent-vnfr-ref"] = db_nsr
.get(
5566 "constituent-vnfr-ref"
5568 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5569 self
.db
.del_one("vnfrs", {"_id": db_vnfr
.get("_id")})
5570 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5571 return "COMPLETED", "Done"
5573 step
= "Terminate VNF Failed with"
5575 "{} Cannot terminate the last VNF in this NS.".format(
5579 except (LcmException
, asyncio
.CancelledError
):
5581 except Exception as e
:
5582 self
.logger
.debug("Error removing VNF {}".format(e
))
5583 return "FAILED", "Error removing VNF {}".format(e
)
5585 async def _ns_redeploy_vnf(
5593 """This method updates and redeploys VNF instances
5596 nsr_id: NS instance id
5597 nslcmop_id: nslcmop id
5598 db_vnfd: VNF descriptor
5599 db_vnfr: VNF instance record
5600 db_nsr: NS instance record
5603 result: (str, str) COMPLETED/FAILED, details
5607 stage
= ["", "", ""]
5608 logging_text
= "Task ns={} update ".format(nsr_id
)
5609 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5610 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5612 # Terminate old VNF resources
5613 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5614 await self
.terminate_vdus(
5623 # old_vnfd_id = db_vnfr["vnfd-id"]
5624 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5625 new_db_vnfd
= db_vnfd
5626 # new_vnfd_ref = new_db_vnfd["id"]
5627 # new_vnfd_id = vnfd_id
5631 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5633 "name": cp
.get("id"),
5634 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5635 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5638 new_vnfr_cp
.append(vnf_cp
)
5639 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5640 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5641 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5643 "revision": latest_vnfd_revision
,
5644 "connection-point": new_vnfr_cp
,
5648 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5649 updated_db_vnfr
= self
.db
.get_one(
5651 {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
},
5654 # Instantiate new VNF resources
5655 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5656 vca_scaling_info
= []
5657 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5658 scaling_info
["scaling_direction"] = "OUT"
5659 scaling_info
["vdu-create"] = {}
5660 scaling_info
["kdu-create"] = {}
5661 vdud_instantiate_list
= db_vnfd
["vdu"]
5662 for index
, vdud
in enumerate(vdud_instantiate_list
):
5663 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
5665 additional_params
= (
5666 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5669 cloud_init_list
= []
5671 # TODO Information of its own ip is not available because db_vnfr is not updated.
5672 additional_params
["OSM"] = get_osm_params(
5673 updated_db_vnfr
, vdud
["id"], 1
5675 cloud_init_list
.append(
5676 self
._parse
_cloud
_init
(
5683 vca_scaling_info
.append(
5685 "osm_vdu_id": vdud
["id"],
5686 "member-vnf-index": member_vnf_index
,
5688 "vdu_index": count_index
,
5691 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5692 if self
.ro_config
.ng
:
5694 "New Resources to be deployed: {}".format(scaling_info
)
5696 await self
._scale
_ng
_ro
(
5704 return "COMPLETED", "Done"
5705 except (LcmException
, asyncio
.CancelledError
):
5707 except Exception as e
:
5708 self
.logger
.debug("Error updating VNF {}".format(e
))
5709 return "FAILED", "Error updating VNF {}".format(e
)
5711 async def _ns_charm_upgrade(
5717 timeout
: float = None,
5719 """This method upgrade charms in VNF instances
5722 ee_id: Execution environment id
5723 path: Local path to the charm
5725 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5726 timeout: (Float) Timeout for the ns update operation
5729 result: (str, str) COMPLETED/FAILED, details
5732 charm_type
= charm_type
or "lxc_proxy_charm"
5733 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5737 charm_type
=charm_type
,
5738 timeout
=timeout
or self
.timeout
.ns_update
,
5742 return "COMPLETED", output
5744 except (LcmException
, asyncio
.CancelledError
):
5747 except Exception as e
:
5749 self
.logger
.debug("Error upgrading charm {}".format(path
))
5751 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5753 async def update(self
, nsr_id
, nslcmop_id
):
5754 """Update NS according to different update types
5756 This method performs upgrade of VNF instances then updates the revision
5757 number in VNF record
5760 nsr_id: Network service will be updated
5761 nslcmop_id: ns lcm operation id
5764 It may raise DbException, LcmException, N2VCException, K8sException
5767 # Try to lock HA task here
5768 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5769 if not task_is_locked_by_me
:
5772 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5773 self
.logger
.debug(logging_text
+ "Enter")
5775 # Set the required variables to be filled up later
5777 db_nslcmop_update
= {}
5779 nslcmop_operation_state
= None
5781 error_description_nslcmop
= ""
5783 change_type
= "updated"
5784 detailed_status
= ""
5787 # wait for any previous tasks in process
5788 step
= "Waiting for previous operations to terminate"
5789 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5790 self
._write
_ns
_status
(
5793 current_operation
="UPDATING",
5794 current_operation_id
=nslcmop_id
,
5797 step
= "Getting nslcmop from database"
5798 db_nslcmop
= self
.db
.get_one(
5799 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5801 update_type
= db_nslcmop
["operationParams"]["updateType"]
5803 step
= "Getting nsr from database"
5804 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5805 old_operational_status
= db_nsr
["operational-status"]
5806 db_nsr_update
["operational-status"] = "updating"
5807 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5808 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5810 if update_type
== "CHANGE_VNFPKG":
5812 # Get the input parameters given through update request
5813 vnf_instance_id
= db_nslcmop
["operationParams"][
5814 "changeVnfPackageData"
5815 ].get("vnfInstanceId")
5817 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5820 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5822 step
= "Getting vnfr from database"
5823 db_vnfr
= self
.db
.get_one(
5824 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5827 step
= "Getting vnfds from database"
5829 latest_vnfd
= self
.db
.get_one(
5830 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5832 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5835 current_vnf_revision
= db_vnfr
.get("revision", 1)
5836 current_vnfd
= self
.db
.get_one(
5838 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5839 fail_on_empty
=False,
5841 # Charm artifact paths will be filled up later
5843 current_charm_artifact_path
,
5844 target_charm_artifact_path
,
5845 charm_artifact_paths
,
5847 ) = ([], [], [], [])
5849 step
= "Checking if revision has changed in VNFD"
5850 if current_vnf_revision
!= latest_vnfd_revision
:
5852 change_type
= "policy_updated"
5854 # There is new revision of VNFD, update operation is required
5855 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5856 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5858 step
= "Removing the VNFD packages if they exist in the local path"
5859 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5860 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5862 step
= "Get the VNFD packages from FSMongo"
5863 self
.fs
.sync(from_path
=latest_vnfd_path
)
5864 self
.fs
.sync(from_path
=current_vnfd_path
)
5867 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5869 current_base_folder
= current_vnfd
["_admin"]["storage"]
5870 latest_base_folder
= latest_vnfd
["_admin"]["storage"]
5872 for vca_index
, vca_deployed
in enumerate(
5873 get_iterable(nsr_deployed
, "VCA")
5875 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5877 # Getting charm-id and charm-type
5878 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5879 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5880 vca_type
= vca_deployed
.get("type")
5881 vdu_count_index
= vca_deployed
.get("vdu_count_index")
5884 ee_id
= vca_deployed
.get("ee_id")
5886 step
= "Getting descriptor config"
5887 if current_vnfd
.get("kdu"):
5889 search_key
= "kdu_name"
5891 search_key
= "vnfd_id"
5893 entity_id
= vca_deployed
.get(search_key
)
5895 descriptor_config
= get_configuration(
5896 current_vnfd
, entity_id
5899 if "execution-environment-list" in descriptor_config
:
5900 ee_list
= descriptor_config
.get(
5901 "execution-environment-list", []
5906 # There could be several charm used in the same VNF
5907 for ee_item
in ee_list
:
5908 if ee_item
.get("juju"):
5910 step
= "Getting charm name"
5911 charm_name
= ee_item
["juju"].get("charm")
5913 step
= "Setting Charm artifact paths"
5914 current_charm_artifact_path
.append(
5915 get_charm_artifact_path(
5916 current_base_folder
,
5919 current_vnf_revision
,
5922 target_charm_artifact_path
.append(
5923 get_charm_artifact_path(
5927 latest_vnfd_revision
,
5930 elif ee_item
.get("helm-chart"):
5931 # add chart to list and all parameters
5932 step
= "Getting helm chart name"
5933 chart_name
= ee_item
.get("helm-chart")
5935 ee_item
.get("helm-version")
5936 and ee_item
.get("helm-version") == "v2"
5940 vca_type
= "helm-v3"
5941 step
= "Setting Helm chart artifact paths"
5943 helm_artifacts
.append(
5945 "current_artifact_path": get_charm_artifact_path(
5946 current_base_folder
,
5949 current_vnf_revision
,
5951 "target_artifact_path": get_charm_artifact_path(
5955 latest_vnfd_revision
,
5958 "vca_index": vca_index
,
5959 "vdu_index": vdu_count_index
,
5963 charm_artifact_paths
= zip(
5964 current_charm_artifact_path
, target_charm_artifact_path
5967 step
= "Checking if software version has changed in VNFD"
5968 if find_software_version(current_vnfd
) != find_software_version(
5972 step
= "Checking if existing VNF has charm"
5973 for current_charm_path
, target_charm_path
in list(
5974 charm_artifact_paths
5976 if current_charm_path
:
5978 "Software version change is not supported as VNF instance {} has charm.".format(
5983 # There is no change in the charm package, then redeploy the VNF
5984 # based on new descriptor
5985 step
= "Redeploying VNF"
5986 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5987 (result
, detailed_status
) = await self
._ns
_redeploy
_vnf
(
5988 nsr_id
, nslcmop_id
, latest_vnfd
, db_vnfr
, db_nsr
5990 if result
== "FAILED":
5991 nslcmop_operation_state
= result
5992 error_description_nslcmop
= detailed_status
5993 db_nslcmop_update
["detailed-status"] = detailed_status
5996 + " step {} Done with result {} {}".format(
5997 step
, nslcmop_operation_state
, detailed_status
6002 step
= "Checking if any charm package has changed or not"
6003 for current_charm_path
, target_charm_path
in list(
6004 charm_artifact_paths
6008 and target_charm_path
6009 and self
.check_charm_hash_changed(
6010 current_charm_path
, target_charm_path
6014 step
= "Checking whether VNF uses juju bundle"
6015 if check_juju_bundle_existence(current_vnfd
):
6018 "Charm upgrade is not supported for the instance which"
6019 " uses juju-bundle: {}".format(
6020 check_juju_bundle_existence(current_vnfd
)
6024 step
= "Upgrading Charm"
6028 ) = await self
._ns
_charm
_upgrade
(
6031 charm_type
=vca_type
,
6032 path
=self
.fs
.path
+ target_charm_path
,
6033 timeout
=timeout_seconds
,
6036 if result
== "FAILED":
6037 nslcmop_operation_state
= result
6038 error_description_nslcmop
= detailed_status
6040 db_nslcmop_update
["detailed-status"] = detailed_status
6043 + " step {} Done with result {} {}".format(
6044 step
, nslcmop_operation_state
, detailed_status
6048 step
= "Updating policies"
6049 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6050 result
= "COMPLETED"
6051 detailed_status
= "Done"
6052 db_nslcmop_update
["detailed-status"] = "Done"
6055 for item
in helm_artifacts
:
6057 item
["current_artifact_path"]
6058 and item
["target_artifact_path"]
6059 and self
.check_charm_hash_changed(
6060 item
["current_artifact_path"],
6061 item
["target_artifact_path"],
6065 db_update_entry
= "_admin.deployed.VCA.{}.".format(
6068 vnfr_id
= db_vnfr
["_id"]
6069 osm_config
= {"osm": {"ns_id": nsr_id
, "vnf_id": vnfr_id
}}
6071 "collection": "nsrs",
6072 "filter": {"_id": nsr_id
},
6073 "path": db_update_entry
,
6075 vca_type
, namespace
, helm_id
= get_ee_id_parts(item
["ee_id"])
6076 await self
.vca_map
[vca_type
].upgrade_execution_environment(
6077 namespace
=namespace
,
6081 artifact_path
=item
["target_artifact_path"],
6084 vnf_id
= db_vnfr
.get("vnfd-ref")
6085 config_descriptor
= get_configuration(latest_vnfd
, vnf_id
)
6086 self
.logger
.debug("get ssh key block")
6090 ("config-access", "ssh-access", "required"),
6092 # Needed to inject a ssh key
6095 ("config-access", "ssh-access", "default-user"),
6098 "Install configuration Software, getting public ssh key"
6100 pub_key
= await self
.vca_map
[
6102 ].get_ee_ssh_public__key(
6103 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
6107 "Insert public key into VM user={} ssh_key={}".format(
6111 self
.logger
.debug(logging_text
+ step
)
6113 # wait for RO (ip-address) Insert pub_key into VM
6114 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
6124 initial_config_primitive_list
= config_descriptor
.get(
6125 "initial-config-primitive"
6127 config_primitive
= next(
6130 for p
in initial_config_primitive_list
6131 if p
["name"] == "config"
6135 if not config_primitive
:
6138 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6140 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
6141 if db_vnfr
.get("additionalParamsForVnf"):
6142 deploy_params
.update(
6144 db_vnfr
["additionalParamsForVnf"].copy()
6147 primitive_params_
= self
._map
_primitive
_params
(
6148 config_primitive
, {}, deploy_params
6151 step
= "execute primitive '{}' params '{}'".format(
6152 config_primitive
["name"], primitive_params_
6154 self
.logger
.debug(logging_text
+ step
)
6155 await self
.vca_map
[vca_type
].exec_primitive(
6157 primitive_name
=config_primitive
["name"],
6158 params_dict
=primitive_params_
,
6164 step
= "Updating policies"
6165 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6166 detailed_status
= "Done"
6167 db_nslcmop_update
["detailed-status"] = "Done"
6169 # If nslcmop_operation_state is None, so any operation is not failed.
6170 if not nslcmop_operation_state
:
6171 nslcmop_operation_state
= "COMPLETED"
6173 # If update CHANGE_VNFPKG nslcmop_operation is successful
6174 # vnf revision need to be updated
6175 vnfr_update
["revision"] = latest_vnfd_revision
6176 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
6180 + " task Done with result {} {}".format(
6181 nslcmop_operation_state
, detailed_status
6184 elif update_type
== "REMOVE_VNF":
6185 # This part is included in https://osm.etsi.org/gerrit/11876
6186 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
6187 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
6188 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6189 step
= "Removing VNF"
6190 (result
, detailed_status
) = await self
.remove_vnf(
6191 nsr_id
, nslcmop_id
, vnf_instance_id
6193 if result
== "FAILED":
6194 nslcmop_operation_state
= result
6195 error_description_nslcmop
= detailed_status
6196 db_nslcmop_update
["detailed-status"] = detailed_status
6197 change_type
= "vnf_terminated"
6198 if not nslcmop_operation_state
:
6199 nslcmop_operation_state
= "COMPLETED"
6202 + " task Done with result {} {}".format(
6203 nslcmop_operation_state
, detailed_status
6207 elif update_type
== "OPERATE_VNF":
6208 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"][
6211 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"][
6214 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"][
6217 (result
, detailed_status
) = await self
.rebuild_start_stop(
6218 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
6220 if result
== "FAILED":
6221 nslcmop_operation_state
= result
6222 error_description_nslcmop
= detailed_status
6223 db_nslcmop_update
["detailed-status"] = detailed_status
6224 if not nslcmop_operation_state
:
6225 nslcmop_operation_state
= "COMPLETED"
6228 + " task Done with result {} {}".format(
6229 nslcmop_operation_state
, detailed_status
6233 # If nslcmop_operation_state is None, so any operation is not failed.
6234 # All operations are executed in overall.
6235 if not nslcmop_operation_state
:
6236 nslcmop_operation_state
= "COMPLETED"
6237 db_nsr_update
["operational-status"] = old_operational_status
6239 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
6240 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6242 except asyncio
.CancelledError
:
6244 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6246 exc
= "Operation was cancelled"
6247 except asyncio
.TimeoutError
:
6248 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
6250 except Exception as e
:
6251 exc
= traceback
.format_exc()
6252 self
.logger
.critical(
6253 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6262 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6263 nslcmop_operation_state
= "FAILED"
6264 db_nsr_update
["operational-status"] = old_operational_status
6266 self
._write
_ns
_status
(
6268 ns_state
=db_nsr
["nsState"],
6269 current_operation
="IDLE",
6270 current_operation_id
=None,
6271 other_update
=db_nsr_update
,
6274 self
._write
_op
_status
(
6277 error_message
=error_description_nslcmop
,
6278 operation_state
=nslcmop_operation_state
,
6279 other_update
=db_nslcmop_update
,
6282 if nslcmop_operation_state
:
6286 "nslcmop_id": nslcmop_id
,
6287 "operationState": nslcmop_operation_state
,
6289 if change_type
in ("vnf_terminated", "policy_updated"):
6290 msg
.update({"vnf_member_index": member_vnf_index
})
6291 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6292 except Exception as e
:
6294 logging_text
+ "kafka_write notification Exception {}".format(e
)
6296 self
.logger
.debug(logging_text
+ "Exit")
6297 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6298 return nslcmop_operation_state
, detailed_status
6300 async def scale(self
, nsr_id
, nslcmop_id
):
6301 # Try to lock HA task here
6302 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6303 if not task_is_locked_by_me
:
6306 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6307 stage
= ["", "", ""]
6308 tasks_dict_info
= {}
6309 # ^ stage, step, VIM progress
6310 self
.logger
.debug(logging_text
+ "Enter")
6311 # get all needed from database
6313 db_nslcmop_update
= {}
6316 # in case of error, indicates what part of scale was failed to put nsr at error status
6317 scale_process
= None
6318 old_operational_status
= ""
6319 old_config_status
= ""
6322 # wait for any previous tasks in process
6323 step
= "Waiting for previous operations to terminate"
6324 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6325 self
._write
_ns
_status
(
6328 current_operation
="SCALING",
6329 current_operation_id
=nslcmop_id
,
6332 step
= "Getting nslcmop from database"
6334 step
+ " after having waited for previous tasks to be completed"
6336 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6338 step
= "Getting nsr from database"
6339 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6340 old_operational_status
= db_nsr
["operational-status"]
6341 old_config_status
= db_nsr
["config-status"]
6343 step
= "Parsing scaling parameters"
6344 db_nsr_update
["operational-status"] = "scaling"
6345 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6346 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6348 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6350 ]["member-vnf-index"]
6351 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6353 ]["scaling-group-descriptor"]
6354 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6355 # for backward compatibility
6356 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6357 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6358 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6359 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6361 step
= "Getting vnfr from database"
6362 db_vnfr
= self
.db
.get_one(
6363 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6366 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6368 step
= "Getting vnfd from database"
6369 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6371 base_folder
= db_vnfd
["_admin"]["storage"]
6373 step
= "Getting scaling-group-descriptor"
6374 scaling_descriptor
= find_in_list(
6375 get_scaling_aspect(db_vnfd
),
6376 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6378 if not scaling_descriptor
:
6380 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6381 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6384 step
= "Sending scale order to VIM"
6385 # TODO check if ns is in a proper status
6387 if not db_nsr
["_admin"].get("scaling-group"):
6392 "_admin.scaling-group": [
6393 {"name": scaling_group
, "nb-scale-op": 0}
6397 admin_scale_index
= 0
6399 for admin_scale_index
, admin_scale_info
in enumerate(
6400 db_nsr
["_admin"]["scaling-group"]
6402 if admin_scale_info
["name"] == scaling_group
:
6403 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6405 else: # not found, set index one plus last element and add new entry with the name
6406 admin_scale_index
+= 1
6408 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6411 vca_scaling_info
= []
6412 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6413 if scaling_type
== "SCALE_OUT":
6414 if "aspect-delta-details" not in scaling_descriptor
:
6416 "Aspect delta details not fount in scaling descriptor {}".format(
6417 scaling_descriptor
["name"]
6420 # count if max-instance-count is reached
6421 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6423 scaling_info
["scaling_direction"] = "OUT"
6424 scaling_info
["vdu-create"] = {}
6425 scaling_info
["kdu-create"] = {}
6426 for delta
in deltas
:
6427 for vdu_delta
in delta
.get("vdu-delta", {}):
6428 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6429 # vdu_index also provides the number of instance of the targeted vdu
6430 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6431 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6435 additional_params
= (
6436 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6439 cloud_init_list
= []
6441 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6442 max_instance_count
= 10
6443 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6444 max_instance_count
= vdu_profile
.get(
6445 "max-number-of-instances", 10
6448 default_instance_num
= get_number_of_instances(
6451 instances_number
= vdu_delta
.get("number-of-instances", 1)
6452 nb_scale_op
+= instances_number
6454 new_instance_count
= nb_scale_op
+ default_instance_num
6455 # Control if new count is over max and vdu count is less than max.
6456 # Then assign new instance count
6457 if new_instance_count
> max_instance_count
> vdu_count
:
6458 instances_number
= new_instance_count
- max_instance_count
6460 instances_number
= instances_number
6462 if new_instance_count
> max_instance_count
:
6464 "reached the limit of {} (max-instance-count) "
6465 "scaling-out operations for the "
6466 "scaling-group-descriptor '{}'".format(
6467 nb_scale_op
, scaling_group
6470 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6472 # TODO Information of its own ip is not available because db_vnfr is not updated.
6473 additional_params
["OSM"] = get_osm_params(
6474 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6476 cloud_init_list
.append(
6477 self
._parse
_cloud
_init
(
6484 vca_scaling_info
.append(
6486 "osm_vdu_id": vdu_delta
["id"],
6487 "member-vnf-index": vnf_index
,
6489 "vdu_index": vdu_index
+ x
,
6492 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6493 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6494 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6495 kdu_name
= kdu_profile
["kdu-name"]
6496 resource_name
= kdu_profile
.get("resource-name", "")
6498 # Might have different kdus in the same delta
6499 # Should have list for each kdu
6500 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6501 scaling_info
["kdu-create"][kdu_name
] = []
6503 kdur
= get_kdur(db_vnfr
, kdu_name
)
6504 if kdur
.get("helm-chart"):
6505 k8s_cluster_type
= "helm-chart-v3"
6506 self
.logger
.debug("kdur: {}".format(kdur
))
6508 kdur
.get("helm-version")
6509 and kdur
.get("helm-version") == "v2"
6511 k8s_cluster_type
= "helm-chart"
6512 elif kdur
.get("juju-bundle"):
6513 k8s_cluster_type
= "juju-bundle"
6516 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6517 "juju-bundle. Maybe an old NBI version is running".format(
6518 db_vnfr
["member-vnf-index-ref"], kdu_name
6522 max_instance_count
= 10
6523 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6524 max_instance_count
= kdu_profile
.get(
6525 "max-number-of-instances", 10
6528 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6529 deployed_kdu
, _
= get_deployed_kdu(
6530 nsr_deployed
, kdu_name
, vnf_index
6532 if deployed_kdu
is None:
6534 "KDU '{}' for vnf '{}' not deployed".format(
6538 kdu_instance
= deployed_kdu
.get("kdu-instance")
6539 instance_num
= await self
.k8scluster_map
[
6545 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6546 kdu_model
=deployed_kdu
.get("kdu-model"),
6548 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6549 "number-of-instances", 1
6552 # Control if new count is over max and instance_num is less than max.
6553 # Then assign max instance number to kdu replica count
6554 if kdu_replica_count
> max_instance_count
> instance_num
:
6555 kdu_replica_count
= max_instance_count
6556 if kdu_replica_count
> max_instance_count
:
6558 "reached the limit of {} (max-instance-count) "
6559 "scaling-out operations for the "
6560 "scaling-group-descriptor '{}'".format(
6561 instance_num
, scaling_group
6565 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6566 vca_scaling_info
.append(
6568 "osm_kdu_id": kdu_name
,
6569 "member-vnf-index": vnf_index
,
6571 "kdu_index": instance_num
+ x
- 1,
6574 scaling_info
["kdu-create"][kdu_name
].append(
6576 "member-vnf-index": vnf_index
,
6578 "k8s-cluster-type": k8s_cluster_type
,
6579 "resource-name": resource_name
,
6580 "scale": kdu_replica_count
,
6583 elif scaling_type
== "SCALE_IN":
6584 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6586 scaling_info
["scaling_direction"] = "IN"
6587 scaling_info
["vdu-delete"] = {}
6588 scaling_info
["kdu-delete"] = {}
6590 for delta
in deltas
:
6591 for vdu_delta
in delta
.get("vdu-delta", {}):
6592 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6593 min_instance_count
= 0
6594 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6595 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6596 min_instance_count
= vdu_profile
["min-number-of-instances"]
6598 default_instance_num
= get_number_of_instances(
6599 db_vnfd
, vdu_delta
["id"]
6601 instance_num
= vdu_delta
.get("number-of-instances", 1)
6602 nb_scale_op
-= instance_num
6604 new_instance_count
= nb_scale_op
+ default_instance_num
6606 if new_instance_count
< min_instance_count
< vdu_count
:
6607 instances_number
= min_instance_count
- new_instance_count
6609 instances_number
= instance_num
6611 if new_instance_count
< min_instance_count
:
6613 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6614 "scaling-group-descriptor '{}'".format(
6615 nb_scale_op
, scaling_group
6618 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6619 vca_scaling_info
.append(
6621 "osm_vdu_id": vdu_delta
["id"],
6622 "member-vnf-index": vnf_index
,
6624 "vdu_index": vdu_index
- 1 - x
,
6627 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6628 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6629 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6630 kdu_name
= kdu_profile
["kdu-name"]
6631 resource_name
= kdu_profile
.get("resource-name", "")
6633 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6634 scaling_info
["kdu-delete"][kdu_name
] = []
6636 kdur
= get_kdur(db_vnfr
, kdu_name
)
6637 if kdur
.get("helm-chart"):
6638 k8s_cluster_type
= "helm-chart-v3"
6639 self
.logger
.debug("kdur: {}".format(kdur
))
6641 kdur
.get("helm-version")
6642 and kdur
.get("helm-version") == "v2"
6644 k8s_cluster_type
= "helm-chart"
6645 elif kdur
.get("juju-bundle"):
6646 k8s_cluster_type
= "juju-bundle"
6649 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6650 "juju-bundle. Maybe an old NBI version is running".format(
6651 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6655 min_instance_count
= 0
6656 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6657 min_instance_count
= kdu_profile
["min-number-of-instances"]
6659 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6660 deployed_kdu
, _
= get_deployed_kdu(
6661 nsr_deployed
, kdu_name
, vnf_index
6663 if deployed_kdu
is None:
6665 "KDU '{}' for vnf '{}' not deployed".format(
6669 kdu_instance
= deployed_kdu
.get("kdu-instance")
6670 instance_num
= await self
.k8scluster_map
[
6676 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6677 kdu_model
=deployed_kdu
.get("kdu-model"),
6679 kdu_replica_count
= instance_num
- kdu_delta
.get(
6680 "number-of-instances", 1
6683 if kdu_replica_count
< min_instance_count
< instance_num
:
6684 kdu_replica_count
= min_instance_count
6685 if kdu_replica_count
< min_instance_count
:
6687 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6688 "scaling-group-descriptor '{}'".format(
6689 instance_num
, scaling_group
6693 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6694 vca_scaling_info
.append(
6696 "osm_kdu_id": kdu_name
,
6697 "member-vnf-index": vnf_index
,
6699 "kdu_index": instance_num
- x
- 1,
6702 scaling_info
["kdu-delete"][kdu_name
].append(
6704 "member-vnf-index": vnf_index
,
6706 "k8s-cluster-type": k8s_cluster_type
,
6707 "resource-name": resource_name
,
6708 "scale": kdu_replica_count
,
6712 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6713 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6714 if scaling_info
["scaling_direction"] == "IN":
6715 for vdur
in reversed(db_vnfr
["vdur"]):
6716 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6717 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6718 scaling_info
["vdu"].append(
6720 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6721 "vdu_id": vdur
["vdu-id-ref"],
6725 for interface
in vdur
["interfaces"]:
6726 scaling_info
["vdu"][-1]["interface"].append(
6728 "name": interface
["name"],
6729 "ip_address": interface
["ip-address"],
6730 "mac_address": interface
.get("mac-address"),
6733 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6736 step
= "Executing pre-scale vnf-config-primitive"
6737 if scaling_descriptor
.get("scaling-config-action"):
6738 for scaling_config_action
in scaling_descriptor
[
6739 "scaling-config-action"
6742 scaling_config_action
.get("trigger") == "pre-scale-in"
6743 and scaling_type
== "SCALE_IN"
6745 scaling_config_action
.get("trigger") == "pre-scale-out"
6746 and scaling_type
== "SCALE_OUT"
6748 vnf_config_primitive
= scaling_config_action
[
6749 "vnf-config-primitive-name-ref"
6751 step
= db_nslcmop_update
[
6753 ] = "executing pre-scale scaling-config-action '{}'".format(
6754 vnf_config_primitive
6757 # look for primitive
6758 for config_primitive
in (
6759 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6760 ).get("config-primitive", ()):
6761 if config_primitive
["name"] == vnf_config_primitive
:
6765 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6766 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6767 "primitive".format(scaling_group
, vnf_config_primitive
)
6770 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6771 if db_vnfr
.get("additionalParamsForVnf"):
6772 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6774 scale_process
= "VCA"
6775 db_nsr_update
["config-status"] = "configuring pre-scaling"
6776 primitive_params
= self
._map
_primitive
_params
(
6777 config_primitive
, {}, vnfr_params
6780 # Pre-scale retry check: Check if this sub-operation has been executed before
6781 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6784 vnf_config_primitive
,
6788 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6789 # Skip sub-operation
6790 result
= "COMPLETED"
6791 result_detail
= "Done"
6794 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6795 vnf_config_primitive
, result
, result_detail
6799 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6800 # New sub-operation: Get index of this sub-operation
6802 len(db_nslcmop
.get("_admin", {}).get("operations"))
6807 + "vnf_config_primitive={} New sub-operation".format(
6808 vnf_config_primitive
6812 # retry: Get registered params for this existing sub-operation
6813 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6816 vnf_index
= op
.get("member_vnf_index")
6817 vnf_config_primitive
= op
.get("primitive")
6818 primitive_params
= op
.get("primitive_params")
6821 + "vnf_config_primitive={} Sub-operation retry".format(
6822 vnf_config_primitive
6825 # Execute the primitive, either with new (first-time) or registered (reintent) args
6826 ee_descriptor_id
= config_primitive
.get(
6827 "execution-environment-ref"
6829 primitive_name
= config_primitive
.get(
6830 "execution-environment-primitive", vnf_config_primitive
6832 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6833 nsr_deployed
["VCA"],
6834 member_vnf_index
=vnf_index
,
6836 vdu_count_index
=None,
6837 ee_descriptor_id
=ee_descriptor_id
,
6839 result
, result_detail
= await self
._ns
_execute
_primitive
(
6848 + "vnf_config_primitive={} Done with result {} {}".format(
6849 vnf_config_primitive
, result
, result_detail
6852 # Update operationState = COMPLETED | FAILED
6853 self
._update
_suboperation
_status
(
6854 db_nslcmop
, op_index
, result
, result_detail
6857 if result
== "FAILED":
6858 raise LcmException(result_detail
)
6859 db_nsr_update
["config-status"] = old_config_status
6860 scale_process
= None
6864 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6867 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6870 # SCALE-IN VCA - BEGIN
6871 if vca_scaling_info
:
6872 step
= db_nslcmop_update
[
6874 ] = "Deleting the execution environments"
6875 scale_process
= "VCA"
6876 for vca_info
in vca_scaling_info
:
6877 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6878 member_vnf_index
= str(vca_info
["member-vnf-index"])
6880 logging_text
+ "vdu info: {}".format(vca_info
)
6882 if vca_info
.get("osm_vdu_id"):
6883 vdu_id
= vca_info
["osm_vdu_id"]
6884 vdu_index
= int(vca_info
["vdu_index"])
6887 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6888 member_vnf_index
, vdu_id
, vdu_index
6890 stage
[2] = step
= "Scaling in VCA"
6891 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6892 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6893 config_update
= db_nsr
["configurationStatus"]
6894 for vca_index
, vca
in enumerate(vca_update
):
6896 (vca
or vca
.get("ee_id"))
6897 and vca
["member-vnf-index"] == member_vnf_index
6898 and vca
["vdu_count_index"] == vdu_index
6900 if vca
.get("vdu_id"):
6901 config_descriptor
= get_configuration(
6902 db_vnfd
, vca
.get("vdu_id")
6904 elif vca
.get("kdu_name"):
6905 config_descriptor
= get_configuration(
6906 db_vnfd
, vca
.get("kdu_name")
6909 config_descriptor
= get_configuration(
6910 db_vnfd
, db_vnfd
["id"]
6912 operation_params
= (
6913 db_nslcmop
.get("operationParams") or {}
6915 exec_terminate_primitives
= not operation_params
.get(
6916 "skip_terminate_primitives"
6917 ) and vca
.get("needed_terminate")
6918 task
= asyncio
.ensure_future(
6927 exec_primitives
=exec_terminate_primitives
,
6931 timeout
=self
.timeout
.charm_delete
,
6934 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6937 del vca_update
[vca_index
]
6938 del config_update
[vca_index
]
6939 # wait for pending tasks of terminate primitives
6943 + "Waiting for tasks {}".format(
6944 list(tasks_dict_info
.keys())
6947 error_list
= await self
._wait
_for
_tasks
(
6951 self
.timeout
.charm_delete
, self
.timeout
.ns_terminate
6956 tasks_dict_info
.clear()
6958 raise LcmException("; ".join(error_list
))
6960 db_vca_and_config_update
= {
6961 "_admin.deployed.VCA": vca_update
,
6962 "configurationStatus": config_update
,
6965 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6967 scale_process
= None
6968 # SCALE-IN VCA - END
6971 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6972 scale_process
= "RO"
6973 if self
.ro_config
.ng
:
6974 await self
._scale
_ng
_ro
(
6975 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6977 scaling_info
.pop("vdu-create", None)
6978 scaling_info
.pop("vdu-delete", None)
6980 scale_process
= None
6984 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6985 scale_process
= "KDU"
6986 await self
._scale
_kdu
(
6987 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6989 scaling_info
.pop("kdu-create", None)
6990 scaling_info
.pop("kdu-delete", None)
6992 scale_process
= None
6996 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6998 # SCALE-UP VCA - BEGIN
6999 if vca_scaling_info
:
7000 step
= db_nslcmop_update
[
7002 ] = "Creating new execution environments"
7003 scale_process
= "VCA"
7004 for vca_info
in vca_scaling_info
:
7005 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
7006 member_vnf_index
= str(vca_info
["member-vnf-index"])
7008 logging_text
+ "vdu info: {}".format(vca_info
)
7010 vnfd_id
= db_vnfr
["vnfd-ref"]
7011 if vca_info
.get("osm_vdu_id"):
7012 vdu_index
= int(vca_info
["vdu_index"])
7013 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
7014 if db_vnfr
.get("additionalParamsForVnf"):
7015 deploy_params
.update(
7017 db_vnfr
["additionalParamsForVnf"].copy()
7020 descriptor_config
= get_configuration(
7021 db_vnfd
, db_vnfd
["id"]
7023 if descriptor_config
:
7028 logging_text
=logging_text
7029 + "member_vnf_index={} ".format(member_vnf_index
),
7032 nslcmop_id
=nslcmop_id
,
7038 member_vnf_index
=member_vnf_index
,
7039 vdu_index
=vdu_index
,
7041 deploy_params
=deploy_params
,
7042 descriptor_config
=descriptor_config
,
7043 base_folder
=base_folder
,
7044 task_instantiation_info
=tasks_dict_info
,
7047 vdu_id
= vca_info
["osm_vdu_id"]
7048 vdur
= find_in_list(
7049 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
7051 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
7052 if vdur
.get("additionalParams"):
7053 deploy_params_vdu
= parse_yaml_strings(
7054 vdur
["additionalParams"]
7057 deploy_params_vdu
= deploy_params
7058 deploy_params_vdu
["OSM"] = get_osm_params(
7059 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
7061 if descriptor_config
:
7066 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
7067 member_vnf_index
, vdu_id
, vdu_index
7069 stage
[2] = step
= "Scaling out VCA"
7070 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
7072 logging_text
=logging_text
7073 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
7074 member_vnf_index
, vdu_id
, vdu_index
7078 nslcmop_id
=nslcmop_id
,
7084 member_vnf_index
=member_vnf_index
,
7085 vdu_index
=vdu_index
,
7087 deploy_params
=deploy_params_vdu
,
7088 descriptor_config
=descriptor_config
,
7089 base_folder
=base_folder
,
7090 task_instantiation_info
=tasks_dict_info
,
7093 # SCALE-UP VCA - END
7094 scale_process
= None
7097 # execute primitive service POST-SCALING
7098 step
= "Executing post-scale vnf-config-primitive"
7099 if scaling_descriptor
.get("scaling-config-action"):
7100 for scaling_config_action
in scaling_descriptor
[
7101 "scaling-config-action"
7104 scaling_config_action
.get("trigger") == "post-scale-in"
7105 and scaling_type
== "SCALE_IN"
7107 scaling_config_action
.get("trigger") == "post-scale-out"
7108 and scaling_type
== "SCALE_OUT"
7110 vnf_config_primitive
= scaling_config_action
[
7111 "vnf-config-primitive-name-ref"
7113 step
= db_nslcmop_update
[
7115 ] = "executing post-scale scaling-config-action '{}'".format(
7116 vnf_config_primitive
7119 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
7120 if db_vnfr
.get("additionalParamsForVnf"):
7121 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
7123 # look for primitive
7124 for config_primitive
in (
7125 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
7126 ).get("config-primitive", ()):
7127 if config_primitive
["name"] == vnf_config_primitive
:
7131 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
7132 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
7133 "config-primitive".format(
7134 scaling_group
, vnf_config_primitive
7137 scale_process
= "VCA"
7138 db_nsr_update
["config-status"] = "configuring post-scaling"
7139 primitive_params
= self
._map
_primitive
_params
(
7140 config_primitive
, {}, vnfr_params
7143 # Post-scale retry check: Check if this sub-operation has been executed before
7144 op_index
= self
._check
_or
_add
_scale
_suboperation
(
7147 vnf_config_primitive
,
7151 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
7152 # Skip sub-operation
7153 result
= "COMPLETED"
7154 result_detail
= "Done"
7157 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
7158 vnf_config_primitive
, result
, result_detail
7162 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
7163 # New sub-operation: Get index of this sub-operation
7165 len(db_nslcmop
.get("_admin", {}).get("operations"))
7170 + "vnf_config_primitive={} New sub-operation".format(
7171 vnf_config_primitive
7175 # retry: Get registered params for this existing sub-operation
7176 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
7179 vnf_index
= op
.get("member_vnf_index")
7180 vnf_config_primitive
= op
.get("primitive")
7181 primitive_params
= op
.get("primitive_params")
7184 + "vnf_config_primitive={} Sub-operation retry".format(
7185 vnf_config_primitive
7188 # Execute the primitive, either with new (first-time) or registered (reintent) args
7189 ee_descriptor_id
= config_primitive
.get(
7190 "execution-environment-ref"
7192 primitive_name
= config_primitive
.get(
7193 "execution-environment-primitive", vnf_config_primitive
7195 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
7196 nsr_deployed
["VCA"],
7197 member_vnf_index
=vnf_index
,
7199 vdu_count_index
=None,
7200 ee_descriptor_id
=ee_descriptor_id
,
7202 result
, result_detail
= await self
._ns
_execute
_primitive
(
7211 + "vnf_config_primitive={} Done with result {} {}".format(
7212 vnf_config_primitive
, result
, result_detail
7215 # Update operationState = COMPLETED | FAILED
7216 self
._update
_suboperation
_status
(
7217 db_nslcmop
, op_index
, result
, result_detail
7220 if result
== "FAILED":
7221 raise LcmException(result_detail
)
7222 db_nsr_update
["config-status"] = old_config_status
7223 scale_process
= None
7228 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
7229 db_nsr_update
["operational-status"] = (
7231 if old_operational_status
== "failed"
7232 else old_operational_status
7234 db_nsr_update
["config-status"] = old_config_status
7237 ROclient
.ROClientException
,
7242 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7244 except asyncio
.CancelledError
:
7246 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7248 exc
= "Operation was cancelled"
7249 except Exception as e
:
7250 exc
= traceback
.format_exc()
7251 self
.logger
.critical(
7252 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7256 self
._write
_ns
_status
(
7259 current_operation
="IDLE",
7260 current_operation_id
=None,
7263 stage
[1] = "Waiting for instantiate pending tasks."
7264 self
.logger
.debug(logging_text
+ stage
[1])
7265 exc
= await self
._wait
_for
_tasks
(
7268 self
.timeout
.ns_deploy
,
7276 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7277 nslcmop_operation_state
= "FAILED"
7279 db_nsr_update
["operational-status"] = old_operational_status
7280 db_nsr_update
["config-status"] = old_config_status
7281 db_nsr_update
["detailed-status"] = ""
7283 if "VCA" in scale_process
:
7284 db_nsr_update
["config-status"] = "failed"
7285 if "RO" in scale_process
:
7286 db_nsr_update
["operational-status"] = "failed"
7289 ] = "FAILED scaling nslcmop={} {}: {}".format(
7290 nslcmop_id
, step
, exc
7293 error_description_nslcmop
= None
7294 nslcmop_operation_state
= "COMPLETED"
7295 db_nslcmop_update
["detailed-status"] = "Done"
7297 self
._write
_op
_status
(
7300 error_message
=error_description_nslcmop
,
7301 operation_state
=nslcmop_operation_state
,
7302 other_update
=db_nslcmop_update
,
7305 self
._write
_ns
_status
(
7308 current_operation
="IDLE",
7309 current_operation_id
=None,
7310 other_update
=db_nsr_update
,
7313 if nslcmop_operation_state
:
7317 "nslcmop_id": nslcmop_id
,
7318 "operationState": nslcmop_operation_state
,
7320 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7321 except Exception as e
:
7323 logging_text
+ "kafka_write notification Exception {}".format(e
)
7325 self
.logger
.debug(logging_text
+ "Exit")
7326 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7328 async def _scale_kdu(
7329 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7331 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7332 for kdu_name
in _scaling_info
:
7333 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7334 deployed_kdu
, index
= get_deployed_kdu(
7335 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7337 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7338 kdu_instance
= deployed_kdu
["kdu-instance"]
7339 kdu_model
= deployed_kdu
.get("kdu-model")
7340 scale
= int(kdu_scaling_info
["scale"])
7341 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7344 "collection": "nsrs",
7345 "filter": {"_id": nsr_id
},
7346 "path": "_admin.deployed.K8s.{}".format(index
),
7349 step
= "scaling application {}".format(
7350 kdu_scaling_info
["resource-name"]
7352 self
.logger
.debug(logging_text
+ step
)
7354 if kdu_scaling_info
["type"] == "delete":
7355 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7358 and kdu_config
.get("terminate-config-primitive")
7359 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7361 terminate_config_primitive_list
= kdu_config
.get(
7362 "terminate-config-primitive"
7364 terminate_config_primitive_list
.sort(
7365 key
=lambda val
: int(val
["seq"])
7369 terminate_config_primitive
7370 ) in terminate_config_primitive_list
:
7371 primitive_params_
= self
._map
_primitive
_params
(
7372 terminate_config_primitive
, {}, {}
7374 step
= "execute terminate config primitive"
7375 self
.logger
.debug(logging_text
+ step
)
7376 await asyncio
.wait_for(
7377 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7378 cluster_uuid
=cluster_uuid
,
7379 kdu_instance
=kdu_instance
,
7380 primitive_name
=terminate_config_primitive
["name"],
7381 params
=primitive_params_
,
7383 total_timeout
=self
.timeout
.primitive
,
7386 timeout
=self
.timeout
.primitive
7387 * self
.timeout
.primitive_outer_factor
,
7390 await asyncio
.wait_for(
7391 self
.k8scluster_map
[k8s_cluster_type
].scale(
7392 kdu_instance
=kdu_instance
,
7394 resource_name
=kdu_scaling_info
["resource-name"],
7395 total_timeout
=self
.timeout
.scale_on_error
,
7397 cluster_uuid
=cluster_uuid
,
7398 kdu_model
=kdu_model
,
7402 timeout
=self
.timeout
.scale_on_error
7403 * self
.timeout
.scale_on_error_outer_factor
,
7406 if kdu_scaling_info
["type"] == "create":
7407 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7410 and kdu_config
.get("initial-config-primitive")
7411 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7413 initial_config_primitive_list
= kdu_config
.get(
7414 "initial-config-primitive"
7416 initial_config_primitive_list
.sort(
7417 key
=lambda val
: int(val
["seq"])
7420 for initial_config_primitive
in initial_config_primitive_list
:
7421 primitive_params_
= self
._map
_primitive
_params
(
7422 initial_config_primitive
, {}, {}
7424 step
= "execute initial config primitive"
7425 self
.logger
.debug(logging_text
+ step
)
7426 await asyncio
.wait_for(
7427 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7428 cluster_uuid
=cluster_uuid
,
7429 kdu_instance
=kdu_instance
,
7430 primitive_name
=initial_config_primitive
["name"],
7431 params
=primitive_params_
,
7438 async def _scale_ng_ro(
7439 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7441 nsr_id
= db_nslcmop
["nsInstanceId"]
7442 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7445 # read from db: vnfd's for every vnf
7448 # for each vnf in ns, read vnfd
7449 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7450 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7451 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7452 # if we haven't this vnfd, read it from db
7453 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7455 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7456 db_vnfds
.append(vnfd
)
7457 n2vc_key
= self
.n2vc
.get_public_key()
7458 n2vc_key_list
= [n2vc_key
]
7461 vdu_scaling_info
.get("vdu-create"),
7462 vdu_scaling_info
.get("vdu-delete"),
7465 # db_vnfr has been updated, update db_vnfrs to use it
7466 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7467 await self
._instantiate
_ng
_ro
(
7477 start_deploy
=time(),
7478 timeout_ns_deploy
=self
.timeout
.ns_deploy
,
7480 if vdu_scaling_info
.get("vdu-delete"):
7482 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7485 async def extract_prometheus_scrape_jobs(
7486 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
7488 # look if exist a file called 'prometheus*.j2' and
7489 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7493 for f
in artifact_content
7494 if f
.startswith("prometheus") and f
.endswith(".j2")
7500 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7504 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7505 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7507 vnfr_id
= vnfr_id
.replace("-", "")
7509 "JOB_NAME": vnfr_id
,
7510 "TARGET_IP": target_ip
,
7511 "EXPORTER_POD_IP": host_name
,
7512 "EXPORTER_POD_PORT": host_port
,
7514 job_list
= parse_job(job_data
, variables
)
7515 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7516 for job
in job_list
:
7518 not isinstance(job
.get("job_name"), str)
7519 or vnfr_id
not in job
["job_name"]
7521 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
7522 job
["nsr_id"] = nsr_id
7523 job
["vnfr_id"] = vnfr_id
7526 async def rebuild_start_stop(
7527 self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
7529 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7530 self
.logger
.info(logging_text
+ "Enter")
7531 stage
= ["Preparing the environment", ""]
7532 # database nsrs record
7536 # in case of error, indicates what part of scale was failed to put nsr at error status
7537 start_deploy
= time()
7539 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7540 vim_account_id
= db_vnfr
.get("vim-account-id")
7541 vim_info_key
= "vim:" + vim_account_id
7542 vdu_id
= additional_param
["vdu_id"]
7543 vdurs
= [item
for item
in db_vnfr
["vdur"] if item
["vdu-id-ref"] == vdu_id
]
7544 vdur
= find_in_list(
7545 vdurs
, lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7548 vdu_vim_name
= vdur
["name"]
7549 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7550 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7552 raise LcmException("Target vdu is not found")
7553 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7554 # wait for any previous tasks in process
7555 stage
[1] = "Waiting for previous operations to terminate"
7556 self
.logger
.info(stage
[1])
7557 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7559 stage
[1] = "Reading from database."
7560 self
.logger
.info(stage
[1])
7561 self
._write
_ns
_status
(
7564 current_operation
=operation_type
.upper(),
7565 current_operation_id
=nslcmop_id
,
7567 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7570 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7571 db_nsr_update
["operational-status"] = operation_type
7572 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7576 "vim_vm_id": vim_vm_id
,
7578 "vdu_index": additional_param
["count-index"],
7579 "vdu_id": vdur
["id"],
7580 "target_vim": target_vim
,
7581 "vim_account_id": vim_account_id
,
7584 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7585 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7586 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7587 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7588 self
.logger
.info("response from RO: {}".format(result_dict
))
7589 action_id
= result_dict
["action_id"]
7590 await self
._wait
_ng
_ro
(
7595 self
.timeout
.operate
,
7597 "start_stop_rebuild",
7599 return "COMPLETED", "Done"
7600 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7601 self
.logger
.error("Exit Exception {}".format(e
))
7603 except asyncio
.CancelledError
:
7604 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7605 exc
= "Operation was cancelled"
7606 except Exception as e
:
7607 exc
= traceback
.format_exc()
7608 self
.logger
.critical(
7609 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7611 return "FAILED", "Error in operate VNF {}".format(exc
)
7613 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7615 Get VCA Cloud and VCA Cloud Credentials for the VIM account
7617 :param: vim_account_id: VIM Account ID
7619 :return: (cloud_name, cloud_credential)
7621 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7622 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
7624 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7626 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
7628 :param: vim_account_id: VIM Account ID
7630 :return: (cloud_name, cloud_credential)
7632 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7633 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")
7635 async def migrate(self
, nsr_id
, nslcmop_id
):
7637 Migrate VNFs and VDUs instances in a NS
7639 :param: nsr_id: NS Instance ID
7640 :param: nslcmop_id: nslcmop ID of migrate
7643 # Try to lock HA task here
7644 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7645 if not task_is_locked_by_me
:
7647 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7648 self
.logger
.debug(logging_text
+ "Enter")
7649 # get all needed from database
7651 db_nslcmop_update
= {}
7652 nslcmop_operation_state
= None
7656 # in case of error, indicates what part of scale was failed to put nsr at error status
7657 start_deploy
= time()
7660 # wait for any previous tasks in process
7661 step
= "Waiting for previous operations to terminate"
7662 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7664 self
._write
_ns
_status
(
7667 current_operation
="MIGRATING",
7668 current_operation_id
=nslcmop_id
,
7670 step
= "Getting nslcmop from database"
7672 step
+ " after having waited for previous tasks to be completed"
7674 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7675 migrate_params
= db_nslcmop
.get("operationParams")
7678 target
.update(migrate_params
)
7679 desc
= await self
.RO
.migrate(nsr_id
, target
)
7680 self
.logger
.debug("RO return > {}".format(desc
))
7681 action_id
= desc
["action_id"]
7682 await self
._wait
_ng
_ro
(
7687 self
.timeout
.migrate
,
7688 operation
="migrate",
7690 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7691 self
.logger
.error("Exit Exception {}".format(e
))
7693 except asyncio
.CancelledError
:
7694 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7695 exc
= "Operation was cancelled"
7696 except Exception as e
:
7697 exc
= traceback
.format_exc()
7698 self
.logger
.critical(
7699 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7702 self
._write
_ns
_status
(
7705 current_operation
="IDLE",
7706 current_operation_id
=None,
7709 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7710 nslcmop_operation_state
= "FAILED"
7712 nslcmop_operation_state
= "COMPLETED"
7713 db_nslcmop_update
["detailed-status"] = "Done"
7714 db_nsr_update
["detailed-status"] = "Done"
7716 self
._write
_op
_status
(
7720 operation_state
=nslcmop_operation_state
,
7721 other_update
=db_nslcmop_update
,
7723 if nslcmop_operation_state
:
7727 "nslcmop_id": nslcmop_id
,
7728 "operationState": nslcmop_operation_state
,
7730 await self
.msg
.aiowrite("ns", "migrated", msg
, loop
=self
.loop
)
7731 except Exception as e
:
7733 logging_text
+ "kafka_write notification Exception {}".format(e
)
7735 self
.logger
.debug(logging_text
+ "Exit")
7736 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
7738 async def heal(self
, nsr_id
, nslcmop_id
):
7742 :param nsr_id: ns instance to heal
7743 :param nslcmop_id: operation to run
7747 # Try to lock HA task here
7748 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7749 if not task_is_locked_by_me
:
7752 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7753 stage
= ["", "", ""]
7754 tasks_dict_info
= {}
7755 # ^ stage, step, VIM progress
7756 self
.logger
.debug(logging_text
+ "Enter")
7757 # get all needed from database
7759 db_nslcmop_update
= {}
7761 db_vnfrs
= {} # vnf's info indexed by _id
7763 old_operational_status
= ""
7764 old_config_status
= ""
7767 # wait for any previous tasks in process
7768 step
= "Waiting for previous operations to terminate"
7769 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7770 self
._write
_ns
_status
(
7773 current_operation
="HEALING",
7774 current_operation_id
=nslcmop_id
,
7777 step
= "Getting nslcmop from database"
7779 step
+ " after having waited for previous tasks to be completed"
7781 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7783 step
= "Getting nsr from database"
7784 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7785 old_operational_status
= db_nsr
["operational-status"]
7786 old_config_status
= db_nsr
["config-status"]
7789 "_admin.deployed.RO.operational-status": "healing",
7791 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7793 step
= "Sending heal order to VIM"
7795 logging_text
=logging_text
,
7797 db_nslcmop
=db_nslcmop
,
7802 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7803 self
.logger
.debug(logging_text
+ stage
[1])
7804 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7805 self
.fs
.sync(db_nsr
["nsd-id"])
7807 # read from db: vnfr's of this ns
7808 step
= "Getting vnfrs from db"
7809 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7810 for vnfr
in db_vnfrs_list
:
7811 db_vnfrs
[vnfr
["_id"]] = vnfr
7812 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7814 # Check for each target VNF
7815 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7816 for target_vnf
in target_list
:
7817 # Find this VNF in the list from DB
7818 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7820 db_vnfr
= db_vnfrs
[vnfr_id
]
7821 vnfd_id
= db_vnfr
.get("vnfd-id")
7822 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7823 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7824 base_folder
= vnfd
["_admin"]["storage"]
7829 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7830 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7832 # Check each target VDU and deploy N2VC
7833 target_vdu_list
= target_vnf
.get("additionalParams", {}).get(
7836 if not target_vdu_list
:
7837 # Codigo nuevo para crear diccionario
7838 target_vdu_list
= []
7839 for existing_vdu
in db_vnfr
.get("vdur"):
7840 vdu_name
= existing_vdu
.get("vdu-name", None)
7841 vdu_index
= existing_vdu
.get("count-index", 0)
7842 vdu_run_day1
= target_vnf
.get("additionalParams", {}).get(
7845 vdu_to_be_healed
= {
7847 "count-index": vdu_index
,
7848 "run-day1": vdu_run_day1
,
7850 target_vdu_list
.append(vdu_to_be_healed
)
7851 for target_vdu
in target_vdu_list
:
7852 deploy_params_vdu
= target_vdu
7853 # Set run-day1 vnf level value if not vdu level value exists
7854 if not deploy_params_vdu
.get("run-day1") and target_vnf
[
7857 deploy_params_vdu
["run-day1"] = target_vnf
[
7860 vdu_name
= target_vdu
.get("vdu-id", None)
7861 # TODO: Get vdu_id from vdud.
7863 # For multi instance VDU count-index is mandatory
7864 # For single session VDU count-indes is 0
7865 vdu_index
= target_vdu
.get("count-index", 0)
7867 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7868 stage
[1] = "Deploying Execution Environments."
7869 self
.logger
.debug(logging_text
+ stage
[1])
7871 # VNF Level charm. Normal case when proxy charms.
7872 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7873 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7874 if descriptor_config
:
7875 # Continue if healed machine is management machine
7876 vnf_ip_address
= db_vnfr
.get("ip-address")
7877 target_instance
= None
7878 for instance
in db_vnfr
.get("vdur", None):
7880 instance
["vdu-name"] == vdu_name
7881 and instance
["count-index"] == vdu_index
7883 target_instance
= instance
7885 if vnf_ip_address
== target_instance
.get("ip-address"):
7887 logging_text
=logging_text
7888 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7889 member_vnf_index
, vdu_name
, vdu_index
7893 nslcmop_id
=nslcmop_id
,
7899 member_vnf_index
=member_vnf_index
,
7902 deploy_params
=deploy_params_vdu
,
7903 descriptor_config
=descriptor_config
,
7904 base_folder
=base_folder
,
7905 task_instantiation_info
=tasks_dict_info
,
7909 # VDU Level charm. Normal case with native charms.
7910 descriptor_config
= get_configuration(vnfd
, vdu_name
)
7911 if descriptor_config
:
7913 logging_text
=logging_text
7914 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7915 member_vnf_index
, vdu_name
, vdu_index
7919 nslcmop_id
=nslcmop_id
,
7925 member_vnf_index
=member_vnf_index
,
7926 vdu_index
=vdu_index
,
7928 deploy_params
=deploy_params_vdu
,
7929 descriptor_config
=descriptor_config
,
7930 base_folder
=base_folder
,
7931 task_instantiation_info
=tasks_dict_info
,
7936 ROclient
.ROClientException
,
7941 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7943 except asyncio
.CancelledError
:
7945 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7947 exc
= "Operation was cancelled"
7948 except Exception as e
:
7949 exc
= traceback
.format_exc()
7950 self
.logger
.critical(
7951 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7956 stage
[1] = "Waiting for healing pending tasks."
7957 self
.logger
.debug(logging_text
+ stage
[1])
7958 exc
= await self
._wait
_for
_tasks
(
7961 self
.timeout
.ns_deploy
,
7969 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7970 nslcmop_operation_state
= "FAILED"
7972 db_nsr_update
["operational-status"] = old_operational_status
7973 db_nsr_update
["config-status"] = old_config_status
7976 ] = "FAILED healing nslcmop={} {}: {}".format(nslcmop_id
, step
, exc
)
7977 for task
, task_name
in tasks_dict_info
.items():
7978 if not task
.done() or task
.cancelled() or task
.exception():
7979 if task_name
.startswith(self
.task_name_deploy_vca
):
7980 # A N2VC task is pending
7981 db_nsr_update
["config-status"] = "failed"
7983 # RO task is pending
7984 db_nsr_update
["operational-status"] = "failed"
7986 error_description_nslcmop
= None
7987 nslcmop_operation_state
= "COMPLETED"
7988 db_nslcmop_update
["detailed-status"] = "Done"
7989 db_nsr_update
["detailed-status"] = "Done"
7990 db_nsr_update
["operational-status"] = "running"
7991 db_nsr_update
["config-status"] = "configured"
7993 self
._write
_op
_status
(
7996 error_message
=error_description_nslcmop
,
7997 operation_state
=nslcmop_operation_state
,
7998 other_update
=db_nslcmop_update
,
8001 self
._write
_ns
_status
(
8004 current_operation
="IDLE",
8005 current_operation_id
=None,
8006 other_update
=db_nsr_update
,
8009 if nslcmop_operation_state
:
8013 "nslcmop_id": nslcmop_id
,
8014 "operationState": nslcmop_operation_state
,
8016 await self
.msg
.aiowrite("ns", "healed", msg
, loop
=self
.loop
)
8017 except Exception as e
:
8019 logging_text
+ "kafka_write notification Exception {}".format(e
)
8021 self
.logger
.debug(logging_text
+ "Exit")
8022 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
8033 :param logging_text: preffix text to use at logging
8034 :param nsr_id: nsr identity
8035 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
8036 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
8037 :return: None or exception
8040 def get_vim_account(vim_account_id
):
8042 if vim_account_id
in db_vims
:
8043 return db_vims
[vim_account_id
]
8044 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
8045 db_vims
[vim_account_id
] = db_vim
8050 ns_params
= db_nslcmop
.get("operationParams")
8051 if ns_params
and ns_params
.get("timeout_ns_heal"):
8052 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
8054 timeout_ns_heal
= self
.timeout
.ns_heal
8058 nslcmop_id
= db_nslcmop
["_id"]
8060 "action_id": nslcmop_id
,
8062 self
.logger
.warning(
8063 "db_nslcmop={} and timeout_ns_heal={}".format(
8064 db_nslcmop
, timeout_ns_heal
8067 target
.update(db_nslcmop
.get("operationParams", {}))
8069 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
8070 desc
= await self
.RO
.recreate(nsr_id
, target
)
8071 self
.logger
.debug("RO return > {}".format(desc
))
8072 action_id
= desc
["action_id"]
8073 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
8074 await self
._wait
_ng
_ro
(
8081 operation
="healing",
8086 "_admin.deployed.RO.operational-status": "running",
8087 "detailed-status": " ".join(stage
),
8089 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
8090 self
._write
_op
_status
(nslcmop_id
, stage
)
8092 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
8095 except Exception as e
:
8096 stage
[2] = "ERROR healing at VIM"
8097 # self.set_vnfr_at_error(db_vnfrs, str(e))
8099 "Error healing at VIM {}".format(e
),
8100 exc_info
=not isinstance(
8103 ROclient
.ROClientException
,
8129 task_instantiation_info
,
8132 # launch instantiate_N2VC in a asyncio task and register task object
8133 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
8134 # if not found, create one entry and update database
8135 # fill db_nsr._admin.deployed.VCA.<index>
8138 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
8142 get_charm_name
= False
8143 if "execution-environment-list" in descriptor_config
:
8144 ee_list
= descriptor_config
.get("execution-environment-list", [])
8145 elif "juju" in descriptor_config
:
8146 ee_list
= [descriptor_config
] # ns charms
8147 if "execution-environment-list" not in descriptor_config
:
8148 # charm name is only required for ns charms
8149 get_charm_name
= True
8150 else: # other types as script are not supported
8153 for ee_item
in ee_list
:
8156 + "_deploy_n2vc ee_item juju={}, helm={}".format(
8157 ee_item
.get("juju"), ee_item
.get("helm-chart")
8160 ee_descriptor_id
= ee_item
.get("id")
8161 if ee_item
.get("juju"):
8162 vca_name
= ee_item
["juju"].get("charm")
8164 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
8167 if ee_item
["juju"].get("charm") is not None
8170 if ee_item
["juju"].get("cloud") == "k8s":
8171 vca_type
= "k8s_proxy_charm"
8172 elif ee_item
["juju"].get("proxy") is False:
8173 vca_type
= "native_charm"
8174 elif ee_item
.get("helm-chart"):
8175 vca_name
= ee_item
["helm-chart"]
8176 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
8179 vca_type
= "helm-v3"
8182 logging_text
+ "skipping non juju neither charm configuration"
8187 for vca_index
, vca_deployed
in enumerate(
8188 db_nsr
["_admin"]["deployed"]["VCA"]
8190 if not vca_deployed
:
8193 vca_deployed
.get("member-vnf-index") == member_vnf_index
8194 and vca_deployed
.get("vdu_id") == vdu_id
8195 and vca_deployed
.get("kdu_name") == kdu_name
8196 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
8197 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
8201 # not found, create one.
8203 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
8206 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
8208 target
+= "/kdu/{}".format(kdu_name
)
8210 "target_element": target
,
8211 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
8212 "member-vnf-index": member_vnf_index
,
8214 "kdu_name": kdu_name
,
8215 "vdu_count_index": vdu_index
,
8216 "operational-status": "init", # TODO revise
8217 "detailed-status": "", # TODO revise
8218 "step": "initial-deploy", # TODO revise
8220 "vdu_name": vdu_name
,
8222 "ee_descriptor_id": ee_descriptor_id
,
8223 "charm_name": charm_name
,
8227 # create VCA and configurationStatus in db
8229 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
8230 "configurationStatus.{}".format(vca_index
): dict(),
8232 self
.update_db_2("nsrs", nsr_id
, db_dict
)
8234 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
8236 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
8237 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
8238 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
8241 task_n2vc
= asyncio
.ensure_future(
8243 logging_text
=logging_text
,
8244 vca_index
=vca_index
,
8250 vdu_index
=vdu_index
,
8251 deploy_params
=deploy_params
,
8252 config_descriptor
=descriptor_config
,
8253 base_folder
=base_folder
,
8254 nslcmop_id
=nslcmop_id
,
8258 ee_config_descriptor
=ee_item
,
8261 self
.lcm_tasks
.register(
8265 "instantiate_N2VC-{}".format(vca_index
),
8268 task_instantiation_info
[
8270 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
8271 member_vnf_index
or "", vdu_id
or ""
8274 async def heal_N2VC(
8291 ee_config_descriptor
,
8293 nsr_id
= db_nsr
["_id"]
8294 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
8295 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
8296 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
8297 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
8299 "collection": "nsrs",
8300 "filter": {"_id": nsr_id
},
8301 "path": db_update_entry
,
8307 element_under_configuration
= nsr_id
8311 vnfr_id
= db_vnfr
["_id"]
8312 osm_config
["osm"]["vnf_id"] = vnfr_id
8314 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8316 if vca_type
== "native_charm":
8319 index_number
= vdu_index
or 0
8322 element_type
= "VNF"
8323 element_under_configuration
= vnfr_id
8324 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8326 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8327 element_type
= "VDU"
8328 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8329 osm_config
["osm"]["vdu_id"] = vdu_id
8331 namespace
+= ".{}".format(kdu_name
)
8332 element_type
= "KDU"
8333 element_under_configuration
= kdu_name
8334 osm_config
["osm"]["kdu_name"] = kdu_name
8337 if base_folder
["pkg-dir"]:
8338 artifact_path
= "{}/{}/{}/{}".format(
8339 base_folder
["folder"],
8340 base_folder
["pkg-dir"],
8343 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8348 artifact_path
= "{}/Scripts/{}/{}/".format(
8349 base_folder
["folder"],
8352 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8357 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8359 # get initial_config_primitive_list that applies to this element
8360 initial_config_primitive_list
= config_descriptor
.get(
8361 "initial-config-primitive"
8365 "Initial config primitive list > {}".format(
8366 initial_config_primitive_list
8370 # add config if not present for NS charm
8371 ee_descriptor_id
= ee_config_descriptor
.get("id")
8372 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8373 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8374 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8378 "Initial config primitive list #2 > {}".format(
8379 initial_config_primitive_list
8382 # n2vc_redesign STEP 3.1
8383 # find old ee_id if exists
8384 ee_id
= vca_deployed
.get("ee_id")
8386 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8387 # create or register execution environment in VCA. Only for native charms when healing
8388 if vca_type
== "native_charm":
8389 step
= "Waiting to VM being up and getting IP address"
8390 self
.logger
.debug(logging_text
+ step
)
8391 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8400 credentials
= {"hostname": rw_mgmt_ip
}
8402 username
= deep_get(
8403 config_descriptor
, ("config-access", "ssh-access", "default-user")
8405 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8406 # merged. Meanwhile let's get username from initial-config-primitive
8407 if not username
and initial_config_primitive_list
:
8408 for config_primitive
in initial_config_primitive_list
:
8409 for param
in config_primitive
.get("parameter", ()):
8410 if param
["name"] == "ssh-username":
8411 username
= param
["value"]
8415 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8416 "'config-access.ssh-access.default-user'"
8418 credentials
["username"] = username
8420 # n2vc_redesign STEP 3.2
8421 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8422 self
._write
_configuration
_status
(
8424 vca_index
=vca_index
,
8425 status
="REGISTERING",
8426 element_under_configuration
=element_under_configuration
,
8427 element_type
=element_type
,
8430 step
= "register execution environment {}".format(credentials
)
8431 self
.logger
.debug(logging_text
+ step
)
8432 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8433 credentials
=credentials
,
8434 namespace
=namespace
,
8439 # update ee_id en db
8441 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8443 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8445 # for compatibility with MON/POL modules, the need model and application name at database
8446 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8447 # Not sure if this need to be done when healing
8449 ee_id_parts = ee_id.split(".")
8450 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8451 if len(ee_id_parts) >= 2:
8452 model_name = ee_id_parts[0]
8453 application_name = ee_id_parts[1]
8454 db_nsr_update[db_update_entry + "model"] = model_name
8455 db_nsr_update[db_update_entry + "application"] = application_name
8458 # n2vc_redesign STEP 3.3
8459 # Install configuration software. Only for native charms.
8460 step
= "Install configuration Software"
8462 self
._write
_configuration
_status
(
8464 vca_index
=vca_index
,
8465 status
="INSTALLING SW",
8466 element_under_configuration
=element_under_configuration
,
8467 element_type
=element_type
,
8468 # other_update=db_nsr_update,
8472 # TODO check if already done
8473 self
.logger
.debug(logging_text
+ step
)
8475 if vca_type
== "native_charm":
8476 config_primitive
= next(
8477 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8480 if config_primitive
:
8481 config
= self
._map
_primitive
_params
(
8482 config_primitive
, {}, deploy_params
8484 await self
.vca_map
[vca_type
].install_configuration_sw(
8486 artifact_path
=artifact_path
,
8494 # write in db flag of configuration_sw already installed
8496 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8499 # Not sure if this need to be done when healing
8501 # add relations for this VCA (wait for other peers related with this VCA)
8502 await self._add_vca_relations(
8503 logging_text=logging_text,
8506 vca_index=vca_index,
8510 # if SSH access is required, then get execution environment SSH public
8511 # if native charm we have waited already to VM be UP
8512 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8515 # self.logger.debug("get ssh key block")
8517 config_descriptor
, ("config-access", "ssh-access", "required")
8519 # self.logger.debug("ssh key needed")
8520 # Needed to inject a ssh key
8523 ("config-access", "ssh-access", "default-user"),
8525 step
= "Install configuration Software, getting public ssh key"
8526 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8527 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8530 step
= "Insert public key into VM user={} ssh_key={}".format(
8534 # self.logger.debug("no need to get ssh key")
8535 step
= "Waiting to VM being up and getting IP address"
8536 self
.logger
.debug(logging_text
+ step
)
8538 # n2vc_redesign STEP 5.1
8539 # wait for RO (ip-address) Insert pub_key into VM
8540 # IMPORTANT: We need do wait for RO to complete healing operation.
8541 await self
._wait
_heal
_ro
(nsr_id
, self
.timeout
.ns_heal
)
8544 rw_mgmt_ip
= await self
.wait_kdu_up(
8545 logging_text
, nsr_id
, vnfr_id
, kdu_name
8548 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8558 rw_mgmt_ip
= None # This is for a NS configuration
8560 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8562 # store rw_mgmt_ip in deploy params for later replacement
8563 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8566 # get run-day1 operation parameter
8567 runDay1
= deploy_params
.get("run-day1", False)
8569 "Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
, vdu_id
, runDay1
)
8572 # n2vc_redesign STEP 6 Execute initial config primitive
8573 step
= "execute initial config primitive"
8575 # wait for dependent primitives execution (NS -> VNF -> VDU)
8576 if initial_config_primitive_list
:
8577 await self
._wait
_dependent
_n
2vc
(
8578 nsr_id
, vca_deployed_list
, vca_index
8581 # stage, in function of element type: vdu, kdu, vnf or ns
8582 my_vca
= vca_deployed_list
[vca_index
]
8583 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8585 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8586 elif my_vca
.get("member-vnf-index"):
8588 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8591 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8593 self
._write
_configuration
_status
(
8594 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8597 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8599 check_if_terminated_needed
= True
8600 for initial_config_primitive
in initial_config_primitive_list
:
8601 # adding information on the vca_deployed if it is a NS execution environment
8602 if not vca_deployed
["member-vnf-index"]:
8603 deploy_params
["ns_config_info"] = json
.dumps(
8604 self
._get
_ns
_config
_info
(nsr_id
)
8606 # TODO check if already done
8607 primitive_params_
= self
._map
_primitive
_params
(
8608 initial_config_primitive
, {}, deploy_params
8611 step
= "execute primitive '{}' params '{}'".format(
8612 initial_config_primitive
["name"], primitive_params_
8614 self
.logger
.debug(logging_text
+ step
)
8615 await self
.vca_map
[vca_type
].exec_primitive(
8617 primitive_name
=initial_config_primitive
["name"],
8618 params_dict
=primitive_params_
,
8623 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8624 if check_if_terminated_needed
:
8625 if config_descriptor
.get("terminate-config-primitive"):
8629 {db_update_entry
+ "needed_terminate": True},
8631 check_if_terminated_needed
= False
8633 # TODO register in database that primitive is done
8635 # STEP 7 Configure metrics
8636 # Not sure if this need to be done when healing
8638 if vca_type == "helm" or vca_type == "helm-v3":
8639 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8641 artifact_path=artifact_path,
8642 ee_config_descriptor=ee_config_descriptor,
8645 target_ip=rw_mgmt_ip,
8651 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8654 for job in prometheus_jobs:
8657 {"job_name": job["job_name"]},
8660 fail_on_empty=False,
8664 step
= "instantiated at VCA"
8665 self
.logger
.debug(logging_text
+ step
)
8667 self
._write
_configuration
_status
(
8668 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8671 except Exception as e
: # TODO not use Exception but N2VC exception
8672 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8674 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8677 "Exception while {} : {}".format(step
, e
), exc_info
=True
8679 self
._write
_configuration
_status
(
8680 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8682 raise LcmException("{} {}".format(step
, e
)) from e
8684 async def _wait_heal_ro(
8690 while time() <= start_time
+ timeout
:
8691 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
8692 operational_status_ro
= db_nsr
["_admin"]["deployed"]["RO"][
8693 "operational-status"
8695 self
.logger
.debug("Wait Heal RO > {}".format(operational_status_ro
))
8696 if operational_status_ro
!= "healing":
8698 await asyncio
.sleep(15, loop
=self
.loop
)
8699 else: # timeout_ns_deploy
8700 raise NgRoException("Timeout waiting ns to deploy")
8702 async def vertical_scale(self
, nsr_id
, nslcmop_id
):
8704 Vertical Scale the VDUs in a NS
8706 :param: nsr_id: NS Instance ID
8707 :param: nslcmop_id: nslcmop ID of migrate
8710 # Try to lock HA task here
8711 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
8712 if not task_is_locked_by_me
:
8714 logging_text
= "Task ns={} vertical scale ".format(nsr_id
)
8715 self
.logger
.debug(logging_text
+ "Enter")
8716 # get all needed from database
8718 db_nslcmop_update
= {}
8719 nslcmop_operation_state
= None
8723 # in case of error, indicates what part of scale was failed to put nsr at error status
8724 start_deploy
= time()
8727 # wait for any previous tasks in process
8728 step
= "Waiting for previous operations to terminate"
8729 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
8731 self
._write
_ns
_status
(
8734 current_operation
="VerticalScale",
8735 current_operation_id
=nslcmop_id
,
8737 step
= "Getting nslcmop from database"
8739 step
+ " after having waited for previous tasks to be completed"
8741 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
8742 operationParams
= db_nslcmop
.get("operationParams")
8744 target
.update(operationParams
)
8745 desc
= await self
.RO
.vertical_scale(nsr_id
, target
)
8746 self
.logger
.debug("RO return > {}".format(desc
))
8747 action_id
= desc
["action_id"]
8748 await self
._wait
_ng
_ro
(
8753 self
.timeout
.verticalscale
,
8754 operation
="verticalscale",
8756 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
8757 self
.logger
.error("Exit Exception {}".format(e
))
8759 except asyncio
.CancelledError
:
8760 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
8761 exc
= "Operation was cancelled"
8762 except Exception as e
:
8763 exc
= traceback
.format_exc()
8764 self
.logger
.critical(
8765 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
8768 self
._write
_ns
_status
(
8771 current_operation
="IDLE",
8772 current_operation_id
=None,
8775 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
8776 nslcmop_operation_state
= "FAILED"
8778 nslcmop_operation_state
= "COMPLETED"
8779 db_nslcmop_update
["detailed-status"] = "Done"
8780 db_nsr_update
["detailed-status"] = "Done"
8782 self
._write
_op
_status
(
8786 operation_state
=nslcmop_operation_state
,
8787 other_update
=db_nslcmop_update
,
8789 if nslcmop_operation_state
:
8793 "nslcmop_id": nslcmop_id
,
8794 "operationState": nslcmop_operation_state
,
8796 await self
.msg
.aiowrite("ns", "verticalscaled", msg
, loop
=self
.loop
)
8797 except Exception as e
:
8799 logging_text
+ "kafka_write notification Exception {}".format(e
)
8801 self
.logger
.debug(logging_text
+ "Exit")
8802 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_verticalscale")