1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
35 from osm_lcm
import ROclient
36 from osm_lcm
.data_utils
.nsr
import (
39 get_deployed_vca_list
,
42 from osm_lcm
.data_utils
.vca
import (
51 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
52 from osm_lcm
.lcm_utils
import (
59 check_juju_bundle_existence
,
60 get_charm_artifact_path
,
62 from osm_lcm
.data_utils
.nsd
import (
63 get_ns_configuration_relation_list
,
67 from osm_lcm
.data_utils
.vnfd
import (
73 get_ee_sorted_initial_config_primitive_list
,
74 get_ee_sorted_terminate_config_primitive_list
,
76 get_virtual_link_profiles
,
81 get_number_of_instances
,
83 get_kdu_resource_profile
,
84 find_software_version
,
86 from osm_lcm
.data_utils
.list_utils
import find_in_list
87 from osm_lcm
.data_utils
.vnfr
import (
91 get_volumes_from_instantiation_params
,
93 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
94 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
95 from n2vc
.definitions
import RelationEndpoint
96 from n2vc
.k8s_helm_conn
import K8sHelmConnector
97 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
98 from n2vc
.k8s_juju_conn
import K8sJujuConnector
100 from osm_common
.dbbase
import DbException
101 from osm_common
.fsbase
import FsException
103 from osm_lcm
.data_utils
.database
.database
import Database
104 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
106 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
107 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
109 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
110 from osm_lcm
.osm_config
import OsmConfigBuilder
111 from osm_lcm
.prometheus
import parse_job
113 from copy
import copy
, deepcopy
114 from time
import time
115 from uuid
import uuid4
117 from random
import randint
119 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
122 class NsLcm(LcmBase
):
123 timeout_vca_on_error
= (
125 ) # Time for charm from first time at blocked,error status to mark as failed
126 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
127 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
128 timeout_ns_heal
= 1800 # default global timeout for un deployment a ns
129 timeout_charm_delete
= 10 * 60
130 timeout_primitive
= 30 * 60 # timeout for primitive execution
131 timeout_ns_update
= 30 * 60 # timeout for ns update
132 timeout_progress_primitive
= (
134 ) # timeout for some progress in a primitive execution
135 timeout_migrate
= 1800 # default global timeout for migrating vnfs
136 timeout_operate
= 1800 # default global timeout for migrating vnfs
137 timeout_verticalscale
= 1800 # default global timeout for Vertical Sclaing
138 SUBOPERATION_STATUS_NOT_FOUND
= -1
139 SUBOPERATION_STATUS_NEW
= -2
140 SUBOPERATION_STATUS_SKIP
= -3
141 task_name_deploy_vca
= "Deploying VCA"
143 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
145 Init, Connect to database, filesystem storage, and messaging
146 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
149 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
151 self
.db
= Database().instance
.db
152 self
.fs
= Filesystem().instance
.fs
154 self
.lcm_tasks
= lcm_tasks
155 self
.timeout
= config
["timeout"]
156 self
.ro_config
= config
["ro_config"]
157 self
.ng_ro
= config
["ro_config"].get("ng")
158 self
.vca_config
= config
["VCA"].copy()
160 # create N2VC connector
161 self
.n2vc
= N2VCJujuConnector(
164 on_update_db
=self
._on
_update
_n
2vc
_db
,
169 self
.conn_helm_ee
= LCMHelmConn(
172 vca_config
=self
.vca_config
,
173 on_update_db
=self
._on
_update
_n
2vc
_db
,
176 self
.k8sclusterhelm2
= K8sHelmConnector(
177 kubectl_command
=self
.vca_config
.get("kubectlpath"),
178 helm_command
=self
.vca_config
.get("helmpath"),
185 self
.k8sclusterhelm3
= K8sHelm3Connector(
186 kubectl_command
=self
.vca_config
.get("kubectlpath"),
187 helm_command
=self
.vca_config
.get("helm3path"),
194 self
.k8sclusterjuju
= K8sJujuConnector(
195 kubectl_command
=self
.vca_config
.get("kubectlpath"),
196 juju_command
=self
.vca_config
.get("jujupath"),
199 on_update_db
=self
._on
_update
_k
8s
_db
,
204 self
.k8scluster_map
= {
205 "helm-chart": self
.k8sclusterhelm2
,
206 "helm-chart-v3": self
.k8sclusterhelm3
,
207 "chart": self
.k8sclusterhelm3
,
208 "juju-bundle": self
.k8sclusterjuju
,
209 "juju": self
.k8sclusterjuju
,
213 "lxc_proxy_charm": self
.n2vc
,
214 "native_charm": self
.n2vc
,
215 "k8s_proxy_charm": self
.n2vc
,
216 "helm": self
.conn_helm_ee
,
217 "helm-v3": self
.conn_helm_ee
,
221 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
223 self
.op_status_map
= {
224 "instantiation": self
.RO
.status
,
225 "termination": self
.RO
.status
,
226 "migrate": self
.RO
.status
,
227 "healing": self
.RO
.recreate_status
,
231 def increment_ip_mac(ip_mac
, vm_index
=1):
232 if not isinstance(ip_mac
, str):
235 # try with ipv4 look for last dot
236 i
= ip_mac
.rfind(".")
239 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
240 # try with ipv6 or mac look for last colon. Operate in hex
241 i
= ip_mac
.rfind(":")
244 # format in hex, len can be 2 for mac or 4 for ipv6
245 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
246 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
252 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
254 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
257 # TODO filter RO descriptor fields...
261 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
262 db_dict
["deploymentStatus"] = ro_descriptor
263 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
265 except Exception as e
:
267 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
270 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
272 # remove last dot from path (if exists)
273 if path
.endswith("."):
276 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
277 # .format(table, filter, path, updated_data))
280 nsr_id
= filter.get("_id")
282 # read ns record from database
283 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
284 current_ns_status
= nsr
.get("nsState")
286 # get vca status for NS
287 status_dict
= await self
.n2vc
.get_status(
288 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
293 db_dict
["vcaStatus"] = status_dict
295 # update configurationStatus for this VCA
297 vca_index
= int(path
[path
.rfind(".") + 1 :])
300 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
302 vca_status
= vca_list
[vca_index
].get("status")
304 configuration_status_list
= nsr
.get("configurationStatus")
305 config_status
= configuration_status_list
[vca_index
].get("status")
307 if config_status
== "BROKEN" and vca_status
!= "failed":
308 db_dict
["configurationStatus"][vca_index
] = "READY"
309 elif config_status
!= "BROKEN" and vca_status
== "failed":
310 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
311 except Exception as e
:
312 # not update configurationStatus
313 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
315 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
316 # if nsState = 'DEGRADED' check if all is OK
318 if current_ns_status
in ("READY", "DEGRADED"):
319 error_description
= ""
321 if status_dict
.get("machines"):
322 for machine_id
in status_dict
.get("machines"):
323 machine
= status_dict
.get("machines").get(machine_id
)
324 # check machine agent-status
325 if machine
.get("agent-status"):
326 s
= machine
.get("agent-status").get("status")
329 error_description
+= (
330 "machine {} agent-status={} ; ".format(
334 # check machine instance status
335 if machine
.get("instance-status"):
336 s
= machine
.get("instance-status").get("status")
339 error_description
+= (
340 "machine {} instance-status={} ; ".format(
345 if status_dict
.get("applications"):
346 for app_id
in status_dict
.get("applications"):
347 app
= status_dict
.get("applications").get(app_id
)
348 # check application status
349 if app
.get("status"):
350 s
= app
.get("status").get("status")
353 error_description
+= (
354 "application {} status={} ; ".format(app_id
, s
)
357 if error_description
:
358 db_dict
["errorDescription"] = error_description
359 if current_ns_status
== "READY" and is_degraded
:
360 db_dict
["nsState"] = "DEGRADED"
361 if current_ns_status
== "DEGRADED" and not is_degraded
:
362 db_dict
["nsState"] = "READY"
365 self
.update_db_2("nsrs", nsr_id
, db_dict
)
367 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
369 except Exception as e
:
370 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
372 async def _on_update_k8s_db(
373 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
376 Updating vca status in NSR record
377 :param cluster_uuid: UUID of a k8s cluster
378 :param kdu_instance: The unique name of the KDU instance
379 :param filter: To get nsr_id
380 :cluster_type: The cluster type (juju, k8s)
384 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
385 # .format(cluster_uuid, kdu_instance, filter))
387 nsr_id
= filter.get("_id")
389 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
390 cluster_uuid
=cluster_uuid
,
391 kdu_instance
=kdu_instance
,
393 complete_status
=True,
399 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
402 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
406 self
.update_db_2("nsrs", nsr_id
, db_dict
)
407 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
409 except Exception as e
:
410 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
413 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
415 env
= Environment(undefined
=StrictUndefined
)
416 template
= env
.from_string(cloud_init_text
)
417 return template
.render(additional_params
or {})
418 except UndefinedError
as e
:
420 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
421 "file, must be provided in the instantiation parameters inside the "
422 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
424 except (TemplateError
, TemplateNotFound
) as e
:
426 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
431 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
432 cloud_init_content
= cloud_init_file
= None
434 if vdu
.get("cloud-init-file"):
435 base_folder
= vnfd
["_admin"]["storage"]
436 if base_folder
["pkg-dir"]:
437 cloud_init_file
= "{}/{}/cloud_init/{}".format(
438 base_folder
["folder"],
439 base_folder
["pkg-dir"],
440 vdu
["cloud-init-file"],
443 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
444 base_folder
["folder"],
445 vdu
["cloud-init-file"],
447 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
448 cloud_init_content
= ci_file
.read()
449 elif vdu
.get("cloud-init"):
450 cloud_init_content
= vdu
["cloud-init"]
452 return cloud_init_content
453 except FsException
as e
:
455 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
456 vnfd
["id"], vdu
["id"], cloud_init_file
, e
460 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
462 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
464 additional_params
= vdur
.get("additionalParams")
465 return parse_yaml_strings(additional_params
)
467 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
469 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
470 :param vnfd: input vnfd
471 :param new_id: overrides vnf id if provided
472 :param additionalParams: Instantiation params for VNFs provided
473 :param nsrId: Id of the NSR
474 :return: copy of vnfd
476 vnfd_RO
= deepcopy(vnfd
)
477 # remove unused by RO configuration, monitoring, scaling and internal keys
478 vnfd_RO
.pop("_id", None)
479 vnfd_RO
.pop("_admin", None)
480 vnfd_RO
.pop("monitoring-param", None)
481 vnfd_RO
.pop("scaling-group-descriptor", None)
482 vnfd_RO
.pop("kdu", None)
483 vnfd_RO
.pop("k8s-cluster", None)
485 vnfd_RO
["id"] = new_id
487 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
488 for vdu
in get_iterable(vnfd_RO
, "vdu"):
489 vdu
.pop("cloud-init-file", None)
490 vdu
.pop("cloud-init", None)
494 def ip_profile_2_RO(ip_profile
):
495 RO_ip_profile
= deepcopy(ip_profile
)
496 if "dns-server" in RO_ip_profile
:
497 if isinstance(RO_ip_profile
["dns-server"], list):
498 RO_ip_profile
["dns-address"] = []
499 for ds
in RO_ip_profile
.pop("dns-server"):
500 RO_ip_profile
["dns-address"].append(ds
["address"])
502 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
503 if RO_ip_profile
.get("ip-version") == "ipv4":
504 RO_ip_profile
["ip-version"] = "IPv4"
505 if RO_ip_profile
.get("ip-version") == "ipv6":
506 RO_ip_profile
["ip-version"] = "IPv6"
507 if "dhcp-params" in RO_ip_profile
:
508 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
511 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
512 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
513 if db_vim
["_admin"]["operationalState"] != "ENABLED":
515 "VIM={} is not available. operationalState={}".format(
516 vim_account
, db_vim
["_admin"]["operationalState"]
519 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
522 def get_ro_wim_id_for_wim_account(self
, wim_account
):
523 if isinstance(wim_account
, str):
524 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
525 if db_wim
["_admin"]["operationalState"] != "ENABLED":
527 "WIM={} is not available. operationalState={}".format(
528 wim_account
, db_wim
["_admin"]["operationalState"]
531 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
536 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
538 db_vdu_push_list
= []
540 db_update
= {"_admin.modified": time()}
542 for vdu_id
, vdu_count
in vdu_create
.items():
546 for vdur
in reversed(db_vnfr
["vdur"])
547 if vdur
["vdu-id-ref"] == vdu_id
552 # Read the template saved in the db:
554 "No vdur in the database. Using the vdur-template to scale"
556 vdur_template
= db_vnfr
.get("vdur-template")
557 if not vdur_template
:
559 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
563 vdur
= vdur_template
[0]
564 # Delete a template from the database after using it
567 {"_id": db_vnfr
["_id"]},
569 pull
={"vdur-template": {"_id": vdur
["_id"]}},
571 for count
in range(vdu_count
):
572 vdur_copy
= deepcopy(vdur
)
573 vdur_copy
["status"] = "BUILD"
574 vdur_copy
["status-detailed"] = None
575 vdur_copy
["ip-address"] = None
576 vdur_copy
["_id"] = str(uuid4())
577 vdur_copy
["count-index"] += count
+ 1
578 vdur_copy
["id"] = "{}-{}".format(
579 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
581 vdur_copy
.pop("vim_info", None)
582 for iface
in vdur_copy
["interfaces"]:
583 if iface
.get("fixed-ip"):
584 iface
["ip-address"] = self
.increment_ip_mac(
585 iface
["ip-address"], count
+ 1
588 iface
.pop("ip-address", None)
589 if iface
.get("fixed-mac"):
590 iface
["mac-address"] = self
.increment_ip_mac(
591 iface
["mac-address"], count
+ 1
594 iface
.pop("mac-address", None)
598 ) # only first vdu can be managment of vnf
599 db_vdu_push_list
.append(vdur_copy
)
600 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
602 if len(db_vnfr
["vdur"]) == 1:
603 # The scale will move to 0 instances
605 "Scaling to 0 !, creating the template with the last vdur"
607 template_vdur
= [db_vnfr
["vdur"][0]]
608 for vdu_id
, vdu_count
in vdu_delete
.items():
610 indexes_to_delete
= [
612 for iv
in enumerate(db_vnfr
["vdur"])
613 if iv
[1]["vdu-id-ref"] == vdu_id
617 "vdur.{}.status".format(i
): "DELETING"
618 for i
in indexes_to_delete
[-vdu_count
:]
622 # it must be deleted one by one because common.db does not allow otherwise
625 for v
in reversed(db_vnfr
["vdur"])
626 if v
["vdu-id-ref"] == vdu_id
628 for vdu
in vdus_to_delete
[:vdu_count
]:
631 {"_id": db_vnfr
["_id"]},
633 pull
={"vdur": {"_id": vdu
["_id"]}},
637 db_push
["vdur"] = db_vdu_push_list
639 db_push
["vdur-template"] = template_vdur
642 db_vnfr
["vdur-template"] = template_vdur
643 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
644 # modify passed dictionary db_vnfr
645 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
646 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
648 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
650 Updates database nsr with the RO info for the created vld
651 :param ns_update_nsr: dictionary to be filled with the updated info
652 :param db_nsr: content of db_nsr. This is also modified
653 :param nsr_desc_RO: nsr descriptor from RO
654 :return: Nothing, LcmException is raised on errors
657 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
658 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
659 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
661 vld
["vim-id"] = net_RO
.get("vim_net_id")
662 vld
["name"] = net_RO
.get("vim_name")
663 vld
["status"] = net_RO
.get("status")
664 vld
["status-detailed"] = net_RO
.get("error_msg")
665 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
669 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
672 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
674 for db_vnfr
in db_vnfrs
.values():
675 vnfr_update
= {"status": "ERROR"}
676 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
677 if "status" not in vdur
:
678 vdur
["status"] = "ERROR"
679 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
681 vdur
["status-detailed"] = str(error_text
)
683 "vdur.{}.status-detailed".format(vdu_index
)
685 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
686 except DbException
as e
:
687 self
.logger
.error("Cannot update vnf. {}".format(e
))
689 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
691 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
692 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
693 :param nsr_desc_RO: nsr descriptor from RO
694 :return: Nothing, LcmException is raised on errors
696 for vnf_index
, db_vnfr
in db_vnfrs
.items():
697 for vnf_RO
in nsr_desc_RO
["vnfs"]:
698 if vnf_RO
["member_vnf_index"] != vnf_index
:
701 if vnf_RO
.get("ip_address"):
702 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
705 elif not db_vnfr
.get("ip-address"):
706 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
707 raise LcmExceptionNoMgmtIP(
708 "ns member_vnf_index '{}' has no IP address".format(
713 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
714 vdur_RO_count_index
= 0
715 if vdur
.get("pdu-type"):
717 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
718 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
720 if vdur
["count-index"] != vdur_RO_count_index
:
721 vdur_RO_count_index
+= 1
723 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
724 if vdur_RO
.get("ip_address"):
725 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
727 vdur
["ip-address"] = None
728 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
729 vdur
["name"] = vdur_RO
.get("vim_name")
730 vdur
["status"] = vdur_RO
.get("status")
731 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
732 for ifacer
in get_iterable(vdur
, "interfaces"):
733 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
734 if ifacer
["name"] == interface_RO
.get("internal_name"):
735 ifacer
["ip-address"] = interface_RO
.get(
738 ifacer
["mac-address"] = interface_RO
.get(
744 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
745 "from VIM info".format(
746 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
749 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
753 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
755 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
759 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
760 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
761 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
763 vld
["vim-id"] = net_RO
.get("vim_net_id")
764 vld
["name"] = net_RO
.get("vim_name")
765 vld
["status"] = net_RO
.get("status")
766 vld
["status-detailed"] = net_RO
.get("error_msg")
767 vnfr_update
["vld.{}".format(vld_index
)] = vld
771 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
776 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
781 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
786 def _get_ns_config_info(self
, nsr_id
):
788 Generates a mapping between vnf,vdu elements and the N2VC id
789 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
790 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
791 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
792 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
794 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
795 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
797 ns_config_info
= {"osm-config-mapping": mapping
}
798 for vca
in vca_deployed_list
:
799 if not vca
["member-vnf-index"]:
801 if not vca
["vdu_id"]:
802 mapping
[vca
["member-vnf-index"]] = vca
["application"]
806 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
808 ] = vca
["application"]
809 return ns_config_info
811 async def _instantiate_ng_ro(
828 def get_vim_account(vim_account_id
):
830 if vim_account_id
in db_vims
:
831 return db_vims
[vim_account_id
]
832 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
833 db_vims
[vim_account_id
] = db_vim
836 # modify target_vld info with instantiation parameters
837 def parse_vld_instantiation_params(
838 target_vim
, target_vld
, vld_params
, target_sdn
840 if vld_params
.get("ip-profile"):
841 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
844 if vld_params
.get("provider-network"):
845 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
848 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
849 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
852 if vld_params
.get("wimAccountId"):
853 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
854 target_vld
["vim_info"][target_wim
] = {}
855 for param
in ("vim-network-name", "vim-network-id"):
856 if vld_params
.get(param
):
857 if isinstance(vld_params
[param
], dict):
858 for vim
, vim_net
in vld_params
[param
].items():
859 other_target_vim
= "vim:" + vim
861 target_vld
["vim_info"],
862 (other_target_vim
, param
.replace("-", "_")),
865 else: # isinstance str
866 target_vld
["vim_info"][target_vim
][
867 param
.replace("-", "_")
868 ] = vld_params
[param
]
869 if vld_params
.get("common_id"):
870 target_vld
["common_id"] = vld_params
.get("common_id")
872 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
873 def update_ns_vld_target(target
, ns_params
):
874 for vnf_params
in ns_params
.get("vnf", ()):
875 if vnf_params
.get("vimAccountId"):
879 for vnfr
in db_vnfrs
.values()
880 if vnf_params
["member-vnf-index"]
881 == vnfr
["member-vnf-index-ref"]
885 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
886 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
887 target_vld
= find_in_list(
888 get_iterable(vdur
, "interfaces"),
889 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
892 vld_params
= find_in_list(
893 get_iterable(ns_params
, "vld"),
894 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
898 if vnf_params
.get("vimAccountId") not in a_vld
.get(
901 target_vim_network_list
= [
902 v
for _
, v
in a_vld
.get("vim_info").items()
904 target_vim_network_name
= next(
906 item
.get("vim_network_name", "")
907 for item
in target_vim_network_list
912 target
["ns"]["vld"][a_index
].get("vim_info").update(
914 "vim:{}".format(vnf_params
["vimAccountId"]): {
915 "vim_network_name": target_vim_network_name
,
921 for param
in ("vim-network-name", "vim-network-id"):
922 if vld_params
.get(param
) and isinstance(
923 vld_params
[param
], dict
925 for vim
, vim_net
in vld_params
[
928 other_target_vim
= "vim:" + vim
930 target
["ns"]["vld"][a_index
].get(
935 param
.replace("-", "_"),
940 nslcmop_id
= db_nslcmop
["_id"]
942 "name": db_nsr
["name"],
945 "image": deepcopy(db_nsr
["image"]),
946 "flavor": deepcopy(db_nsr
["flavor"]),
947 "action_id": nslcmop_id
,
948 "cloud_init_content": {},
950 for image
in target
["image"]:
951 image
["vim_info"] = {}
952 for flavor
in target
["flavor"]:
953 flavor
["vim_info"] = {}
954 if db_nsr
.get("affinity-or-anti-affinity-group"):
955 target
["affinity-or-anti-affinity-group"] = deepcopy(
956 db_nsr
["affinity-or-anti-affinity-group"]
958 for affinity_or_anti_affinity_group
in target
[
959 "affinity-or-anti-affinity-group"
961 affinity_or_anti_affinity_group
["vim_info"] = {}
963 if db_nslcmop
.get("lcmOperationType") != "instantiate":
964 # get parameters of instantiation:
965 db_nslcmop_instantiate
= self
.db
.get_list(
968 "nsInstanceId": db_nslcmop
["nsInstanceId"],
969 "lcmOperationType": "instantiate",
972 ns_params
= db_nslcmop_instantiate
.get("operationParams")
974 ns_params
= db_nslcmop
.get("operationParams")
975 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
976 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
979 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
980 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
984 "mgmt-network": vld
.get("mgmt-network", False),
985 "type": vld
.get("type"),
988 "vim_network_name": vld
.get("vim-network-name"),
989 "vim_account_id": ns_params
["vimAccountId"],
993 # check if this network needs SDN assist
994 if vld
.get("pci-interfaces"):
995 db_vim
= get_vim_account(ns_params
["vimAccountId"])
996 sdnc_id
= db_vim
["config"].get("sdn-controller")
998 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
999 target_sdn
= "sdn:{}".format(sdnc_id
)
1000 target_vld
["vim_info"][target_sdn
] = {
1002 "target_vim": target_vim
,
1004 "type": vld
.get("type"),
1007 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1008 for nsd_vnf_profile
in nsd_vnf_profiles
:
1009 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1010 if cp
["virtual-link-profile-id"] == vld
["id"]:
1012 "member_vnf:{}.{}".format(
1013 cp
["constituent-cpd-id"][0][
1014 "constituent-base-element-id"
1016 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1018 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1020 # check at nsd descriptor, if there is an ip-profile
1022 nsd_vlp
= find_in_list(
1023 get_virtual_link_profiles(nsd
),
1024 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1029 and nsd_vlp
.get("virtual-link-protocol-data")
1030 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1032 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1035 ip_profile_dest_data
= {}
1036 if "ip-version" in ip_profile_source_data
:
1037 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1040 if "cidr" in ip_profile_source_data
:
1041 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1044 if "gateway-ip" in ip_profile_source_data
:
1045 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1048 if "dhcp-enabled" in ip_profile_source_data
:
1049 ip_profile_dest_data
["dhcp-params"] = {
1050 "enabled": ip_profile_source_data
["dhcp-enabled"]
1052 vld_params
["ip-profile"] = ip_profile_dest_data
1054 # update vld_params with instantiation params
1055 vld_instantiation_params
= find_in_list(
1056 get_iterable(ns_params
, "vld"),
1057 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1059 if vld_instantiation_params
:
1060 vld_params
.update(vld_instantiation_params
)
1061 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1062 target
["ns"]["vld"].append(target_vld
)
1063 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1064 update_ns_vld_target(target
, ns_params
)
1066 for vnfr
in db_vnfrs
.values():
1067 vnfd
= find_in_list(
1068 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1070 vnf_params
= find_in_list(
1071 get_iterable(ns_params
, "vnf"),
1072 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1074 target_vnf
= deepcopy(vnfr
)
1075 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1076 for vld
in target_vnf
.get("vld", ()):
1077 # check if connected to a ns.vld, to fill target'
1078 vnf_cp
= find_in_list(
1079 vnfd
.get("int-virtual-link-desc", ()),
1080 lambda cpd
: cpd
.get("id") == vld
["id"],
1083 ns_cp
= "member_vnf:{}.{}".format(
1084 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1086 if cp2target
.get(ns_cp
):
1087 vld
["target"] = cp2target
[ns_cp
]
1090 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1092 # check if this network needs SDN assist
1094 if vld
.get("pci-interfaces"):
1095 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1096 sdnc_id
= db_vim
["config"].get("sdn-controller")
1098 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1099 target_sdn
= "sdn:{}".format(sdnc_id
)
1100 vld
["vim_info"][target_sdn
] = {
1102 "target_vim": target_vim
,
1104 "type": vld
.get("type"),
1107 # check at vnfd descriptor, if there is an ip-profile
1109 vnfd_vlp
= find_in_list(
1110 get_virtual_link_profiles(vnfd
),
1111 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1115 and vnfd_vlp
.get("virtual-link-protocol-data")
1116 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1118 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1121 ip_profile_dest_data
= {}
1122 if "ip-version" in ip_profile_source_data
:
1123 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1126 if "cidr" in ip_profile_source_data
:
1127 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1130 if "gateway-ip" in ip_profile_source_data
:
1131 ip_profile_dest_data
[
1133 ] = ip_profile_source_data
["gateway-ip"]
1134 if "dhcp-enabled" in ip_profile_source_data
:
1135 ip_profile_dest_data
["dhcp-params"] = {
1136 "enabled": ip_profile_source_data
["dhcp-enabled"]
1139 vld_params
["ip-profile"] = ip_profile_dest_data
1140 # update vld_params with instantiation params
1142 vld_instantiation_params
= find_in_list(
1143 get_iterable(vnf_params
, "internal-vld"),
1144 lambda i_vld
: i_vld
["name"] == vld
["id"],
1146 if vld_instantiation_params
:
1147 vld_params
.update(vld_instantiation_params
)
1148 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1151 for vdur
in target_vnf
.get("vdur", ()):
1152 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1153 continue # This vdu must not be created
1154 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1156 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1159 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1160 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1163 and vdu_configuration
.get("config-access")
1164 and vdu_configuration
.get("config-access").get("ssh-access")
1166 vdur
["ssh-keys"] = ssh_keys_all
1167 vdur
["ssh-access-required"] = vdu_configuration
[
1169 ]["ssh-access"]["required"]
1172 and vnf_configuration
.get("config-access")
1173 and vnf_configuration
.get("config-access").get("ssh-access")
1174 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1176 vdur
["ssh-keys"] = ssh_keys_all
1177 vdur
["ssh-access-required"] = vnf_configuration
[
1179 ]["ssh-access"]["required"]
1180 elif ssh_keys_instantiation
and find_in_list(
1181 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1183 vdur
["ssh-keys"] = ssh_keys_instantiation
1185 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1187 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1189 if vdud
.get("cloud-init-file"):
1190 vdur
["cloud-init"] = "{}:file:{}".format(
1191 vnfd
["_id"], vdud
.get("cloud-init-file")
1193 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1194 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1195 base_folder
= vnfd
["_admin"]["storage"]
1196 if base_folder
["pkg-dir"]:
1197 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1198 base_folder
["folder"],
1199 base_folder
["pkg-dir"],
1200 vdud
.get("cloud-init-file"),
1203 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1204 base_folder
["folder"],
1205 vdud
.get("cloud-init-file"),
1207 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1208 target
["cloud_init_content"][
1211 elif vdud
.get("cloud-init"):
1212 vdur
["cloud-init"] = "{}:vdu:{}".format(
1213 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1215 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1216 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1219 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1220 deploy_params_vdu
= self
._format
_additional
_params
(
1221 vdur
.get("additionalParams") or {}
1223 deploy_params_vdu
["OSM"] = get_osm_params(
1224 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1226 vdur
["additionalParams"] = deploy_params_vdu
1229 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1230 if target_vim
not in ns_flavor
["vim_info"]:
1231 ns_flavor
["vim_info"][target_vim
] = {}
1234 # in case alternative images are provided we must check if they should be applied
1235 # for the vim_type, modify the vim_type taking into account
1236 ns_image_id
= int(vdur
["ns-image-id"])
1237 if vdur
.get("alt-image-ids"):
1238 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1239 vim_type
= db_vim
["vim_type"]
1240 for alt_image_id
in vdur
.get("alt-image-ids"):
1241 ns_alt_image
= target
["image"][int(alt_image_id
)]
1242 if vim_type
== ns_alt_image
.get("vim-type"):
1243 # must use alternative image
1245 "use alternative image id: {}".format(alt_image_id
)
1247 ns_image_id
= alt_image_id
1248 vdur
["ns-image-id"] = ns_image_id
1250 ns_image
= target
["image"][int(ns_image_id
)]
1251 if target_vim
not in ns_image
["vim_info"]:
1252 ns_image
["vim_info"][target_vim
] = {}
1255 if vdur
.get("affinity-or-anti-affinity-group-id"):
1256 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1257 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1258 if target_vim
not in ns_ags
["vim_info"]:
1259 ns_ags
["vim_info"][target_vim
] = {}
1261 vdur
["vim_info"] = {target_vim
: {}}
1262 # instantiation parameters
1264 vdu_instantiation_params
= find_in_list(
1265 get_iterable(vnf_params
, "vdu"),
1266 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1268 if vdu_instantiation_params
:
1269 # Parse the vdu_volumes from the instantiation params
1270 vdu_volumes
= get_volumes_from_instantiation_params(
1271 vdu_instantiation_params
, vdud
1273 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1274 vdur_list
.append(vdur
)
1275 target_vnf
["vdur"] = vdur_list
1276 target
["vnf"].append(target_vnf
)
1278 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1279 desc
= await self
.RO
.deploy(nsr_id
, target
)
1280 self
.logger
.debug("RO return > {}".format(desc
))
1281 action_id
= desc
["action_id"]
1282 await self
._wait
_ng
_ro
(
1283 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
,
1284 operation
="instantiation"
1289 "_admin.deployed.RO.operational-status": "running",
1290 "detailed-status": " ".join(stage
),
1292 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1293 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1294 self
._write
_op
_status
(nslcmop_id
, stage
)
1296 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1300 async def _wait_ng_ro(
1310 detailed_status_old
= None
1312 start_time
= start_time
or time()
1313 while time() <= start_time
+ timeout
:
1314 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1315 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1316 if desc_status
["status"] == "FAILED":
1317 raise NgRoException(desc_status
["details"])
1318 elif desc_status
["status"] == "BUILD":
1320 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1321 elif desc_status
["status"] == "DONE":
1323 stage
[2] = "Deployed at VIM"
1326 assert False, "ROclient.check_ns_status returns unknown {}".format(
1327 desc_status
["status"]
1329 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1330 detailed_status_old
= stage
[2]
1331 db_nsr_update
["detailed-status"] = " ".join(stage
)
1332 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1333 self
._write
_op
_status
(nslcmop_id
, stage
)
1334 await asyncio
.sleep(15, loop
=self
.loop
)
1335 else: # timeout_ns_deploy
1336 raise NgRoException("Timeout waiting ns to deploy")
1338 async def _terminate_ng_ro(
1339 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1344 start_deploy
= time()
1351 "action_id": nslcmop_id
,
1353 desc
= await self
.RO
.deploy(nsr_id
, target
)
1354 action_id
= desc
["action_id"]
1355 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1356 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1359 + "ns terminate action at RO. action_id={}".format(action_id
)
1363 delete_timeout
= 20 * 60 # 20 minutes
1364 await self
._wait
_ng
_ro
(
1365 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
,
1366 operation
="termination"
1369 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1370 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1372 await self
.RO
.delete(nsr_id
)
1373 except Exception as e
:
1374 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1375 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1376 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1377 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1379 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1381 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1382 failed_detail
.append("delete conflict: {}".format(e
))
1385 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1388 failed_detail
.append("delete error: {}".format(e
))
1391 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1395 stage
[2] = "Error deleting from VIM"
1397 stage
[2] = "Deleted from VIM"
1398 db_nsr_update
["detailed-status"] = " ".join(stage
)
1399 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1400 self
._write
_op
_status
(nslcmop_id
, stage
)
1403 raise LcmException("; ".join(failed_detail
))
1406 async def instantiate_RO(
1420 :param logging_text: preffix text to use at logging
1421 :param nsr_id: nsr identity
1422 :param nsd: database content of ns descriptor
1423 :param db_nsr: database content of ns record
1424 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1426 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1427 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1428 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1429 :return: None or exception
1432 start_deploy
= time()
1433 ns_params
= db_nslcmop
.get("operationParams")
1434 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1435 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1437 timeout_ns_deploy
= self
.timeout
.get(
1438 "ns_deploy", self
.timeout_ns_deploy
1441 # Check for and optionally request placement optimization. Database will be updated if placement activated
1442 stage
[2] = "Waiting for Placement."
1443 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1444 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1445 for vnfr
in db_vnfrs
.values():
1446 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1449 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1451 return await self
._instantiate
_ng
_ro
(
1464 except Exception as e
:
1465 stage
[2] = "ERROR deploying at VIM"
1466 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1468 "Error deploying at VIM {}".format(e
),
1469 exc_info
=not isinstance(
1472 ROclient
.ROClientException
,
1481 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1483 Wait for kdu to be up, get ip address
1484 :param logging_text: prefix use for logging
1488 :return: IP address, K8s services
1491 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1494 while nb_tries
< 360:
1495 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1499 for x
in get_iterable(db_vnfr
, "kdur")
1500 if x
.get("kdu-name") == kdu_name
1506 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1508 if kdur
.get("status"):
1509 if kdur
["status"] in ("READY", "ENABLED"):
1510 return kdur
.get("ip-address"), kdur
.get("services")
1513 "target KDU={} is in error state".format(kdu_name
)
1516 await asyncio
.sleep(10, loop
=self
.loop
)
1518 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1520 async def wait_vm_up_insert_key_ro(
1521 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1524 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1525 :param logging_text: prefix use for logging
1530 :param pub_key: public ssh key to inject, None to skip
1531 :param user: user to apply the public ssh key
1535 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1539 target_vdu_id
= None
1545 if ro_retries
>= 360: # 1 hour
1547 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1550 await asyncio
.sleep(10, loop
=self
.loop
)
1553 if not target_vdu_id
:
1554 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1556 if not vdu_id
: # for the VNF case
1557 if db_vnfr
.get("status") == "ERROR":
1559 "Cannot inject ssh-key because target VNF is in error state"
1561 ip_address
= db_vnfr
.get("ip-address")
1567 for x
in get_iterable(db_vnfr
, "vdur")
1568 if x
.get("ip-address") == ip_address
1576 for x
in get_iterable(db_vnfr
, "vdur")
1577 if x
.get("vdu-id-ref") == vdu_id
1578 and x
.get("count-index") == vdu_index
1584 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1585 ): # If only one, this should be the target vdu
1586 vdur
= db_vnfr
["vdur"][0]
1589 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1590 vnfr_id
, vdu_id
, vdu_index
1593 # New generation RO stores information at "vim_info"
1596 if vdur
.get("vim_info"):
1598 t
for t
in vdur
["vim_info"]
1599 ) # there should be only one key
1600 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1602 vdur
.get("pdu-type")
1603 or vdur
.get("status") == "ACTIVE"
1604 or ng_ro_status
== "ACTIVE"
1606 ip_address
= vdur
.get("ip-address")
1609 target_vdu_id
= vdur
["vdu-id-ref"]
1610 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1612 "Cannot inject ssh-key because target VM is in error state"
1615 if not target_vdu_id
:
1618 # inject public key into machine
1619 if pub_key
and user
:
1620 self
.logger
.debug(logging_text
+ "Inserting RO key")
1621 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1622 if vdur
.get("pdu-type"):
1623 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1626 ro_vm_id
= "{}-{}".format(
1627 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1628 ) # TODO add vdu_index
1632 "action": "inject_ssh_key",
1636 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1638 desc
= await self
.RO
.deploy(nsr_id
, target
)
1639 action_id
= desc
["action_id"]
1640 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600, operation
="instantiation")
1643 # wait until NS is deployed at RO
1645 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1646 ro_nsr_id
= deep_get(
1647 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1651 result_dict
= await self
.RO
.create_action(
1653 item_id_name
=ro_nsr_id
,
1655 "add_public_key": pub_key
,
1660 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1661 if not result_dict
or not isinstance(result_dict
, dict):
1663 "Unknown response from RO when injecting key"
1665 for result
in result_dict
.values():
1666 if result
.get("vim_result") == 200:
1669 raise ROclient
.ROClientException(
1670 "error injecting key: {}".format(
1671 result
.get("description")
1675 except NgRoException
as e
:
1677 "Reaching max tries injecting key. Error: {}".format(e
)
1679 except ROclient
.ROClientException
as e
:
1683 + "error injecting key: {}. Retrying until {} seconds".format(
1690 "Reaching max tries injecting key. Error: {}".format(e
)
1697 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1699 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1701 my_vca
= vca_deployed_list
[vca_index
]
1702 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1703 # vdu or kdu: no dependencies
1707 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1708 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1709 configuration_status_list
= db_nsr
["configurationStatus"]
1710 for index
, vca_deployed
in enumerate(configuration_status_list
):
1711 if index
== vca_index
:
1714 if not my_vca
.get("member-vnf-index") or (
1715 vca_deployed
.get("member-vnf-index")
1716 == my_vca
.get("member-vnf-index")
1718 internal_status
= configuration_status_list
[index
].get("status")
1719 if internal_status
== "READY":
1721 elif internal_status
== "BROKEN":
1723 "Configuration aborted because dependent charm/s has failed"
1728 # no dependencies, return
1730 await asyncio
.sleep(10)
1733 raise LcmException("Configuration aborted because dependent charm/s timeout")
1735 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1738 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1740 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1741 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1744 async def instantiate_N2VC(
1761 ee_config_descriptor
,
1763 nsr_id
= db_nsr
["_id"]
1764 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1765 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1766 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1767 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1769 "collection": "nsrs",
1770 "filter": {"_id": nsr_id
},
1771 "path": db_update_entry
,
1777 element_under_configuration
= nsr_id
1781 vnfr_id
= db_vnfr
["_id"]
1782 osm_config
["osm"]["vnf_id"] = vnfr_id
1784 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1786 if vca_type
== "native_charm":
1789 index_number
= vdu_index
or 0
1792 element_type
= "VNF"
1793 element_under_configuration
= vnfr_id
1794 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1796 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1797 element_type
= "VDU"
1798 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1799 osm_config
["osm"]["vdu_id"] = vdu_id
1801 namespace
+= ".{}".format(kdu_name
)
1802 element_type
= "KDU"
1803 element_under_configuration
= kdu_name
1804 osm_config
["osm"]["kdu_name"] = kdu_name
1807 if base_folder
["pkg-dir"]:
1808 artifact_path
= "{}/{}/{}/{}".format(
1809 base_folder
["folder"],
1810 base_folder
["pkg-dir"],
1813 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1818 artifact_path
= "{}/Scripts/{}/{}/".format(
1819 base_folder
["folder"],
1822 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1827 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1829 # get initial_config_primitive_list that applies to this element
1830 initial_config_primitive_list
= config_descriptor
.get(
1831 "initial-config-primitive"
1835 "Initial config primitive list > {}".format(
1836 initial_config_primitive_list
1840 # add config if not present for NS charm
1841 ee_descriptor_id
= ee_config_descriptor
.get("id")
1842 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1843 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1844 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1848 "Initial config primitive list #2 > {}".format(
1849 initial_config_primitive_list
1852 # n2vc_redesign STEP 3.1
1853 # find old ee_id if exists
1854 ee_id
= vca_deployed
.get("ee_id")
1856 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1857 # create or register execution environment in VCA
1858 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1860 self
._write
_configuration
_status
(
1862 vca_index
=vca_index
,
1864 element_under_configuration
=element_under_configuration
,
1865 element_type
=element_type
,
1868 step
= "create execution environment"
1869 self
.logger
.debug(logging_text
+ step
)
1873 if vca_type
== "k8s_proxy_charm":
1874 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1875 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1876 namespace
=namespace
,
1877 artifact_path
=artifact_path
,
1881 elif vca_type
== "helm" or vca_type
== "helm-v3":
1882 ee_id
, credentials
= await self
.vca_map
[
1884 ].create_execution_environment(
1885 namespace
=namespace
,
1889 artifact_path
=artifact_path
,
1893 ee_id
, credentials
= await self
.vca_map
[
1895 ].create_execution_environment(
1896 namespace
=namespace
,
1902 elif vca_type
== "native_charm":
1903 step
= "Waiting to VM being up and getting IP address"
1904 self
.logger
.debug(logging_text
+ step
)
1905 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1914 credentials
= {"hostname": rw_mgmt_ip
}
1916 username
= deep_get(
1917 config_descriptor
, ("config-access", "ssh-access", "default-user")
1919 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1920 # merged. Meanwhile let's get username from initial-config-primitive
1921 if not username
and initial_config_primitive_list
:
1922 for config_primitive
in initial_config_primitive_list
:
1923 for param
in config_primitive
.get("parameter", ()):
1924 if param
["name"] == "ssh-username":
1925 username
= param
["value"]
1929 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1930 "'config-access.ssh-access.default-user'"
1932 credentials
["username"] = username
1933 # n2vc_redesign STEP 3.2
1935 self
._write
_configuration
_status
(
1937 vca_index
=vca_index
,
1938 status
="REGISTERING",
1939 element_under_configuration
=element_under_configuration
,
1940 element_type
=element_type
,
1943 step
= "register execution environment {}".format(credentials
)
1944 self
.logger
.debug(logging_text
+ step
)
1945 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1946 credentials
=credentials
,
1947 namespace
=namespace
,
1952 # for compatibility with MON/POL modules, the need model and application name at database
1953 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1954 ee_id_parts
= ee_id
.split(".")
1955 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1956 if len(ee_id_parts
) >= 2:
1957 model_name
= ee_id_parts
[0]
1958 application_name
= ee_id_parts
[1]
1959 db_nsr_update
[db_update_entry
+ "model"] = model_name
1960 db_nsr_update
[db_update_entry
+ "application"] = application_name
1962 # n2vc_redesign STEP 3.3
1963 step
= "Install configuration Software"
1965 self
._write
_configuration
_status
(
1967 vca_index
=vca_index
,
1968 status
="INSTALLING SW",
1969 element_under_configuration
=element_under_configuration
,
1970 element_type
=element_type
,
1971 other_update
=db_nsr_update
,
1974 # TODO check if already done
1975 self
.logger
.debug(logging_text
+ step
)
1977 if vca_type
== "native_charm":
1978 config_primitive
= next(
1979 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1982 if config_primitive
:
1983 config
= self
._map
_primitive
_params
(
1984 config_primitive
, {}, deploy_params
1987 if vca_type
== "lxc_proxy_charm":
1988 if element_type
== "NS":
1989 num_units
= db_nsr
.get("config-units") or 1
1990 elif element_type
== "VNF":
1991 num_units
= db_vnfr
.get("config-units") or 1
1992 elif element_type
== "VDU":
1993 for v
in db_vnfr
["vdur"]:
1994 if vdu_id
== v
["vdu-id-ref"]:
1995 num_units
= v
.get("config-units") or 1
1997 if vca_type
!= "k8s_proxy_charm":
1998 await self
.vca_map
[vca_type
].install_configuration_sw(
2000 artifact_path
=artifact_path
,
2003 num_units
=num_units
,
2008 # write in db flag of configuration_sw already installed
2010 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
2013 # add relations for this VCA (wait for other peers related with this VCA)
2014 await self
._add
_vca
_relations
(
2015 logging_text
=logging_text
,
2018 vca_index
=vca_index
,
2021 # if SSH access is required, then get execution environment SSH public
2022 # if native charm we have waited already to VM be UP
2023 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
2026 # self.logger.debug("get ssh key block")
2028 config_descriptor
, ("config-access", "ssh-access", "required")
2030 # self.logger.debug("ssh key needed")
2031 # Needed to inject a ssh key
2034 ("config-access", "ssh-access", "default-user"),
2036 step
= "Install configuration Software, getting public ssh key"
2037 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
2038 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
2041 step
= "Insert public key into VM user={} ssh_key={}".format(
2045 # self.logger.debug("no need to get ssh key")
2046 step
= "Waiting to VM being up and getting IP address"
2047 self
.logger
.debug(logging_text
+ step
)
2049 # default rw_mgmt_ip to None, avoiding the non definition of the variable
2052 # n2vc_redesign STEP 5.1
2053 # wait for RO (ip-address) Insert pub_key into VM
2056 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
2057 logging_text
, nsr_id
, vnfr_id
, kdu_name
2059 vnfd
= self
.db
.get_one(
2061 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2063 kdu
= get_kdu(vnfd
, kdu_name
)
2065 service
["name"] for service
in get_kdu_services(kdu
)
2067 exposed_services
= []
2068 for service
in services
:
2069 if any(s
in service
["name"] for s
in kdu_services
):
2070 exposed_services
.append(service
)
2071 await self
.vca_map
[vca_type
].exec_primitive(
2073 primitive_name
="config",
2075 "osm-config": json
.dumps(
2077 k8s
={"services": exposed_services
}
2084 # This verification is needed in order to avoid trying to add a public key
2085 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2086 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2087 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2089 elif db_vnfr
.get('vdur'):
2090 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2100 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2102 # store rw_mgmt_ip in deploy params for later replacement
2103 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2105 # n2vc_redesign STEP 6 Execute initial config primitive
2106 step
= "execute initial config primitive"
2108 # wait for dependent primitives execution (NS -> VNF -> VDU)
2109 if initial_config_primitive_list
:
2110 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2112 # stage, in function of element type: vdu, kdu, vnf or ns
2113 my_vca
= vca_deployed_list
[vca_index
]
2114 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2116 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2117 elif my_vca
.get("member-vnf-index"):
2119 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2122 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2124 self
._write
_configuration
_status
(
2125 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2128 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2130 check_if_terminated_needed
= True
2131 for initial_config_primitive
in initial_config_primitive_list
:
2132 # adding information on the vca_deployed if it is a NS execution environment
2133 if not vca_deployed
["member-vnf-index"]:
2134 deploy_params
["ns_config_info"] = json
.dumps(
2135 self
._get
_ns
_config
_info
(nsr_id
)
2137 # TODO check if already done
2138 primitive_params_
= self
._map
_primitive
_params
(
2139 initial_config_primitive
, {}, deploy_params
2142 step
= "execute primitive '{}' params '{}'".format(
2143 initial_config_primitive
["name"], primitive_params_
2145 self
.logger
.debug(logging_text
+ step
)
2146 await self
.vca_map
[vca_type
].exec_primitive(
2148 primitive_name
=initial_config_primitive
["name"],
2149 params_dict
=primitive_params_
,
2154 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2155 if check_if_terminated_needed
:
2156 if config_descriptor
.get("terminate-config-primitive"):
2158 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2160 check_if_terminated_needed
= False
2162 # TODO register in database that primitive is done
2164 # STEP 7 Configure metrics
2165 if vca_type
== "helm" or vca_type
== "helm-v3":
2166 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2168 artifact_path
=artifact_path
,
2169 ee_config_descriptor
=ee_config_descriptor
,
2172 target_ip
=rw_mgmt_ip
,
2178 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2181 for job
in prometheus_jobs
:
2184 {"job_name": job
["job_name"]},
2187 fail_on_empty
=False,
2190 step
= "instantiated at VCA"
2191 self
.logger
.debug(logging_text
+ step
)
2193 self
._write
_configuration
_status
(
2194 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2197 except Exception as e
: # TODO not use Exception but N2VC exception
2198 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2200 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2203 "Exception while {} : {}".format(step
, e
), exc_info
=True
2205 self
._write
_configuration
_status
(
2206 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2208 raise LcmException("{} {}".format(step
, e
)) from e
2210 def _write_ns_status(
2214 current_operation
: str,
2215 current_operation_id
: str,
2216 error_description
: str = None,
2217 error_detail
: str = None,
2218 other_update
: dict = None,
2221 Update db_nsr fields.
2224 :param current_operation:
2225 :param current_operation_id:
2226 :param error_description:
2227 :param error_detail:
2228 :param other_update: Other required changes at database if provided, will be cleared
2232 db_dict
= other_update
or {}
2235 ] = current_operation_id
# for backward compatibility
2236 db_dict
["_admin.current-operation"] = current_operation_id
2237 db_dict
["_admin.operation-type"] = (
2238 current_operation
if current_operation
!= "IDLE" else None
2240 db_dict
["currentOperation"] = current_operation
2241 db_dict
["currentOperationID"] = current_operation_id
2242 db_dict
["errorDescription"] = error_description
2243 db_dict
["errorDetail"] = error_detail
2246 db_dict
["nsState"] = ns_state
2247 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2248 except DbException
as e
:
2249 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2251 def _write_op_status(
2255 error_message
: str = None,
2256 queuePosition
: int = 0,
2257 operation_state
: str = None,
2258 other_update
: dict = None,
2261 db_dict
= other_update
or {}
2262 db_dict
["queuePosition"] = queuePosition
2263 if isinstance(stage
, list):
2264 db_dict
["stage"] = stage
[0]
2265 db_dict
["detailed-status"] = " ".join(stage
)
2266 elif stage
is not None:
2267 db_dict
["stage"] = str(stage
)
2269 if error_message
is not None:
2270 db_dict
["errorMessage"] = error_message
2271 if operation_state
is not None:
2272 db_dict
["operationState"] = operation_state
2273 db_dict
["statusEnteredTime"] = time()
2274 self
.update_db_2("nslcmops", op_id
, db_dict
)
2275 except DbException
as e
:
2277 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2280 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2282 nsr_id
= db_nsr
["_id"]
2283 # configurationStatus
2284 config_status
= db_nsr
.get("configurationStatus")
2287 "configurationStatus.{}.status".format(index
): status
2288 for index
, v
in enumerate(config_status
)
2292 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2294 except DbException
as e
:
2296 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2299 def _write_configuration_status(
2304 element_under_configuration
: str = None,
2305 element_type
: str = None,
2306 other_update
: dict = None,
2309 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2310 # .format(vca_index, status))
2313 db_path
= "configurationStatus.{}.".format(vca_index
)
2314 db_dict
= other_update
or {}
2316 db_dict
[db_path
+ "status"] = status
2317 if element_under_configuration
:
2319 db_path
+ "elementUnderConfiguration"
2320 ] = element_under_configuration
2322 db_dict
[db_path
+ "elementType"] = element_type
2323 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2324 except DbException
as e
:
2326 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2327 status
, nsr_id
, vca_index
, e
2331 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2333 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2334 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2335 Database is used because the result can be obtained from a different LCM worker in case of HA.
2336 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2337 :param db_nslcmop: database content of nslcmop
2338 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2339 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2340 computed 'vim-account-id'
2343 nslcmop_id
= db_nslcmop
["_id"]
2344 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2345 if placement_engine
== "PLA":
2347 logging_text
+ "Invoke and wait for placement optimization"
2349 await self
.msg
.aiowrite(
2350 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2352 db_poll_interval
= 5
2353 wait
= db_poll_interval
* 10
2355 while not pla_result
and wait
>= 0:
2356 await asyncio
.sleep(db_poll_interval
)
2357 wait
-= db_poll_interval
2358 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2359 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2363 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2366 for pla_vnf
in pla_result
["vnf"]:
2367 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2368 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2373 {"_id": vnfr
["_id"]},
2374 {"vim-account-id": pla_vnf
["vimAccountId"]},
2377 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2380 def update_nsrs_with_pla_result(self
, params
):
2382 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2384 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2386 except Exception as e
:
2387 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2389 async def instantiate(self
, nsr_id
, nslcmop_id
):
2392 :param nsr_id: ns instance to deploy
2393 :param nslcmop_id: operation to run
2397 # Try to lock HA task here
2398 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2399 if not task_is_locked_by_me
:
2401 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2405 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2406 self
.logger
.debug(logging_text
+ "Enter")
2408 # get all needed from database
2410 # database nsrs record
2413 # database nslcmops record
2416 # update operation on nsrs
2418 # update operation on nslcmops
2419 db_nslcmop_update
= {}
2421 nslcmop_operation_state
= None
2422 db_vnfrs
= {} # vnf's info indexed by member-index
2424 tasks_dict_info
= {} # from task to info text
2428 "Stage 1/5: preparation of the environment.",
2429 "Waiting for previous operations to terminate.",
2432 # ^ stage, step, VIM progress
2434 # wait for any previous tasks in process
2435 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2437 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2438 stage
[1] = "Reading from database."
2439 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2440 db_nsr_update
["detailed-status"] = "creating"
2441 db_nsr_update
["operational-status"] = "init"
2442 self
._write
_ns
_status
(
2444 ns_state
="BUILDING",
2445 current_operation
="INSTANTIATING",
2446 current_operation_id
=nslcmop_id
,
2447 other_update
=db_nsr_update
,
2449 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2451 # read from db: operation
2452 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2453 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2454 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2455 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2456 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2458 ns_params
= db_nslcmop
.get("operationParams")
2459 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2460 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2462 timeout_ns_deploy
= self
.timeout
.get(
2463 "ns_deploy", self
.timeout_ns_deploy
2467 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2468 self
.logger
.debug(logging_text
+ stage
[1])
2469 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2470 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2471 self
.logger
.debug(logging_text
+ stage
[1])
2472 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2473 self
.fs
.sync(db_nsr
["nsd-id"])
2475 # nsr_name = db_nsr["name"] # TODO short-name??
2477 # read from db: vnf's of this ns
2478 stage
[1] = "Getting vnfrs from db."
2479 self
.logger
.debug(logging_text
+ stage
[1])
2480 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2482 # read from db: vnfd's for every vnf
2483 db_vnfds
= [] # every vnfd data
2485 # for each vnf in ns, read vnfd
2486 for vnfr
in db_vnfrs_list
:
2487 if vnfr
.get("kdur"):
2489 for kdur
in vnfr
["kdur"]:
2490 if kdur
.get("additionalParams"):
2491 kdur
["additionalParams"] = json
.loads(
2492 kdur
["additionalParams"]
2494 kdur_list
.append(kdur
)
2495 vnfr
["kdur"] = kdur_list
2497 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2498 vnfd_id
= vnfr
["vnfd-id"]
2499 vnfd_ref
= vnfr
["vnfd-ref"]
2500 self
.fs
.sync(vnfd_id
)
2502 # if we haven't this vnfd, read it from db
2503 if vnfd_id
not in db_vnfds
:
2505 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2508 self
.logger
.debug(logging_text
+ stage
[1])
2509 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2512 db_vnfds
.append(vnfd
)
2514 # Get or generates the _admin.deployed.VCA list
2515 vca_deployed_list
= None
2516 if db_nsr
["_admin"].get("deployed"):
2517 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2518 if vca_deployed_list
is None:
2519 vca_deployed_list
= []
2520 configuration_status_list
= []
2521 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2522 db_nsr_update
["configurationStatus"] = configuration_status_list
2523 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2524 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2525 elif isinstance(vca_deployed_list
, dict):
2526 # maintain backward compatibility. Change a dict to list at database
2527 vca_deployed_list
= list(vca_deployed_list
.values())
2528 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2529 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2532 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2534 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2535 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2537 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2538 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2539 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2541 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2544 # n2vc_redesign STEP 2 Deploy Network Scenario
2545 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2546 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2548 stage
[1] = "Deploying KDUs."
2549 # self.logger.debug(logging_text + "Before deploy_kdus")
2550 # Call to deploy_kdus in case exists the "vdu:kdu" param
2551 await self
.deploy_kdus(
2552 logging_text
=logging_text
,
2554 nslcmop_id
=nslcmop_id
,
2557 task_instantiation_info
=tasks_dict_info
,
2560 stage
[1] = "Getting VCA public key."
2561 # n2vc_redesign STEP 1 Get VCA public ssh-key
2562 # feature 1429. Add n2vc public key to needed VMs
2563 n2vc_key
= self
.n2vc
.get_public_key()
2564 n2vc_key_list
= [n2vc_key
]
2565 if self
.vca_config
.get("public_key"):
2566 n2vc_key_list
.append(self
.vca_config
["public_key"])
2568 stage
[1] = "Deploying NS at VIM."
2569 task_ro
= asyncio
.ensure_future(
2570 self
.instantiate_RO(
2571 logging_text
=logging_text
,
2575 db_nslcmop
=db_nslcmop
,
2578 n2vc_key_list
=n2vc_key_list
,
2582 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2583 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2585 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2586 stage
[1] = "Deploying Execution Environments."
2587 self
.logger
.debug(logging_text
+ stage
[1])
2589 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2590 for vnf_profile
in get_vnf_profiles(nsd
):
2591 vnfd_id
= vnf_profile
["vnfd-id"]
2592 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2593 member_vnf_index
= str(vnf_profile
["id"])
2594 db_vnfr
= db_vnfrs
[member_vnf_index
]
2595 base_folder
= vnfd
["_admin"]["storage"]
2601 # Get additional parameters
2602 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2603 if db_vnfr
.get("additionalParamsForVnf"):
2604 deploy_params
.update(
2605 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2608 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2609 if descriptor_config
:
2611 logging_text
=logging_text
2612 + "member_vnf_index={} ".format(member_vnf_index
),
2615 nslcmop_id
=nslcmop_id
,
2621 member_vnf_index
=member_vnf_index
,
2622 vdu_index
=vdu_index
,
2624 deploy_params
=deploy_params
,
2625 descriptor_config
=descriptor_config
,
2626 base_folder
=base_folder
,
2627 task_instantiation_info
=tasks_dict_info
,
2631 # Deploy charms for each VDU that supports one.
2632 for vdud
in get_vdu_list(vnfd
):
2634 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2635 vdur
= find_in_list(
2636 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2639 if vdur
.get("additionalParams"):
2640 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2642 deploy_params_vdu
= deploy_params
2643 deploy_params_vdu
["OSM"] = get_osm_params(
2644 db_vnfr
, vdu_id
, vdu_count_index
=0
2646 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2648 self
.logger
.debug("VDUD > {}".format(vdud
))
2650 "Descriptor config > {}".format(descriptor_config
)
2652 if descriptor_config
:
2655 for vdu_index
in range(vdud_count
):
2656 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2658 logging_text
=logging_text
2659 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2660 member_vnf_index
, vdu_id
, vdu_index
2664 nslcmop_id
=nslcmop_id
,
2670 member_vnf_index
=member_vnf_index
,
2671 vdu_index
=vdu_index
,
2673 deploy_params
=deploy_params_vdu
,
2674 descriptor_config
=descriptor_config
,
2675 base_folder
=base_folder
,
2676 task_instantiation_info
=tasks_dict_info
,
2679 for kdud
in get_kdu_list(vnfd
):
2680 kdu_name
= kdud
["name"]
2681 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2682 if descriptor_config
:
2687 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2689 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2690 if kdur
.get("additionalParams"):
2691 deploy_params_kdu
.update(
2692 parse_yaml_strings(kdur
["additionalParams"].copy())
2696 logging_text
=logging_text
,
2699 nslcmop_id
=nslcmop_id
,
2705 member_vnf_index
=member_vnf_index
,
2706 vdu_index
=vdu_index
,
2708 deploy_params
=deploy_params_kdu
,
2709 descriptor_config
=descriptor_config
,
2710 base_folder
=base_folder
,
2711 task_instantiation_info
=tasks_dict_info
,
2715 # Check if this NS has a charm configuration
2716 descriptor_config
= nsd
.get("ns-configuration")
2717 if descriptor_config
and descriptor_config
.get("juju"):
2720 member_vnf_index
= None
2726 # Get additional parameters
2727 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2728 if db_nsr
.get("additionalParamsForNs"):
2729 deploy_params
.update(
2730 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2732 base_folder
= nsd
["_admin"]["storage"]
2734 logging_text
=logging_text
,
2737 nslcmop_id
=nslcmop_id
,
2743 member_vnf_index
=member_vnf_index
,
2744 vdu_index
=vdu_index
,
2746 deploy_params
=deploy_params
,
2747 descriptor_config
=descriptor_config
,
2748 base_folder
=base_folder
,
2749 task_instantiation_info
=tasks_dict_info
,
2753 # rest of staff will be done at finally
2756 ROclient
.ROClientException
,
2762 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2765 except asyncio
.CancelledError
:
2767 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2769 exc
= "Operation was cancelled"
2770 except Exception as e
:
2771 exc
= traceback
.format_exc()
2772 self
.logger
.critical(
2773 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2778 error_list
.append(str(exc
))
2780 # wait for pending tasks
2782 stage
[1] = "Waiting for instantiate pending tasks."
2783 self
.logger
.debug(logging_text
+ stage
[1])
2784 error_list
+= await self
._wait
_for
_tasks
(
2792 stage
[1] = stage
[2] = ""
2793 except asyncio
.CancelledError
:
2794 error_list
.append("Cancelled")
2795 # TODO cancel all tasks
2796 except Exception as exc
:
2797 error_list
.append(str(exc
))
2799 # update operation-status
2800 db_nsr_update
["operational-status"] = "running"
2801 # let's begin with VCA 'configured' status (later we can change it)
2802 db_nsr_update
["config-status"] = "configured"
2803 for task
, task_name
in tasks_dict_info
.items():
2804 if not task
.done() or task
.cancelled() or task
.exception():
2805 if task_name
.startswith(self
.task_name_deploy_vca
):
2806 # A N2VC task is pending
2807 db_nsr_update
["config-status"] = "failed"
2809 # RO or KDU task is pending
2810 db_nsr_update
["operational-status"] = "failed"
2812 # update status at database
2814 error_detail
= ". ".join(error_list
)
2815 self
.logger
.error(logging_text
+ error_detail
)
2816 error_description_nslcmop
= "{} Detail: {}".format(
2817 stage
[0], error_detail
2819 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2820 nslcmop_id
, stage
[0]
2823 db_nsr_update
["detailed-status"] = (
2824 error_description_nsr
+ " Detail: " + error_detail
2826 db_nslcmop_update
["detailed-status"] = error_detail
2827 nslcmop_operation_state
= "FAILED"
2831 error_description_nsr
= error_description_nslcmop
= None
2833 db_nsr_update
["detailed-status"] = "Done"
2834 db_nslcmop_update
["detailed-status"] = "Done"
2835 nslcmop_operation_state
= "COMPLETED"
2838 self
._write
_ns
_status
(
2841 current_operation
="IDLE",
2842 current_operation_id
=None,
2843 error_description
=error_description_nsr
,
2844 error_detail
=error_detail
,
2845 other_update
=db_nsr_update
,
2847 self
._write
_op
_status
(
2850 error_message
=error_description_nslcmop
,
2851 operation_state
=nslcmop_operation_state
,
2852 other_update
=db_nslcmop_update
,
2855 if nslcmop_operation_state
:
2857 await self
.msg
.aiowrite(
2862 "nslcmop_id": nslcmop_id
,
2863 "operationState": nslcmop_operation_state
,
2867 except Exception as e
:
2869 logging_text
+ "kafka_write notification Exception {}".format(e
)
2872 self
.logger
.debug(logging_text
+ "Exit")
2873 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2875 def _get_vnfd(self
, vnfd_id
: str, cached_vnfds
: Dict
[str, Any
]):
2876 if vnfd_id
not in cached_vnfds
:
2877 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
2878 return cached_vnfds
[vnfd_id
]
2880 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2881 if vnf_profile_id
not in cached_vnfrs
:
2882 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2885 "member-vnf-index-ref": vnf_profile_id
,
2886 "nsr-id-ref": nsr_id
,
2889 return cached_vnfrs
[vnf_profile_id
]
2891 def _is_deployed_vca_in_relation(
2892 self
, vca
: DeployedVCA
, relation
: Relation
2895 for endpoint
in (relation
.provider
, relation
.requirer
):
2896 if endpoint
["kdu-resource-profile-id"]:
2899 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2900 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2901 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2907 def _update_ee_relation_data_with_implicit_data(
2908 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2910 ee_relation_data
= safe_get_ee_relation(
2911 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2913 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2914 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2915 "execution-environment-ref"
2917 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2918 vnfd_id
= vnf_profile
["vnfd-id"]
2919 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2922 if ee_relation_level
== EELevel
.VNF
2923 else ee_relation_data
["vdu-profile-id"]
2925 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2928 f
"not execution environments found for ee_relation {ee_relation_data}"
2930 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2931 return ee_relation_data
2933 def _get_ns_relations(
2936 nsd
: Dict
[str, Any
],
2938 cached_vnfds
: Dict
[str, Any
],
2939 ) -> List
[Relation
]:
2941 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2942 for r
in db_ns_relations
:
2943 provider_dict
= None
2944 requirer_dict
= None
2945 if all(key
in r
for key
in ("provider", "requirer")):
2946 provider_dict
= r
["provider"]
2947 requirer_dict
= r
["requirer"]
2948 elif "entities" in r
:
2949 provider_id
= r
["entities"][0]["id"]
2952 "endpoint": r
["entities"][0]["endpoint"],
2954 if provider_id
!= nsd
["id"]:
2955 provider_dict
["vnf-profile-id"] = provider_id
2956 requirer_id
= r
["entities"][1]["id"]
2959 "endpoint": r
["entities"][1]["endpoint"],
2961 if requirer_id
!= nsd
["id"]:
2962 requirer_dict
["vnf-profile-id"] = requirer_id
2965 "provider/requirer or entities must be included in the relation."
2967 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2968 nsr_id
, nsd
, provider_dict
, cached_vnfds
2970 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2971 nsr_id
, nsd
, requirer_dict
, cached_vnfds
2973 provider
= EERelation(relation_provider
)
2974 requirer
= EERelation(relation_requirer
)
2975 relation
= Relation(r
["name"], provider
, requirer
)
2976 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2978 relations
.append(relation
)
2981 def _get_vnf_relations(
2984 nsd
: Dict
[str, Any
],
2986 cached_vnfds
: Dict
[str, Any
],
2987 ) -> List
[Relation
]:
2989 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
2990 vnf_profile_id
= vnf_profile
["id"]
2991 vnfd_id
= vnf_profile
["vnfd-id"]
2992 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2993 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
2994 for r
in db_vnf_relations
:
2995 provider_dict
= None
2996 requirer_dict
= None
2997 if all(key
in r
for key
in ("provider", "requirer")):
2998 provider_dict
= r
["provider"]
2999 requirer_dict
= r
["requirer"]
3000 elif "entities" in r
:
3001 provider_id
= r
["entities"][0]["id"]
3004 "vnf-profile-id": vnf_profile_id
,
3005 "endpoint": r
["entities"][0]["endpoint"],
3007 if provider_id
!= vnfd_id
:
3008 provider_dict
["vdu-profile-id"] = provider_id
3009 requirer_id
= r
["entities"][1]["id"]
3012 "vnf-profile-id": vnf_profile_id
,
3013 "endpoint": r
["entities"][1]["endpoint"],
3015 if requirer_id
!= vnfd_id
:
3016 requirer_dict
["vdu-profile-id"] = requirer_id
3019 "provider/requirer or entities must be included in the relation."
3021 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3022 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3024 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3025 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3027 provider
= EERelation(relation_provider
)
3028 requirer
= EERelation(relation_requirer
)
3029 relation
= Relation(r
["name"], provider
, requirer
)
3030 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3032 relations
.append(relation
)
3035 def _get_kdu_resource_data(
3037 ee_relation
: EERelation
,
3038 db_nsr
: Dict
[str, Any
],
3039 cached_vnfds
: Dict
[str, Any
],
3040 ) -> DeployedK8sResource
:
3041 nsd
= get_nsd(db_nsr
)
3042 vnf_profiles
= get_vnf_profiles(nsd
)
3043 vnfd_id
= find_in_list(
3045 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
3047 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
3048 kdu_resource_profile
= get_kdu_resource_profile(
3049 db_vnfd
, ee_relation
.kdu_resource_profile_id
3051 kdu_name
= kdu_resource_profile
["kdu-name"]
3052 deployed_kdu
, _
= get_deployed_kdu(
3053 db_nsr
.get("_admin", ()).get("deployed", ()),
3055 ee_relation
.vnf_profile_id
,
3057 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
3060 def _get_deployed_component(
3062 ee_relation
: EERelation
,
3063 db_nsr
: Dict
[str, Any
],
3064 cached_vnfds
: Dict
[str, Any
],
3065 ) -> DeployedComponent
:
3066 nsr_id
= db_nsr
["_id"]
3067 deployed_component
= None
3068 ee_level
= EELevel
.get_level(ee_relation
)
3069 if ee_level
== EELevel
.NS
:
3070 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
3072 deployed_component
= DeployedVCA(nsr_id
, vca
)
3073 elif ee_level
== EELevel
.VNF
:
3074 vca
= get_deployed_vca(
3078 "member-vnf-index": ee_relation
.vnf_profile_id
,
3079 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3083 deployed_component
= DeployedVCA(nsr_id
, vca
)
3084 elif ee_level
== EELevel
.VDU
:
3085 vca
= get_deployed_vca(
3088 "vdu_id": ee_relation
.vdu_profile_id
,
3089 "member-vnf-index": ee_relation
.vnf_profile_id
,
3090 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3094 deployed_component
= DeployedVCA(nsr_id
, vca
)
3095 elif ee_level
== EELevel
.KDU
:
3096 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
3097 ee_relation
, db_nsr
, cached_vnfds
3099 if kdu_resource_data
:
3100 deployed_component
= DeployedK8sResource(kdu_resource_data
)
3101 return deployed_component
3103 async def _add_relation(
3107 db_nsr
: Dict
[str, Any
],
3108 cached_vnfds
: Dict
[str, Any
],
3109 cached_vnfrs
: Dict
[str, Any
],
3111 deployed_provider
= self
._get
_deployed
_component
(
3112 relation
.provider
, db_nsr
, cached_vnfds
3114 deployed_requirer
= self
._get
_deployed
_component
(
3115 relation
.requirer
, db_nsr
, cached_vnfds
3119 and deployed_requirer
3120 and deployed_provider
.config_sw_installed
3121 and deployed_requirer
.config_sw_installed
3123 provider_db_vnfr
= (
3125 relation
.provider
.nsr_id
,
3126 relation
.provider
.vnf_profile_id
,
3129 if relation
.provider
.vnf_profile_id
3132 requirer_db_vnfr
= (
3134 relation
.requirer
.nsr_id
,
3135 relation
.requirer
.vnf_profile_id
,
3138 if relation
.requirer
.vnf_profile_id
3141 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3142 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3143 provider_relation_endpoint
= RelationEndpoint(
3144 deployed_provider
.ee_id
,
3146 relation
.provider
.endpoint
,
3148 requirer_relation_endpoint
= RelationEndpoint(
3149 deployed_requirer
.ee_id
,
3151 relation
.requirer
.endpoint
,
3153 await self
.vca_map
[vca_type
].add_relation(
3154 provider
=provider_relation_endpoint
,
3155 requirer
=requirer_relation_endpoint
,
3157 # remove entry from relations list
3161 async def _add_vca_relations(
3167 timeout
: int = 3600,
3171 # 1. find all relations for this VCA
3172 # 2. wait for other peers related
3176 # STEP 1: find all relations for this VCA
3179 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3180 nsd
= get_nsd(db_nsr
)
3183 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3184 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3189 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3190 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3192 # if no relations, terminate
3194 self
.logger
.debug(logging_text
+ " No relations")
3197 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3204 if now
- start
>= timeout
:
3205 self
.logger
.error(logging_text
+ " : timeout adding relations")
3208 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3209 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3211 # for each relation, find the VCA's related
3212 for relation
in relations
.copy():
3213 added
= await self
._add
_relation
(
3221 relations
.remove(relation
)
3224 self
.logger
.debug("Relations added")
3226 await asyncio
.sleep(5.0)
3230 except Exception as e
:
3231 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3234 async def _install_kdu(
3242 k8s_instance_info
: dict,
3243 k8params
: dict = None,
3249 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3252 "collection": "nsrs",
3253 "filter": {"_id": nsr_id
},
3254 "path": nsr_db_path
,
3257 if k8s_instance_info
.get("kdu-deployment-name"):
3258 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3260 kdu_instance
= self
.k8scluster_map
[
3262 ].generate_kdu_instance_name(
3263 db_dict
=db_dict_install
,
3264 kdu_model
=k8s_instance_info
["kdu-model"],
3265 kdu_name
=k8s_instance_info
["kdu-name"],
3268 # Update the nsrs table with the kdu-instance value
3272 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3275 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3276 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3277 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3278 # namespace, this first verification could be removed, and the next step would be done for any kind
3280 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3281 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3282 if k8sclustertype
in ("juju", "juju-bundle"):
3283 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3284 # that the user passed a namespace which he wants its KDU to be deployed in)
3290 "_admin.projects_write": k8s_instance_info
["namespace"],
3291 "_admin.projects_read": k8s_instance_info
["namespace"],
3297 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3302 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3304 k8s_instance_info
["namespace"] = kdu_instance
3306 await self
.k8scluster_map
[k8sclustertype
].install(
3307 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3308 kdu_model
=k8s_instance_info
["kdu-model"],
3311 db_dict
=db_dict_install
,
3313 kdu_name
=k8s_instance_info
["kdu-name"],
3314 namespace
=k8s_instance_info
["namespace"],
3315 kdu_instance
=kdu_instance
,
3319 # Obtain services to obtain management service ip
3320 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3321 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3322 kdu_instance
=kdu_instance
,
3323 namespace
=k8s_instance_info
["namespace"],
3326 # Obtain management service info (if exists)
3327 vnfr_update_dict
= {}
3328 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3330 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3335 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3338 for service
in kdud
.get("service", [])
3339 if service
.get("mgmt-service")
3341 for mgmt_service
in mgmt_services
:
3342 for service
in services
:
3343 if service
["name"].startswith(mgmt_service
["name"]):
3344 # Mgmt service found, Obtain service ip
3345 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3346 if isinstance(ip
, list) and len(ip
) == 1:
3350 "kdur.{}.ip-address".format(kdu_index
)
3353 # Check if must update also mgmt ip at the vnf
3354 service_external_cp
= mgmt_service
.get(
3355 "external-connection-point-ref"
3357 if service_external_cp
:
3359 deep_get(vnfd
, ("mgmt-interface", "cp"))
3360 == service_external_cp
3362 vnfr_update_dict
["ip-address"] = ip
3367 "external-connection-point-ref", ""
3369 == service_external_cp
,
3372 "kdur.{}.ip-address".format(kdu_index
)
3377 "Mgmt service name: {} not found".format(
3378 mgmt_service
["name"]
3382 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3383 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3385 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3388 and kdu_config
.get("initial-config-primitive")
3389 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3391 initial_config_primitive_list
= kdu_config
.get(
3392 "initial-config-primitive"
3394 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3396 for initial_config_primitive
in initial_config_primitive_list
:
3397 primitive_params_
= self
._map
_primitive
_params
(
3398 initial_config_primitive
, {}, {}
3401 await asyncio
.wait_for(
3402 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3403 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3404 kdu_instance
=kdu_instance
,
3405 primitive_name
=initial_config_primitive
["name"],
3406 params
=primitive_params_
,
3407 db_dict
=db_dict_install
,
3413 except Exception as e
:
3414 # Prepare update db with error and raise exception
3417 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3421 vnfr_data
.get("_id"),
3422 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3425 # ignore to keep original exception
3427 # reraise original error
3432 async def deploy_kdus(
3439 task_instantiation_info
,
3441 # Launch kdus if present in the descriptor
3443 k8scluster_id_2_uuic
= {
3444 "helm-chart-v3": {},
3449 async def _get_cluster_id(cluster_id
, cluster_type
):
3450 nonlocal k8scluster_id_2_uuic
3451 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3452 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3454 # check if K8scluster is creating and wait look if previous tasks in process
3455 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3456 "k8scluster", cluster_id
3459 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3460 task_name
, cluster_id
3462 self
.logger
.debug(logging_text
+ text
)
3463 await asyncio
.wait(task_dependency
, timeout
=3600)
3465 db_k8scluster
= self
.db
.get_one(
3466 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3468 if not db_k8scluster
:
3469 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3471 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3473 if cluster_type
== "helm-chart-v3":
3475 # backward compatibility for existing clusters that have not been initialized for helm v3
3476 k8s_credentials
= yaml
.safe_dump(
3477 db_k8scluster
.get("credentials")
3479 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3480 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3482 db_k8scluster_update
= {}
3483 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3484 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3485 db_k8scluster_update
[
3486 "_admin.helm-chart-v3.created"
3488 db_k8scluster_update
[
3489 "_admin.helm-chart-v3.operationalState"
3492 "k8sclusters", cluster_id
, db_k8scluster_update
3494 except Exception as e
:
3497 + "error initializing helm-v3 cluster: {}".format(str(e
))
3500 "K8s cluster '{}' has not been initialized for '{}'".format(
3501 cluster_id
, cluster_type
3506 "K8s cluster '{}' has not been initialized for '{}'".format(
3507 cluster_id
, cluster_type
3510 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3513 logging_text
+= "Deploy kdus: "
3516 db_nsr_update
= {"_admin.deployed.K8s": []}
3517 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3520 updated_cluster_list
= []
3521 updated_v3_cluster_list
= []
3523 for vnfr_data
in db_vnfrs
.values():
3524 vca_id
= self
.get_vca_id(vnfr_data
, {})
3525 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3526 # Step 0: Prepare and set parameters
3527 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3528 vnfd_id
= vnfr_data
.get("vnfd-id")
3529 vnfd_with_id
= find_in_list(
3530 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3534 for kdud
in vnfd_with_id
["kdu"]
3535 if kdud
["name"] == kdur
["kdu-name"]
3537 namespace
= kdur
.get("k8s-namespace")
3538 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3539 if kdur
.get("helm-chart"):
3540 kdumodel
= kdur
["helm-chart"]
3541 # Default version: helm3, if helm-version is v2 assign v2
3542 k8sclustertype
= "helm-chart-v3"
3543 self
.logger
.debug("kdur: {}".format(kdur
))
3545 kdur
.get("helm-version")
3546 and kdur
.get("helm-version") == "v2"
3548 k8sclustertype
= "helm-chart"
3549 elif kdur
.get("juju-bundle"):
3550 kdumodel
= kdur
["juju-bundle"]
3551 k8sclustertype
= "juju-bundle"
3554 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3555 "juju-bundle. Maybe an old NBI version is running".format(
3556 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3559 # check if kdumodel is a file and exists
3561 vnfd_with_id
= find_in_list(
3562 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3564 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3565 if storage
: # may be not present if vnfd has not artifacts
3566 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3567 if storage
["pkg-dir"]:
3568 filename
= "{}/{}/{}s/{}".format(
3575 filename
= "{}/Scripts/{}s/{}".format(
3580 if self
.fs
.file_exists(
3581 filename
, mode
="file"
3582 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3583 kdumodel
= self
.fs
.path
+ filename
3584 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3586 except Exception: # it is not a file
3589 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3590 step
= "Synchronize repos for k8s cluster '{}'".format(
3593 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3597 k8sclustertype
== "helm-chart"
3598 and cluster_uuid
not in updated_cluster_list
3600 k8sclustertype
== "helm-chart-v3"
3601 and cluster_uuid
not in updated_v3_cluster_list
3603 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3604 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3605 cluster_uuid
=cluster_uuid
3608 if del_repo_list
or added_repo_dict
:
3609 if k8sclustertype
== "helm-chart":
3611 "_admin.helm_charts_added." + item
: None
3612 for item
in del_repo_list
3615 "_admin.helm_charts_added." + item
: name
3616 for item
, name
in added_repo_dict
.items()
3618 updated_cluster_list
.append(cluster_uuid
)
3619 elif k8sclustertype
== "helm-chart-v3":
3621 "_admin.helm_charts_v3_added." + item
: None
3622 for item
in del_repo_list
3625 "_admin.helm_charts_v3_added." + item
: name
3626 for item
, name
in added_repo_dict
.items()
3628 updated_v3_cluster_list
.append(cluster_uuid
)
3630 logging_text
+ "repos synchronized on k8s cluster "
3631 "'{}' to_delete: {}, to_add: {}".format(
3632 k8s_cluster_id
, del_repo_list
, added_repo_dict
3637 {"_id": k8s_cluster_id
},
3643 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3644 vnfr_data
["member-vnf-index-ref"],
3648 k8s_instance_info
= {
3649 "kdu-instance": None,
3650 "k8scluster-uuid": cluster_uuid
,
3651 "k8scluster-type": k8sclustertype
,
3652 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3653 "kdu-name": kdur
["kdu-name"],
3654 "kdu-model": kdumodel
,
3655 "namespace": namespace
,
3656 "kdu-deployment-name": kdu_deployment_name
,
3658 db_path
= "_admin.deployed.K8s.{}".format(index
)
3659 db_nsr_update
[db_path
] = k8s_instance_info
3660 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3661 vnfd_with_id
= find_in_list(
3662 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3664 task
= asyncio
.ensure_future(
3673 k8params
=desc_params
,
3678 self
.lcm_tasks
.register(
3682 "instantiate_KDU-{}".format(index
),
3685 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3691 except (LcmException
, asyncio
.CancelledError
):
3693 except Exception as e
:
3694 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3695 if isinstance(e
, (N2VCException
, DbException
)):
3696 self
.logger
.error(logging_text
+ msg
)
3698 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3699 raise LcmException(msg
)
3702 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3721 task_instantiation_info
,
3724 # launch instantiate_N2VC in a asyncio task and register task object
3725 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3726 # if not found, create one entry and update database
3727 # fill db_nsr._admin.deployed.VCA.<index>
3730 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3732 if "execution-environment-list" in descriptor_config
:
3733 ee_list
= descriptor_config
.get("execution-environment-list", [])
3734 elif "juju" in descriptor_config
:
3735 ee_list
= [descriptor_config
] # ns charms
3736 else: # other types as script are not supported
3739 for ee_item
in ee_list
:
3742 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3743 ee_item
.get("juju"), ee_item
.get("helm-chart")
3746 ee_descriptor_id
= ee_item
.get("id")
3747 if ee_item
.get("juju"):
3748 vca_name
= ee_item
["juju"].get("charm")
3751 if ee_item
["juju"].get("charm") is not None
3754 if ee_item
["juju"].get("cloud") == "k8s":
3755 vca_type
= "k8s_proxy_charm"
3756 elif ee_item
["juju"].get("proxy") is False:
3757 vca_type
= "native_charm"
3758 elif ee_item
.get("helm-chart"):
3759 vca_name
= ee_item
["helm-chart"]
3760 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3763 vca_type
= "helm-v3"
3766 logging_text
+ "skipping non juju neither charm configuration"
3771 for vca_index
, vca_deployed
in enumerate(
3772 db_nsr
["_admin"]["deployed"]["VCA"]
3774 if not vca_deployed
:
3777 vca_deployed
.get("member-vnf-index") == member_vnf_index
3778 and vca_deployed
.get("vdu_id") == vdu_id
3779 and vca_deployed
.get("kdu_name") == kdu_name
3780 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3781 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3785 # not found, create one.
3787 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3790 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3792 target
+= "/kdu/{}".format(kdu_name
)
3794 "target_element": target
,
3795 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3796 "member-vnf-index": member_vnf_index
,
3798 "kdu_name": kdu_name
,
3799 "vdu_count_index": vdu_index
,
3800 "operational-status": "init", # TODO revise
3801 "detailed-status": "", # TODO revise
3802 "step": "initial-deploy", # TODO revise
3804 "vdu_name": vdu_name
,
3806 "ee_descriptor_id": ee_descriptor_id
,
3810 # create VCA and configurationStatus in db
3812 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3813 "configurationStatus.{}".format(vca_index
): dict(),
3815 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3817 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3819 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3820 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3821 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3824 task_n2vc
= asyncio
.ensure_future(
3825 self
.instantiate_N2VC(
3826 logging_text
=logging_text
,
3827 vca_index
=vca_index
,
3833 vdu_index
=vdu_index
,
3834 deploy_params
=deploy_params
,
3835 config_descriptor
=descriptor_config
,
3836 base_folder
=base_folder
,
3837 nslcmop_id
=nslcmop_id
,
3841 ee_config_descriptor
=ee_item
,
3844 self
.lcm_tasks
.register(
3848 "instantiate_N2VC-{}".format(vca_index
),
3851 task_instantiation_info
[
3853 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3854 member_vnf_index
or "", vdu_id
or ""
3858 def _create_nslcmop(nsr_id
, operation
, params
):
3860 Creates a ns-lcm-opp content to be stored at database.
3861 :param nsr_id: internal id of the instance
3862 :param operation: instantiate, terminate, scale, action, ...
3863 :param params: user parameters for the operation
3864 :return: dictionary following SOL005 format
3866 # Raise exception if invalid arguments
3867 if not (nsr_id
and operation
and params
):
3869 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3876 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3877 "operationState": "PROCESSING",
3878 "statusEnteredTime": now
,
3879 "nsInstanceId": nsr_id
,
3880 "lcmOperationType": operation
,
3882 "isAutomaticInvocation": False,
3883 "operationParams": params
,
3884 "isCancelPending": False,
3886 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3887 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3892 def _format_additional_params(self
, params
):
3893 params
= params
or {}
3894 for key
, value
in params
.items():
3895 if str(value
).startswith("!!yaml "):
3896 params
[key
] = yaml
.safe_load(value
[7:])
3899 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3900 primitive
= seq
.get("name")
3901 primitive_params
= {}
3903 "member_vnf_index": vnf_index
,
3904 "primitive": primitive
,
3905 "primitive_params": primitive_params
,
3908 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
3912 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
3913 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
3914 if op
.get("operationState") == "COMPLETED":
3915 # b. Skip sub-operation
3916 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3917 return self
.SUBOPERATION_STATUS_SKIP
3919 # c. retry executing sub-operation
3920 # The sub-operation exists, and operationState != 'COMPLETED'
3921 # Update operationState = 'PROCESSING' to indicate a retry.
3922 operationState
= "PROCESSING"
3923 detailed_status
= "In progress"
3924 self
._update
_suboperation
_status
(
3925 db_nslcmop
, op_index
, operationState
, detailed_status
3927 # Return the sub-operation index
3928 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3929 # with arguments extracted from the sub-operation
3932 # Find a sub-operation where all keys in a matching dictionary must match
3933 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3934 def _find_suboperation(self
, db_nslcmop
, match
):
3935 if db_nslcmop
and match
:
3936 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3937 for i
, op
in enumerate(op_list
):
3938 if all(op
.get(k
) == match
[k
] for k
in match
):
3940 return self
.SUBOPERATION_STATUS_NOT_FOUND
3942 # Update status for a sub-operation given its index
3943 def _update_suboperation_status(
3944 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3946 # Update DB for HA tasks
3947 q_filter
= {"_id": db_nslcmop
["_id"]}
3949 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3950 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3953 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3956 # Add sub-operation, return the index of the added sub-operation
3957 # Optionally, set operationState, detailed-status, and operationType
3958 # Status and type are currently set for 'scale' sub-operations:
3959 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3960 # 'detailed-status' : status message
3961 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3962 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3963 def _add_suboperation(
3971 mapped_primitive_params
,
3972 operationState
=None,
3973 detailed_status
=None,
3976 RO_scaling_info
=None,
3979 return self
.SUBOPERATION_STATUS_NOT_FOUND
3980 # Get the "_admin.operations" list, if it exists
3981 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3982 op_list
= db_nslcmop_admin
.get("operations")
3983 # Create or append to the "_admin.operations" list
3985 "member_vnf_index": vnf_index
,
3987 "vdu_count_index": vdu_count_index
,
3988 "primitive": primitive
,
3989 "primitive_params": mapped_primitive_params
,
3992 new_op
["operationState"] = operationState
3994 new_op
["detailed-status"] = detailed_status
3996 new_op
["lcmOperationType"] = operationType
3998 new_op
["RO_nsr_id"] = RO_nsr_id
4000 new_op
["RO_scaling_info"] = RO_scaling_info
4002 # No existing operations, create key 'operations' with current operation as first list element
4003 db_nslcmop_admin
.update({"operations": [new_op
]})
4004 op_list
= db_nslcmop_admin
.get("operations")
4006 # Existing operations, append operation to list
4007 op_list
.append(new_op
)
4009 db_nslcmop_update
= {"_admin.operations": op_list
}
4010 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4011 op_index
= len(op_list
) - 1
4014 # Helper methods for scale() sub-operations
4016 # pre-scale/post-scale:
4017 # Check for 3 different cases:
4018 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4019 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4020 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4021 def _check_or_add_scale_suboperation(
4025 vnf_config_primitive
,
4029 RO_scaling_info
=None,
4031 # Find this sub-operation
4032 if RO_nsr_id
and RO_scaling_info
:
4033 operationType
= "SCALE-RO"
4035 "member_vnf_index": vnf_index
,
4036 "RO_nsr_id": RO_nsr_id
,
4037 "RO_scaling_info": RO_scaling_info
,
4041 "member_vnf_index": vnf_index
,
4042 "primitive": vnf_config_primitive
,
4043 "primitive_params": primitive_params
,
4044 "lcmOperationType": operationType
,
4046 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4047 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4048 # a. New sub-operation
4049 # The sub-operation does not exist, add it.
4050 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4051 # The following parameters are set to None for all kind of scaling:
4053 vdu_count_index
= None
4055 if RO_nsr_id
and RO_scaling_info
:
4056 vnf_config_primitive
= None
4057 primitive_params
= None
4060 RO_scaling_info
= None
4061 # Initial status for sub-operation
4062 operationState
= "PROCESSING"
4063 detailed_status
= "In progress"
4064 # Add sub-operation for pre/post-scaling (zero or more operations)
4065 self
._add
_suboperation
(
4071 vnf_config_primitive
,
4079 return self
.SUBOPERATION_STATUS_NEW
4081 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4082 # or op_index (operationState != 'COMPLETED')
4083 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4085 # Function to return execution_environment id
4087 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4088 # TODO vdu_index_count
4089 for vca
in vca_deployed_list
:
4090 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
4093 async def destroy_N2VC(
4101 exec_primitives
=True,
4106 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4107 :param logging_text:
4109 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4110 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4111 :param vca_index: index in the database _admin.deployed.VCA
4112 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4113 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4114 not executed properly
4115 :param scaling_in: True destroys the application, False destroys the model
4116 :return: None or exception
4121 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4122 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4126 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4128 # execute terminate_primitives
4130 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4131 config_descriptor
.get("terminate-config-primitive"),
4132 vca_deployed
.get("ee_descriptor_id"),
4134 vdu_id
= vca_deployed
.get("vdu_id")
4135 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4136 vdu_name
= vca_deployed
.get("vdu_name")
4137 vnf_index
= vca_deployed
.get("member-vnf-index")
4138 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4139 for seq
in terminate_primitives
:
4140 # For each sequence in list, get primitive and call _ns_execute_primitive()
4141 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4142 vnf_index
, seq
.get("name")
4144 self
.logger
.debug(logging_text
+ step
)
4145 # Create the primitive for each sequence, i.e. "primitive": "touch"
4146 primitive
= seq
.get("name")
4147 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4152 self
._add
_suboperation
(
4159 mapped_primitive_params
,
4161 # Sub-operations: Call _ns_execute_primitive() instead of action()
4163 result
, result_detail
= await self
._ns
_execute
_primitive
(
4164 vca_deployed
["ee_id"],
4166 mapped_primitive_params
,
4170 except LcmException
:
4171 # this happens when VCA is not deployed. In this case it is not needed to terminate
4173 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4174 if result
not in result_ok
:
4176 "terminate_primitive {} for vnf_member_index={} fails with "
4177 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4179 # set that this VCA do not need terminated
4180 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4184 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4187 # Delete Prometheus Jobs if any
4188 # This uses NSR_ID, so it will destroy any jobs under this index
4189 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4192 await self
.vca_map
[vca_type
].delete_execution_environment(
4193 vca_deployed
["ee_id"],
4194 scaling_in
=scaling_in
,
4199 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4200 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4201 namespace
= "." + db_nsr
["_id"]
4203 await self
.n2vc
.delete_namespace(
4204 namespace
=namespace
,
4205 total_timeout
=self
.timeout_charm_delete
,
4208 except N2VCNotFound
: # already deleted. Skip
4210 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4212 async def _terminate_RO(
4213 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4216 Terminates a deployment from RO
4217 :param logging_text:
4218 :param nsr_deployed: db_nsr._admin.deployed
4221 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4222 this method will update only the index 2, but it will write on database the concatenated content of the list
4227 ro_nsr_id
= ro_delete_action
= None
4228 if nsr_deployed
and nsr_deployed
.get("RO"):
4229 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4230 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4233 stage
[2] = "Deleting ns from VIM."
4234 db_nsr_update
["detailed-status"] = " ".join(stage
)
4235 self
._write
_op
_status
(nslcmop_id
, stage
)
4236 self
.logger
.debug(logging_text
+ stage
[2])
4237 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4238 self
._write
_op
_status
(nslcmop_id
, stage
)
4239 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4240 ro_delete_action
= desc
["action_id"]
4242 "_admin.deployed.RO.nsr_delete_action_id"
4243 ] = ro_delete_action
4244 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4245 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4246 if ro_delete_action
:
4247 # wait until NS is deleted from VIM
4248 stage
[2] = "Waiting ns deleted from VIM."
4249 detailed_status_old
= None
4253 + " RO_id={} ro_delete_action={}".format(
4254 ro_nsr_id
, ro_delete_action
4257 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4258 self
._write
_op
_status
(nslcmop_id
, stage
)
4260 delete_timeout
= 20 * 60 # 20 minutes
4261 while delete_timeout
> 0:
4262 desc
= await self
.RO
.show(
4264 item_id_name
=ro_nsr_id
,
4265 extra_item
="action",
4266 extra_item_id
=ro_delete_action
,
4270 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4272 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4273 if ns_status
== "ERROR":
4274 raise ROclient
.ROClientException(ns_status_info
)
4275 elif ns_status
== "BUILD":
4276 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4277 elif ns_status
== "ACTIVE":
4278 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4279 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4284 ), "ROclient.check_action_status returns unknown {}".format(
4287 if stage
[2] != detailed_status_old
:
4288 detailed_status_old
= stage
[2]
4289 db_nsr_update
["detailed-status"] = " ".join(stage
)
4290 self
._write
_op
_status
(nslcmop_id
, stage
)
4291 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4292 await asyncio
.sleep(5, loop
=self
.loop
)
4294 else: # delete_timeout <= 0:
4295 raise ROclient
.ROClientException(
4296 "Timeout waiting ns deleted from VIM"
4299 except Exception as e
:
4300 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4302 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4304 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4305 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4306 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4308 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4311 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4313 failed_detail
.append("delete conflict: {}".format(e
))
4316 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4319 failed_detail
.append("delete error: {}".format(e
))
4321 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4325 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4326 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4328 stage
[2] = "Deleting nsd from RO."
4329 db_nsr_update
["detailed-status"] = " ".join(stage
)
4330 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4331 self
._write
_op
_status
(nslcmop_id
, stage
)
4332 await self
.RO
.delete("nsd", ro_nsd_id
)
4334 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4336 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4337 except Exception as e
:
4339 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4341 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4343 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4346 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4348 failed_detail
.append(
4349 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4351 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4353 failed_detail
.append(
4354 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4356 self
.logger
.error(logging_text
+ failed_detail
[-1])
4358 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4359 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4360 if not vnf_deployed
or not vnf_deployed
["id"]:
4363 ro_vnfd_id
= vnf_deployed
["id"]
4366 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4367 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4369 db_nsr_update
["detailed-status"] = " ".join(stage
)
4370 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4371 self
._write
_op
_status
(nslcmop_id
, stage
)
4372 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4374 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4376 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4377 except Exception as e
:
4379 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4382 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4386 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4389 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4391 failed_detail
.append(
4392 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4394 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4396 failed_detail
.append(
4397 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4399 self
.logger
.error(logging_text
+ failed_detail
[-1])
4402 stage
[2] = "Error deleting from VIM"
4404 stage
[2] = "Deleted from VIM"
4405 db_nsr_update
["detailed-status"] = " ".join(stage
)
4406 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4407 self
._write
_op
_status
(nslcmop_id
, stage
)
4410 raise LcmException("; ".join(failed_detail
))
4412 async def terminate(self
, nsr_id
, nslcmop_id
):
4413 # Try to lock HA task here
4414 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4415 if not task_is_locked_by_me
:
4418 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4419 self
.logger
.debug(logging_text
+ "Enter")
4420 timeout_ns_terminate
= self
.timeout_ns_terminate
4423 operation_params
= None
4425 error_list
= [] # annotates all failed error messages
4426 db_nslcmop_update
= {}
4427 autoremove
= False # autoremove after terminated
4428 tasks_dict_info
= {}
4431 "Stage 1/3: Preparing task.",
4432 "Waiting for previous operations to terminate.",
4435 # ^ contains [stage, step, VIM-status]
4437 # wait for any previous tasks in process
4438 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4440 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4441 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4442 operation_params
= db_nslcmop
.get("operationParams") or {}
4443 if operation_params
.get("timeout_ns_terminate"):
4444 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4445 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4446 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4448 db_nsr_update
["operational-status"] = "terminating"
4449 db_nsr_update
["config-status"] = "terminating"
4450 self
._write
_ns
_status
(
4452 ns_state
="TERMINATING",
4453 current_operation
="TERMINATING",
4454 current_operation_id
=nslcmop_id
,
4455 other_update
=db_nsr_update
,
4457 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4458 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4459 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4462 stage
[1] = "Getting vnf descriptors from db."
4463 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4465 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4467 db_vnfds_from_id
= {}
4468 db_vnfds_from_member_index
= {}
4470 for vnfr
in db_vnfrs_list
:
4471 vnfd_id
= vnfr
["vnfd-id"]
4472 if vnfd_id
not in db_vnfds_from_id
:
4473 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4474 db_vnfds_from_id
[vnfd_id
] = vnfd
4475 db_vnfds_from_member_index
[
4476 vnfr
["member-vnf-index-ref"]
4477 ] = db_vnfds_from_id
[vnfd_id
]
4479 # Destroy individual execution environments when there are terminating primitives.
4480 # Rest of EE will be deleted at once
4481 # TODO - check before calling _destroy_N2VC
4482 # if not operation_params.get("skip_terminate_primitives"):#
4483 # or not vca.get("needed_terminate"):
4484 stage
[0] = "Stage 2/3 execute terminating primitives."
4485 self
.logger
.debug(logging_text
+ stage
[0])
4486 stage
[1] = "Looking execution environment that needs terminate."
4487 self
.logger
.debug(logging_text
+ stage
[1])
4489 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4490 config_descriptor
= None
4491 vca_member_vnf_index
= vca
.get("member-vnf-index")
4492 vca_id
= self
.get_vca_id(
4493 db_vnfrs_dict
.get(vca_member_vnf_index
)
4494 if vca_member_vnf_index
4498 if not vca
or not vca
.get("ee_id"):
4500 if not vca
.get("member-vnf-index"):
4502 config_descriptor
= db_nsr
.get("ns-configuration")
4503 elif vca
.get("vdu_id"):
4504 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4505 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4506 elif vca
.get("kdu_name"):
4507 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4508 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4510 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4511 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4512 vca_type
= vca
.get("type")
4513 exec_terminate_primitives
= not operation_params
.get(
4514 "skip_terminate_primitives"
4515 ) and vca
.get("needed_terminate")
4516 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4517 # pending native charms
4519 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4521 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4522 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4523 task
= asyncio
.ensure_future(
4531 exec_terminate_primitives
,
4535 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4537 # wait for pending tasks of terminate primitives
4541 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4543 error_list
= await self
._wait
_for
_tasks
(
4546 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4550 tasks_dict_info
.clear()
4552 return # raise LcmException("; ".join(error_list))
4554 # remove All execution environments at once
4555 stage
[0] = "Stage 3/3 delete all."
4557 if nsr_deployed
.get("VCA"):
4558 stage
[1] = "Deleting all execution environments."
4559 self
.logger
.debug(logging_text
+ stage
[1])
4560 vca_id
= self
.get_vca_id({}, db_nsr
)
4561 task_delete_ee
= asyncio
.ensure_future(
4563 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4564 timeout
=self
.timeout_charm_delete
,
4567 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4568 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4570 # Delete from k8scluster
4571 stage
[1] = "Deleting KDUs."
4572 self
.logger
.debug(logging_text
+ stage
[1])
4573 # print(nsr_deployed)
4574 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4575 if not kdu
or not kdu
.get("kdu-instance"):
4577 kdu_instance
= kdu
.get("kdu-instance")
4578 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4579 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4580 vca_id
= self
.get_vca_id({}, db_nsr
)
4581 task_delete_kdu_instance
= asyncio
.ensure_future(
4582 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4583 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4584 kdu_instance
=kdu_instance
,
4586 namespace
=kdu
.get("namespace"),
4592 + "Unknown k8s deployment type {}".format(
4593 kdu
.get("k8scluster-type")
4598 task_delete_kdu_instance
4599 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4602 stage
[1] = "Deleting ns from VIM."
4604 task_delete_ro
= asyncio
.ensure_future(
4605 self
._terminate
_ng
_ro
(
4606 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4610 task_delete_ro
= asyncio
.ensure_future(
4612 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4615 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4617 # rest of staff will be done at finally
4620 ROclient
.ROClientException
,
4625 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4627 except asyncio
.CancelledError
:
4629 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4631 exc
= "Operation was cancelled"
4632 except Exception as e
:
4633 exc
= traceback
.format_exc()
4634 self
.logger
.critical(
4635 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4640 error_list
.append(str(exc
))
4642 # wait for pending tasks
4644 stage
[1] = "Waiting for terminate pending tasks."
4645 self
.logger
.debug(logging_text
+ stage
[1])
4646 error_list
+= await self
._wait
_for
_tasks
(
4649 timeout_ns_terminate
,
4653 stage
[1] = stage
[2] = ""
4654 except asyncio
.CancelledError
:
4655 error_list
.append("Cancelled")
4656 # TODO cancell all tasks
4657 except Exception as exc
:
4658 error_list
.append(str(exc
))
4659 # update status at database
4661 error_detail
= "; ".join(error_list
)
4662 # self.logger.error(logging_text + error_detail)
4663 error_description_nslcmop
= "{} Detail: {}".format(
4664 stage
[0], error_detail
4666 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4667 nslcmop_id
, stage
[0]
4670 db_nsr_update
["operational-status"] = "failed"
4671 db_nsr_update
["detailed-status"] = (
4672 error_description_nsr
+ " Detail: " + error_detail
4674 db_nslcmop_update
["detailed-status"] = error_detail
4675 nslcmop_operation_state
= "FAILED"
4679 error_description_nsr
= error_description_nslcmop
= None
4680 ns_state
= "NOT_INSTANTIATED"
4681 db_nsr_update
["operational-status"] = "terminated"
4682 db_nsr_update
["detailed-status"] = "Done"
4683 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4684 db_nslcmop_update
["detailed-status"] = "Done"
4685 nslcmop_operation_state
= "COMPLETED"
4688 self
._write
_ns
_status
(
4691 current_operation
="IDLE",
4692 current_operation_id
=None,
4693 error_description
=error_description_nsr
,
4694 error_detail
=error_detail
,
4695 other_update
=db_nsr_update
,
4697 self
._write
_op
_status
(
4700 error_message
=error_description_nslcmop
,
4701 operation_state
=nslcmop_operation_state
,
4702 other_update
=db_nslcmop_update
,
4704 if ns_state
== "NOT_INSTANTIATED":
4708 {"nsr-id-ref": nsr_id
},
4709 {"_admin.nsState": "NOT_INSTANTIATED"},
4711 except DbException
as e
:
4714 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4718 if operation_params
:
4719 autoremove
= operation_params
.get("autoremove", False)
4720 if nslcmop_operation_state
:
4722 await self
.msg
.aiowrite(
4727 "nslcmop_id": nslcmop_id
,
4728 "operationState": nslcmop_operation_state
,
4729 "autoremove": autoremove
,
4733 except Exception as e
:
4735 logging_text
+ "kafka_write notification Exception {}".format(e
)
4738 self
.logger
.debug(logging_text
+ "Exit")
4739 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4741 async def _wait_for_tasks(
4742 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4745 error_detail_list
= []
4747 pending_tasks
= list(created_tasks_info
.keys())
4748 num_tasks
= len(pending_tasks
)
4750 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4751 self
._write
_op
_status
(nslcmop_id
, stage
)
4752 while pending_tasks
:
4754 _timeout
= timeout
+ time_start
- time()
4755 done
, pending_tasks
= await asyncio
.wait(
4756 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4758 num_done
+= len(done
)
4759 if not done
: # Timeout
4760 for task
in pending_tasks
:
4761 new_error
= created_tasks_info
[task
] + ": Timeout"
4762 error_detail_list
.append(new_error
)
4763 error_list
.append(new_error
)
4766 if task
.cancelled():
4769 exc
= task
.exception()
4771 if isinstance(exc
, asyncio
.TimeoutError
):
4773 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4774 error_list
.append(created_tasks_info
[task
])
4775 error_detail_list
.append(new_error
)
4782 ROclient
.ROClientException
,
4788 self
.logger
.error(logging_text
+ new_error
)
4790 exc_traceback
= "".join(
4791 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4795 + created_tasks_info
[task
]
4801 logging_text
+ created_tasks_info
[task
] + ": Done"
4803 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4805 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4806 if nsr_id
: # update also nsr
4811 "errorDescription": "Error at: " + ", ".join(error_list
),
4812 "errorDetail": ". ".join(error_detail_list
),
4815 self
._write
_op
_status
(nslcmop_id
, stage
)
4816 return error_detail_list
4819 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4821 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4822 The default-value is used. If it is between < > it look for a value at instantiation_params
4823 :param primitive_desc: portion of VNFD/NSD that describes primitive
4824 :param params: Params provided by user
4825 :param instantiation_params: Instantiation params provided by user
4826 :return: a dictionary with the calculated params
4828 calculated_params
= {}
4829 for parameter
in primitive_desc
.get("parameter", ()):
4830 param_name
= parameter
["name"]
4831 if param_name
in params
:
4832 calculated_params
[param_name
] = params
[param_name
]
4833 elif "default-value" in parameter
or "value" in parameter
:
4834 if "value" in parameter
:
4835 calculated_params
[param_name
] = parameter
["value"]
4837 calculated_params
[param_name
] = parameter
["default-value"]
4839 isinstance(calculated_params
[param_name
], str)
4840 and calculated_params
[param_name
].startswith("<")
4841 and calculated_params
[param_name
].endswith(">")
4843 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4844 calculated_params
[param_name
] = instantiation_params
[
4845 calculated_params
[param_name
][1:-1]
4849 "Parameter {} needed to execute primitive {} not provided".format(
4850 calculated_params
[param_name
], primitive_desc
["name"]
4855 "Parameter {} needed to execute primitive {} not provided".format(
4856 param_name
, primitive_desc
["name"]
4860 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4861 calculated_params
[param_name
] = yaml
.safe_dump(
4862 calculated_params
[param_name
], default_flow_style
=True, width
=256
4864 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4866 ].startswith("!!yaml "):
4867 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4868 if parameter
.get("data-type") == "INTEGER":
4870 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4871 except ValueError: # error converting string to int
4873 "Parameter {} of primitive {} must be integer".format(
4874 param_name
, primitive_desc
["name"]
4877 elif parameter
.get("data-type") == "BOOLEAN":
4878 calculated_params
[param_name
] = not (
4879 (str(calculated_params
[param_name
])).lower() == "false"
4882 # add always ns_config_info if primitive name is config
4883 if primitive_desc
["name"] == "config":
4884 if "ns_config_info" in instantiation_params
:
4885 calculated_params
["ns_config_info"] = instantiation_params
[
4888 return calculated_params
4890 def _look_for_deployed_vca(
4897 ee_descriptor_id
=None,
4899 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4900 for vca
in deployed_vca
:
4903 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4906 vdu_count_index
is not None
4907 and vdu_count_index
!= vca
["vdu_count_index"]
4910 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4912 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4916 # vca_deployed not found
4918 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4919 " is not deployed".format(
4928 ee_id
= vca
.get("ee_id")
4930 "type", "lxc_proxy_charm"
4931 ) # default value for backward compatibility - proxy charm
4934 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4935 "execution environment".format(
4936 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4939 return ee_id
, vca_type
4941 async def _ns_execute_primitive(
4947 retries_interval
=30,
4954 if primitive
== "config":
4955 primitive_params
= {"params": primitive_params
}
4957 vca_type
= vca_type
or "lxc_proxy_charm"
4961 output
= await asyncio
.wait_for(
4962 self
.vca_map
[vca_type
].exec_primitive(
4964 primitive_name
=primitive
,
4965 params_dict
=primitive_params
,
4966 progress_timeout
=self
.timeout_progress_primitive
,
4967 total_timeout
=self
.timeout_primitive
,
4972 timeout
=timeout
or self
.timeout_primitive
,
4976 except asyncio
.CancelledError
:
4978 except Exception as e
: # asyncio.TimeoutError
4979 if isinstance(e
, asyncio
.TimeoutError
):
4984 "Error executing action {} on {} -> {}".format(
4989 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
4991 return "FAILED", str(e
)
4993 return "COMPLETED", output
4995 except (LcmException
, asyncio
.CancelledError
):
4997 except Exception as e
:
4998 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
5000 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
5002 Updating the vca_status with latest juju information in nsrs record
5003 :param: nsr_id: Id of the nsr
5004 :param: nslcmop_id: Id of the nslcmop
5008 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
5009 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5010 vca_id
= self
.get_vca_id({}, db_nsr
)
5011 if db_nsr
["_admin"]["deployed"]["K8s"]:
5012 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
5013 cluster_uuid
, kdu_instance
, cluster_type
= (
5014 k8s
["k8scluster-uuid"],
5015 k8s
["kdu-instance"],
5016 k8s
["k8scluster-type"],
5018 await self
._on
_update
_k
8s
_db
(
5019 cluster_uuid
=cluster_uuid
,
5020 kdu_instance
=kdu_instance
,
5021 filter={"_id": nsr_id
},
5023 cluster_type
=cluster_type
,
5026 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
5027 table
, filter = "nsrs", {"_id": nsr_id
}
5028 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
5029 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
5031 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
5032 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
5034 async def action(self
, nsr_id
, nslcmop_id
):
5035 # Try to lock HA task here
5036 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5037 if not task_is_locked_by_me
:
5040 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
5041 self
.logger
.debug(logging_text
+ "Enter")
5042 # get all needed from database
5046 db_nslcmop_update
= {}
5047 nslcmop_operation_state
= None
5048 error_description_nslcmop
= None
5051 # wait for any previous tasks in process
5052 step
= "Waiting for previous operations to terminate"
5053 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5055 self
._write
_ns
_status
(
5058 current_operation
="RUNNING ACTION",
5059 current_operation_id
=nslcmop_id
,
5062 step
= "Getting information from database"
5063 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5064 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5065 if db_nslcmop
["operationParams"].get("primitive_params"):
5066 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
5067 db_nslcmop
["operationParams"]["primitive_params"]
5070 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5071 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
5072 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
5073 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
5074 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
5075 primitive
= db_nslcmop
["operationParams"]["primitive"]
5076 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
5077 timeout_ns_action
= db_nslcmop
["operationParams"].get(
5078 "timeout_ns_action", self
.timeout_primitive
5082 step
= "Getting vnfr from database"
5083 db_vnfr
= self
.db
.get_one(
5084 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5086 if db_vnfr
.get("kdur"):
5088 for kdur
in db_vnfr
["kdur"]:
5089 if kdur
.get("additionalParams"):
5090 kdur
["additionalParams"] = json
.loads(
5091 kdur
["additionalParams"]
5093 kdur_list
.append(kdur
)
5094 db_vnfr
["kdur"] = kdur_list
5095 step
= "Getting vnfd from database"
5096 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5098 # Sync filesystem before running a primitive
5099 self
.fs
.sync(db_vnfr
["vnfd-id"])
5101 step
= "Getting nsd from database"
5102 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5104 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5105 # for backward compatibility
5106 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5107 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5108 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5109 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5111 # look for primitive
5112 config_primitive_desc
= descriptor_configuration
= None
5114 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5116 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5118 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5120 descriptor_configuration
= db_nsd
.get("ns-configuration")
5122 if descriptor_configuration
and descriptor_configuration
.get(
5125 for config_primitive
in descriptor_configuration
["config-primitive"]:
5126 if config_primitive
["name"] == primitive
:
5127 config_primitive_desc
= config_primitive
5130 if not config_primitive_desc
:
5131 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5133 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5137 primitive_name
= primitive
5138 ee_descriptor_id
= None
5140 primitive_name
= config_primitive_desc
.get(
5141 "execution-environment-primitive", primitive
5143 ee_descriptor_id
= config_primitive_desc
.get(
5144 "execution-environment-ref"
5150 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5152 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5155 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5157 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5159 desc_params
= parse_yaml_strings(
5160 db_vnfr
.get("additionalParamsForVnf")
5163 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5164 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5165 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5167 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5168 actions
.add(primitive
["name"])
5169 for primitive
in kdu_configuration
.get("config-primitive", []):
5170 actions
.add(primitive
["name"])
5172 nsr_deployed
["K8s"],
5173 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5174 and kdu
["member-vnf-index"] == vnf_index
,
5178 if primitive_name
in actions
5179 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5183 # TODO check if ns is in a proper status
5185 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5187 # kdur and desc_params already set from before
5188 if primitive_params
:
5189 desc_params
.update(primitive_params
)
5190 # TODO Check if we will need something at vnf level
5191 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5193 kdu_name
== kdu
["kdu-name"]
5194 and kdu
["member-vnf-index"] == vnf_index
5199 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5202 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5203 msg
= "unknown k8scluster-type '{}'".format(
5204 kdu
.get("k8scluster-type")
5206 raise LcmException(msg
)
5209 "collection": "nsrs",
5210 "filter": {"_id": nsr_id
},
5211 "path": "_admin.deployed.K8s.{}".format(index
),
5215 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5217 step
= "Executing kdu {}".format(primitive_name
)
5218 if primitive_name
== "upgrade":
5219 if desc_params
.get("kdu_model"):
5220 kdu_model
= desc_params
.get("kdu_model")
5221 del desc_params
["kdu_model"]
5223 kdu_model
= kdu
.get("kdu-model")
5224 parts
= kdu_model
.split(sep
=":")
5226 kdu_model
= parts
[0]
5228 detailed_status
= await asyncio
.wait_for(
5229 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5230 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5231 kdu_instance
=kdu
.get("kdu-instance"),
5233 kdu_model
=kdu_model
,
5236 timeout
=timeout_ns_action
,
5238 timeout
=timeout_ns_action
+ 10,
5241 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5243 elif primitive_name
== "rollback":
5244 detailed_status
= await asyncio
.wait_for(
5245 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5246 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5247 kdu_instance
=kdu
.get("kdu-instance"),
5250 timeout
=timeout_ns_action
,
5252 elif primitive_name
== "status":
5253 detailed_status
= await asyncio
.wait_for(
5254 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5255 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5256 kdu_instance
=kdu
.get("kdu-instance"),
5259 timeout
=timeout_ns_action
,
5262 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5263 kdu
["kdu-name"], nsr_id
5265 params
= self
._map
_primitive
_params
(
5266 config_primitive_desc
, primitive_params
, desc_params
5269 detailed_status
= await asyncio
.wait_for(
5270 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5271 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5272 kdu_instance
=kdu_instance
,
5273 primitive_name
=primitive_name
,
5276 timeout
=timeout_ns_action
,
5279 timeout
=timeout_ns_action
,
5283 nslcmop_operation_state
= "COMPLETED"
5285 detailed_status
= ""
5286 nslcmop_operation_state
= "FAILED"
5288 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5289 nsr_deployed
["VCA"],
5290 member_vnf_index
=vnf_index
,
5292 vdu_count_index
=vdu_count_index
,
5293 ee_descriptor_id
=ee_descriptor_id
,
5295 for vca_index
, vca_deployed
in enumerate(
5296 db_nsr
["_admin"]["deployed"]["VCA"]
5298 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5300 "collection": "nsrs",
5301 "filter": {"_id": nsr_id
},
5302 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5306 nslcmop_operation_state
,
5308 ) = await self
._ns
_execute
_primitive
(
5310 primitive
=primitive_name
,
5311 primitive_params
=self
._map
_primitive
_params
(
5312 config_primitive_desc
, primitive_params
, desc_params
5314 timeout
=timeout_ns_action
,
5320 db_nslcmop_update
["detailed-status"] = detailed_status
5321 error_description_nslcmop
= (
5322 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5326 + " task Done with result {} {}".format(
5327 nslcmop_operation_state
, detailed_status
5330 return # database update is called inside finally
5332 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5333 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5335 except asyncio
.CancelledError
:
5337 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5339 exc
= "Operation was cancelled"
5340 except asyncio
.TimeoutError
:
5341 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5343 except Exception as e
:
5344 exc
= traceback
.format_exc()
5345 self
.logger
.critical(
5346 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5355 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5356 nslcmop_operation_state
= "FAILED"
5358 self
._write
_ns
_status
(
5362 ], # TODO check if degraded. For the moment use previous status
5363 current_operation
="IDLE",
5364 current_operation_id
=None,
5365 # error_description=error_description_nsr,
5366 # error_detail=error_detail,
5367 other_update
=db_nsr_update
,
5370 self
._write
_op
_status
(
5373 error_message
=error_description_nslcmop
,
5374 operation_state
=nslcmop_operation_state
,
5375 other_update
=db_nslcmop_update
,
5378 if nslcmop_operation_state
:
5380 await self
.msg
.aiowrite(
5385 "nslcmop_id": nslcmop_id
,
5386 "operationState": nslcmop_operation_state
,
5390 except Exception as e
:
5392 logging_text
+ "kafka_write notification Exception {}".format(e
)
5394 self
.logger
.debug(logging_text
+ "Exit")
5395 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5396 return nslcmop_operation_state
, detailed_status
5398 async def terminate_vdus(
5399 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5401 """This method terminates VDUs
5404 db_vnfr: VNF instance record
5405 member_vnf_index: VNF index to identify the VDUs to be removed
5406 db_nsr: NS instance record
5407 update_db_nslcmops: Nslcmop update record
5409 vca_scaling_info
= []
5410 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5411 scaling_info
["scaling_direction"] = "IN"
5412 scaling_info
["vdu-delete"] = {}
5413 scaling_info
["kdu-delete"] = {}
5414 db_vdur
= db_vnfr
.get("vdur")
5415 vdur_list
= copy(db_vdur
)
5417 for index
, vdu
in enumerate(vdur_list
):
5418 vca_scaling_info
.append(
5420 "osm_vdu_id": vdu
["vdu-id-ref"],
5421 "member-vnf-index": member_vnf_index
,
5423 "vdu_index": count_index
,
5425 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5426 scaling_info
["vdu"].append(
5428 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5429 "vdu_id": vdu
["vdu-id-ref"],
5432 for interface
in vdu
["interfaces"]:
5433 scaling_info
["vdu"][index
]["interface"].append(
5435 "name": interface
["name"],
5436 "ip_address": interface
["ip-address"],
5437 "mac_address": interface
.get("mac-address"),
5439 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5440 stage
[2] = "Terminating VDUs"
5441 if scaling_info
.get("vdu-delete"):
5442 # scale_process = "RO"
5443 if self
.ro_config
.get("ng"):
5444 await self
._scale
_ng
_ro
(
5445 logging_text
, db_nsr
, update_db_nslcmops
, db_vnfr
, scaling_info
, stage
5448 async def remove_vnf(
5449 self
, nsr_id
, nslcmop_id
, vnf_instance_id
5451 """This method is to Remove VNF instances from NS.
5454 nsr_id: NS instance id
5455 nslcmop_id: nslcmop id of update
5456 vnf_instance_id: id of the VNF instance to be removed
5459 result: (str, str) COMPLETED/FAILED, details
5463 logging_text
= "Task ns={} update ".format(nsr_id
)
5464 check_vnfr_count
= len(self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}))
5465 self
.logger
.info("check_vnfr_count {}".format(check_vnfr_count
))
5466 if check_vnfr_count
> 1:
5467 stage
= ["", "", ""]
5468 step
= "Getting nslcmop from database"
5469 self
.logger
.debug(step
+ " after having waited for previous tasks to be completed")
5470 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5471 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5472 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5473 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5474 """ db_vnfr = self.db.get_one(
5475 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5477 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5478 await self
.terminate_vdus(db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
)
5480 constituent_vnfr
= db_nsr
.get("constituent-vnfr-ref")
5481 constituent_vnfr
.remove(db_vnfr
.get("_id"))
5482 db_nsr_update
["constituent-vnfr-ref"] = db_nsr
.get("constituent-vnfr-ref")
5483 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5484 self
.db
.del_one("vnfrs", {"_id": db_vnfr
.get("_id")})
5485 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5486 return "COMPLETED", "Done"
5488 step
= "Terminate VNF Failed with"
5489 raise LcmException("{} Cannot terminate the last VNF in this NS.".format(
5491 except (LcmException
, asyncio
.CancelledError
):
5493 except Exception as e
:
5494 self
.logger
.debug("Error removing VNF {}".format(e
))
5495 return "FAILED", "Error removing VNF {}".format(e
)
5497 async def _ns_redeploy_vnf(
5498 self
, nsr_id
, nslcmop_id
, db_vnfd
, db_vnfr
, db_nsr
,
5500 """This method updates and redeploys VNF instances
5503 nsr_id: NS instance id
5504 nslcmop_id: nslcmop id
5505 db_vnfd: VNF descriptor
5506 db_vnfr: VNF instance record
5507 db_nsr: NS instance record
5510 result: (str, str) COMPLETED/FAILED, details
5514 stage
= ["", "", ""]
5515 logging_text
= "Task ns={} update ".format(nsr_id
)
5516 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5517 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5519 # Terminate old VNF resources
5520 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5521 await self
.terminate_vdus(db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
)
5523 # old_vnfd_id = db_vnfr["vnfd-id"]
5524 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5525 new_db_vnfd
= db_vnfd
5526 # new_vnfd_ref = new_db_vnfd["id"]
5527 # new_vnfd_id = vnfd_id
5531 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5533 "name": cp
.get("id"),
5534 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5535 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5538 new_vnfr_cp
.append(vnf_cp
)
5539 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5540 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5541 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5542 new_vnfr_update
= {"revision": latest_vnfd_revision
, "connection-point": new_vnfr_cp
, "vdur": new_vdur
, "ip-address": ""}
5543 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5544 updated_db_vnfr
= self
.db
.get_one(
5545 "vnfrs", {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
}
5548 # Instantiate new VNF resources
5549 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5550 vca_scaling_info
= []
5551 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5552 scaling_info
["scaling_direction"] = "OUT"
5553 scaling_info
["vdu-create"] = {}
5554 scaling_info
["kdu-create"] = {}
5555 vdud_instantiate_list
= db_vnfd
["vdu"]
5556 for index
, vdud
in enumerate(vdud_instantiate_list
):
5557 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5561 additional_params
= (
5562 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5565 cloud_init_list
= []
5567 # TODO Information of its own ip is not available because db_vnfr is not updated.
5568 additional_params
["OSM"] = get_osm_params(
5569 updated_db_vnfr
, vdud
["id"], 1
5571 cloud_init_list
.append(
5572 self
._parse
_cloud
_init
(
5579 vca_scaling_info
.append(
5581 "osm_vdu_id": vdud
["id"],
5582 "member-vnf-index": member_vnf_index
,
5584 "vdu_index": count_index
,
5587 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5588 if self
.ro_config
.get("ng"):
5590 "New Resources to be deployed: {}".format(scaling_info
))
5591 await self
._scale
_ng
_ro
(
5592 logging_text
, db_nsr
, update_db_nslcmops
, updated_db_vnfr
, scaling_info
, stage
5594 return "COMPLETED", "Done"
5595 except (LcmException
, asyncio
.CancelledError
):
5597 except Exception as e
:
5598 self
.logger
.debug("Error updating VNF {}".format(e
))
5599 return "FAILED", "Error updating VNF {}".format(e
)
5601 async def _ns_charm_upgrade(
5607 timeout
: float = None,
5609 """This method upgrade charms in VNF instances
5612 ee_id: Execution environment id
5613 path: Local path to the charm
5615 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5616 timeout: (Float) Timeout for the ns update operation
5619 result: (str, str) COMPLETED/FAILED, details
5622 charm_type
= charm_type
or "lxc_proxy_charm"
5623 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5627 charm_type
=charm_type
,
5628 timeout
=timeout
or self
.timeout_ns_update
,
5632 return "COMPLETED", output
5634 except (LcmException
, asyncio
.CancelledError
):
5637 except Exception as e
:
5639 self
.logger
.debug("Error upgrading charm {}".format(path
))
5641 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5643 async def update(self
, nsr_id
, nslcmop_id
):
5644 """Update NS according to different update types
5646 This method performs upgrade of VNF instances then updates the revision
5647 number in VNF record
5650 nsr_id: Network service will be updated
5651 nslcmop_id: ns lcm operation id
5654 It may raise DbException, LcmException, N2VCException, K8sException
5657 # Try to lock HA task here
5658 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5659 if not task_is_locked_by_me
:
5662 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5663 self
.logger
.debug(logging_text
+ "Enter")
5665 # Set the required variables to be filled up later
5667 db_nslcmop_update
= {}
5669 nslcmop_operation_state
= None
5671 error_description_nslcmop
= ""
5673 change_type
= "updated"
5674 detailed_status
= ""
5677 # wait for any previous tasks in process
5678 step
= "Waiting for previous operations to terminate"
5679 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5680 self
._write
_ns
_status
(
5683 current_operation
="UPDATING",
5684 current_operation_id
=nslcmop_id
,
5687 step
= "Getting nslcmop from database"
5688 db_nslcmop
= self
.db
.get_one(
5689 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5691 update_type
= db_nslcmop
["operationParams"]["updateType"]
5693 step
= "Getting nsr from database"
5694 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5695 old_operational_status
= db_nsr
["operational-status"]
5696 db_nsr_update
["operational-status"] = "updating"
5697 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5698 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5700 if update_type
== "CHANGE_VNFPKG":
5702 # Get the input parameters given through update request
5703 vnf_instance_id
= db_nslcmop
["operationParams"][
5704 "changeVnfPackageData"
5705 ].get("vnfInstanceId")
5707 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5710 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5712 step
= "Getting vnfr from database"
5713 db_vnfr
= self
.db
.get_one(
5714 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5717 step
= "Getting vnfds from database"
5719 latest_vnfd
= self
.db
.get_one(
5720 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5722 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5725 current_vnf_revision
= db_vnfr
.get("revision", 1)
5726 current_vnfd
= self
.db
.get_one(
5728 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5729 fail_on_empty
=False,
5731 # Charm artifact paths will be filled up later
5733 current_charm_artifact_path
,
5734 target_charm_artifact_path
,
5735 charm_artifact_paths
,
5738 step
= "Checking if revision has changed in VNFD"
5739 if current_vnf_revision
!= latest_vnfd_revision
:
5741 change_type
= "policy_updated"
5743 # There is new revision of VNFD, update operation is required
5744 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5745 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5747 step
= "Removing the VNFD packages if they exist in the local path"
5748 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5749 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5751 step
= "Get the VNFD packages from FSMongo"
5752 self
.fs
.sync(from_path
=latest_vnfd_path
)
5753 self
.fs
.sync(from_path
=current_vnfd_path
)
5756 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5758 base_folder
= latest_vnfd
["_admin"]["storage"]
5760 for charm_index
, charm_deployed
in enumerate(
5761 get_iterable(nsr_deployed
, "VCA")
5763 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5765 # Getting charm-id and charm-type
5766 if charm_deployed
.get("member-vnf-index") == vnf_index
:
5767 charm_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5768 charm_type
= charm_deployed
.get("type")
5771 ee_id
= charm_deployed
.get("ee_id")
5773 step
= "Getting descriptor config"
5774 descriptor_config
= get_configuration(
5775 current_vnfd
, current_vnfd
["id"]
5778 if "execution-environment-list" in descriptor_config
:
5779 ee_list
= descriptor_config
.get(
5780 "execution-environment-list", []
5785 # There could be several charm used in the same VNF
5786 for ee_item
in ee_list
:
5787 if ee_item
.get("juju"):
5789 step
= "Getting charm name"
5790 charm_name
= ee_item
["juju"].get("charm")
5792 step
= "Setting Charm artifact paths"
5793 current_charm_artifact_path
.append(
5794 get_charm_artifact_path(
5798 current_vnf_revision
,
5801 target_charm_artifact_path
.append(
5802 get_charm_artifact_path(
5806 latest_vnfd_revision
,
5810 charm_artifact_paths
= zip(
5811 current_charm_artifact_path
, target_charm_artifact_path
5814 step
= "Checking if software version has changed in VNFD"
5815 if find_software_version(current_vnfd
) != find_software_version(
5819 step
= "Checking if existing VNF has charm"
5820 for current_charm_path
, target_charm_path
in list(
5821 charm_artifact_paths
5823 if current_charm_path
:
5825 "Software version change is not supported as VNF instance {} has charm.".format(
5830 # There is no change in the charm package, then redeploy the VNF
5831 # based on new descriptor
5832 step
= "Redeploying VNF"
5833 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5837 ) = await self
._ns
_redeploy
_vnf
(
5844 if result
== "FAILED":
5845 nslcmop_operation_state
= result
5846 error_description_nslcmop
= detailed_status
5847 db_nslcmop_update
["detailed-status"] = detailed_status
5850 + " step {} Done with result {} {}".format(
5851 step
, nslcmop_operation_state
, detailed_status
5856 step
= "Checking if any charm package has changed or not"
5857 for current_charm_path
, target_charm_path
in list(
5858 charm_artifact_paths
5862 and target_charm_path
5863 and self
.check_charm_hash_changed(
5864 current_charm_path
, target_charm_path
5868 step
= "Checking whether VNF uses juju bundle"
5869 if check_juju_bundle_existence(current_vnfd
):
5872 "Charm upgrade is not supported for the instance which"
5873 " uses juju-bundle: {}".format(
5874 check_juju_bundle_existence(current_vnfd
)
5878 step
= "Upgrading Charm"
5882 ) = await self
._ns
_charm
_upgrade
(
5885 charm_type
=charm_type
,
5886 path
=self
.fs
.path
+ target_charm_path
,
5887 timeout
=timeout_seconds
,
5890 if result
== "FAILED":
5891 nslcmop_operation_state
= result
5892 error_description_nslcmop
= detailed_status
5894 db_nslcmop_update
["detailed-status"] = detailed_status
5897 + " step {} Done with result {} {}".format(
5898 step
, nslcmop_operation_state
, detailed_status
5902 step
= "Updating policies"
5903 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5904 result
= "COMPLETED"
5905 detailed_status
= "Done"
5906 db_nslcmop_update
["detailed-status"] = "Done"
5908 # If nslcmop_operation_state is None, so any operation is not failed.
5909 if not nslcmop_operation_state
:
5910 nslcmop_operation_state
= "COMPLETED"
5912 # If update CHANGE_VNFPKG nslcmop_operation is successful
5913 # vnf revision need to be updated
5914 vnfr_update
["revision"] = latest_vnfd_revision
5915 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
5919 + " task Done with result {} {}".format(
5920 nslcmop_operation_state
, detailed_status
5923 elif update_type
== "REMOVE_VNF":
5924 # This part is included in https://osm.etsi.org/gerrit/11876
5925 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
5926 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5927 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5928 step
= "Removing VNF"
5929 (result
, detailed_status
) = await self
.remove_vnf(nsr_id
, nslcmop_id
, vnf_instance_id
)
5930 if result
== "FAILED":
5931 nslcmop_operation_state
= result
5932 error_description_nslcmop
= detailed_status
5933 db_nslcmop_update
["detailed-status"] = detailed_status
5934 change_type
= "vnf_terminated"
5935 if not nslcmop_operation_state
:
5936 nslcmop_operation_state
= "COMPLETED"
5939 + " task Done with result {} {}".format(
5940 nslcmop_operation_state
, detailed_status
5944 elif update_type
== "OPERATE_VNF":
5945 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"]["vnfInstanceId"]
5946 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"]["changeStateTo"]
5947 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"]["additionalParam"]
5948 (result
, detailed_status
) = await self
.rebuild_start_stop(
5949 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
5951 if result
== "FAILED":
5952 nslcmop_operation_state
= result
5953 error_description_nslcmop
= detailed_status
5954 db_nslcmop_update
["detailed-status"] = detailed_status
5955 if not nslcmop_operation_state
:
5956 nslcmop_operation_state
= "COMPLETED"
5959 + " task Done with result {} {}".format(
5960 nslcmop_operation_state
, detailed_status
5964 # If nslcmop_operation_state is None, so any operation is not failed.
5965 # All operations are executed in overall.
5966 if not nslcmop_operation_state
:
5967 nslcmop_operation_state
= "COMPLETED"
5968 db_nsr_update
["operational-status"] = old_operational_status
5970 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5971 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5973 except asyncio
.CancelledError
:
5975 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5977 exc
= "Operation was cancelled"
5978 except asyncio
.TimeoutError
:
5979 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5981 except Exception as e
:
5982 exc
= traceback
.format_exc()
5983 self
.logger
.critical(
5984 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5993 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5994 nslcmop_operation_state
= "FAILED"
5995 db_nsr_update
["operational-status"] = old_operational_status
5997 self
._write
_ns
_status
(
5999 ns_state
=db_nsr
["nsState"],
6000 current_operation
="IDLE",
6001 current_operation_id
=None,
6002 other_update
=db_nsr_update
,
6005 self
._write
_op
_status
(
6008 error_message
=error_description_nslcmop
,
6009 operation_state
=nslcmop_operation_state
,
6010 other_update
=db_nslcmop_update
,
6013 if nslcmop_operation_state
:
6017 "nslcmop_id": nslcmop_id
,
6018 "operationState": nslcmop_operation_state
,
6020 if change_type
in ("vnf_terminated", "policy_updated"):
6021 msg
.update({"vnf_member_index": member_vnf_index
})
6022 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6023 except Exception as e
:
6025 logging_text
+ "kafka_write notification Exception {}".format(e
)
6027 self
.logger
.debug(logging_text
+ "Exit")
6028 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6029 return nslcmop_operation_state
, detailed_status
6031 async def scale(self
, nsr_id
, nslcmop_id
):
6032 # Try to lock HA task here
6033 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6034 if not task_is_locked_by_me
:
6037 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6038 stage
= ["", "", ""]
6039 tasks_dict_info
= {}
6040 # ^ stage, step, VIM progress
6041 self
.logger
.debug(logging_text
+ "Enter")
6042 # get all needed from database
6044 db_nslcmop_update
= {}
6047 # in case of error, indicates what part of scale was failed to put nsr at error status
6048 scale_process
= None
6049 old_operational_status
= ""
6050 old_config_status
= ""
6053 # wait for any previous tasks in process
6054 step
= "Waiting for previous operations to terminate"
6055 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6056 self
._write
_ns
_status
(
6059 current_operation
="SCALING",
6060 current_operation_id
=nslcmop_id
,
6063 step
= "Getting nslcmop from database"
6065 step
+ " after having waited for previous tasks to be completed"
6067 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6069 step
= "Getting nsr from database"
6070 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6071 old_operational_status
= db_nsr
["operational-status"]
6072 old_config_status
= db_nsr
["config-status"]
6074 step
= "Parsing scaling parameters"
6075 db_nsr_update
["operational-status"] = "scaling"
6076 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6077 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6079 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6081 ]["member-vnf-index"]
6082 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6084 ]["scaling-group-descriptor"]
6085 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6086 # for backward compatibility
6087 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6088 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6089 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6090 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6092 step
= "Getting vnfr from database"
6093 db_vnfr
= self
.db
.get_one(
6094 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6097 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6099 step
= "Getting vnfd from database"
6100 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6102 base_folder
= db_vnfd
["_admin"]["storage"]
6104 step
= "Getting scaling-group-descriptor"
6105 scaling_descriptor
= find_in_list(
6106 get_scaling_aspect(db_vnfd
),
6107 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6109 if not scaling_descriptor
:
6111 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6112 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6115 step
= "Sending scale order to VIM"
6116 # TODO check if ns is in a proper status
6118 if not db_nsr
["_admin"].get("scaling-group"):
6123 "_admin.scaling-group": [
6124 {"name": scaling_group
, "nb-scale-op": 0}
6128 admin_scale_index
= 0
6130 for admin_scale_index
, admin_scale_info
in enumerate(
6131 db_nsr
["_admin"]["scaling-group"]
6133 if admin_scale_info
["name"] == scaling_group
:
6134 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6136 else: # not found, set index one plus last element and add new entry with the name
6137 admin_scale_index
+= 1
6139 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6142 vca_scaling_info
= []
6143 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6144 if scaling_type
== "SCALE_OUT":
6145 if "aspect-delta-details" not in scaling_descriptor
:
6147 "Aspect delta details not fount in scaling descriptor {}".format(
6148 scaling_descriptor
["name"]
6151 # count if max-instance-count is reached
6152 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6154 scaling_info
["scaling_direction"] = "OUT"
6155 scaling_info
["vdu-create"] = {}
6156 scaling_info
["kdu-create"] = {}
6157 for delta
in deltas
:
6158 for vdu_delta
in delta
.get("vdu-delta", {}):
6159 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6160 # vdu_index also provides the number of instance of the targeted vdu
6161 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6162 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6166 additional_params
= (
6167 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6170 cloud_init_list
= []
6172 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6173 max_instance_count
= 10
6174 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6175 max_instance_count
= vdu_profile
.get(
6176 "max-number-of-instances", 10
6179 default_instance_num
= get_number_of_instances(
6182 instances_number
= vdu_delta
.get("number-of-instances", 1)
6183 nb_scale_op
+= instances_number
6185 new_instance_count
= nb_scale_op
+ default_instance_num
6186 # Control if new count is over max and vdu count is less than max.
6187 # Then assign new instance count
6188 if new_instance_count
> max_instance_count
> vdu_count
:
6189 instances_number
= new_instance_count
- max_instance_count
6191 instances_number
= instances_number
6193 if new_instance_count
> max_instance_count
:
6195 "reached the limit of {} (max-instance-count) "
6196 "scaling-out operations for the "
6197 "scaling-group-descriptor '{}'".format(
6198 nb_scale_op
, scaling_group
6201 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6203 # TODO Information of its own ip is not available because db_vnfr is not updated.
6204 additional_params
["OSM"] = get_osm_params(
6205 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6207 cloud_init_list
.append(
6208 self
._parse
_cloud
_init
(
6215 vca_scaling_info
.append(
6217 "osm_vdu_id": vdu_delta
["id"],
6218 "member-vnf-index": vnf_index
,
6220 "vdu_index": vdu_index
+ x
,
6223 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6224 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6225 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6226 kdu_name
= kdu_profile
["kdu-name"]
6227 resource_name
= kdu_profile
.get("resource-name", "")
6229 # Might have different kdus in the same delta
6230 # Should have list for each kdu
6231 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6232 scaling_info
["kdu-create"][kdu_name
] = []
6234 kdur
= get_kdur(db_vnfr
, kdu_name
)
6235 if kdur
.get("helm-chart"):
6236 k8s_cluster_type
= "helm-chart-v3"
6237 self
.logger
.debug("kdur: {}".format(kdur
))
6239 kdur
.get("helm-version")
6240 and kdur
.get("helm-version") == "v2"
6242 k8s_cluster_type
= "helm-chart"
6243 elif kdur
.get("juju-bundle"):
6244 k8s_cluster_type
= "juju-bundle"
6247 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6248 "juju-bundle. Maybe an old NBI version is running".format(
6249 db_vnfr
["member-vnf-index-ref"], kdu_name
6253 max_instance_count
= 10
6254 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6255 max_instance_count
= kdu_profile
.get(
6256 "max-number-of-instances", 10
6259 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6260 deployed_kdu
, _
= get_deployed_kdu(
6261 nsr_deployed
, kdu_name
, vnf_index
6263 if deployed_kdu
is None:
6265 "KDU '{}' for vnf '{}' not deployed".format(
6269 kdu_instance
= deployed_kdu
.get("kdu-instance")
6270 instance_num
= await self
.k8scluster_map
[
6276 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6277 kdu_model
=deployed_kdu
.get("kdu-model"),
6279 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6280 "number-of-instances", 1
6283 # Control if new count is over max and instance_num is less than max.
6284 # Then assign max instance number to kdu replica count
6285 if kdu_replica_count
> max_instance_count
> instance_num
:
6286 kdu_replica_count
= max_instance_count
6287 if kdu_replica_count
> max_instance_count
:
6289 "reached the limit of {} (max-instance-count) "
6290 "scaling-out operations for the "
6291 "scaling-group-descriptor '{}'".format(
6292 instance_num
, scaling_group
6296 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6297 vca_scaling_info
.append(
6299 "osm_kdu_id": kdu_name
,
6300 "member-vnf-index": vnf_index
,
6302 "kdu_index": instance_num
+ x
- 1,
6305 scaling_info
["kdu-create"][kdu_name
].append(
6307 "member-vnf-index": vnf_index
,
6309 "k8s-cluster-type": k8s_cluster_type
,
6310 "resource-name": resource_name
,
6311 "scale": kdu_replica_count
,
6314 elif scaling_type
== "SCALE_IN":
6315 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6317 scaling_info
["scaling_direction"] = "IN"
6318 scaling_info
["vdu-delete"] = {}
6319 scaling_info
["kdu-delete"] = {}
6321 for delta
in deltas
:
6322 for vdu_delta
in delta
.get("vdu-delta", {}):
6323 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6324 min_instance_count
= 0
6325 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6326 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6327 min_instance_count
= vdu_profile
["min-number-of-instances"]
6329 default_instance_num
= get_number_of_instances(
6330 db_vnfd
, vdu_delta
["id"]
6332 instance_num
= vdu_delta
.get("number-of-instances", 1)
6333 nb_scale_op
-= instance_num
6335 new_instance_count
= nb_scale_op
+ default_instance_num
6337 if new_instance_count
< min_instance_count
< vdu_count
:
6338 instances_number
= min_instance_count
- new_instance_count
6340 instances_number
= instance_num
6342 if new_instance_count
< min_instance_count
:
6344 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6345 "scaling-group-descriptor '{}'".format(
6346 nb_scale_op
, scaling_group
6349 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6350 vca_scaling_info
.append(
6352 "osm_vdu_id": vdu_delta
["id"],
6353 "member-vnf-index": vnf_index
,
6355 "vdu_index": vdu_index
- 1 - x
,
6358 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6359 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6360 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6361 kdu_name
= kdu_profile
["kdu-name"]
6362 resource_name
= kdu_profile
.get("resource-name", "")
6364 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6365 scaling_info
["kdu-delete"][kdu_name
] = []
6367 kdur
= get_kdur(db_vnfr
, kdu_name
)
6368 if kdur
.get("helm-chart"):
6369 k8s_cluster_type
= "helm-chart-v3"
6370 self
.logger
.debug("kdur: {}".format(kdur
))
6372 kdur
.get("helm-version")
6373 and kdur
.get("helm-version") == "v2"
6375 k8s_cluster_type
= "helm-chart"
6376 elif kdur
.get("juju-bundle"):
6377 k8s_cluster_type
= "juju-bundle"
6380 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6381 "juju-bundle. Maybe an old NBI version is running".format(
6382 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6386 min_instance_count
= 0
6387 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6388 min_instance_count
= kdu_profile
["min-number-of-instances"]
6390 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6391 deployed_kdu
, _
= get_deployed_kdu(
6392 nsr_deployed
, kdu_name
, vnf_index
6394 if deployed_kdu
is None:
6396 "KDU '{}' for vnf '{}' not deployed".format(
6400 kdu_instance
= deployed_kdu
.get("kdu-instance")
6401 instance_num
= await self
.k8scluster_map
[
6407 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6408 kdu_model
=deployed_kdu
.get("kdu-model"),
6410 kdu_replica_count
= instance_num
- kdu_delta
.get(
6411 "number-of-instances", 1
6414 if kdu_replica_count
< min_instance_count
< instance_num
:
6415 kdu_replica_count
= min_instance_count
6416 if kdu_replica_count
< min_instance_count
:
6418 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6419 "scaling-group-descriptor '{}'".format(
6420 instance_num
, scaling_group
6424 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6425 vca_scaling_info
.append(
6427 "osm_kdu_id": kdu_name
,
6428 "member-vnf-index": vnf_index
,
6430 "kdu_index": instance_num
- x
- 1,
6433 scaling_info
["kdu-delete"][kdu_name
].append(
6435 "member-vnf-index": vnf_index
,
6437 "k8s-cluster-type": k8s_cluster_type
,
6438 "resource-name": resource_name
,
6439 "scale": kdu_replica_count
,
6443 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6444 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6445 if scaling_info
["scaling_direction"] == "IN":
6446 for vdur
in reversed(db_vnfr
["vdur"]):
6447 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6448 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6449 scaling_info
["vdu"].append(
6451 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6452 "vdu_id": vdur
["vdu-id-ref"],
6456 for interface
in vdur
["interfaces"]:
6457 scaling_info
["vdu"][-1]["interface"].append(
6459 "name": interface
["name"],
6460 "ip_address": interface
["ip-address"],
6461 "mac_address": interface
.get("mac-address"),
6464 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6467 step
= "Executing pre-scale vnf-config-primitive"
6468 if scaling_descriptor
.get("scaling-config-action"):
6469 for scaling_config_action
in scaling_descriptor
[
6470 "scaling-config-action"
6473 scaling_config_action
.get("trigger") == "pre-scale-in"
6474 and scaling_type
== "SCALE_IN"
6476 scaling_config_action
.get("trigger") == "pre-scale-out"
6477 and scaling_type
== "SCALE_OUT"
6479 vnf_config_primitive
= scaling_config_action
[
6480 "vnf-config-primitive-name-ref"
6482 step
= db_nslcmop_update
[
6484 ] = "executing pre-scale scaling-config-action '{}'".format(
6485 vnf_config_primitive
6488 # look for primitive
6489 for config_primitive
in (
6490 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6491 ).get("config-primitive", ()):
6492 if config_primitive
["name"] == vnf_config_primitive
:
6496 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6497 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6498 "primitive".format(scaling_group
, vnf_config_primitive
)
6501 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6502 if db_vnfr
.get("additionalParamsForVnf"):
6503 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6505 scale_process
= "VCA"
6506 db_nsr_update
["config-status"] = "configuring pre-scaling"
6507 primitive_params
= self
._map
_primitive
_params
(
6508 config_primitive
, {}, vnfr_params
6511 # Pre-scale retry check: Check if this sub-operation has been executed before
6512 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6515 vnf_config_primitive
,
6519 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6520 # Skip sub-operation
6521 result
= "COMPLETED"
6522 result_detail
= "Done"
6525 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6526 vnf_config_primitive
, result
, result_detail
6530 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6531 # New sub-operation: Get index of this sub-operation
6533 len(db_nslcmop
.get("_admin", {}).get("operations"))
6538 + "vnf_config_primitive={} New sub-operation".format(
6539 vnf_config_primitive
6543 # retry: Get registered params for this existing sub-operation
6544 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6547 vnf_index
= op
.get("member_vnf_index")
6548 vnf_config_primitive
= op
.get("primitive")
6549 primitive_params
= op
.get("primitive_params")
6552 + "vnf_config_primitive={} Sub-operation retry".format(
6553 vnf_config_primitive
6556 # Execute the primitive, either with new (first-time) or registered (reintent) args
6557 ee_descriptor_id
= config_primitive
.get(
6558 "execution-environment-ref"
6560 primitive_name
= config_primitive
.get(
6561 "execution-environment-primitive", vnf_config_primitive
6563 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6564 nsr_deployed
["VCA"],
6565 member_vnf_index
=vnf_index
,
6567 vdu_count_index
=None,
6568 ee_descriptor_id
=ee_descriptor_id
,
6570 result
, result_detail
= await self
._ns
_execute
_primitive
(
6579 + "vnf_config_primitive={} Done with result {} {}".format(
6580 vnf_config_primitive
, result
, result_detail
6583 # Update operationState = COMPLETED | FAILED
6584 self
._update
_suboperation
_status
(
6585 db_nslcmop
, op_index
, result
, result_detail
6588 if result
== "FAILED":
6589 raise LcmException(result_detail
)
6590 db_nsr_update
["config-status"] = old_config_status
6591 scale_process
= None
6595 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6598 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6601 # SCALE-IN VCA - BEGIN
6602 if vca_scaling_info
:
6603 step
= db_nslcmop_update
[
6605 ] = "Deleting the execution environments"
6606 scale_process
= "VCA"
6607 for vca_info
in vca_scaling_info
:
6608 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6609 member_vnf_index
= str(vca_info
["member-vnf-index"])
6611 logging_text
+ "vdu info: {}".format(vca_info
)
6613 if vca_info
.get("osm_vdu_id"):
6614 vdu_id
= vca_info
["osm_vdu_id"]
6615 vdu_index
= int(vca_info
["vdu_index"])
6618 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6619 member_vnf_index
, vdu_id
, vdu_index
6621 stage
[2] = step
= "Scaling in VCA"
6622 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6623 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6624 config_update
= db_nsr
["configurationStatus"]
6625 for vca_index
, vca
in enumerate(vca_update
):
6627 (vca
or vca
.get("ee_id"))
6628 and vca
["member-vnf-index"] == member_vnf_index
6629 and vca
["vdu_count_index"] == vdu_index
6631 if vca
.get("vdu_id"):
6632 config_descriptor
= get_configuration(
6633 db_vnfd
, vca
.get("vdu_id")
6635 elif vca
.get("kdu_name"):
6636 config_descriptor
= get_configuration(
6637 db_vnfd
, vca
.get("kdu_name")
6640 config_descriptor
= get_configuration(
6641 db_vnfd
, db_vnfd
["id"]
6643 operation_params
= (
6644 db_nslcmop
.get("operationParams") or {}
6646 exec_terminate_primitives
= not operation_params
.get(
6647 "skip_terminate_primitives"
6648 ) and vca
.get("needed_terminate")
6649 task
= asyncio
.ensure_future(
6658 exec_primitives
=exec_terminate_primitives
,
6662 timeout
=self
.timeout_charm_delete
,
6665 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6668 del vca_update
[vca_index
]
6669 del config_update
[vca_index
]
6670 # wait for pending tasks of terminate primitives
6674 + "Waiting for tasks {}".format(
6675 list(tasks_dict_info
.keys())
6678 error_list
= await self
._wait
_for
_tasks
(
6682 self
.timeout_charm_delete
, self
.timeout_ns_terminate
6687 tasks_dict_info
.clear()
6689 raise LcmException("; ".join(error_list
))
6691 db_vca_and_config_update
= {
6692 "_admin.deployed.VCA": vca_update
,
6693 "configurationStatus": config_update
,
6696 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6698 scale_process
= None
6699 # SCALE-IN VCA - END
6702 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6703 scale_process
= "RO"
6704 if self
.ro_config
.get("ng"):
6705 await self
._scale
_ng
_ro
(
6706 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6708 scaling_info
.pop("vdu-create", None)
6709 scaling_info
.pop("vdu-delete", None)
6711 scale_process
= None
6715 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6716 scale_process
= "KDU"
6717 await self
._scale
_kdu
(
6718 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6720 scaling_info
.pop("kdu-create", None)
6721 scaling_info
.pop("kdu-delete", None)
6723 scale_process
= None
6727 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6729 # SCALE-UP VCA - BEGIN
6730 if vca_scaling_info
:
6731 step
= db_nslcmop_update
[
6733 ] = "Creating new execution environments"
6734 scale_process
= "VCA"
6735 for vca_info
in vca_scaling_info
:
6736 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6737 member_vnf_index
= str(vca_info
["member-vnf-index"])
6739 logging_text
+ "vdu info: {}".format(vca_info
)
6741 vnfd_id
= db_vnfr
["vnfd-ref"]
6742 if vca_info
.get("osm_vdu_id"):
6743 vdu_index
= int(vca_info
["vdu_index"])
6744 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6745 if db_vnfr
.get("additionalParamsForVnf"):
6746 deploy_params
.update(
6748 db_vnfr
["additionalParamsForVnf"].copy()
6751 descriptor_config
= get_configuration(
6752 db_vnfd
, db_vnfd
["id"]
6754 if descriptor_config
:
6759 logging_text
=logging_text
6760 + "member_vnf_index={} ".format(member_vnf_index
),
6763 nslcmop_id
=nslcmop_id
,
6769 member_vnf_index
=member_vnf_index
,
6770 vdu_index
=vdu_index
,
6772 deploy_params
=deploy_params
,
6773 descriptor_config
=descriptor_config
,
6774 base_folder
=base_folder
,
6775 task_instantiation_info
=tasks_dict_info
,
6778 vdu_id
= vca_info
["osm_vdu_id"]
6779 vdur
= find_in_list(
6780 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6782 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6783 if vdur
.get("additionalParams"):
6784 deploy_params_vdu
= parse_yaml_strings(
6785 vdur
["additionalParams"]
6788 deploy_params_vdu
= deploy_params
6789 deploy_params_vdu
["OSM"] = get_osm_params(
6790 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6792 if descriptor_config
:
6797 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6798 member_vnf_index
, vdu_id
, vdu_index
6800 stage
[2] = step
= "Scaling out VCA"
6801 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6803 logging_text
=logging_text
6804 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6805 member_vnf_index
, vdu_id
, vdu_index
6809 nslcmop_id
=nslcmop_id
,
6815 member_vnf_index
=member_vnf_index
,
6816 vdu_index
=vdu_index
,
6818 deploy_params
=deploy_params_vdu
,
6819 descriptor_config
=descriptor_config
,
6820 base_folder
=base_folder
,
6821 task_instantiation_info
=tasks_dict_info
,
6824 # SCALE-UP VCA - END
6825 scale_process
= None
6828 # execute primitive service POST-SCALING
6829 step
= "Executing post-scale vnf-config-primitive"
6830 if scaling_descriptor
.get("scaling-config-action"):
6831 for scaling_config_action
in scaling_descriptor
[
6832 "scaling-config-action"
6835 scaling_config_action
.get("trigger") == "post-scale-in"
6836 and scaling_type
== "SCALE_IN"
6838 scaling_config_action
.get("trigger") == "post-scale-out"
6839 and scaling_type
== "SCALE_OUT"
6841 vnf_config_primitive
= scaling_config_action
[
6842 "vnf-config-primitive-name-ref"
6844 step
= db_nslcmop_update
[
6846 ] = "executing post-scale scaling-config-action '{}'".format(
6847 vnf_config_primitive
6850 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6851 if db_vnfr
.get("additionalParamsForVnf"):
6852 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6854 # look for primitive
6855 for config_primitive
in (
6856 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6857 ).get("config-primitive", ()):
6858 if config_primitive
["name"] == vnf_config_primitive
:
6862 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6863 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6864 "config-primitive".format(
6865 scaling_group
, vnf_config_primitive
6868 scale_process
= "VCA"
6869 db_nsr_update
["config-status"] = "configuring post-scaling"
6870 primitive_params
= self
._map
_primitive
_params
(
6871 config_primitive
, {}, vnfr_params
6874 # Post-scale retry check: Check if this sub-operation has been executed before
6875 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6878 vnf_config_primitive
,
6882 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6883 # Skip sub-operation
6884 result
= "COMPLETED"
6885 result_detail
= "Done"
6888 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6889 vnf_config_primitive
, result
, result_detail
6893 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6894 # New sub-operation: Get index of this sub-operation
6896 len(db_nslcmop
.get("_admin", {}).get("operations"))
6901 + "vnf_config_primitive={} New sub-operation".format(
6902 vnf_config_primitive
6906 # retry: Get registered params for this existing sub-operation
6907 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6910 vnf_index
= op
.get("member_vnf_index")
6911 vnf_config_primitive
= op
.get("primitive")
6912 primitive_params
= op
.get("primitive_params")
6915 + "vnf_config_primitive={} Sub-operation retry".format(
6916 vnf_config_primitive
6919 # Execute the primitive, either with new (first-time) or registered (reintent) args
6920 ee_descriptor_id
= config_primitive
.get(
6921 "execution-environment-ref"
6923 primitive_name
= config_primitive
.get(
6924 "execution-environment-primitive", vnf_config_primitive
6926 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6927 nsr_deployed
["VCA"],
6928 member_vnf_index
=vnf_index
,
6930 vdu_count_index
=None,
6931 ee_descriptor_id
=ee_descriptor_id
,
6933 result
, result_detail
= await self
._ns
_execute
_primitive
(
6942 + "vnf_config_primitive={} Done with result {} {}".format(
6943 vnf_config_primitive
, result
, result_detail
6946 # Update operationState = COMPLETED | FAILED
6947 self
._update
_suboperation
_status
(
6948 db_nslcmop
, op_index
, result
, result_detail
6951 if result
== "FAILED":
6952 raise LcmException(result_detail
)
6953 db_nsr_update
["config-status"] = old_config_status
6954 scale_process
= None
6959 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6960 db_nsr_update
["operational-status"] = (
6962 if old_operational_status
== "failed"
6963 else old_operational_status
6965 db_nsr_update
["config-status"] = old_config_status
6968 ROclient
.ROClientException
,
6973 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6975 except asyncio
.CancelledError
:
6977 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6979 exc
= "Operation was cancelled"
6980 except Exception as e
:
6981 exc
= traceback
.format_exc()
6982 self
.logger
.critical(
6983 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6987 self
._write
_ns
_status
(
6990 current_operation
="IDLE",
6991 current_operation_id
=None,
6994 stage
[1] = "Waiting for instantiate pending tasks."
6995 self
.logger
.debug(logging_text
+ stage
[1])
6996 exc
= await self
._wait
_for
_tasks
(
6999 self
.timeout_ns_deploy
,
7007 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7008 nslcmop_operation_state
= "FAILED"
7010 db_nsr_update
["operational-status"] = old_operational_status
7011 db_nsr_update
["config-status"] = old_config_status
7012 db_nsr_update
["detailed-status"] = ""
7014 if "VCA" in scale_process
:
7015 db_nsr_update
["config-status"] = "failed"
7016 if "RO" in scale_process
:
7017 db_nsr_update
["operational-status"] = "failed"
7020 ] = "FAILED scaling nslcmop={} {}: {}".format(
7021 nslcmop_id
, step
, exc
7024 error_description_nslcmop
= None
7025 nslcmop_operation_state
= "COMPLETED"
7026 db_nslcmop_update
["detailed-status"] = "Done"
7028 self
._write
_op
_status
(
7031 error_message
=error_description_nslcmop
,
7032 operation_state
=nslcmop_operation_state
,
7033 other_update
=db_nslcmop_update
,
7036 self
._write
_ns
_status
(
7039 current_operation
="IDLE",
7040 current_operation_id
=None,
7041 other_update
=db_nsr_update
,
7044 if nslcmop_operation_state
:
7048 "nslcmop_id": nslcmop_id
,
7049 "operationState": nslcmop_operation_state
,
7051 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7052 except Exception as e
:
7054 logging_text
+ "kafka_write notification Exception {}".format(e
)
7056 self
.logger
.debug(logging_text
+ "Exit")
7057 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7059 async def _scale_kdu(
7060 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7062 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7063 for kdu_name
in _scaling_info
:
7064 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7065 deployed_kdu
, index
= get_deployed_kdu(
7066 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7068 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7069 kdu_instance
= deployed_kdu
["kdu-instance"]
7070 kdu_model
= deployed_kdu
.get("kdu-model")
7071 scale
= int(kdu_scaling_info
["scale"])
7072 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7075 "collection": "nsrs",
7076 "filter": {"_id": nsr_id
},
7077 "path": "_admin.deployed.K8s.{}".format(index
),
7080 step
= "scaling application {}".format(
7081 kdu_scaling_info
["resource-name"]
7083 self
.logger
.debug(logging_text
+ step
)
7085 if kdu_scaling_info
["type"] == "delete":
7086 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7089 and kdu_config
.get("terminate-config-primitive")
7090 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7092 terminate_config_primitive_list
= kdu_config
.get(
7093 "terminate-config-primitive"
7095 terminate_config_primitive_list
.sort(
7096 key
=lambda val
: int(val
["seq"])
7100 terminate_config_primitive
7101 ) in terminate_config_primitive_list
:
7102 primitive_params_
= self
._map
_primitive
_params
(
7103 terminate_config_primitive
, {}, {}
7105 step
= "execute terminate config primitive"
7106 self
.logger
.debug(logging_text
+ step
)
7107 await asyncio
.wait_for(
7108 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7109 cluster_uuid
=cluster_uuid
,
7110 kdu_instance
=kdu_instance
,
7111 primitive_name
=terminate_config_primitive
["name"],
7112 params
=primitive_params_
,
7119 await asyncio
.wait_for(
7120 self
.k8scluster_map
[k8s_cluster_type
].scale(
7123 kdu_scaling_info
["resource-name"],
7125 cluster_uuid
=cluster_uuid
,
7126 kdu_model
=kdu_model
,
7130 timeout
=self
.timeout_vca_on_error
,
7133 if kdu_scaling_info
["type"] == "create":
7134 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7137 and kdu_config
.get("initial-config-primitive")
7138 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7140 initial_config_primitive_list
= kdu_config
.get(
7141 "initial-config-primitive"
7143 initial_config_primitive_list
.sort(
7144 key
=lambda val
: int(val
["seq"])
7147 for initial_config_primitive
in initial_config_primitive_list
:
7148 primitive_params_
= self
._map
_primitive
_params
(
7149 initial_config_primitive
, {}, {}
7151 step
= "execute initial config primitive"
7152 self
.logger
.debug(logging_text
+ step
)
7153 await asyncio
.wait_for(
7154 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7155 cluster_uuid
=cluster_uuid
,
7156 kdu_instance
=kdu_instance
,
7157 primitive_name
=initial_config_primitive
["name"],
7158 params
=primitive_params_
,
7165 async def _scale_ng_ro(
7166 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7168 nsr_id
= db_nslcmop
["nsInstanceId"]
7169 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7172 # read from db: vnfd's for every vnf
7175 # for each vnf in ns, read vnfd
7176 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7177 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7178 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7179 # if we haven't this vnfd, read it from db
7180 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7182 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7183 db_vnfds
.append(vnfd
)
7184 n2vc_key
= self
.n2vc
.get_public_key()
7185 n2vc_key_list
= [n2vc_key
]
7188 vdu_scaling_info
.get("vdu-create"),
7189 vdu_scaling_info
.get("vdu-delete"),
7192 # db_vnfr has been updated, update db_vnfrs to use it
7193 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7194 await self
._instantiate
_ng
_ro
(
7204 start_deploy
=time(),
7205 timeout_ns_deploy
=self
.timeout_ns_deploy
,
7207 if vdu_scaling_info
.get("vdu-delete"):
7209 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7212 async def extract_prometheus_scrape_jobs(
7213 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
7215 # look if exist a file called 'prometheus*.j2' and
7216 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7220 for f
in artifact_content
7221 if f
.startswith("prometheus") and f
.endswith(".j2")
7227 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7231 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7232 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7234 vnfr_id
= vnfr_id
.replace("-", "")
7236 "JOB_NAME": vnfr_id
,
7237 "TARGET_IP": target_ip
,
7238 "EXPORTER_POD_IP": host_name
,
7239 "EXPORTER_POD_PORT": host_port
,
7241 job_list
= parse_job(job_data
, variables
)
7242 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7243 for job
in job_list
:
7245 not isinstance(job
.get("job_name"), str)
7246 or vnfr_id
not in job
["job_name"]
7248 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
7249 job
["nsr_id"] = nsr_id
7250 job
["vnfr_id"] = vnfr_id
7253 async def rebuild_start_stop(self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
):
7254 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7255 self
.logger
.info(logging_text
+ "Enter")
7256 stage
= ["Preparing the environment", ""]
7257 # database nsrs record
7261 # in case of error, indicates what part of scale was failed to put nsr at error status
7262 start_deploy
= time()
7264 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7265 vim_account_id
= db_vnfr
.get("vim-account-id")
7266 vim_info_key
= "vim:" + vim_account_id
7267 vdur
= find_in_list(
7268 db_vnfr
["vdur"], lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7271 vdu_vim_name
= vdur
["name"]
7272 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7273 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7274 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7275 # wait for any previous tasks in process
7276 stage
[1] = "Waiting for previous operations to terminate"
7277 self
.logger
.info(stage
[1])
7278 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
7280 stage
[1] = "Reading from database."
7281 self
.logger
.info(stage
[1])
7282 self
._write
_ns
_status
(
7285 current_operation
=operation_type
.upper(),
7286 current_operation_id
=nslcmop_id
7288 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7291 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7292 db_nsr_update
["operational-status"] = operation_type
7293 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7297 "vim_vm_id": vim_vm_id
,
7299 "vdu_index": additional_param
["count-index"],
7300 "vdu_id": vdur
["id"],
7301 "target_vim": target_vim
,
7302 "vim_account_id": vim_account_id
7305 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7306 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7307 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7308 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7309 self
.logger
.info("response from RO: {}".format(result_dict
))
7310 action_id
= result_dict
["action_id"]
7311 await self
._wait
_ng
_ro
(
7312 nsr_id
, action_id
, nslcmop_id
, start_deploy
, self
.timeout_operate
7314 return "COMPLETED", "Done"
7315 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7316 self
.logger
.error("Exit Exception {}".format(e
))
7318 except asyncio
.CancelledError
:
7319 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7320 exc
= "Operation was cancelled"
7321 except Exception as e
:
7322 exc
= traceback
.format_exc()
7323 self
.logger
.critical("Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
7324 return "FAILED", "Error in operate VNF {}".format(exc
)
7326 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7328 Get VCA Cloud and VCA Cloud Credentials for the VIM account
7330 :param: vim_account_id: VIM Account ID
7332 :return: (cloud_name, cloud_credential)
7334 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7335 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
7337 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7339 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
7341 :param: vim_account_id: VIM Account ID
7343 :return: (cloud_name, cloud_credential)
7345 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7346 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")
7348 async def migrate(self
, nsr_id
, nslcmop_id
):
7350 Migrate VNFs and VDUs instances in a NS
7352 :param: nsr_id: NS Instance ID
7353 :param: nslcmop_id: nslcmop ID of migrate
7356 # Try to lock HA task here
7357 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7358 if not task_is_locked_by_me
:
7360 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7361 self
.logger
.debug(logging_text
+ "Enter")
7362 # get all needed from database
7364 db_nslcmop_update
= {}
7365 nslcmop_operation_state
= None
7369 # in case of error, indicates what part of scale was failed to put nsr at error status
7370 start_deploy
= time()
7373 # wait for any previous tasks in process
7374 step
= "Waiting for previous operations to terminate"
7375 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7377 self
._write
_ns
_status
(
7380 current_operation
="MIGRATING",
7381 current_operation_id
=nslcmop_id
,
7383 step
= "Getting nslcmop from database"
7385 step
+ " after having waited for previous tasks to be completed"
7387 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7388 migrate_params
= db_nslcmop
.get("operationParams")
7391 target
.update(migrate_params
)
7392 desc
= await self
.RO
.migrate(nsr_id
, target
)
7393 self
.logger
.debug("RO return > {}".format(desc
))
7394 action_id
= desc
["action_id"]
7395 await self
._wait
_ng
_ro
(
7396 nsr_id
, action_id
, nslcmop_id
, start_deploy
, self
.timeout_migrate
,
7399 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7400 self
.logger
.error("Exit Exception {}".format(e
))
7402 except asyncio
.CancelledError
:
7403 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7404 exc
= "Operation was cancelled"
7405 except Exception as e
:
7406 exc
= traceback
.format_exc()
7407 self
.logger
.critical(
7408 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7411 self
._write
_ns
_status
(
7414 current_operation
="IDLE",
7415 current_operation_id
=None,
7418 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7419 nslcmop_operation_state
= "FAILED"
7421 nslcmop_operation_state
= "COMPLETED"
7422 db_nslcmop_update
["detailed-status"] = "Done"
7423 db_nsr_update
["detailed-status"] = "Done"
7425 self
._write
_op
_status
(
7429 operation_state
=nslcmop_operation_state
,
7430 other_update
=db_nslcmop_update
,
7432 if nslcmop_operation_state
:
7436 "nslcmop_id": nslcmop_id
,
7437 "operationState": nslcmop_operation_state
,
7439 await self
.msg
.aiowrite("ns", "migrated", msg
, loop
=self
.loop
)
7440 except Exception as e
:
7442 logging_text
+ "kafka_write notification Exception {}".format(e
)
7444 self
.logger
.debug(logging_text
+ "Exit")
7445 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
7448 async def heal(self
, nsr_id
, nslcmop_id
):
7452 :param nsr_id: ns instance to heal
7453 :param nslcmop_id: operation to run
7457 # Try to lock HA task here
7458 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7459 if not task_is_locked_by_me
:
7462 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7463 stage
= ["", "", ""]
7464 tasks_dict_info
= {}
7465 # ^ stage, step, VIM progress
7466 self
.logger
.debug(logging_text
+ "Enter")
7467 # get all needed from database
7469 db_nslcmop_update
= {}
7471 db_vnfrs
= {} # vnf's info indexed by _id
7473 old_operational_status
= ""
7474 old_config_status
= ""
7477 # wait for any previous tasks in process
7478 step
= "Waiting for previous operations to terminate"
7479 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7480 self
._write
_ns
_status
(
7483 current_operation
="HEALING",
7484 current_operation_id
=nslcmop_id
,
7487 step
= "Getting nslcmop from database"
7489 step
+ " after having waited for previous tasks to be completed"
7491 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7493 step
= "Getting nsr from database"
7494 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7495 old_operational_status
= db_nsr
["operational-status"]
7496 old_config_status
= db_nsr
["config-status"]
7499 "_admin.deployed.RO.operational-status": "healing",
7501 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7503 step
= "Sending heal order to VIM"
7504 task_ro
= asyncio
.ensure_future(
7506 logging_text
=logging_text
,
7508 db_nslcmop
=db_nslcmop
,
7512 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "heal_RO", task_ro
)
7513 tasks_dict_info
[task_ro
] = "Healing at VIM"
7517 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7518 self
.logger
.debug(logging_text
+ stage
[1])
7519 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7520 self
.fs
.sync(db_nsr
["nsd-id"])
7522 # read from db: vnfr's of this ns
7523 step
= "Getting vnfrs from db"
7524 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7525 for vnfr
in db_vnfrs_list
:
7526 db_vnfrs
[vnfr
["_id"]] = vnfr
7527 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7529 # Check for each target VNF
7530 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7531 for target_vnf
in target_list
:
7532 # Find this VNF in the list from DB
7533 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7535 db_vnfr
= db_vnfrs
[vnfr_id
]
7536 vnfd_id
= db_vnfr
.get("vnfd-id")
7537 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7538 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7539 base_folder
= vnfd
["_admin"]["storage"]
7544 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7545 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7547 # Check each target VDU and deploy N2VC
7548 for target_vdu
in target_vnf
["additionalParams"].get("vdu", None):
7549 deploy_params_vdu
= target_vdu
7550 # Set run-day1 vnf level value if not vdu level value exists
7551 if not deploy_params_vdu
.get("run-day1") and target_vnf
["additionalParams"].get("run-day1"):
7552 deploy_params_vdu
["run-day1"] = target_vnf
["additionalParams"].get("run-day1")
7553 vdu_name
= target_vdu
.get("vdu-id", None)
7554 # TODO: Get vdu_id from vdud.
7556 # For multi instance VDU count-index is mandatory
7557 # For single session VDU count-indes is 0
7558 vdu_index
= target_vdu
.get("count-index",0)
7560 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7561 stage
[1] = "Deploying Execution Environments."
7562 self
.logger
.debug(logging_text
+ stage
[1])
7564 # VNF Level charm. Normal case when proxy charms.
7565 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7566 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7567 if descriptor_config
:
7568 # Continue if healed machine is management machine
7569 vnf_ip_address
= db_vnfr
.get("ip-address")
7570 target_instance
= None
7571 for instance
in db_vnfr
.get("vdur", None):
7572 if ( instance
["vdu-name"] == vdu_name
and instance
["count-index"] == vdu_index
):
7573 target_instance
= instance
7575 if vnf_ip_address
== target_instance
.get("ip-address"):
7577 logging_text
=logging_text
7578 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7579 member_vnf_index
, vdu_name
, vdu_index
7583 nslcmop_id
=nslcmop_id
,
7589 member_vnf_index
=member_vnf_index
,
7592 deploy_params
=deploy_params_vdu
,
7593 descriptor_config
=descriptor_config
,
7594 base_folder
=base_folder
,
7595 task_instantiation_info
=tasks_dict_info
,
7599 # VDU Level charm. Normal case with native charms.
7600 descriptor_config
= get_configuration(vnfd
, vdu_name
)
7601 if descriptor_config
:
7603 logging_text
=logging_text
7604 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7605 member_vnf_index
, vdu_name
, vdu_index
7609 nslcmop_id
=nslcmop_id
,
7615 member_vnf_index
=member_vnf_index
,
7616 vdu_index
=vdu_index
,
7618 deploy_params
=deploy_params_vdu
,
7619 descriptor_config
=descriptor_config
,
7620 base_folder
=base_folder
,
7621 task_instantiation_info
=tasks_dict_info
,
7626 ROclient
.ROClientException
,
7631 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7633 except asyncio
.CancelledError
:
7635 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7637 exc
= "Operation was cancelled"
7638 except Exception as e
:
7639 exc
= traceback
.format_exc()
7640 self
.logger
.critical(
7641 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7646 stage
[1] = "Waiting for healing pending tasks."
7647 self
.logger
.debug(logging_text
+ stage
[1])
7648 exc
= await self
._wait
_for
_tasks
(
7651 self
.timeout_ns_deploy
,
7659 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7660 nslcmop_operation_state
= "FAILED"
7662 db_nsr_update
["operational-status"] = old_operational_status
7663 db_nsr_update
["config-status"] = old_config_status
7666 ] = "FAILED healing nslcmop={} {}: {}".format(
7667 nslcmop_id
, step
, exc
7669 for task
, task_name
in tasks_dict_info
.items():
7670 if not task
.done() or task
.cancelled() or task
.exception():
7671 if task_name
.startswith(self
.task_name_deploy_vca
):
7672 # A N2VC task is pending
7673 db_nsr_update
["config-status"] = "failed"
7675 # RO task is pending
7676 db_nsr_update
["operational-status"] = "failed"
7678 error_description_nslcmop
= None
7679 nslcmop_operation_state
= "COMPLETED"
7680 db_nslcmop_update
["detailed-status"] = "Done"
7681 db_nsr_update
["detailed-status"] = "Done"
7682 db_nsr_update
["operational-status"] = "running"
7683 db_nsr_update
["config-status"] = "configured"
7685 self
._write
_op
_status
(
7688 error_message
=error_description_nslcmop
,
7689 operation_state
=nslcmop_operation_state
,
7690 other_update
=db_nslcmop_update
,
7693 self
._write
_ns
_status
(
7696 current_operation
="IDLE",
7697 current_operation_id
=None,
7698 other_update
=db_nsr_update
,
7701 if nslcmop_operation_state
:
7705 "nslcmop_id": nslcmop_id
,
7706 "operationState": nslcmop_operation_state
,
7708 await self
.msg
.aiowrite("ns", "healed", msg
, loop
=self
.loop
)
7709 except Exception as e
:
7711 logging_text
+ "kafka_write notification Exception {}".format(e
)
7713 self
.logger
.debug(logging_text
+ "Exit")
7714 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
7725 :param logging_text: preffix text to use at logging
7726 :param nsr_id: nsr identity
7727 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
7728 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
7729 :return: None or exception
7731 def get_vim_account(vim_account_id
):
7733 if vim_account_id
in db_vims
:
7734 return db_vims
[vim_account_id
]
7735 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
7736 db_vims
[vim_account_id
] = db_vim
7741 ns_params
= db_nslcmop
.get("operationParams")
7742 if ns_params
and ns_params
.get("timeout_ns_heal"):
7743 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
7745 timeout_ns_heal
= self
.timeout
.get(
7746 "ns_heal", self
.timeout_ns_heal
7751 nslcmop_id
= db_nslcmop
["_id"]
7753 "action_id": nslcmop_id
,
7755 self
.logger
.warning("db_nslcmop={} and timeout_ns_heal={}".format(db_nslcmop
,timeout_ns_heal
))
7756 target
.update(db_nslcmop
.get("operationParams", {}))
7758 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
7759 desc
= await self
.RO
.recreate(nsr_id
, target
)
7760 self
.logger
.debug("RO return > {}".format(desc
))
7761 action_id
= desc
["action_id"]
7762 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
7763 await self
._wait
_ng
_ro
(
7764 nsr_id
, action_id
, nslcmop_id
, start_heal
, timeout_ns_heal
, stage
,
7770 "_admin.deployed.RO.operational-status": "running",
7771 "detailed-status": " ".join(stage
),
7773 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7774 self
._write
_op
_status
(nslcmop_id
, stage
)
7776 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
7779 except Exception as e
:
7780 stage
[2] = "ERROR healing at VIM"
7781 #self.set_vnfr_at_error(db_vnfrs, str(e))
7783 "Error healing at VIM {}".format(e
),
7784 exc_info
=not isinstance(
7787 ROclient
.ROClientException
,
7813 task_instantiation_info
,
7816 # launch instantiate_N2VC in a asyncio task and register task object
7817 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
7818 # if not found, create one entry and update database
7819 # fill db_nsr._admin.deployed.VCA.<index>
7822 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
7824 if "execution-environment-list" in descriptor_config
:
7825 ee_list
= descriptor_config
.get("execution-environment-list", [])
7826 elif "juju" in descriptor_config
:
7827 ee_list
= [descriptor_config
] # ns charms
7828 else: # other types as script are not supported
7831 for ee_item
in ee_list
:
7834 + "_deploy_n2vc ee_item juju={}, helm={}".format(
7835 ee_item
.get("juju"), ee_item
.get("helm-chart")
7838 ee_descriptor_id
= ee_item
.get("id")
7839 if ee_item
.get("juju"):
7840 vca_name
= ee_item
["juju"].get("charm")
7843 if ee_item
["juju"].get("charm") is not None
7846 if ee_item
["juju"].get("cloud") == "k8s":
7847 vca_type
= "k8s_proxy_charm"
7848 elif ee_item
["juju"].get("proxy") is False:
7849 vca_type
= "native_charm"
7850 elif ee_item
.get("helm-chart"):
7851 vca_name
= ee_item
["helm-chart"]
7852 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
7855 vca_type
= "helm-v3"
7858 logging_text
+ "skipping non juju neither charm configuration"
7863 for vca_index
, vca_deployed
in enumerate(
7864 db_nsr
["_admin"]["deployed"]["VCA"]
7866 if not vca_deployed
:
7869 vca_deployed
.get("member-vnf-index") == member_vnf_index
7870 and vca_deployed
.get("vdu_id") == vdu_id
7871 and vca_deployed
.get("kdu_name") == kdu_name
7872 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
7873 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
7877 # not found, create one.
7879 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
7882 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
7884 target
+= "/kdu/{}".format(kdu_name
)
7886 "target_element": target
,
7887 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
7888 "member-vnf-index": member_vnf_index
,
7890 "kdu_name": kdu_name
,
7891 "vdu_count_index": vdu_index
,
7892 "operational-status": "init", # TODO revise
7893 "detailed-status": "", # TODO revise
7894 "step": "initial-deploy", # TODO revise
7896 "vdu_name": vdu_name
,
7898 "ee_descriptor_id": ee_descriptor_id
,
7902 # create VCA and configurationStatus in db
7904 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
7905 "configurationStatus.{}".format(vca_index
): dict(),
7907 self
.update_db_2("nsrs", nsr_id
, db_dict
)
7909 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
7911 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
7912 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
7913 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
7916 task_n2vc
= asyncio
.ensure_future(
7918 logging_text
=logging_text
,
7919 vca_index
=vca_index
,
7925 vdu_index
=vdu_index
,
7926 deploy_params
=deploy_params
,
7927 config_descriptor
=descriptor_config
,
7928 base_folder
=base_folder
,
7929 nslcmop_id
=nslcmop_id
,
7933 ee_config_descriptor
=ee_item
,
7936 self
.lcm_tasks
.register(
7940 "instantiate_N2VC-{}".format(vca_index
),
7943 task_instantiation_info
[
7945 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
7946 member_vnf_index
or "", vdu_id
or ""
7949 async def heal_N2VC(
7966 ee_config_descriptor
,
7968 nsr_id
= db_nsr
["_id"]
7969 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
7970 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
7971 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
7972 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
7974 "collection": "nsrs",
7975 "filter": {"_id": nsr_id
},
7976 "path": db_update_entry
,
7982 element_under_configuration
= nsr_id
7986 vnfr_id
= db_vnfr
["_id"]
7987 osm_config
["osm"]["vnf_id"] = vnfr_id
7989 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
7991 if vca_type
== "native_charm":
7994 index_number
= vdu_index
or 0
7997 element_type
= "VNF"
7998 element_under_configuration
= vnfr_id
7999 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8001 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8002 element_type
= "VDU"
8003 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8004 osm_config
["osm"]["vdu_id"] = vdu_id
8006 namespace
+= ".{}".format(kdu_name
)
8007 element_type
= "KDU"
8008 element_under_configuration
= kdu_name
8009 osm_config
["osm"]["kdu_name"] = kdu_name
8012 if base_folder
["pkg-dir"]:
8013 artifact_path
= "{}/{}/{}/{}".format(
8014 base_folder
["folder"],
8015 base_folder
["pkg-dir"],
8018 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8023 artifact_path
= "{}/Scripts/{}/{}/".format(
8024 base_folder
["folder"],
8027 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8032 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8034 # get initial_config_primitive_list that applies to this element
8035 initial_config_primitive_list
= config_descriptor
.get(
8036 "initial-config-primitive"
8040 "Initial config primitive list > {}".format(
8041 initial_config_primitive_list
8045 # add config if not present for NS charm
8046 ee_descriptor_id
= ee_config_descriptor
.get("id")
8047 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8048 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8049 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8053 "Initial config primitive list #2 > {}".format(
8054 initial_config_primitive_list
8057 # n2vc_redesign STEP 3.1
8058 # find old ee_id if exists
8059 ee_id
= vca_deployed
.get("ee_id")
8061 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8062 # create or register execution environment in VCA. Only for native charms when healing
8063 if vca_type
== "native_charm":
8064 step
= "Waiting to VM being up and getting IP address"
8065 self
.logger
.debug(logging_text
+ step
)
8066 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8075 credentials
= {"hostname": rw_mgmt_ip
}
8077 username
= deep_get(
8078 config_descriptor
, ("config-access", "ssh-access", "default-user")
8080 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8081 # merged. Meanwhile let's get username from initial-config-primitive
8082 if not username
and initial_config_primitive_list
:
8083 for config_primitive
in initial_config_primitive_list
:
8084 for param
in config_primitive
.get("parameter", ()):
8085 if param
["name"] == "ssh-username":
8086 username
= param
["value"]
8090 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8091 "'config-access.ssh-access.default-user'"
8093 credentials
["username"] = username
8095 # n2vc_redesign STEP 3.2
8096 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8097 self
._write
_configuration
_status
(
8099 vca_index
=vca_index
,
8100 status
="REGISTERING",
8101 element_under_configuration
=element_under_configuration
,
8102 element_type
=element_type
,
8105 step
= "register execution environment {}".format(credentials
)
8106 self
.logger
.debug(logging_text
+ step
)
8107 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8108 credentials
=credentials
,
8109 namespace
=namespace
,
8114 # update ee_id en db
8116 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8118 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8120 # for compatibility with MON/POL modules, the need model and application name at database
8121 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8122 # Not sure if this need to be done when healing
8124 ee_id_parts = ee_id.split(".")
8125 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8126 if len(ee_id_parts) >= 2:
8127 model_name = ee_id_parts[0]
8128 application_name = ee_id_parts[1]
8129 db_nsr_update[db_update_entry + "model"] = model_name
8130 db_nsr_update[db_update_entry + "application"] = application_name
8133 # n2vc_redesign STEP 3.3
8134 # Install configuration software. Only for native charms.
8135 step
= "Install configuration Software"
8137 self
._write
_configuration
_status
(
8139 vca_index
=vca_index
,
8140 status
="INSTALLING SW",
8141 element_under_configuration
=element_under_configuration
,
8142 element_type
=element_type
,
8143 #other_update=db_nsr_update,
8147 # TODO check if already done
8148 self
.logger
.debug(logging_text
+ step
)
8150 if vca_type
== "native_charm":
8151 config_primitive
= next(
8152 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8155 if config_primitive
:
8156 config
= self
._map
_primitive
_params
(
8157 config_primitive
, {}, deploy_params
8159 await self
.vca_map
[vca_type
].install_configuration_sw(
8161 artifact_path
=artifact_path
,
8169 # write in db flag of configuration_sw already installed
8171 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8174 # Not sure if this need to be done when healing
8176 # add relations for this VCA (wait for other peers related with this VCA)
8177 await self._add_vca_relations(
8178 logging_text=logging_text,
8181 vca_index=vca_index,
8185 # if SSH access is required, then get execution environment SSH public
8186 # if native charm we have waited already to VM be UP
8187 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8190 # self.logger.debug("get ssh key block")
8192 config_descriptor
, ("config-access", "ssh-access", "required")
8194 # self.logger.debug("ssh key needed")
8195 # Needed to inject a ssh key
8198 ("config-access", "ssh-access", "default-user"),
8200 step
= "Install configuration Software, getting public ssh key"
8201 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8202 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8205 step
= "Insert public key into VM user={} ssh_key={}".format(
8209 # self.logger.debug("no need to get ssh key")
8210 step
= "Waiting to VM being up and getting IP address"
8211 self
.logger
.debug(logging_text
+ step
)
8213 # n2vc_redesign STEP 5.1
8214 # wait for RO (ip-address) Insert pub_key into VM
8215 # IMPORTANT: We need do wait for RO to complete healing operation.
8216 await self
._wait
_heal
_ro
(nsr_id
,self
.timeout_ns_heal
)
8219 rw_mgmt_ip
= await self
.wait_kdu_up(
8220 logging_text
, nsr_id
, vnfr_id
, kdu_name
8223 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8233 rw_mgmt_ip
= None # This is for a NS configuration
8235 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8237 # store rw_mgmt_ip in deploy params for later replacement
8238 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8241 # get run-day1 operation parameter
8242 runDay1
= deploy_params
.get("run-day1",False)
8243 self
.logger
.debug(" Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
,vdu_id
,runDay1
))
8245 # n2vc_redesign STEP 6 Execute initial config primitive
8246 step
= "execute initial config primitive"
8248 # wait for dependent primitives execution (NS -> VNF -> VDU)
8249 if initial_config_primitive_list
:
8250 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
8252 # stage, in function of element type: vdu, kdu, vnf or ns
8253 my_vca
= vca_deployed_list
[vca_index
]
8254 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8256 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8257 elif my_vca
.get("member-vnf-index"):
8259 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8262 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8264 self
._write
_configuration
_status
(
8265 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8268 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8270 check_if_terminated_needed
= True
8271 for initial_config_primitive
in initial_config_primitive_list
:
8272 # adding information on the vca_deployed if it is a NS execution environment
8273 if not vca_deployed
["member-vnf-index"]:
8274 deploy_params
["ns_config_info"] = json
.dumps(
8275 self
._get
_ns
_config
_info
(nsr_id
)
8277 # TODO check if already done
8278 primitive_params_
= self
._map
_primitive
_params
(
8279 initial_config_primitive
, {}, deploy_params
8282 step
= "execute primitive '{}' params '{}'".format(
8283 initial_config_primitive
["name"], primitive_params_
8285 self
.logger
.debug(logging_text
+ step
)
8286 await self
.vca_map
[vca_type
].exec_primitive(
8288 primitive_name
=initial_config_primitive
["name"],
8289 params_dict
=primitive_params_
,
8294 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8295 if check_if_terminated_needed
:
8296 if config_descriptor
.get("terminate-config-primitive"):
8298 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
8300 check_if_terminated_needed
= False
8302 # TODO register in database that primitive is done
8304 # STEP 7 Configure metrics
8305 # Not sure if this need to be done when healing
8307 if vca_type == "helm" or vca_type == "helm-v3":
8308 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8310 artifact_path=artifact_path,
8311 ee_config_descriptor=ee_config_descriptor,
8314 target_ip=rw_mgmt_ip,
8320 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8323 for job in prometheus_jobs:
8326 {"job_name": job["job_name"]},
8329 fail_on_empty=False,
8333 step
= "instantiated at VCA"
8334 self
.logger
.debug(logging_text
+ step
)
8336 self
._write
_configuration
_status
(
8337 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8340 except Exception as e
: # TODO not use Exception but N2VC exception
8341 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8343 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8346 "Exception while {} : {}".format(step
, e
), exc_info
=True
8348 self
._write
_configuration
_status
(
8349 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8351 raise LcmException("{} {}".format(step
, e
)) from e
8353 async def _wait_heal_ro(
8359 while time() <= start_time
+ timeout
:
8360 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
8361 operational_status_ro
= db_nsr
["_admin"]["deployed"]["RO"]["operational-status"]
8362 self
.logger
.debug("Wait Heal RO > {}".format(operational_status_ro
))
8363 if operational_status_ro
!= "healing":
8365 await asyncio
.sleep(15, loop
=self
.loop
)
8366 else: # timeout_ns_deploy
8367 raise NgRoException("Timeout waiting ns to deploy")
8369 async def vertical_scale(self
, nsr_id
, nslcmop_id
):
8371 Vertical Scale the VDUs in a NS
8373 :param: nsr_id: NS Instance ID
8374 :param: nslcmop_id: nslcmop ID of migrate
8377 # Try to lock HA task here
8378 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
8379 if not task_is_locked_by_me
:
8381 logging_text
= "Task ns={} vertical scale ".format(nsr_id
)
8382 self
.logger
.debug(logging_text
+ "Enter")
8383 # get all needed from database
8385 db_nslcmop_update
= {}
8386 nslcmop_operation_state
= None
8390 # in case of error, indicates what part of scale was failed to put nsr at error status
8391 start_deploy
= time()
8394 # wait for any previous tasks in process
8395 step
= "Waiting for previous operations to terminate"
8396 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
8398 self
._write
_ns
_status
(
8401 current_operation
="VerticalScale",
8402 current_operation_id
=nslcmop_id
8404 step
= "Getting nslcmop from database"
8405 self
.logger
.debug(step
+ " after having waited for previous tasks to be completed")
8406 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
8407 operationParams
= db_nslcmop
.get("operationParams")
8409 target
.update(operationParams
)
8410 desc
= await self
.RO
.vertical_scale(nsr_id
, target
)
8411 self
.logger
.debug("RO return > {}".format(desc
))
8412 action_id
= desc
["action_id"]
8413 await self
._wait
_ng
_ro
(
8414 nsr_id
, action_id
, nslcmop_id
, start_deploy
, self
.timeout_verticalscale
8416 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
8417 self
.logger
.error("Exit Exception {}".format(e
))
8419 except asyncio
.CancelledError
:
8420 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
8421 exc
= "Operation was cancelled"
8422 except Exception as e
:
8423 exc
= traceback
.format_exc()
8424 self
.logger
.critical("Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
8426 self
._write
_ns
_status
(
8429 current_operation
="IDLE",
8430 current_operation_id
=None,
8435 ] = "FAILED {}: {}".format(step
, exc
)
8436 nslcmop_operation_state
= "FAILED"
8438 nslcmop_operation_state
= "COMPLETED"
8439 db_nslcmop_update
["detailed-status"] = "Done"
8440 db_nsr_update
["detailed-status"] = "Done"
8442 self
._write
_op
_status
(
8446 operation_state
=nslcmop_operation_state
,
8447 other_update
=db_nslcmop_update
,
8449 if nslcmop_operation_state
:
8453 "nslcmop_id": nslcmop_id
,
8454 "operationState": nslcmop_operation_state
,
8456 await self
.msg
.aiowrite("ns", "verticalscaled", msg
, loop
=self
.loop
)
8457 except Exception as e
:
8459 logging_text
+ "kafka_write notification Exception {}".format(e
)
8461 self
.logger
.debug(logging_text
+ "Exit")
8462 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_verticalscale")