1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
36 from osm_lcm
import ROclient
37 from osm_lcm
.data_utils
.lcm_config
import LcmCfg
38 from osm_lcm
.data_utils
.nsr
import (
41 get_deployed_vca_list
,
44 from osm_lcm
.data_utils
.vca
import (
53 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
54 from osm_lcm
.lcm_utils
import (
61 check_juju_bundle_existence
,
62 get_charm_artifact_path
,
66 from osm_lcm
.data_utils
.nsd
import (
67 get_ns_configuration_relation_list
,
71 from osm_lcm
.data_utils
.vnfd
import (
77 get_ee_sorted_initial_config_primitive_list
,
78 get_ee_sorted_terminate_config_primitive_list
,
80 get_virtual_link_profiles
,
85 get_number_of_instances
,
87 get_kdu_resource_profile
,
88 find_software_version
,
91 from osm_lcm
.data_utils
.list_utils
import find_in_list
92 from osm_lcm
.data_utils
.vnfr
import (
96 get_volumes_from_instantiation_params
,
98 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
99 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
100 from n2vc
.definitions
import RelationEndpoint
101 from n2vc
.k8s_helm_conn
import K8sHelmConnector
102 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
103 from n2vc
.k8s_juju_conn
import K8sJujuConnector
105 from osm_common
.dbbase
import DbException
106 from osm_common
.fsbase
import FsException
108 from osm_lcm
.data_utils
.database
.database
import Database
109 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
110 from osm_lcm
.data_utils
.wim
import (
112 get_target_wim_attrs
,
113 select_feasible_wim_account
,
116 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
117 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
119 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
120 from osm_lcm
.osm_config
import OsmConfigBuilder
121 from osm_lcm
.prometheus
import parse_job
123 from copy
import copy
, deepcopy
124 from time
import time
125 from uuid
import uuid4
127 from random
import randint
129 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
132 class NsLcm(LcmBase
):
133 SUBOPERATION_STATUS_NOT_FOUND
= -1
134 SUBOPERATION_STATUS_NEW
= -2
135 SUBOPERATION_STATUS_SKIP
= -3
136 task_name_deploy_vca
= "Deploying VCA"
138 def __init__(self
, msg
, lcm_tasks
, config
: LcmCfg
, loop
):
140 Init, Connect to database, filesystem storage, and messaging
141 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
144 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
146 self
.db
= Database().instance
.db
147 self
.fs
= Filesystem().instance
.fs
149 self
.lcm_tasks
= lcm_tasks
150 self
.timeout
= config
.timeout
151 self
.ro_config
= config
.RO
152 self
.vca_config
= config
.VCA
154 # create N2VC connector
155 self
.n2vc
= N2VCJujuConnector(
158 on_update_db
=self
._on
_update
_n
2vc
_db
,
163 self
.conn_helm_ee
= LCMHelmConn(
166 vca_config
=self
.vca_config
,
167 on_update_db
=self
._on
_update
_n
2vc
_db
,
170 self
.k8sclusterhelm2
= K8sHelmConnector(
171 kubectl_command
=self
.vca_config
.kubectlpath
,
172 helm_command
=self
.vca_config
.helmpath
,
179 self
.k8sclusterhelm3
= K8sHelm3Connector(
180 kubectl_command
=self
.vca_config
.kubectlpath
,
181 helm_command
=self
.vca_config
.helm3path
,
188 self
.k8sclusterjuju
= K8sJujuConnector(
189 kubectl_command
=self
.vca_config
.kubectlpath
,
190 juju_command
=self
.vca_config
.jujupath
,
193 on_update_db
=self
._on
_update
_k
8s
_db
,
198 self
.k8scluster_map
= {
199 "helm-chart": self
.k8sclusterhelm2
,
200 "helm-chart-v3": self
.k8sclusterhelm3
,
201 "chart": self
.k8sclusterhelm3
,
202 "juju-bundle": self
.k8sclusterjuju
,
203 "juju": self
.k8sclusterjuju
,
207 "lxc_proxy_charm": self
.n2vc
,
208 "native_charm": self
.n2vc
,
209 "k8s_proxy_charm": self
.n2vc
,
210 "helm": self
.conn_helm_ee
,
211 "helm-v3": self
.conn_helm_ee
,
215 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
.to_dict())
217 self
.op_status_map
= {
218 "instantiation": self
.RO
.status
,
219 "termination": self
.RO
.status
,
220 "migrate": self
.RO
.status
,
221 "healing": self
.RO
.recreate_status
,
222 "verticalscale": self
.RO
.status
,
223 "start_stop_rebuild": self
.RO
.status
,
227 def increment_ip_mac(ip_mac
, vm_index
=1):
228 if not isinstance(ip_mac
, str):
231 # try with ipv4 look for last dot
232 i
= ip_mac
.rfind(".")
235 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
236 # try with ipv6 or mac look for last colon. Operate in hex
237 i
= ip_mac
.rfind(":")
240 # format in hex, len can be 2 for mac or 4 for ipv6
241 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
242 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
248 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
249 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
252 # TODO filter RO descriptor fields...
256 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
257 db_dict
["deploymentStatus"] = ro_descriptor
258 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
260 except Exception as e
:
262 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
265 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
266 # remove last dot from path (if exists)
267 if path
.endswith("."):
270 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
271 # .format(table, filter, path, updated_data))
273 nsr_id
= filter.get("_id")
275 # read ns record from database
276 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
277 current_ns_status
= nsr
.get("nsState")
279 # get vca status for NS
280 status_dict
= await self
.n2vc
.get_status(
281 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
286 db_dict
["vcaStatus"] = status_dict
288 # update configurationStatus for this VCA
290 vca_index
= int(path
[path
.rfind(".") + 1 :])
293 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
295 vca_status
= vca_list
[vca_index
].get("status")
297 configuration_status_list
= nsr
.get("configurationStatus")
298 config_status
= configuration_status_list
[vca_index
].get("status")
300 if config_status
== "BROKEN" and vca_status
!= "failed":
301 db_dict
["configurationStatus"][vca_index
] = "READY"
302 elif config_status
!= "BROKEN" and vca_status
== "failed":
303 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
304 except Exception as e
:
305 # not update configurationStatus
306 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
308 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
309 # if nsState = 'DEGRADED' check if all is OK
311 if current_ns_status
in ("READY", "DEGRADED"):
312 error_description
= ""
314 if status_dict
.get("machines"):
315 for machine_id
in status_dict
.get("machines"):
316 machine
= status_dict
.get("machines").get(machine_id
)
317 # check machine agent-status
318 if machine
.get("agent-status"):
319 s
= machine
.get("agent-status").get("status")
322 error_description
+= (
323 "machine {} agent-status={} ; ".format(
327 # check machine instance status
328 if machine
.get("instance-status"):
329 s
= machine
.get("instance-status").get("status")
332 error_description
+= (
333 "machine {} instance-status={} ; ".format(
338 if status_dict
.get("applications"):
339 for app_id
in status_dict
.get("applications"):
340 app
= status_dict
.get("applications").get(app_id
)
341 # check application status
342 if app
.get("status"):
343 s
= app
.get("status").get("status")
346 error_description
+= (
347 "application {} status={} ; ".format(app_id
, s
)
350 if error_description
:
351 db_dict
["errorDescription"] = error_description
352 if current_ns_status
== "READY" and is_degraded
:
353 db_dict
["nsState"] = "DEGRADED"
354 if current_ns_status
== "DEGRADED" and not is_degraded
:
355 db_dict
["nsState"] = "READY"
358 self
.update_db_2("nsrs", nsr_id
, db_dict
)
360 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
362 except Exception as e
:
363 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
365 async def _on_update_k8s_db(
366 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
369 Updating vca status in NSR record
370 :param cluster_uuid: UUID of a k8s cluster
371 :param kdu_instance: The unique name of the KDU instance
372 :param filter: To get nsr_id
373 :cluster_type: The cluster type (juju, k8s)
377 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
378 # .format(cluster_uuid, kdu_instance, filter))
380 nsr_id
= filter.get("_id")
382 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
383 cluster_uuid
=cluster_uuid
,
384 kdu_instance
=kdu_instance
,
386 complete_status
=True,
392 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
395 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
399 self
.update_db_2("nsrs", nsr_id
, db_dict
)
400 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
402 except Exception as e
:
403 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
406 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
409 undefined
=StrictUndefined
,
410 autoescape
=select_autoescape(default_for_string
=True, default
=True),
412 template
= env
.from_string(cloud_init_text
)
413 return template
.render(additional_params
or {})
414 except UndefinedError
as e
:
416 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
417 "file, must be provided in the instantiation parameters inside the "
418 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
420 except (TemplateError
, TemplateNotFound
) as e
:
422 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
427 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
428 cloud_init_content
= cloud_init_file
= None
430 if vdu
.get("cloud-init-file"):
431 base_folder
= vnfd
["_admin"]["storage"]
432 if base_folder
["pkg-dir"]:
433 cloud_init_file
= "{}/{}/cloud_init/{}".format(
434 base_folder
["folder"],
435 base_folder
["pkg-dir"],
436 vdu
["cloud-init-file"],
439 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
440 base_folder
["folder"],
441 vdu
["cloud-init-file"],
443 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
444 cloud_init_content
= ci_file
.read()
445 elif vdu
.get("cloud-init"):
446 cloud_init_content
= vdu
["cloud-init"]
448 return cloud_init_content
449 except FsException
as e
:
451 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
452 vnfd
["id"], vdu
["id"], cloud_init_file
, e
456 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
458 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
460 additional_params
= vdur
.get("additionalParams")
461 return parse_yaml_strings(additional_params
)
463 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
465 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
466 :param vnfd: input vnfd
467 :param new_id: overrides vnf id if provided
468 :param additionalParams: Instantiation params for VNFs provided
469 :param nsrId: Id of the NSR
470 :return: copy of vnfd
472 vnfd_RO
= deepcopy(vnfd
)
473 # remove unused by RO configuration, monitoring, scaling and internal keys
474 vnfd_RO
.pop("_id", None)
475 vnfd_RO
.pop("_admin", None)
476 vnfd_RO
.pop("monitoring-param", None)
477 vnfd_RO
.pop("scaling-group-descriptor", None)
478 vnfd_RO
.pop("kdu", None)
479 vnfd_RO
.pop("k8s-cluster", None)
481 vnfd_RO
["id"] = new_id
483 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
484 for vdu
in get_iterable(vnfd_RO
, "vdu"):
485 vdu
.pop("cloud-init-file", None)
486 vdu
.pop("cloud-init", None)
490 def ip_profile_2_RO(ip_profile
):
491 RO_ip_profile
= deepcopy(ip_profile
)
492 if "dns-server" in RO_ip_profile
:
493 if isinstance(RO_ip_profile
["dns-server"], list):
494 RO_ip_profile
["dns-address"] = []
495 for ds
in RO_ip_profile
.pop("dns-server"):
496 RO_ip_profile
["dns-address"].append(ds
["address"])
498 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
499 if RO_ip_profile
.get("ip-version") == "ipv4":
500 RO_ip_profile
["ip-version"] = "IPv4"
501 if RO_ip_profile
.get("ip-version") == "ipv6":
502 RO_ip_profile
["ip-version"] = "IPv6"
503 if "dhcp-params" in RO_ip_profile
:
504 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
507 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
508 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
509 if db_vim
["_admin"]["operationalState"] != "ENABLED":
511 "VIM={} is not available. operationalState={}".format(
512 vim_account
, db_vim
["_admin"]["operationalState"]
515 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
518 def get_ro_wim_id_for_wim_account(self
, wim_account
):
519 if isinstance(wim_account
, str):
520 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
521 if db_wim
["_admin"]["operationalState"] != "ENABLED":
523 "WIM={} is not available. operationalState={}".format(
524 wim_account
, db_wim
["_admin"]["operationalState"]
527 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
532 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
533 db_vdu_push_list
= []
535 db_update
= {"_admin.modified": time()}
537 for vdu_id
, vdu_count
in vdu_create
.items():
541 for vdur
in reversed(db_vnfr
["vdur"])
542 if vdur
["vdu-id-ref"] == vdu_id
547 # Read the template saved in the db:
549 "No vdur in the database. Using the vdur-template to scale"
551 vdur_template
= db_vnfr
.get("vdur-template")
552 if not vdur_template
:
554 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
558 vdur
= vdur_template
[0]
559 # Delete a template from the database after using it
562 {"_id": db_vnfr
["_id"]},
564 pull
={"vdur-template": {"_id": vdur
["_id"]}},
566 for count
in range(vdu_count
):
567 vdur_copy
= deepcopy(vdur
)
568 vdur_copy
["status"] = "BUILD"
569 vdur_copy
["status-detailed"] = None
570 vdur_copy
["ip-address"] = None
571 vdur_copy
["_id"] = str(uuid4())
572 vdur_copy
["count-index"] += count
+ 1
573 vdur_copy
["id"] = "{}-{}".format(
574 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
576 vdur_copy
.pop("vim_info", None)
577 for iface
in vdur_copy
["interfaces"]:
578 if iface
.get("fixed-ip"):
579 iface
["ip-address"] = self
.increment_ip_mac(
580 iface
["ip-address"], count
+ 1
583 iface
.pop("ip-address", None)
584 if iface
.get("fixed-mac"):
585 iface
["mac-address"] = self
.increment_ip_mac(
586 iface
["mac-address"], count
+ 1
589 iface
.pop("mac-address", None)
593 ) # only first vdu can be managment of vnf
594 db_vdu_push_list
.append(vdur_copy
)
595 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
597 if len(db_vnfr
["vdur"]) == 1:
598 # The scale will move to 0 instances
600 "Scaling to 0 !, creating the template with the last vdur"
602 template_vdur
= [db_vnfr
["vdur"][0]]
603 for vdu_id
, vdu_count
in vdu_delete
.items():
605 indexes_to_delete
= [
607 for iv
in enumerate(db_vnfr
["vdur"])
608 if iv
[1]["vdu-id-ref"] == vdu_id
612 "vdur.{}.status".format(i
): "DELETING"
613 for i
in indexes_to_delete
[-vdu_count
:]
617 # it must be deleted one by one because common.db does not allow otherwise
620 for v
in reversed(db_vnfr
["vdur"])
621 if v
["vdu-id-ref"] == vdu_id
623 for vdu
in vdus_to_delete
[:vdu_count
]:
626 {"_id": db_vnfr
["_id"]},
628 pull
={"vdur": {"_id": vdu
["_id"]}},
632 db_push
["vdur"] = db_vdu_push_list
634 db_push
["vdur-template"] = template_vdur
637 db_vnfr
["vdur-template"] = template_vdur
638 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
639 # modify passed dictionary db_vnfr
640 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
641 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
643 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
645 Updates database nsr with the RO info for the created vld
646 :param ns_update_nsr: dictionary to be filled with the updated info
647 :param db_nsr: content of db_nsr. This is also modified
648 :param nsr_desc_RO: nsr descriptor from RO
649 :return: Nothing, LcmException is raised on errors
652 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
653 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
654 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
656 vld
["vim-id"] = net_RO
.get("vim_net_id")
657 vld
["name"] = net_RO
.get("vim_name")
658 vld
["status"] = net_RO
.get("status")
659 vld
["status-detailed"] = net_RO
.get("error_msg")
660 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
664 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
667 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
669 for db_vnfr
in db_vnfrs
.values():
670 vnfr_update
= {"status": "ERROR"}
671 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
672 if "status" not in vdur
:
673 vdur
["status"] = "ERROR"
674 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
676 vdur
["status-detailed"] = str(error_text
)
678 "vdur.{}.status-detailed".format(vdu_index
)
680 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
681 except DbException
as e
:
682 self
.logger
.error("Cannot update vnf. {}".format(e
))
684 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
686 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
687 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
688 :param nsr_desc_RO: nsr descriptor from RO
689 :return: Nothing, LcmException is raised on errors
691 for vnf_index
, db_vnfr
in db_vnfrs
.items():
692 for vnf_RO
in nsr_desc_RO
["vnfs"]:
693 if vnf_RO
["member_vnf_index"] != vnf_index
:
696 if vnf_RO
.get("ip_address"):
697 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
700 elif not db_vnfr
.get("ip-address"):
701 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
702 raise LcmExceptionNoMgmtIP(
703 "ns member_vnf_index '{}' has no IP address".format(
708 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
709 vdur_RO_count_index
= 0
710 if vdur
.get("pdu-type"):
712 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
713 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
715 if vdur
["count-index"] != vdur_RO_count_index
:
716 vdur_RO_count_index
+= 1
718 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
719 if vdur_RO
.get("ip_address"):
720 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
722 vdur
["ip-address"] = None
723 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
724 vdur
["name"] = vdur_RO
.get("vim_name")
725 vdur
["status"] = vdur_RO
.get("status")
726 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
727 for ifacer
in get_iterable(vdur
, "interfaces"):
728 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
729 if ifacer
["name"] == interface_RO
.get("internal_name"):
730 ifacer
["ip-address"] = interface_RO
.get(
733 ifacer
["mac-address"] = interface_RO
.get(
739 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
740 "from VIM info".format(
741 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
744 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
748 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
750 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
754 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
755 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
756 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
758 vld
["vim-id"] = net_RO
.get("vim_net_id")
759 vld
["name"] = net_RO
.get("vim_name")
760 vld
["status"] = net_RO
.get("status")
761 vld
["status-detailed"] = net_RO
.get("error_msg")
762 vnfr_update
["vld.{}".format(vld_index
)] = vld
766 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
771 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
776 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
781 def _get_ns_config_info(self
, nsr_id
):
783 Generates a mapping between vnf,vdu elements and the N2VC id
784 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
785 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
786 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
787 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
789 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
790 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
792 ns_config_info
= {"osm-config-mapping": mapping
}
793 for vca
in vca_deployed_list
:
794 if not vca
["member-vnf-index"]:
796 if not vca
["vdu_id"]:
797 mapping
[vca
["member-vnf-index"]] = vca
["application"]
801 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
803 ] = vca
["application"]
804 return ns_config_info
806 async def _instantiate_ng_ro(
822 def get_vim_account(vim_account_id
):
824 if vim_account_id
in db_vims
:
825 return db_vims
[vim_account_id
]
826 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
827 db_vims
[vim_account_id
] = db_vim
830 # modify target_vld info with instantiation parameters
831 def parse_vld_instantiation_params(
832 target_vim
, target_vld
, vld_params
, target_sdn
834 if vld_params
.get("ip-profile"):
835 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_to_ro_ip_profile(
836 vld_params
["ip-profile"]
838 if vld_params
.get("provider-network"):
839 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
842 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
843 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
847 # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
848 # if wim_account_id is specified in vld_params, validate if it is feasible.
849 wim_account_id
, db_wim
= select_feasible_wim_account(
850 db_nsr
, db_vnfrs
, target_vld
, vld_params
, self
.logger
854 # WIM is needed and a feasible one was found, populate WIM target and SDN ports
855 self
.logger
.info("WIM selected: {:s}".format(str(wim_account_id
)))
856 # update vld_params with correct WIM account Id
857 vld_params
["wimAccountId"] = wim_account_id
859 target_wim
= "wim:{}".format(wim_account_id
)
860 target_wim_attrs
= get_target_wim_attrs(nsr_id
, target_vld
, vld_params
)
861 sdn_ports
= get_sdn_ports(vld_params
, db_wim
)
862 if len(sdn_ports
) > 0:
863 target_vld
["vim_info"][target_wim
] = target_wim_attrs
864 target_vld
["vim_info"][target_wim
]["sdn-ports"] = sdn_ports
867 "Target VLD with WIM data: {:s}".format(str(target_vld
))
870 for param
in ("vim-network-name", "vim-network-id"):
871 if vld_params
.get(param
):
872 if isinstance(vld_params
[param
], dict):
873 for vim
, vim_net
in vld_params
[param
].items():
874 other_target_vim
= "vim:" + vim
876 target_vld
["vim_info"],
877 (other_target_vim
, param
.replace("-", "_")),
880 else: # isinstance str
881 target_vld
["vim_info"][target_vim
][
882 param
.replace("-", "_")
883 ] = vld_params
[param
]
884 if vld_params
.get("common_id"):
885 target_vld
["common_id"] = vld_params
.get("common_id")
887 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
888 def update_ns_vld_target(target
, ns_params
):
889 for vnf_params
in ns_params
.get("vnf", ()):
890 if vnf_params
.get("vimAccountId"):
894 for vnfr
in db_vnfrs
.values()
895 if vnf_params
["member-vnf-index"]
896 == vnfr
["member-vnf-index-ref"]
900 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
903 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
904 target_vld
= find_in_list(
905 get_iterable(vdur
, "interfaces"),
906 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
909 vld_params
= find_in_list(
910 get_iterable(ns_params
, "vld"),
911 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
914 if vnf_params
.get("vimAccountId") not in a_vld
.get(
917 target_vim_network_list
= [
918 v
for _
, v
in a_vld
.get("vim_info").items()
920 target_vim_network_name
= next(
922 item
.get("vim_network_name", "")
923 for item
in target_vim_network_list
928 target
["ns"]["vld"][a_index
].get("vim_info").update(
930 "vim:{}".format(vnf_params
["vimAccountId"]): {
931 "vim_network_name": target_vim_network_name
,
937 for param
in ("vim-network-name", "vim-network-id"):
938 if vld_params
.get(param
) and isinstance(
939 vld_params
[param
], dict
941 for vim
, vim_net
in vld_params
[
944 other_target_vim
= "vim:" + vim
946 target
["ns"]["vld"][a_index
].get(
951 param
.replace("-", "_"),
956 nslcmop_id
= db_nslcmop
["_id"]
958 "name": db_nsr
["name"],
961 "image": deepcopy(db_nsr
["image"]),
962 "flavor": deepcopy(db_nsr
["flavor"]),
963 "action_id": nslcmop_id
,
964 "cloud_init_content": {},
966 for image
in target
["image"]:
967 image
["vim_info"] = {}
968 for flavor
in target
["flavor"]:
969 flavor
["vim_info"] = {}
970 if db_nsr
.get("affinity-or-anti-affinity-group"):
971 target
["affinity-or-anti-affinity-group"] = deepcopy(
972 db_nsr
["affinity-or-anti-affinity-group"]
974 for affinity_or_anti_affinity_group
in target
[
975 "affinity-or-anti-affinity-group"
977 affinity_or_anti_affinity_group
["vim_info"] = {}
979 if db_nslcmop
.get("lcmOperationType") != "instantiate":
980 # get parameters of instantiation:
981 db_nslcmop_instantiate
= self
.db
.get_list(
984 "nsInstanceId": db_nslcmop
["nsInstanceId"],
985 "lcmOperationType": "instantiate",
988 ns_params
= db_nslcmop_instantiate
.get("operationParams")
990 ns_params
= db_nslcmop
.get("operationParams")
991 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
992 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
995 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
996 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
1000 "mgmt-network": vld
.get("mgmt-network", False),
1001 "type": vld
.get("type"),
1004 "vim_network_name": vld
.get("vim-network-name"),
1005 "vim_account_id": ns_params
["vimAccountId"],
1009 # check if this network needs SDN assist
1010 if vld
.get("pci-interfaces"):
1011 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1012 if vim_config
:= db_vim
.get("config"):
1013 if sdnc_id
:= vim_config
.get("sdn-controller"):
1014 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1015 target_sdn
= "sdn:{}".format(sdnc_id
)
1016 target_vld
["vim_info"][target_sdn
] = {
1018 "target_vim": target_vim
,
1020 "type": vld
.get("type"),
1023 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1024 for nsd_vnf_profile
in nsd_vnf_profiles
:
1025 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1026 if cp
["virtual-link-profile-id"] == vld
["id"]:
1028 "member_vnf:{}.{}".format(
1029 cp
["constituent-cpd-id"][0][
1030 "constituent-base-element-id"
1032 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1034 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1036 # check at nsd descriptor, if there is an ip-profile
1038 nsd_vlp
= find_in_list(
1039 get_virtual_link_profiles(nsd
),
1040 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1045 and nsd_vlp
.get("virtual-link-protocol-data")
1046 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1048 vld_params
["ip-profile"] = nsd_vlp
["virtual-link-protocol-data"][
1052 # update vld_params with instantiation params
1053 vld_instantiation_params
= find_in_list(
1054 get_iterable(ns_params
, "vld"),
1055 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1057 if vld_instantiation_params
:
1058 vld_params
.update(vld_instantiation_params
)
1059 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1060 target
["ns"]["vld"].append(target_vld
)
1061 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1062 update_ns_vld_target(target
, ns_params
)
1064 for vnfr
in db_vnfrs
.values():
1065 vnfd
= find_in_list(
1066 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1068 vnf_params
= find_in_list(
1069 get_iterable(ns_params
, "vnf"),
1070 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1072 target_vnf
= deepcopy(vnfr
)
1073 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1074 for vld
in target_vnf
.get("vld", ()):
1075 # check if connected to a ns.vld, to fill target'
1076 vnf_cp
= find_in_list(
1077 vnfd
.get("int-virtual-link-desc", ()),
1078 lambda cpd
: cpd
.get("id") == vld
["id"],
1081 ns_cp
= "member_vnf:{}.{}".format(
1082 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1084 if cp2target
.get(ns_cp
):
1085 vld
["target"] = cp2target
[ns_cp
]
1088 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1090 # check if this network needs SDN assist
1092 if vld
.get("pci-interfaces"):
1093 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1094 sdnc_id
= db_vim
["config"].get("sdn-controller")
1096 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1097 target_sdn
= "sdn:{}".format(sdnc_id
)
1098 vld
["vim_info"][target_sdn
] = {
1100 "target_vim": target_vim
,
1102 "type": vld
.get("type"),
1105 # check at vnfd descriptor, if there is an ip-profile
1107 vnfd_vlp
= find_in_list(
1108 get_virtual_link_profiles(vnfd
),
1109 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1113 and vnfd_vlp
.get("virtual-link-protocol-data")
1114 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1116 vld_params
["ip-profile"] = vnfd_vlp
["virtual-link-protocol-data"][
1119 # update vld_params with instantiation params
1121 vld_instantiation_params
= find_in_list(
1122 get_iterable(vnf_params
, "internal-vld"),
1123 lambda i_vld
: i_vld
["name"] == vld
["id"],
1125 if vld_instantiation_params
:
1126 vld_params
.update(vld_instantiation_params
)
1127 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1130 for vdur
in target_vnf
.get("vdur", ()):
1131 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1132 continue # This vdu must not be created
1133 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1135 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1138 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1139 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1142 and vdu_configuration
.get("config-access")
1143 and vdu_configuration
.get("config-access").get("ssh-access")
1145 vdur
["ssh-keys"] = ssh_keys_all
1146 vdur
["ssh-access-required"] = vdu_configuration
[
1148 ]["ssh-access"]["required"]
1151 and vnf_configuration
.get("config-access")
1152 and vnf_configuration
.get("config-access").get("ssh-access")
1153 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1155 vdur
["ssh-keys"] = ssh_keys_all
1156 vdur
["ssh-access-required"] = vnf_configuration
[
1158 ]["ssh-access"]["required"]
1159 elif ssh_keys_instantiation
and find_in_list(
1160 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1162 vdur
["ssh-keys"] = ssh_keys_instantiation
1164 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1166 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1168 if vdud
.get("cloud-init-file"):
1169 vdur
["cloud-init"] = "{}:file:{}".format(
1170 vnfd
["_id"], vdud
.get("cloud-init-file")
1172 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1173 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1174 base_folder
= vnfd
["_admin"]["storage"]
1175 if base_folder
["pkg-dir"]:
1176 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1177 base_folder
["folder"],
1178 base_folder
["pkg-dir"],
1179 vdud
.get("cloud-init-file"),
1182 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1183 base_folder
["folder"],
1184 vdud
.get("cloud-init-file"),
1186 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1187 target
["cloud_init_content"][
1190 elif vdud
.get("cloud-init"):
1191 vdur
["cloud-init"] = "{}:vdu:{}".format(
1192 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1194 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1195 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1198 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1199 deploy_params_vdu
= self
._format
_additional
_params
(
1200 vdur
.get("additionalParams") or {}
1202 deploy_params_vdu
["OSM"] = get_osm_params(
1203 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1205 vdur
["additionalParams"] = deploy_params_vdu
1208 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1209 if target_vim
not in ns_flavor
["vim_info"]:
1210 ns_flavor
["vim_info"][target_vim
] = {}
1213 # in case alternative images are provided we must check if they should be applied
1214 # for the vim_type, modify the vim_type taking into account
1215 ns_image_id
= int(vdur
["ns-image-id"])
1216 if vdur
.get("alt-image-ids"):
1217 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1218 vim_type
= db_vim
["vim_type"]
1219 for alt_image_id
in vdur
.get("alt-image-ids"):
1220 ns_alt_image
= target
["image"][int(alt_image_id
)]
1221 if vim_type
== ns_alt_image
.get("vim-type"):
1222 # must use alternative image
1224 "use alternative image id: {}".format(alt_image_id
)
1226 ns_image_id
= alt_image_id
1227 vdur
["ns-image-id"] = ns_image_id
1229 ns_image
= target
["image"][int(ns_image_id
)]
1230 if target_vim
not in ns_image
["vim_info"]:
1231 ns_image
["vim_info"][target_vim
] = {}
1234 if vdur
.get("affinity-or-anti-affinity-group-id"):
1235 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1236 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1237 if target_vim
not in ns_ags
["vim_info"]:
1238 ns_ags
["vim_info"][target_vim
] = {}
1240 vdur
["vim_info"] = {target_vim
: {}}
1241 # instantiation parameters
1243 vdu_instantiation_params
= find_in_list(
1244 get_iterable(vnf_params
, "vdu"),
1245 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1247 if vdu_instantiation_params
:
1248 # Parse the vdu_volumes from the instantiation params
1249 vdu_volumes
= get_volumes_from_instantiation_params(
1250 vdu_instantiation_params
, vdud
1252 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1253 vdur_list
.append(vdur
)
1254 target_vnf
["vdur"] = vdur_list
1255 target
["vnf"].append(target_vnf
)
1257 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1258 desc
= await self
.RO
.deploy(nsr_id
, target
)
1259 self
.logger
.debug("RO return > {}".format(desc
))
1260 action_id
= desc
["action_id"]
1261 await self
._wait
_ng
_ro
(
1268 operation
="instantiation",
1273 "_admin.deployed.RO.operational-status": "running",
1274 "detailed-status": " ".join(stage
),
1276 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1277 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1278 self
._write
_op
_status
(nslcmop_id
, stage
)
1280 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1284 async def _wait_ng_ro(
1294 detailed_status_old
= None
1296 start_time
= start_time
or time()
1297 while time() <= start_time
+ timeout
:
1298 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1299 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1300 if desc_status
["status"] == "FAILED":
1301 raise NgRoException(desc_status
["details"])
1302 elif desc_status
["status"] == "BUILD":
1304 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1305 elif desc_status
["status"] == "DONE":
1307 stage
[2] = "Deployed at VIM"
1310 assert False, "ROclient.check_ns_status returns unknown {}".format(
1311 desc_status
["status"]
1313 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1314 detailed_status_old
= stage
[2]
1315 db_nsr_update
["detailed-status"] = " ".join(stage
)
1316 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1317 self
._write
_op
_status
(nslcmop_id
, stage
)
1318 await asyncio
.sleep(15, loop
=self
.loop
)
1319 else: # timeout_ns_deploy
1320 raise NgRoException("Timeout waiting ns to deploy")
1322 async def _terminate_ng_ro(
1323 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1328 start_deploy
= time()
1335 "action_id": nslcmop_id
,
1337 desc
= await self
.RO
.deploy(nsr_id
, target
)
1338 action_id
= desc
["action_id"]
1339 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1342 + "ns terminate action at RO. action_id={}".format(action_id
)
1346 delete_timeout
= 20 * 60 # 20 minutes
1347 await self
._wait
_ng
_ro
(
1354 operation
="termination",
1356 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1358 await self
.RO
.delete(nsr_id
)
1359 except NgRoException
as e
:
1360 if e
.http_code
== 404: # not found
1361 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1362 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1364 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1366 elif e
.http_code
== 409: # conflict
1367 failed_detail
.append("delete conflict: {}".format(e
))
1370 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1373 failed_detail
.append("delete error: {}".format(e
))
1376 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1378 except Exception as e
:
1379 failed_detail
.append("delete error: {}".format(e
))
1381 logging_text
+ "RO_action_id={} delete error: {}".format(action_id
, e
)
1385 stage
[2] = "Error deleting from VIM"
1387 stage
[2] = "Deleted from VIM"
1388 db_nsr_update
["detailed-status"] = " ".join(stage
)
1389 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1390 self
._write
_op
_status
(nslcmop_id
, stage
)
1393 raise LcmException("; ".join(failed_detail
))
1396 async def instantiate_RO(
1410 :param logging_text: preffix text to use at logging
1411 :param nsr_id: nsr identity
1412 :param nsd: database content of ns descriptor
1413 :param db_nsr: database content of ns record
1414 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1416 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1417 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1418 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1419 :return: None or exception
1422 start_deploy
= time()
1423 ns_params
= db_nslcmop
.get("operationParams")
1424 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1425 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1427 timeout_ns_deploy
= self
.timeout
.ns_deploy
1429 # Check for and optionally request placement optimization. Database will be updated if placement activated
1430 stage
[2] = "Waiting for Placement."
1431 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1432 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1433 for vnfr
in db_vnfrs
.values():
1434 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1437 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1439 return await self
._instantiate
_ng
_ro
(
1452 except Exception as e
:
1453 stage
[2] = "ERROR deploying at VIM"
1454 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1456 "Error deploying at VIM {}".format(e
),
1457 exc_info
=not isinstance(
1460 ROclient
.ROClientException
,
1469 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1471 Wait for kdu to be up, get ip address
1472 :param logging_text: prefix use for logging
1476 :return: IP address, K8s services
1479 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1482 while nb_tries
< 360:
1483 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1487 for x
in get_iterable(db_vnfr
, "kdur")
1488 if x
.get("kdu-name") == kdu_name
1494 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1496 if kdur
.get("status"):
1497 if kdur
["status"] in ("READY", "ENABLED"):
1498 return kdur
.get("ip-address"), kdur
.get("services")
1501 "target KDU={} is in error state".format(kdu_name
)
1504 await asyncio
.sleep(10, loop
=self
.loop
)
1506 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1508 async def wait_vm_up_insert_key_ro(
1509 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1512 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1513 :param logging_text: prefix use for logging
1518 :param pub_key: public ssh key to inject, None to skip
1519 :param user: user to apply the public ssh key
1523 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1525 target_vdu_id
= None
1530 if ro_retries
>= 360: # 1 hour
1532 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1535 await asyncio
.sleep(10, loop
=self
.loop
)
1538 if not target_vdu_id
:
1539 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1541 if not vdu_id
: # for the VNF case
1542 if db_vnfr
.get("status") == "ERROR":
1544 "Cannot inject ssh-key because target VNF is in error state"
1546 ip_address
= db_vnfr
.get("ip-address")
1552 for x
in get_iterable(db_vnfr
, "vdur")
1553 if x
.get("ip-address") == ip_address
1561 for x
in get_iterable(db_vnfr
, "vdur")
1562 if x
.get("vdu-id-ref") == vdu_id
1563 and x
.get("count-index") == vdu_index
1569 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1570 ): # If only one, this should be the target vdu
1571 vdur
= db_vnfr
["vdur"][0]
1574 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1575 vnfr_id
, vdu_id
, vdu_index
1578 # New generation RO stores information at "vim_info"
1581 if vdur
.get("vim_info"):
1583 t
for t
in vdur
["vim_info"]
1584 ) # there should be only one key
1585 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1587 vdur
.get("pdu-type")
1588 or vdur
.get("status") == "ACTIVE"
1589 or ng_ro_status
== "ACTIVE"
1591 ip_address
= vdur
.get("ip-address")
1594 target_vdu_id
= vdur
["vdu-id-ref"]
1595 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1597 "Cannot inject ssh-key because target VM is in error state"
1600 if not target_vdu_id
:
1603 # inject public key into machine
1604 if pub_key
and user
:
1605 self
.logger
.debug(logging_text
+ "Inserting RO key")
1606 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1607 if vdur
.get("pdu-type"):
1608 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1613 "action": "inject_ssh_key",
1617 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1619 desc
= await self
.RO
.deploy(nsr_id
, target
)
1620 action_id
= desc
["action_id"]
1621 await self
._wait
_ng
_ro
(
1622 nsr_id
, action_id
, timeout
=600, operation
="instantiation"
1625 except NgRoException
as e
:
1627 "Reaching max tries injecting key. Error: {}".format(e
)
1634 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1636 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1638 my_vca
= vca_deployed_list
[vca_index
]
1639 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1640 # vdu or kdu: no dependencies
1644 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1645 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1646 configuration_status_list
= db_nsr
["configurationStatus"]
1647 for index
, vca_deployed
in enumerate(configuration_status_list
):
1648 if index
== vca_index
:
1651 if not my_vca
.get("member-vnf-index") or (
1652 vca_deployed
.get("member-vnf-index")
1653 == my_vca
.get("member-vnf-index")
1655 internal_status
= configuration_status_list
[index
].get("status")
1656 if internal_status
== "READY":
1658 elif internal_status
== "BROKEN":
1660 "Configuration aborted because dependent charm/s has failed"
1665 # no dependencies, return
1667 await asyncio
.sleep(10)
1670 raise LcmException("Configuration aborted because dependent charm/s timeout")
1672 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1675 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1677 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1678 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1681 async def instantiate_N2VC(
1699 ee_config_descriptor
,
1701 nsr_id
= db_nsr
["_id"]
1702 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1703 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1704 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1705 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1707 "collection": "nsrs",
1708 "filter": {"_id": nsr_id
},
1709 "path": db_update_entry
,
1714 element_under_configuration
= nsr_id
1718 vnfr_id
= db_vnfr
["_id"]
1719 osm_config
["osm"]["vnf_id"] = vnfr_id
1721 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1723 if vca_type
== "native_charm":
1726 index_number
= vdu_index
or 0
1729 element_type
= "VNF"
1730 element_under_configuration
= vnfr_id
1731 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1733 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1734 element_type
= "VDU"
1735 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1736 osm_config
["osm"]["vdu_id"] = vdu_id
1738 namespace
+= ".{}".format(kdu_name
)
1739 element_type
= "KDU"
1740 element_under_configuration
= kdu_name
1741 osm_config
["osm"]["kdu_name"] = kdu_name
1744 if base_folder
["pkg-dir"]:
1745 artifact_path
= "{}/{}/{}/{}".format(
1746 base_folder
["folder"],
1747 base_folder
["pkg-dir"],
1750 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1755 artifact_path
= "{}/Scripts/{}/{}/".format(
1756 base_folder
["folder"],
1759 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1764 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1766 # get initial_config_primitive_list that applies to this element
1767 initial_config_primitive_list
= config_descriptor
.get(
1768 "initial-config-primitive"
1772 "Initial config primitive list > {}".format(
1773 initial_config_primitive_list
1777 # add config if not present for NS charm
1778 ee_descriptor_id
= ee_config_descriptor
.get("id")
1779 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1780 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1781 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1785 "Initial config primitive list #2 > {}".format(
1786 initial_config_primitive_list
1789 # n2vc_redesign STEP 3.1
1790 # find old ee_id if exists
1791 ee_id
= vca_deployed
.get("ee_id")
1793 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1794 # create or register execution environment in VCA
1795 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1796 self
._write
_configuration
_status
(
1798 vca_index
=vca_index
,
1800 element_under_configuration
=element_under_configuration
,
1801 element_type
=element_type
,
1804 step
= "create execution environment"
1805 self
.logger
.debug(logging_text
+ step
)
1809 if vca_type
== "k8s_proxy_charm":
1810 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1811 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1812 namespace
=namespace
,
1813 artifact_path
=artifact_path
,
1817 elif vca_type
== "helm" or vca_type
== "helm-v3":
1818 ee_id
, credentials
= await self
.vca_map
[
1820 ].create_execution_environment(
1821 namespace
=namespace
,
1825 artifact_path
=artifact_path
,
1826 chart_model
=vca_name
,
1830 ee_id
, credentials
= await self
.vca_map
[
1832 ].create_execution_environment(
1833 namespace
=namespace
,
1839 elif vca_type
== "native_charm":
1840 step
= "Waiting to VM being up and getting IP address"
1841 self
.logger
.debug(logging_text
+ step
)
1842 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1851 credentials
= {"hostname": rw_mgmt_ip
}
1853 username
= deep_get(
1854 config_descriptor
, ("config-access", "ssh-access", "default-user")
1856 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1857 # merged. Meanwhile let's get username from initial-config-primitive
1858 if not username
and initial_config_primitive_list
:
1859 for config_primitive
in initial_config_primitive_list
:
1860 for param
in config_primitive
.get("parameter", ()):
1861 if param
["name"] == "ssh-username":
1862 username
= param
["value"]
1866 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1867 "'config-access.ssh-access.default-user'"
1869 credentials
["username"] = username
1870 # n2vc_redesign STEP 3.2
1872 self
._write
_configuration
_status
(
1874 vca_index
=vca_index
,
1875 status
="REGISTERING",
1876 element_under_configuration
=element_under_configuration
,
1877 element_type
=element_type
,
1880 step
= "register execution environment {}".format(credentials
)
1881 self
.logger
.debug(logging_text
+ step
)
1882 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1883 credentials
=credentials
,
1884 namespace
=namespace
,
1889 # for compatibility with MON/POL modules, the need model and application name at database
1890 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1891 ee_id_parts
= ee_id
.split(".")
1892 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1893 if len(ee_id_parts
) >= 2:
1894 model_name
= ee_id_parts
[0]
1895 application_name
= ee_id_parts
[1]
1896 db_nsr_update
[db_update_entry
+ "model"] = model_name
1897 db_nsr_update
[db_update_entry
+ "application"] = application_name
1899 # n2vc_redesign STEP 3.3
1900 step
= "Install configuration Software"
1902 self
._write
_configuration
_status
(
1904 vca_index
=vca_index
,
1905 status
="INSTALLING SW",
1906 element_under_configuration
=element_under_configuration
,
1907 element_type
=element_type
,
1908 other_update
=db_nsr_update
,
1911 # TODO check if already done
1912 self
.logger
.debug(logging_text
+ step
)
1914 if vca_type
== "native_charm":
1915 config_primitive
= next(
1916 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1919 if config_primitive
:
1920 config
= self
._map
_primitive
_params
(
1921 config_primitive
, {}, deploy_params
1924 if vca_type
== "lxc_proxy_charm":
1925 if element_type
== "NS":
1926 num_units
= db_nsr
.get("config-units") or 1
1927 elif element_type
== "VNF":
1928 num_units
= db_vnfr
.get("config-units") or 1
1929 elif element_type
== "VDU":
1930 for v
in db_vnfr
["vdur"]:
1931 if vdu_id
== v
["vdu-id-ref"]:
1932 num_units
= v
.get("config-units") or 1
1934 if vca_type
!= "k8s_proxy_charm":
1935 await self
.vca_map
[vca_type
].install_configuration_sw(
1937 artifact_path
=artifact_path
,
1940 num_units
=num_units
,
1945 # write in db flag of configuration_sw already installed
1947 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1950 # add relations for this VCA (wait for other peers related with this VCA)
1951 is_relation_added
= await self
._add
_vca
_relations
(
1952 logging_text
=logging_text
,
1955 vca_index
=vca_index
,
1958 if not is_relation_added
:
1959 raise LcmException("Relations could not be added to VCA.")
1961 # if SSH access is required, then get execution environment SSH public
1962 # if native charm we have waited already to VM be UP
1963 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1966 # self.logger.debug("get ssh key block")
1968 config_descriptor
, ("config-access", "ssh-access", "required")
1970 # self.logger.debug("ssh key needed")
1971 # Needed to inject a ssh key
1974 ("config-access", "ssh-access", "default-user"),
1976 step
= "Install configuration Software, getting public ssh key"
1977 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
1978 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
1981 step
= "Insert public key into VM user={} ssh_key={}".format(
1985 # self.logger.debug("no need to get ssh key")
1986 step
= "Waiting to VM being up and getting IP address"
1987 self
.logger
.debug(logging_text
+ step
)
1989 # default rw_mgmt_ip to None, avoiding the non definition of the variable
1992 # n2vc_redesign STEP 5.1
1993 # wait for RO (ip-address) Insert pub_key into VM
1996 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
1997 logging_text
, nsr_id
, vnfr_id
, kdu_name
1999 vnfd
= self
.db
.get_one(
2001 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2003 kdu
= get_kdu(vnfd
, kdu_name
)
2005 service
["name"] for service
in get_kdu_services(kdu
)
2007 exposed_services
= []
2008 for service
in services
:
2009 if any(s
in service
["name"] for s
in kdu_services
):
2010 exposed_services
.append(service
)
2011 await self
.vca_map
[vca_type
].exec_primitive(
2013 primitive_name
="config",
2015 "osm-config": json
.dumps(
2017 k8s
={"services": exposed_services
}
2024 # This verification is needed in order to avoid trying to add a public key
2025 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2026 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2027 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2029 elif db_vnfr
.get("vdur"):
2030 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2040 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2042 # store rw_mgmt_ip in deploy params for later replacement
2043 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2045 # n2vc_redesign STEP 6 Execute initial config primitive
2046 step
= "execute initial config primitive"
2048 # wait for dependent primitives execution (NS -> VNF -> VDU)
2049 if initial_config_primitive_list
:
2050 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2052 # stage, in function of element type: vdu, kdu, vnf or ns
2053 my_vca
= vca_deployed_list
[vca_index
]
2054 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2056 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2057 elif my_vca
.get("member-vnf-index"):
2059 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2062 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2064 self
._write
_configuration
_status
(
2065 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2068 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2070 check_if_terminated_needed
= True
2071 for initial_config_primitive
in initial_config_primitive_list
:
2072 # adding information on the vca_deployed if it is a NS execution environment
2073 if not vca_deployed
["member-vnf-index"]:
2074 deploy_params
["ns_config_info"] = json
.dumps(
2075 self
._get
_ns
_config
_info
(nsr_id
)
2077 # TODO check if already done
2078 primitive_params_
= self
._map
_primitive
_params
(
2079 initial_config_primitive
, {}, deploy_params
2082 step
= "execute primitive '{}' params '{}'".format(
2083 initial_config_primitive
["name"], primitive_params_
2085 self
.logger
.debug(logging_text
+ step
)
2086 await self
.vca_map
[vca_type
].exec_primitive(
2088 primitive_name
=initial_config_primitive
["name"],
2089 params_dict
=primitive_params_
,
2094 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2095 if check_if_terminated_needed
:
2096 if config_descriptor
.get("terminate-config-primitive"):
2098 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2100 check_if_terminated_needed
= False
2102 # TODO register in database that primitive is done
2104 # STEP 7 Configure metrics
2105 if vca_type
== "helm" or vca_type
== "helm-v3":
2106 # TODO: review for those cases where the helm chart is a reference and
2107 # is not part of the NF package
2108 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2110 artifact_path
=artifact_path
,
2111 ee_config_descriptor
=ee_config_descriptor
,
2114 target_ip
=rw_mgmt_ip
,
2115 element_type
=element_type
,
2116 vnf_member_index
=db_vnfr
.get("member-vnf-index-ref", ""),
2118 vdu_index
=vdu_index
,
2120 kdu_index
=kdu_index
,
2126 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2129 for job
in prometheus_jobs
:
2132 {"job_name": job
["job_name"]},
2135 fail_on_empty
=False,
2138 step
= "instantiated at VCA"
2139 self
.logger
.debug(logging_text
+ step
)
2141 self
._write
_configuration
_status
(
2142 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2145 except Exception as e
: # TODO not use Exception but N2VC exception
2146 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2148 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2151 "Exception while {} : {}".format(step
, e
), exc_info
=True
2153 self
._write
_configuration
_status
(
2154 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2156 raise LcmException("{}. {}".format(step
, e
)) from e
2158 def _write_ns_status(
2162 current_operation
: str,
2163 current_operation_id
: str,
2164 error_description
: str = None,
2165 error_detail
: str = None,
2166 other_update
: dict = None,
2169 Update db_nsr fields.
2172 :param current_operation:
2173 :param current_operation_id:
2174 :param error_description:
2175 :param error_detail:
2176 :param other_update: Other required changes at database if provided, will be cleared
2180 db_dict
= other_update
or {}
2183 ] = current_operation_id
# for backward compatibility
2184 db_dict
["_admin.current-operation"] = current_operation_id
2185 db_dict
["_admin.operation-type"] = (
2186 current_operation
if current_operation
!= "IDLE" else None
2188 db_dict
["currentOperation"] = current_operation
2189 db_dict
["currentOperationID"] = current_operation_id
2190 db_dict
["errorDescription"] = error_description
2191 db_dict
["errorDetail"] = error_detail
2194 db_dict
["nsState"] = ns_state
2195 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2196 except DbException
as e
:
2197 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2199 def _write_op_status(
2203 error_message
: str = None,
2204 queuePosition
: int = 0,
2205 operation_state
: str = None,
2206 other_update
: dict = None,
2209 db_dict
= other_update
or {}
2210 db_dict
["queuePosition"] = queuePosition
2211 if isinstance(stage
, list):
2212 db_dict
["stage"] = stage
[0]
2213 db_dict
["detailed-status"] = " ".join(stage
)
2214 elif stage
is not None:
2215 db_dict
["stage"] = str(stage
)
2217 if error_message
is not None:
2218 db_dict
["errorMessage"] = error_message
2219 if operation_state
is not None:
2220 db_dict
["operationState"] = operation_state
2221 db_dict
["statusEnteredTime"] = time()
2222 self
.update_db_2("nslcmops", op_id
, db_dict
)
2223 except DbException
as e
:
2225 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2228 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2230 nsr_id
= db_nsr
["_id"]
2231 # configurationStatus
2232 config_status
= db_nsr
.get("configurationStatus")
2235 "configurationStatus.{}.status".format(index
): status
2236 for index
, v
in enumerate(config_status
)
2240 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2242 except DbException
as e
:
2244 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2247 def _write_configuration_status(
2252 element_under_configuration
: str = None,
2253 element_type
: str = None,
2254 other_update
: dict = None,
2256 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2257 # .format(vca_index, status))
2260 db_path
= "configurationStatus.{}.".format(vca_index
)
2261 db_dict
= other_update
or {}
2263 db_dict
[db_path
+ "status"] = status
2264 if element_under_configuration
:
2266 db_path
+ "elementUnderConfiguration"
2267 ] = element_under_configuration
2269 db_dict
[db_path
+ "elementType"] = element_type
2270 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2271 except DbException
as e
:
2273 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2274 status
, nsr_id
, vca_index
, e
2278 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2280 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2281 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2282 Database is used because the result can be obtained from a different LCM worker in case of HA.
2283 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2284 :param db_nslcmop: database content of nslcmop
2285 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2286 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2287 computed 'vim-account-id'
2290 nslcmop_id
= db_nslcmop
["_id"]
2291 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2292 if placement_engine
== "PLA":
2294 logging_text
+ "Invoke and wait for placement optimization"
2296 await self
.msg
.aiowrite(
2297 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2299 db_poll_interval
= 5
2300 wait
= db_poll_interval
* 10
2302 while not pla_result
and wait
>= 0:
2303 await asyncio
.sleep(db_poll_interval
)
2304 wait
-= db_poll_interval
2305 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2306 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2310 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2313 for pla_vnf
in pla_result
["vnf"]:
2314 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2315 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2320 {"_id": vnfr
["_id"]},
2321 {"vim-account-id": pla_vnf
["vimAccountId"]},
2324 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2327 def update_nsrs_with_pla_result(self
, params
):
2329 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2331 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2333 except Exception as e
:
2334 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2336 async def instantiate(self
, nsr_id
, nslcmop_id
):
2339 :param nsr_id: ns instance to deploy
2340 :param nslcmop_id: operation to run
2344 # Try to lock HA task here
2345 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2346 if not task_is_locked_by_me
:
2348 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2352 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2353 self
.logger
.debug(logging_text
+ "Enter")
2355 # get all needed from database
2357 # database nsrs record
2360 # database nslcmops record
2363 # update operation on nsrs
2365 # update operation on nslcmops
2366 db_nslcmop_update
= {}
2368 timeout_ns_deploy
= self
.timeout
.ns_deploy
2370 nslcmop_operation_state
= None
2371 db_vnfrs
= {} # vnf's info indexed by member-index
2373 tasks_dict_info
= {} # from task to info text
2377 "Stage 1/5: preparation of the environment.",
2378 "Waiting for previous operations to terminate.",
2381 # ^ stage, step, VIM progress
2383 # wait for any previous tasks in process
2384 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2386 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2387 stage
[1] = "Reading from database."
2388 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2389 db_nsr_update
["detailed-status"] = "creating"
2390 db_nsr_update
["operational-status"] = "init"
2391 self
._write
_ns
_status
(
2393 ns_state
="BUILDING",
2394 current_operation
="INSTANTIATING",
2395 current_operation_id
=nslcmop_id
,
2396 other_update
=db_nsr_update
,
2398 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2400 # read from db: operation
2401 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2402 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2403 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2404 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2405 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2407 ns_params
= db_nslcmop
.get("operationParams")
2408 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2409 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2412 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2413 self
.logger
.debug(logging_text
+ stage
[1])
2414 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2415 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2416 self
.logger
.debug(logging_text
+ stage
[1])
2417 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2418 self
.fs
.sync(db_nsr
["nsd-id"])
2420 # nsr_name = db_nsr["name"] # TODO short-name??
2422 # read from db: vnf's of this ns
2423 stage
[1] = "Getting vnfrs from db."
2424 self
.logger
.debug(logging_text
+ stage
[1])
2425 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2427 # read from db: vnfd's for every vnf
2428 db_vnfds
= [] # every vnfd data
2430 # for each vnf in ns, read vnfd
2431 for vnfr
in db_vnfrs_list
:
2432 if vnfr
.get("kdur"):
2434 for kdur
in vnfr
["kdur"]:
2435 if kdur
.get("additionalParams"):
2436 kdur
["additionalParams"] = json
.loads(
2437 kdur
["additionalParams"]
2439 kdur_list
.append(kdur
)
2440 vnfr
["kdur"] = kdur_list
2442 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2443 vnfd_id
= vnfr
["vnfd-id"]
2444 vnfd_ref
= vnfr
["vnfd-ref"]
2445 self
.fs
.sync(vnfd_id
)
2447 # if we haven't this vnfd, read it from db
2448 if vnfd_id
not in db_vnfds
:
2450 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2453 self
.logger
.debug(logging_text
+ stage
[1])
2454 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2457 db_vnfds
.append(vnfd
)
2459 # Get or generates the _admin.deployed.VCA list
2460 vca_deployed_list
= None
2461 if db_nsr
["_admin"].get("deployed"):
2462 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2463 if vca_deployed_list
is None:
2464 vca_deployed_list
= []
2465 configuration_status_list
= []
2466 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2467 db_nsr_update
["configurationStatus"] = configuration_status_list
2468 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2469 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2470 elif isinstance(vca_deployed_list
, dict):
2471 # maintain backward compatibility. Change a dict to list at database
2472 vca_deployed_list
= list(vca_deployed_list
.values())
2473 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2474 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2477 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2479 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2480 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2482 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2483 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2484 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2486 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2489 # n2vc_redesign STEP 2 Deploy Network Scenario
2490 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2491 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2493 stage
[1] = "Deploying KDUs."
2494 # self.logger.debug(logging_text + "Before deploy_kdus")
2495 # Call to deploy_kdus in case exists the "vdu:kdu" param
2496 await self
.deploy_kdus(
2497 logging_text
=logging_text
,
2499 nslcmop_id
=nslcmop_id
,
2502 task_instantiation_info
=tasks_dict_info
,
2505 stage
[1] = "Getting VCA public key."
2506 # n2vc_redesign STEP 1 Get VCA public ssh-key
2507 # feature 1429. Add n2vc public key to needed VMs
2508 n2vc_key
= self
.n2vc
.get_public_key()
2509 n2vc_key_list
= [n2vc_key
]
2510 if self
.vca_config
.public_key
:
2511 n2vc_key_list
.append(self
.vca_config
.public_key
)
2513 stage
[1] = "Deploying NS at VIM."
2514 task_ro
= asyncio
.ensure_future(
2515 self
.instantiate_RO(
2516 logging_text
=logging_text
,
2520 db_nslcmop
=db_nslcmop
,
2523 n2vc_key_list
=n2vc_key_list
,
2527 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2528 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2530 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2531 stage
[1] = "Deploying Execution Environments."
2532 self
.logger
.debug(logging_text
+ stage
[1])
2534 # create namespace and certificate if any helm based EE is present in the NS
2535 if check_helm_ee_in_ns(db_vnfds
):
2536 # TODO: create EE namespace
2537 # create TLS certificates
2538 await self
.vca_map
["helm-v3"].create_tls_certificate(
2539 secret_name
="ee-tls-{}".format(nsr_id
),
2542 usage
="server auth",
2545 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2546 for vnf_profile
in get_vnf_profiles(nsd
):
2547 vnfd_id
= vnf_profile
["vnfd-id"]
2548 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2549 member_vnf_index
= str(vnf_profile
["id"])
2550 db_vnfr
= db_vnfrs
[member_vnf_index
]
2551 base_folder
= vnfd
["_admin"]["storage"]
2558 # Get additional parameters
2559 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2560 if db_vnfr
.get("additionalParamsForVnf"):
2561 deploy_params
.update(
2562 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2565 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2566 if descriptor_config
:
2568 logging_text
=logging_text
2569 + "member_vnf_index={} ".format(member_vnf_index
),
2572 nslcmop_id
=nslcmop_id
,
2578 member_vnf_index
=member_vnf_index
,
2579 vdu_index
=vdu_index
,
2580 kdu_index
=kdu_index
,
2582 deploy_params
=deploy_params
,
2583 descriptor_config
=descriptor_config
,
2584 base_folder
=base_folder
,
2585 task_instantiation_info
=tasks_dict_info
,
2589 # Deploy charms for each VDU that supports one.
2590 for vdud
in get_vdu_list(vnfd
):
2592 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2593 vdur
= find_in_list(
2594 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2597 if vdur
.get("additionalParams"):
2598 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2600 deploy_params_vdu
= deploy_params
2601 deploy_params_vdu
["OSM"] = get_osm_params(
2602 db_vnfr
, vdu_id
, vdu_count_index
=0
2604 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2606 self
.logger
.debug("VDUD > {}".format(vdud
))
2608 "Descriptor config > {}".format(descriptor_config
)
2610 if descriptor_config
:
2614 for vdu_index
in range(vdud_count
):
2615 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2617 logging_text
=logging_text
2618 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2619 member_vnf_index
, vdu_id
, vdu_index
2623 nslcmop_id
=nslcmop_id
,
2629 kdu_index
=kdu_index
,
2630 member_vnf_index
=member_vnf_index
,
2631 vdu_index
=vdu_index
,
2633 deploy_params
=deploy_params_vdu
,
2634 descriptor_config
=descriptor_config
,
2635 base_folder
=base_folder
,
2636 task_instantiation_info
=tasks_dict_info
,
2639 for kdud
in get_kdu_list(vnfd
):
2640 kdu_name
= kdud
["name"]
2641 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2642 if descriptor_config
:
2646 kdu_index
, kdur
= next(
2648 for x
in enumerate(db_vnfr
["kdur"])
2649 if x
[1]["kdu-name"] == kdu_name
2651 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2652 if kdur
.get("additionalParams"):
2653 deploy_params_kdu
.update(
2654 parse_yaml_strings(kdur
["additionalParams"].copy())
2658 logging_text
=logging_text
,
2661 nslcmop_id
=nslcmop_id
,
2667 member_vnf_index
=member_vnf_index
,
2668 vdu_index
=vdu_index
,
2669 kdu_index
=kdu_index
,
2671 deploy_params
=deploy_params_kdu
,
2672 descriptor_config
=descriptor_config
,
2673 base_folder
=base_folder
,
2674 task_instantiation_info
=tasks_dict_info
,
2678 # Check if this NS has a charm configuration
2679 descriptor_config
= nsd
.get("ns-configuration")
2680 if descriptor_config
and descriptor_config
.get("juju"):
2683 member_vnf_index
= None
2690 # Get additional parameters
2691 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2692 if db_nsr
.get("additionalParamsForNs"):
2693 deploy_params
.update(
2694 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2696 base_folder
= nsd
["_admin"]["storage"]
2698 logging_text
=logging_text
,
2701 nslcmop_id
=nslcmop_id
,
2707 member_vnf_index
=member_vnf_index
,
2708 vdu_index
=vdu_index
,
2709 kdu_index
=kdu_index
,
2711 deploy_params
=deploy_params
,
2712 descriptor_config
=descriptor_config
,
2713 base_folder
=base_folder
,
2714 task_instantiation_info
=tasks_dict_info
,
2718 # rest of staff will be done at finally
2721 ROclient
.ROClientException
,
2727 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2730 except asyncio
.CancelledError
:
2732 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2734 exc
= "Operation was cancelled"
2735 except Exception as e
:
2736 exc
= traceback
.format_exc()
2737 self
.logger
.critical(
2738 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2743 error_list
.append(str(exc
))
2745 # wait for pending tasks
2747 stage
[1] = "Waiting for instantiate pending tasks."
2748 self
.logger
.debug(logging_text
+ stage
[1])
2749 error_list
+= await self
._wait
_for
_tasks
(
2757 stage
[1] = stage
[2] = ""
2758 except asyncio
.CancelledError
:
2759 error_list
.append("Cancelled")
2760 # TODO cancel all tasks
2761 except Exception as exc
:
2762 error_list
.append(str(exc
))
2764 # update operation-status
2765 db_nsr_update
["operational-status"] = "running"
2766 # let's begin with VCA 'configured' status (later we can change it)
2767 db_nsr_update
["config-status"] = "configured"
2768 for task
, task_name
in tasks_dict_info
.items():
2769 if not task
.done() or task
.cancelled() or task
.exception():
2770 if task_name
.startswith(self
.task_name_deploy_vca
):
2771 # A N2VC task is pending
2772 db_nsr_update
["config-status"] = "failed"
2774 # RO or KDU task is pending
2775 db_nsr_update
["operational-status"] = "failed"
2777 # update status at database
2779 error_detail
= ". ".join(error_list
)
2780 self
.logger
.error(logging_text
+ error_detail
)
2781 error_description_nslcmop
= "{} Detail: {}".format(
2782 stage
[0], error_detail
2784 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2785 nslcmop_id
, stage
[0]
2788 db_nsr_update
["detailed-status"] = (
2789 error_description_nsr
+ " Detail: " + error_detail
2791 db_nslcmop_update
["detailed-status"] = error_detail
2792 nslcmop_operation_state
= "FAILED"
2796 error_description_nsr
= error_description_nslcmop
= None
2798 db_nsr_update
["detailed-status"] = "Done"
2799 db_nslcmop_update
["detailed-status"] = "Done"
2800 nslcmop_operation_state
= "COMPLETED"
2803 self
._write
_ns
_status
(
2806 current_operation
="IDLE",
2807 current_operation_id
=None,
2808 error_description
=error_description_nsr
,
2809 error_detail
=error_detail
,
2810 other_update
=db_nsr_update
,
2812 self
._write
_op
_status
(
2815 error_message
=error_description_nslcmop
,
2816 operation_state
=nslcmop_operation_state
,
2817 other_update
=db_nslcmop_update
,
2820 if nslcmop_operation_state
:
2822 await self
.msg
.aiowrite(
2827 "nslcmop_id": nslcmop_id
,
2828 "operationState": nslcmop_operation_state
,
2832 except Exception as e
:
2834 logging_text
+ "kafka_write notification Exception {}".format(e
)
2837 self
.logger
.debug(logging_text
+ "Exit")
2838 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2840 def _get_vnfd(self
, vnfd_id
: str, projects_read
: str, cached_vnfds
: Dict
[str, Any
]):
2841 if vnfd_id
not in cached_vnfds
:
2842 cached_vnfds
[vnfd_id
] = self
.db
.get_one(
2843 "vnfds", {"id": vnfd_id
, "_admin.projects_read": projects_read
}
2845 return cached_vnfds
[vnfd_id
]
2847 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2848 if vnf_profile_id
not in cached_vnfrs
:
2849 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2852 "member-vnf-index-ref": vnf_profile_id
,
2853 "nsr-id-ref": nsr_id
,
2856 return cached_vnfrs
[vnf_profile_id
]
2858 def _is_deployed_vca_in_relation(
2859 self
, vca
: DeployedVCA
, relation
: Relation
2862 for endpoint
in (relation
.provider
, relation
.requirer
):
2863 if endpoint
["kdu-resource-profile-id"]:
2866 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2867 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2868 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2874 def _update_ee_relation_data_with_implicit_data(
2875 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2877 ee_relation_data
= safe_get_ee_relation(
2878 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2880 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2881 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2882 "execution-environment-ref"
2884 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2885 vnfd_id
= vnf_profile
["vnfd-id"]
2886 project
= nsd
["_admin"]["projects_read"][0]
2887 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
2890 if ee_relation_level
== EELevel
.VNF
2891 else ee_relation_data
["vdu-profile-id"]
2893 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2896 f
"not execution environments found for ee_relation {ee_relation_data}"
2898 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2899 return ee_relation_data
2901 def _get_ns_relations(
2904 nsd
: Dict
[str, Any
],
2906 cached_vnfds
: Dict
[str, Any
],
2907 ) -> List
[Relation
]:
2909 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2910 for r
in db_ns_relations
:
2911 provider_dict
= None
2912 requirer_dict
= None
2913 if all(key
in r
for key
in ("provider", "requirer")):
2914 provider_dict
= r
["provider"]
2915 requirer_dict
= r
["requirer"]
2916 elif "entities" in r
:
2917 provider_id
= r
["entities"][0]["id"]
2920 "endpoint": r
["entities"][0]["endpoint"],
2922 if provider_id
!= nsd
["id"]:
2923 provider_dict
["vnf-profile-id"] = provider_id
2924 requirer_id
= r
["entities"][1]["id"]
2927 "endpoint": r
["entities"][1]["endpoint"],
2929 if requirer_id
!= nsd
["id"]:
2930 requirer_dict
["vnf-profile-id"] = requirer_id
2933 "provider/requirer or entities must be included in the relation."
2935 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2936 nsr_id
, nsd
, provider_dict
, cached_vnfds
2938 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2939 nsr_id
, nsd
, requirer_dict
, cached_vnfds
2941 provider
= EERelation(relation_provider
)
2942 requirer
= EERelation(relation_requirer
)
2943 relation
= Relation(r
["name"], provider
, requirer
)
2944 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2946 relations
.append(relation
)
2949 def _get_vnf_relations(
2952 nsd
: Dict
[str, Any
],
2954 cached_vnfds
: Dict
[str, Any
],
2955 ) -> List
[Relation
]:
2957 if vca
.target_element
== "ns":
2958 self
.logger
.debug("VCA is a NS charm, not a VNF.")
2960 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
2961 vnf_profile_id
= vnf_profile
["id"]
2962 vnfd_id
= vnf_profile
["vnfd-id"]
2963 project
= nsd
["_admin"]["projects_read"][0]
2964 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
2965 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
2966 for r
in db_vnf_relations
:
2967 provider_dict
= None
2968 requirer_dict
= None
2969 if all(key
in r
for key
in ("provider", "requirer")):
2970 provider_dict
= r
["provider"]
2971 requirer_dict
= r
["requirer"]
2972 elif "entities" in r
:
2973 provider_id
= r
["entities"][0]["id"]
2976 "vnf-profile-id": vnf_profile_id
,
2977 "endpoint": r
["entities"][0]["endpoint"],
2979 if provider_id
!= vnfd_id
:
2980 provider_dict
["vdu-profile-id"] = provider_id
2981 requirer_id
= r
["entities"][1]["id"]
2984 "vnf-profile-id": vnf_profile_id
,
2985 "endpoint": r
["entities"][1]["endpoint"],
2987 if requirer_id
!= vnfd_id
:
2988 requirer_dict
["vdu-profile-id"] = requirer_id
2991 "provider/requirer or entities must be included in the relation."
2993 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2994 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
2996 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2997 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
2999 provider
= EERelation(relation_provider
)
3000 requirer
= EERelation(relation_requirer
)
3001 relation
= Relation(r
["name"], provider
, requirer
)
3002 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3004 relations
.append(relation
)
3007 def _get_kdu_resource_data(
3009 ee_relation
: EERelation
,
3010 db_nsr
: Dict
[str, Any
],
3011 cached_vnfds
: Dict
[str, Any
],
3012 ) -> DeployedK8sResource
:
3013 nsd
= get_nsd(db_nsr
)
3014 vnf_profiles
= get_vnf_profiles(nsd
)
3015 vnfd_id
= find_in_list(
3017 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
3019 project
= nsd
["_admin"]["projects_read"][0]
3020 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
3021 kdu_resource_profile
= get_kdu_resource_profile(
3022 db_vnfd
, ee_relation
.kdu_resource_profile_id
3024 kdu_name
= kdu_resource_profile
["kdu-name"]
3025 deployed_kdu
, _
= get_deployed_kdu(
3026 db_nsr
.get("_admin", ()).get("deployed", ()),
3028 ee_relation
.vnf_profile_id
,
3030 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
3033 def _get_deployed_component(
3035 ee_relation
: EERelation
,
3036 db_nsr
: Dict
[str, Any
],
3037 cached_vnfds
: Dict
[str, Any
],
3038 ) -> DeployedComponent
:
3039 nsr_id
= db_nsr
["_id"]
3040 deployed_component
= None
3041 ee_level
= EELevel
.get_level(ee_relation
)
3042 if ee_level
== EELevel
.NS
:
3043 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
3045 deployed_component
= DeployedVCA(nsr_id
, vca
)
3046 elif ee_level
== EELevel
.VNF
:
3047 vca
= get_deployed_vca(
3051 "member-vnf-index": ee_relation
.vnf_profile_id
,
3052 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3056 deployed_component
= DeployedVCA(nsr_id
, vca
)
3057 elif ee_level
== EELevel
.VDU
:
3058 vca
= get_deployed_vca(
3061 "vdu_id": ee_relation
.vdu_profile_id
,
3062 "member-vnf-index": ee_relation
.vnf_profile_id
,
3063 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3067 deployed_component
= DeployedVCA(nsr_id
, vca
)
3068 elif ee_level
== EELevel
.KDU
:
3069 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
3070 ee_relation
, db_nsr
, cached_vnfds
3072 if kdu_resource_data
:
3073 deployed_component
= DeployedK8sResource(kdu_resource_data
)
3074 return deployed_component
3076 async def _add_relation(
3080 db_nsr
: Dict
[str, Any
],
3081 cached_vnfds
: Dict
[str, Any
],
3082 cached_vnfrs
: Dict
[str, Any
],
3084 deployed_provider
= self
._get
_deployed
_component
(
3085 relation
.provider
, db_nsr
, cached_vnfds
3087 deployed_requirer
= self
._get
_deployed
_component
(
3088 relation
.requirer
, db_nsr
, cached_vnfds
3092 and deployed_requirer
3093 and deployed_provider
.config_sw_installed
3094 and deployed_requirer
.config_sw_installed
3096 provider_db_vnfr
= (
3098 relation
.provider
.nsr_id
,
3099 relation
.provider
.vnf_profile_id
,
3102 if relation
.provider
.vnf_profile_id
3105 requirer_db_vnfr
= (
3107 relation
.requirer
.nsr_id
,
3108 relation
.requirer
.vnf_profile_id
,
3111 if relation
.requirer
.vnf_profile_id
3114 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3115 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3116 provider_relation_endpoint
= RelationEndpoint(
3117 deployed_provider
.ee_id
,
3119 relation
.provider
.endpoint
,
3121 requirer_relation_endpoint
= RelationEndpoint(
3122 deployed_requirer
.ee_id
,
3124 relation
.requirer
.endpoint
,
3127 await self
.vca_map
[vca_type
].add_relation(
3128 provider
=provider_relation_endpoint
,
3129 requirer
=requirer_relation_endpoint
,
3131 except N2VCException
as exception
:
3132 self
.logger
.error(exception
)
3133 raise LcmException(exception
)
3137 async def _add_vca_relations(
3143 timeout
: int = 3600,
3146 # 1. find all relations for this VCA
3147 # 2. wait for other peers related
3151 # STEP 1: find all relations for this VCA
3154 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3155 nsd
= get_nsd(db_nsr
)
3158 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3159 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3164 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3165 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3167 # if no relations, terminate
3169 self
.logger
.debug(logging_text
+ " No relations")
3172 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3179 if now
- start
>= timeout
:
3180 self
.logger
.error(logging_text
+ " : timeout adding relations")
3183 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3184 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3186 # for each relation, find the VCA's related
3187 for relation
in relations
.copy():
3188 added
= await self
._add
_relation
(
3196 relations
.remove(relation
)
3199 self
.logger
.debug("Relations added")
3201 await asyncio
.sleep(5.0)
3205 except Exception as e
:
3206 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3209 async def _install_kdu(
3217 k8s_instance_info
: dict,
3218 k8params
: dict = None,
3223 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3226 "collection": "nsrs",
3227 "filter": {"_id": nsr_id
},
3228 "path": nsr_db_path
,
3231 if k8s_instance_info
.get("kdu-deployment-name"):
3232 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3234 kdu_instance
= self
.k8scluster_map
[
3236 ].generate_kdu_instance_name(
3237 db_dict
=db_dict_install
,
3238 kdu_model
=k8s_instance_info
["kdu-model"],
3239 kdu_name
=k8s_instance_info
["kdu-name"],
3242 # Update the nsrs table with the kdu-instance value
3246 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3249 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3250 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3251 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3252 # namespace, this first verification could be removed, and the next step would be done for any kind
3254 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3255 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3256 if k8sclustertype
in ("juju", "juju-bundle"):
3257 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3258 # that the user passed a namespace which he wants its KDU to be deployed in)
3264 "_admin.projects_write": k8s_instance_info
["namespace"],
3265 "_admin.projects_read": k8s_instance_info
["namespace"],
3271 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3276 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3278 k8s_instance_info
["namespace"] = kdu_instance
3280 await self
.k8scluster_map
[k8sclustertype
].install(
3281 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3282 kdu_model
=k8s_instance_info
["kdu-model"],
3285 db_dict
=db_dict_install
,
3287 kdu_name
=k8s_instance_info
["kdu-name"],
3288 namespace
=k8s_instance_info
["namespace"],
3289 kdu_instance
=kdu_instance
,
3293 # Obtain services to obtain management service ip
3294 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3295 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3296 kdu_instance
=kdu_instance
,
3297 namespace
=k8s_instance_info
["namespace"],
3300 # Obtain management service info (if exists)
3301 vnfr_update_dict
= {}
3302 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3304 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3309 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3312 for service
in kdud
.get("service", [])
3313 if service
.get("mgmt-service")
3315 for mgmt_service
in mgmt_services
:
3316 for service
in services
:
3317 if service
["name"].startswith(mgmt_service
["name"]):
3318 # Mgmt service found, Obtain service ip
3319 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3320 if isinstance(ip
, list) and len(ip
) == 1:
3324 "kdur.{}.ip-address".format(kdu_index
)
3327 # Check if must update also mgmt ip at the vnf
3328 service_external_cp
= mgmt_service
.get(
3329 "external-connection-point-ref"
3331 if service_external_cp
:
3333 deep_get(vnfd
, ("mgmt-interface", "cp"))
3334 == service_external_cp
3336 vnfr_update_dict
["ip-address"] = ip
3341 "external-connection-point-ref", ""
3343 == service_external_cp
,
3346 "kdur.{}.ip-address".format(kdu_index
)
3351 "Mgmt service name: {} not found".format(
3352 mgmt_service
["name"]
3356 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3357 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3359 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3362 and kdu_config
.get("initial-config-primitive")
3363 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3365 initial_config_primitive_list
= kdu_config
.get(
3366 "initial-config-primitive"
3368 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3370 for initial_config_primitive
in initial_config_primitive_list
:
3371 primitive_params_
= self
._map
_primitive
_params
(
3372 initial_config_primitive
, {}, {}
3375 await asyncio
.wait_for(
3376 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3377 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3378 kdu_instance
=kdu_instance
,
3379 primitive_name
=initial_config_primitive
["name"],
3380 params
=primitive_params_
,
3381 db_dict
=db_dict_install
,
3387 except Exception as e
:
3388 # Prepare update db with error and raise exception
3391 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3395 vnfr_data
.get("_id"),
3396 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3399 # ignore to keep original exception
3401 # reraise original error
3406 async def deploy_kdus(
3413 task_instantiation_info
,
3415 # Launch kdus if present in the descriptor
3417 k8scluster_id_2_uuic
= {
3418 "helm-chart-v3": {},
3423 async def _get_cluster_id(cluster_id
, cluster_type
):
3424 nonlocal k8scluster_id_2_uuic
3425 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3426 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3428 # check if K8scluster is creating and wait look if previous tasks in process
3429 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3430 "k8scluster", cluster_id
3433 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3434 task_name
, cluster_id
3436 self
.logger
.debug(logging_text
+ text
)
3437 await asyncio
.wait(task_dependency
, timeout
=3600)
3439 db_k8scluster
= self
.db
.get_one(
3440 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3442 if not db_k8scluster
:
3443 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3445 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3447 if cluster_type
== "helm-chart-v3":
3449 # backward compatibility for existing clusters that have not been initialized for helm v3
3450 k8s_credentials
= yaml
.safe_dump(
3451 db_k8scluster
.get("credentials")
3453 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3454 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3456 db_k8scluster_update
= {}
3457 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3458 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3459 db_k8scluster_update
[
3460 "_admin.helm-chart-v3.created"
3462 db_k8scluster_update
[
3463 "_admin.helm-chart-v3.operationalState"
3466 "k8sclusters", cluster_id
, db_k8scluster_update
3468 except Exception as e
:
3471 + "error initializing helm-v3 cluster: {}".format(str(e
))
3474 "K8s cluster '{}' has not been initialized for '{}'".format(
3475 cluster_id
, cluster_type
3480 "K8s cluster '{}' has not been initialized for '{}'".format(
3481 cluster_id
, cluster_type
3484 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3487 logging_text
+= "Deploy kdus: "
3490 db_nsr_update
= {"_admin.deployed.K8s": []}
3491 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3494 updated_cluster_list
= []
3495 updated_v3_cluster_list
= []
3497 for vnfr_data
in db_vnfrs
.values():
3498 vca_id
= self
.get_vca_id(vnfr_data
, {})
3499 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3500 # Step 0: Prepare and set parameters
3501 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3502 vnfd_id
= vnfr_data
.get("vnfd-id")
3503 vnfd_with_id
= find_in_list(
3504 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3508 for kdud
in vnfd_with_id
["kdu"]
3509 if kdud
["name"] == kdur
["kdu-name"]
3511 namespace
= kdur
.get("k8s-namespace")
3512 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3513 if kdur
.get("helm-chart"):
3514 kdumodel
= kdur
["helm-chart"]
3515 # Default version: helm3, if helm-version is v2 assign v2
3516 k8sclustertype
= "helm-chart-v3"
3517 self
.logger
.debug("kdur: {}".format(kdur
))
3519 kdur
.get("helm-version")
3520 and kdur
.get("helm-version") == "v2"
3522 k8sclustertype
= "helm-chart"
3523 elif kdur
.get("juju-bundle"):
3524 kdumodel
= kdur
["juju-bundle"]
3525 k8sclustertype
= "juju-bundle"
3528 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3529 "juju-bundle. Maybe an old NBI version is running".format(
3530 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3533 # check if kdumodel is a file and exists
3535 vnfd_with_id
= find_in_list(
3536 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3538 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3539 if storage
: # may be not present if vnfd has not artifacts
3540 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3541 if storage
["pkg-dir"]:
3542 filename
= "{}/{}/{}s/{}".format(
3549 filename
= "{}/Scripts/{}s/{}".format(
3554 if self
.fs
.file_exists(
3555 filename
, mode
="file"
3556 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3557 kdumodel
= self
.fs
.path
+ filename
3558 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3560 except Exception: # it is not a file
3563 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3564 step
= "Synchronize repos for k8s cluster '{}'".format(
3567 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3571 k8sclustertype
== "helm-chart"
3572 and cluster_uuid
not in updated_cluster_list
3574 k8sclustertype
== "helm-chart-v3"
3575 and cluster_uuid
not in updated_v3_cluster_list
3577 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3578 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3579 cluster_uuid
=cluster_uuid
3582 if del_repo_list
or added_repo_dict
:
3583 if k8sclustertype
== "helm-chart":
3585 "_admin.helm_charts_added." + item
: None
3586 for item
in del_repo_list
3589 "_admin.helm_charts_added." + item
: name
3590 for item
, name
in added_repo_dict
.items()
3592 updated_cluster_list
.append(cluster_uuid
)
3593 elif k8sclustertype
== "helm-chart-v3":
3595 "_admin.helm_charts_v3_added." + item
: None
3596 for item
in del_repo_list
3599 "_admin.helm_charts_v3_added." + item
: name
3600 for item
, name
in added_repo_dict
.items()
3602 updated_v3_cluster_list
.append(cluster_uuid
)
3604 logging_text
+ "repos synchronized on k8s cluster "
3605 "'{}' to_delete: {}, to_add: {}".format(
3606 k8s_cluster_id
, del_repo_list
, added_repo_dict
3611 {"_id": k8s_cluster_id
},
3617 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3618 vnfr_data
["member-vnf-index-ref"],
3622 k8s_instance_info
= {
3623 "kdu-instance": None,
3624 "k8scluster-uuid": cluster_uuid
,
3625 "k8scluster-type": k8sclustertype
,
3626 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3627 "kdu-name": kdur
["kdu-name"],
3628 "kdu-model": kdumodel
,
3629 "namespace": namespace
,
3630 "kdu-deployment-name": kdu_deployment_name
,
3632 db_path
= "_admin.deployed.K8s.{}".format(index
)
3633 db_nsr_update
[db_path
] = k8s_instance_info
3634 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3635 vnfd_with_id
= find_in_list(
3636 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3638 task
= asyncio
.ensure_future(
3647 k8params
=desc_params
,
3652 self
.lcm_tasks
.register(
3656 "instantiate_KDU-{}".format(index
),
3659 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3665 except (LcmException
, asyncio
.CancelledError
):
3667 except Exception as e
:
3668 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3669 if isinstance(e
, (N2VCException
, DbException
)):
3670 self
.logger
.error(logging_text
+ msg
)
3672 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3673 raise LcmException(msg
)
3676 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3696 task_instantiation_info
,
3699 # launch instantiate_N2VC in a asyncio task and register task object
3700 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3701 # if not found, create one entry and update database
3702 # fill db_nsr._admin.deployed.VCA.<index>
3705 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3709 get_charm_name
= False
3710 if "execution-environment-list" in descriptor_config
:
3711 ee_list
= descriptor_config
.get("execution-environment-list", [])
3712 elif "juju" in descriptor_config
:
3713 ee_list
= [descriptor_config
] # ns charms
3714 if "execution-environment-list" not in descriptor_config
:
3715 # charm name is only required for ns charms
3716 get_charm_name
= True
3717 else: # other types as script are not supported
3720 for ee_item
in ee_list
:
3723 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3724 ee_item
.get("juju"), ee_item
.get("helm-chart")
3727 ee_descriptor_id
= ee_item
.get("id")
3728 if ee_item
.get("juju"):
3729 vca_name
= ee_item
["juju"].get("charm")
3731 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
3734 if ee_item
["juju"].get("charm") is not None
3737 if ee_item
["juju"].get("cloud") == "k8s":
3738 vca_type
= "k8s_proxy_charm"
3739 elif ee_item
["juju"].get("proxy") is False:
3740 vca_type
= "native_charm"
3741 elif ee_item
.get("helm-chart"):
3742 vca_name
= ee_item
["helm-chart"]
3743 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3746 vca_type
= "helm-v3"
3749 logging_text
+ "skipping non juju neither charm configuration"
3754 for vca_index
, vca_deployed
in enumerate(
3755 db_nsr
["_admin"]["deployed"]["VCA"]
3757 if not vca_deployed
:
3760 vca_deployed
.get("member-vnf-index") == member_vnf_index
3761 and vca_deployed
.get("vdu_id") == vdu_id
3762 and vca_deployed
.get("kdu_name") == kdu_name
3763 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3764 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3768 # not found, create one.
3770 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3773 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3775 target
+= "/kdu/{}".format(kdu_name
)
3777 "target_element": target
,
3778 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3779 "member-vnf-index": member_vnf_index
,
3781 "kdu_name": kdu_name
,
3782 "vdu_count_index": vdu_index
,
3783 "operational-status": "init", # TODO revise
3784 "detailed-status": "", # TODO revise
3785 "step": "initial-deploy", # TODO revise
3787 "vdu_name": vdu_name
,
3789 "ee_descriptor_id": ee_descriptor_id
,
3790 "charm_name": charm_name
,
3794 # create VCA and configurationStatus in db
3796 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3797 "configurationStatus.{}".format(vca_index
): dict(),
3799 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3801 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3803 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3804 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3805 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3808 task_n2vc
= asyncio
.ensure_future(
3809 self
.instantiate_N2VC(
3810 logging_text
=logging_text
,
3811 vca_index
=vca_index
,
3817 vdu_index
=vdu_index
,
3818 kdu_index
=kdu_index
,
3819 deploy_params
=deploy_params
,
3820 config_descriptor
=descriptor_config
,
3821 base_folder
=base_folder
,
3822 nslcmop_id
=nslcmop_id
,
3826 ee_config_descriptor
=ee_item
,
3829 self
.lcm_tasks
.register(
3833 "instantiate_N2VC-{}".format(vca_index
),
3836 task_instantiation_info
[
3838 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3839 member_vnf_index
or "", vdu_id
or ""
3843 def _create_nslcmop(nsr_id
, operation
, params
):
3845 Creates a ns-lcm-opp content to be stored at database.
3846 :param nsr_id: internal id of the instance
3847 :param operation: instantiate, terminate, scale, action, ...
3848 :param params: user parameters for the operation
3849 :return: dictionary following SOL005 format
3851 # Raise exception if invalid arguments
3852 if not (nsr_id
and operation
and params
):
3854 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3861 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3862 "operationState": "PROCESSING",
3863 "statusEnteredTime": now
,
3864 "nsInstanceId": nsr_id
,
3865 "lcmOperationType": operation
,
3867 "isAutomaticInvocation": False,
3868 "operationParams": params
,
3869 "isCancelPending": False,
3871 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3872 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3877 def _format_additional_params(self
, params
):
3878 params
= params
or {}
3879 for key
, value
in params
.items():
3880 if str(value
).startswith("!!yaml "):
3881 params
[key
] = yaml
.safe_load(value
[7:])
3884 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3885 primitive
= seq
.get("name")
3886 primitive_params
= {}
3888 "member_vnf_index": vnf_index
,
3889 "primitive": primitive
,
3890 "primitive_params": primitive_params
,
3893 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
3897 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
3898 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
3899 if op
.get("operationState") == "COMPLETED":
3900 # b. Skip sub-operation
3901 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3902 return self
.SUBOPERATION_STATUS_SKIP
3904 # c. retry executing sub-operation
3905 # The sub-operation exists, and operationState != 'COMPLETED'
3906 # Update operationState = 'PROCESSING' to indicate a retry.
3907 operationState
= "PROCESSING"
3908 detailed_status
= "In progress"
3909 self
._update
_suboperation
_status
(
3910 db_nslcmop
, op_index
, operationState
, detailed_status
3912 # Return the sub-operation index
3913 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3914 # with arguments extracted from the sub-operation
3917 # Find a sub-operation where all keys in a matching dictionary must match
3918 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3919 def _find_suboperation(self
, db_nslcmop
, match
):
3920 if db_nslcmop
and match
:
3921 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3922 for i
, op
in enumerate(op_list
):
3923 if all(op
.get(k
) == match
[k
] for k
in match
):
3925 return self
.SUBOPERATION_STATUS_NOT_FOUND
3927 # Update status for a sub-operation given its index
3928 def _update_suboperation_status(
3929 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3931 # Update DB for HA tasks
3932 q_filter
= {"_id": db_nslcmop
["_id"]}
3934 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3935 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3938 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3941 # Add sub-operation, return the index of the added sub-operation
3942 # Optionally, set operationState, detailed-status, and operationType
3943 # Status and type are currently set for 'scale' sub-operations:
3944 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3945 # 'detailed-status' : status message
3946 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3947 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3948 def _add_suboperation(
3956 mapped_primitive_params
,
3957 operationState
=None,
3958 detailed_status
=None,
3961 RO_scaling_info
=None,
3964 return self
.SUBOPERATION_STATUS_NOT_FOUND
3965 # Get the "_admin.operations" list, if it exists
3966 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3967 op_list
= db_nslcmop_admin
.get("operations")
3968 # Create or append to the "_admin.operations" list
3970 "member_vnf_index": vnf_index
,
3972 "vdu_count_index": vdu_count_index
,
3973 "primitive": primitive
,
3974 "primitive_params": mapped_primitive_params
,
3977 new_op
["operationState"] = operationState
3979 new_op
["detailed-status"] = detailed_status
3981 new_op
["lcmOperationType"] = operationType
3983 new_op
["RO_nsr_id"] = RO_nsr_id
3985 new_op
["RO_scaling_info"] = RO_scaling_info
3987 # No existing operations, create key 'operations' with current operation as first list element
3988 db_nslcmop_admin
.update({"operations": [new_op
]})
3989 op_list
= db_nslcmop_admin
.get("operations")
3991 # Existing operations, append operation to list
3992 op_list
.append(new_op
)
3994 db_nslcmop_update
= {"_admin.operations": op_list
}
3995 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
3996 op_index
= len(op_list
) - 1
3999 # Helper methods for scale() sub-operations
4001 # pre-scale/post-scale:
4002 # Check for 3 different cases:
4003 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4004 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4005 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4006 def _check_or_add_scale_suboperation(
4010 vnf_config_primitive
,
4014 RO_scaling_info
=None,
4016 # Find this sub-operation
4017 if RO_nsr_id
and RO_scaling_info
:
4018 operationType
= "SCALE-RO"
4020 "member_vnf_index": vnf_index
,
4021 "RO_nsr_id": RO_nsr_id
,
4022 "RO_scaling_info": RO_scaling_info
,
4026 "member_vnf_index": vnf_index
,
4027 "primitive": vnf_config_primitive
,
4028 "primitive_params": primitive_params
,
4029 "lcmOperationType": operationType
,
4031 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4032 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4033 # a. New sub-operation
4034 # The sub-operation does not exist, add it.
4035 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4036 # The following parameters are set to None for all kind of scaling:
4038 vdu_count_index
= None
4040 if RO_nsr_id
and RO_scaling_info
:
4041 vnf_config_primitive
= None
4042 primitive_params
= None
4045 RO_scaling_info
= None
4046 # Initial status for sub-operation
4047 operationState
= "PROCESSING"
4048 detailed_status
= "In progress"
4049 # Add sub-operation for pre/post-scaling (zero or more operations)
4050 self
._add
_suboperation
(
4056 vnf_config_primitive
,
4064 return self
.SUBOPERATION_STATUS_NEW
4066 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4067 # or op_index (operationState != 'COMPLETED')
4068 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4070 # Function to return execution_environment id
4072 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4073 # TODO vdu_index_count
4074 for vca
in vca_deployed_list
:
4075 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
4078 async def destroy_N2VC(
4086 exec_primitives
=True,
4091 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4092 :param logging_text:
4094 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4095 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4096 :param vca_index: index in the database _admin.deployed.VCA
4097 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4098 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4099 not executed properly
4100 :param scaling_in: True destroys the application, False destroys the model
4101 :return: None or exception
4106 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4107 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4111 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4113 # execute terminate_primitives
4115 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4116 config_descriptor
.get("terminate-config-primitive"),
4117 vca_deployed
.get("ee_descriptor_id"),
4119 vdu_id
= vca_deployed
.get("vdu_id")
4120 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4121 vdu_name
= vca_deployed
.get("vdu_name")
4122 vnf_index
= vca_deployed
.get("member-vnf-index")
4123 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4124 for seq
in terminate_primitives
:
4125 # For each sequence in list, get primitive and call _ns_execute_primitive()
4126 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4127 vnf_index
, seq
.get("name")
4129 self
.logger
.debug(logging_text
+ step
)
4130 # Create the primitive for each sequence, i.e. "primitive": "touch"
4131 primitive
= seq
.get("name")
4132 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4137 self
._add
_suboperation
(
4144 mapped_primitive_params
,
4146 # Sub-operations: Call _ns_execute_primitive() instead of action()
4148 result
, result_detail
= await self
._ns
_execute
_primitive
(
4149 vca_deployed
["ee_id"],
4151 mapped_primitive_params
,
4155 except LcmException
:
4156 # this happens when VCA is not deployed. In this case it is not needed to terminate
4158 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4159 if result
not in result_ok
:
4161 "terminate_primitive {} for vnf_member_index={} fails with "
4162 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4164 # set that this VCA do not need terminated
4165 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4169 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4172 # Delete Prometheus Jobs if any
4173 # This uses NSR_ID, so it will destroy any jobs under this index
4174 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4177 await self
.vca_map
[vca_type
].delete_execution_environment(
4178 vca_deployed
["ee_id"],
4179 scaling_in
=scaling_in
,
4184 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4185 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4186 namespace
= "." + db_nsr
["_id"]
4188 await self
.n2vc
.delete_namespace(
4189 namespace
=namespace
,
4190 total_timeout
=self
.timeout
.charm_delete
,
4193 except N2VCNotFound
: # already deleted. Skip
4195 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4197 async def terminate(self
, nsr_id
, nslcmop_id
):
4198 # Try to lock HA task here
4199 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4200 if not task_is_locked_by_me
:
4203 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4204 self
.logger
.debug(logging_text
+ "Enter")
4205 timeout_ns_terminate
= self
.timeout
.ns_terminate
4208 operation_params
= None
4210 error_list
= [] # annotates all failed error messages
4211 db_nslcmop_update
= {}
4212 autoremove
= False # autoremove after terminated
4213 tasks_dict_info
= {}
4216 "Stage 1/3: Preparing task.",
4217 "Waiting for previous operations to terminate.",
4220 # ^ contains [stage, step, VIM-status]
4222 # wait for any previous tasks in process
4223 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4225 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4226 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4227 operation_params
= db_nslcmop
.get("operationParams") or {}
4228 if operation_params
.get("timeout_ns_terminate"):
4229 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4230 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4231 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4233 db_nsr_update
["operational-status"] = "terminating"
4234 db_nsr_update
["config-status"] = "terminating"
4235 self
._write
_ns
_status
(
4237 ns_state
="TERMINATING",
4238 current_operation
="TERMINATING",
4239 current_operation_id
=nslcmop_id
,
4240 other_update
=db_nsr_update
,
4242 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4243 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4244 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4247 stage
[1] = "Getting vnf descriptors from db."
4248 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4250 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4252 db_vnfds_from_id
= {}
4253 db_vnfds_from_member_index
= {}
4255 for vnfr
in db_vnfrs_list
:
4256 vnfd_id
= vnfr
["vnfd-id"]
4257 if vnfd_id
not in db_vnfds_from_id
:
4258 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4259 db_vnfds_from_id
[vnfd_id
] = vnfd
4260 db_vnfds_from_member_index
[
4261 vnfr
["member-vnf-index-ref"]
4262 ] = db_vnfds_from_id
[vnfd_id
]
4264 # Destroy individual execution environments when there are terminating primitives.
4265 # Rest of EE will be deleted at once
4266 # TODO - check before calling _destroy_N2VC
4267 # if not operation_params.get("skip_terminate_primitives"):#
4268 # or not vca.get("needed_terminate"):
4269 stage
[0] = "Stage 2/3 execute terminating primitives."
4270 self
.logger
.debug(logging_text
+ stage
[0])
4271 stage
[1] = "Looking execution environment that needs terminate."
4272 self
.logger
.debug(logging_text
+ stage
[1])
4274 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4275 config_descriptor
= None
4276 vca_member_vnf_index
= vca
.get("member-vnf-index")
4277 vca_id
= self
.get_vca_id(
4278 db_vnfrs_dict
.get(vca_member_vnf_index
)
4279 if vca_member_vnf_index
4283 if not vca
or not vca
.get("ee_id"):
4285 if not vca
.get("member-vnf-index"):
4287 config_descriptor
= db_nsr
.get("ns-configuration")
4288 elif vca
.get("vdu_id"):
4289 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4290 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4291 elif vca
.get("kdu_name"):
4292 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4293 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4295 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4296 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4297 vca_type
= vca
.get("type")
4298 exec_terminate_primitives
= not operation_params
.get(
4299 "skip_terminate_primitives"
4300 ) and vca
.get("needed_terminate")
4301 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4302 # pending native charms
4304 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4306 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4307 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4308 task
= asyncio
.ensure_future(
4316 exec_terminate_primitives
,
4320 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4322 # wait for pending tasks of terminate primitives
4326 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4328 error_list
= await self
._wait
_for
_tasks
(
4331 min(self
.timeout
.charm_delete
, timeout_ns_terminate
),
4335 tasks_dict_info
.clear()
4337 return # raise LcmException("; ".join(error_list))
4339 # remove All execution environments at once
4340 stage
[0] = "Stage 3/3 delete all."
4342 if nsr_deployed
.get("VCA"):
4343 stage
[1] = "Deleting all execution environments."
4344 self
.logger
.debug(logging_text
+ stage
[1])
4345 vca_id
= self
.get_vca_id({}, db_nsr
)
4346 task_delete_ee
= asyncio
.ensure_future(
4348 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4349 timeout
=self
.timeout
.charm_delete
,
4352 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4353 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4355 # Delete Namespace and Certificates if necessary
4356 if check_helm_ee_in_ns(list(db_vnfds_from_member_index
.values())):
4357 await self
.vca_map
["helm-v3"].delete_tls_certificate(
4358 certificate_name
=db_nslcmop
["nsInstanceId"],
4360 # TODO: Delete namespace
4362 # Delete from k8scluster
4363 stage
[1] = "Deleting KDUs."
4364 self
.logger
.debug(logging_text
+ stage
[1])
4365 # print(nsr_deployed)
4366 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4367 if not kdu
or not kdu
.get("kdu-instance"):
4369 kdu_instance
= kdu
.get("kdu-instance")
4370 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4371 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4372 vca_id
= self
.get_vca_id({}, db_nsr
)
4373 task_delete_kdu_instance
= asyncio
.ensure_future(
4374 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4375 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4376 kdu_instance
=kdu_instance
,
4378 namespace
=kdu
.get("namespace"),
4384 + "Unknown k8s deployment type {}".format(
4385 kdu
.get("k8scluster-type")
4390 task_delete_kdu_instance
4391 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4394 stage
[1] = "Deleting ns from VIM."
4395 if self
.ro_config
.ng
:
4396 task_delete_ro
= asyncio
.ensure_future(
4397 self
._terminate
_ng
_ro
(
4398 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4401 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4403 # rest of staff will be done at finally
4406 ROclient
.ROClientException
,
4411 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4413 except asyncio
.CancelledError
:
4415 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4417 exc
= "Operation was cancelled"
4418 except Exception as e
:
4419 exc
= traceback
.format_exc()
4420 self
.logger
.critical(
4421 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4426 error_list
.append(str(exc
))
4428 # wait for pending tasks
4430 stage
[1] = "Waiting for terminate pending tasks."
4431 self
.logger
.debug(logging_text
+ stage
[1])
4432 error_list
+= await self
._wait
_for
_tasks
(
4435 timeout_ns_terminate
,
4439 stage
[1] = stage
[2] = ""
4440 except asyncio
.CancelledError
:
4441 error_list
.append("Cancelled")
4442 # TODO cancell all tasks
4443 except Exception as exc
:
4444 error_list
.append(str(exc
))
4445 # update status at database
4447 error_detail
= "; ".join(error_list
)
4448 # self.logger.error(logging_text + error_detail)
4449 error_description_nslcmop
= "{} Detail: {}".format(
4450 stage
[0], error_detail
4452 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4453 nslcmop_id
, stage
[0]
4456 db_nsr_update
["operational-status"] = "failed"
4457 db_nsr_update
["detailed-status"] = (
4458 error_description_nsr
+ " Detail: " + error_detail
4460 db_nslcmop_update
["detailed-status"] = error_detail
4461 nslcmop_operation_state
= "FAILED"
4465 error_description_nsr
= error_description_nslcmop
= None
4466 ns_state
= "NOT_INSTANTIATED"
4467 db_nsr_update
["operational-status"] = "terminated"
4468 db_nsr_update
["detailed-status"] = "Done"
4469 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4470 db_nslcmop_update
["detailed-status"] = "Done"
4471 nslcmop_operation_state
= "COMPLETED"
4474 self
._write
_ns
_status
(
4477 current_operation
="IDLE",
4478 current_operation_id
=None,
4479 error_description
=error_description_nsr
,
4480 error_detail
=error_detail
,
4481 other_update
=db_nsr_update
,
4483 self
._write
_op
_status
(
4486 error_message
=error_description_nslcmop
,
4487 operation_state
=nslcmop_operation_state
,
4488 other_update
=db_nslcmop_update
,
4490 if ns_state
== "NOT_INSTANTIATED":
4494 {"nsr-id-ref": nsr_id
},
4495 {"_admin.nsState": "NOT_INSTANTIATED"},
4497 except DbException
as e
:
4500 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4504 if operation_params
:
4505 autoremove
= operation_params
.get("autoremove", False)
4506 if nslcmop_operation_state
:
4508 await self
.msg
.aiowrite(
4513 "nslcmop_id": nslcmop_id
,
4514 "operationState": nslcmop_operation_state
,
4515 "autoremove": autoremove
,
4519 except Exception as e
:
4521 logging_text
+ "kafka_write notification Exception {}".format(e
)
4524 self
.logger
.debug(logging_text
+ "Exit")
4525 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4527 async def _wait_for_tasks(
4528 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4531 error_detail_list
= []
4533 pending_tasks
= list(created_tasks_info
.keys())
4534 num_tasks
= len(pending_tasks
)
4536 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4537 self
._write
_op
_status
(nslcmop_id
, stage
)
4538 while pending_tasks
:
4540 _timeout
= timeout
+ time_start
- time()
4541 done
, pending_tasks
= await asyncio
.wait(
4542 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4544 num_done
+= len(done
)
4545 if not done
: # Timeout
4546 for task
in pending_tasks
:
4547 new_error
= created_tasks_info
[task
] + ": Timeout"
4548 error_detail_list
.append(new_error
)
4549 error_list
.append(new_error
)
4552 if task
.cancelled():
4555 exc
= task
.exception()
4557 if isinstance(exc
, asyncio
.TimeoutError
):
4559 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4560 error_list
.append(created_tasks_info
[task
])
4561 error_detail_list
.append(new_error
)
4568 ROclient
.ROClientException
,
4574 self
.logger
.error(logging_text
+ new_error
)
4576 exc_traceback
= "".join(
4577 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4581 + created_tasks_info
[task
]
4587 logging_text
+ created_tasks_info
[task
] + ": Done"
4589 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4591 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4592 if nsr_id
: # update also nsr
4597 "errorDescription": "Error at: " + ", ".join(error_list
),
4598 "errorDetail": ". ".join(error_detail_list
),
4601 self
._write
_op
_status
(nslcmop_id
, stage
)
4602 return error_detail_list
4605 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4607 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4608 The default-value is used. If it is between < > it look for a value at instantiation_params
4609 :param primitive_desc: portion of VNFD/NSD that describes primitive
4610 :param params: Params provided by user
4611 :param instantiation_params: Instantiation params provided by user
4612 :return: a dictionary with the calculated params
4614 calculated_params
= {}
4615 for parameter
in primitive_desc
.get("parameter", ()):
4616 param_name
= parameter
["name"]
4617 if param_name
in params
:
4618 calculated_params
[param_name
] = params
[param_name
]
4619 elif "default-value" in parameter
or "value" in parameter
:
4620 if "value" in parameter
:
4621 calculated_params
[param_name
] = parameter
["value"]
4623 calculated_params
[param_name
] = parameter
["default-value"]
4625 isinstance(calculated_params
[param_name
], str)
4626 and calculated_params
[param_name
].startswith("<")
4627 and calculated_params
[param_name
].endswith(">")
4629 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4630 calculated_params
[param_name
] = instantiation_params
[
4631 calculated_params
[param_name
][1:-1]
4635 "Parameter {} needed to execute primitive {} not provided".format(
4636 calculated_params
[param_name
], primitive_desc
["name"]
4641 "Parameter {} needed to execute primitive {} not provided".format(
4642 param_name
, primitive_desc
["name"]
4646 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4647 calculated_params
[param_name
] = yaml
.safe_dump(
4648 calculated_params
[param_name
], default_flow_style
=True, width
=256
4650 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4652 ].startswith("!!yaml "):
4653 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4654 if parameter
.get("data-type") == "INTEGER":
4656 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4657 except ValueError: # error converting string to int
4659 "Parameter {} of primitive {} must be integer".format(
4660 param_name
, primitive_desc
["name"]
4663 elif parameter
.get("data-type") == "BOOLEAN":
4664 calculated_params
[param_name
] = not (
4665 (str(calculated_params
[param_name
])).lower() == "false"
4668 # add always ns_config_info if primitive name is config
4669 if primitive_desc
["name"] == "config":
4670 if "ns_config_info" in instantiation_params
:
4671 calculated_params
["ns_config_info"] = instantiation_params
[
4674 return calculated_params
4676 def _look_for_deployed_vca(
4683 ee_descriptor_id
=None,
4685 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4686 for vca
in deployed_vca
:
4689 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4692 vdu_count_index
is not None
4693 and vdu_count_index
!= vca
["vdu_count_index"]
4696 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4698 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4702 # vca_deployed not found
4704 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4705 " is not deployed".format(
4714 ee_id
= vca
.get("ee_id")
4716 "type", "lxc_proxy_charm"
4717 ) # default value for backward compatibility - proxy charm
4720 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4721 "execution environment".format(
4722 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4725 return ee_id
, vca_type
4727 async def _ns_execute_primitive(
4733 retries_interval
=30,
4740 if primitive
== "config":
4741 primitive_params
= {"params": primitive_params
}
4743 vca_type
= vca_type
or "lxc_proxy_charm"
4747 output
= await asyncio
.wait_for(
4748 self
.vca_map
[vca_type
].exec_primitive(
4750 primitive_name
=primitive
,
4751 params_dict
=primitive_params
,
4752 progress_timeout
=self
.timeout
.progress_primitive
,
4753 total_timeout
=self
.timeout
.primitive
,
4758 timeout
=timeout
or self
.timeout
.primitive
,
4762 except asyncio
.CancelledError
:
4764 except Exception as e
:
4768 "Error executing action {} on {} -> {}".format(
4773 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
4775 if isinstance(e
, asyncio
.TimeoutError
):
4777 message
="Timed out waiting for action to complete"
4779 return "FAILED", getattr(e
, "message", repr(e
))
4781 return "COMPLETED", output
4783 except (LcmException
, asyncio
.CancelledError
):
4785 except Exception as e
:
4786 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
4788 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
4790 Updating the vca_status with latest juju information in nsrs record
4791 :param: nsr_id: Id of the nsr
4792 :param: nslcmop_id: Id of the nslcmop
4796 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
4797 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4798 vca_id
= self
.get_vca_id({}, db_nsr
)
4799 if db_nsr
["_admin"]["deployed"]["K8s"]:
4800 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
4801 cluster_uuid
, kdu_instance
, cluster_type
= (
4802 k8s
["k8scluster-uuid"],
4803 k8s
["kdu-instance"],
4804 k8s
["k8scluster-type"],
4806 await self
._on
_update
_k
8s
_db
(
4807 cluster_uuid
=cluster_uuid
,
4808 kdu_instance
=kdu_instance
,
4809 filter={"_id": nsr_id
},
4811 cluster_type
=cluster_type
,
4814 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
4815 table
, filter = "nsrs", {"_id": nsr_id
}
4816 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
4817 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
4819 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
4820 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
4822 async def action(self
, nsr_id
, nslcmop_id
):
4823 # Try to lock HA task here
4824 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4825 if not task_is_locked_by_me
:
4828 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
4829 self
.logger
.debug(logging_text
+ "Enter")
4830 # get all needed from database
4834 db_nslcmop_update
= {}
4835 nslcmop_operation_state
= None
4836 error_description_nslcmop
= None
4840 # wait for any previous tasks in process
4841 step
= "Waiting for previous operations to terminate"
4842 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4844 self
._write
_ns
_status
(
4847 current_operation
="RUNNING ACTION",
4848 current_operation_id
=nslcmop_id
,
4851 step
= "Getting information from database"
4852 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4853 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4854 if db_nslcmop
["operationParams"].get("primitive_params"):
4855 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
4856 db_nslcmop
["operationParams"]["primitive_params"]
4859 nsr_deployed
= db_nsr
["_admin"].get("deployed")
4860 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
4861 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
4862 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
4863 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
4864 primitive
= db_nslcmop
["operationParams"]["primitive"]
4865 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
4866 timeout_ns_action
= db_nslcmop
["operationParams"].get(
4867 "timeout_ns_action", self
.timeout
.primitive
4871 step
= "Getting vnfr from database"
4872 db_vnfr
= self
.db
.get_one(
4873 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
4875 if db_vnfr
.get("kdur"):
4877 for kdur
in db_vnfr
["kdur"]:
4878 if kdur
.get("additionalParams"):
4879 kdur
["additionalParams"] = json
.loads(
4880 kdur
["additionalParams"]
4882 kdur_list
.append(kdur
)
4883 db_vnfr
["kdur"] = kdur_list
4884 step
= "Getting vnfd from database"
4885 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
4887 # Sync filesystem before running a primitive
4888 self
.fs
.sync(db_vnfr
["vnfd-id"])
4890 step
= "Getting nsd from database"
4891 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
4893 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
4894 # for backward compatibility
4895 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
4896 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
4897 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
4898 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4900 # look for primitive
4901 config_primitive_desc
= descriptor_configuration
= None
4903 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
4905 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
4907 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
4909 descriptor_configuration
= db_nsd
.get("ns-configuration")
4911 if descriptor_configuration
and descriptor_configuration
.get(
4914 for config_primitive
in descriptor_configuration
["config-primitive"]:
4915 if config_primitive
["name"] == primitive
:
4916 config_primitive_desc
= config_primitive
4919 if not config_primitive_desc
:
4920 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
4922 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4926 primitive_name
= primitive
4927 ee_descriptor_id
= None
4929 primitive_name
= config_primitive_desc
.get(
4930 "execution-environment-primitive", primitive
4932 ee_descriptor_id
= config_primitive_desc
.get(
4933 "execution-environment-ref"
4939 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
4941 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
4944 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
4946 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
4948 desc_params
= parse_yaml_strings(
4949 db_vnfr
.get("additionalParamsForVnf")
4952 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
4953 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
4954 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
4956 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
4957 actions
.add(primitive
["name"])
4958 for primitive
in kdu_configuration
.get("config-primitive", []):
4959 actions
.add(primitive
["name"])
4961 nsr_deployed
["K8s"],
4962 lambda kdu
: kdu_name
== kdu
["kdu-name"]
4963 and kdu
["member-vnf-index"] == vnf_index
,
4967 if primitive_name
in actions
4968 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
4972 # TODO check if ns is in a proper status
4974 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
4976 # kdur and desc_params already set from before
4977 if primitive_params
:
4978 desc_params
.update(primitive_params
)
4979 # TODO Check if we will need something at vnf level
4980 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
4982 kdu_name
== kdu
["kdu-name"]
4983 and kdu
["member-vnf-index"] == vnf_index
4988 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
4991 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
4992 msg
= "unknown k8scluster-type '{}'".format(
4993 kdu
.get("k8scluster-type")
4995 raise LcmException(msg
)
4998 "collection": "nsrs",
4999 "filter": {"_id": nsr_id
},
5000 "path": "_admin.deployed.K8s.{}".format(index
),
5004 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5006 step
= "Executing kdu {}".format(primitive_name
)
5007 if primitive_name
== "upgrade":
5008 if desc_params
.get("kdu_model"):
5009 kdu_model
= desc_params
.get("kdu_model")
5010 del desc_params
["kdu_model"]
5012 kdu_model
= kdu
.get("kdu-model")
5013 parts
= kdu_model
.split(sep
=":")
5015 kdu_model
= parts
[0]
5016 if desc_params
.get("kdu_atomic_upgrade"):
5017 atomic_upgrade
= desc_params
.get(
5018 "kdu_atomic_upgrade"
5019 ).lower() in ("yes", "true", "1")
5020 del desc_params
["kdu_atomic_upgrade"]
5022 atomic_upgrade
= True
5024 detailed_status
= await asyncio
.wait_for(
5025 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5026 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5027 kdu_instance
=kdu
.get("kdu-instance"),
5028 atomic
=atomic_upgrade
,
5029 kdu_model
=kdu_model
,
5032 timeout
=timeout_ns_action
,
5034 timeout
=timeout_ns_action
+ 10,
5037 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5039 elif primitive_name
== "rollback":
5040 detailed_status
= await asyncio
.wait_for(
5041 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5042 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5043 kdu_instance
=kdu
.get("kdu-instance"),
5046 timeout
=timeout_ns_action
,
5048 elif primitive_name
== "status":
5049 detailed_status
= await asyncio
.wait_for(
5050 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5051 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5052 kdu_instance
=kdu
.get("kdu-instance"),
5055 timeout
=timeout_ns_action
,
5058 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5059 kdu
["kdu-name"], nsr_id
5061 params
= self
._map
_primitive
_params
(
5062 config_primitive_desc
, primitive_params
, desc_params
5065 detailed_status
= await asyncio
.wait_for(
5066 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5067 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5068 kdu_instance
=kdu_instance
,
5069 primitive_name
=primitive_name
,
5072 timeout
=timeout_ns_action
,
5075 timeout
=timeout_ns_action
,
5079 nslcmop_operation_state
= "COMPLETED"
5081 detailed_status
= ""
5082 nslcmop_operation_state
= "FAILED"
5084 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5085 nsr_deployed
["VCA"],
5086 member_vnf_index
=vnf_index
,
5088 vdu_count_index
=vdu_count_index
,
5089 ee_descriptor_id
=ee_descriptor_id
,
5091 for vca_index
, vca_deployed
in enumerate(
5092 db_nsr
["_admin"]["deployed"]["VCA"]
5094 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5096 "collection": "nsrs",
5097 "filter": {"_id": nsr_id
},
5098 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5102 nslcmop_operation_state
,
5104 ) = await self
._ns
_execute
_primitive
(
5106 primitive
=primitive_name
,
5107 primitive_params
=self
._map
_primitive
_params
(
5108 config_primitive_desc
, primitive_params
, desc_params
5110 timeout
=timeout_ns_action
,
5116 db_nslcmop_update
["detailed-status"] = detailed_status
5117 error_description_nslcmop
= (
5118 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5122 + "Done with result {} {}".format(
5123 nslcmop_operation_state
, detailed_status
5126 return # database update is called inside finally
5128 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5129 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5131 except asyncio
.CancelledError
:
5133 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5135 exc
= "Operation was cancelled"
5136 except asyncio
.TimeoutError
:
5137 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5139 except Exception as e
:
5140 exc
= traceback
.format_exc()
5141 self
.logger
.critical(
5142 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5151 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5152 nslcmop_operation_state
= "FAILED"
5154 self
._write
_ns
_status
(
5158 ], # TODO check if degraded. For the moment use previous status
5159 current_operation
="IDLE",
5160 current_operation_id
=None,
5161 # error_description=error_description_nsr,
5162 # error_detail=error_detail,
5163 other_update
=db_nsr_update
,
5166 self
._write
_op
_status
(
5169 error_message
=error_description_nslcmop
,
5170 operation_state
=nslcmop_operation_state
,
5171 other_update
=db_nslcmop_update
,
5174 if nslcmop_operation_state
:
5176 await self
.msg
.aiowrite(
5181 "nslcmop_id": nslcmop_id
,
5182 "operationState": nslcmop_operation_state
,
5186 except Exception as e
:
5188 logging_text
+ "kafka_write notification Exception {}".format(e
)
5190 self
.logger
.debug(logging_text
+ "Exit")
5191 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5192 return nslcmop_operation_state
, detailed_status
5194 async def terminate_vdus(
5195 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5197 """This method terminates VDUs
5200 db_vnfr: VNF instance record
5201 member_vnf_index: VNF index to identify the VDUs to be removed
5202 db_nsr: NS instance record
5203 update_db_nslcmops: Nslcmop update record
5205 vca_scaling_info
= []
5206 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5207 scaling_info
["scaling_direction"] = "IN"
5208 scaling_info
["vdu-delete"] = {}
5209 scaling_info
["kdu-delete"] = {}
5210 db_vdur
= db_vnfr
.get("vdur")
5211 vdur_list
= copy(db_vdur
)
5213 for index
, vdu
in enumerate(vdur_list
):
5214 vca_scaling_info
.append(
5216 "osm_vdu_id": vdu
["vdu-id-ref"],
5217 "member-vnf-index": member_vnf_index
,
5219 "vdu_index": count_index
,
5222 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5223 scaling_info
["vdu"].append(
5225 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5226 "vdu_id": vdu
["vdu-id-ref"],
5230 for interface
in vdu
["interfaces"]:
5231 scaling_info
["vdu"][index
]["interface"].append(
5233 "name": interface
["name"],
5234 "ip_address": interface
["ip-address"],
5235 "mac_address": interface
.get("mac-address"),
5238 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5239 stage
[2] = "Terminating VDUs"
5240 if scaling_info
.get("vdu-delete"):
5241 # scale_process = "RO"
5242 if self
.ro_config
.ng
:
5243 await self
._scale
_ng
_ro
(
5252 async def remove_vnf(self
, nsr_id
, nslcmop_id
, vnf_instance_id
):
5253 """This method is to Remove VNF instances from NS.
5256 nsr_id: NS instance id
5257 nslcmop_id: nslcmop id of update
5258 vnf_instance_id: id of the VNF instance to be removed
5261 result: (str, str) COMPLETED/FAILED, details
5265 logging_text
= "Task ns={} update ".format(nsr_id
)
5266 check_vnfr_count
= len(self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}))
5267 self
.logger
.info("check_vnfr_count {}".format(check_vnfr_count
))
5268 if check_vnfr_count
> 1:
5269 stage
= ["", "", ""]
5270 step
= "Getting nslcmop from database"
5272 step
+ " after having waited for previous tasks to be completed"
5274 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5275 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5276 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5277 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5278 """ db_vnfr = self.db.get_one(
5279 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5281 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5282 await self
.terminate_vdus(
5291 constituent_vnfr
= db_nsr
.get("constituent-vnfr-ref")
5292 constituent_vnfr
.remove(db_vnfr
.get("_id"))
5293 db_nsr_update
["constituent-vnfr-ref"] = db_nsr
.get(
5294 "constituent-vnfr-ref"
5296 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5297 self
.db
.del_one("vnfrs", {"_id": db_vnfr
.get("_id")})
5298 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5299 return "COMPLETED", "Done"
5301 step
= "Terminate VNF Failed with"
5303 "{} Cannot terminate the last VNF in this NS.".format(
5307 except (LcmException
, asyncio
.CancelledError
):
5309 except Exception as e
:
5310 self
.logger
.debug("Error removing VNF {}".format(e
))
5311 return "FAILED", "Error removing VNF {}".format(e
)
5313 async def _ns_redeploy_vnf(
5321 """This method updates and redeploys VNF instances
5324 nsr_id: NS instance id
5325 nslcmop_id: nslcmop id
5326 db_vnfd: VNF descriptor
5327 db_vnfr: VNF instance record
5328 db_nsr: NS instance record
5331 result: (str, str) COMPLETED/FAILED, details
5335 stage
= ["", "", ""]
5336 logging_text
= "Task ns={} update ".format(nsr_id
)
5337 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5338 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5340 # Terminate old VNF resources
5341 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5342 await self
.terminate_vdus(
5351 # old_vnfd_id = db_vnfr["vnfd-id"]
5352 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5353 new_db_vnfd
= db_vnfd
5354 # new_vnfd_ref = new_db_vnfd["id"]
5355 # new_vnfd_id = vnfd_id
5359 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5361 "name": cp
.get("id"),
5362 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5363 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5366 new_vnfr_cp
.append(vnf_cp
)
5367 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5368 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5369 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5371 "revision": latest_vnfd_revision
,
5372 "connection-point": new_vnfr_cp
,
5376 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5377 updated_db_vnfr
= self
.db
.get_one(
5379 {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
},
5382 # Instantiate new VNF resources
5383 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5384 vca_scaling_info
= []
5385 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5386 scaling_info
["scaling_direction"] = "OUT"
5387 scaling_info
["vdu-create"] = {}
5388 scaling_info
["kdu-create"] = {}
5389 vdud_instantiate_list
= db_vnfd
["vdu"]
5390 for index
, vdud
in enumerate(vdud_instantiate_list
):
5391 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
5393 additional_params
= (
5394 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5397 cloud_init_list
= []
5399 # TODO Information of its own ip is not available because db_vnfr is not updated.
5400 additional_params
["OSM"] = get_osm_params(
5401 updated_db_vnfr
, vdud
["id"], 1
5403 cloud_init_list
.append(
5404 self
._parse
_cloud
_init
(
5411 vca_scaling_info
.append(
5413 "osm_vdu_id": vdud
["id"],
5414 "member-vnf-index": member_vnf_index
,
5416 "vdu_index": count_index
,
5419 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5420 if self
.ro_config
.ng
:
5422 "New Resources to be deployed: {}".format(scaling_info
)
5424 await self
._scale
_ng
_ro
(
5432 return "COMPLETED", "Done"
5433 except (LcmException
, asyncio
.CancelledError
):
5435 except Exception as e
:
5436 self
.logger
.debug("Error updating VNF {}".format(e
))
5437 return "FAILED", "Error updating VNF {}".format(e
)
5439 async def _ns_charm_upgrade(
5445 timeout
: float = None,
5447 """This method upgrade charms in VNF instances
5450 ee_id: Execution environment id
5451 path: Local path to the charm
5453 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5454 timeout: (Float) Timeout for the ns update operation
5457 result: (str, str) COMPLETED/FAILED, details
5460 charm_type
= charm_type
or "lxc_proxy_charm"
5461 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5465 charm_type
=charm_type
,
5466 timeout
=timeout
or self
.timeout
.ns_update
,
5470 return "COMPLETED", output
5472 except (LcmException
, asyncio
.CancelledError
):
5475 except Exception as e
:
5476 self
.logger
.debug("Error upgrading charm {}".format(path
))
5478 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5480 async def update(self
, nsr_id
, nslcmop_id
):
5481 """Update NS according to different update types
5483 This method performs upgrade of VNF instances then updates the revision
5484 number in VNF record
5487 nsr_id: Network service will be updated
5488 nslcmop_id: ns lcm operation id
5491 It may raise DbException, LcmException, N2VCException, K8sException
5494 # Try to lock HA task here
5495 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5496 if not task_is_locked_by_me
:
5499 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5500 self
.logger
.debug(logging_text
+ "Enter")
5502 # Set the required variables to be filled up later
5504 db_nslcmop_update
= {}
5506 nslcmop_operation_state
= None
5508 error_description_nslcmop
= ""
5510 change_type
= "updated"
5511 detailed_status
= ""
5512 member_vnf_index
= None
5515 # wait for any previous tasks in process
5516 step
= "Waiting for previous operations to terminate"
5517 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5518 self
._write
_ns
_status
(
5521 current_operation
="UPDATING",
5522 current_operation_id
=nslcmop_id
,
5525 step
= "Getting nslcmop from database"
5526 db_nslcmop
= self
.db
.get_one(
5527 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5529 update_type
= db_nslcmop
["operationParams"]["updateType"]
5531 step
= "Getting nsr from database"
5532 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5533 old_operational_status
= db_nsr
["operational-status"]
5534 db_nsr_update
["operational-status"] = "updating"
5535 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5536 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5538 if update_type
== "CHANGE_VNFPKG":
5539 # Get the input parameters given through update request
5540 vnf_instance_id
= db_nslcmop
["operationParams"][
5541 "changeVnfPackageData"
5542 ].get("vnfInstanceId")
5544 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5547 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5549 step
= "Getting vnfr from database"
5550 db_vnfr
= self
.db
.get_one(
5551 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5554 step
= "Getting vnfds from database"
5556 latest_vnfd
= self
.db
.get_one(
5557 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5559 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5562 current_vnf_revision
= db_vnfr
.get("revision", 1)
5563 current_vnfd
= self
.db
.get_one(
5565 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5566 fail_on_empty
=False,
5568 # Charm artifact paths will be filled up later
5570 current_charm_artifact_path
,
5571 target_charm_artifact_path
,
5572 charm_artifact_paths
,
5574 ) = ([], [], [], [])
5576 step
= "Checking if revision has changed in VNFD"
5577 if current_vnf_revision
!= latest_vnfd_revision
:
5578 change_type
= "policy_updated"
5580 # There is new revision of VNFD, update operation is required
5581 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5582 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5584 step
= "Removing the VNFD packages if they exist in the local path"
5585 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5586 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5588 step
= "Get the VNFD packages from FSMongo"
5589 self
.fs
.sync(from_path
=latest_vnfd_path
)
5590 self
.fs
.sync(from_path
=current_vnfd_path
)
5593 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5595 current_base_folder
= current_vnfd
["_admin"]["storage"]
5596 latest_base_folder
= latest_vnfd
["_admin"]["storage"]
5598 for vca_index
, vca_deployed
in enumerate(
5599 get_iterable(nsr_deployed
, "VCA")
5601 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5603 # Getting charm-id and charm-type
5604 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5605 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5606 vca_type
= vca_deployed
.get("type")
5607 vdu_count_index
= vca_deployed
.get("vdu_count_index")
5610 ee_id
= vca_deployed
.get("ee_id")
5612 step
= "Getting descriptor config"
5613 if current_vnfd
.get("kdu"):
5614 search_key
= "kdu_name"
5616 search_key
= "vnfd_id"
5618 entity_id
= vca_deployed
.get(search_key
)
5620 descriptor_config
= get_configuration(
5621 current_vnfd
, entity_id
5624 if "execution-environment-list" in descriptor_config
:
5625 ee_list
= descriptor_config
.get(
5626 "execution-environment-list", []
5631 # There could be several charm used in the same VNF
5632 for ee_item
in ee_list
:
5633 if ee_item
.get("juju"):
5634 step
= "Getting charm name"
5635 charm_name
= ee_item
["juju"].get("charm")
5637 step
= "Setting Charm artifact paths"
5638 current_charm_artifact_path
.append(
5639 get_charm_artifact_path(
5640 current_base_folder
,
5643 current_vnf_revision
,
5646 target_charm_artifact_path
.append(
5647 get_charm_artifact_path(
5651 latest_vnfd_revision
,
5654 elif ee_item
.get("helm-chart"):
5655 # add chart to list and all parameters
5656 step
= "Getting helm chart name"
5657 chart_name
= ee_item
.get("helm-chart")
5659 ee_item
.get("helm-version")
5660 and ee_item
.get("helm-version") == "v2"
5664 vca_type
= "helm-v3"
5665 step
= "Setting Helm chart artifact paths"
5667 helm_artifacts
.append(
5669 "current_artifact_path": get_charm_artifact_path(
5670 current_base_folder
,
5673 current_vnf_revision
,
5675 "target_artifact_path": get_charm_artifact_path(
5679 latest_vnfd_revision
,
5682 "vca_index": vca_index
,
5683 "vdu_index": vdu_count_index
,
5687 charm_artifact_paths
= zip(
5688 current_charm_artifact_path
, target_charm_artifact_path
5691 step
= "Checking if software version has changed in VNFD"
5692 if find_software_version(current_vnfd
) != find_software_version(
5695 step
= "Checking if existing VNF has charm"
5696 for current_charm_path
, target_charm_path
in list(
5697 charm_artifact_paths
5699 if current_charm_path
:
5701 "Software version change is not supported as VNF instance {} has charm.".format(
5706 # There is no change in the charm package, then redeploy the VNF
5707 # based on new descriptor
5708 step
= "Redeploying VNF"
5709 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5710 (result
, detailed_status
) = await self
._ns
_redeploy
_vnf
(
5711 nsr_id
, nslcmop_id
, latest_vnfd
, db_vnfr
, db_nsr
5713 if result
== "FAILED":
5714 nslcmop_operation_state
= result
5715 error_description_nslcmop
= detailed_status
5716 db_nslcmop_update
["detailed-status"] = detailed_status
5719 + " step {} Done with result {} {}".format(
5720 step
, nslcmop_operation_state
, detailed_status
5725 step
= "Checking if any charm package has changed or not"
5726 for current_charm_path
, target_charm_path
in list(
5727 charm_artifact_paths
5731 and target_charm_path
5732 and self
.check_charm_hash_changed(
5733 current_charm_path
, target_charm_path
5736 step
= "Checking whether VNF uses juju bundle"
5737 if check_juju_bundle_existence(current_vnfd
):
5739 "Charm upgrade is not supported for the instance which"
5740 " uses juju-bundle: {}".format(
5741 check_juju_bundle_existence(current_vnfd
)
5745 step
= "Upgrading Charm"
5749 ) = await self
._ns
_charm
_upgrade
(
5752 charm_type
=vca_type
,
5753 path
=self
.fs
.path
+ target_charm_path
,
5754 timeout
=timeout_seconds
,
5757 if result
== "FAILED":
5758 nslcmop_operation_state
= result
5759 error_description_nslcmop
= detailed_status
5761 db_nslcmop_update
["detailed-status"] = detailed_status
5764 + " step {} Done with result {} {}".format(
5765 step
, nslcmop_operation_state
, detailed_status
5769 step
= "Updating policies"
5770 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5771 result
= "COMPLETED"
5772 detailed_status
= "Done"
5773 db_nslcmop_update
["detailed-status"] = "Done"
5776 for item
in helm_artifacts
:
5778 item
["current_artifact_path"]
5779 and item
["target_artifact_path"]
5780 and self
.check_charm_hash_changed(
5781 item
["current_artifact_path"],
5782 item
["target_artifact_path"],
5786 db_update_entry
= "_admin.deployed.VCA.{}.".format(
5789 vnfr_id
= db_vnfr
["_id"]
5790 osm_config
= {"osm": {"ns_id": nsr_id
, "vnf_id": vnfr_id
}}
5792 "collection": "nsrs",
5793 "filter": {"_id": nsr_id
},
5794 "path": db_update_entry
,
5796 vca_type
, namespace
, helm_id
= get_ee_id_parts(item
["ee_id"])
5797 await self
.vca_map
[vca_type
].upgrade_execution_environment(
5798 namespace
=namespace
,
5802 artifact_path
=item
["target_artifact_path"],
5805 vnf_id
= db_vnfr
.get("vnfd-ref")
5806 config_descriptor
= get_configuration(latest_vnfd
, vnf_id
)
5807 self
.logger
.debug("get ssh key block")
5811 ("config-access", "ssh-access", "required"),
5813 # Needed to inject a ssh key
5816 ("config-access", "ssh-access", "default-user"),
5819 "Install configuration Software, getting public ssh key"
5821 pub_key
= await self
.vca_map
[
5823 ].get_ee_ssh_public__key(
5824 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
5828 "Insert public key into VM user={} ssh_key={}".format(
5832 self
.logger
.debug(logging_text
+ step
)
5834 # wait for RO (ip-address) Insert pub_key into VM
5835 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
5845 initial_config_primitive_list
= config_descriptor
.get(
5846 "initial-config-primitive"
5848 config_primitive
= next(
5851 for p
in initial_config_primitive_list
5852 if p
["name"] == "config"
5856 if not config_primitive
:
5859 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
5861 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
5862 if db_vnfr
.get("additionalParamsForVnf"):
5863 deploy_params
.update(
5865 db_vnfr
["additionalParamsForVnf"].copy()
5868 primitive_params_
= self
._map
_primitive
_params
(
5869 config_primitive
, {}, deploy_params
5872 step
= "execute primitive '{}' params '{}'".format(
5873 config_primitive
["name"], primitive_params_
5875 self
.logger
.debug(logging_text
+ step
)
5876 await self
.vca_map
[vca_type
].exec_primitive(
5878 primitive_name
=config_primitive
["name"],
5879 params_dict
=primitive_params_
,
5885 step
= "Updating policies"
5886 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5887 detailed_status
= "Done"
5888 db_nslcmop_update
["detailed-status"] = "Done"
5890 # If nslcmop_operation_state is None, so any operation is not failed.
5891 if not nslcmop_operation_state
:
5892 nslcmop_operation_state
= "COMPLETED"
5894 # If update CHANGE_VNFPKG nslcmop_operation is successful
5895 # vnf revision need to be updated
5896 vnfr_update
["revision"] = latest_vnfd_revision
5897 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
5901 + " task Done with result {} {}".format(
5902 nslcmop_operation_state
, detailed_status
5905 elif update_type
== "REMOVE_VNF":
5906 # This part is included in https://osm.etsi.org/gerrit/11876
5907 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
5908 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5909 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5910 step
= "Removing VNF"
5911 (result
, detailed_status
) = await self
.remove_vnf(
5912 nsr_id
, nslcmop_id
, vnf_instance_id
5914 if result
== "FAILED":
5915 nslcmop_operation_state
= result
5916 error_description_nslcmop
= detailed_status
5917 db_nslcmop_update
["detailed-status"] = detailed_status
5918 change_type
= "vnf_terminated"
5919 if not nslcmop_operation_state
:
5920 nslcmop_operation_state
= "COMPLETED"
5923 + " task Done with result {} {}".format(
5924 nslcmop_operation_state
, detailed_status
5928 elif update_type
== "OPERATE_VNF":
5929 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"][
5932 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"][
5935 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"][
5938 (result
, detailed_status
) = await self
.rebuild_start_stop(
5939 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
5941 if result
== "FAILED":
5942 nslcmop_operation_state
= result
5943 error_description_nslcmop
= detailed_status
5944 db_nslcmop_update
["detailed-status"] = detailed_status
5945 if not nslcmop_operation_state
:
5946 nslcmop_operation_state
= "COMPLETED"
5949 + " task Done with result {} {}".format(
5950 nslcmop_operation_state
, detailed_status
5954 # If nslcmop_operation_state is None, so any operation is not failed.
5955 # All operations are executed in overall.
5956 if not nslcmop_operation_state
:
5957 nslcmop_operation_state
= "COMPLETED"
5958 db_nsr_update
["operational-status"] = old_operational_status
5960 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5961 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5963 except asyncio
.CancelledError
:
5965 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5967 exc
= "Operation was cancelled"
5968 except asyncio
.TimeoutError
:
5969 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5971 except Exception as e
:
5972 exc
= traceback
.format_exc()
5973 self
.logger
.critical(
5974 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5983 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5984 nslcmop_operation_state
= "FAILED"
5985 db_nsr_update
["operational-status"] = old_operational_status
5987 self
._write
_ns
_status
(
5989 ns_state
=db_nsr
["nsState"],
5990 current_operation
="IDLE",
5991 current_operation_id
=None,
5992 other_update
=db_nsr_update
,
5995 self
._write
_op
_status
(
5998 error_message
=error_description_nslcmop
,
5999 operation_state
=nslcmop_operation_state
,
6000 other_update
=db_nslcmop_update
,
6003 if nslcmop_operation_state
:
6007 "nslcmop_id": nslcmop_id
,
6008 "operationState": nslcmop_operation_state
,
6011 change_type
in ("vnf_terminated", "policy_updated")
6012 and member_vnf_index
6014 msg
.update({"vnf_member_index": member_vnf_index
})
6015 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6016 except Exception as e
:
6018 logging_text
+ "kafka_write notification Exception {}".format(e
)
6020 self
.logger
.debug(logging_text
+ "Exit")
6021 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6022 return nslcmop_operation_state
, detailed_status
6024 async def scale(self
, nsr_id
, nslcmop_id
):
6025 # Try to lock HA task here
6026 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6027 if not task_is_locked_by_me
:
6030 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6031 stage
= ["", "", ""]
6032 tasks_dict_info
= {}
6033 # ^ stage, step, VIM progress
6034 self
.logger
.debug(logging_text
+ "Enter")
6035 # get all needed from database
6037 db_nslcmop_update
= {}
6040 # in case of error, indicates what part of scale was failed to put nsr at error status
6041 scale_process
= None
6042 old_operational_status
= ""
6043 old_config_status
= ""
6046 # wait for any previous tasks in process
6047 step
= "Waiting for previous operations to terminate"
6048 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6049 self
._write
_ns
_status
(
6052 current_operation
="SCALING",
6053 current_operation_id
=nslcmop_id
,
6056 step
= "Getting nslcmop from database"
6058 step
+ " after having waited for previous tasks to be completed"
6060 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6062 step
= "Getting nsr from database"
6063 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6064 old_operational_status
= db_nsr
["operational-status"]
6065 old_config_status
= db_nsr
["config-status"]
6067 step
= "Parsing scaling parameters"
6068 db_nsr_update
["operational-status"] = "scaling"
6069 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6070 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6072 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6074 ]["member-vnf-index"]
6075 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6077 ]["scaling-group-descriptor"]
6078 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6079 # for backward compatibility
6080 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6081 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6082 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6083 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6085 step
= "Getting vnfr from database"
6086 db_vnfr
= self
.db
.get_one(
6087 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6090 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6092 step
= "Getting vnfd from database"
6093 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6095 base_folder
= db_vnfd
["_admin"]["storage"]
6097 step
= "Getting scaling-group-descriptor"
6098 scaling_descriptor
= find_in_list(
6099 get_scaling_aspect(db_vnfd
),
6100 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6102 if not scaling_descriptor
:
6104 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6105 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6108 step
= "Sending scale order to VIM"
6109 # TODO check if ns is in a proper status
6111 if not db_nsr
["_admin"].get("scaling-group"):
6116 "_admin.scaling-group": [
6117 {"name": scaling_group
, "nb-scale-op": 0}
6121 admin_scale_index
= 0
6123 for admin_scale_index
, admin_scale_info
in enumerate(
6124 db_nsr
["_admin"]["scaling-group"]
6126 if admin_scale_info
["name"] == scaling_group
:
6127 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6129 else: # not found, set index one plus last element and add new entry with the name
6130 admin_scale_index
+= 1
6132 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6135 vca_scaling_info
= []
6136 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6137 if scaling_type
== "SCALE_OUT":
6138 if "aspect-delta-details" not in scaling_descriptor
:
6140 "Aspect delta details not fount in scaling descriptor {}".format(
6141 scaling_descriptor
["name"]
6144 # count if max-instance-count is reached
6145 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6147 scaling_info
["scaling_direction"] = "OUT"
6148 scaling_info
["vdu-create"] = {}
6149 scaling_info
["kdu-create"] = {}
6150 for delta
in deltas
:
6151 for vdu_delta
in delta
.get("vdu-delta", {}):
6152 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6153 # vdu_index also provides the number of instance of the targeted vdu
6154 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6155 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6159 additional_params
= (
6160 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6163 cloud_init_list
= []
6165 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6166 max_instance_count
= 10
6167 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6168 max_instance_count
= vdu_profile
.get(
6169 "max-number-of-instances", 10
6172 default_instance_num
= get_number_of_instances(
6175 instances_number
= vdu_delta
.get("number-of-instances", 1)
6176 nb_scale_op
+= instances_number
6178 new_instance_count
= nb_scale_op
+ default_instance_num
6179 # Control if new count is over max and vdu count is less than max.
6180 # Then assign new instance count
6181 if new_instance_count
> max_instance_count
> vdu_count
:
6182 instances_number
= new_instance_count
- max_instance_count
6184 instances_number
= instances_number
6186 if new_instance_count
> max_instance_count
:
6188 "reached the limit of {} (max-instance-count) "
6189 "scaling-out operations for the "
6190 "scaling-group-descriptor '{}'".format(
6191 nb_scale_op
, scaling_group
6194 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6196 # TODO Information of its own ip is not available because db_vnfr is not updated.
6197 additional_params
["OSM"] = get_osm_params(
6198 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6200 cloud_init_list
.append(
6201 self
._parse
_cloud
_init
(
6208 vca_scaling_info
.append(
6210 "osm_vdu_id": vdu_delta
["id"],
6211 "member-vnf-index": vnf_index
,
6213 "vdu_index": vdu_index
+ x
,
6216 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6217 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6218 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6219 kdu_name
= kdu_profile
["kdu-name"]
6220 resource_name
= kdu_profile
.get("resource-name", "")
6222 # Might have different kdus in the same delta
6223 # Should have list for each kdu
6224 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6225 scaling_info
["kdu-create"][kdu_name
] = []
6227 kdur
= get_kdur(db_vnfr
, kdu_name
)
6228 if kdur
.get("helm-chart"):
6229 k8s_cluster_type
= "helm-chart-v3"
6230 self
.logger
.debug("kdur: {}".format(kdur
))
6232 kdur
.get("helm-version")
6233 and kdur
.get("helm-version") == "v2"
6235 k8s_cluster_type
= "helm-chart"
6236 elif kdur
.get("juju-bundle"):
6237 k8s_cluster_type
= "juju-bundle"
6240 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6241 "juju-bundle. Maybe an old NBI version is running".format(
6242 db_vnfr
["member-vnf-index-ref"], kdu_name
6246 max_instance_count
= 10
6247 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6248 max_instance_count
= kdu_profile
.get(
6249 "max-number-of-instances", 10
6252 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6253 deployed_kdu
, _
= get_deployed_kdu(
6254 nsr_deployed
, kdu_name
, vnf_index
6256 if deployed_kdu
is None:
6258 "KDU '{}' for vnf '{}' not deployed".format(
6262 kdu_instance
= deployed_kdu
.get("kdu-instance")
6263 instance_num
= await self
.k8scluster_map
[
6269 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6270 kdu_model
=deployed_kdu
.get("kdu-model"),
6272 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6273 "number-of-instances", 1
6276 # Control if new count is over max and instance_num is less than max.
6277 # Then assign max instance number to kdu replica count
6278 if kdu_replica_count
> max_instance_count
> instance_num
:
6279 kdu_replica_count
= max_instance_count
6280 if kdu_replica_count
> max_instance_count
:
6282 "reached the limit of {} (max-instance-count) "
6283 "scaling-out operations for the "
6284 "scaling-group-descriptor '{}'".format(
6285 instance_num
, scaling_group
6289 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6290 vca_scaling_info
.append(
6292 "osm_kdu_id": kdu_name
,
6293 "member-vnf-index": vnf_index
,
6295 "kdu_index": instance_num
+ x
- 1,
6298 scaling_info
["kdu-create"][kdu_name
].append(
6300 "member-vnf-index": vnf_index
,
6302 "k8s-cluster-type": k8s_cluster_type
,
6303 "resource-name": resource_name
,
6304 "scale": kdu_replica_count
,
6307 elif scaling_type
== "SCALE_IN":
6308 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6310 scaling_info
["scaling_direction"] = "IN"
6311 scaling_info
["vdu-delete"] = {}
6312 scaling_info
["kdu-delete"] = {}
6314 for delta
in deltas
:
6315 for vdu_delta
in delta
.get("vdu-delta", {}):
6316 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6317 min_instance_count
= 0
6318 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6319 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6320 min_instance_count
= vdu_profile
["min-number-of-instances"]
6322 default_instance_num
= get_number_of_instances(
6323 db_vnfd
, vdu_delta
["id"]
6325 instance_num
= vdu_delta
.get("number-of-instances", 1)
6326 nb_scale_op
-= instance_num
6328 new_instance_count
= nb_scale_op
+ default_instance_num
6330 if new_instance_count
< min_instance_count
< vdu_count
:
6331 instances_number
= min_instance_count
- new_instance_count
6333 instances_number
= instance_num
6335 if new_instance_count
< min_instance_count
:
6337 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6338 "scaling-group-descriptor '{}'".format(
6339 nb_scale_op
, scaling_group
6342 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6343 vca_scaling_info
.append(
6345 "osm_vdu_id": vdu_delta
["id"],
6346 "member-vnf-index": vnf_index
,
6348 "vdu_index": vdu_index
- 1 - x
,
6351 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6352 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6353 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6354 kdu_name
= kdu_profile
["kdu-name"]
6355 resource_name
= kdu_profile
.get("resource-name", "")
6357 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6358 scaling_info
["kdu-delete"][kdu_name
] = []
6360 kdur
= get_kdur(db_vnfr
, kdu_name
)
6361 if kdur
.get("helm-chart"):
6362 k8s_cluster_type
= "helm-chart-v3"
6363 self
.logger
.debug("kdur: {}".format(kdur
))
6365 kdur
.get("helm-version")
6366 and kdur
.get("helm-version") == "v2"
6368 k8s_cluster_type
= "helm-chart"
6369 elif kdur
.get("juju-bundle"):
6370 k8s_cluster_type
= "juju-bundle"
6373 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6374 "juju-bundle. Maybe an old NBI version is running".format(
6375 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6379 min_instance_count
= 0
6380 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6381 min_instance_count
= kdu_profile
["min-number-of-instances"]
6383 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6384 deployed_kdu
, _
= get_deployed_kdu(
6385 nsr_deployed
, kdu_name
, vnf_index
6387 if deployed_kdu
is None:
6389 "KDU '{}' for vnf '{}' not deployed".format(
6393 kdu_instance
= deployed_kdu
.get("kdu-instance")
6394 instance_num
= await self
.k8scluster_map
[
6400 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6401 kdu_model
=deployed_kdu
.get("kdu-model"),
6403 kdu_replica_count
= instance_num
- kdu_delta
.get(
6404 "number-of-instances", 1
6407 if kdu_replica_count
< min_instance_count
< instance_num
:
6408 kdu_replica_count
= min_instance_count
6409 if kdu_replica_count
< min_instance_count
:
6411 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6412 "scaling-group-descriptor '{}'".format(
6413 instance_num
, scaling_group
6417 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6418 vca_scaling_info
.append(
6420 "osm_kdu_id": kdu_name
,
6421 "member-vnf-index": vnf_index
,
6423 "kdu_index": instance_num
- x
- 1,
6426 scaling_info
["kdu-delete"][kdu_name
].append(
6428 "member-vnf-index": vnf_index
,
6430 "k8s-cluster-type": k8s_cluster_type
,
6431 "resource-name": resource_name
,
6432 "scale": kdu_replica_count
,
6436 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6437 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6438 if scaling_info
["scaling_direction"] == "IN":
6439 for vdur
in reversed(db_vnfr
["vdur"]):
6440 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6441 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6442 scaling_info
["vdu"].append(
6444 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6445 "vdu_id": vdur
["vdu-id-ref"],
6449 for interface
in vdur
["interfaces"]:
6450 scaling_info
["vdu"][-1]["interface"].append(
6452 "name": interface
["name"],
6453 "ip_address": interface
["ip-address"],
6454 "mac_address": interface
.get("mac-address"),
6457 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6460 step
= "Executing pre-scale vnf-config-primitive"
6461 if scaling_descriptor
.get("scaling-config-action"):
6462 for scaling_config_action
in scaling_descriptor
[
6463 "scaling-config-action"
6466 scaling_config_action
.get("trigger") == "pre-scale-in"
6467 and scaling_type
== "SCALE_IN"
6469 scaling_config_action
.get("trigger") == "pre-scale-out"
6470 and scaling_type
== "SCALE_OUT"
6472 vnf_config_primitive
= scaling_config_action
[
6473 "vnf-config-primitive-name-ref"
6475 step
= db_nslcmop_update
[
6477 ] = "executing pre-scale scaling-config-action '{}'".format(
6478 vnf_config_primitive
6481 # look for primitive
6482 for config_primitive
in (
6483 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6484 ).get("config-primitive", ()):
6485 if config_primitive
["name"] == vnf_config_primitive
:
6489 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6490 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6491 "primitive".format(scaling_group
, vnf_config_primitive
)
6494 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6495 if db_vnfr
.get("additionalParamsForVnf"):
6496 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6498 scale_process
= "VCA"
6499 db_nsr_update
["config-status"] = "configuring pre-scaling"
6500 primitive_params
= self
._map
_primitive
_params
(
6501 config_primitive
, {}, vnfr_params
6504 # Pre-scale retry check: Check if this sub-operation has been executed before
6505 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6508 vnf_config_primitive
,
6512 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6513 # Skip sub-operation
6514 result
= "COMPLETED"
6515 result_detail
= "Done"
6518 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6519 vnf_config_primitive
, result
, result_detail
6523 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6524 # New sub-operation: Get index of this sub-operation
6526 len(db_nslcmop
.get("_admin", {}).get("operations"))
6531 + "vnf_config_primitive={} New sub-operation".format(
6532 vnf_config_primitive
6536 # retry: Get registered params for this existing sub-operation
6537 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6540 vnf_index
= op
.get("member_vnf_index")
6541 vnf_config_primitive
= op
.get("primitive")
6542 primitive_params
= op
.get("primitive_params")
6545 + "vnf_config_primitive={} Sub-operation retry".format(
6546 vnf_config_primitive
6549 # Execute the primitive, either with new (first-time) or registered (reintent) args
6550 ee_descriptor_id
= config_primitive
.get(
6551 "execution-environment-ref"
6553 primitive_name
= config_primitive
.get(
6554 "execution-environment-primitive", vnf_config_primitive
6556 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6557 nsr_deployed
["VCA"],
6558 member_vnf_index
=vnf_index
,
6560 vdu_count_index
=None,
6561 ee_descriptor_id
=ee_descriptor_id
,
6563 result
, result_detail
= await self
._ns
_execute
_primitive
(
6572 + "vnf_config_primitive={} Done with result {} {}".format(
6573 vnf_config_primitive
, result
, result_detail
6576 # Update operationState = COMPLETED | FAILED
6577 self
._update
_suboperation
_status
(
6578 db_nslcmop
, op_index
, result
, result_detail
6581 if result
== "FAILED":
6582 raise LcmException(result_detail
)
6583 db_nsr_update
["config-status"] = old_config_status
6584 scale_process
= None
6588 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6591 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6594 # SCALE-IN VCA - BEGIN
6595 if vca_scaling_info
:
6596 step
= db_nslcmop_update
[
6598 ] = "Deleting the execution environments"
6599 scale_process
= "VCA"
6600 for vca_info
in vca_scaling_info
:
6601 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6602 member_vnf_index
= str(vca_info
["member-vnf-index"])
6604 logging_text
+ "vdu info: {}".format(vca_info
)
6606 if vca_info
.get("osm_vdu_id"):
6607 vdu_id
= vca_info
["osm_vdu_id"]
6608 vdu_index
= int(vca_info
["vdu_index"])
6611 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6612 member_vnf_index
, vdu_id
, vdu_index
6614 stage
[2] = step
= "Scaling in VCA"
6615 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6616 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6617 config_update
= db_nsr
["configurationStatus"]
6618 for vca_index
, vca
in enumerate(vca_update
):
6620 (vca
or vca
.get("ee_id"))
6621 and vca
["member-vnf-index"] == member_vnf_index
6622 and vca
["vdu_count_index"] == vdu_index
6624 if vca
.get("vdu_id"):
6625 config_descriptor
= get_configuration(
6626 db_vnfd
, vca
.get("vdu_id")
6628 elif vca
.get("kdu_name"):
6629 config_descriptor
= get_configuration(
6630 db_vnfd
, vca
.get("kdu_name")
6633 config_descriptor
= get_configuration(
6634 db_vnfd
, db_vnfd
["id"]
6636 operation_params
= (
6637 db_nslcmop
.get("operationParams") or {}
6639 exec_terminate_primitives
= not operation_params
.get(
6640 "skip_terminate_primitives"
6641 ) and vca
.get("needed_terminate")
6642 task
= asyncio
.ensure_future(
6651 exec_primitives
=exec_terminate_primitives
,
6655 timeout
=self
.timeout
.charm_delete
,
6658 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6661 del vca_update
[vca_index
]
6662 del config_update
[vca_index
]
6663 # wait for pending tasks of terminate primitives
6667 + "Waiting for tasks {}".format(
6668 list(tasks_dict_info
.keys())
6671 error_list
= await self
._wait
_for
_tasks
(
6675 self
.timeout
.charm_delete
, self
.timeout
.ns_terminate
6680 tasks_dict_info
.clear()
6682 raise LcmException("; ".join(error_list
))
6684 db_vca_and_config_update
= {
6685 "_admin.deployed.VCA": vca_update
,
6686 "configurationStatus": config_update
,
6689 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6691 scale_process
= None
6692 # SCALE-IN VCA - END
6695 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6696 scale_process
= "RO"
6697 if self
.ro_config
.ng
:
6698 await self
._scale
_ng
_ro
(
6699 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6701 scaling_info
.pop("vdu-create", None)
6702 scaling_info
.pop("vdu-delete", None)
6704 scale_process
= None
6708 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6709 scale_process
= "KDU"
6710 await self
._scale
_kdu
(
6711 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6713 scaling_info
.pop("kdu-create", None)
6714 scaling_info
.pop("kdu-delete", None)
6716 scale_process
= None
6720 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6722 # SCALE-UP VCA - BEGIN
6723 if vca_scaling_info
:
6724 step
= db_nslcmop_update
[
6726 ] = "Creating new execution environments"
6727 scale_process
= "VCA"
6728 for vca_info
in vca_scaling_info
:
6729 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6730 member_vnf_index
= str(vca_info
["member-vnf-index"])
6732 logging_text
+ "vdu info: {}".format(vca_info
)
6734 vnfd_id
= db_vnfr
["vnfd-ref"]
6735 if vca_info
.get("osm_vdu_id"):
6736 vdu_index
= int(vca_info
["vdu_index"])
6737 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6738 if db_vnfr
.get("additionalParamsForVnf"):
6739 deploy_params
.update(
6741 db_vnfr
["additionalParamsForVnf"].copy()
6744 descriptor_config
= get_configuration(
6745 db_vnfd
, db_vnfd
["id"]
6747 if descriptor_config
:
6753 logging_text
=logging_text
6754 + "member_vnf_index={} ".format(member_vnf_index
),
6757 nslcmop_id
=nslcmop_id
,
6763 kdu_index
=kdu_index
,
6764 member_vnf_index
=member_vnf_index
,
6765 vdu_index
=vdu_index
,
6767 deploy_params
=deploy_params
,
6768 descriptor_config
=descriptor_config
,
6769 base_folder
=base_folder
,
6770 task_instantiation_info
=tasks_dict_info
,
6773 vdu_id
= vca_info
["osm_vdu_id"]
6774 vdur
= find_in_list(
6775 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6777 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6778 if vdur
.get("additionalParams"):
6779 deploy_params_vdu
= parse_yaml_strings(
6780 vdur
["additionalParams"]
6783 deploy_params_vdu
= deploy_params
6784 deploy_params_vdu
["OSM"] = get_osm_params(
6785 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6787 if descriptor_config
:
6793 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6794 member_vnf_index
, vdu_id
, vdu_index
6796 stage
[2] = step
= "Scaling out VCA"
6797 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6799 logging_text
=logging_text
6800 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6801 member_vnf_index
, vdu_id
, vdu_index
6805 nslcmop_id
=nslcmop_id
,
6811 member_vnf_index
=member_vnf_index
,
6812 vdu_index
=vdu_index
,
6813 kdu_index
=kdu_index
,
6815 deploy_params
=deploy_params_vdu
,
6816 descriptor_config
=descriptor_config
,
6817 base_folder
=base_folder
,
6818 task_instantiation_info
=tasks_dict_info
,
6821 # SCALE-UP VCA - END
6822 scale_process
= None
6825 # execute primitive service POST-SCALING
6826 step
= "Executing post-scale vnf-config-primitive"
6827 if scaling_descriptor
.get("scaling-config-action"):
6828 for scaling_config_action
in scaling_descriptor
[
6829 "scaling-config-action"
6832 scaling_config_action
.get("trigger") == "post-scale-in"
6833 and scaling_type
== "SCALE_IN"
6835 scaling_config_action
.get("trigger") == "post-scale-out"
6836 and scaling_type
== "SCALE_OUT"
6838 vnf_config_primitive
= scaling_config_action
[
6839 "vnf-config-primitive-name-ref"
6841 step
= db_nslcmop_update
[
6843 ] = "executing post-scale scaling-config-action '{}'".format(
6844 vnf_config_primitive
6847 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6848 if db_vnfr
.get("additionalParamsForVnf"):
6849 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6851 # look for primitive
6852 for config_primitive
in (
6853 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6854 ).get("config-primitive", ()):
6855 if config_primitive
["name"] == vnf_config_primitive
:
6859 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6860 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6861 "config-primitive".format(
6862 scaling_group
, vnf_config_primitive
6865 scale_process
= "VCA"
6866 db_nsr_update
["config-status"] = "configuring post-scaling"
6867 primitive_params
= self
._map
_primitive
_params
(
6868 config_primitive
, {}, vnfr_params
6871 # Post-scale retry check: Check if this sub-operation has been executed before
6872 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6875 vnf_config_primitive
,
6879 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6880 # Skip sub-operation
6881 result
= "COMPLETED"
6882 result_detail
= "Done"
6885 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6886 vnf_config_primitive
, result
, result_detail
6890 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6891 # New sub-operation: Get index of this sub-operation
6893 len(db_nslcmop
.get("_admin", {}).get("operations"))
6898 + "vnf_config_primitive={} New sub-operation".format(
6899 vnf_config_primitive
6903 # retry: Get registered params for this existing sub-operation
6904 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6907 vnf_index
= op
.get("member_vnf_index")
6908 vnf_config_primitive
= op
.get("primitive")
6909 primitive_params
= op
.get("primitive_params")
6912 + "vnf_config_primitive={} Sub-operation retry".format(
6913 vnf_config_primitive
6916 # Execute the primitive, either with new (first-time) or registered (reintent) args
6917 ee_descriptor_id
= config_primitive
.get(
6918 "execution-environment-ref"
6920 primitive_name
= config_primitive
.get(
6921 "execution-environment-primitive", vnf_config_primitive
6923 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6924 nsr_deployed
["VCA"],
6925 member_vnf_index
=vnf_index
,
6927 vdu_count_index
=None,
6928 ee_descriptor_id
=ee_descriptor_id
,
6930 result
, result_detail
= await self
._ns
_execute
_primitive
(
6939 + "vnf_config_primitive={} Done with result {} {}".format(
6940 vnf_config_primitive
, result
, result_detail
6943 # Update operationState = COMPLETED | FAILED
6944 self
._update
_suboperation
_status
(
6945 db_nslcmop
, op_index
, result
, result_detail
6948 if result
== "FAILED":
6949 raise LcmException(result_detail
)
6950 db_nsr_update
["config-status"] = old_config_status
6951 scale_process
= None
6956 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6957 db_nsr_update
["operational-status"] = (
6959 if old_operational_status
== "failed"
6960 else old_operational_status
6962 db_nsr_update
["config-status"] = old_config_status
6965 ROclient
.ROClientException
,
6970 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6972 except asyncio
.CancelledError
:
6974 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6976 exc
= "Operation was cancelled"
6977 except Exception as e
:
6978 exc
= traceback
.format_exc()
6979 self
.logger
.critical(
6980 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6984 self
._write
_ns
_status
(
6987 current_operation
="IDLE",
6988 current_operation_id
=None,
6991 stage
[1] = "Waiting for instantiate pending tasks."
6992 self
.logger
.debug(logging_text
+ stage
[1])
6993 exc
= await self
._wait
_for
_tasks
(
6996 self
.timeout
.ns_deploy
,
7004 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7005 nslcmop_operation_state
= "FAILED"
7007 db_nsr_update
["operational-status"] = old_operational_status
7008 db_nsr_update
["config-status"] = old_config_status
7009 db_nsr_update
["detailed-status"] = ""
7011 if "VCA" in scale_process
:
7012 db_nsr_update
["config-status"] = "failed"
7013 if "RO" in scale_process
:
7014 db_nsr_update
["operational-status"] = "failed"
7017 ] = "FAILED scaling nslcmop={} {}: {}".format(
7018 nslcmop_id
, step
, exc
7021 error_description_nslcmop
= None
7022 nslcmop_operation_state
= "COMPLETED"
7023 db_nslcmop_update
["detailed-status"] = "Done"
7025 self
._write
_op
_status
(
7028 error_message
=error_description_nslcmop
,
7029 operation_state
=nslcmop_operation_state
,
7030 other_update
=db_nslcmop_update
,
7033 self
._write
_ns
_status
(
7036 current_operation
="IDLE",
7037 current_operation_id
=None,
7038 other_update
=db_nsr_update
,
7041 if nslcmop_operation_state
:
7045 "nslcmop_id": nslcmop_id
,
7046 "operationState": nslcmop_operation_state
,
7048 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7049 except Exception as e
:
7051 logging_text
+ "kafka_write notification Exception {}".format(e
)
7053 self
.logger
.debug(logging_text
+ "Exit")
7054 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7056 async def _scale_kdu(
7057 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7059 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7060 for kdu_name
in _scaling_info
:
7061 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7062 deployed_kdu
, index
= get_deployed_kdu(
7063 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7065 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7066 kdu_instance
= deployed_kdu
["kdu-instance"]
7067 kdu_model
= deployed_kdu
.get("kdu-model")
7068 scale
= int(kdu_scaling_info
["scale"])
7069 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7072 "collection": "nsrs",
7073 "filter": {"_id": nsr_id
},
7074 "path": "_admin.deployed.K8s.{}".format(index
),
7077 step
= "scaling application {}".format(
7078 kdu_scaling_info
["resource-name"]
7080 self
.logger
.debug(logging_text
+ step
)
7082 if kdu_scaling_info
["type"] == "delete":
7083 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7086 and kdu_config
.get("terminate-config-primitive")
7087 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7089 terminate_config_primitive_list
= kdu_config
.get(
7090 "terminate-config-primitive"
7092 terminate_config_primitive_list
.sort(
7093 key
=lambda val
: int(val
["seq"])
7097 terminate_config_primitive
7098 ) in terminate_config_primitive_list
:
7099 primitive_params_
= self
._map
_primitive
_params
(
7100 terminate_config_primitive
, {}, {}
7102 step
= "execute terminate config primitive"
7103 self
.logger
.debug(logging_text
+ step
)
7104 await asyncio
.wait_for(
7105 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7106 cluster_uuid
=cluster_uuid
,
7107 kdu_instance
=kdu_instance
,
7108 primitive_name
=terminate_config_primitive
["name"],
7109 params
=primitive_params_
,
7111 total_timeout
=self
.timeout
.primitive
,
7114 timeout
=self
.timeout
.primitive
7115 * self
.timeout
.primitive_outer_factor
,
7118 await asyncio
.wait_for(
7119 self
.k8scluster_map
[k8s_cluster_type
].scale(
7120 kdu_instance
=kdu_instance
,
7122 resource_name
=kdu_scaling_info
["resource-name"],
7123 total_timeout
=self
.timeout
.scale_on_error
,
7125 cluster_uuid
=cluster_uuid
,
7126 kdu_model
=kdu_model
,
7130 timeout
=self
.timeout
.scale_on_error
7131 * self
.timeout
.scale_on_error_outer_factor
,
7134 if kdu_scaling_info
["type"] == "create":
7135 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7138 and kdu_config
.get("initial-config-primitive")
7139 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7141 initial_config_primitive_list
= kdu_config
.get(
7142 "initial-config-primitive"
7144 initial_config_primitive_list
.sort(
7145 key
=lambda val
: int(val
["seq"])
7148 for initial_config_primitive
in initial_config_primitive_list
:
7149 primitive_params_
= self
._map
_primitive
_params
(
7150 initial_config_primitive
, {}, {}
7152 step
= "execute initial config primitive"
7153 self
.logger
.debug(logging_text
+ step
)
7154 await asyncio
.wait_for(
7155 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7156 cluster_uuid
=cluster_uuid
,
7157 kdu_instance
=kdu_instance
,
7158 primitive_name
=initial_config_primitive
["name"],
7159 params
=primitive_params_
,
7166 async def _scale_ng_ro(
7167 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7169 nsr_id
= db_nslcmop
["nsInstanceId"]
7170 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7173 # read from db: vnfd's for every vnf
7176 # for each vnf in ns, read vnfd
7177 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7178 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7179 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7180 # if we haven't this vnfd, read it from db
7181 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7183 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7184 db_vnfds
.append(vnfd
)
7185 n2vc_key
= self
.n2vc
.get_public_key()
7186 n2vc_key_list
= [n2vc_key
]
7189 vdu_scaling_info
.get("vdu-create"),
7190 vdu_scaling_info
.get("vdu-delete"),
7193 # db_vnfr has been updated, update db_vnfrs to use it
7194 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7195 await self
._instantiate
_ng
_ro
(
7205 start_deploy
=time(),
7206 timeout_ns_deploy
=self
.timeout
.ns_deploy
,
7208 if vdu_scaling_info
.get("vdu-delete"):
7210 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7213 async def extract_prometheus_scrape_jobs(
7217 ee_config_descriptor
: dict,
7222 vnf_member_index
: str = "",
7224 vdu_index
: int = None,
7226 kdu_index
: int = None,
7228 """Method to extract prometheus scrape jobs from EE's Prometheus template job file
7229 This method will wait until the corresponding VDU or KDU is fully instantiated
7232 ee_id (str): Execution Environment ID
7233 artifact_path (str): Path where the EE's content is (including the Prometheus template file)
7234 ee_config_descriptor (dict): Execution Environment's configuration descriptor
7235 vnfr_id (str): VNFR ID where this EE applies
7236 nsr_id (str): NSR ID where this EE applies
7237 target_ip (str): VDU/KDU instance IP address
7238 element_type (str): NS or VNF or VDU or KDU
7239 vnf_member_index (str, optional): VNF index where this EE applies. Defaults to "".
7240 vdu_id (str, optional): VDU ID where this EE applies. Defaults to "".
7241 vdu_index (int, optional): VDU index where this EE applies. Defaults to None.
7242 kdu_name (str, optional): KDU name where this EE applies. Defaults to "".
7243 kdu_index (int, optional): KDU index where this EE applies. Defaults to None.
7246 LcmException: When the VDU or KDU instance was not found in an hour
7249 _type_: Prometheus jobs
7251 # default the vdur and kdur names to an empty string, to avoid any later
7252 # problem with Prometheus when the element type is not VDU or KDU
7256 # look if exist a file called 'prometheus*.j2' and
7257 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7261 for f
in artifact_content
7262 if f
.startswith("prometheus") and f
.endswith(".j2")
7268 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7271 # obtain the VDUR or KDUR, if the element type is VDU or KDU
7272 if element_type
in ("VDU", "KDU"):
7273 for _
in range(360):
7274 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
7275 if vdu_id
and vdu_index
is not None:
7279 for x
in get_iterable(db_vnfr
, "vdur")
7281 x
.get("vdu-id-ref") == vdu_id
7282 and x
.get("count-index") == vdu_index
7287 if vdur
.get("name"):
7288 vdur_name
= vdur
.get("name")
7290 if kdu_name
and kdu_index
is not None:
7294 for x
in get_iterable(db_vnfr
, "kdur")
7296 x
.get("kdu-name") == kdu_name
7297 and x
.get("count-index") == kdu_index
7302 if kdur
.get("name"):
7303 kdur_name
= kdur
.get("name")
7306 await asyncio
.sleep(10, loop
=self
.loop
)
7308 if vdu_id
and vdu_index
is not None:
7310 f
"Timeout waiting VDU with name={vdu_id} and index={vdu_index} to be intantiated"
7312 if kdu_name
and kdu_index
is not None:
7314 f
"Timeout waiting KDU with name={kdu_name} and index={kdu_index} to be intantiated"
7318 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7319 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7321 vnfr_id
= vnfr_id
.replace("-", "")
7323 "JOB_NAME": vnfr_id
,
7324 "TARGET_IP": target_ip
,
7325 "EXPORTER_POD_IP": host_name
,
7326 "EXPORTER_POD_PORT": host_port
,
7328 "VNF_MEMBER_INDEX": vnf_member_index
,
7329 "VDUR_NAME": vdur_name
,
7330 "KDUR_NAME": kdur_name
,
7331 "ELEMENT_TYPE": element_type
,
7333 job_list
= parse_job(job_data
, variables
)
7334 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7335 for job
in job_list
:
7337 not isinstance(job
.get("job_name"), str)
7338 or vnfr_id
not in job
["job_name"]
7340 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
7341 job
["nsr_id"] = nsr_id
7342 job
["vnfr_id"] = vnfr_id
7345 async def rebuild_start_stop(
7346 self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
7348 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7349 self
.logger
.info(logging_text
+ "Enter")
7350 stage
= ["Preparing the environment", ""]
7351 # database nsrs record
7355 # in case of error, indicates what part of scale was failed to put nsr at error status
7356 start_deploy
= time()
7358 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7359 vim_account_id
= db_vnfr
.get("vim-account-id")
7360 vim_info_key
= "vim:" + vim_account_id
7361 vdu_id
= additional_param
["vdu_id"]
7362 vdurs
= [item
for item
in db_vnfr
["vdur"] if item
["vdu-id-ref"] == vdu_id
]
7363 vdur
= find_in_list(
7364 vdurs
, lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7367 vdu_vim_name
= vdur
["name"]
7368 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7369 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7371 raise LcmException("Target vdu is not found")
7372 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7373 # wait for any previous tasks in process
7374 stage
[1] = "Waiting for previous operations to terminate"
7375 self
.logger
.info(stage
[1])
7376 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7378 stage
[1] = "Reading from database."
7379 self
.logger
.info(stage
[1])
7380 self
._write
_ns
_status
(
7383 current_operation
=operation_type
.upper(),
7384 current_operation_id
=nslcmop_id
,
7386 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7389 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7390 db_nsr_update
["operational-status"] = operation_type
7391 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7395 "vim_vm_id": vim_vm_id
,
7397 "vdu_index": additional_param
["count-index"],
7398 "vdu_id": vdur
["id"],
7399 "target_vim": target_vim
,
7400 "vim_account_id": vim_account_id
,
7403 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7404 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7405 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7406 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7407 self
.logger
.info("response from RO: {}".format(result_dict
))
7408 action_id
= result_dict
["action_id"]
7409 await self
._wait
_ng
_ro
(
7414 self
.timeout
.operate
,
7416 "start_stop_rebuild",
7418 return "COMPLETED", "Done"
7419 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7420 self
.logger
.error("Exit Exception {}".format(e
))
7422 except asyncio
.CancelledError
:
7423 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7424 exc
= "Operation was cancelled"
7425 except Exception as e
:
7426 exc
= traceback
.format_exc()
7427 self
.logger
.critical(
7428 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7430 return "FAILED", "Error in operate VNF {}".format(exc
)
7432 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7434 Get VCA Cloud and VCA Cloud Credentials for the VIM account
7436 :param: vim_account_id: VIM Account ID
7438 :return: (cloud_name, cloud_credential)
7440 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7441 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
7443 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7445 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
7447 :param: vim_account_id: VIM Account ID
7449 :return: (cloud_name, cloud_credential)
7451 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7452 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")
7454 async def migrate(self
, nsr_id
, nslcmop_id
):
7456 Migrate VNFs and VDUs instances in a NS
7458 :param: nsr_id: NS Instance ID
7459 :param: nslcmop_id: nslcmop ID of migrate
7462 # Try to lock HA task here
7463 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7464 if not task_is_locked_by_me
:
7466 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7467 self
.logger
.debug(logging_text
+ "Enter")
7468 # get all needed from database
7470 db_nslcmop_update
= {}
7471 nslcmop_operation_state
= None
7475 # in case of error, indicates what part of scale was failed to put nsr at error status
7476 start_deploy
= time()
7479 # wait for any previous tasks in process
7480 step
= "Waiting for previous operations to terminate"
7481 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7483 self
._write
_ns
_status
(
7486 current_operation
="MIGRATING",
7487 current_operation_id
=nslcmop_id
,
7489 step
= "Getting nslcmop from database"
7491 step
+ " after having waited for previous tasks to be completed"
7493 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7494 migrate_params
= db_nslcmop
.get("operationParams")
7497 target
.update(migrate_params
)
7498 desc
= await self
.RO
.migrate(nsr_id
, target
)
7499 self
.logger
.debug("RO return > {}".format(desc
))
7500 action_id
= desc
["action_id"]
7501 await self
._wait
_ng
_ro
(
7506 self
.timeout
.migrate
,
7507 operation
="migrate",
7509 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7510 self
.logger
.error("Exit Exception {}".format(e
))
7512 except asyncio
.CancelledError
:
7513 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7514 exc
= "Operation was cancelled"
7515 except Exception as e
:
7516 exc
= traceback
.format_exc()
7517 self
.logger
.critical(
7518 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7521 self
._write
_ns
_status
(
7524 current_operation
="IDLE",
7525 current_operation_id
=None,
7528 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7529 nslcmop_operation_state
= "FAILED"
7531 nslcmop_operation_state
= "COMPLETED"
7532 db_nslcmop_update
["detailed-status"] = "Done"
7533 db_nsr_update
["detailed-status"] = "Done"
7535 self
._write
_op
_status
(
7539 operation_state
=nslcmop_operation_state
,
7540 other_update
=db_nslcmop_update
,
7542 if nslcmop_operation_state
:
7546 "nslcmop_id": nslcmop_id
,
7547 "operationState": nslcmop_operation_state
,
7549 await self
.msg
.aiowrite("ns", "migrated", msg
, loop
=self
.loop
)
7550 except Exception as e
:
7552 logging_text
+ "kafka_write notification Exception {}".format(e
)
7554 self
.logger
.debug(logging_text
+ "Exit")
7555 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
7557 async def heal(self
, nsr_id
, nslcmop_id
):
7561 :param nsr_id: ns instance to heal
7562 :param nslcmop_id: operation to run
7566 # Try to lock HA task here
7567 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7568 if not task_is_locked_by_me
:
7571 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7572 stage
= ["", "", ""]
7573 tasks_dict_info
= {}
7574 # ^ stage, step, VIM progress
7575 self
.logger
.debug(logging_text
+ "Enter")
7576 # get all needed from database
7578 db_nslcmop_update
= {}
7580 db_vnfrs
= {} # vnf's info indexed by _id
7582 old_operational_status
= ""
7583 old_config_status
= ""
7586 # wait for any previous tasks in process
7587 step
= "Waiting for previous operations to terminate"
7588 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7589 self
._write
_ns
_status
(
7592 current_operation
="HEALING",
7593 current_operation_id
=nslcmop_id
,
7596 step
= "Getting nslcmop from database"
7598 step
+ " after having waited for previous tasks to be completed"
7600 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7602 step
= "Getting nsr from database"
7603 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7604 old_operational_status
= db_nsr
["operational-status"]
7605 old_config_status
= db_nsr
["config-status"]
7608 "_admin.deployed.RO.operational-status": "healing",
7610 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7612 step
= "Sending heal order to VIM"
7614 logging_text
=logging_text
,
7616 db_nslcmop
=db_nslcmop
,
7621 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7622 self
.logger
.debug(logging_text
+ stage
[1])
7623 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7624 self
.fs
.sync(db_nsr
["nsd-id"])
7626 # read from db: vnfr's of this ns
7627 step
= "Getting vnfrs from db"
7628 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7629 for vnfr
in db_vnfrs_list
:
7630 db_vnfrs
[vnfr
["_id"]] = vnfr
7631 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7633 # Check for each target VNF
7634 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7635 for target_vnf
in target_list
:
7636 # Find this VNF in the list from DB
7637 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7639 db_vnfr
= db_vnfrs
[vnfr_id
]
7640 vnfd_id
= db_vnfr
.get("vnfd-id")
7641 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7642 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7643 base_folder
= vnfd
["_admin"]["storage"]
7648 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7649 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7651 # Check each target VDU and deploy N2VC
7652 target_vdu_list
= target_vnf
.get("additionalParams", {}).get(
7655 if not target_vdu_list
:
7656 # Codigo nuevo para crear diccionario
7657 target_vdu_list
= []
7658 for existing_vdu
in db_vnfr
.get("vdur"):
7659 vdu_name
= existing_vdu
.get("vdu-name", None)
7660 vdu_index
= existing_vdu
.get("count-index", 0)
7661 vdu_run_day1
= target_vnf
.get("additionalParams", {}).get(
7664 vdu_to_be_healed
= {
7666 "count-index": vdu_index
,
7667 "run-day1": vdu_run_day1
,
7669 target_vdu_list
.append(vdu_to_be_healed
)
7670 for target_vdu
in target_vdu_list
:
7671 deploy_params_vdu
= target_vdu
7672 # Set run-day1 vnf level value if not vdu level value exists
7673 if not deploy_params_vdu
.get("run-day1") and target_vnf
[
7676 deploy_params_vdu
["run-day1"] = target_vnf
[
7679 vdu_name
= target_vdu
.get("vdu-id", None)
7680 # TODO: Get vdu_id from vdud.
7682 # For multi instance VDU count-index is mandatory
7683 # For single session VDU count-indes is 0
7684 vdu_index
= target_vdu
.get("count-index", 0)
7686 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7687 stage
[1] = "Deploying Execution Environments."
7688 self
.logger
.debug(logging_text
+ stage
[1])
7690 # VNF Level charm. Normal case when proxy charms.
7691 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7692 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7693 if descriptor_config
:
7694 # Continue if healed machine is management machine
7695 vnf_ip_address
= db_vnfr
.get("ip-address")
7696 target_instance
= None
7697 for instance
in db_vnfr
.get("vdur", None):
7699 instance
["vdu-name"] == vdu_name
7700 and instance
["count-index"] == vdu_index
7702 target_instance
= instance
7704 if vnf_ip_address
== target_instance
.get("ip-address"):
7706 logging_text
=logging_text
7707 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7708 member_vnf_index
, vdu_name
, vdu_index
7712 nslcmop_id
=nslcmop_id
,
7718 member_vnf_index
=member_vnf_index
,
7721 deploy_params
=deploy_params_vdu
,
7722 descriptor_config
=descriptor_config
,
7723 base_folder
=base_folder
,
7724 task_instantiation_info
=tasks_dict_info
,
7728 # VDU Level charm. Normal case with native charms.
7729 descriptor_config
= get_configuration(vnfd
, vdu_name
)
7730 if descriptor_config
:
7732 logging_text
=logging_text
7733 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7734 member_vnf_index
, vdu_name
, vdu_index
7738 nslcmop_id
=nslcmop_id
,
7744 member_vnf_index
=member_vnf_index
,
7745 vdu_index
=vdu_index
,
7747 deploy_params
=deploy_params_vdu
,
7748 descriptor_config
=descriptor_config
,
7749 base_folder
=base_folder
,
7750 task_instantiation_info
=tasks_dict_info
,
7755 ROclient
.ROClientException
,
7760 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7762 except asyncio
.CancelledError
:
7764 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7766 exc
= "Operation was cancelled"
7767 except Exception as e
:
7768 exc
= traceback
.format_exc()
7769 self
.logger
.critical(
7770 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7775 stage
[1] = "Waiting for healing pending tasks."
7776 self
.logger
.debug(logging_text
+ stage
[1])
7777 exc
= await self
._wait
_for
_tasks
(
7780 self
.timeout
.ns_deploy
,
7788 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7789 nslcmop_operation_state
= "FAILED"
7791 db_nsr_update
["operational-status"] = old_operational_status
7792 db_nsr_update
["config-status"] = old_config_status
7795 ] = "FAILED healing nslcmop={} {}: {}".format(nslcmop_id
, step
, exc
)
7796 for task
, task_name
in tasks_dict_info
.items():
7797 if not task
.done() or task
.cancelled() or task
.exception():
7798 if task_name
.startswith(self
.task_name_deploy_vca
):
7799 # A N2VC task is pending
7800 db_nsr_update
["config-status"] = "failed"
7802 # RO task is pending
7803 db_nsr_update
["operational-status"] = "failed"
7805 error_description_nslcmop
= None
7806 nslcmop_operation_state
= "COMPLETED"
7807 db_nslcmop_update
["detailed-status"] = "Done"
7808 db_nsr_update
["detailed-status"] = "Done"
7809 db_nsr_update
["operational-status"] = "running"
7810 db_nsr_update
["config-status"] = "configured"
7812 self
._write
_op
_status
(
7815 error_message
=error_description_nslcmop
,
7816 operation_state
=nslcmop_operation_state
,
7817 other_update
=db_nslcmop_update
,
7820 self
._write
_ns
_status
(
7823 current_operation
="IDLE",
7824 current_operation_id
=None,
7825 other_update
=db_nsr_update
,
7828 if nslcmop_operation_state
:
7832 "nslcmop_id": nslcmop_id
,
7833 "operationState": nslcmop_operation_state
,
7835 await self
.msg
.aiowrite("ns", "healed", msg
, loop
=self
.loop
)
7836 except Exception as e
:
7838 logging_text
+ "kafka_write notification Exception {}".format(e
)
7840 self
.logger
.debug(logging_text
+ "Exit")
7841 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
7852 :param logging_text: preffix text to use at logging
7853 :param nsr_id: nsr identity
7854 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
7855 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
7856 :return: None or exception
7859 def get_vim_account(vim_account_id
):
7861 if vim_account_id
in db_vims
:
7862 return db_vims
[vim_account_id
]
7863 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
7864 db_vims
[vim_account_id
] = db_vim
7869 ns_params
= db_nslcmop
.get("operationParams")
7870 if ns_params
and ns_params
.get("timeout_ns_heal"):
7871 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
7873 timeout_ns_heal
= self
.timeout
.ns_heal
7877 nslcmop_id
= db_nslcmop
["_id"]
7879 "action_id": nslcmop_id
,
7881 self
.logger
.warning(
7882 "db_nslcmop={} and timeout_ns_heal={}".format(
7883 db_nslcmop
, timeout_ns_heal
7886 target
.update(db_nslcmop
.get("operationParams", {}))
7888 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
7889 desc
= await self
.RO
.recreate(nsr_id
, target
)
7890 self
.logger
.debug("RO return > {}".format(desc
))
7891 action_id
= desc
["action_id"]
7892 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
7893 await self
._wait
_ng
_ro
(
7900 operation
="healing",
7905 "_admin.deployed.RO.operational-status": "running",
7906 "detailed-status": " ".join(stage
),
7908 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7909 self
._write
_op
_status
(nslcmop_id
, stage
)
7911 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
7914 except Exception as e
:
7915 stage
[2] = "ERROR healing at VIM"
7916 # self.set_vnfr_at_error(db_vnfrs, str(e))
7918 "Error healing at VIM {}".format(e
),
7919 exc_info
=not isinstance(
7922 ROclient
.ROClientException
,
7948 task_instantiation_info
,
7951 # launch instantiate_N2VC in a asyncio task and register task object
7952 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
7953 # if not found, create one entry and update database
7954 # fill db_nsr._admin.deployed.VCA.<index>
7957 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
7961 get_charm_name
= False
7962 if "execution-environment-list" in descriptor_config
:
7963 ee_list
= descriptor_config
.get("execution-environment-list", [])
7964 elif "juju" in descriptor_config
:
7965 ee_list
= [descriptor_config
] # ns charms
7966 if "execution-environment-list" not in descriptor_config
:
7967 # charm name is only required for ns charms
7968 get_charm_name
= True
7969 else: # other types as script are not supported
7972 for ee_item
in ee_list
:
7975 + "_deploy_n2vc ee_item juju={}, helm={}".format(
7976 ee_item
.get("juju"), ee_item
.get("helm-chart")
7979 ee_descriptor_id
= ee_item
.get("id")
7980 if ee_item
.get("juju"):
7981 vca_name
= ee_item
["juju"].get("charm")
7983 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
7986 if ee_item
["juju"].get("charm") is not None
7989 if ee_item
["juju"].get("cloud") == "k8s":
7990 vca_type
= "k8s_proxy_charm"
7991 elif ee_item
["juju"].get("proxy") is False:
7992 vca_type
= "native_charm"
7993 elif ee_item
.get("helm-chart"):
7994 vca_name
= ee_item
["helm-chart"]
7995 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
7998 vca_type
= "helm-v3"
8001 logging_text
+ "skipping non juju neither charm configuration"
8006 for vca_index
, vca_deployed
in enumerate(
8007 db_nsr
["_admin"]["deployed"]["VCA"]
8009 if not vca_deployed
:
8012 vca_deployed
.get("member-vnf-index") == member_vnf_index
8013 and vca_deployed
.get("vdu_id") == vdu_id
8014 and vca_deployed
.get("kdu_name") == kdu_name
8015 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
8016 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
8020 # not found, create one.
8022 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
8025 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
8027 target
+= "/kdu/{}".format(kdu_name
)
8029 "target_element": target
,
8030 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
8031 "member-vnf-index": member_vnf_index
,
8033 "kdu_name": kdu_name
,
8034 "vdu_count_index": vdu_index
,
8035 "operational-status": "init", # TODO revise
8036 "detailed-status": "", # TODO revise
8037 "step": "initial-deploy", # TODO revise
8039 "vdu_name": vdu_name
,
8041 "ee_descriptor_id": ee_descriptor_id
,
8042 "charm_name": charm_name
,
8046 # create VCA and configurationStatus in db
8048 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
8049 "configurationStatus.{}".format(vca_index
): dict(),
8051 self
.update_db_2("nsrs", nsr_id
, db_dict
)
8053 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
8055 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
8056 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
8057 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
8060 task_n2vc
= asyncio
.ensure_future(
8062 logging_text
=logging_text
,
8063 vca_index
=vca_index
,
8069 vdu_index
=vdu_index
,
8070 deploy_params
=deploy_params
,
8071 config_descriptor
=descriptor_config
,
8072 base_folder
=base_folder
,
8073 nslcmop_id
=nslcmop_id
,
8077 ee_config_descriptor
=ee_item
,
8080 self
.lcm_tasks
.register(
8084 "instantiate_N2VC-{}".format(vca_index
),
8087 task_instantiation_info
[
8089 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
8090 member_vnf_index
or "", vdu_id
or ""
8093 async def heal_N2VC(
8110 ee_config_descriptor
,
8112 nsr_id
= db_nsr
["_id"]
8113 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
8114 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
8115 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
8116 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
8118 "collection": "nsrs",
8119 "filter": {"_id": nsr_id
},
8120 "path": db_update_entry
,
8125 element_under_configuration
= nsr_id
8129 vnfr_id
= db_vnfr
["_id"]
8130 osm_config
["osm"]["vnf_id"] = vnfr_id
8132 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8134 if vca_type
== "native_charm":
8137 index_number
= vdu_index
or 0
8140 element_type
= "VNF"
8141 element_under_configuration
= vnfr_id
8142 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8144 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8145 element_type
= "VDU"
8146 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8147 osm_config
["osm"]["vdu_id"] = vdu_id
8149 namespace
+= ".{}".format(kdu_name
)
8150 element_type
= "KDU"
8151 element_under_configuration
= kdu_name
8152 osm_config
["osm"]["kdu_name"] = kdu_name
8155 if base_folder
["pkg-dir"]:
8156 artifact_path
= "{}/{}/{}/{}".format(
8157 base_folder
["folder"],
8158 base_folder
["pkg-dir"],
8161 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8166 artifact_path
= "{}/Scripts/{}/{}/".format(
8167 base_folder
["folder"],
8170 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8175 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8177 # get initial_config_primitive_list that applies to this element
8178 initial_config_primitive_list
= config_descriptor
.get(
8179 "initial-config-primitive"
8183 "Initial config primitive list > {}".format(
8184 initial_config_primitive_list
8188 # add config if not present for NS charm
8189 ee_descriptor_id
= ee_config_descriptor
.get("id")
8190 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8191 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8192 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8196 "Initial config primitive list #2 > {}".format(
8197 initial_config_primitive_list
8200 # n2vc_redesign STEP 3.1
8201 # find old ee_id if exists
8202 ee_id
= vca_deployed
.get("ee_id")
8204 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8205 # create or register execution environment in VCA. Only for native charms when healing
8206 if vca_type
== "native_charm":
8207 step
= "Waiting to VM being up and getting IP address"
8208 self
.logger
.debug(logging_text
+ step
)
8209 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8218 credentials
= {"hostname": rw_mgmt_ip
}
8220 username
= deep_get(
8221 config_descriptor
, ("config-access", "ssh-access", "default-user")
8223 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8224 # merged. Meanwhile let's get username from initial-config-primitive
8225 if not username
and initial_config_primitive_list
:
8226 for config_primitive
in initial_config_primitive_list
:
8227 for param
in config_primitive
.get("parameter", ()):
8228 if param
["name"] == "ssh-username":
8229 username
= param
["value"]
8233 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8234 "'config-access.ssh-access.default-user'"
8236 credentials
["username"] = username
8238 # n2vc_redesign STEP 3.2
8239 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8240 self
._write
_configuration
_status
(
8242 vca_index
=vca_index
,
8243 status
="REGISTERING",
8244 element_under_configuration
=element_under_configuration
,
8245 element_type
=element_type
,
8248 step
= "register execution environment {}".format(credentials
)
8249 self
.logger
.debug(logging_text
+ step
)
8250 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8251 credentials
=credentials
,
8252 namespace
=namespace
,
8257 # update ee_id en db
8259 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8261 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8263 # for compatibility with MON/POL modules, the need model and application name at database
8264 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8265 # Not sure if this need to be done when healing
8267 ee_id_parts = ee_id.split(".")
8268 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8269 if len(ee_id_parts) >= 2:
8270 model_name = ee_id_parts[0]
8271 application_name = ee_id_parts[1]
8272 db_nsr_update[db_update_entry + "model"] = model_name
8273 db_nsr_update[db_update_entry + "application"] = application_name
8276 # n2vc_redesign STEP 3.3
8277 # Install configuration software. Only for native charms.
8278 step
= "Install configuration Software"
8280 self
._write
_configuration
_status
(
8282 vca_index
=vca_index
,
8283 status
="INSTALLING SW",
8284 element_under_configuration
=element_under_configuration
,
8285 element_type
=element_type
,
8286 # other_update=db_nsr_update,
8290 # TODO check if already done
8291 self
.logger
.debug(logging_text
+ step
)
8293 if vca_type
== "native_charm":
8294 config_primitive
= next(
8295 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8298 if config_primitive
:
8299 config
= self
._map
_primitive
_params
(
8300 config_primitive
, {}, deploy_params
8302 await self
.vca_map
[vca_type
].install_configuration_sw(
8304 artifact_path
=artifact_path
,
8312 # write in db flag of configuration_sw already installed
8314 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8317 # Not sure if this need to be done when healing
8319 # add relations for this VCA (wait for other peers related with this VCA)
8320 await self._add_vca_relations(
8321 logging_text=logging_text,
8324 vca_index=vca_index,
8328 # if SSH access is required, then get execution environment SSH public
8329 # if native charm we have waited already to VM be UP
8330 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8333 # self.logger.debug("get ssh key block")
8335 config_descriptor
, ("config-access", "ssh-access", "required")
8337 # self.logger.debug("ssh key needed")
8338 # Needed to inject a ssh key
8341 ("config-access", "ssh-access", "default-user"),
8343 step
= "Install configuration Software, getting public ssh key"
8344 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8345 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8348 step
= "Insert public key into VM user={} ssh_key={}".format(
8352 # self.logger.debug("no need to get ssh key")
8353 step
= "Waiting to VM being up and getting IP address"
8354 self
.logger
.debug(logging_text
+ step
)
8356 # n2vc_redesign STEP 5.1
8357 # wait for RO (ip-address) Insert pub_key into VM
8358 # IMPORTANT: We need do wait for RO to complete healing operation.
8359 await self
._wait
_heal
_ro
(nsr_id
, self
.timeout
.ns_heal
)
8362 rw_mgmt_ip
= await self
.wait_kdu_up(
8363 logging_text
, nsr_id
, vnfr_id
, kdu_name
8366 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8376 rw_mgmt_ip
= None # This is for a NS configuration
8378 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8380 # store rw_mgmt_ip in deploy params for later replacement
8381 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8384 # get run-day1 operation parameter
8385 runDay1
= deploy_params
.get("run-day1", False)
8387 "Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
, vdu_id
, runDay1
)
8390 # n2vc_redesign STEP 6 Execute initial config primitive
8391 step
= "execute initial config primitive"
8393 # wait for dependent primitives execution (NS -> VNF -> VDU)
8394 if initial_config_primitive_list
:
8395 await self
._wait
_dependent
_n
2vc
(
8396 nsr_id
, vca_deployed_list
, vca_index
8399 # stage, in function of element type: vdu, kdu, vnf or ns
8400 my_vca
= vca_deployed_list
[vca_index
]
8401 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8403 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8404 elif my_vca
.get("member-vnf-index"):
8406 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8409 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8411 self
._write
_configuration
_status
(
8412 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8415 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8417 check_if_terminated_needed
= True
8418 for initial_config_primitive
in initial_config_primitive_list
:
8419 # adding information on the vca_deployed if it is a NS execution environment
8420 if not vca_deployed
["member-vnf-index"]:
8421 deploy_params
["ns_config_info"] = json
.dumps(
8422 self
._get
_ns
_config
_info
(nsr_id
)
8424 # TODO check if already done
8425 primitive_params_
= self
._map
_primitive
_params
(
8426 initial_config_primitive
, {}, deploy_params
8429 step
= "execute primitive '{}' params '{}'".format(
8430 initial_config_primitive
["name"], primitive_params_
8432 self
.logger
.debug(logging_text
+ step
)
8433 await self
.vca_map
[vca_type
].exec_primitive(
8435 primitive_name
=initial_config_primitive
["name"],
8436 params_dict
=primitive_params_
,
8441 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8442 if check_if_terminated_needed
:
8443 if config_descriptor
.get("terminate-config-primitive"):
8447 {db_update_entry
+ "needed_terminate": True},
8449 check_if_terminated_needed
= False
8451 # TODO register in database that primitive is done
8453 # STEP 7 Configure metrics
8454 # Not sure if this need to be done when healing
8456 if vca_type == "helm" or vca_type == "helm-v3":
8457 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8459 artifact_path=artifact_path,
8460 ee_config_descriptor=ee_config_descriptor,
8463 target_ip=rw_mgmt_ip,
8469 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8472 for job in prometheus_jobs:
8475 {"job_name": job["job_name"]},
8478 fail_on_empty=False,
8482 step
= "instantiated at VCA"
8483 self
.logger
.debug(logging_text
+ step
)
8485 self
._write
_configuration
_status
(
8486 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8489 except Exception as e
: # TODO not use Exception but N2VC exception
8490 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8492 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8495 "Exception while {} : {}".format(step
, e
), exc_info
=True
8497 self
._write
_configuration
_status
(
8498 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8500 raise LcmException("{} {}".format(step
, e
)) from e
8502 async def _wait_heal_ro(
8508 while time() <= start_time
+ timeout
:
8509 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
8510 operational_status_ro
= db_nsr
["_admin"]["deployed"]["RO"][
8511 "operational-status"
8513 self
.logger
.debug("Wait Heal RO > {}".format(operational_status_ro
))
8514 if operational_status_ro
!= "healing":
8516 await asyncio
.sleep(15, loop
=self
.loop
)
8517 else: # timeout_ns_deploy
8518 raise NgRoException("Timeout waiting ns to deploy")
8520 async def vertical_scale(self
, nsr_id
, nslcmop_id
):
8522 Vertical Scale the VDUs in a NS
8524 :param: nsr_id: NS Instance ID
8525 :param: nslcmop_id: nslcmop ID of migrate
8528 # Try to lock HA task here
8529 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
8530 if not task_is_locked_by_me
:
8532 logging_text
= "Task ns={} vertical scale ".format(nsr_id
)
8533 self
.logger
.debug(logging_text
+ "Enter")
8534 # get all needed from database
8536 db_nslcmop_update
= {}
8537 nslcmop_operation_state
= None
8541 # in case of error, indicates what part of scale was failed to put nsr at error status
8542 start_deploy
= time()
8545 # wait for any previous tasks in process
8546 step
= "Waiting for previous operations to terminate"
8547 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
8549 self
._write
_ns
_status
(
8552 current_operation
="VerticalScale",
8553 current_operation_id
=nslcmop_id
,
8555 step
= "Getting nslcmop from database"
8557 step
+ " after having waited for previous tasks to be completed"
8559 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
8560 operationParams
= db_nslcmop
.get("operationParams")
8562 target
.update(operationParams
)
8563 desc
= await self
.RO
.vertical_scale(nsr_id
, target
)
8564 self
.logger
.debug("RO return > {}".format(desc
))
8565 action_id
= desc
["action_id"]
8566 await self
._wait
_ng
_ro
(
8571 self
.timeout
.verticalscale
,
8572 operation
="verticalscale",
8574 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
8575 self
.logger
.error("Exit Exception {}".format(e
))
8577 except asyncio
.CancelledError
:
8578 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
8579 exc
= "Operation was cancelled"
8580 except Exception as e
:
8581 exc
= traceback
.format_exc()
8582 self
.logger
.critical(
8583 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
8586 self
._write
_ns
_status
(
8589 current_operation
="IDLE",
8590 current_operation_id
=None,
8593 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
8594 nslcmop_operation_state
= "FAILED"
8596 nslcmop_operation_state
= "COMPLETED"
8597 db_nslcmop_update
["detailed-status"] = "Done"
8598 db_nsr_update
["detailed-status"] = "Done"
8600 self
._write
_op
_status
(
8604 operation_state
=nslcmop_operation_state
,
8605 other_update
=db_nslcmop_update
,
8607 if nslcmop_operation_state
:
8611 "nslcmop_id": nslcmop_id
,
8612 "operationState": nslcmop_operation_state
,
8614 await self
.msg
.aiowrite("ns", "verticalscaled", msg
, loop
=self
.loop
)
8615 except Exception as e
:
8617 logging_text
+ "kafka_write notification Exception {}".format(e
)
8619 self
.logger
.debug(logging_text
+ "Exit")
8620 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_verticalscale")