1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
35 from osm_lcm
import ROclient
36 from osm_lcm
.data_utils
.nsr
import (
39 get_deployed_vca_list
,
42 from osm_lcm
.data_utils
.vca
import (
51 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
52 from osm_lcm
.lcm_utils
import (
59 check_juju_bundle_existence
,
60 get_charm_artifact_path
,
62 from osm_lcm
.data_utils
.nsd
import (
63 get_ns_configuration_relation_list
,
67 from osm_lcm
.data_utils
.vnfd
import (
73 get_ee_sorted_initial_config_primitive_list
,
74 get_ee_sorted_terminate_config_primitive_list
,
76 get_virtual_link_profiles
,
81 get_number_of_instances
,
83 get_kdu_resource_profile
,
84 find_software_version
,
86 from osm_lcm
.data_utils
.list_utils
import find_in_list
87 from osm_lcm
.data_utils
.vnfr
import (
91 get_volumes_from_instantiation_params
,
93 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
94 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
95 from n2vc
.definitions
import RelationEndpoint
96 from n2vc
.k8s_helm_conn
import K8sHelmConnector
97 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
98 from n2vc
.k8s_juju_conn
import K8sJujuConnector
100 from osm_common
.dbbase
import DbException
101 from osm_common
.fsbase
import FsException
103 from osm_lcm
.data_utils
.database
.database
import Database
104 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
106 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
107 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
109 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
110 from osm_lcm
.osm_config
import OsmConfigBuilder
111 from osm_lcm
.prometheus
import parse_job
113 from copy
import copy
, deepcopy
114 from time
import time
115 from uuid
import uuid4
117 from random
import randint
119 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
122 class NsLcm(LcmBase
):
123 timeout_vca_on_error
= (
125 ) # Time for charm from first time at blocked,error status to mark as failed
timeout_ns_deploy = 2 * 3600  # default global timeout for deploying a ns
timeout_ns_terminate = 1800  # default global timeout for undeploying (terminating) a ns
timeout_ns_heal = 1800  # default global timeout for healing a ns
timeout_charm_delete = 10 * 60
timeout_primitive = 30 * 60  # timeout for primitive execution
timeout_ns_update = 30 * 60  # timeout for ns update
132 timeout_progress_primitive
= (
134 ) # timeout for some progress in a primitive execution
timeout_migrate = 1800  # default global timeout for migrating vnfs
timeout_operate = 1800  # default global timeout for operate actions on vnfs
timeout_verticalscale = 1800  # default global timeout for Vertical Scaling
138 SUBOPERATION_STATUS_NOT_FOUND
= -1
139 SUBOPERATION_STATUS_NEW
= -2
140 SUBOPERATION_STATUS_SKIP
= -3
141 task_name_deploy_vca
= "Deploying VCA"
143 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
145 Init, Connect to database, filesystem storage, and messaging
146 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
149 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
151 self
.db
= Database().instance
.db
152 self
.fs
= Filesystem().instance
.fs
154 self
.lcm_tasks
= lcm_tasks
155 self
.timeout
= config
["timeout"]
156 self
.ro_config
= config
["ro_config"]
157 self
.ng_ro
= config
["ro_config"].get("ng")
158 self
.vca_config
= config
["VCA"].copy()
160 # create N2VC connector
161 self
.n2vc
= N2VCJujuConnector(
164 on_update_db
=self
._on
_update
_n
2vc
_db
,
169 self
.conn_helm_ee
= LCMHelmConn(
172 vca_config
=self
.vca_config
,
173 on_update_db
=self
._on
_update
_n
2vc
_db
,
176 self
.k8sclusterhelm2
= K8sHelmConnector(
177 kubectl_command
=self
.vca_config
.get("kubectlpath"),
178 helm_command
=self
.vca_config
.get("helmpath"),
185 self
.k8sclusterhelm3
= K8sHelm3Connector(
186 kubectl_command
=self
.vca_config
.get("kubectlpath"),
187 helm_command
=self
.vca_config
.get("helm3path"),
194 self
.k8sclusterjuju
= K8sJujuConnector(
195 kubectl_command
=self
.vca_config
.get("kubectlpath"),
196 juju_command
=self
.vca_config
.get("jujupath"),
199 on_update_db
=self
._on
_update
_k
8s
_db
,
204 self
.k8scluster_map
= {
205 "helm-chart": self
.k8sclusterhelm2
,
206 "helm-chart-v3": self
.k8sclusterhelm3
,
207 "chart": self
.k8sclusterhelm3
,
208 "juju-bundle": self
.k8sclusterjuju
,
209 "juju": self
.k8sclusterjuju
,
213 "lxc_proxy_charm": self
.n2vc
,
214 "native_charm": self
.n2vc
,
215 "k8s_proxy_charm": self
.n2vc
,
216 "helm": self
.conn_helm_ee
,
217 "helm-v3": self
.conn_helm_ee
,
221 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
223 self
.op_status_map
= {
224 "instantiation": self
.RO
.status
,
225 "termination": self
.RO
.status
,
226 "migrate": self
.RO
.status
,
227 "healing": self
.RO
.recreate_status
,
228 "verticalscale": self
.RO
.status
,
229 "start_stop_rebuild": self
.RO
.status
,
def increment_ip_mac(ip_mac, vm_index=1):
    """Return ip_mac with its last field increased by vm_index.

    The last dot-separated field is treated as decimal (IPv4). If there is no
    dot, the last colon-separated field is treated as hexadecimal (IPv6 or MAC)
    and is written back zero-padded to the width it had in the input.

    NOTE(review): elsewhere in this file the function is invoked as
    self.increment_ip_mac(address, count + 1) although it takes no self, so it
    is expected to be bound as a static method of the class.
    NOTE(review): no carry is propagated to the previous field; incrementing
    past 255 (IPv4) or past the field width (hex) yields an out-of-range field.

    :param ip_mac: address as a string; any non-string value is returned as is
    :param vm_index: amount added to the last field
    :return: the new address string, the untouched input when it is not a
        string, or None when no numeric last field can be found or parsed
    """
    if not isinstance(ip_mac, str):
        # nothing to increment (e.g. no address assigned): pass it through
        return ip_mac
    try:
        # try with ipv4: look for last dot
        head, dot, tail = ip_mac.rpartition(".")
        if dot and head:
            return "{}.{}".format(head, int(tail) + vm_index)
        # try with ipv6 or mac: look for last colon. Operate in hex
        head, colon, tail = ip_mac.rpartition(":")
        if colon and head:
            # keep the original field width: 2 for mac, up to 4 for ipv6
            return "{}:{:0{}x}".format(head, int(tail, 16) + vm_index, len(tail))
    except (ValueError, TypeError):
        # best effort: an unparsable last field or a non-numeric vm_index
        # means "cannot compute", reported as None like the no-separator case
        return None
    return None
254 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
256 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
259 # TODO filter RO descriptor fields...
263 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
264 db_dict
["deploymentStatus"] = ro_descriptor
265 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
267 except Exception as e
:
269 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
272 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
274 # remove last dot from path (if exists)
275 if path
.endswith("."):
278 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
279 # .format(table, filter, path, updated_data))
282 nsr_id
= filter.get("_id")
284 # read ns record from database
285 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
286 current_ns_status
= nsr
.get("nsState")
288 # get vca status for NS
289 status_dict
= await self
.n2vc
.get_status(
290 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
295 db_dict
["vcaStatus"] = status_dict
297 # update configurationStatus for this VCA
299 vca_index
= int(path
[path
.rfind(".") + 1 :])
302 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
304 vca_status
= vca_list
[vca_index
].get("status")
306 configuration_status_list
= nsr
.get("configurationStatus")
307 config_status
= configuration_status_list
[vca_index
].get("status")
309 if config_status
== "BROKEN" and vca_status
!= "failed":
310 db_dict
["configurationStatus"][vca_index
] = "READY"
311 elif config_status
!= "BROKEN" and vca_status
== "failed":
312 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
313 except Exception as e
:
314 # not update configurationStatus
315 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
317 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
318 # if nsState = 'DEGRADED' check if all is OK
320 if current_ns_status
in ("READY", "DEGRADED"):
321 error_description
= ""
323 if status_dict
.get("machines"):
324 for machine_id
in status_dict
.get("machines"):
325 machine
= status_dict
.get("machines").get(machine_id
)
326 # check machine agent-status
327 if machine
.get("agent-status"):
328 s
= machine
.get("agent-status").get("status")
331 error_description
+= (
332 "machine {} agent-status={} ; ".format(
336 # check machine instance status
337 if machine
.get("instance-status"):
338 s
= machine
.get("instance-status").get("status")
341 error_description
+= (
342 "machine {} instance-status={} ; ".format(
347 if status_dict
.get("applications"):
348 for app_id
in status_dict
.get("applications"):
349 app
= status_dict
.get("applications").get(app_id
)
350 # check application status
351 if app
.get("status"):
352 s
= app
.get("status").get("status")
355 error_description
+= (
356 "application {} status={} ; ".format(app_id
, s
)
359 if error_description
:
360 db_dict
["errorDescription"] = error_description
361 if current_ns_status
== "READY" and is_degraded
:
362 db_dict
["nsState"] = "DEGRADED"
363 if current_ns_status
== "DEGRADED" and not is_degraded
:
364 db_dict
["nsState"] = "READY"
367 self
.update_db_2("nsrs", nsr_id
, db_dict
)
369 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
371 except Exception as e
:
372 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
374 async def _on_update_k8s_db(
375 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
378 Updating vca status in NSR record
379 :param cluster_uuid: UUID of a k8s cluster
380 :param kdu_instance: The unique name of the KDU instance
381 :param filter: To get nsr_id
:param cluster_type: The cluster type (juju, k8s)
386 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
387 # .format(cluster_uuid, kdu_instance, filter))
389 nsr_id
= filter.get("_id")
391 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
392 cluster_uuid
=cluster_uuid
,
393 kdu_instance
=kdu_instance
,
395 complete_status
=True,
401 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
404 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
408 self
.update_db_2("nsrs", nsr_id
, db_dict
)
409 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
411 except Exception as e
:
412 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
415 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
417 env
= Environment(undefined
=StrictUndefined
, autoescape
=True)
418 template
= env
.from_string(cloud_init_text
)
419 return template
.render(additional_params
or {})
420 except UndefinedError
as e
:
422 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
423 "file, must be provided in the instantiation parameters inside the "
424 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
426 except (TemplateError
, TemplateNotFound
) as e
:
428 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
433 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
434 cloud_init_content
= cloud_init_file
= None
436 if vdu
.get("cloud-init-file"):
437 base_folder
= vnfd
["_admin"]["storage"]
438 if base_folder
["pkg-dir"]:
439 cloud_init_file
= "{}/{}/cloud_init/{}".format(
440 base_folder
["folder"],
441 base_folder
["pkg-dir"],
442 vdu
["cloud-init-file"],
445 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
446 base_folder
["folder"],
447 vdu
["cloud-init-file"],
449 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
450 cloud_init_content
= ci_file
.read()
451 elif vdu
.get("cloud-init"):
452 cloud_init_content
= vdu
["cloud-init"]
454 return cloud_init_content
455 except FsException
as e
:
457 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
458 vnfd
["id"], vdu
["id"], cloud_init_file
, e
462 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
464 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
466 additional_params
= vdur
.get("additionalParams")
467 return parse_yaml_strings(additional_params
)
469 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
472 :param vnfd: input vnfd
473 :param new_id: overrides vnf id if provided
474 :param additionalParams: Instantiation params for VNFs provided
475 :param nsrId: Id of the NSR
476 :return: copy of vnfd
478 vnfd_RO
= deepcopy(vnfd
)
479 # remove unused by RO configuration, monitoring, scaling and internal keys
480 vnfd_RO
.pop("_id", None)
481 vnfd_RO
.pop("_admin", None)
482 vnfd_RO
.pop("monitoring-param", None)
483 vnfd_RO
.pop("scaling-group-descriptor", None)
484 vnfd_RO
.pop("kdu", None)
485 vnfd_RO
.pop("k8s-cluster", None)
487 vnfd_RO
["id"] = new_id
489 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
490 for vdu
in get_iterable(vnfd_RO
, "vdu"):
491 vdu
.pop("cloud-init-file", None)
492 vdu
.pop("cloud-init", None)
def ip_profile_2_RO(ip_profile):
    """Translate an OSM ip-profile into the key names used towards RO.

    The input is deep-copied and never modified. On the copy:
      - "dns-server" is replaced by "dns-address": when it is a list, a list
        with the "address" value of every entry; otherwise the same value
      - "ip-version" values "ipv4" / "ipv6" become "IPv4" / "IPv6"
      - "dhcp-params" is renamed to "dhcp"

    :param ip_profile: dictionary with the OSM ip-profile
    :return: the converted copy
    """
    RO_ip_profile = deepcopy(ip_profile)
    if "dns-server" in RO_ip_profile:
        dns_server = RO_ip_profile.pop("dns-server")
        if isinstance(dns_server, list):
            # list entries are dictionaries; only their "address" is kept
            dns_server = [ds["address"] for ds in dns_server]
        RO_ip_profile["dns-address"] = dns_server
    if RO_ip_profile.get("ip-version") == "ipv4":
        RO_ip_profile["ip-version"] = "IPv4"
    if RO_ip_profile.get("ip-version") == "ipv6":
        RO_ip_profile["ip-version"] = "IPv6"
    if "dhcp-params" in RO_ip_profile:
        RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
    return RO_ip_profile
513 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
514 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
515 if db_vim
["_admin"]["operationalState"] != "ENABLED":
517 "VIM={} is not available. operationalState={}".format(
518 vim_account
, db_vim
["_admin"]["operationalState"]
521 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
524 def get_ro_wim_id_for_wim_account(self
, wim_account
):
525 if isinstance(wim_account
, str):
526 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
527 if db_wim
["_admin"]["operationalState"] != "ENABLED":
529 "WIM={} is not available. operationalState={}".format(
530 wim_account
, db_wim
["_admin"]["operationalState"]
533 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
538 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
540 db_vdu_push_list
= []
542 db_update
= {"_admin.modified": time()}
544 for vdu_id
, vdu_count
in vdu_create
.items():
548 for vdur
in reversed(db_vnfr
["vdur"])
549 if vdur
["vdu-id-ref"] == vdu_id
554 # Read the template saved in the db:
556 "No vdur in the database. Using the vdur-template to scale"
558 vdur_template
= db_vnfr
.get("vdur-template")
559 if not vdur_template
:
561 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
565 vdur
= vdur_template
[0]
566 # Delete a template from the database after using it
569 {"_id": db_vnfr
["_id"]},
571 pull
={"vdur-template": {"_id": vdur
["_id"]}},
573 for count
in range(vdu_count
):
574 vdur_copy
= deepcopy(vdur
)
575 vdur_copy
["status"] = "BUILD"
576 vdur_copy
["status-detailed"] = None
577 vdur_copy
["ip-address"] = None
578 vdur_copy
["_id"] = str(uuid4())
579 vdur_copy
["count-index"] += count
+ 1
580 vdur_copy
["id"] = "{}-{}".format(
581 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
583 vdur_copy
.pop("vim_info", None)
584 for iface
in vdur_copy
["interfaces"]:
585 if iface
.get("fixed-ip"):
586 iface
["ip-address"] = self
.increment_ip_mac(
587 iface
["ip-address"], count
+ 1
590 iface
.pop("ip-address", None)
591 if iface
.get("fixed-mac"):
592 iface
["mac-address"] = self
.increment_ip_mac(
593 iface
["mac-address"], count
+ 1
596 iface
.pop("mac-address", None)
600 ) # only first vdu can be managment of vnf
601 db_vdu_push_list
.append(vdur_copy
)
602 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
604 if len(db_vnfr
["vdur"]) == 1:
605 # The scale will move to 0 instances
607 "Scaling to 0 !, creating the template with the last vdur"
609 template_vdur
= [db_vnfr
["vdur"][0]]
610 for vdu_id
, vdu_count
in vdu_delete
.items():
612 indexes_to_delete
= [
614 for iv
in enumerate(db_vnfr
["vdur"])
615 if iv
[1]["vdu-id-ref"] == vdu_id
619 "vdur.{}.status".format(i
): "DELETING"
620 for i
in indexes_to_delete
[-vdu_count
:]
624 # it must be deleted one by one because common.db does not allow otherwise
627 for v
in reversed(db_vnfr
["vdur"])
628 if v
["vdu-id-ref"] == vdu_id
630 for vdu
in vdus_to_delete
[:vdu_count
]:
633 {"_id": db_vnfr
["_id"]},
635 pull
={"vdur": {"_id": vdu
["_id"]}},
639 db_push
["vdur"] = db_vdu_push_list
641 db_push
["vdur-template"] = template_vdur
644 db_vnfr
["vdur-template"] = template_vdur
645 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
646 # modify passed dictionary db_vnfr
647 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
648 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
650 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
652 Updates database nsr with the RO info for the created vld
653 :param ns_update_nsr: dictionary to be filled with the updated info
654 :param db_nsr: content of db_nsr. This is also modified
655 :param nsr_desc_RO: nsr descriptor from RO
656 :return: Nothing, LcmException is raised on errors
659 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
660 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
661 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
663 vld
["vim-id"] = net_RO
.get("vim_net_id")
664 vld
["name"] = net_RO
.get("vim_name")
665 vld
["status"] = net_RO
.get("status")
666 vld
["status-detailed"] = net_RO
.get("error_msg")
667 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
671 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
674 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
676 for db_vnfr
in db_vnfrs
.values():
677 vnfr_update
= {"status": "ERROR"}
678 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
679 if "status" not in vdur
:
680 vdur
["status"] = "ERROR"
681 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
683 vdur
["status-detailed"] = str(error_text
)
685 "vdur.{}.status-detailed".format(vdu_index
)
687 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
688 except DbException
as e
:
689 self
.logger
.error("Cannot update vnf. {}".format(e
))
691 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
693 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
694 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
695 :param nsr_desc_RO: nsr descriptor from RO
696 :return: Nothing, LcmException is raised on errors
698 for vnf_index
, db_vnfr
in db_vnfrs
.items():
699 for vnf_RO
in nsr_desc_RO
["vnfs"]:
700 if vnf_RO
["member_vnf_index"] != vnf_index
:
703 if vnf_RO
.get("ip_address"):
704 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
707 elif not db_vnfr
.get("ip-address"):
708 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
709 raise LcmExceptionNoMgmtIP(
710 "ns member_vnf_index '{}' has no IP address".format(
715 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
716 vdur_RO_count_index
= 0
717 if vdur
.get("pdu-type"):
719 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
720 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
722 if vdur
["count-index"] != vdur_RO_count_index
:
723 vdur_RO_count_index
+= 1
725 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
726 if vdur_RO
.get("ip_address"):
727 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
729 vdur
["ip-address"] = None
730 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
731 vdur
["name"] = vdur_RO
.get("vim_name")
732 vdur
["status"] = vdur_RO
.get("status")
733 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
734 for ifacer
in get_iterable(vdur
, "interfaces"):
735 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
736 if ifacer
["name"] == interface_RO
.get("internal_name"):
737 ifacer
["ip-address"] = interface_RO
.get(
740 ifacer
["mac-address"] = interface_RO
.get(
746 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
747 "from VIM info".format(
748 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
751 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
755 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
757 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
761 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
762 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
763 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
765 vld
["vim-id"] = net_RO
.get("vim_net_id")
766 vld
["name"] = net_RO
.get("vim_name")
767 vld
["status"] = net_RO
.get("status")
768 vld
["status-detailed"] = net_RO
.get("error_msg")
769 vnfr_update
["vld.{}".format(vld_index
)] = vld
773 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
778 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
783 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
788 def _get_ns_config_info(self
, nsr_id
):
790 Generates a mapping between vnf,vdu elements and the N2VC id
791 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
792 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
793 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
794 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
796 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
797 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
799 ns_config_info
= {"osm-config-mapping": mapping
}
800 for vca
in vca_deployed_list
:
801 if not vca
["member-vnf-index"]:
803 if not vca
["vdu_id"]:
804 mapping
[vca
["member-vnf-index"]] = vca
["application"]
808 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
810 ] = vca
["application"]
811 return ns_config_info
813 async def _instantiate_ng_ro(
830 def get_vim_account(vim_account_id
):
832 if vim_account_id
in db_vims
:
833 return db_vims
[vim_account_id
]
834 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
835 db_vims
[vim_account_id
] = db_vim
838 # modify target_vld info with instantiation parameters
839 def parse_vld_instantiation_params(
840 target_vim
, target_vld
, vld_params
, target_sdn
842 if vld_params
.get("ip-profile"):
843 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
846 if vld_params
.get("provider-network"):
847 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
850 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
851 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
854 if vld_params
.get("wimAccountId"):
855 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
856 target_vld
["vim_info"][target_wim
] = {}
857 for param
in ("vim-network-name", "vim-network-id"):
858 if vld_params
.get(param
):
859 if isinstance(vld_params
[param
], dict):
860 for vim
, vim_net
in vld_params
[param
].items():
861 other_target_vim
= "vim:" + vim
863 target_vld
["vim_info"],
864 (other_target_vim
, param
.replace("-", "_")),
867 else: # isinstance str
868 target_vld
["vim_info"][target_vim
][
869 param
.replace("-", "_")
870 ] = vld_params
[param
]
871 if vld_params
.get("common_id"):
872 target_vld
["common_id"] = vld_params
.get("common_id")
874 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
875 def update_ns_vld_target(target
, ns_params
):
876 for vnf_params
in ns_params
.get("vnf", ()):
877 if vnf_params
.get("vimAccountId"):
881 for vnfr
in db_vnfrs
.values()
882 if vnf_params
["member-vnf-index"]
883 == vnfr
["member-vnf-index-ref"]
887 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
888 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
889 target_vld
= find_in_list(
890 get_iterable(vdur
, "interfaces"),
891 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
894 vld_params
= find_in_list(
895 get_iterable(ns_params
, "vld"),
896 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
900 if vnf_params
.get("vimAccountId") not in a_vld
.get(
903 target_vim_network_list
= [
904 v
for _
, v
in a_vld
.get("vim_info").items()
906 target_vim_network_name
= next(
908 item
.get("vim_network_name", "")
909 for item
in target_vim_network_list
914 target
["ns"]["vld"][a_index
].get("vim_info").update(
916 "vim:{}".format(vnf_params
["vimAccountId"]): {
917 "vim_network_name": target_vim_network_name
,
923 for param
in ("vim-network-name", "vim-network-id"):
924 if vld_params
.get(param
) and isinstance(
925 vld_params
[param
], dict
927 for vim
, vim_net
in vld_params
[
930 other_target_vim
= "vim:" + vim
932 target
["ns"]["vld"][a_index
].get(
937 param
.replace("-", "_"),
942 nslcmop_id
= db_nslcmop
["_id"]
944 "name": db_nsr
["name"],
947 "image": deepcopy(db_nsr
["image"]),
948 "flavor": deepcopy(db_nsr
["flavor"]),
949 "action_id": nslcmop_id
,
950 "cloud_init_content": {},
952 for image
in target
["image"]:
953 image
["vim_info"] = {}
954 for flavor
in target
["flavor"]:
955 flavor
["vim_info"] = {}
956 if db_nsr
.get("affinity-or-anti-affinity-group"):
957 target
["affinity-or-anti-affinity-group"] = deepcopy(
958 db_nsr
["affinity-or-anti-affinity-group"]
960 for affinity_or_anti_affinity_group
in target
[
961 "affinity-or-anti-affinity-group"
963 affinity_or_anti_affinity_group
["vim_info"] = {}
965 if db_nslcmop
.get("lcmOperationType") != "instantiate":
966 # get parameters of instantiation:
967 db_nslcmop_instantiate
= self
.db
.get_list(
970 "nsInstanceId": db_nslcmop
["nsInstanceId"],
971 "lcmOperationType": "instantiate",
974 ns_params
= db_nslcmop_instantiate
.get("operationParams")
976 ns_params
= db_nslcmop
.get("operationParams")
977 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
978 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
981 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
982 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
986 "mgmt-network": vld
.get("mgmt-network", False),
987 "type": vld
.get("type"),
990 "vim_network_name": vld
.get("vim-network-name"),
991 "vim_account_id": ns_params
["vimAccountId"],
995 # check if this network needs SDN assist
996 if vld
.get("pci-interfaces"):
997 db_vim
= get_vim_account(ns_params
["vimAccountId"])
998 sdnc_id
= db_vim
["config"].get("sdn-controller")
1000 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1001 target_sdn
= "sdn:{}".format(sdnc_id
)
1002 target_vld
["vim_info"][target_sdn
] = {
1004 "target_vim": target_vim
,
1006 "type": vld
.get("type"),
1009 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1010 for nsd_vnf_profile
in nsd_vnf_profiles
:
1011 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1012 if cp
["virtual-link-profile-id"] == vld
["id"]:
1014 "member_vnf:{}.{}".format(
1015 cp
["constituent-cpd-id"][0][
1016 "constituent-base-element-id"
1018 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1020 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1022 # check at nsd descriptor, if there is an ip-profile
1024 nsd_vlp
= find_in_list(
1025 get_virtual_link_profiles(nsd
),
1026 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1031 and nsd_vlp
.get("virtual-link-protocol-data")
1032 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1034 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1037 ip_profile_dest_data
= {}
1038 if "ip-version" in ip_profile_source_data
:
1039 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1042 if "cidr" in ip_profile_source_data
:
1043 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1046 if "gateway-ip" in ip_profile_source_data
:
1047 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1050 if "dhcp-enabled" in ip_profile_source_data
:
1051 ip_profile_dest_data
["dhcp-params"] = {
1052 "enabled": ip_profile_source_data
["dhcp-enabled"]
1054 vld_params
["ip-profile"] = ip_profile_dest_data
1056 # update vld_params with instantiation params
1057 vld_instantiation_params
= find_in_list(
1058 get_iterable(ns_params
, "vld"),
1059 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1061 if vld_instantiation_params
:
1062 vld_params
.update(vld_instantiation_params
)
1063 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1064 target
["ns"]["vld"].append(target_vld
)
# Update the target ns_vld if the vnf vim_account is overridden by instantiation params
1066 update_ns_vld_target(target
, ns_params
)
1068 for vnfr
in db_vnfrs
.values():
1069 vnfd
= find_in_list(
1070 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1072 vnf_params
= find_in_list(
1073 get_iterable(ns_params
, "vnf"),
1074 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1076 target_vnf
= deepcopy(vnfr
)
1077 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1078 for vld
in target_vnf
.get("vld", ()):
# check if connected to a ns.vld, to fill target
1080 vnf_cp
= find_in_list(
1081 vnfd
.get("int-virtual-link-desc", ()),
1082 lambda cpd
: cpd
.get("id") == vld
["id"],
1085 ns_cp
= "member_vnf:{}.{}".format(
1086 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1088 if cp2target
.get(ns_cp
):
1089 vld
["target"] = cp2target
[ns_cp
]
1092 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1094 # check if this network needs SDN assist
1096 if vld
.get("pci-interfaces"):
1097 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1098 sdnc_id
= db_vim
["config"].get("sdn-controller")
1100 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1101 target_sdn
= "sdn:{}".format(sdnc_id
)
1102 vld
["vim_info"][target_sdn
] = {
1104 "target_vim": target_vim
,
1106 "type": vld
.get("type"),
1109 # check at vnfd descriptor, if there is an ip-profile
1111 vnfd_vlp
= find_in_list(
1112 get_virtual_link_profiles(vnfd
),
1113 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1117 and vnfd_vlp
.get("virtual-link-protocol-data")
1118 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1120 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1123 ip_profile_dest_data
= {}
1124 if "ip-version" in ip_profile_source_data
:
1125 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1128 if "cidr" in ip_profile_source_data
:
1129 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1132 if "gateway-ip" in ip_profile_source_data
:
1133 ip_profile_dest_data
[
1135 ] = ip_profile_source_data
["gateway-ip"]
1136 if "dhcp-enabled" in ip_profile_source_data
:
1137 ip_profile_dest_data
["dhcp-params"] = {
1138 "enabled": ip_profile_source_data
["dhcp-enabled"]
1141 vld_params
["ip-profile"] = ip_profile_dest_data
1142 # update vld_params with instantiation params
1144 vld_instantiation_params
= find_in_list(
1145 get_iterable(vnf_params
, "internal-vld"),
1146 lambda i_vld
: i_vld
["name"] == vld
["id"],
1148 if vld_instantiation_params
:
1149 vld_params
.update(vld_instantiation_params
)
1150 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1153 for vdur
in target_vnf
.get("vdur", ()):
1154 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1155 continue # This vdu must not be created
1156 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1158 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1161 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1162 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1165 and vdu_configuration
.get("config-access")
1166 and vdu_configuration
.get("config-access").get("ssh-access")
1168 vdur
["ssh-keys"] = ssh_keys_all
1169 vdur
["ssh-access-required"] = vdu_configuration
[
1171 ]["ssh-access"]["required"]
1174 and vnf_configuration
.get("config-access")
1175 and vnf_configuration
.get("config-access").get("ssh-access")
1176 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1178 vdur
["ssh-keys"] = ssh_keys_all
1179 vdur
["ssh-access-required"] = vnf_configuration
[
1181 ]["ssh-access"]["required"]
1182 elif ssh_keys_instantiation
and find_in_list(
1183 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1185 vdur
["ssh-keys"] = ssh_keys_instantiation
1187 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1189 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1191 if vdud
.get("cloud-init-file"):
1192 vdur
["cloud-init"] = "{}:file:{}".format(
1193 vnfd
["_id"], vdud
.get("cloud-init-file")
1195 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1196 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1197 base_folder
= vnfd
["_admin"]["storage"]
1198 if base_folder
["pkg-dir"]:
1199 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1200 base_folder
["folder"],
1201 base_folder
["pkg-dir"],
1202 vdud
.get("cloud-init-file"),
1205 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1206 base_folder
["folder"],
1207 vdud
.get("cloud-init-file"),
1209 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1210 target
["cloud_init_content"][
1213 elif vdud
.get("cloud-init"):
1214 vdur
["cloud-init"] = "{}:vdu:{}".format(
1215 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1217 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1218 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1221 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1222 deploy_params_vdu
= self
._format
_additional
_params
(
1223 vdur
.get("additionalParams") or {}
1225 deploy_params_vdu
["OSM"] = get_osm_params(
1226 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1228 vdur
["additionalParams"] = deploy_params_vdu
1231 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1232 if target_vim
not in ns_flavor
["vim_info"]:
1233 ns_flavor
["vim_info"][target_vim
] = {}
1236 # in case alternative images are provided we must check if they should be applied
1237 # for the vim_type, modify the vim_type taking into account
1238 ns_image_id
= int(vdur
["ns-image-id"])
1239 if vdur
.get("alt-image-ids"):
1240 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1241 vim_type
= db_vim
["vim_type"]
1242 for alt_image_id
in vdur
.get("alt-image-ids"):
1243 ns_alt_image
= target
["image"][int(alt_image_id
)]
1244 if vim_type
== ns_alt_image
.get("vim-type"):
1245 # must use alternative image
1247 "use alternative image id: {}".format(alt_image_id
)
1249 ns_image_id
= alt_image_id
1250 vdur
["ns-image-id"] = ns_image_id
1252 ns_image
= target
["image"][int(ns_image_id
)]
1253 if target_vim
not in ns_image
["vim_info"]:
1254 ns_image
["vim_info"][target_vim
] = {}
1257 if vdur
.get("affinity-or-anti-affinity-group-id"):
1258 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1259 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1260 if target_vim
not in ns_ags
["vim_info"]:
1261 ns_ags
["vim_info"][target_vim
] = {}
1263 vdur
["vim_info"] = {target_vim
: {}}
1264 # instantiation parameters
1266 vdu_instantiation_params
= find_in_list(
1267 get_iterable(vnf_params
, "vdu"),
1268 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1270 if vdu_instantiation_params
:
1271 # Parse the vdu_volumes from the instantiation params
1272 vdu_volumes
= get_volumes_from_instantiation_params(
1273 vdu_instantiation_params
, vdud
1275 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1276 vdur_list
.append(vdur
)
1277 target_vnf
["vdur"] = vdur_list
1278 target
["vnf"].append(target_vnf
)
1280 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1281 desc
= await self
.RO
.deploy(nsr_id
, target
)
1282 self
.logger
.debug("RO return > {}".format(desc
))
1283 action_id
= desc
["action_id"]
1284 await self
._wait
_ng
_ro
(
1285 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
,
1286 operation
="instantiation"
1291 "_admin.deployed.RO.operational-status": "running",
1292 "detailed-status": " ".join(stage
),
1294 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1295 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1296 self
._write
_op
_status
(nslcmop_id
, stage
)
1298 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1302 async def _wait_ng_ro(
1312 detailed_status_old
= None
1314 start_time
= start_time
or time()
1315 while time() <= start_time
+ timeout
:
1316 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1317 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1318 if desc_status
["status"] == "FAILED":
1319 raise NgRoException(desc_status
["details"])
1320 elif desc_status
["status"] == "BUILD":
1322 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1323 elif desc_status
["status"] == "DONE":
1325 stage
[2] = "Deployed at VIM"
1328 assert False, "ROclient.check_ns_status returns unknown {}".format(
1329 desc_status
["status"]
1331 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1332 detailed_status_old
= stage
[2]
1333 db_nsr_update
["detailed-status"] = " ".join(stage
)
1334 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1335 self
._write
_op
_status
(nslcmop_id
, stage
)
1336 await asyncio
.sleep(15, loop
=self
.loop
)
1337 else: # timeout_ns_deploy
1338 raise NgRoException("Timeout waiting ns to deploy")
1340 async def _terminate_ng_ro(
1341 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1346 start_deploy
= time()
1353 "action_id": nslcmop_id
,
1355 desc
= await self
.RO
.deploy(nsr_id
, target
)
1356 action_id
= desc
["action_id"]
1357 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1358 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1361 + "ns terminate action at RO. action_id={}".format(action_id
)
1365 delete_timeout
= 20 * 60 # 20 minutes
1366 await self
._wait
_ng
_ro
(
1367 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
,
1368 operation
="termination"
1371 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1372 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1374 await self
.RO
.delete(nsr_id
)
1375 except Exception as e
:
1376 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1377 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1378 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1379 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1381 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1383 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1384 failed_detail
.append("delete conflict: {}".format(e
))
1387 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1390 failed_detail
.append("delete error: {}".format(e
))
1393 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1397 stage
[2] = "Error deleting from VIM"
1399 stage
[2] = "Deleted from VIM"
1400 db_nsr_update
["detailed-status"] = " ".join(stage
)
1401 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1402 self
._write
_op
_status
(nslcmop_id
, stage
)
1405 raise LcmException("; ".join(failed_detail
))
async def instantiate_RO(
    self,
    logging_text,
    nsr_id,
    nsd,
    db_nsr,
    db_nslcmop,
    db_vnfrs,
    db_vnfds,
    n2vc_key_list,
    stage,
):
    """
    Instantiate at RO
    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: database content of ns descriptor
    :param db_nsr: database content of ns record
    :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
    :param db_vnfrs: database content of vnfrs; its values must contain 'vim-account-id'
    :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
    :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: None or exception
    """
    try:
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get(
                "ns_deploy", self.timeout_ns_deploy
            )

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
            # in case of placement, change ns_params["vimAccountId"] if not present at any vnfrs
            vnfr = None
            for vnfr in db_vnfrs.values():
                if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
                    break
            else:
                # The original line here was a comparison ('==') whose result was
                # discarded, so the account was never updated. Guard on vnfr so an
                # empty db_vnfrs does not hit an unbound name.
                if vnfr is not None:
                    ns_params["vimAccountId"] = vnfr["vim-account-id"]

        return await self._instantiate_ng_ro(
            logging_text,
            nsr_id,
            nsd,
            db_nsr,
            db_nslcmop,
            db_vnfrs,
            db_vnfds,
            n2vc_key_list,
            stage,
            start_deploy,
            timeout_ns_deploy,
        )
    except Exception as e:
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        # the traceback is only logged for exception types that are not expected here
        self.logger.error(
            "Error deploying at VIM {}".format(e),
            exc_info=not isinstance(
                e,
                (
                    ROclient.ROClientException,
                    LcmException,
                    DbException,
                    NgRoException,
                ),
            ),
        )
        raise
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address
    :param logging_text: prefix use for logging
    :param nsr_id: not read by this method
    :param vnfr_id: _id of the "vnfrs" record that is polled
    :param kdu_name: value of "kdu-name" identifying the entry inside the vnfr "kdur" list
    :return: IP address, K8s services
    :raises LcmException: if the kdur is not found, has a status other than READY/ENABLED,
        or has no status after 360 polls
    """

    # self.logger.debug(logging_text + "Starting wait_kdu_up")
    nb_tries = 0

    while nb_tries < 360:
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next(
            (
                x
                for x in get_iterable(db_vnfr, "kdur")
                if x.get("kdu-name") == kdu_name
            ),
            None,
        )
        if not kdur:
            raise LcmException(
                "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
            )
        if kdur.get("status"):
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address"), kdur.get("services")
            # any other non-empty status is treated as an error
            raise LcmException(
                "target KDU={} is in error state".format(kdu_name)
            )

        # the 'loop' keyword of asyncio.sleep was removed in Python 3.10
        await asyncio.sleep(10)
        nb_tries += 1
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1522 async def wait_vm_up_insert_key_ro(
1523 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1526 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1527 :param logging_text: prefix use for logging
1532 :param pub_key: public ssh key to inject, None to skip
1533 :param user: user to apply the public ssh key
1537 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1541 target_vdu_id
= None
1547 if ro_retries
>= 360: # 1 hour
1549 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1552 await asyncio
.sleep(10, loop
=self
.loop
)
1555 if not target_vdu_id
:
1556 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1558 if not vdu_id
: # for the VNF case
1559 if db_vnfr
.get("status") == "ERROR":
1561 "Cannot inject ssh-key because target VNF is in error state"
1563 ip_address
= db_vnfr
.get("ip-address")
1569 for x
in get_iterable(db_vnfr
, "vdur")
1570 if x
.get("ip-address") == ip_address
1578 for x
in get_iterable(db_vnfr
, "vdur")
1579 if x
.get("vdu-id-ref") == vdu_id
1580 and x
.get("count-index") == vdu_index
1586 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1587 ): # If only one, this should be the target vdu
1588 vdur
= db_vnfr
["vdur"][0]
1591 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1592 vnfr_id
, vdu_id
, vdu_index
1595 # New generation RO stores information at "vim_info"
1598 if vdur
.get("vim_info"):
1600 t
for t
in vdur
["vim_info"]
1601 ) # there should be only one key
1602 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1604 vdur
.get("pdu-type")
1605 or vdur
.get("status") == "ACTIVE"
1606 or ng_ro_status
== "ACTIVE"
1608 ip_address
= vdur
.get("ip-address")
1611 target_vdu_id
= vdur
["vdu-id-ref"]
1612 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1614 "Cannot inject ssh-key because target VM is in error state"
1617 if not target_vdu_id
:
1620 # inject public key into machine
1621 if pub_key
and user
:
1622 self
.logger
.debug(logging_text
+ "Inserting RO key")
1623 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1624 if vdur
.get("pdu-type"):
1625 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1628 ro_vm_id
= "{}-{}".format(
1629 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1630 ) # TODO add vdu_index
1634 "action": "inject_ssh_key",
1638 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1640 desc
= await self
.RO
.deploy(nsr_id
, target
)
1641 action_id
= desc
["action_id"]
1642 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600, operation
="instantiation")
1645 # wait until NS is deployed at RO
1647 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1648 ro_nsr_id
= deep_get(
1649 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1653 result_dict
= await self
.RO
.create_action(
1655 item_id_name
=ro_nsr_id
,
1657 "add_public_key": pub_key
,
1662 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1663 if not result_dict
or not isinstance(result_dict
, dict):
1665 "Unknown response from RO when injecting key"
1667 for result
in result_dict
.values():
1668 if result
.get("vim_result") == 200:
1671 raise ROclient
.ROClientException(
1672 "error injecting key: {}".format(
1673 result
.get("description")
1677 except NgRoException
as e
:
1679 "Reaching max tries injecting key. Error: {}".format(e
)
1681 except ROclient
.ROClientException
as e
:
1685 + "error injecting key: {}. Retrying until {} seconds".format(
1692 "Reaching max tries injecting key. Error: {}".format(e
)
1699 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1701 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1703 my_vca
= vca_deployed_list
[vca_index
]
1704 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1705 # vdu or kdu: no dependencies
1709 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1710 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1711 configuration_status_list
= db_nsr
["configurationStatus"]
1712 for index
, vca_deployed
in enumerate(configuration_status_list
):
1713 if index
== vca_index
:
1716 if not my_vca
.get("member-vnf-index") or (
1717 vca_deployed
.get("member-vnf-index")
1718 == my_vca
.get("member-vnf-index")
1720 internal_status
= configuration_status_list
[index
].get("status")
1721 if internal_status
== "READY":
1723 elif internal_status
== "BROKEN":
1725 "Configuration aborted because dependent charm/s has failed"
1730 # no dependencies, return
1732 await asyncio
.sleep(10)
1735 raise LcmException("Configuration aborted because dependent charm/s timeout")
def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
    """
    Obtain the VCA identifier for a VNF record or, failing that, for the NS.

    :param db_vnfr: vnf record; when not empty its "vca-id" is returned
    :param db_nsr: ns record; only used when db_vnfr is empty. Its
        "instantiate_params.vimAccountId" selects the VIM account whose "vca" is returned
    :return: the vca id found, or None
    """
    if db_vnfr:
        return deep_get(db_vnfr, ("vca-id",))
    if db_nsr:
        vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
        vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
        return vim_account.get("vca")
    return None
1746 async def instantiate_N2VC(
1763 ee_config_descriptor
,
1765 nsr_id
= db_nsr
["_id"]
1766 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1767 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1768 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1769 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1771 "collection": "nsrs",
1772 "filter": {"_id": nsr_id
},
1773 "path": db_update_entry
,
1779 element_under_configuration
= nsr_id
1783 vnfr_id
= db_vnfr
["_id"]
1784 osm_config
["osm"]["vnf_id"] = vnfr_id
1786 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1788 if vca_type
== "native_charm":
1791 index_number
= vdu_index
or 0
1794 element_type
= "VNF"
1795 element_under_configuration
= vnfr_id
1796 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1798 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1799 element_type
= "VDU"
1800 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1801 osm_config
["osm"]["vdu_id"] = vdu_id
1803 namespace
+= ".{}".format(kdu_name
)
1804 element_type
= "KDU"
1805 element_under_configuration
= kdu_name
1806 osm_config
["osm"]["kdu_name"] = kdu_name
1809 if base_folder
["pkg-dir"]:
1810 artifact_path
= "{}/{}/{}/{}".format(
1811 base_folder
["folder"],
1812 base_folder
["pkg-dir"],
1815 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1820 artifact_path
= "{}/Scripts/{}/{}/".format(
1821 base_folder
["folder"],
1824 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1829 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1831 # get initial_config_primitive_list that applies to this element
1832 initial_config_primitive_list
= config_descriptor
.get(
1833 "initial-config-primitive"
1837 "Initial config primitive list > {}".format(
1838 initial_config_primitive_list
1842 # add config if not present for NS charm
1843 ee_descriptor_id
= ee_config_descriptor
.get("id")
1844 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1845 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1846 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1850 "Initial config primitive list #2 > {}".format(
1851 initial_config_primitive_list
1854 # n2vc_redesign STEP 3.1
1855 # find old ee_id if exists
1856 ee_id
= vca_deployed
.get("ee_id")
1858 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1859 # create or register execution environment in VCA
1860 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1862 self
._write
_configuration
_status
(
1864 vca_index
=vca_index
,
1866 element_under_configuration
=element_under_configuration
,
1867 element_type
=element_type
,
1870 step
= "create execution environment"
1871 self
.logger
.debug(logging_text
+ step
)
1875 if vca_type
== "k8s_proxy_charm":
1876 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1877 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1878 namespace
=namespace
,
1879 artifact_path
=artifact_path
,
1883 elif vca_type
== "helm" or vca_type
== "helm-v3":
1884 ee_id
, credentials
= await self
.vca_map
[
1886 ].create_execution_environment(
1887 namespace
=namespace
,
1891 artifact_path
=artifact_path
,
1895 ee_id
, credentials
= await self
.vca_map
[
1897 ].create_execution_environment(
1898 namespace
=namespace
,
1904 elif vca_type
== "native_charm":
1905 step
= "Waiting to VM being up and getting IP address"
1906 self
.logger
.debug(logging_text
+ step
)
1907 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1916 credentials
= {"hostname": rw_mgmt_ip
}
1918 username
= deep_get(
1919 config_descriptor
, ("config-access", "ssh-access", "default-user")
1921 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1922 # merged. Meanwhile let's get username from initial-config-primitive
1923 if not username
and initial_config_primitive_list
:
1924 for config_primitive
in initial_config_primitive_list
:
1925 for param
in config_primitive
.get("parameter", ()):
1926 if param
["name"] == "ssh-username":
1927 username
= param
["value"]
1931 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1932 "'config-access.ssh-access.default-user'"
1934 credentials
["username"] = username
1935 # n2vc_redesign STEP 3.2
1937 self
._write
_configuration
_status
(
1939 vca_index
=vca_index
,
1940 status
="REGISTERING",
1941 element_under_configuration
=element_under_configuration
,
1942 element_type
=element_type
,
1945 step
= "register execution environment {}".format(credentials
)
1946 self
.logger
.debug(logging_text
+ step
)
1947 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1948 credentials
=credentials
,
1949 namespace
=namespace
,
1954 # for compatibility with MON/POL modules, the need model and application name at database
1955 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1956 ee_id_parts
= ee_id
.split(".")
1957 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1958 if len(ee_id_parts
) >= 2:
1959 model_name
= ee_id_parts
[0]
1960 application_name
= ee_id_parts
[1]
1961 db_nsr_update
[db_update_entry
+ "model"] = model_name
1962 db_nsr_update
[db_update_entry
+ "application"] = application_name
1964 # n2vc_redesign STEP 3.3
1965 step
= "Install configuration Software"
1967 self
._write
_configuration
_status
(
1969 vca_index
=vca_index
,
1970 status
="INSTALLING SW",
1971 element_under_configuration
=element_under_configuration
,
1972 element_type
=element_type
,
1973 other_update
=db_nsr_update
,
1976 # TODO check if already done
1977 self
.logger
.debug(logging_text
+ step
)
1979 if vca_type
== "native_charm":
1980 config_primitive
= next(
1981 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1984 if config_primitive
:
1985 config
= self
._map
_primitive
_params
(
1986 config_primitive
, {}, deploy_params
1989 if vca_type
== "lxc_proxy_charm":
1990 if element_type
== "NS":
1991 num_units
= db_nsr
.get("config-units") or 1
1992 elif element_type
== "VNF":
1993 num_units
= db_vnfr
.get("config-units") or 1
1994 elif element_type
== "VDU":
1995 for v
in db_vnfr
["vdur"]:
1996 if vdu_id
== v
["vdu-id-ref"]:
1997 num_units
= v
.get("config-units") or 1
1999 if vca_type
!= "k8s_proxy_charm":
2000 await self
.vca_map
[vca_type
].install_configuration_sw(
2002 artifact_path
=artifact_path
,
2005 num_units
=num_units
,
2010 # write in db flag of configuration_sw already installed
2012 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
2015 # add relations for this VCA (wait for other peers related with this VCA)
2016 await self
._add
_vca
_relations
(
2017 logging_text
=logging_text
,
2020 vca_index
=vca_index
,
2023 # if SSH access is required, then get execution environment SSH public
2024 # if native charm we have waited already to VM be UP
2025 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
2028 # self.logger.debug("get ssh key block")
2030 config_descriptor
, ("config-access", "ssh-access", "required")
2032 # self.logger.debug("ssh key needed")
2033 # Needed to inject a ssh key
2036 ("config-access", "ssh-access", "default-user"),
2038 step
= "Install configuration Software, getting public ssh key"
2039 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
2040 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
2043 step
= "Insert public key into VM user={} ssh_key={}".format(
2047 # self.logger.debug("no need to get ssh key")
2048 step
= "Waiting to VM being up and getting IP address"
2049 self
.logger
.debug(logging_text
+ step
)
2051 # default rw_mgmt_ip to None, avoiding the non definition of the variable
2054 # n2vc_redesign STEP 5.1
2055 # wait for RO (ip-address) Insert pub_key into VM
2058 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
2059 logging_text
, nsr_id
, vnfr_id
, kdu_name
2061 vnfd
= self
.db
.get_one(
2063 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2065 kdu
= get_kdu(vnfd
, kdu_name
)
2067 service
["name"] for service
in get_kdu_services(kdu
)
2069 exposed_services
= []
2070 for service
in services
:
2071 if any(s
in service
["name"] for s
in kdu_services
):
2072 exposed_services
.append(service
)
2073 await self
.vca_map
[vca_type
].exec_primitive(
2075 primitive_name
="config",
2077 "osm-config": json
.dumps(
2079 k8s
={"services": exposed_services
}
2086 # This verification is needed in order to avoid trying to add a public key
2087 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2088 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2089 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2091 elif db_vnfr
.get('vdur'):
2092 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2102 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2104 # store rw_mgmt_ip in deploy params for later replacement
2105 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2107 # n2vc_redesign STEP 6 Execute initial config primitive
2108 step
= "execute initial config primitive"
2110 # wait for dependent primitives execution (NS -> VNF -> VDU)
2111 if initial_config_primitive_list
:
2112 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2114 # stage, in function of element type: vdu, kdu, vnf or ns
2115 my_vca
= vca_deployed_list
[vca_index
]
2116 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2118 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2119 elif my_vca
.get("member-vnf-index"):
2121 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2124 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2126 self
._write
_configuration
_status
(
2127 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2130 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2132 check_if_terminated_needed
= True
2133 for initial_config_primitive
in initial_config_primitive_list
:
2134 # adding information on the vca_deployed if it is a NS execution environment
2135 if not vca_deployed
["member-vnf-index"]:
2136 deploy_params
["ns_config_info"] = json
.dumps(
2137 self
._get
_ns
_config
_info
(nsr_id
)
2139 # TODO check if already done
2140 primitive_params_
= self
._map
_primitive
_params
(
2141 initial_config_primitive
, {}, deploy_params
2144 step
= "execute primitive '{}' params '{}'".format(
2145 initial_config_primitive
["name"], primitive_params_
2147 self
.logger
.debug(logging_text
+ step
)
2148 await self
.vca_map
[vca_type
].exec_primitive(
2150 primitive_name
=initial_config_primitive
["name"],
2151 params_dict
=primitive_params_
,
2156 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2157 if check_if_terminated_needed
:
2158 if config_descriptor
.get("terminate-config-primitive"):
2160 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2162 check_if_terminated_needed
= False
2164 # TODO register in database that primitive is done
2166 # STEP 7 Configure metrics
2167 if vca_type
== "helm" or vca_type
== "helm-v3":
2168 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2170 artifact_path
=artifact_path
,
2171 ee_config_descriptor
=ee_config_descriptor
,
2174 target_ip
=rw_mgmt_ip
,
2180 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2183 for job
in prometheus_jobs
:
2186 {"job_name": job
["job_name"]},
2189 fail_on_empty
=False,
2192 step
= "instantiated at VCA"
2193 self
.logger
.debug(logging_text
+ step
)
2195 self
._write
_configuration
_status
(
2196 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2199 except Exception as e
: # TODO not use Exception but N2VC exception
2200 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2202 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2205 "Exception while {} : {}".format(step
, e
), exc_info
=True
2207 self
._write
_configuration
_status
(
2208 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2210 raise LcmException("{} {}".format(step
, e
)) from e
2212 def _write_ns_status(
2216 current_operation
: str,
2217 current_operation_id
: str,
2218 error_description
: str = None,
2219 error_detail
: str = None,
2220 other_update
: dict = None,
2223 Update db_nsr fields.
2226 :param current_operation:
2227 :param current_operation_id:
2228 :param error_description:
2229 :param error_detail:
2230 :param other_update: Other required changes at database if provided, will be cleared
2234 db_dict
= other_update
or {}
2237 ] = current_operation_id
# for backward compatibility
2238 db_dict
["_admin.current-operation"] = current_operation_id
2239 db_dict
["_admin.operation-type"] = (
2240 current_operation
if current_operation
!= "IDLE" else None
2242 db_dict
["currentOperation"] = current_operation
2243 db_dict
["currentOperationID"] = current_operation_id
2244 db_dict
["errorDescription"] = error_description
2245 db_dict
["errorDetail"] = error_detail
2248 db_dict
["nsState"] = ns_state
2249 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2250 except DbException
as e
:
2251 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2253 def _write_op_status(
2257 error_message
: str = None,
2258 queuePosition
: int = 0,
2259 operation_state
: str = None,
2260 other_update
: dict = None,
2263 db_dict
= other_update
or {}
2264 db_dict
["queuePosition"] = queuePosition
2265 if isinstance(stage
, list):
2266 db_dict
["stage"] = stage
[0]
2267 db_dict
["detailed-status"] = " ".join(stage
)
2268 elif stage
is not None:
2269 db_dict
["stage"] = str(stage
)
2271 if error_message
is not None:
2272 db_dict
["errorMessage"] = error_message
2273 if operation_state
is not None:
2274 db_dict
["operationState"] = operation_state
2275 db_dict
["statusEnteredTime"] = time()
2276 self
.update_db_2("nslcmops", op_id
, db_dict
)
2277 except DbException
as e
:
2279 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2282 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2284 nsr_id
= db_nsr
["_id"]
2285 # configurationStatus
2286 config_status
= db_nsr
.get("configurationStatus")
2289 "configurationStatus.{}.status".format(index
): status
2290 for index
, v
in enumerate(config_status
)
2294 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2296 except DbException
as e
:
2298 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2301 def _write_configuration_status(
2306 element_under_configuration
: str = None,
2307 element_type
: str = None,
2308 other_update
: dict = None,
2311 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2312 # .format(vca_index, status))
2315 db_path
= "configurationStatus.{}.".format(vca_index
)
2316 db_dict
= other_update
or {}
2318 db_dict
[db_path
+ "status"] = status
2319 if element_under_configuration
:
2321 db_path
+ "elementUnderConfiguration"
2322 ] = element_under_configuration
2324 db_dict
[db_path
+ "elementType"] = element_type
2325 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2326 except DbException
as e
:
2328 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2329 status
, nsr_id
, vca_index
, e
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Compute the placement (vim account where to deploy) when it is delegated to PLA.

    Nothing is done unless operationParams "placement-engine" is "PLA". In that case a
    "get_placement" request is sent on the "pla" topic and the "nslcmops" record is
    polled until "_admin.pla" is written. Database is used because the result can be
    obtained from a different LCM worker in case of HA.
    :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
    :param db_nslcmop: database content of nslcmop
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
    :return: True if some modification is done. Modifies database vnfrs and parameter
        db_vnfrs with the computed 'vim-account-id'
    :raises LcmException: if no placement result is found after polling
    """
    nslcmop_id = db_nslcmop["_id"]
    placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
    if placement_engine != "PLA":
        return False

    self.logger.debug(logging_text + "Invoke and wait for placement optimization")
    await self.msg.aiowrite(
        "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
    )

    # poll the operation record until the result shows up or the budget is exhausted
    db_poll_interval = 5
    remaining = db_poll_interval * 10
    pla_result = None
    while not pla_result and remaining >= 0:
        await asyncio.sleep(db_poll_interval)
        remaining -= db_poll_interval
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        pla_result = deep_get(db_nslcmop, ("_admin", "pla"))

    if not pla_result:
        raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))

    changed = False
    for pla_vnf in pla_result["vnf"]:
        vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
        if pla_vnf.get("vimAccountId") and vnfr:
            changed = True
            new_vim_account = {"vim-account-id": pla_vnf["vimAccountId"]}
            self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, new_vim_account)
            # keep the in-memory copy aligned with database
            vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
    return changed
def update_nsrs_with_pla_result(self, params):
    """
    Store a received placement result in the operation it belongs to.

    The "placement" entry of params is written at "_admin.pla" of the
    "nslcmops" record identified by params["placement"]["nslcmopId"] (despite
    the method name, the record updated is an nslcmop, not an nsr).
    Best effort: any failure is logged as a warning and not propagated.

    :param params: received content; read with deep_get() and .get(), so it
        is expected to be a dict carrying a "placement" entry
    :return: None
    """
    # Bound before the try block so the except handler can always format it
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
        self.update_db_2(
            "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
        )
    except Exception as e:
        # Logger.warn() is a deprecated alias of warning(), removed in Python 3.13
        self.logger.warning(
            "Update failed for nslcmop_id={}:{}".format(nslcmop_id, e)
        )
2391 async def instantiate(self
, nsr_id
, nslcmop_id
):
2394 :param nsr_id: ns instance to deploy
2395 :param nslcmop_id: operation to run
2399 # Try to lock HA task here
2400 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2401 if not task_is_locked_by_me
:
2403 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2407 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2408 self
.logger
.debug(logging_text
+ "Enter")
2410 # get all needed from database
2412 # database nsrs record
2415 # database nslcmops record
2418 # update operation on nsrs
2420 # update operation on nslcmops
2421 db_nslcmop_update
= {}
2423 nslcmop_operation_state
= None
2424 db_vnfrs
= {} # vnf's info indexed by member-index
2426 tasks_dict_info
= {} # from task to info text
2430 "Stage 1/5: preparation of the environment.",
2431 "Waiting for previous operations to terminate.",
2434 # ^ stage, step, VIM progress
2436 # wait for any previous tasks in process
2437 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2439 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2440 stage
[1] = "Reading from database."
2441 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2442 db_nsr_update
["detailed-status"] = "creating"
2443 db_nsr_update
["operational-status"] = "init"
2444 self
._write
_ns
_status
(
2446 ns_state
="BUILDING",
2447 current_operation
="INSTANTIATING",
2448 current_operation_id
=nslcmop_id
,
2449 other_update
=db_nsr_update
,
2451 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2453 # read from db: operation
2454 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2455 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2456 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2457 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2458 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2460 ns_params
= db_nslcmop
.get("operationParams")
2461 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2462 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2464 timeout_ns_deploy
= self
.timeout
.get(
2465 "ns_deploy", self
.timeout_ns_deploy
2469 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2470 self
.logger
.debug(logging_text
+ stage
[1])
2471 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2472 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2473 self
.logger
.debug(logging_text
+ stage
[1])
2474 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2475 self
.fs
.sync(db_nsr
["nsd-id"])
2477 # nsr_name = db_nsr["name"] # TODO short-name??
2479 # read from db: vnf's of this ns
2480 stage
[1] = "Getting vnfrs from db."
2481 self
.logger
.debug(logging_text
+ stage
[1])
2482 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2484 # read from db: vnfd's for every vnf
2485 db_vnfds
= [] # every vnfd data
2487 # for each vnf in ns, read vnfd
2488 for vnfr
in db_vnfrs_list
:
2489 if vnfr
.get("kdur"):
2491 for kdur
in vnfr
["kdur"]:
2492 if kdur
.get("additionalParams"):
2493 kdur
["additionalParams"] = json
.loads(
2494 kdur
["additionalParams"]
2496 kdur_list
.append(kdur
)
2497 vnfr
["kdur"] = kdur_list
2499 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2500 vnfd_id
= vnfr
["vnfd-id"]
2501 vnfd_ref
= vnfr
["vnfd-ref"]
2502 self
.fs
.sync(vnfd_id
)
2504 # if we haven't this vnfd, read it from db
2505 if vnfd_id
not in db_vnfds
:
2507 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2510 self
.logger
.debug(logging_text
+ stage
[1])
2511 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2514 db_vnfds
.append(vnfd
)
2516 # Get or generates the _admin.deployed.VCA list
2517 vca_deployed_list
= None
2518 if db_nsr
["_admin"].get("deployed"):
2519 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2520 if vca_deployed_list
is None:
2521 vca_deployed_list
= []
2522 configuration_status_list
= []
2523 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2524 db_nsr_update
["configurationStatus"] = configuration_status_list
2525 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2526 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2527 elif isinstance(vca_deployed_list
, dict):
2528 # maintain backward compatibility. Change a dict to list at database
2529 vca_deployed_list
= list(vca_deployed_list
.values())
2530 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2531 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2534 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2536 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2537 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2539 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2540 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2541 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2543 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2546 # n2vc_redesign STEP 2 Deploy Network Scenario
2547 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2548 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2550 stage
[1] = "Deploying KDUs."
2551 # self.logger.debug(logging_text + "Before deploy_kdus")
2552 # Call to deploy_kdus in case exists the "vdu:kdu" param
2553 await self
.deploy_kdus(
2554 logging_text
=logging_text
,
2556 nslcmop_id
=nslcmop_id
,
2559 task_instantiation_info
=tasks_dict_info
,
2562 stage
[1] = "Getting VCA public key."
2563 # n2vc_redesign STEP 1 Get VCA public ssh-key
2564 # feature 1429. Add n2vc public key to needed VMs
2565 n2vc_key
= self
.n2vc
.get_public_key()
2566 n2vc_key_list
= [n2vc_key
]
2567 if self
.vca_config
.get("public_key"):
2568 n2vc_key_list
.append(self
.vca_config
["public_key"])
2570 stage
[1] = "Deploying NS at VIM."
2571 task_ro
= asyncio
.ensure_future(
2572 self
.instantiate_RO(
2573 logging_text
=logging_text
,
2577 db_nslcmop
=db_nslcmop
,
2580 n2vc_key_list
=n2vc_key_list
,
2584 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2585 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2587 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2588 stage
[1] = "Deploying Execution Environments."
2589 self
.logger
.debug(logging_text
+ stage
[1])
2591 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2592 for vnf_profile
in get_vnf_profiles(nsd
):
2593 vnfd_id
= vnf_profile
["vnfd-id"]
2594 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2595 member_vnf_index
= str(vnf_profile
["id"])
2596 db_vnfr
= db_vnfrs
[member_vnf_index
]
2597 base_folder
= vnfd
["_admin"]["storage"]
2603 # Get additional parameters
2604 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2605 if db_vnfr
.get("additionalParamsForVnf"):
2606 deploy_params
.update(
2607 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2610 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2611 if descriptor_config
:
2613 logging_text
=logging_text
2614 + "member_vnf_index={} ".format(member_vnf_index
),
2617 nslcmop_id
=nslcmop_id
,
2623 member_vnf_index
=member_vnf_index
,
2624 vdu_index
=vdu_index
,
2626 deploy_params
=deploy_params
,
2627 descriptor_config
=descriptor_config
,
2628 base_folder
=base_folder
,
2629 task_instantiation_info
=tasks_dict_info
,
2633 # Deploy charms for each VDU that supports one.
2634 for vdud
in get_vdu_list(vnfd
):
2636 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2637 vdur
= find_in_list(
2638 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2641 if vdur
.get("additionalParams"):
2642 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2644 deploy_params_vdu
= deploy_params
2645 deploy_params_vdu
["OSM"] = get_osm_params(
2646 db_vnfr
, vdu_id
, vdu_count_index
=0
2648 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2650 self
.logger
.debug("VDUD > {}".format(vdud
))
2652 "Descriptor config > {}".format(descriptor_config
)
2654 if descriptor_config
:
2657 for vdu_index
in range(vdud_count
):
2658 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2660 logging_text
=logging_text
2661 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2662 member_vnf_index
, vdu_id
, vdu_index
2666 nslcmop_id
=nslcmop_id
,
2672 member_vnf_index
=member_vnf_index
,
2673 vdu_index
=vdu_index
,
2675 deploy_params
=deploy_params_vdu
,
2676 descriptor_config
=descriptor_config
,
2677 base_folder
=base_folder
,
2678 task_instantiation_info
=tasks_dict_info
,
2681 for kdud
in get_kdu_list(vnfd
):
2682 kdu_name
= kdud
["name"]
2683 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2684 if descriptor_config
:
2689 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2691 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2692 if kdur
.get("additionalParams"):
2693 deploy_params_kdu
.update(
2694 parse_yaml_strings(kdur
["additionalParams"].copy())
2698 logging_text
=logging_text
,
2701 nslcmop_id
=nslcmop_id
,
2707 member_vnf_index
=member_vnf_index
,
2708 vdu_index
=vdu_index
,
2710 deploy_params
=deploy_params_kdu
,
2711 descriptor_config
=descriptor_config
,
2712 base_folder
=base_folder
,
2713 task_instantiation_info
=tasks_dict_info
,
2717 # Check if this NS has a charm configuration
2718 descriptor_config
= nsd
.get("ns-configuration")
2719 if descriptor_config
and descriptor_config
.get("juju"):
2722 member_vnf_index
= None
2728 # Get additional parameters
2729 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2730 if db_nsr
.get("additionalParamsForNs"):
2731 deploy_params
.update(
2732 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2734 base_folder
= nsd
["_admin"]["storage"]
2736 logging_text
=logging_text
,
2739 nslcmop_id
=nslcmop_id
,
2745 member_vnf_index
=member_vnf_index
,
2746 vdu_index
=vdu_index
,
2748 deploy_params
=deploy_params
,
2749 descriptor_config
=descriptor_config
,
2750 base_folder
=base_folder
,
2751 task_instantiation_info
=tasks_dict_info
,
2755 # rest of stuff will be done at finally
2758 ROclient
.ROClientException
,
2764 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2767 except asyncio
.CancelledError
:
2769 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2771 exc
= "Operation was cancelled"
2772 except Exception as e
:
2773 exc
= traceback
.format_exc()
2774 self
.logger
.critical(
2775 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2780 error_list
.append(str(exc
))
2782 # wait for pending tasks
2784 stage
[1] = "Waiting for instantiate pending tasks."
2785 self
.logger
.debug(logging_text
+ stage
[1])
2786 error_list
+= await self
._wait
_for
_tasks
(
2794 stage
[1] = stage
[2] = ""
2795 except asyncio
.CancelledError
:
2796 error_list
.append("Cancelled")
2797 # TODO cancel all tasks
2798 except Exception as exc
:
2799 error_list
.append(str(exc
))
2801 # update operation-status
2802 db_nsr_update
["operational-status"] = "running"
2803 # let's begin with VCA 'configured' status (later we can change it)
2804 db_nsr_update
["config-status"] = "configured"
2805 for task
, task_name
in tasks_dict_info
.items():
2806 if not task
.done() or task
.cancelled() or task
.exception():
2807 if task_name
.startswith(self
.task_name_deploy_vca
):
2808 # A N2VC task is pending
2809 db_nsr_update
["config-status"] = "failed"
2811 # RO or KDU task is pending
2812 db_nsr_update
["operational-status"] = "failed"
2814 # update status at database
2816 error_detail
= ". ".join(error_list
)
2817 self
.logger
.error(logging_text
+ error_detail
)
2818 error_description_nslcmop
= "{} Detail: {}".format(
2819 stage
[0], error_detail
2821 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2822 nslcmop_id
, stage
[0]
2825 db_nsr_update
["detailed-status"] = (
2826 error_description_nsr
+ " Detail: " + error_detail
2828 db_nslcmop_update
["detailed-status"] = error_detail
2829 nslcmop_operation_state
= "FAILED"
2833 error_description_nsr
= error_description_nslcmop
= None
2835 db_nsr_update
["detailed-status"] = "Done"
2836 db_nslcmop_update
["detailed-status"] = "Done"
2837 nslcmop_operation_state
= "COMPLETED"
2840 self
._write
_ns
_status
(
2843 current_operation
="IDLE",
2844 current_operation_id
=None,
2845 error_description
=error_description_nsr
,
2846 error_detail
=error_detail
,
2847 other_update
=db_nsr_update
,
2849 self
._write
_op
_status
(
2852 error_message
=error_description_nslcmop
,
2853 operation_state
=nslcmop_operation_state
,
2854 other_update
=db_nslcmop_update
,
2857 if nslcmop_operation_state
:
2859 await self
.msg
.aiowrite(
2864 "nslcmop_id": nslcmop_id
,
2865 "operationState": nslcmop_operation_state
,
2869 except Exception as e
:
2871 logging_text
+ "kafka_write notification Exception {}".format(e
)
2874 self
.logger
.debug(logging_text
+ "Exit")
2875 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2877 def _get_vnfd(self
, vnfd_id
: str, cached_vnfds
: Dict
[str, Any
]):
2878 if vnfd_id
not in cached_vnfds
:
2879 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
2880 return cached_vnfds
[vnfd_id
]
2882 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2883 if vnf_profile_id
not in cached_vnfrs
:
2884 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2887 "member-vnf-index-ref": vnf_profile_id
,
2888 "nsr-id-ref": nsr_id
,
2891 return cached_vnfrs
[vnf_profile_id
]
def _is_deployed_vca_in_relation(
    self, vca: DeployedVCA, relation: Relation
) -> bool:
    """
    Tell whether a deployed VCA is one of the two endpoints of a relation.

    Endpoints with a truthy "kdu-resource-profile-id" are never compared.
    Any other endpoint matches when its vnf profile id, vdu profile id and
    execution environment reference are all equal to those of the VCA.

    :param vca: deployed VCA to look for
    :param relation: relation whose provider and requirer are inspected
    :return: True if the provider or the requirer matches the VCA
    """
    for endpoint in (relation.provider, relation.requirer):
        if endpoint["kdu-resource-profile-id"]:
            # KDU resource endpoint: not comparable with a VCA
            continue
        if vca.vnf_profile_id != endpoint.vnf_profile_id:
            continue
        if vca.vdu_profile_id != endpoint.vdu_profile_id:
            continue
        if vca.execution_environment_ref != endpoint.execution_environment_ref:
            continue
        return True
    return False
def _update_ee_relation_data_with_implicit_data(
    self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
):
    """
    Complete the data of a relation endpoint with what the descriptors imply.

    The endpoint data is first passed through safe_get_ee_relation(). Then,
    for VNF and VDU level endpoints with no "execution-environment-ref", the
    reference is taken from the juju execution environment that
    get_juju_ee_ref() returns for the vnfd (VNF level) or for the vdu profile
    (VDU level).

    :param nsr_id: id of the ns instance
    :param nsd: ns descriptor, used to find the vnf profile of the endpoint
    :param ee_relation_data: endpoint data as present in the descriptor
    :param cached_vnfds: cache of vnfds indexed by vnfd id (may be filled)
    :param vnf_profile_id: vnf profile id forwarded to safe_get_ee_relation()
    :return: the completed endpoint data
    :raises Exception: if no execution environment is found for the endpoint
    """
    ee_relation_data = safe_get_ee_relation(
        nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
    )
    level = EELevel.get_level(ee_relation_data)

    # Only VNF/VDU endpoints can have an implicit execution environment
    if level not in (EELevel.VNF, EELevel.VDU):
        return ee_relation_data
    # An explicit reference is never overwritten
    if ee_relation_data["execution-environment-ref"]:
        return ee_relation_data

    vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
    vnfd_id = vnf_profile["vnfd-id"]
    db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
    if level == EELevel.VNF:
        entity_id = vnfd_id
    else:
        entity_id = ee_relation_data["vdu-profile-id"]

    ee = get_juju_ee_ref(db_vnfd, entity_id)
    if not ee:
        raise Exception(
            f"not execution environments found for ee_relation {ee_relation_data}"
        )
    ee_relation_data["execution-environment-ref"] = ee["id"]
    return ee_relation_data
def _get_ns_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """
    Get the NS level relations in which a deployed VCA takes part.

    Every relation returned by get_ns_configuration_relation_list() must be
    described either with "provider" and "requirer" entries or with a list of
    two "entities". With "entities", an entity whose id differs from the nsd
    id is taken as a vnf profile id.

    :param nsr_id: id of the ns instance
    :param nsd: ns descriptor
    :param vca: deployed VCA whose relations are wanted
    :param cached_vnfds: cache of vnfds indexed by vnfd id (may be filled)
    :return: list of relations having the VCA as provider or requirer
    :raises Exception: if a relation has neither provider/requirer nor entities
    """
    found_relations = []
    for descriptor in get_ns_configuration_relation_list(nsd):
        if "provider" in descriptor and "requirer" in descriptor:
            provider_data = descriptor["provider"]
            requirer_data = descriptor["requirer"]
        elif "entities" in descriptor:
            # "entities" format: first entity is the provider, second the requirer
            endpoints = []
            for position in (0, 1):
                entity_id = descriptor["entities"][position]["id"]
                endpoint = {
                    "nsr-id": nsr_id,
                    "endpoint": descriptor["entities"][position]["endpoint"],
                }
                if entity_id != nsd["id"]:
                    endpoint["vnf-profile-id"] = entity_id
                endpoints.append(endpoint)
            provider_data, requirer_data = endpoints
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )

        provider_data = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, provider_data, cached_vnfds
        )
        requirer_data = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, requirer_data, cached_vnfds
        )
        provider = EERelation(provider_data)
        requirer = EERelation(requirer_data)
        relation = Relation(descriptor["name"], provider, requirer)
        if self._is_deployed_vca_in_relation(vca, relation):
            found_relations.append(relation)
    return found_relations
def _get_vnf_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """
    Get the VNF level relations in which a deployed VCA takes part.

    The relations are read with get_relation_list() from the vnfd of the vnf
    profile of the VCA. Every relation must be described either with
    "provider" and "requirer" entries or with a list of two "entities". With
    "entities", an entity whose id differs from the vnfd id is taken as a vdu
    profile id.

    :param nsr_id: id of the ns instance
    :param nsd: ns descriptor, used to find the vnf profile of the VCA
    :param vca: deployed VCA whose relations are wanted
    :param cached_vnfds: cache of vnfds indexed by vnfd id (may be filled)
    :return: list of relations having the VCA as provider or requirer
    :raises Exception: if a relation has neither provider/requirer nor entities
    """
    found_relations = []
    vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
    vnf_profile_id = vnf_profile["id"]
    vnfd_id = vnf_profile["vnfd-id"]
    db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
    for descriptor in get_relation_list(db_vnfd, vnfd_id):
        if "provider" in descriptor and "requirer" in descriptor:
            provider_data = descriptor["provider"]
            requirer_data = descriptor["requirer"]
        elif "entities" in descriptor:
            # "entities" format: first entity is the provider, second the requirer
            endpoints = []
            for position in (0, 1):
                entity_id = descriptor["entities"][position]["id"]
                endpoint = {
                    "nsr-id": nsr_id,
                    "vnf-profile-id": vnf_profile_id,
                    "endpoint": descriptor["entities"][position]["endpoint"],
                }
                if entity_id != vnfd_id:
                    endpoint["vdu-profile-id"] = entity_id
                endpoints.append(endpoint)
            provider_data, requirer_data = endpoints
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )

        provider_data = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, provider_data, cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        requirer_data = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, requirer_data, cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        provider = EERelation(provider_data)
        requirer = EERelation(requirer_data)
        relation = Relation(descriptor["name"], provider, requirer)
        if self._is_deployed_vca_in_relation(vca, relation):
            found_relations.append(relation)
    return found_relations
def _get_kdu_resource_data(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedK8sResource:
    """
    Get the deployed KDU data behind a KDU resource relation endpoint.

    The kdu resource profile of the endpoint is read from the vnfd of its vnf
    profile. The deployed KDU with the "kdu-name" of that profile is then
    obtained with get_deployed_kdu() and completed with the "resource-name"
    of the profile.

    :param ee_relation: relation endpoint; its vnf_profile_id and
        kdu_resource_profile_id are used
    :param db_nsr: database content of the nsr
    :param cached_vnfds: cache of vnfds indexed by vnfd id (may be filled)
    :return: the deployed KDU data, updated in place with "resource-name"
    """
    nsd = get_nsd(db_nsr)
    vnf_profiles = get_vnf_profiles(nsd)
    vnfd_id = find_in_list(
        vnf_profiles,
        lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
    )["vnfd-id"]
    db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
    kdu_resource_profile = get_kdu_resource_profile(
        db_vnfd, ee_relation.kdu_resource_profile_id
    )
    kdu_name = kdu_resource_profile["kdu-name"]
    deployed_kdu, _ = get_deployed_kdu(
        # Defaults must be dicts: the previous tuple defaults have no .get(),
        # so a missing "_admin" raised AttributeError instead of falling back
        db_nsr.get("_admin", {}).get("deployed", {}),
        kdu_name,
        ee_relation.vnf_profile_id,
    )
    deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
    return deployed_kdu
def _get_deployed_component(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedComponent:
    """
    Get the deployed component a relation endpoint refers to.

    NS, VNF and VDU level endpoints are looked up with get_deployed_vca() and
    wrapped in a DeployedVCA. KDU level endpoints are resolved with
    _get_kdu_resource_data() and wrapped in a DeployedK8sResource.

    :param ee_relation: relation endpoint
    :param db_nsr: database content of the nsr
    :param cached_vnfds: cache of vnfds indexed by vnfd id (may be filled)
    :return: the deployed component, or None if the lookup gives nothing or
        the level of the endpoint is none of the above
    """
    nsr_id = db_nsr["_id"]
    ee_level = EELevel.get_level(ee_relation)

    # Filter identifying the VCA for the levels backed by a VCA
    if ee_level == EELevel.NS:
        vca_filter = {"vdu_id": None, "member-vnf-index": None}
    elif ee_level == EELevel.VNF:
        vca_filter = {
            "vdu_id": None,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif ee_level == EELevel.VDU:
        vca_filter = {
            "vdu_id": ee_relation.vdu_profile_id,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif ee_level == EELevel.KDU:
        kdu_resource_data = self._get_kdu_resource_data(
            ee_relation, db_nsr, cached_vnfds
        )
        if not kdu_resource_data:
            return None
        return DeployedK8sResource(kdu_resource_data)
    else:
        return None

    vca = get_deployed_vca(db_nsr, vca_filter)
    if not vca:
        return None
    return DeployedVCA(nsr_id, vca)
async def _add_relation(
    self,
    relation: Relation,
    vca_type: str,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
    cached_vnfrs: Dict[str, Any],
) -> bool:
    """
    Add a relation if both of its endpoints are ready.

    Nothing is done unless the provider and the requirer are both deployed
    and both report config_sw_installed. Otherwise the relation is added
    through the connector registered in self.vca_map for vca_type.

    :param relation: relation to add
    :param vca_type: key of self.vca_map selecting the connector to use
    :param db_nsr: database content of the nsr
    :param cached_vnfds: cache of vnfds indexed by vnfd id (may be filled)
    :param cached_vnfrs: cache of vnfrs indexed by vnf profile id (may be filled)
    :return: True if the relation has been added, False if an endpoint is
        not ready yet
    """

    def _vnfr_of(endpoint):
        # Endpoints with no vnf profile id have no vnfr
        if not endpoint.vnf_profile_id:
            return None
        return self._get_vnfr(
            endpoint.nsr_id,
            endpoint.vnf_profile_id,
            cached_vnfrs,
        )

    deployed_provider = self._get_deployed_component(
        relation.provider, db_nsr, cached_vnfds
    )
    deployed_requirer = self._get_deployed_component(
        relation.requirer, db_nsr, cached_vnfds
    )
    both_ready = (
        deployed_provider
        and deployed_requirer
        and deployed_provider.config_sw_installed
        and deployed_requirer.config_sw_installed
    )
    if not both_ready:
        return False

    provider_db_vnfr = _vnfr_of(relation.provider)
    requirer_db_vnfr = _vnfr_of(relation.requirer)
    provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
    requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
    provider_relation_endpoint = RelationEndpoint(
        deployed_provider.ee_id,
        provider_vca_id,
        relation.provider.endpoint,
    )
    requirer_relation_endpoint = RelationEndpoint(
        deployed_requirer.ee_id,
        requirer_vca_id,
        relation.requirer.endpoint,
    )
    await self.vca_map[vca_type].add_relation(
        provider=provider_relation_endpoint,
        requirer=requirer_relation_endpoint,
    )
    # the caller removes the entry from its relations list
    return True
async def _add_vca_relations(
    self,
    logging_text,
    nsr_id,
    vca_type: str,
    vca_index: int,
    timeout: int = 3600,
) -> bool:
    """
    Add all the relations in which a deployed VCA takes part.

    The NS and VNF level relations of the VCA are collected and then added
    as soon as both endpoints of each one are ready, re-reading the nsr and
    retrying every 5 seconds until all are added or the timeout expires.

    :param logging_text: prefix for logging
    :param nsr_id: id of the ns instance
    :param vca_type: key of self.vca_map selecting the connector to use
    :param vca_index: index of the VCA in the deployed VCA list of the nsr
    :param timeout: maximum time in seconds to wait for the relations
    :return: True if there are no relations or all have been added; False on
        timeout or on any error (errors are logged, not raised)
    """

    # steps:
    # 1. find all relations for this VCA
    # 2. wait for other peers related
    # 3. add relations

    try:
        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = get_nsd(db_nsr)

        # this VCA data
        deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
        my_vca = DeployedVCA(nsr_id, deployed_vca_dict)

        cached_vnfds = {}
        cached_vnfrs = {}
        relations = []
        relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
        relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))

        # if no relations, terminate
        if not relations:
            self.logger.debug(logging_text + " No relations")
            return True

        self.logger.debug(logging_text + " adding relations {}".format(relations))

        # add all relations
        start = time()
        while True:
            # check timeout
            now = time()
            if now - start >= timeout:
                self.logger.error(logging_text + " : timeout adding relations")
                return False

            # reload nsr from database (we need to update record: _admin.deployed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            # for each relation, find the VCA's related
            # (iterate over a copy because added relations are removed)
            for relation in relations.copy():
                added = await self._add_relation(
                    relation,
                    vca_type,
                    db_nsr,
                    cached_vnfds,
                    cached_vnfrs,
                )
                if added:
                    relations.remove(relation)

            if not relations:
                self.logger.debug("Relations added")
                break
            await asyncio.sleep(5.0)

        return True

    except Exception as e:
        # Logger.warn() is a deprecated alias of warning(), removed in Python 3.13
        self.logger.warning(logging_text + " ERROR adding relations: {}".format(e))
        return False
3236 async def _install_kdu(
3244 k8s_instance_info
: dict,
3245 k8params
: dict = None,
3251 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3254 "collection": "nsrs",
3255 "filter": {"_id": nsr_id
},
3256 "path": nsr_db_path
,
3259 if k8s_instance_info
.get("kdu-deployment-name"):
3260 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3262 kdu_instance
= self
.k8scluster_map
[
3264 ].generate_kdu_instance_name(
3265 db_dict
=db_dict_install
,
3266 kdu_model
=k8s_instance_info
["kdu-model"],
3267 kdu_name
=k8s_instance_info
["kdu-name"],
3270 # Update the nsrs table with the kdu-instance value
3274 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3277 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3278 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3279 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3280 # namespace, this first verification could be removed, and the next step would be done for any kind
3282 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3283 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3284 if k8sclustertype
in ("juju", "juju-bundle"):
3285 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3286 # that the user passed a namespace which he wants its KDU to be deployed in)
3292 "_admin.projects_write": k8s_instance_info
["namespace"],
3293 "_admin.projects_read": k8s_instance_info
["namespace"],
3299 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3304 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3306 k8s_instance_info
["namespace"] = kdu_instance
3308 await self
.k8scluster_map
[k8sclustertype
].install(
3309 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3310 kdu_model
=k8s_instance_info
["kdu-model"],
3313 db_dict
=db_dict_install
,
3315 kdu_name
=k8s_instance_info
["kdu-name"],
3316 namespace
=k8s_instance_info
["namespace"],
3317 kdu_instance
=kdu_instance
,
3321 # Obtain services to obtain management service ip
3322 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3323 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3324 kdu_instance
=kdu_instance
,
3325 namespace
=k8s_instance_info
["namespace"],
3328 # Obtain management service info (if exists)
3329 vnfr_update_dict
= {}
3330 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3332 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3337 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3340 for service
in kdud
.get("service", [])
3341 if service
.get("mgmt-service")
3343 for mgmt_service
in mgmt_services
:
3344 for service
in services
:
3345 if service
["name"].startswith(mgmt_service
["name"]):
3346 # Mgmt service found, Obtain service ip
3347 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3348 if isinstance(ip
, list) and len(ip
) == 1:
3352 "kdur.{}.ip-address".format(kdu_index
)
3355 # Check if must update also mgmt ip at the vnf
3356 service_external_cp
= mgmt_service
.get(
3357 "external-connection-point-ref"
3359 if service_external_cp
:
3361 deep_get(vnfd
, ("mgmt-interface", "cp"))
3362 == service_external_cp
3364 vnfr_update_dict
["ip-address"] = ip
3369 "external-connection-point-ref", ""
3371 == service_external_cp
,
3374 "kdur.{}.ip-address".format(kdu_index
)
3379 "Mgmt service name: {} not found".format(
3380 mgmt_service
["name"]
3384 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3385 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3387 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3390 and kdu_config
.get("initial-config-primitive")
3391 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3393 initial_config_primitive_list
= kdu_config
.get(
3394 "initial-config-primitive"
3396 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3398 for initial_config_primitive
in initial_config_primitive_list
:
3399 primitive_params_
= self
._map
_primitive
_params
(
3400 initial_config_primitive
, {}, {}
3403 await asyncio
.wait_for(
3404 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3405 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3406 kdu_instance
=kdu_instance
,
3407 primitive_name
=initial_config_primitive
["name"],
3408 params
=primitive_params_
,
3409 db_dict
=db_dict_install
,
3415 except Exception as e
:
3416 # Prepare update db with error and raise exception
3419 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3423 vnfr_data
.get("_id"),
3424 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3427 # ignore to keep original exception
3429 # reraise original error
3434 async def deploy_kdus(
3441 task_instantiation_info
,
3443 # Launch kdus if present in the descriptor
3445 k8scluster_id_2_uuic
= {
3446 "helm-chart-v3": {},
3451 async def _get_cluster_id(cluster_id
, cluster_type
):
3452 nonlocal k8scluster_id_2_uuic
3453 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3454 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3456 # check if K8scluster is creating and wait look if previous tasks in process
3457 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3458 "k8scluster", cluster_id
3461 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3462 task_name
, cluster_id
3464 self
.logger
.debug(logging_text
+ text
)
3465 await asyncio
.wait(task_dependency
, timeout
=3600)
3467 db_k8scluster
= self
.db
.get_one(
3468 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3470 if not db_k8scluster
:
3471 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3473 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3475 if cluster_type
== "helm-chart-v3":
3477 # backward compatibility for existing clusters that have not been initialized for helm v3
3478 k8s_credentials
= yaml
.safe_dump(
3479 db_k8scluster
.get("credentials")
3481 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3482 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3484 db_k8scluster_update
= {}
3485 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3486 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3487 db_k8scluster_update
[
3488 "_admin.helm-chart-v3.created"
3490 db_k8scluster_update
[
3491 "_admin.helm-chart-v3.operationalState"
3494 "k8sclusters", cluster_id
, db_k8scluster_update
3496 except Exception as e
:
3499 + "error initializing helm-v3 cluster: {}".format(str(e
))
3502 "K8s cluster '{}' has not been initialized for '{}'".format(
3503 cluster_id
, cluster_type
3508 "K8s cluster '{}' has not been initialized for '{}'".format(
3509 cluster_id
, cluster_type
3512 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3515 logging_text
+= "Deploy kdus: "
3518 db_nsr_update
= {"_admin.deployed.K8s": []}
3519 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3522 updated_cluster_list
= []
3523 updated_v3_cluster_list
= []
3525 for vnfr_data
in db_vnfrs
.values():
3526 vca_id
= self
.get_vca_id(vnfr_data
, {})
3527 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3528 # Step 0: Prepare and set parameters
3529 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3530 vnfd_id
= vnfr_data
.get("vnfd-id")
3531 vnfd_with_id
= find_in_list(
3532 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3536 for kdud
in vnfd_with_id
["kdu"]
3537 if kdud
["name"] == kdur
["kdu-name"]
3539 namespace
= kdur
.get("k8s-namespace")
3540 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3541 if kdur
.get("helm-chart"):
3542 kdumodel
= kdur
["helm-chart"]
3543 # Default version: helm3, if helm-version is v2 assign v2
3544 k8sclustertype
= "helm-chart-v3"
3545 self
.logger
.debug("kdur: {}".format(kdur
))
3547 kdur
.get("helm-version")
3548 and kdur
.get("helm-version") == "v2"
3550 k8sclustertype
= "helm-chart"
3551 elif kdur
.get("juju-bundle"):
3552 kdumodel
= kdur
["juju-bundle"]
3553 k8sclustertype
= "juju-bundle"
3556 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3557 "juju-bundle. Maybe an old NBI version is running".format(
3558 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3561 # check if kdumodel is a file and exists
3563 vnfd_with_id
= find_in_list(
3564 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3566 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3567 if storage
: # may be not present if vnfd has not artifacts
3568 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3569 if storage
["pkg-dir"]:
3570 filename
= "{}/{}/{}s/{}".format(
3577 filename
= "{}/Scripts/{}s/{}".format(
3582 if self
.fs
.file_exists(
3583 filename
, mode
="file"
3584 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3585 kdumodel
= self
.fs
.path
+ filename
3586 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3588 except Exception: # it is not a file
3591 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3592 step
= "Synchronize repos for k8s cluster '{}'".format(
3595 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3599 k8sclustertype
== "helm-chart"
3600 and cluster_uuid
not in updated_cluster_list
3602 k8sclustertype
== "helm-chart-v3"
3603 and cluster_uuid
not in updated_v3_cluster_list
3605 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3606 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3607 cluster_uuid
=cluster_uuid
3610 if del_repo_list
or added_repo_dict
:
3611 if k8sclustertype
== "helm-chart":
3613 "_admin.helm_charts_added." + item
: None
3614 for item
in del_repo_list
3617 "_admin.helm_charts_added." + item
: name
3618 for item
, name
in added_repo_dict
.items()
3620 updated_cluster_list
.append(cluster_uuid
)
3621 elif k8sclustertype
== "helm-chart-v3":
3623 "_admin.helm_charts_v3_added." + item
: None
3624 for item
in del_repo_list
3627 "_admin.helm_charts_v3_added." + item
: name
3628 for item
, name
in added_repo_dict
.items()
3630 updated_v3_cluster_list
.append(cluster_uuid
)
3632 logging_text
+ "repos synchronized on k8s cluster "
3633 "'{}' to_delete: {}, to_add: {}".format(
3634 k8s_cluster_id
, del_repo_list
, added_repo_dict
3639 {"_id": k8s_cluster_id
},
3645 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3646 vnfr_data
["member-vnf-index-ref"],
3650 k8s_instance_info
= {
3651 "kdu-instance": None,
3652 "k8scluster-uuid": cluster_uuid
,
3653 "k8scluster-type": k8sclustertype
,
3654 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3655 "kdu-name": kdur
["kdu-name"],
3656 "kdu-model": kdumodel
,
3657 "namespace": namespace
,
3658 "kdu-deployment-name": kdu_deployment_name
,
3660 db_path
= "_admin.deployed.K8s.{}".format(index
)
3661 db_nsr_update
[db_path
] = k8s_instance_info
3662 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3663 vnfd_with_id
= find_in_list(
3664 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3666 task
= asyncio
.ensure_future(
3675 k8params
=desc_params
,
3680 self
.lcm_tasks
.register(
3684 "instantiate_KDU-{}".format(index
),
3687 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3693 except (LcmException
, asyncio
.CancelledError
):
3695 except Exception as e
:
3696 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3697 if isinstance(e
, (N2VCException
, DbException
)):
3698 self
.logger
.error(logging_text
+ msg
)
3700 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3701 raise LcmException(msg
)
3704 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3723 task_instantiation_info
,
3726 # launch instantiate_N2VC in an asyncio task and register the task object
3727 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3728 # if not found, create one entry and update database
3729 # fill db_nsr._admin.deployed.VCA.<index>
3732 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3734 if "execution-environment-list" in descriptor_config
:
3735 ee_list
= descriptor_config
.get("execution-environment-list", [])
3736 elif "juju" in descriptor_config
:
3737 ee_list
= [descriptor_config
] # ns charms
3738 else: # other types as script are not supported
3741 for ee_item
in ee_list
:
3744 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3745 ee_item
.get("juju"), ee_item
.get("helm-chart")
3748 ee_descriptor_id
= ee_item
.get("id")
3749 if ee_item
.get("juju"):
3750 vca_name
= ee_item
["juju"].get("charm")
3753 if ee_item
["juju"].get("charm") is not None
3756 if ee_item
["juju"].get("cloud") == "k8s":
3757 vca_type
= "k8s_proxy_charm"
3758 elif ee_item
["juju"].get("proxy") is False:
3759 vca_type
= "native_charm"
3760 elif ee_item
.get("helm-chart"):
3761 vca_name
= ee_item
["helm-chart"]
3762 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3765 vca_type
= "helm-v3"
3768 logging_text
+ "skipping non juju neither charm configuration"
3773 for vca_index
, vca_deployed
in enumerate(
3774 db_nsr
["_admin"]["deployed"]["VCA"]
3776 if not vca_deployed
:
3779 vca_deployed
.get("member-vnf-index") == member_vnf_index
3780 and vca_deployed
.get("vdu_id") == vdu_id
3781 and vca_deployed
.get("kdu_name") == kdu_name
3782 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3783 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3787 # not found, create one.
3789 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3792 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3794 target
+= "/kdu/{}".format(kdu_name
)
3796 "target_element": target
,
3797 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3798 "member-vnf-index": member_vnf_index
,
3800 "kdu_name": kdu_name
,
3801 "vdu_count_index": vdu_index
,
3802 "operational-status": "init", # TODO revise
3803 "detailed-status": "", # TODO revise
3804 "step": "initial-deploy", # TODO revise
3806 "vdu_name": vdu_name
,
3808 "ee_descriptor_id": ee_descriptor_id
,
3812 # create VCA and configurationStatus in db
3814 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3815 "configurationStatus.{}".format(vca_index
): dict(),
3817 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3819 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3821 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3822 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3823 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3826 task_n2vc
= asyncio
.ensure_future(
3827 self
.instantiate_N2VC(
3828 logging_text
=logging_text
,
3829 vca_index
=vca_index
,
3835 vdu_index
=vdu_index
,
3836 deploy_params
=deploy_params
,
3837 config_descriptor
=descriptor_config
,
3838 base_folder
=base_folder
,
3839 nslcmop_id
=nslcmop_id
,
3843 ee_config_descriptor
=ee_item
,
3846 self
.lcm_tasks
.register(
3850 "instantiate_N2VC-{}".format(vca_index
),
3853 task_instantiation_info
[
3855 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3856 member_vnf_index
or "", vdu_id
or ""
3860 def _create_nslcmop(nsr_id
, operation
, params
):
3862 Creates a ns-lcm-opp content to be stored at database.
3863 :param nsr_id: internal id of the instance
3864 :param operation: instantiate, terminate, scale, action, ...
3865 :param params: user parameters for the operation
3866 :return: dictionary following SOL005 format
3868 # Raise exception if invalid arguments
3869 if not (nsr_id
and operation
and params
):
3871 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3878 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3879 "operationState": "PROCESSING",
3880 "statusEnteredTime": now
,
3881 "nsInstanceId": nsr_id
,
3882 "lcmOperationType": operation
,
3884 "isAutomaticInvocation": False,
3885 "operationParams": params
,
3886 "isCancelPending": False,
3888 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3889 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3894 def _format_additional_params(self
, params
):
3895 params
= params
or {}
3896 for key
, value
in params
.items():
3897 if str(value
).startswith("!!yaml "):
3898 params
[key
] = yaml
.safe_load(value
[7:])
3901 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3902 primitive
= seq
.get("name")
3903 primitive_params
= {}
3905 "member_vnf_index": vnf_index
,
3906 "primitive": primitive
,
3907 "primitive_params": primitive_params
,
3910 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decides whether an already registered sub-operation is skipped or executed again.
    :param db_nslcmop: nslcmop content; the sub-operation is read from _admin.operations
    :param op_index: position of the sub-operation inside _admin.operations
    :return: SUBOPERATION_STATUS_SKIP when the sub-operation is COMPLETED; otherwise op_index,
        once the sub-operation has been set back to PROCESSING
    """
    sub_operations = deep_get(db_nslcmop, ("_admin", "operations"), [])
    if sub_operations[op_index].get("operationState") == "COMPLETED":
        # b. Skip sub-operation
        # _ns_execute_primitive() or RO.create_action() will NOT be executed
        return self.SUBOPERATION_STATUS_SKIP

    # c. retry executing sub-operation
    # The sub-operation exists, and operationState != 'COMPLETED'
    # Update operationState = 'PROCESSING' to indicate a retry.
    self._update_suboperation_status(db_nslcmop, op_index, "PROCESSING", "In progress")
    # Return the sub-operation index
    # _ns_execute_primitive() or RO.create_action() will be called from scale()
    # with arguments extracted from the sub-operation
    return op_index
3934 # Find a sub-operation where all keys in a matching dictionary must match
3935 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3936 def _find_suboperation(self
, db_nslcmop
, match
):
3937 if db_nslcmop
and match
:
3938 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3939 for i
, op
in enumerate(op_list
):
3940 if all(op
.get(k
) == match
[k
] for k
in match
):
3942 return self
.SUBOPERATION_STATUS_NOT_FOUND
3944 # Update status for a sub-operation given its index
3945 def _update_suboperation_status(
3946 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3948 # Update DB for HA tasks
3949 q_filter
= {"_id": db_nslcmop
["_id"]}
3951 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3952 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3955 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3958 # Add sub-operation, return the index of the added sub-operation
3959 # Optionally, set operationState, detailed-status, and operationType
3960 # Status and type are currently set for 'scale' sub-operations:
3961 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3962 # 'detailed-status' : status message
3963 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3964 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3965 def _add_suboperation(
3973 mapped_primitive_params
,
3974 operationState
=None,
3975 detailed_status
=None,
3978 RO_scaling_info
=None,
3981 return self
.SUBOPERATION_STATUS_NOT_FOUND
3982 # Get the "_admin.operations" list, if it exists
3983 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3984 op_list
= db_nslcmop_admin
.get("operations")
3985 # Create or append to the "_admin.operations" list
3987 "member_vnf_index": vnf_index
,
3989 "vdu_count_index": vdu_count_index
,
3990 "primitive": primitive
,
3991 "primitive_params": mapped_primitive_params
,
3994 new_op
["operationState"] = operationState
3996 new_op
["detailed-status"] = detailed_status
3998 new_op
["lcmOperationType"] = operationType
4000 new_op
["RO_nsr_id"] = RO_nsr_id
4002 new_op
["RO_scaling_info"] = RO_scaling_info
4004 # No existing operations, create key 'operations' with current operation as first list element
4005 db_nslcmop_admin
.update({"operations": [new_op
]})
4006 op_list
= db_nslcmop_admin
.get("operations")
4008 # Existing operations, append operation to list
4009 op_list
.append(new_op
)
4011 db_nslcmop_update
= {"_admin.operations": op_list
}
4012 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4013 op_index
= len(op_list
) - 1
4016 # Helper methods for scale() sub-operations
4018 # pre-scale/post-scale:
4019 # Check for 3 different cases:
4020 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4021 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4022 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4023 def _check_or_add_scale_suboperation(
4027 vnf_config_primitive
,
4031 RO_scaling_info
=None,
4033 # Find this sub-operation
4034 if RO_nsr_id
and RO_scaling_info
:
4035 operationType
= "SCALE-RO"
4037 "member_vnf_index": vnf_index
,
4038 "RO_nsr_id": RO_nsr_id
,
4039 "RO_scaling_info": RO_scaling_info
,
4043 "member_vnf_index": vnf_index
,
4044 "primitive": vnf_config_primitive
,
4045 "primitive_params": primitive_params
,
4046 "lcmOperationType": operationType
,
4048 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4049 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4050 # a. New sub-operation
4051 # The sub-operation does not exist, add it.
4052 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4053 # The following parameters are set to None for all kind of scaling:
4055 vdu_count_index
= None
4057 if RO_nsr_id
and RO_scaling_info
:
4058 vnf_config_primitive
= None
4059 primitive_params
= None
4062 RO_scaling_info
= None
4063 # Initial status for sub-operation
4064 operationState
= "PROCESSING"
4065 detailed_status
= "In progress"
4066 # Add sub-operation for pre/post-scaling (zero or more operations)
4067 self
._add
_suboperation
(
4073 vnf_config_primitive
,
4081 return self
.SUBOPERATION_STATUS_NEW
4083 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4084 # or op_index (operationState != 'COMPLETED')
4085 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4087 # Function to return execution_environment id
4089 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4090 # TODO vdu_index_count
4091 for vca
in vca_deployed_list
:
4092 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
4095 async def destroy_N2VC(
4103 exec_primitives
=True,
4108 Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
4109 :param logging_text:
4111 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
4112 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4113 :param vca_index: index in the database _admin.deployed.VCA
4114 :param destroy_ee: False to not destroy it here, because all of them will be destroyed at once
4115 :param exec_primitives: False to not execute the terminate primitives, because the config is not completed or has
4116 not been executed properly
4117 :param scaling_in: True destroys the application, False destroys the model
4118 :return: None or exception
4123 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4124 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4128 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4130 # execute terminate_primitives
4132 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4133 config_descriptor
.get("terminate-config-primitive"),
4134 vca_deployed
.get("ee_descriptor_id"),
4136 vdu_id
= vca_deployed
.get("vdu_id")
4137 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4138 vdu_name
= vca_deployed
.get("vdu_name")
4139 vnf_index
= vca_deployed
.get("member-vnf-index")
4140 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4141 for seq
in terminate_primitives
:
4142 # For each sequence in list, get primitive and call _ns_execute_primitive()
4143 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4144 vnf_index
, seq
.get("name")
4146 self
.logger
.debug(logging_text
+ step
)
4147 # Create the primitive for each sequence, i.e. "primitive": "touch"
4148 primitive
= seq
.get("name")
4149 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4154 self
._add
_suboperation
(
4161 mapped_primitive_params
,
4163 # Sub-operations: Call _ns_execute_primitive() instead of action()
4165 result
, result_detail
= await self
._ns
_execute
_primitive
(
4166 vca_deployed
["ee_id"],
4168 mapped_primitive_params
,
4172 except LcmException
:
4173 # this happens when VCA is not deployed. In this case it is not needed to terminate
4175 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4176 if result
not in result_ok
:
4178 "terminate_primitive {} for vnf_member_index={} fails with "
4179 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4181 # mark that this VCA does not need to be terminated any more
4182 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4186 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4189 # Delete Prometheus Jobs if any
4190 # This uses NSR_ID, so it will destroy any jobs under this index
4191 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4194 await self
.vca_map
[vca_type
].delete_execution_environment(
4195 vca_deployed
["ee_id"],
4196 scaling_in
=scaling_in
,
async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
    """
    Deletes at once the whole N2VC namespace of a NS instance.
    :param db_nsr: nsr content; its "_id" prefixed with "." is the namespace to delete
    :param vca_id: VCA identifier passed through to n2vc.delete_namespace
    :return: None. The configuration status of the NS is written as TERMINATING before the
        deletion and as DELETED after it
    """
    self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
    ns_namespace = "." + db_nsr["_id"]
    delete_kwargs = {
        "namespace": ns_namespace,
        "total_timeout": self.timeout_charm_delete,
        "vca_id": vca_id,
    }
    try:
        await self.n2vc.delete_namespace(**delete_kwargs)
    except N2VCNotFound:
        # already deleted. Skip
        pass
    self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
4214 async def _terminate_RO(
4215 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4218 Terminates a deployment from RO
4219 :param logging_text:
4220 :param nsr_deployed: db_nsr._admin.deployed
4223 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4224 this method will update only the index 2, but it will write on database the concatenated content of the list
4229 ro_nsr_id
= ro_delete_action
= None
4230 if nsr_deployed
and nsr_deployed
.get("RO"):
4231 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4232 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4235 stage
[2] = "Deleting ns from VIM."
4236 db_nsr_update
["detailed-status"] = " ".join(stage
)
4237 self
._write
_op
_status
(nslcmop_id
, stage
)
4238 self
.logger
.debug(logging_text
+ stage
[2])
4239 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4240 self
._write
_op
_status
(nslcmop_id
, stage
)
4241 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4242 ro_delete_action
= desc
["action_id"]
4244 "_admin.deployed.RO.nsr_delete_action_id"
4245 ] = ro_delete_action
4246 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4247 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4248 if ro_delete_action
:
4249 # wait until NS is deleted from VIM
4250 stage
[2] = "Waiting ns deleted from VIM."
4251 detailed_status_old
= None
4255 + " RO_id={} ro_delete_action={}".format(
4256 ro_nsr_id
, ro_delete_action
4259 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4260 self
._write
_op
_status
(nslcmop_id
, stage
)
4262 delete_timeout
= 20 * 60 # 20 minutes
4263 while delete_timeout
> 0:
4264 desc
= await self
.RO
.show(
4266 item_id_name
=ro_nsr_id
,
4267 extra_item
="action",
4268 extra_item_id
=ro_delete_action
,
4272 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4274 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4275 if ns_status
== "ERROR":
4276 raise ROclient
.ROClientException(ns_status_info
)
4277 elif ns_status
== "BUILD":
4278 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4279 elif ns_status
== "ACTIVE":
4280 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4281 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4286 ), "ROclient.check_action_status returns unknown {}".format(
4289 if stage
[2] != detailed_status_old
:
4290 detailed_status_old
= stage
[2]
4291 db_nsr_update
["detailed-status"] = " ".join(stage
)
4292 self
._write
_op
_status
(nslcmop_id
, stage
)
4293 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4294 await asyncio
.sleep(5, loop
=self
.loop
)
4296 else: # delete_timeout <= 0:
4297 raise ROclient
.ROClientException(
4298 "Timeout waiting ns deleted from VIM"
4301 except Exception as e
:
4302 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4304 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4306 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4307 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4308 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4310 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4313 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4315 failed_detail
.append("delete conflict: {}".format(e
))
4318 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4321 failed_detail
.append("delete error: {}".format(e
))
4323 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4327 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4328 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4330 stage
[2] = "Deleting nsd from RO."
4331 db_nsr_update
["detailed-status"] = " ".join(stage
)
4332 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4333 self
._write
_op
_status
(nslcmop_id
, stage
)
4334 await self
.RO
.delete("nsd", ro_nsd_id
)
4336 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4338 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4339 except Exception as e
:
4341 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4343 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4345 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4348 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4350 failed_detail
.append(
4351 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4353 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4355 failed_detail
.append(
4356 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4358 self
.logger
.error(logging_text
+ failed_detail
[-1])
4360 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4361 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4362 if not vnf_deployed
or not vnf_deployed
["id"]:
4365 ro_vnfd_id
= vnf_deployed
["id"]
4368 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4369 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4371 db_nsr_update
["detailed-status"] = " ".join(stage
)
4372 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4373 self
._write
_op
_status
(nslcmop_id
, stage
)
4374 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4376 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4378 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4379 except Exception as e
:
4381 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4384 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4388 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4391 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4393 failed_detail
.append(
4394 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4396 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4398 failed_detail
.append(
4399 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4401 self
.logger
.error(logging_text
+ failed_detail
[-1])
4404 stage
[2] = "Error deleting from VIM"
4406 stage
[2] = "Deleted from VIM"
4407 db_nsr_update
["detailed-status"] = " ".join(stage
)
4408 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4409 self
._write
_op
_status
(nslcmop_id
, stage
)
4412 raise LcmException("; ".join(failed_detail
))
4414 async def terminate(self
, nsr_id
, nslcmop_id
):
4415 # Try to lock HA task here
4416 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4417 if not task_is_locked_by_me
:
4420 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4421 self
.logger
.debug(logging_text
+ "Enter")
4422 timeout_ns_terminate
= self
.timeout_ns_terminate
4425 operation_params
= None
4427 error_list
= [] # annotates all failed error messages
4428 db_nslcmop_update
= {}
4429 autoremove
= False # autoremove after terminated
4430 tasks_dict_info
= {}
4433 "Stage 1/3: Preparing task.",
4434 "Waiting for previous operations to terminate.",
4437 # ^ contains [stage, step, VIM-status]
4439 # wait for any previous tasks in process
4440 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4442 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4443 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4444 operation_params
= db_nslcmop
.get("operationParams") or {}
4445 if operation_params
.get("timeout_ns_terminate"):
4446 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4447 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4448 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4450 db_nsr_update
["operational-status"] = "terminating"
4451 db_nsr_update
["config-status"] = "terminating"
4452 self
._write
_ns
_status
(
4454 ns_state
="TERMINATING",
4455 current_operation
="TERMINATING",
4456 current_operation_id
=nslcmop_id
,
4457 other_update
=db_nsr_update
,
4459 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4460 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4461 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4464 stage
[1] = "Getting vnf descriptors from db."
4465 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4467 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4469 db_vnfds_from_id
= {}
4470 db_vnfds_from_member_index
= {}
4472 for vnfr
in db_vnfrs_list
:
4473 vnfd_id
= vnfr
["vnfd-id"]
4474 if vnfd_id
not in db_vnfds_from_id
:
4475 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4476 db_vnfds_from_id
[vnfd_id
] = vnfd
4477 db_vnfds_from_member_index
[
4478 vnfr
["member-vnf-index-ref"]
4479 ] = db_vnfds_from_id
[vnfd_id
]
4481 # Destroy individual execution environments when there are terminating primitives.
4482 # Rest of EE will be deleted at once
4483 # TODO - check before calling _destroy_N2VC
4484 # if not operation_params.get("skip_terminate_primitives"):#
4485 # or not vca.get("needed_terminate"):
4486 stage
[0] = "Stage 2/3 execute terminating primitives."
4487 self
.logger
.debug(logging_text
+ stage
[0])
4488 stage
[1] = "Looking execution environment that needs terminate."
4489 self
.logger
.debug(logging_text
+ stage
[1])
4491 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4492 config_descriptor
= None
4493 vca_member_vnf_index
= vca
.get("member-vnf-index")
4494 vca_id
= self
.get_vca_id(
4495 db_vnfrs_dict
.get(vca_member_vnf_index
)
4496 if vca_member_vnf_index
4500 if not vca
or not vca
.get("ee_id"):
4502 if not vca
.get("member-vnf-index"):
4504 config_descriptor
= db_nsr
.get("ns-configuration")
4505 elif vca
.get("vdu_id"):
4506 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4507 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4508 elif vca
.get("kdu_name"):
4509 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4510 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4512 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4513 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4514 vca_type
= vca
.get("type")
4515 exec_terminate_primitives
= not operation_params
.get(
4516 "skip_terminate_primitives"
4517 ) and vca
.get("needed_terminate")
4518 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4519 # pending native charms
4521 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4523 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4524 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4525 task
= asyncio
.ensure_future(
4533 exec_terminate_primitives
,
4537 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4539 # wait for pending tasks of terminate primitives
4543 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4545 error_list
= await self
._wait
_for
_tasks
(
4548 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4552 tasks_dict_info
.clear()
4554 return # raise LcmException("; ".join(error_list))
4556 # remove All execution environments at once
4557 stage
[0] = "Stage 3/3 delete all."
4559 if nsr_deployed
.get("VCA"):
4560 stage
[1] = "Deleting all execution environments."
4561 self
.logger
.debug(logging_text
+ stage
[1])
4562 vca_id
= self
.get_vca_id({}, db_nsr
)
4563 task_delete_ee
= asyncio
.ensure_future(
4565 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4566 timeout
=self
.timeout_charm_delete
,
4569 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4570 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4572 # Delete from k8scluster
4573 stage
[1] = "Deleting KDUs."
4574 self
.logger
.debug(logging_text
+ stage
[1])
4575 # print(nsr_deployed)
4576 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4577 if not kdu
or not kdu
.get("kdu-instance"):
4579 kdu_instance
= kdu
.get("kdu-instance")
4580 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4581 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4582 vca_id
= self
.get_vca_id({}, db_nsr
)
4583 task_delete_kdu_instance
= asyncio
.ensure_future(
4584 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4585 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4586 kdu_instance
=kdu_instance
,
4588 namespace
=kdu
.get("namespace"),
4594 + "Unknown k8s deployment type {}".format(
4595 kdu
.get("k8scluster-type")
4600 task_delete_kdu_instance
4601 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4604 stage
[1] = "Deleting ns from VIM."
4606 task_delete_ro
= asyncio
.ensure_future(
4607 self
._terminate
_ng
_ro
(
4608 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4612 task_delete_ro
= asyncio
.ensure_future(
4614 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4617 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4619 # the rest of the work will be done in the finally clause
4622 ROclient
.ROClientException
,
4627 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4629 except asyncio
.CancelledError
:
4631 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4633 exc
= "Operation was cancelled"
4634 except Exception as e
:
4635 exc
= traceback
.format_exc()
4636 self
.logger
.critical(
4637 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4642 error_list
.append(str(exc
))
4644 # wait for pending tasks
4646 stage
[1] = "Waiting for terminate pending tasks."
4647 self
.logger
.debug(logging_text
+ stage
[1])
4648 error_list
+= await self
._wait
_for
_tasks
(
4651 timeout_ns_terminate
,
4655 stage
[1] = stage
[2] = ""
4656 except asyncio
.CancelledError
:
4657 error_list
.append("Cancelled")
4658 # TODO cancel all tasks
4659 except Exception as exc
:
4660 error_list
.append(str(exc
))
4661 # update status at database
4663 error_detail
= "; ".join(error_list
)
4664 # self.logger.error(logging_text + error_detail)
4665 error_description_nslcmop
= "{} Detail: {}".format(
4666 stage
[0], error_detail
4668 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4669 nslcmop_id
, stage
[0]
4672 db_nsr_update
["operational-status"] = "failed"
4673 db_nsr_update
["detailed-status"] = (
4674 error_description_nsr
+ " Detail: " + error_detail
4676 db_nslcmop_update
["detailed-status"] = error_detail
4677 nslcmop_operation_state
= "FAILED"
4681 error_description_nsr
= error_description_nslcmop
= None
4682 ns_state
= "NOT_INSTANTIATED"
4683 db_nsr_update
["operational-status"] = "terminated"
4684 db_nsr_update
["detailed-status"] = "Done"
4685 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4686 db_nslcmop_update
["detailed-status"] = "Done"
4687 nslcmop_operation_state
= "COMPLETED"
4690 self
._write
_ns
_status
(
4693 current_operation
="IDLE",
4694 current_operation_id
=None,
4695 error_description
=error_description_nsr
,
4696 error_detail
=error_detail
,
4697 other_update
=db_nsr_update
,
4699 self
._write
_op
_status
(
4702 error_message
=error_description_nslcmop
,
4703 operation_state
=nslcmop_operation_state
,
4704 other_update
=db_nslcmop_update
,
4706 if ns_state
== "NOT_INSTANTIATED":
4710 {"nsr-id-ref": nsr_id
},
4711 {"_admin.nsState": "NOT_INSTANTIATED"},
4713 except DbException
as e
:
4716 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4720 if operation_params
:
4721 autoremove
= operation_params
.get("autoremove", False)
4722 if nslcmop_operation_state
:
4724 await self
.msg
.aiowrite(
4729 "nslcmop_id": nslcmop_id
,
4730 "operationState": nslcmop_operation_state
,
4731 "autoremove": autoremove
,
4735 except Exception as e
:
4737 logging_text
+ "kafka_write notification Exception {}".format(e
)
4740 self
.logger
.debug(logging_text
+ "Exit")
4741 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4743 async def _wait_for_tasks(
4744 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4747 error_detail_list
= []
4749 pending_tasks
= list(created_tasks_info
.keys())
4750 num_tasks
= len(pending_tasks
)
4752 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4753 self
._write
_op
_status
(nslcmop_id
, stage
)
4754 while pending_tasks
:
4756 _timeout
= timeout
+ time_start
- time()
4757 done
, pending_tasks
= await asyncio
.wait(
4758 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4760 num_done
+= len(done
)
4761 if not done
: # Timeout
4762 for task
in pending_tasks
:
4763 new_error
= created_tasks_info
[task
] + ": Timeout"
4764 error_detail_list
.append(new_error
)
4765 error_list
.append(new_error
)
4768 if task
.cancelled():
4771 exc
= task
.exception()
4773 if isinstance(exc
, asyncio
.TimeoutError
):
4775 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4776 error_list
.append(created_tasks_info
[task
])
4777 error_detail_list
.append(new_error
)
4784 ROclient
.ROClientException
,
4790 self
.logger
.error(logging_text
+ new_error
)
4792 exc_traceback
= "".join(
4793 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4797 + created_tasks_info
[task
]
4803 logging_text
+ created_tasks_info
[task
] + ": Done"
4805 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4807 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4808 if nsr_id
: # update also nsr
4813 "errorDescription": "Error at: " + ", ".join(error_list
),
4814 "errorDetail": ". ".join(error_detail_list
),
4817 self
._write
_op
_status
(nslcmop_id
, stage
)
4818 return error_detail_list
4821 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4823 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4824 The default-value is used. If it is between < > it look for a value at instantiation_params
4825 :param primitive_desc: portion of VNFD/NSD that describes primitive
4826 :param params: Params provided by user
4827 :param instantiation_params: Instantiation params provided by user
4828 :return: a dictionary with the calculated params
4830 calculated_params
= {}
4831 for parameter
in primitive_desc
.get("parameter", ()):
4832 param_name
= parameter
["name"]
4833 if param_name
in params
:
4834 calculated_params
[param_name
] = params
[param_name
]
4835 elif "default-value" in parameter
or "value" in parameter
:
4836 if "value" in parameter
:
4837 calculated_params
[param_name
] = parameter
["value"]
4839 calculated_params
[param_name
] = parameter
["default-value"]
4841 isinstance(calculated_params
[param_name
], str)
4842 and calculated_params
[param_name
].startswith("<")
4843 and calculated_params
[param_name
].endswith(">")
4845 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4846 calculated_params
[param_name
] = instantiation_params
[
4847 calculated_params
[param_name
][1:-1]
4851 "Parameter {} needed to execute primitive {} not provided".format(
4852 calculated_params
[param_name
], primitive_desc
["name"]
4857 "Parameter {} needed to execute primitive {} not provided".format(
4858 param_name
, primitive_desc
["name"]
4862 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4863 calculated_params
[param_name
] = yaml
.safe_dump(
4864 calculated_params
[param_name
], default_flow_style
=True, width
=256
4866 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4868 ].startswith("!!yaml "):
4869 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4870 if parameter
.get("data-type") == "INTEGER":
4872 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4873 except ValueError: # error converting string to int
4875 "Parameter {} of primitive {} must be integer".format(
4876 param_name
, primitive_desc
["name"]
4879 elif parameter
.get("data-type") == "BOOLEAN":
4880 calculated_params
[param_name
] = not (
4881 (str(calculated_params
[param_name
])).lower() == "false"
4884 # add always ns_config_info if primitive name is config
4885 if primitive_desc
["name"] == "config":
4886 if "ns_config_info" in instantiation_params
:
4887 calculated_params
["ns_config_info"] = instantiation_params
[
4890 return calculated_params
4892 def _look_for_deployed_vca(
4899 ee_descriptor_id
=None,
4901 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4902 for vca
in deployed_vca
:
4905 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4908 vdu_count_index
is not None
4909 and vdu_count_index
!= vca
["vdu_count_index"]
4912 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4914 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4918 # vca_deployed not found
4920 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4921 " is not deployed".format(
4930 ee_id
= vca
.get("ee_id")
4932 "type", "lxc_proxy_charm"
4933 ) # default value for backward compatibility - proxy charm
4936 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4937 "execution environment".format(
4938 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4941 return ee_id
, vca_type
4943 async def _ns_execute_primitive(
4949 retries_interval
=30,
4956 if primitive
== "config":
4957 primitive_params
= {"params": primitive_params
}
4959 vca_type
= vca_type
or "lxc_proxy_charm"
4963 output
= await asyncio
.wait_for(
4964 self
.vca_map
[vca_type
].exec_primitive(
4966 primitive_name
=primitive
,
4967 params_dict
=primitive_params
,
4968 progress_timeout
=self
.timeout_progress_primitive
,
4969 total_timeout
=self
.timeout_primitive
,
4974 timeout
=timeout
or self
.timeout_primitive
,
4978 except asyncio
.CancelledError
:
4980 except Exception as e
: # asyncio.TimeoutError
4981 if isinstance(e
, asyncio
.TimeoutError
):
4986 "Error executing action {} on {} -> {}".format(
4991 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
4993 return "FAILED", str(e
)
4995 return "COMPLETED", output
4997 except (LcmException
, asyncio
.CancelledError
):
4999 except Exception as e
:
5000 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
5002 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
5004 Updating the vca_status with latest juju information in nsrs record
5005 :param: nsr_id: Id of the nsr
5006 :param: nslcmop_id: Id of the nslcmop
5010 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
5011 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5012 vca_id
= self
.get_vca_id({}, db_nsr
)
5013 if db_nsr
["_admin"]["deployed"]["K8s"]:
5014 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
5015 cluster_uuid
, kdu_instance
, cluster_type
= (
5016 k8s
["k8scluster-uuid"],
5017 k8s
["kdu-instance"],
5018 k8s
["k8scluster-type"],
5020 await self
._on
_update
_k
8s
_db
(
5021 cluster_uuid
=cluster_uuid
,
5022 kdu_instance
=kdu_instance
,
5023 filter={"_id": nsr_id
},
5025 cluster_type
=cluster_type
,
5028 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
5029 table
, filter = "nsrs", {"_id": nsr_id
}
5030 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
5031 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
5033 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
5034 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
5036 async def action(self
, nsr_id
, nslcmop_id
):
5037 # Try to lock HA task here
5038 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5039 if not task_is_locked_by_me
:
5042 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
5043 self
.logger
.debug(logging_text
+ "Enter")
5044 # get all needed from database
5048 db_nslcmop_update
= {}
5049 nslcmop_operation_state
= None
5050 error_description_nslcmop
= None
5053 # wait for any previous tasks in process
5054 step
= "Waiting for previous operations to terminate"
5055 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5057 self
._write
_ns
_status
(
5060 current_operation
="RUNNING ACTION",
5061 current_operation_id
=nslcmop_id
,
5064 step
= "Getting information from database"
5065 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5066 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5067 if db_nslcmop
["operationParams"].get("primitive_params"):
5068 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
5069 db_nslcmop
["operationParams"]["primitive_params"]
5072 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5073 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
5074 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
5075 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
5076 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
5077 primitive
= db_nslcmop
["operationParams"]["primitive"]
5078 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
5079 timeout_ns_action
= db_nslcmop
["operationParams"].get(
5080 "timeout_ns_action", self
.timeout_primitive
5084 step
= "Getting vnfr from database"
5085 db_vnfr
= self
.db
.get_one(
5086 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5088 if db_vnfr
.get("kdur"):
5090 for kdur
in db_vnfr
["kdur"]:
5091 if kdur
.get("additionalParams"):
5092 kdur
["additionalParams"] = json
.loads(
5093 kdur
["additionalParams"]
5095 kdur_list
.append(kdur
)
5096 db_vnfr
["kdur"] = kdur_list
5097 step
= "Getting vnfd from database"
5098 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5100 # Sync filesystem before running a primitive
5101 self
.fs
.sync(db_vnfr
["vnfd-id"])
5103 step
= "Getting nsd from database"
5104 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5106 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5107 # for backward compatibility
5108 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5109 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5110 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5111 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5113 # look for primitive
5114 config_primitive_desc
= descriptor_configuration
= None
5116 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5118 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5120 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5122 descriptor_configuration
= db_nsd
.get("ns-configuration")
5124 if descriptor_configuration
and descriptor_configuration
.get(
5127 for config_primitive
in descriptor_configuration
["config-primitive"]:
5128 if config_primitive
["name"] == primitive
:
5129 config_primitive_desc
= config_primitive
5132 if not config_primitive_desc
:
5133 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5135 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5139 primitive_name
= primitive
5140 ee_descriptor_id
= None
5142 primitive_name
= config_primitive_desc
.get(
5143 "execution-environment-primitive", primitive
5145 ee_descriptor_id
= config_primitive_desc
.get(
5146 "execution-environment-ref"
5152 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5154 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5157 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5159 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5161 desc_params
= parse_yaml_strings(
5162 db_vnfr
.get("additionalParamsForVnf")
5165 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5166 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5167 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5169 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5170 actions
.add(primitive
["name"])
5171 for primitive
in kdu_configuration
.get("config-primitive", []):
5172 actions
.add(primitive
["name"])
5174 nsr_deployed
["K8s"],
5175 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5176 and kdu
["member-vnf-index"] == vnf_index
,
5180 if primitive_name
in actions
5181 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5185 # TODO check if ns is in a proper status
5187 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5189 # kdur and desc_params already set from before
5190 if primitive_params
:
5191 desc_params
.update(primitive_params
)
5192 # TODO Check if we will need something at vnf level
5193 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5195 kdu_name
== kdu
["kdu-name"]
5196 and kdu
["member-vnf-index"] == vnf_index
5201 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5204 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5205 msg
= "unknown k8scluster-type '{}'".format(
5206 kdu
.get("k8scluster-type")
5208 raise LcmException(msg
)
5211 "collection": "nsrs",
5212 "filter": {"_id": nsr_id
},
5213 "path": "_admin.deployed.K8s.{}".format(index
),
5217 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5219 step
= "Executing kdu {}".format(primitive_name
)
5220 if primitive_name
== "upgrade":
5221 if desc_params
.get("kdu_model"):
5222 kdu_model
= desc_params
.get("kdu_model")
5223 del desc_params
["kdu_model"]
5225 kdu_model
= kdu
.get("kdu-model")
5226 parts
= kdu_model
.split(sep
=":")
5228 kdu_model
= parts
[0]
5230 detailed_status
= await asyncio
.wait_for(
5231 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5232 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5233 kdu_instance
=kdu
.get("kdu-instance"),
5235 kdu_model
=kdu_model
,
5238 timeout
=timeout_ns_action
,
5240 timeout
=timeout_ns_action
+ 10,
5243 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5245 elif primitive_name
== "rollback":
5246 detailed_status
= await asyncio
.wait_for(
5247 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5248 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5249 kdu_instance
=kdu
.get("kdu-instance"),
5252 timeout
=timeout_ns_action
,
5254 elif primitive_name
== "status":
5255 detailed_status
= await asyncio
.wait_for(
5256 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5257 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5258 kdu_instance
=kdu
.get("kdu-instance"),
5261 timeout
=timeout_ns_action
,
5264 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5265 kdu
["kdu-name"], nsr_id
5267 params
= self
._map
_primitive
_params
(
5268 config_primitive_desc
, primitive_params
, desc_params
5271 detailed_status
= await asyncio
.wait_for(
5272 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5273 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5274 kdu_instance
=kdu_instance
,
5275 primitive_name
=primitive_name
,
5278 timeout
=timeout_ns_action
,
5281 timeout
=timeout_ns_action
,
5285 nslcmop_operation_state
= "COMPLETED"
5287 detailed_status
= ""
5288 nslcmop_operation_state
= "FAILED"
5290 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5291 nsr_deployed
["VCA"],
5292 member_vnf_index
=vnf_index
,
5294 vdu_count_index
=vdu_count_index
,
5295 ee_descriptor_id
=ee_descriptor_id
,
5297 for vca_index
, vca_deployed
in enumerate(
5298 db_nsr
["_admin"]["deployed"]["VCA"]
5300 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5302 "collection": "nsrs",
5303 "filter": {"_id": nsr_id
},
5304 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5308 nslcmop_operation_state
,
5310 ) = await self
._ns
_execute
_primitive
(
5312 primitive
=primitive_name
,
5313 primitive_params
=self
._map
_primitive
_params
(
5314 config_primitive_desc
, primitive_params
, desc_params
5316 timeout
=timeout_ns_action
,
5322 db_nslcmop_update
["detailed-status"] = detailed_status
5323 error_description_nslcmop
= (
5324 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5328 + " task Done with result {} {}".format(
5329 nslcmop_operation_state
, detailed_status
5332 return # database update is called inside finally
5334 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5335 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5337 except asyncio
.CancelledError
:
5339 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5341 exc
= "Operation was cancelled"
5342 except asyncio
.TimeoutError
:
5343 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5345 except Exception as e
:
5346 exc
= traceback
.format_exc()
5347 self
.logger
.critical(
5348 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5357 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5358 nslcmop_operation_state
= "FAILED"
5360 self
._write
_ns
_status
(
5364 ], # TODO check if degraded. For the moment use previous status
5365 current_operation
="IDLE",
5366 current_operation_id
=None,
5367 # error_description=error_description_nsr,
5368 # error_detail=error_detail,
5369 other_update
=db_nsr_update
,
5372 self
._write
_op
_status
(
5375 error_message
=error_description_nslcmop
,
5376 operation_state
=nslcmop_operation_state
,
5377 other_update
=db_nslcmop_update
,
5380 if nslcmop_operation_state
:
5382 await self
.msg
.aiowrite(
5387 "nslcmop_id": nslcmop_id
,
5388 "operationState": nslcmop_operation_state
,
5392 except Exception as e
:
5394 logging_text
+ "kafka_write notification Exception {}".format(e
)
5396 self
.logger
.debug(logging_text
+ "Exit")
5397 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5398 return nslcmop_operation_state
, detailed_status
5400 async def terminate_vdus(
5401 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5403 """This method terminates VDUs
5406 db_vnfr: VNF instance record
5407 member_vnf_index: VNF index to identify the VDUs to be removed
5408 db_nsr: NS instance record
5409 update_db_nslcmops: Nslcmop update record
5411 vca_scaling_info
= []
5412 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5413 scaling_info
["scaling_direction"] = "IN"
5414 scaling_info
["vdu-delete"] = {}
5415 scaling_info
["kdu-delete"] = {}
5416 db_vdur
= db_vnfr
.get("vdur")
5417 vdur_list
= copy(db_vdur
)
5419 for index
, vdu
in enumerate(vdur_list
):
5420 vca_scaling_info
.append(
5422 "osm_vdu_id": vdu
["vdu-id-ref"],
5423 "member-vnf-index": member_vnf_index
,
5425 "vdu_index": count_index
,
5427 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5428 scaling_info
["vdu"].append(
5430 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5431 "vdu_id": vdu
["vdu-id-ref"],
5434 for interface
in vdu
["interfaces"]:
5435 scaling_info
["vdu"][index
]["interface"].append(
5437 "name": interface
["name"],
5438 "ip_address": interface
["ip-address"],
5439 "mac_address": interface
.get("mac-address"),
5441 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5442 stage
[2] = "Terminating VDUs"
5443 if scaling_info
.get("vdu-delete"):
5444 # scale_process = "RO"
5445 if self
.ro_config
.get("ng"):
5446 await self
._scale
_ng
_ro
(
5447 logging_text
, db_nsr
, update_db_nslcmops
, db_vnfr
, scaling_info
, stage
5450 async def remove_vnf(
5451 self
, nsr_id
, nslcmop_id
, vnf_instance_id
5453 """This method is to Remove VNF instances from NS.
5456 nsr_id: NS instance id
5457 nslcmop_id: nslcmop id of update
5458 vnf_instance_id: id of the VNF instance to be removed
5461 result: (str, str) COMPLETED/FAILED, details
5465 logging_text
= "Task ns={} update ".format(nsr_id
)
5466 check_vnfr_count
= len(self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}))
5467 self
.logger
.info("check_vnfr_count {}".format(check_vnfr_count
))
5468 if check_vnfr_count
> 1:
5469 stage
= ["", "", ""]
5470 step
= "Getting nslcmop from database"
5471 self
.logger
.debug(step
+ " after having waited for previous tasks to be completed")
5472 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5473 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5474 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5475 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5476 """ db_vnfr = self.db.get_one(
5477 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5479 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5480 await self
.terminate_vdus(db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
)
5482 constituent_vnfr
= db_nsr
.get("constituent-vnfr-ref")
5483 constituent_vnfr
.remove(db_vnfr
.get("_id"))
5484 db_nsr_update
["constituent-vnfr-ref"] = db_nsr
.get("constituent-vnfr-ref")
5485 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5486 self
.db
.del_one("vnfrs", {"_id": db_vnfr
.get("_id")})
5487 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5488 return "COMPLETED", "Done"
5490 step
= "Terminate VNF Failed with"
5491 raise LcmException("{} Cannot terminate the last VNF in this NS.".format(
5493 except (LcmException
, asyncio
.CancelledError
):
5495 except Exception as e
:
5496 self
.logger
.debug("Error removing VNF {}".format(e
))
5497 return "FAILED", "Error removing VNF {}".format(e
)
5499 async def _ns_redeploy_vnf(
5500 self
, nsr_id
, nslcmop_id
, db_vnfd
, db_vnfr
, db_nsr
,
5502 """This method updates and redeploys VNF instances
5505 nsr_id: NS instance id
5506 nslcmop_id: nslcmop id
5507 db_vnfd: VNF descriptor
5508 db_vnfr: VNF instance record
5509 db_nsr: NS instance record
5512 result: (str, str) COMPLETED/FAILED, details
5516 stage
= ["", "", ""]
5517 logging_text
= "Task ns={} update ".format(nsr_id
)
5518 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5519 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5521 # Terminate old VNF resources
5522 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5523 await self
.terminate_vdus(db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
)
5525 # old_vnfd_id = db_vnfr["vnfd-id"]
5526 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5527 new_db_vnfd
= db_vnfd
5528 # new_vnfd_ref = new_db_vnfd["id"]
5529 # new_vnfd_id = vnfd_id
5533 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5535 "name": cp
.get("id"),
5536 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5537 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5540 new_vnfr_cp
.append(vnf_cp
)
5541 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5542 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5543 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5544 new_vnfr_update
= {"revision": latest_vnfd_revision
, "connection-point": new_vnfr_cp
, "vdur": new_vdur
, "ip-address": ""}
5545 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5546 updated_db_vnfr
= self
.db
.get_one(
5547 "vnfrs", {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
}
5550 # Instantiate new VNF resources
5551 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5552 vca_scaling_info
= []
5553 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5554 scaling_info
["scaling_direction"] = "OUT"
5555 scaling_info
["vdu-create"] = {}
5556 scaling_info
["kdu-create"] = {}
5557 vdud_instantiate_list
= db_vnfd
["vdu"]
5558 for index
, vdud
in enumerate(vdud_instantiate_list
):
5559 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5563 additional_params
= (
5564 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5567 cloud_init_list
= []
5569 # TODO Information of its own ip is not available because db_vnfr is not updated.
5570 additional_params
["OSM"] = get_osm_params(
5571 updated_db_vnfr
, vdud
["id"], 1
5573 cloud_init_list
.append(
5574 self
._parse
_cloud
_init
(
5581 vca_scaling_info
.append(
5583 "osm_vdu_id": vdud
["id"],
5584 "member-vnf-index": member_vnf_index
,
5586 "vdu_index": count_index
,
5589 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5590 if self
.ro_config
.get("ng"):
5592 "New Resources to be deployed: {}".format(scaling_info
))
5593 await self
._scale
_ng
_ro
(
5594 logging_text
, db_nsr
, update_db_nslcmops
, updated_db_vnfr
, scaling_info
, stage
5596 return "COMPLETED", "Done"
5597 except (LcmException
, asyncio
.CancelledError
):
5599 except Exception as e
:
5600 self
.logger
.debug("Error updating VNF {}".format(e
))
5601 return "FAILED", "Error updating VNF {}".format(e
)
5603 async def _ns_charm_upgrade(
5609 timeout
: float = None,
5611 """This method upgrade charms in VNF instances
5614 ee_id: Execution environment id
5615 path: Local path to the charm
5617 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5618 timeout: (Float) Timeout for the ns update operation
5621 result: (str, str) COMPLETED/FAILED, details
5624 charm_type
= charm_type
or "lxc_proxy_charm"
5625 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5629 charm_type
=charm_type
,
5630 timeout
=timeout
or self
.timeout_ns_update
,
5634 return "COMPLETED", output
5636 except (LcmException
, asyncio
.CancelledError
):
5639 except Exception as e
:
5641 self
.logger
.debug("Error upgrading charm {}".format(path
))
5643 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5645 async def update(self
, nsr_id
, nslcmop_id
):
5646 """Update NS according to different update types
5648 This method performs upgrade of VNF instances then updates the revision
5649 number in VNF record
5652 nsr_id: Network service will be updated
5653 nslcmop_id: ns lcm operation id
5656 It may raise DbException, LcmException, N2VCException, K8sException
5659 # Try to lock HA task here
5660 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5661 if not task_is_locked_by_me
:
5664 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5665 self
.logger
.debug(logging_text
+ "Enter")
5667 # Set the required variables to be filled up later
5669 db_nslcmop_update
= {}
5671 nslcmop_operation_state
= None
5673 error_description_nslcmop
= ""
5675 change_type
= "updated"
5676 detailed_status
= ""
5679 # wait for any previous tasks in process
5680 step
= "Waiting for previous operations to terminate"
5681 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5682 self
._write
_ns
_status
(
5685 current_operation
="UPDATING",
5686 current_operation_id
=nslcmop_id
,
5689 step
= "Getting nslcmop from database"
5690 db_nslcmop
= self
.db
.get_one(
5691 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5693 update_type
= db_nslcmop
["operationParams"]["updateType"]
5695 step
= "Getting nsr from database"
5696 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5697 old_operational_status
= db_nsr
["operational-status"]
5698 db_nsr_update
["operational-status"] = "updating"
5699 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5700 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5702 if update_type
== "CHANGE_VNFPKG":
5704 # Get the input parameters given through update request
5705 vnf_instance_id
= db_nslcmop
["operationParams"][
5706 "changeVnfPackageData"
5707 ].get("vnfInstanceId")
5709 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5712 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5714 step
= "Getting vnfr from database"
5715 db_vnfr
= self
.db
.get_one(
5716 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5719 step
= "Getting vnfds from database"
5721 latest_vnfd
= self
.db
.get_one(
5722 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5724 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5727 current_vnf_revision
= db_vnfr
.get("revision", 1)
5728 current_vnfd
= self
.db
.get_one(
5730 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5731 fail_on_empty
=False,
5733 # Charm artifact paths will be filled up later
5735 current_charm_artifact_path
,
5736 target_charm_artifact_path
,
5737 charm_artifact_paths
,
5740 step
= "Checking if revision has changed in VNFD"
5741 if current_vnf_revision
!= latest_vnfd_revision
:
5743 change_type
= "policy_updated"
5745 # There is new revision of VNFD, update operation is required
5746 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5747 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5749 step
= "Removing the VNFD packages if they exist in the local path"
5750 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5751 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5753 step
= "Get the VNFD packages from FSMongo"
5754 self
.fs
.sync(from_path
=latest_vnfd_path
)
5755 self
.fs
.sync(from_path
=current_vnfd_path
)
5758 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5760 base_folder
= latest_vnfd
["_admin"]["storage"]
5762 for charm_index
, charm_deployed
in enumerate(
5763 get_iterable(nsr_deployed
, "VCA")
5765 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5767 # Getting charm-id and charm-type
5768 if charm_deployed
.get("member-vnf-index") == vnf_index
:
5769 charm_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5770 charm_type
= charm_deployed
.get("type")
5773 ee_id
= charm_deployed
.get("ee_id")
5775 step
= "Getting descriptor config"
5776 descriptor_config
= get_configuration(
5777 current_vnfd
, current_vnfd
["id"]
5780 if "execution-environment-list" in descriptor_config
:
5781 ee_list
= descriptor_config
.get(
5782 "execution-environment-list", []
5787 # There could be several charm used in the same VNF
5788 for ee_item
in ee_list
:
5789 if ee_item
.get("juju"):
5791 step
= "Getting charm name"
5792 charm_name
= ee_item
["juju"].get("charm")
5794 step
= "Setting Charm artifact paths"
5795 current_charm_artifact_path
.append(
5796 get_charm_artifact_path(
5800 current_vnf_revision
,
5803 target_charm_artifact_path
.append(
5804 get_charm_artifact_path(
5808 latest_vnfd_revision
,
5812 charm_artifact_paths
= zip(
5813 current_charm_artifact_path
, target_charm_artifact_path
5816 step
= "Checking if software version has changed in VNFD"
5817 if find_software_version(current_vnfd
) != find_software_version(
5821 step
= "Checking if existing VNF has charm"
5822 for current_charm_path
, target_charm_path
in list(
5823 charm_artifact_paths
5825 if current_charm_path
:
5827 "Software version change is not supported as VNF instance {} has charm.".format(
5832 # There is no change in the charm package, then redeploy the VNF
5833 # based on new descriptor
5834 step
= "Redeploying VNF"
5835 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5839 ) = await self
._ns
_redeploy
_vnf
(
5846 if result
== "FAILED":
5847 nslcmop_operation_state
= result
5848 error_description_nslcmop
= detailed_status
5849 db_nslcmop_update
["detailed-status"] = detailed_status
5852 + " step {} Done with result {} {}".format(
5853 step
, nslcmop_operation_state
, detailed_status
5858 step
= "Checking if any charm package has changed or not"
5859 for current_charm_path
, target_charm_path
in list(
5860 charm_artifact_paths
5864 and target_charm_path
5865 and self
.check_charm_hash_changed(
5866 current_charm_path
, target_charm_path
5870 step
= "Checking whether VNF uses juju bundle"
5871 if check_juju_bundle_existence(current_vnfd
):
5874 "Charm upgrade is not supported for the instance which"
5875 " uses juju-bundle: {}".format(
5876 check_juju_bundle_existence(current_vnfd
)
5880 step
= "Upgrading Charm"
5884 ) = await self
._ns
_charm
_upgrade
(
5887 charm_type
=charm_type
,
5888 path
=self
.fs
.path
+ target_charm_path
,
5889 timeout
=timeout_seconds
,
5892 if result
== "FAILED":
5893 nslcmop_operation_state
= result
5894 error_description_nslcmop
= detailed_status
5896 db_nslcmop_update
["detailed-status"] = detailed_status
5899 + " step {} Done with result {} {}".format(
5900 step
, nslcmop_operation_state
, detailed_status
5904 step
= "Updating policies"
5905 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5906 result
= "COMPLETED"
5907 detailed_status
= "Done"
5908 db_nslcmop_update
["detailed-status"] = "Done"
5910 # If nslcmop_operation_state is None, so any operation is not failed.
5911 if not nslcmop_operation_state
:
5912 nslcmop_operation_state
= "COMPLETED"
5914 # If update CHANGE_VNFPKG nslcmop_operation is successful
5915 # vnf revision need to be updated
5916 vnfr_update
["revision"] = latest_vnfd_revision
5917 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
5921 + " task Done with result {} {}".format(
5922 nslcmop_operation_state
, detailed_status
5925 elif update_type
== "REMOVE_VNF":
5926 # This part is included in https://osm.etsi.org/gerrit/11876
5927 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
5928 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5929 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5930 step
= "Removing VNF"
5931 (result
, detailed_status
) = await self
.remove_vnf(nsr_id
, nslcmop_id
, vnf_instance_id
)
5932 if result
== "FAILED":
5933 nslcmop_operation_state
= result
5934 error_description_nslcmop
= detailed_status
5935 db_nslcmop_update
["detailed-status"] = detailed_status
5936 change_type
= "vnf_terminated"
5937 if not nslcmop_operation_state
:
5938 nslcmop_operation_state
= "COMPLETED"
5941 + " task Done with result {} {}".format(
5942 nslcmop_operation_state
, detailed_status
5946 elif update_type
== "OPERATE_VNF":
5947 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"]["vnfInstanceId"]
5948 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"]["changeStateTo"]
5949 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"]["additionalParam"]
5950 (result
, detailed_status
) = await self
.rebuild_start_stop(
5951 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
5953 if result
== "FAILED":
5954 nslcmop_operation_state
= result
5955 error_description_nslcmop
= detailed_status
5956 db_nslcmop_update
["detailed-status"] = detailed_status
5957 if not nslcmop_operation_state
:
5958 nslcmop_operation_state
= "COMPLETED"
5961 + " task Done with result {} {}".format(
5962 nslcmop_operation_state
, detailed_status
5966 # If nslcmop_operation_state is None, so any operation is not failed.
5967 # All operations are executed in overall.
5968 if not nslcmop_operation_state
:
5969 nslcmop_operation_state
= "COMPLETED"
5970 db_nsr_update
["operational-status"] = old_operational_status
5972 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5973 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5975 except asyncio
.CancelledError
:
5977 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5979 exc
= "Operation was cancelled"
5980 except asyncio
.TimeoutError
:
5981 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5983 except Exception as e
:
5984 exc
= traceback
.format_exc()
5985 self
.logger
.critical(
5986 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5995 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5996 nslcmop_operation_state
= "FAILED"
5997 db_nsr_update
["operational-status"] = old_operational_status
5999 self
._write
_ns
_status
(
6001 ns_state
=db_nsr
["nsState"],
6002 current_operation
="IDLE",
6003 current_operation_id
=None,
6004 other_update
=db_nsr_update
,
6007 self
._write
_op
_status
(
6010 error_message
=error_description_nslcmop
,
6011 operation_state
=nslcmop_operation_state
,
6012 other_update
=db_nslcmop_update
,
6015 if nslcmop_operation_state
:
6019 "nslcmop_id": nslcmop_id
,
6020 "operationState": nslcmop_operation_state
,
6022 if change_type
in ("vnf_terminated", "policy_updated"):
6023 msg
.update({"vnf_member_index": member_vnf_index
})
6024 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6025 except Exception as e
:
6027 logging_text
+ "kafka_write notification Exception {}".format(e
)
6029 self
.logger
.debug(logging_text
+ "Exit")
6030 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6031 return nslcmop_operation_state
, detailed_status
6033 async def scale(self
, nsr_id
, nslcmop_id
):
# Run the NS scale operation `nslcmop_id` against the NS record `nsr_id`.
# Visible flow: try the HA lock, load nslcmop/nsr/vnfr/vnfd from the database,
# build `scaling_info` (VDU/KDU create or delete data) and `vca_scaling_info`
# from the deltas of the selected scaling descriptor, run the matching pre-scale
# config primitives, remove execution environments (scale-in), scale through
# self._scale_ng_ro and/or self._scale_kdu, create execution environments
# (scale-out), run the matching post-scale config primitives, and finally write
# the operation/NS status, publish an "ns"/"scaled" message and remove the task.
# Parameters: nsr_id is used as the "nsrs" _id, nslcmop_id as the "nslcmops" _id.
6034 # Try to lock HA task here
6035 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6036 if not task_is_locked_by_me
:
6039 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6040 stage
= ["", "", ""]
6041 tasks_dict_info
= {}
6042 # ^ stage, step, VIM progress
6043 self
.logger
.debug(logging_text
+ "Enter")
6044 # get all needed from database
6046 db_nslcmop_update
= {}
6049 # in case of error, indicates what part of scale was failed to put nsr at error status
6050 scale_process
= None
6051 old_operational_status
= ""
6052 old_config_status
= ""
6055 # wait for any previous tasks in process
6056 step
= "Waiting for previous operations to terminate"
6057 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6058 self
._write
_ns
_status
(
6061 current_operation
="SCALING",
6062 current_operation_id
=nslcmop_id
,
6065 step
= "Getting nslcmop from database"
6067 step
+ " after having waited for previous tasks to be completed"
6069 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6071 step
= "Getting nsr from database"
6072 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6073 old_operational_status
= db_nsr
["operational-status"]
6074 old_config_status
= db_nsr
["config-status"]
6076 step
= "Parsing scaling parameters"
6077 db_nsr_update
["operational-status"] = "scaling"
6078 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6079 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6081 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6083 ]["member-vnf-index"]
6084 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6086 ]["scaling-group-descriptor"]
6087 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6088 # for backward compatibility
# A dict-shaped deployed VCA is replaced by the list of its values and written back to the nsr.
6089 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6090 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6091 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6092 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6094 step
= "Getting vnfr from database"
6095 db_vnfr
= self
.db
.get_one(
6096 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6099 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6101 step
= "Getting vnfd from database"
6102 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6104 base_folder
= db_vnfd
["_admin"]["storage"]
6106 step
= "Getting scaling-group-descriptor"
6107 scaling_descriptor
= find_in_list(
6108 get_scaling_aspect(db_vnfd
),
6109 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6111 if not scaling_descriptor
:
6113 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6114 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6117 step
= "Sending scale order to VIM"
6118 # TODO check if ns is in a proper status
# Locate the nsr "_admin.scaling-group" entry named after this scaling group (a new
# entry with "nb-scale-op": 0 is prepared when none exists); nb_scale_op is taken from
# the entry's "nb-scale-op" value, defaulting to 0.
6120 if not db_nsr
["_admin"].get("scaling-group"):
6125 "_admin.scaling-group": [
6126 {"name": scaling_group
, "nb-scale-op": 0}
6130 admin_scale_index
= 0
6132 for admin_scale_index
, admin_scale_info
in enumerate(
6133 db_nsr
["_admin"]["scaling-group"]
6135 if admin_scale_info
["name"] == scaling_group
:
6136 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6138 else: # not found, set index one plus last element and add new entry with the name
6139 admin_scale_index
+= 1
6141 "_admin.scaling-group.{}.name".format(admin_scale_index
)
# vca_scaling_info: entries consumed by the SCALE-IN / SCALE-UP VCA sections below.
# scaling_info: data handed to self._scale_ng_ro / self._scale_kdu and exposed to the
# config primitives under the "VDU_SCALE_INFO" parameter.
6144 vca_scaling_info
= []
6145 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6146 if scaling_type
== "SCALE_OUT":
# NOTE(review): the message below reads "fount"; presumably "found" is intended.
6147 if "aspect-delta-details" not in scaling_descriptor
:
6149 "Aspect delta details not fount in scaling descriptor {}".format(
6150 scaling_descriptor
["name"]
6153 # count if max-instance-count is reached
6154 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6156 scaling_info
["scaling_direction"] = "OUT"
6157 scaling_info
["vdu-create"] = {}
6158 scaling_info
["kdu-create"] = {}
6159 for delta
in deltas
:
6160 for vdu_delta
in delta
.get("vdu-delta", {}):
6161 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6162 # vdu_index also provides the number of instance of the targeted vdu
6163 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6164 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6168 additional_params
= (
6169 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6172 cloud_init_list
= []
6174 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
# Upper bound is 10 unless the VDU profile provides "max-number-of-instances".
6175 max_instance_count
= 10
6176 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6177 max_instance_count
= vdu_profile
.get(
6178 "max-number-of-instances", 10
6181 default_instance_num
= get_number_of_instances(
6184 instances_number
= vdu_delta
.get("number-of-instances", 1)
6185 nb_scale_op
+= instances_number
6187 new_instance_count
= nb_scale_op
+ default_instance_num
6188 # Control if new count is over max and vdu count is less than max.
6189 # Then assign new instance count
6190 if new_instance_count
> max_instance_count
> vdu_count
:
6191 instances_number
= new_instance_count
- max_instance_count
# NOTE(review): the next statement assigns instances_number to itself (no effect).
6193 instances_number
= instances_number
6195 if new_instance_count
> max_instance_count
:
6197 "reached the limit of {} (max-instance-count) "
6198 "scaling-out operations for the "
6199 "scaling-group-descriptor '{}'".format(
6200 nb_scale_op
, scaling_group
6203 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6205 # TODO Information of its own ip is not available because db_vnfr is not updated.
6206 additional_params
["OSM"] = get_osm_params(
6207 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6209 cloud_init_list
.append(
6210 self
._parse
_cloud
_init
(
6217 vca_scaling_info
.append(
6219 "osm_vdu_id": vdu_delta
["id"],
6220 "member-vnf-index": vnf_index
,
6222 "vdu_index": vdu_index
+ x
,
6225 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6226 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6227 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6228 kdu_name
= kdu_profile
["kdu-name"]
6229 resource_name
= kdu_profile
.get("resource-name", "")
6231 # Might have different kdus in the same delta
6232 # Should have list for each kdu
6233 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6234 scaling_info
["kdu-create"][kdu_name
] = []
6236 kdur
= get_kdur(db_vnfr
, kdu_name
)
# Cluster type: "helm-chart-v3" for helm charts, switched to "helm-chart" when the
# kdur "helm-version" is "v2"; "juju-bundle" for juju bundles.
6237 if kdur
.get("helm-chart"):
6238 k8s_cluster_type
= "helm-chart-v3"
6239 self
.logger
.debug("kdur: {}".format(kdur
))
6241 kdur
.get("helm-version")
6242 and kdur
.get("helm-version") == "v2"
6244 k8s_cluster_type
= "helm-chart"
6245 elif kdur
.get("juju-bundle"):
6246 k8s_cluster_type
= "juju-bundle"
6249 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6250 "juju-bundle. Maybe an old NBI version is running".format(
6251 db_vnfr
["member-vnf-index-ref"], kdu_name
6255 max_instance_count
= 10
6256 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6257 max_instance_count
= kdu_profile
.get(
6258 "max-number-of-instances", 10
6261 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6262 deployed_kdu
, _
= get_deployed_kdu(
6263 nsr_deployed
, kdu_name
, vnf_index
6265 if deployed_kdu
is None:
6267 "KDU '{}' for vnf '{}' not deployed".format(
6271 kdu_instance
= deployed_kdu
.get("kdu-instance")
6272 instance_num
= await self
.k8scluster_map
[
6278 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6279 kdu_model
=deployed_kdu
.get("kdu-model"),
6281 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6282 "number-of-instances", 1
6285 # Control if new count is over max and instance_num is less than max.
6286 # Then assign max instance number to kdu replica count
6287 if kdu_replica_count
> max_instance_count
> instance_num
:
6288 kdu_replica_count
= max_instance_count
6289 if kdu_replica_count
> max_instance_count
:
6291 "reached the limit of {} (max-instance-count) "
6292 "scaling-out operations for the "
6293 "scaling-group-descriptor '{}'".format(
6294 instance_num
, scaling_group
# NOTE(review): the scale-out "kdu_index" below starts at instance_num - 1 — confirm
# that this is the intended index for the first new replica.
6298 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6299 vca_scaling_info
.append(
6301 "osm_kdu_id": kdu_name
,
6302 "member-vnf-index": vnf_index
,
6304 "kdu_index": instance_num
+ x
- 1,
6307 scaling_info
["kdu-create"][kdu_name
].append(
6309 "member-vnf-index": vnf_index
,
6311 "k8s-cluster-type": k8s_cluster_type
,
6312 "resource-name": resource_name
,
6313 "scale": kdu_replica_count
,
# Scale-in mirrors scale-out, bounded below by "min-number-of-instances" (default 0).
# NOTE(review): unlike SCALE_OUT, no check that "aspect-delta-details" exists is
# visible here before it is indexed.
6316 elif scaling_type
== "SCALE_IN":
6317 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6319 scaling_info
["scaling_direction"] = "IN"
6320 scaling_info
["vdu-delete"] = {}
6321 scaling_info
["kdu-delete"] = {}
6323 for delta
in deltas
:
6324 for vdu_delta
in delta
.get("vdu-delta", {}):
6325 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6326 min_instance_count
= 0
6327 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6328 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6329 min_instance_count
= vdu_profile
["min-number-of-instances"]
6331 default_instance_num
= get_number_of_instances(
6332 db_vnfd
, vdu_delta
["id"]
6334 instance_num
= vdu_delta
.get("number-of-instances", 1)
6335 nb_scale_op
-= instance_num
6337 new_instance_count
= nb_scale_op
+ default_instance_num
6339 if new_instance_count
< min_instance_count
< vdu_count
:
6340 instances_number
= min_instance_count
- new_instance_count
6342 instances_number
= instance_num
6344 if new_instance_count
< min_instance_count
:
6346 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6347 "scaling-group-descriptor '{}'".format(
6348 nb_scale_op
, scaling_group
6351 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6352 vca_scaling_info
.append(
6354 "osm_vdu_id": vdu_delta
["id"],
6355 "member-vnf-index": vnf_index
,
6357 "vdu_index": vdu_index
- 1 - x
,
6360 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6361 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6362 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6363 kdu_name
= kdu_profile
["kdu-name"]
6364 resource_name
= kdu_profile
.get("resource-name", "")
6366 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6367 scaling_info
["kdu-delete"][kdu_name
] = []
6369 kdur
= get_kdur(db_vnfr
, kdu_name
)
6370 if kdur
.get("helm-chart"):
6371 k8s_cluster_type
= "helm-chart-v3"
6372 self
.logger
.debug("kdur: {}".format(kdur
))
6374 kdur
.get("helm-version")
6375 and kdur
.get("helm-version") == "v2"
6377 k8s_cluster_type
= "helm-chart"
6378 elif kdur
.get("juju-bundle"):
6379 k8s_cluster_type
= "juju-bundle"
6382 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6383 "juju-bundle. Maybe an old NBI version is running".format(
6384 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6388 min_instance_count
= 0
6389 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6390 min_instance_count
= kdu_profile
["min-number-of-instances"]
6392 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6393 deployed_kdu
, _
= get_deployed_kdu(
6394 nsr_deployed
, kdu_name
, vnf_index
6396 if deployed_kdu
is None:
6398 "KDU '{}' for vnf '{}' not deployed".format(
6402 kdu_instance
= deployed_kdu
.get("kdu-instance")
6403 instance_num
= await self
.k8scluster_map
[
6409 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6410 kdu_model
=deployed_kdu
.get("kdu-model"),
6412 kdu_replica_count
= instance_num
- kdu_delta
.get(
6413 "number-of-instances", 1
6416 if kdu_replica_count
< min_instance_count
< instance_num
:
6417 kdu_replica_count
= min_instance_count
6418 if kdu_replica_count
< min_instance_count
:
6420 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6421 "scaling-group-descriptor '{}'".format(
6422 instance_num
, scaling_group
6426 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6427 vca_scaling_info
.append(
6429 "osm_kdu_id": kdu_name
,
6430 "member-vnf-index": vnf_index
,
6432 "kdu_index": instance_num
- x
- 1,
6435 scaling_info
["kdu-delete"][kdu_name
].append(
6437 "member-vnf-index": vnf_index
,
6439 "k8s-cluster-type": k8s_cluster_type
,
6440 "resource-name": resource_name
,
6441 "scale": kdu_replica_count
,
6445 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
# Works on a copy of the "vdu-delete" counters: vdur records are walked from last to
# first, each matching record decrements its counter and has its name, vdu id and
# interfaces appended to scaling_info["vdu"].
6446 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6447 if scaling_info
["scaling_direction"] == "IN":
6448 for vdur
in reversed(db_vnfr
["vdur"]):
6449 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6450 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6451 scaling_info
["vdu"].append(
6453 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6454 "vdu_id": vdur
["vdu-id-ref"],
6458 for interface
in vdur
["interfaces"]:
6459 scaling_info
["vdu"][-1]["interface"].append(
6461 "name": interface
["name"],
6462 "ip_address": interface
["ip-address"],
6463 "mac_address": interface
.get("mac-address"),
6466 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
# Pre-scale: for each "scaling-config-action" whose trigger matches the direction
# ("pre-scale-in" with SCALE_IN, "pre-scale-out" with SCALE_OUT), look up the referenced
# config primitive in the VNF configuration and execute it.
6469 step
= "Executing pre-scale vnf-config-primitive"
6470 if scaling_descriptor
.get("scaling-config-action"):
6471 for scaling_config_action
in scaling_descriptor
[
6472 "scaling-config-action"
6475 scaling_config_action
.get("trigger") == "pre-scale-in"
6476 and scaling_type
== "SCALE_IN"
6478 scaling_config_action
.get("trigger") == "pre-scale-out"
6479 and scaling_type
== "SCALE_OUT"
6481 vnf_config_primitive
= scaling_config_action
[
6482 "vnf-config-primitive-name-ref"
6484 step
= db_nslcmop_update
[
6486 ] = "executing pre-scale scaling-config-action '{}'".format(
6487 vnf_config_primitive
6490 # look for primitive
6491 for config_primitive
in (
6492 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6493 ).get("config-primitive", ()):
6494 if config_primitive
["name"] == vnf_config_primitive
:
6498 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6499 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6500 "primitive".format(scaling_group
, vnf_config_primitive
)
6503 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6504 if db_vnfr
.get("additionalParamsForVnf"):
6505 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6507 scale_process
= "VCA"
6508 db_nsr_update
["config-status"] = "configuring pre-scaling"
6509 primitive_params
= self
._map
_primitive
_params
(
6510 config_primitive
, {}, vnfr_params
6513 # Pre-scale retry check: Check if this sub-operation has been executed before
6514 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6517 vnf_config_primitive
,
6521 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6522 # Skip sub-operation
6523 result
= "COMPLETED"
6524 result_detail
= "Done"
6527 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6528 vnf_config_primitive
, result
, result_detail
6532 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6533 # New sub-operation: Get index of this sub-operation
6535 len(db_nslcmop
.get("_admin", {}).get("operations"))
6540 + "vnf_config_primitive={} New sub-operation".format(
6541 vnf_config_primitive
6545 # retry: Get registered params for this existing sub-operation
6546 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6549 vnf_index
= op
.get("member_vnf_index")
6550 vnf_config_primitive
= op
.get("primitive")
6551 primitive_params
= op
.get("primitive_params")
6554 + "vnf_config_primitive={} Sub-operation retry".format(
6555 vnf_config_primitive
6558 # Execute the primitive, either with new (first-time) or registered (reintent) args
6559 ee_descriptor_id
= config_primitive
.get(
6560 "execution-environment-ref"
6562 primitive_name
= config_primitive
.get(
6563 "execution-environment-primitive", vnf_config_primitive
6565 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6566 nsr_deployed
["VCA"],
6567 member_vnf_index
=vnf_index
,
6569 vdu_count_index
=None,
6570 ee_descriptor_id
=ee_descriptor_id
,
6572 result
, result_detail
= await self
._ns
_execute
_primitive
(
6581 + "vnf_config_primitive={} Done with result {} {}".format(
6582 vnf_config_primitive
, result
, result_detail
6585 # Update operationState = COMPLETED | FAILED
6586 self
._update
_suboperation
_status
(
6587 db_nslcmop
, op_index
, result
, result_detail
6590 if result
== "FAILED":
6591 raise LcmException(result_detail
)
6592 db_nsr_update
["config-status"] = old_config_status
6593 scale_process
= None
6597 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6600 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6603 # SCALE-IN VCA - BEGIN
6604 if vca_scaling_info
:
6605 step
= db_nslcmop_update
[
6607 ] = "Deleting the execution environments"
6608 scale_process
= "VCA"
6609 for vca_info
in vca_scaling_info
:
6610 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6611 member_vnf_index
= str(vca_info
["member-vnf-index"])
6613 logging_text
+ "vdu info: {}".format(vca_info
)
6615 if vca_info
.get("osm_vdu_id"):
6616 vdu_id
= vca_info
["osm_vdu_id"]
6617 vdu_index
= int(vca_info
["vdu_index"])
6620 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6621 member_vnf_index
, vdu_id
, vdu_index
6623 stage
[2] = step
= "Scaling in VCA"
6624 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6625 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6626 config_update
= db_nsr
["configurationStatus"]
# NOTE(review): in the condition below, `vca or vca.get("ee_id")` only evaluates
# vca.get(...) when vca is falsy; `and` may have been intended — confirm.
6627 for vca_index
, vca
in enumerate(vca_update
):
6629 (vca
or vca
.get("ee_id"))
6630 and vca
["member-vnf-index"] == member_vnf_index
6631 and vca
["vdu_count_index"] == vdu_index
6633 if vca
.get("vdu_id"):
6634 config_descriptor
= get_configuration(
6635 db_vnfd
, vca
.get("vdu_id")
6637 elif vca
.get("kdu_name"):
6638 config_descriptor
= get_configuration(
6639 db_vnfd
, vca
.get("kdu_name")
6642 config_descriptor
= get_configuration(
6643 db_vnfd
, db_vnfd
["id"]
6645 operation_params
= (
6646 db_nslcmop
.get("operationParams") or {}
6648 exec_terminate_primitives
= not operation_params
.get(
6649 "skip_terminate_primitives"
6650 ) and vca
.get("needed_terminate")
6651 task
= asyncio
.ensure_future(
6660 exec_primitives
=exec_terminate_primitives
,
6664 timeout
=self
.timeout_charm_delete
,
6667 tasks_dict_info
[task
] = "Terminating VCA {}".format(
# NOTE(review): entries are deleted from vca_update/config_update while vca_update is
# still being enumerated; unless the loop is left right after (not visible here), a
# following element could be skipped — verify.
6670 del vca_update
[vca_index
]
6671 del config_update
[vca_index
]
6672 # wait for pending tasks of terminate primitives
6676 + "Waiting for tasks {}".format(
6677 list(tasks_dict_info
.keys())
6680 error_list
= await self
._wait
_for
_tasks
(
6684 self
.timeout_charm_delete
, self
.timeout_ns_terminate
6689 tasks_dict_info
.clear()
6691 raise LcmException("; ".join(error_list
))
6693 db_vca_and_config_update
= {
6694 "_admin.deployed.VCA": vca_update
,
6695 "configurationStatus": config_update
,
6698 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6700 scale_process
= None
6701 # SCALE-IN VCA - END
# VDU scaling: delegated to self._scale_ng_ro when ro_config has "ng"; the
# "vdu-create"/"vdu-delete" keys are removed from scaling_info afterwards.
6704 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6705 scale_process
= "RO"
6706 if self
.ro_config
.get("ng"):
6707 await self
._scale
_ng
_ro
(
6708 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6710 scaling_info
.pop("vdu-create", None)
6711 scaling_info
.pop("vdu-delete", None)
6713 scale_process
= None
# KDU scaling: delegated to self._scale_kdu; the "kdu-create"/"kdu-delete" keys are
# removed from scaling_info afterwards.
6717 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6718 scale_process
= "KDU"
6719 await self
._scale
_kdu
(
6720 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6722 scaling_info
.pop("kdu-create", None)
6723 scaling_info
.pop("kdu-delete", None)
6725 scale_process
= None
6729 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6731 # SCALE-UP VCA - BEGIN
6732 if vca_scaling_info
:
6733 step
= db_nslcmop_update
[
6735 ] = "Creating new execution environments"
6736 scale_process
= "VCA"
6737 for vca_info
in vca_scaling_info
:
6738 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6739 member_vnf_index
= str(vca_info
["member-vnf-index"])
6741 logging_text
+ "vdu info: {}".format(vca_info
)
6743 vnfd_id
= db_vnfr
["vnfd-ref"]
6744 if vca_info
.get("osm_vdu_id"):
6745 vdu_index
= int(vca_info
["vdu_index"])
6746 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6747 if db_vnfr
.get("additionalParamsForVnf"):
6748 deploy_params
.update(
6750 db_vnfr
["additionalParamsForVnf"].copy()
6753 descriptor_config
= get_configuration(
6754 db_vnfd
, db_vnfd
["id"]
6756 if descriptor_config
:
6761 logging_text
=logging_text
6762 + "member_vnf_index={} ".format(member_vnf_index
),
6765 nslcmop_id
=nslcmop_id
,
6771 member_vnf_index
=member_vnf_index
,
6772 vdu_index
=vdu_index
,
6774 deploy_params
=deploy_params
,
6775 descriptor_config
=descriptor_config
,
6776 base_folder
=base_folder
,
6777 task_instantiation_info
=tasks_dict_info
,
6780 vdu_id
= vca_info
["osm_vdu_id"]
6781 vdur
= find_in_list(
6782 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6784 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6785 if vdur
.get("additionalParams"):
6786 deploy_params_vdu
= parse_yaml_strings(
6787 vdur
["additionalParams"]
6790 deploy_params_vdu
= deploy_params
6791 deploy_params_vdu
["OSM"] = get_osm_params(
6792 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6794 if descriptor_config
:
6799 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6800 member_vnf_index
, vdu_id
, vdu_index
6802 stage
[2] = step
= "Scaling out VCA"
6803 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6805 logging_text
=logging_text
6806 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6807 member_vnf_index
, vdu_id
, vdu_index
6811 nslcmop_id
=nslcmop_id
,
6817 member_vnf_index
=member_vnf_index
,
6818 vdu_index
=vdu_index
,
6820 deploy_params
=deploy_params_vdu
,
6821 descriptor_config
=descriptor_config
,
6822 base_folder
=base_folder
,
6823 task_instantiation_info
=tasks_dict_info
,
6826 # SCALE-UP VCA - END
6827 scale_process
= None
6830 # execute primitive service POST-SCALING
# Same structure as the pre-scale section, with the "post-scale-in"/"post-scale-out"
# triggers.
6831 step
= "Executing post-scale vnf-config-primitive"
6832 if scaling_descriptor
.get("scaling-config-action"):
6833 for scaling_config_action
in scaling_descriptor
[
6834 "scaling-config-action"
6837 scaling_config_action
.get("trigger") == "post-scale-in"
6838 and scaling_type
== "SCALE_IN"
6840 scaling_config_action
.get("trigger") == "post-scale-out"
6841 and scaling_type
== "SCALE_OUT"
6843 vnf_config_primitive
= scaling_config_action
[
6844 "vnf-config-primitive-name-ref"
6846 step
= db_nslcmop_update
[
6848 ] = "executing post-scale scaling-config-action '{}'".format(
6849 vnf_config_primitive
6852 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6853 if db_vnfr
.get("additionalParamsForVnf"):
6854 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6856 # look for primitive
6857 for config_primitive
in (
6858 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6859 ).get("config-primitive", ()):
6860 if config_primitive
["name"] == vnf_config_primitive
:
6864 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6865 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6866 "config-primitive".format(
6867 scaling_group
, vnf_config_primitive
6870 scale_process
= "VCA"
6871 db_nsr_update
["config-status"] = "configuring post-scaling"
6872 primitive_params
= self
._map
_primitive
_params
(
6873 config_primitive
, {}, vnfr_params
6876 # Post-scale retry check: Check if this sub-operation has been executed before
6877 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6880 vnf_config_primitive
,
6884 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6885 # Skip sub-operation
6886 result
= "COMPLETED"
6887 result_detail
= "Done"
6890 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6891 vnf_config_primitive
, result
, result_detail
6895 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6896 # New sub-operation: Get index of this sub-operation
6898 len(db_nslcmop
.get("_admin", {}).get("operations"))
6903 + "vnf_config_primitive={} New sub-operation".format(
6904 vnf_config_primitive
6908 # retry: Get registered params for this existing sub-operation
6909 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6912 vnf_index
= op
.get("member_vnf_index")
6913 vnf_config_primitive
= op
.get("primitive")
6914 primitive_params
= op
.get("primitive_params")
6917 + "vnf_config_primitive={} Sub-operation retry".format(
6918 vnf_config_primitive
6921 # Execute the primitive, either with new (first-time) or registered (reintent) args
6922 ee_descriptor_id
= config_primitive
.get(
6923 "execution-environment-ref"
6925 primitive_name
= config_primitive
.get(
6926 "execution-environment-primitive", vnf_config_primitive
6928 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6929 nsr_deployed
["VCA"],
6930 member_vnf_index
=vnf_index
,
6932 vdu_count_index
=None,
6933 ee_descriptor_id
=ee_descriptor_id
,
6935 result
, result_detail
= await self
._ns
_execute
_primitive
(
6944 + "vnf_config_primitive={} Done with result {} {}".format(
6945 vnf_config_primitive
, result
, result_detail
6948 # Update operationState = COMPLETED | FAILED
6949 self
._update
_suboperation
_status
(
6950 db_nslcmop
, op_index
, result
, result_detail
6953 if result
== "FAILED":
6954 raise LcmException(result_detail
)
6955 db_nsr_update
["config-status"] = old_config_status
6956 scale_process
= None
6961 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6962 db_nsr_update
["operational-status"] = (
6964 if old_operational_status
== "failed"
6965 else old_operational_status
6967 db_nsr_update
["config-status"] = old_config_status
6970 ROclient
.ROClientException
,
6975 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6977 except asyncio
.CancelledError
:
6979 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6981 exc
= "Operation was cancelled"
6982 except Exception as e
:
6983 exc
= traceback
.format_exc()
6984 self
.logger
.critical(
6985 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6989 self
._write
_ns
_status
(
6992 current_operation
="IDLE",
6993 current_operation_id
=None,
# The value returned by self._wait_for_tasks is stored in `exc`, which feeds the
# "FAILED ..." description and status handling below.
6996 stage
[1] = "Waiting for instantiate pending tasks."
6997 self
.logger
.debug(logging_text
+ stage
[1])
6998 exc
= await self
._wait
_for
_tasks
(
7001 self
.timeout_ns_deploy
,
7009 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7010 nslcmop_operation_state
= "FAILED"
7012 db_nsr_update
["operational-status"] = old_operational_status
7013 db_nsr_update
["config-status"] = old_config_status
7014 db_nsr_update
["detailed-status"] = ""
# NOTE(review): scale_process is None whenever no VCA/RO/KDU phase is in progress;
# confirm that a truthiness guard precedes the membership tests below.
7016 if "VCA" in scale_process
:
7017 db_nsr_update
["config-status"] = "failed"
7018 if "RO" in scale_process
:
7019 db_nsr_update
["operational-status"] = "failed"
7022 ] = "FAILED scaling nslcmop={} {}: {}".format(
7023 nslcmop_id
, step
, exc
7026 error_description_nslcmop
= None
7027 nslcmop_operation_state
= "COMPLETED"
7028 db_nslcmop_update
["detailed-status"] = "Done"
7030 self
._write
_op
_status
(
7033 error_message
=error_description_nslcmop
,
7034 operation_state
=nslcmop_operation_state
,
7035 other_update
=db_nslcmop_update
,
7038 self
._write
_ns
_status
(
7041 current_operation
="IDLE",
7042 current_operation_id
=None,
7043 other_update
=db_nsr_update
,
7046 if nslcmop_operation_state
:
7050 "nslcmop_id": nslcmop_id
,
7051 "operationState": nslcmop_operation_state
,
# Publish the outcome on topic "ns" with key "scaled"; a failure to publish is caught
# by the handler that follows.
7053 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7054 except Exception as e
:
7056 logging_text
+ "kafka_write notification Exception {}".format(e
)
7058 self
.logger
.debug(logging_text
+ "Exit")
7059 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7061 async def _scale_kdu(
7062 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7064 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7065 for kdu_name
in _scaling_info
:
7066 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7067 deployed_kdu
, index
= get_deployed_kdu(
7068 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7070 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7071 kdu_instance
= deployed_kdu
["kdu-instance"]
7072 kdu_model
= deployed_kdu
.get("kdu-model")
7073 scale
= int(kdu_scaling_info
["scale"])
7074 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7077 "collection": "nsrs",
7078 "filter": {"_id": nsr_id
},
7079 "path": "_admin.deployed.K8s.{}".format(index
),
7082 step
= "scaling application {}".format(
7083 kdu_scaling_info
["resource-name"]
7085 self
.logger
.debug(logging_text
+ step
)
7087 if kdu_scaling_info
["type"] == "delete":
7088 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7091 and kdu_config
.get("terminate-config-primitive")
7092 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7094 terminate_config_primitive_list
= kdu_config
.get(
7095 "terminate-config-primitive"
7097 terminate_config_primitive_list
.sort(
7098 key
=lambda val
: int(val
["seq"])
7102 terminate_config_primitive
7103 ) in terminate_config_primitive_list
:
7104 primitive_params_
= self
._map
_primitive
_params
(
7105 terminate_config_primitive
, {}, {}
7107 step
= "execute terminate config primitive"
7108 self
.logger
.debug(logging_text
+ step
)
7109 await asyncio
.wait_for(
7110 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7111 cluster_uuid
=cluster_uuid
,
7112 kdu_instance
=kdu_instance
,
7113 primitive_name
=terminate_config_primitive
["name"],
7114 params
=primitive_params_
,
7121 await asyncio
.wait_for(
7122 self
.k8scluster_map
[k8s_cluster_type
].scale(
7125 kdu_scaling_info
["resource-name"],
7127 cluster_uuid
=cluster_uuid
,
7128 kdu_model
=kdu_model
,
7132 timeout
=self
.timeout_vca_on_error
,
7135 if kdu_scaling_info
["type"] == "create":
7136 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7139 and kdu_config
.get("initial-config-primitive")
7140 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7142 initial_config_primitive_list
= kdu_config
.get(
7143 "initial-config-primitive"
7145 initial_config_primitive_list
.sort(
7146 key
=lambda val
: int(val
["seq"])
7149 for initial_config_primitive
in initial_config_primitive_list
:
7150 primitive_params_
= self
._map
_primitive
_params
(
7151 initial_config_primitive
, {}, {}
7153 step
= "execute initial config primitive"
7154 self
.logger
.debug(logging_text
+ step
)
7155 await asyncio
.wait_for(
7156 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7157 cluster_uuid
=cluster_uuid
,
7158 kdu_instance
=kdu_instance
,
7159 primitive_name
=initial_config_primitive
["name"],
7160 params
=primitive_params_
,
7167 async def _scale_ng_ro(
7168 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7170 nsr_id
= db_nslcmop
["nsInstanceId"]
7171 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7174 # read from db: vnfd's for every vnf
7177 # for each vnf in ns, read vnfd
7178 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7179 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7180 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7181 # if we haven't this vnfd, read it from db
7182 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7184 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7185 db_vnfds
.append(vnfd
)
7186 n2vc_key
= self
.n2vc
.get_public_key()
7187 n2vc_key_list
= [n2vc_key
]
7190 vdu_scaling_info
.get("vdu-create"),
7191 vdu_scaling_info
.get("vdu-delete"),
7194 # db_vnfr has been updated, update db_vnfrs to use it
7195 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7196 await self
._instantiate
_ng
_ro
(
7206 start_deploy
=time(),
7207 timeout_ns_deploy
=self
.timeout_ns_deploy
,
7209 if vdu_scaling_info
.get("vdu-delete"):
7211 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
async def extract_prometheus_scrape_jobs(
    self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
):
    """
    Build the prometheus scrape jobs from a 'prometheus*.j2' template of the artifact.

    :param ee_id: execution environment id; the text after the first "." is used as
        service name
    :param artifact_path: folder, inside self.fs, where the template is searched
    :param ee_config_descriptor: descriptor providing the "metric-service" name
    :param vnfr_id: vnfr id; with the dashes removed it is the job name
    :param nsr_id: nsr id, stored in every job
    :param target_ip: value of the TARGET_IP template variable
    :return: list of jobs returned by parse_job(), or None when no template exists
    """
    # look for the first file called 'prometheus*.j2'
    job_file = None
    for file_name in self.fs.dir_ls(artifact_path):
        if file_name.startswith("prometheus") and file_name.endswith(".j2"):
            job_file = file_name
            break
    if not job_file:
        return
    with self.fs.file_open((artifact_path, job_file), "r") as template:
        job_data = template.read()

    # TODO get_service
    service = ee_id.partition(".")[2]  # remove prefix "namespace."
    host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
    host_port = "80"
    vnfr_id = vnfr_id.replace("-", "")
    variables = {
        "JOB_NAME": vnfr_id,
        "TARGET_IP": target_ip,
        "EXPORTER_POD_IP": host_name,
        "EXPORTER_POD_PORT": host_port,
    }
    job_list = parse_job(job_data, variables)
    # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
    for job in job_list:
        current_name = job.get("job_name")
        if not (isinstance(current_name, str) and vnfr_id in current_name):
            job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
        job["nsr_id"] = nsr_id
        job["vnfr_id"] = vnfr_id
    return job_list
async def rebuild_start_stop(self, nsr_id, nslcmop_id, vnf_id, additional_param, operation_type):
    """
    Ask RO to run operation_type on one VDU instance of a VNF and wait for the result.

    :param nsr_id: NS instance id
    :param nslcmop_id: id of the ns operation being executed
    :param vnf_id: _id of the vnfr owning the target VDU
    :param additional_param: dict providing the "vdu_id" and "count-index" of the target
    :param operation_type: operation name. It is the key of the RO payload, the third
        argument of RO.operate() and, upper-cased, the current operation of the NS
    :return: ("COMPLETED", "Done") on success, ("FAILED", <error text>) otherwise
    """
    logging_text = "Task ns={} {}={} ".format(nsr_id, operation_type, nslcmop_id)
    self.logger.info(logging_text + "Enter")
    stage = ["Preparing the environment", ""]
    # database nsrs record
    db_nsr_update = {}
    # start time handed to _wait_ng_ro together with self.timeout_operate
    start_deploy = time()
    try:
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_id})
        vim_account_id = db_vnfr.get("vim-account-id")
        vim_info_key = "vim:" + vim_account_id
        vdu_id = additional_param["vdu_id"]
        candidate_vdurs = [
            item for item in db_vnfr["vdur"] if item["vdu-id-ref"] == vdu_id
        ]
        vdur = find_in_list(
            candidate_vdurs,
            lambda vdu: vdu["count-index"] == additional_param["count-index"],
        )
        if not vdur:
            raise LcmException("Target vdu is not found")
        vdu_vim_name = vdur["name"]
        vim_vm_id = vdur["vim_info"][vim_info_key]["vim_id"]
        # first key found in the vim_info of the vdur
        target_vim = next(iter(vdur["vim_info"]))
        self.logger.info("vdu_vim_name >> {} ".format(vdu_vim_name))

        # wait for any previous tasks in process
        stage[1] = "Waiting for previous operations to terminate"
        self.logger.info(stage[1])
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        stage[1] = "Reading from database."
        self.logger.info(stage[1])
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation=operation_type.upper(),
            current_operation_id=nslcmop_id,
        )
        self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)

        # read from db: ns
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr_update["operational-status"] = operation_type
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # Payload for RO
        operation_target = {
            "vim_vm_id": vim_vm_id,
            "vnf_id": vnf_id,
            "vdu_index": additional_param["count-index"],
            "vdu_id": vdur["id"],
            "target_vim": target_vim,
            "vim_account_id": vim_account_id,
        }
        desc = {operation_type: operation_target}
        stage[1] = "Sending rebuild request to RO... {}".format(desc)
        self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
        self.logger.info("ro nsr id: {}".format(nsr_id))
        result_dict = await self.RO.operate(nsr_id, desc, operation_type)
        self.logger.info("response from RO: {}".format(result_dict))
        action_id = result_dict["action_id"]
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            self.timeout_operate,
            None,
            "start_stop_rebuild",
        )
        return "COMPLETED", "Done"
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(stage))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    # every handled error ends here; the error text is reported to the caller
    return "FAILED", "Error in operate VNF {}".format(exc)
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Get VCA Cloud and VCA Cloud Credentials for the VIM account

    Both values are read from the "config" entry of the VIM account; a value that
    is not present there is returned as None.

    :param: vim_account_id:     VIM Account ID

    :return: (cloud_name, cloud_credential)
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_cloud")
    cloud_credential = vim_config.get("vca_cloud_credential")
    return cloud_name, cloud_credential
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account

    Both values are read from the "config" entry of the VIM account; a value that
    is not present there is returned as None.

    :param: vim_account_id:     VIM Account ID

    :return: (cloud_name, cloud_credential)
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    k8s_cloud_name = vim_config.get("vca_k8s_cloud")
    k8s_cloud_credential = vim_config.get("vca_k8s_cloud_credential")
    return k8s_cloud_name, k8s_cloud_credential
async def migrate(self, nsr_id, nslcmop_id):
    """
    Migrate VNFs and VDUs instances in a NS

    The operationParams of the ns operation are sent to RO.migrate() and the
    resulting RO action is awaited with self.timeout_migrate. The outcome is
    written to the ns operation and notified on the "ns" topic as "migrated".

    :param: nsr_id: NS Instance ID
    :param: nslcmop_id: nslcmop ID of migrate

    :return: None (also when the HA lock is not obtained and nothing is done)
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return
    logging_text = "Task ns={} migrate ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    # stays "" on success; holds the failure description otherwise
    error_description_nslcmop = ""
    exc = None
    # start time handed to _wait_ng_ro together with self.timeout_migrate
    start_deploy = time()

    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="MIGRATING",
            current_operation_id=nslcmop_id,
        )
        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        migrate_params = db_nslcmop.get("operationParams")

        # copy, so the payload sent to RO is not the dict read from the database
        target = {}
        target.update(migrate_params)
        desc = await self.RO.migrate(nsr_id, target)
        self.logger.debug("RO return > {}".format(desc))
        action_id = desc["action_id"]
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            self.timeout_migrate,
            operation="migrate",
        )
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )
        if exc:
            # same text in detailed-status and in the error message, as the
            # scale and heal operations do
            db_nslcmop_update[
                "detailed-status"
            ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        else:
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if nslcmop_operation_state:
            try:
                msg = {
                    "nsr_id": nsr_id,
                    "nslcmop_id": nslcmop_id,
                    "operationState": nslcmop_operation_state,
                }
                await self.msg.aiowrite("ns", "migrated", msg, loop=self.loop)
            except Exception as e:
                # the notification is best effort: log and carry on
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_migrate")
async def heal(self, nsr_id, nslcmop_id):
    """
    Heal NS

    A heal_RO() task is launched for the whole operation and, for every target
    VDU listed in operationParams.healVnfData, _heal_n2vc() is called for the
    VNF level and for the VDU level configuration found in the vnfd. The
    outcome is written to the ns operation and to the nsr, and notified on the
    "ns" topic as "healed".

    :param nsr_id: ns instance to heal
    :param nslcmop_id: operation to run
    :return: None (also when the HA lock is not obtained and nothing is done)
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} heal={} ".format(nsr_id, nslcmop_id)
    stage = ["", "", ""]
    # ^ stage, step, VIM progress
    tasks_dict_info = {}
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop_update = {}
    db_nsr_update = {}
    db_vnfrs = {}  # vnf's info indexed by _id
    exc = None
    old_operational_status = ""
    old_config_status = ""
    nsi_id = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="HEALING",
            current_operation_id=nslcmop_id,
        )

        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})

        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # kept to restore the nsr status if the heal fails
        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]

        db_nsr_update = {
            "_admin.deployed.RO.operational-status": "healing",
        }
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Sending heal order to VIM"
        task_ro = asyncio.ensure_future(
            self.heal_RO(
                logging_text=logging_text,
                nsr_id=nsr_id,
                db_nslcmop=db_nslcmop,
                stage=stage,
            )
        )
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "heal_RO", task_ro)
        tasks_dict_info[task_ro] = "Healing at VIM"

        # VCA tasks
        # read from db: nsd
        stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
        self.logger.debug(logging_text + stage[1])
        nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
        self.fs.sync(db_nsr["nsd-id"])
        db_nsr["nsd"] = nsd
        # read from db: vnfr's of this ns
        step = "Getting vnfrs from db"
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        for vnfr in db_vnfrs_list:
            db_vnfrs[vnfr["_id"]] = vnfr
        self.logger.debug("ns.heal db_vnfrs={}".format(db_vnfrs))

        # Check for each target VNF
        target_list = db_nslcmop.get("operationParams", {}).get("healVnfData", {})
        for target_vnf in target_list:
            # Find this VNF in the list from DB
            vnfr_id = target_vnf.get("vnfInstanceId", None)
            if not vnfr_id:
                continue
            db_vnfr = db_vnfrs[vnfr_id]
            vnfd_id = db_vnfr.get("vnfd-id")
            vnfd_ref = db_vnfr.get("vnfd-ref")
            vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
            base_folder = vnfd["_admin"]["storage"]
            kdu_name = None
            nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
            member_vnf_index = db_vnfr.get("member-vnf-index-ref")
            # a target without "additionalParams" or without "vdu" has no VDU
            # to process, instead of failing while iterating None
            additional_params = target_vnf.get("additionalParams") or {}

            # Check each target VDU and deploy N2VC
            for target_vdu in additional_params.get("vdu") or []:
                deploy_params_vdu = target_vdu
                # Set run-day1 vnf level value if not vdu level value exists
                if not deploy_params_vdu.get("run-day1") and additional_params.get(
                    "run-day1"
                ):
                    deploy_params_vdu["run-day1"] = additional_params.get("run-day1")
                vdu_name = target_vdu.get("vdu-id", None)
                # TODO: Get vdu_id from vdud.
                vdu_id = vdu_name
                # For multi instance VDU count-index is mandatory
                # For single instance VDU count-index is 0
                vdu_index = target_vdu.get("count-index", 0)

                # n2vc_redesign STEP 3 to 6 Deploy N2VC
                stage[1] = "Deploying Execution Environments."
                self.logger.debug(logging_text + stage[1])

                # VNF Level charm. Normal case when proxy charms.
                # If target instance is management machine continue with actions:
                # recreate EE for native charms or reinject juju key for proxy charms.
                descriptor_config = get_configuration(vnfd, vnfd_ref)
                if descriptor_config:
                    # Continue if healed machine is management machine
                    vnf_ip_address = db_vnfr.get("ip-address")
                    target_instance = next(
                        (
                            instance
                            for instance in db_vnfr.get("vdur") or []
                            if instance["vdu-name"] == vdu_name
                            and instance["count-index"] == vdu_index
                        ),
                        None,
                    )
                    # no matching vdur: there is no management machine to handle
                    if target_instance and vnf_ip_address == target_instance.get(
                        "ip-address"
                    ):
                        self._heal_n2vc(
                            logging_text=logging_text
                            + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
                                member_vnf_index, vdu_name, vdu_index
                            ),
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_ref,
                            vdu_id=None,
                            kdu_name=None,
                            member_vnf_index=member_vnf_index,
                            vdu_index=0,
                            vdu_name=None,
                            deploy_params=deploy_params_vdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage,
                        )

                # VDU Level charm. Normal case with native charms.
                descriptor_config = get_configuration(vnfd, vdu_name)
                if descriptor_config:
                    self._heal_n2vc(
                        logging_text=logging_text
                        + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
                            member_vnf_index, vdu_name, vdu_index
                        ),
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_ref,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params_vdu,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_info=tasks_dict_info,
                        stage=stage,
                    )

    except (
        ROclient.ROClientException,
        DbException,
        LcmException,
        NgRoException,
    ) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(step)
        )
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
            exc_info=True,
        )
    finally:
        if tasks_dict_info:
            stage[1] = "Waiting for healing pending tasks."
            self.logger.debug(logging_text + stage[1])
            tasks_exc = await self._wait_for_tasks(
                logging_text,
                tasks_dict_info,
                self.timeout_ns_deploy,
                stage,
                nslcmop_id,
                nsr_id=nsr_id,
            )
            # an error caught above must survive pending tasks that end cleanly,
            # otherwise the operation would be reported as COMPLETED
            exc = tasks_exc or exc
        if exc:
            db_nslcmop_update[
                "detailed-status"
            ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
            if db_nsr:
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update[
                    "detailed-status"
                ] = "FAILED healing nslcmop={} {}: {}".format(nslcmop_id, step, exc)
                for task, task_name in tasks_dict_info.items():
                    if not task.done() or task.cancelled() or task.exception():
                        if task_name.startswith(self.task_name_deploy_vca):
                            # A N2VC task is pending
                            db_nsr_update["config-status"] = "failed"
                        else:
                            # RO task is pending
                            db_nsr_update["operational-status"] = "failed"
        else:
            error_description_nslcmop = None
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["operational-status"] = "running"
            db_nsr_update["config-status"] = "configured"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="IDLE",
                current_operation_id=None,
                other_update=db_nsr_update,
            )

        if nslcmop_operation_state:
            try:
                msg = {
                    "nsr_id": nsr_id,
                    "nslcmop_id": nslcmop_id,
                    "operationState": nslcmop_operation_state,
                }
                await self.msg.aiowrite("ns", "healed", msg, loop=self.loop)
            except Exception as e:
                # the notification is best effort: log and carry on
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_heal")
async def heal_RO(
    self,
    logging_text,
    nsr_id,
    db_nslcmop,
    stage,
):
    """
    Heal at RO

    The operationParams of the ns operation are sent to RO.recreate() and the
    resulting RO action is awaited. On success the RO operational status of the
    nsr is set back to "running".

    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param db_nslcmop: database content of the ns operation being executed. Its
        operationParams may carry "timeout_ns_heal" to override the default timeout
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: None. Any exception is logged and raised again
    """
    try:
        start_heal = time()
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_heal"):
            timeout_ns_heal = ns_params["timeout_ns_heal"]
        else:
            timeout_ns_heal = self.timeout.get("ns_heal", self.timeout_ns_heal)

        nslcmop_id = db_nslcmop["_id"]
        target = {
            "action_id": nslcmop_id,
        }
        self.logger.warning("db_nslcmop={} and timeout_ns_heal={}".format(db_nslcmop,timeout_ns_heal))
        target.update(db_nslcmop.get("operationParams", {}))

        self.logger.debug("Send to RO > nsr_id={} target={}".format(nsr_id, target))
        desc = await self.RO.recreate(nsr_id, target)
        self.logger.debug("RO return > {}".format(desc))
        action_id = desc["action_id"]
        # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_heal,
            timeout_ns_heal,
            stage,
            operation="healing",
        )

        # Updating NSR
        db_nsr_update = {
            "_admin.deployed.RO.operational-status": "running",
            "detailed-status": " ".join(stage),
        }
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)
        self.logger.debug(
            logging_text + "ns healed at RO. RO_id={}".format(action_id)
        )

    except Exception as e:
        stage[2] = "ERROR healing at VIM"
        # the traceback is only logged for exceptions that are not one of the
        # known RO/LCM/DB error types
        self.logger.error(
            "Error healing at VIM {}".format(e),
            exc_info=not isinstance(
                e,
                (
                    ROclient.ROClientException,
                    LcmException,
                    DbException,
                    NgRoException,
                ),
            ),
        )
        raise
7820 task_instantiation_info
,
7823 # launch instantiate_N2VC in a asyncio task and register task object
7824 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
7825 # if not found, create one entry and update database
7826 # fill db_nsr._admin.deployed.VCA.<index>
7829 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
7831 if "execution-environment-list" in descriptor_config
:
7832 ee_list
= descriptor_config
.get("execution-environment-list", [])
7833 elif "juju" in descriptor_config
:
7834 ee_list
= [descriptor_config
] # ns charms
7835 else: # other types as script are not supported
7838 for ee_item
in ee_list
:
7841 + "_deploy_n2vc ee_item juju={}, helm={}".format(
7842 ee_item
.get("juju"), ee_item
.get("helm-chart")
7845 ee_descriptor_id
= ee_item
.get("id")
7846 if ee_item
.get("juju"):
7847 vca_name
= ee_item
["juju"].get("charm")
7850 if ee_item
["juju"].get("charm") is not None
7853 if ee_item
["juju"].get("cloud") == "k8s":
7854 vca_type
= "k8s_proxy_charm"
7855 elif ee_item
["juju"].get("proxy") is False:
7856 vca_type
= "native_charm"
7857 elif ee_item
.get("helm-chart"):
7858 vca_name
= ee_item
["helm-chart"]
7859 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
7862 vca_type
= "helm-v3"
7865 logging_text
+ "skipping non juju neither charm configuration"
7870 for vca_index
, vca_deployed
in enumerate(
7871 db_nsr
["_admin"]["deployed"]["VCA"]
7873 if not vca_deployed
:
7876 vca_deployed
.get("member-vnf-index") == member_vnf_index
7877 and vca_deployed
.get("vdu_id") == vdu_id
7878 and vca_deployed
.get("kdu_name") == kdu_name
7879 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
7880 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
7884 # not found, create one.
7886 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
7889 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
7891 target
+= "/kdu/{}".format(kdu_name
)
7893 "target_element": target
,
7894 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
7895 "member-vnf-index": member_vnf_index
,
7897 "kdu_name": kdu_name
,
7898 "vdu_count_index": vdu_index
,
7899 "operational-status": "init", # TODO revise
7900 "detailed-status": "", # TODO revise
7901 "step": "initial-deploy", # TODO revise
7903 "vdu_name": vdu_name
,
7905 "ee_descriptor_id": ee_descriptor_id
,
7909 # create VCA and configurationStatus in db
7911 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
7912 "configurationStatus.{}".format(vca_index
): dict(),
7914 self
.update_db_2("nsrs", nsr_id
, db_dict
)
7916 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
7918 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
7919 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
7920 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
7923 task_n2vc
= asyncio
.ensure_future(
7925 logging_text
=logging_text
,
7926 vca_index
=vca_index
,
7932 vdu_index
=vdu_index
,
7933 deploy_params
=deploy_params
,
7934 config_descriptor
=descriptor_config
,
7935 base_folder
=base_folder
,
7936 nslcmop_id
=nslcmop_id
,
7940 ee_config_descriptor
=ee_item
,
7943 self
.lcm_tasks
.register(
7947 "instantiate_N2VC-{}".format(vca_index
),
7950 task_instantiation_info
[
7952 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
7953 member_vnf_index
or "", vdu_id
or ""
7956 async def heal_N2VC(
7973 ee_config_descriptor
,
7975 nsr_id
= db_nsr
["_id"]
7976 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
7977 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
7978 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
7979 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
7981 "collection": "nsrs",
7982 "filter": {"_id": nsr_id
},
7983 "path": db_update_entry
,
7989 element_under_configuration
= nsr_id
7993 vnfr_id
= db_vnfr
["_id"]
7994 osm_config
["osm"]["vnf_id"] = vnfr_id
7996 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
7998 if vca_type
== "native_charm":
8001 index_number
= vdu_index
or 0
8004 element_type
= "VNF"
8005 element_under_configuration
= vnfr_id
8006 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8008 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8009 element_type
= "VDU"
8010 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8011 osm_config
["osm"]["vdu_id"] = vdu_id
8013 namespace
+= ".{}".format(kdu_name
)
8014 element_type
= "KDU"
8015 element_under_configuration
= kdu_name
8016 osm_config
["osm"]["kdu_name"] = kdu_name
8019 if base_folder
["pkg-dir"]:
8020 artifact_path
= "{}/{}/{}/{}".format(
8021 base_folder
["folder"],
8022 base_folder
["pkg-dir"],
8025 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8030 artifact_path
= "{}/Scripts/{}/{}/".format(
8031 base_folder
["folder"],
8034 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8039 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8041 # get initial_config_primitive_list that applies to this element
8042 initial_config_primitive_list
= config_descriptor
.get(
8043 "initial-config-primitive"
8047 "Initial config primitive list > {}".format(
8048 initial_config_primitive_list
8052 # add config if not present for NS charm
8053 ee_descriptor_id
= ee_config_descriptor
.get("id")
8054 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8055 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8056 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8060 "Initial config primitive list #2 > {}".format(
8061 initial_config_primitive_list
8064 # n2vc_redesign STEP 3.1
8065 # find old ee_id if exists
8066 ee_id
= vca_deployed
.get("ee_id")
8068 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8069 # create or register execution environment in VCA. Only for native charms when healing
8070 if vca_type
== "native_charm":
8071 step
= "Waiting to VM being up and getting IP address"
8072 self
.logger
.debug(logging_text
+ step
)
8073 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8082 credentials
= {"hostname": rw_mgmt_ip
}
8084 username
= deep_get(
8085 config_descriptor
, ("config-access", "ssh-access", "default-user")
8087 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8088 # merged. Meanwhile let's get username from initial-config-primitive
8089 if not username
and initial_config_primitive_list
:
8090 for config_primitive
in initial_config_primitive_list
:
8091 for param
in config_primitive
.get("parameter", ()):
8092 if param
["name"] == "ssh-username":
8093 username
= param
["value"]
8097 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8098 "'config-access.ssh-access.default-user'"
8100 credentials
["username"] = username
8102 # n2vc_redesign STEP 3.2
8103 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8104 self
._write
_configuration
_status
(
8106 vca_index
=vca_index
,
8107 status
="REGISTERING",
8108 element_under_configuration
=element_under_configuration
,
8109 element_type
=element_type
,
8112 step
= "register execution environment {}".format(credentials
)
8113 self
.logger
.debug(logging_text
+ step
)
8114 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8115 credentials
=credentials
,
8116 namespace
=namespace
,
8121 # update ee_id in db
8123 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8125 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8127 # for compatibility with MON/POL modules, they need the model and application name in the database
8128 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8129 # Not sure if this need to be done when healing
8131 ee_id_parts = ee_id.split(".")
8132 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8133 if len(ee_id_parts) >= 2:
8134 model_name = ee_id_parts[0]
8135 application_name = ee_id_parts[1]
8136 db_nsr_update[db_update_entry + "model"] = model_name
8137 db_nsr_update[db_update_entry + "application"] = application_name
8140 # n2vc_redesign STEP 3.3
8141 # Install configuration software. Only for native charms.
8142 step
= "Install configuration Software"
8144 self
._write
_configuration
_status
(
8146 vca_index
=vca_index
,
8147 status
="INSTALLING SW",
8148 element_under_configuration
=element_under_configuration
,
8149 element_type
=element_type
,
8150 #other_update=db_nsr_update,
8154 # TODO check if already done
8155 self
.logger
.debug(logging_text
+ step
)
8157 if vca_type
== "native_charm":
8158 config_primitive
= next(
8159 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8162 if config_primitive
:
8163 config
= self
._map
_primitive
_params
(
8164 config_primitive
, {}, deploy_params
8166 await self
.vca_map
[vca_type
].install_configuration_sw(
8168 artifact_path
=artifact_path
,
8176 # write in db flag of configuration_sw already installed
8178 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8181 # Not sure if this need to be done when healing
8183 # add relations for this VCA (wait for other peers related with this VCA)
8184 await self._add_vca_relations(
8185 logging_text=logging_text,
8188 vca_index=vca_index,
8192 # if SSH access is required, then get execution environment SSH public
8193 # for a native charm we have already waited for the VM to be up
8194 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8197 # self.logger.debug("get ssh key block")
8199 config_descriptor
, ("config-access", "ssh-access", "required")
8201 # self.logger.debug("ssh key needed")
8202 # Needed to inject a ssh key
8205 ("config-access", "ssh-access", "default-user"),
8207 step
= "Install configuration Software, getting public ssh key"
8208 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8209 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8212 step
= "Insert public key into VM user={} ssh_key={}".format(
8216 # self.logger.debug("no need to get ssh key")
8217 step
= "Waiting to VM being up and getting IP address"
8218 self
.logger
.debug(logging_text
+ step
)
8220 # n2vc_redesign STEP 5.1
8221 # wait for RO (ip-address) Insert pub_key into VM
8222 # IMPORTANT: We need to wait for RO to complete the healing operation.
8223 await self
._wait
_heal
_ro
(nsr_id
,self
.timeout_ns_heal
)
8226 rw_mgmt_ip
= await self
.wait_kdu_up(
8227 logging_text
, nsr_id
, vnfr_id
, kdu_name
8230 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8240 rw_mgmt_ip
= None # This is for a NS configuration
8242 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8244 # store rw_mgmt_ip in deploy params for later replacement
8245 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8248 # get run-day1 operation parameter
8249 runDay1
= deploy_params
.get("run-day1",False)
8250 self
.logger
.debug(" Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
,vdu_id
,runDay1
))
8252 # n2vc_redesign STEP 6 Execute initial config primitive
8253 step
= "execute initial config primitive"
8255 # wait for dependent primitives execution (NS -> VNF -> VDU)
8256 if initial_config_primitive_list
:
8257 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
8259 # stage, in function of element type: vdu, kdu, vnf or ns
8260 my_vca
= vca_deployed_list
[vca_index
]
8261 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8263 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8264 elif my_vca
.get("member-vnf-index"):
8266 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8269 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8271 self
._write
_configuration
_status
(
8272 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8275 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8277 check_if_terminated_needed
= True
8278 for initial_config_primitive
in initial_config_primitive_list
:
8279 # adding information on the vca_deployed if it is a NS execution environment
8280 if not vca_deployed
["member-vnf-index"]:
8281 deploy_params
["ns_config_info"] = json
.dumps(
8282 self
._get
_ns
_config
_info
(nsr_id
)
8284 # TODO check if already done
8285 primitive_params_
= self
._map
_primitive
_params
(
8286 initial_config_primitive
, {}, deploy_params
8289 step
= "execute primitive '{}' params '{}'".format(
8290 initial_config_primitive
["name"], primitive_params_
8292 self
.logger
.debug(logging_text
+ step
)
8293 await self
.vca_map
[vca_type
].exec_primitive(
8295 primitive_name
=initial_config_primitive
["name"],
8296 params_dict
=primitive_params_
,
8301 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8302 if check_if_terminated_needed
:
8303 if config_descriptor
.get("terminate-config-primitive"):
8305 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
8307 check_if_terminated_needed
= False
8309 # TODO register in database that primitive is done
8311 # STEP 7 Configure metrics
8312 # Not sure if this need to be done when healing
8314 if vca_type == "helm" or vca_type == "helm-v3":
8315 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8317 artifact_path=artifact_path,
8318 ee_config_descriptor=ee_config_descriptor,
8321 target_ip=rw_mgmt_ip,
8327 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8330 for job in prometheus_jobs:
8333 {"job_name": job["job_name"]},
8336 fail_on_empty=False,
8340 step
= "instantiated at VCA"
8341 self
.logger
.debug(logging_text
+ step
)
8343 self
._write
_configuration
_status
(
8344 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8347 except Exception as e
: # TODO not use Exception but N2VC exception
8348 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8350 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8353 "Exception while {} : {}".format(step
, e
), exc_info
=True
8355 self
._write
_configuration
_status
(
8356 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8358 raise LcmException("{} {}".format(step
, e
)) from e
8360 async def _wait_heal_ro(
8366 while time() <= start_time
+ timeout
:
8367 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
8368 operational_status_ro
= db_nsr
["_admin"]["deployed"]["RO"]["operational-status"]
8369 self
.logger
.debug("Wait Heal RO > {}".format(operational_status_ro
))
8370 if operational_status_ro
!= "healing":
8372 await asyncio
.sleep(15, loop
=self
.loop
)
8373 else: # timeout_ns_deploy
8374 raise NgRoException("Timeout waiting ns to deploy")
async def vertical_scale(self, nsr_id, nslcmop_id):
    """
    Vertical Scale the VDUs in a NS

    Asks RO to vertically scale using the ``operationParams`` stored in the
    nslcmop record, waits for the resulting RO action, then writes the final
    operation state and publishes it on the "ns"/"verticalscaled" topic.
    Failures are recorded in the operation status instead of being raised.

    :param: nsr_id: NS Instance ID
    :param: nslcmop_id: nslcmop ID of the vertical scale operation
    :return: None
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return
    logging_text = "Task ns={} vertical scale ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    exc = None
    # start time handed to _wait_ng_ro together with the timeout
    start_deploy = time()

    try:
        # wait for any previous tasks in process
        # "step" names the phase in progress; it is used in failure messages
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="VerticalScale",
            current_operation_id=nslcmop_id,
        )
        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operationParams = db_nslcmop.get("operationParams")
        # copy the parameters so the database record is not mutated
        target = {}
        target.update(operationParams)
        desc = await self.RO.vertical_scale(nsr_id, target)
        self.logger.debug("RO return > {}".format(desc))
        action_id = desc["action_id"]
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            self.timeout_verticalscale,
            operation="verticalscale",
        )
    except (
        ROclient.ROClientException,
        NgRoException,
        DbException,
        LcmException,
    ) as e:
        # Expected failures: keep the plain message. NgRoException is listed
        # so that RO errors are not reported through the catch-all below
        # (critical log plus a full traceback in detailed-status).
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )
        # NOTE(review): only the operation record gets a detailed-status here;
        # confirm whether the NS record should be updated as well.
        if exc:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        else:
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message="",
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if nslcmop_operation_state:
            try:
                msg = {
                    "nsr_id": nsr_id,
                    "nslcmop_id": nslcmop_id,
                    "operationState": nslcmop_operation_state,
                }
                await self.msg.aiowrite("ns", "verticalscaled", msg, loop=self.loop)
            except Exception as e:
                # best effort: a failed notification must not fail the operation
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_verticalscale")