1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
36 from osm_lcm
import ROclient
37 from osm_lcm
.data_utils
.nsr
import (
40 get_deployed_vca_list
,
43 from osm_lcm
.data_utils
.vca
import (
52 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
53 from osm_lcm
.lcm_utils
import (
60 check_juju_bundle_existence
,
61 get_charm_artifact_path
,
63 from osm_lcm
.data_utils
.nsd
import (
64 get_ns_configuration_relation_list
,
68 from osm_lcm
.data_utils
.vnfd
import (
74 get_ee_sorted_initial_config_primitive_list
,
75 get_ee_sorted_terminate_config_primitive_list
,
77 get_virtual_link_profiles
,
82 get_number_of_instances
,
84 get_kdu_resource_profile
,
85 find_software_version
,
87 from osm_lcm
.data_utils
.list_utils
import find_in_list
88 from osm_lcm
.data_utils
.vnfr
import (
92 get_volumes_from_instantiation_params
,
94 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
95 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
96 from n2vc
.definitions
import RelationEndpoint
97 from n2vc
.k8s_helm_conn
import K8sHelmConnector
98 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
99 from n2vc
.k8s_juju_conn
import K8sJujuConnector
101 from osm_common
.dbbase
import DbException
102 from osm_common
.fsbase
import FsException
104 from osm_lcm
.data_utils
.database
.database
import Database
105 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
107 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
108 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
110 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
111 from osm_lcm
.osm_config
import OsmConfigBuilder
112 from osm_lcm
.prometheus
import parse_job
114 from copy
import copy
, deepcopy
115 from time
import time
116 from uuid
import uuid4
118 from random
import randint
120 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
123 class NsLcm(LcmBase
):
124 timeout_vca_on_error
= (
126 ) # Time for charm from first time at blocked,error status to mark as failed
127 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
128 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
129 timeout_ns_heal
= 1800 # default global timeout for un deployment a ns
130 timeout_charm_delete
= 10 * 60
131 timeout_primitive
= 30 * 60 # timeout for primitive execution
132 timeout_ns_update
= 30 * 60 # timeout for ns update
133 timeout_progress_primitive
= (
135 ) # timeout for some progress in a primitive execution
136 timeout_migrate
= 1800 # default global timeout for migrating vnfs
137 timeout_operate
= 1800 # default global timeout for migrating vnfs
138 timeout_verticalscale
= 1800 # default global timeout for Vertical Sclaing
139 SUBOPERATION_STATUS_NOT_FOUND
= -1
140 SUBOPERATION_STATUS_NEW
= -2
141 SUBOPERATION_STATUS_SKIP
= -3
142 task_name_deploy_vca
= "Deploying VCA"
144 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
146 Init, Connect to database, filesystem storage, and messaging
147 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
150 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
152 self
.db
= Database().instance
.db
153 self
.fs
= Filesystem().instance
.fs
155 self
.lcm_tasks
= lcm_tasks
156 self
.timeout
= config
["timeout"]
157 self
.ro_config
= config
["ro_config"]
158 self
.ng_ro
= config
["ro_config"].get("ng")
159 self
.vca_config
= config
["VCA"].copy()
161 # create N2VC connector
162 self
.n2vc
= N2VCJujuConnector(
165 on_update_db
=self
._on
_update
_n
2vc
_db
,
170 self
.conn_helm_ee
= LCMHelmConn(
173 vca_config
=self
.vca_config
,
174 on_update_db
=self
._on
_update
_n
2vc
_db
,
177 self
.k8sclusterhelm2
= K8sHelmConnector(
178 kubectl_command
=self
.vca_config
.get("kubectlpath"),
179 helm_command
=self
.vca_config
.get("helmpath"),
186 self
.k8sclusterhelm3
= K8sHelm3Connector(
187 kubectl_command
=self
.vca_config
.get("kubectlpath"),
188 helm_command
=self
.vca_config
.get("helm3path"),
195 self
.k8sclusterjuju
= K8sJujuConnector(
196 kubectl_command
=self
.vca_config
.get("kubectlpath"),
197 juju_command
=self
.vca_config
.get("jujupath"),
200 on_update_db
=self
._on
_update
_k
8s
_db
,
205 self
.k8scluster_map
= {
206 "helm-chart": self
.k8sclusterhelm2
,
207 "helm-chart-v3": self
.k8sclusterhelm3
,
208 "chart": self
.k8sclusterhelm3
,
209 "juju-bundle": self
.k8sclusterjuju
,
210 "juju": self
.k8sclusterjuju
,
214 "lxc_proxy_charm": self
.n2vc
,
215 "native_charm": self
.n2vc
,
216 "k8s_proxy_charm": self
.n2vc
,
217 "helm": self
.conn_helm_ee
,
218 "helm-v3": self
.conn_helm_ee
,
222 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
224 self
.op_status_map
= {
225 "instantiation": self
.RO
.status
,
226 "termination": self
.RO
.status
,
227 "migrate": self
.RO
.status
,
228 "healing": self
.RO
.recreate_status
,
229 "verticalscale": self
.RO
.status
,
230 "start_stop_rebuild": self
.RO
.status
,
234 def increment_ip_mac(ip_mac
, vm_index
=1):
235 if not isinstance(ip_mac
, str):
238 # try with ipv4 look for last dot
239 i
= ip_mac
.rfind(".")
242 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
243 # try with ipv6 or mac look for last colon. Operate in hex
244 i
= ip_mac
.rfind(":")
247 # format in hex, len can be 2 for mac or 4 for ipv6
248 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
249 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
255 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
257 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
260 # TODO filter RO descriptor fields...
264 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
265 db_dict
["deploymentStatus"] = ro_descriptor
266 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
268 except Exception as e
:
270 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
273 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
275 # remove last dot from path (if exists)
276 if path
.endswith("."):
279 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
280 # .format(table, filter, path, updated_data))
283 nsr_id
= filter.get("_id")
285 # read ns record from database
286 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
287 current_ns_status
= nsr
.get("nsState")
289 # get vca status for NS
290 status_dict
= await self
.n2vc
.get_status(
291 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
296 db_dict
["vcaStatus"] = status_dict
298 # update configurationStatus for this VCA
300 vca_index
= int(path
[path
.rfind(".") + 1 :])
303 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
305 vca_status
= vca_list
[vca_index
].get("status")
307 configuration_status_list
= nsr
.get("configurationStatus")
308 config_status
= configuration_status_list
[vca_index
].get("status")
310 if config_status
== "BROKEN" and vca_status
!= "failed":
311 db_dict
["configurationStatus"][vca_index
] = "READY"
312 elif config_status
!= "BROKEN" and vca_status
== "failed":
313 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
314 except Exception as e
:
315 # not update configurationStatus
316 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
318 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
319 # if nsState = 'DEGRADED' check if all is OK
321 if current_ns_status
in ("READY", "DEGRADED"):
322 error_description
= ""
324 if status_dict
.get("machines"):
325 for machine_id
in status_dict
.get("machines"):
326 machine
= status_dict
.get("machines").get(machine_id
)
327 # check machine agent-status
328 if machine
.get("agent-status"):
329 s
= machine
.get("agent-status").get("status")
332 error_description
+= (
333 "machine {} agent-status={} ; ".format(
337 # check machine instance status
338 if machine
.get("instance-status"):
339 s
= machine
.get("instance-status").get("status")
342 error_description
+= (
343 "machine {} instance-status={} ; ".format(
348 if status_dict
.get("applications"):
349 for app_id
in status_dict
.get("applications"):
350 app
= status_dict
.get("applications").get(app_id
)
351 # check application status
352 if app
.get("status"):
353 s
= app
.get("status").get("status")
356 error_description
+= (
357 "application {} status={} ; ".format(app_id
, s
)
360 if error_description
:
361 db_dict
["errorDescription"] = error_description
362 if current_ns_status
== "READY" and is_degraded
:
363 db_dict
["nsState"] = "DEGRADED"
364 if current_ns_status
== "DEGRADED" and not is_degraded
:
365 db_dict
["nsState"] = "READY"
368 self
.update_db_2("nsrs", nsr_id
, db_dict
)
370 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
372 except Exception as e
:
373 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
375 async def _on_update_k8s_db(
376 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
379 Updating vca status in NSR record
380 :param cluster_uuid: UUID of a k8s cluster
381 :param kdu_instance: The unique name of the KDU instance
382 :param filter: To get nsr_id
383 :cluster_type: The cluster type (juju, k8s)
387 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
388 # .format(cluster_uuid, kdu_instance, filter))
390 nsr_id
= filter.get("_id")
392 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
393 cluster_uuid
=cluster_uuid
,
394 kdu_instance
=kdu_instance
,
396 complete_status
=True,
402 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
405 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
409 self
.update_db_2("nsrs", nsr_id
, db_dict
)
410 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
412 except Exception as e
:
413 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
416 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
419 undefined
=StrictUndefined
,
420 autoescape
=select_autoescape(default_for_string
=True, default
=True),
422 template
= env
.from_string(cloud_init_text
)
423 return template
.render(additional_params
or {})
424 except UndefinedError
as e
:
426 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
427 "file, must be provided in the instantiation parameters inside the "
428 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
430 except (TemplateError
, TemplateNotFound
) as e
:
432 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
437 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
438 cloud_init_content
= cloud_init_file
= None
440 if vdu
.get("cloud-init-file"):
441 base_folder
= vnfd
["_admin"]["storage"]
442 if base_folder
["pkg-dir"]:
443 cloud_init_file
= "{}/{}/cloud_init/{}".format(
444 base_folder
["folder"],
445 base_folder
["pkg-dir"],
446 vdu
["cloud-init-file"],
449 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
450 base_folder
["folder"],
451 vdu
["cloud-init-file"],
453 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
454 cloud_init_content
= ci_file
.read()
455 elif vdu
.get("cloud-init"):
456 cloud_init_content
= vdu
["cloud-init"]
458 return cloud_init_content
459 except FsException
as e
:
461 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
462 vnfd
["id"], vdu
["id"], cloud_init_file
, e
466 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
468 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
470 additional_params
= vdur
.get("additionalParams")
471 return parse_yaml_strings(additional_params
)
473 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
475 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
476 :param vnfd: input vnfd
477 :param new_id: overrides vnf id if provided
478 :param additionalParams: Instantiation params for VNFs provided
479 :param nsrId: Id of the NSR
480 :return: copy of vnfd
482 vnfd_RO
= deepcopy(vnfd
)
483 # remove unused by RO configuration, monitoring, scaling and internal keys
484 vnfd_RO
.pop("_id", None)
485 vnfd_RO
.pop("_admin", None)
486 vnfd_RO
.pop("monitoring-param", None)
487 vnfd_RO
.pop("scaling-group-descriptor", None)
488 vnfd_RO
.pop("kdu", None)
489 vnfd_RO
.pop("k8s-cluster", None)
491 vnfd_RO
["id"] = new_id
493 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
494 for vdu
in get_iterable(vnfd_RO
, "vdu"):
495 vdu
.pop("cloud-init-file", None)
496 vdu
.pop("cloud-init", None)
500 def ip_profile_2_RO(ip_profile
):
501 RO_ip_profile
= deepcopy(ip_profile
)
502 if "dns-server" in RO_ip_profile
:
503 if isinstance(RO_ip_profile
["dns-server"], list):
504 RO_ip_profile
["dns-address"] = []
505 for ds
in RO_ip_profile
.pop("dns-server"):
506 RO_ip_profile
["dns-address"].append(ds
["address"])
508 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
509 if RO_ip_profile
.get("ip-version") == "ipv4":
510 RO_ip_profile
["ip-version"] = "IPv4"
511 if RO_ip_profile
.get("ip-version") == "ipv6":
512 RO_ip_profile
["ip-version"] = "IPv6"
513 if "dhcp-params" in RO_ip_profile
:
514 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
517 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
518 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
519 if db_vim
["_admin"]["operationalState"] != "ENABLED":
521 "VIM={} is not available. operationalState={}".format(
522 vim_account
, db_vim
["_admin"]["operationalState"]
525 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
528 def get_ro_wim_id_for_wim_account(self
, wim_account
):
529 if isinstance(wim_account
, str):
530 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
531 if db_wim
["_admin"]["operationalState"] != "ENABLED":
533 "WIM={} is not available. operationalState={}".format(
534 wim_account
, db_wim
["_admin"]["operationalState"]
537 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
542 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
544 db_vdu_push_list
= []
546 db_update
= {"_admin.modified": time()}
548 for vdu_id
, vdu_count
in vdu_create
.items():
552 for vdur
in reversed(db_vnfr
["vdur"])
553 if vdur
["vdu-id-ref"] == vdu_id
558 # Read the template saved in the db:
560 "No vdur in the database. Using the vdur-template to scale"
562 vdur_template
= db_vnfr
.get("vdur-template")
563 if not vdur_template
:
565 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
569 vdur
= vdur_template
[0]
570 # Delete a template from the database after using it
573 {"_id": db_vnfr
["_id"]},
575 pull
={"vdur-template": {"_id": vdur
["_id"]}},
577 for count
in range(vdu_count
):
578 vdur_copy
= deepcopy(vdur
)
579 vdur_copy
["status"] = "BUILD"
580 vdur_copy
["status-detailed"] = None
581 vdur_copy
["ip-address"] = None
582 vdur_copy
["_id"] = str(uuid4())
583 vdur_copy
["count-index"] += count
+ 1
584 vdur_copy
["id"] = "{}-{}".format(
585 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
587 vdur_copy
.pop("vim_info", None)
588 for iface
in vdur_copy
["interfaces"]:
589 if iface
.get("fixed-ip"):
590 iface
["ip-address"] = self
.increment_ip_mac(
591 iface
["ip-address"], count
+ 1
594 iface
.pop("ip-address", None)
595 if iface
.get("fixed-mac"):
596 iface
["mac-address"] = self
.increment_ip_mac(
597 iface
["mac-address"], count
+ 1
600 iface
.pop("mac-address", None)
604 ) # only first vdu can be managment of vnf
605 db_vdu_push_list
.append(vdur_copy
)
606 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
608 if len(db_vnfr
["vdur"]) == 1:
609 # The scale will move to 0 instances
611 "Scaling to 0 !, creating the template with the last vdur"
613 template_vdur
= [db_vnfr
["vdur"][0]]
614 for vdu_id
, vdu_count
in vdu_delete
.items():
616 indexes_to_delete
= [
618 for iv
in enumerate(db_vnfr
["vdur"])
619 if iv
[1]["vdu-id-ref"] == vdu_id
623 "vdur.{}.status".format(i
): "DELETING"
624 for i
in indexes_to_delete
[-vdu_count
:]
628 # it must be deleted one by one because common.db does not allow otherwise
631 for v
in reversed(db_vnfr
["vdur"])
632 if v
["vdu-id-ref"] == vdu_id
634 for vdu
in vdus_to_delete
[:vdu_count
]:
637 {"_id": db_vnfr
["_id"]},
639 pull
={"vdur": {"_id": vdu
["_id"]}},
643 db_push
["vdur"] = db_vdu_push_list
645 db_push
["vdur-template"] = template_vdur
648 db_vnfr
["vdur-template"] = template_vdur
649 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
650 # modify passed dictionary db_vnfr
651 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
652 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
654 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
656 Updates database nsr with the RO info for the created vld
657 :param ns_update_nsr: dictionary to be filled with the updated info
658 :param db_nsr: content of db_nsr. This is also modified
659 :param nsr_desc_RO: nsr descriptor from RO
660 :return: Nothing, LcmException is raised on errors
663 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
664 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
665 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
667 vld
["vim-id"] = net_RO
.get("vim_net_id")
668 vld
["name"] = net_RO
.get("vim_name")
669 vld
["status"] = net_RO
.get("status")
670 vld
["status-detailed"] = net_RO
.get("error_msg")
671 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
675 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
678 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
680 for db_vnfr
in db_vnfrs
.values():
681 vnfr_update
= {"status": "ERROR"}
682 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
683 if "status" not in vdur
:
684 vdur
["status"] = "ERROR"
685 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
687 vdur
["status-detailed"] = str(error_text
)
689 "vdur.{}.status-detailed".format(vdu_index
)
691 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
692 except DbException
as e
:
693 self
.logger
.error("Cannot update vnf. {}".format(e
))
695 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
697 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
698 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
699 :param nsr_desc_RO: nsr descriptor from RO
700 :return: Nothing, LcmException is raised on errors
702 for vnf_index
, db_vnfr
in db_vnfrs
.items():
703 for vnf_RO
in nsr_desc_RO
["vnfs"]:
704 if vnf_RO
["member_vnf_index"] != vnf_index
:
707 if vnf_RO
.get("ip_address"):
708 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
711 elif not db_vnfr
.get("ip-address"):
712 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
713 raise LcmExceptionNoMgmtIP(
714 "ns member_vnf_index '{}' has no IP address".format(
719 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
720 vdur_RO_count_index
= 0
721 if vdur
.get("pdu-type"):
723 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
724 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
726 if vdur
["count-index"] != vdur_RO_count_index
:
727 vdur_RO_count_index
+= 1
729 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
730 if vdur_RO
.get("ip_address"):
731 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
733 vdur
["ip-address"] = None
734 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
735 vdur
["name"] = vdur_RO
.get("vim_name")
736 vdur
["status"] = vdur_RO
.get("status")
737 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
738 for ifacer
in get_iterable(vdur
, "interfaces"):
739 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
740 if ifacer
["name"] == interface_RO
.get("internal_name"):
741 ifacer
["ip-address"] = interface_RO
.get(
744 ifacer
["mac-address"] = interface_RO
.get(
750 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
751 "from VIM info".format(
752 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
755 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
759 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
761 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
765 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
766 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
767 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
769 vld
["vim-id"] = net_RO
.get("vim_net_id")
770 vld
["name"] = net_RO
.get("vim_name")
771 vld
["status"] = net_RO
.get("status")
772 vld
["status-detailed"] = net_RO
.get("error_msg")
773 vnfr_update
["vld.{}".format(vld_index
)] = vld
777 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
782 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
787 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
792 def _get_ns_config_info(self
, nsr_id
):
794 Generates a mapping between vnf,vdu elements and the N2VC id
795 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
796 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
797 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
798 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
800 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
801 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
803 ns_config_info
= {"osm-config-mapping": mapping
}
804 for vca
in vca_deployed_list
:
805 if not vca
["member-vnf-index"]:
807 if not vca
["vdu_id"]:
808 mapping
[vca
["member-vnf-index"]] = vca
["application"]
812 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
814 ] = vca
["application"]
815 return ns_config_info
817 async def _instantiate_ng_ro(
834 def get_vim_account(vim_account_id
):
836 if vim_account_id
in db_vims
:
837 return db_vims
[vim_account_id
]
838 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
839 db_vims
[vim_account_id
] = db_vim
842 # modify target_vld info with instantiation parameters
843 def parse_vld_instantiation_params(
844 target_vim
, target_vld
, vld_params
, target_sdn
846 if vld_params
.get("ip-profile"):
847 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
850 if vld_params
.get("provider-network"):
851 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
854 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
855 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
858 if vld_params
.get("wimAccountId"):
859 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
860 target_vld
["vim_info"][target_wim
] = {}
861 for param
in ("vim-network-name", "vim-network-id"):
862 if vld_params
.get(param
):
863 if isinstance(vld_params
[param
], dict):
864 for vim
, vim_net
in vld_params
[param
].items():
865 other_target_vim
= "vim:" + vim
867 target_vld
["vim_info"],
868 (other_target_vim
, param
.replace("-", "_")),
871 else: # isinstance str
872 target_vld
["vim_info"][target_vim
][
873 param
.replace("-", "_")
874 ] = vld_params
[param
]
875 if vld_params
.get("common_id"):
876 target_vld
["common_id"] = vld_params
.get("common_id")
878 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
879 def update_ns_vld_target(target
, ns_params
):
880 for vnf_params
in ns_params
.get("vnf", ()):
881 if vnf_params
.get("vimAccountId"):
885 for vnfr
in db_vnfrs
.values()
886 if vnf_params
["member-vnf-index"]
887 == vnfr
["member-vnf-index-ref"]
891 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
892 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
893 target_vld
= find_in_list(
894 get_iterable(vdur
, "interfaces"),
895 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
898 vld_params
= find_in_list(
899 get_iterable(ns_params
, "vld"),
900 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
904 if vnf_params
.get("vimAccountId") not in a_vld
.get(
907 target_vim_network_list
= [
908 v
for _
, v
in a_vld
.get("vim_info").items()
910 target_vim_network_name
= next(
912 item
.get("vim_network_name", "")
913 for item
in target_vim_network_list
918 target
["ns"]["vld"][a_index
].get("vim_info").update(
920 "vim:{}".format(vnf_params
["vimAccountId"]): {
921 "vim_network_name": target_vim_network_name
,
927 for param
in ("vim-network-name", "vim-network-id"):
928 if vld_params
.get(param
) and isinstance(
929 vld_params
[param
], dict
931 for vim
, vim_net
in vld_params
[
934 other_target_vim
= "vim:" + vim
936 target
["ns"]["vld"][a_index
].get(
941 param
.replace("-", "_"),
946 nslcmop_id
= db_nslcmop
["_id"]
948 "name": db_nsr
["name"],
951 "image": deepcopy(db_nsr
["image"]),
952 "flavor": deepcopy(db_nsr
["flavor"]),
953 "action_id": nslcmop_id
,
954 "cloud_init_content": {},
956 for image
in target
["image"]:
957 image
["vim_info"] = {}
958 for flavor
in target
["flavor"]:
959 flavor
["vim_info"] = {}
960 if db_nsr
.get("affinity-or-anti-affinity-group"):
961 target
["affinity-or-anti-affinity-group"] = deepcopy(
962 db_nsr
["affinity-or-anti-affinity-group"]
964 for affinity_or_anti_affinity_group
in target
[
965 "affinity-or-anti-affinity-group"
967 affinity_or_anti_affinity_group
["vim_info"] = {}
969 if db_nslcmop
.get("lcmOperationType") != "instantiate":
970 # get parameters of instantiation:
971 db_nslcmop_instantiate
= self
.db
.get_list(
974 "nsInstanceId": db_nslcmop
["nsInstanceId"],
975 "lcmOperationType": "instantiate",
978 ns_params
= db_nslcmop_instantiate
.get("operationParams")
980 ns_params
= db_nslcmop
.get("operationParams")
981 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
982 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
985 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
986 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
990 "mgmt-network": vld
.get("mgmt-network", False),
991 "type": vld
.get("type"),
994 "vim_network_name": vld
.get("vim-network-name"),
995 "vim_account_id": ns_params
["vimAccountId"],
999 # check if this network needs SDN assist
1000 if vld
.get("pci-interfaces"):
1001 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1002 sdnc_id
= db_vim
["config"].get("sdn-controller")
1004 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1005 target_sdn
= "sdn:{}".format(sdnc_id
)
1006 target_vld
["vim_info"][target_sdn
] = {
1008 "target_vim": target_vim
,
1010 "type": vld
.get("type"),
1013 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1014 for nsd_vnf_profile
in nsd_vnf_profiles
:
1015 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1016 if cp
["virtual-link-profile-id"] == vld
["id"]:
1018 "member_vnf:{}.{}".format(
1019 cp
["constituent-cpd-id"][0][
1020 "constituent-base-element-id"
1022 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1024 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1026 # check at nsd descriptor, if there is an ip-profile
1028 nsd_vlp
= find_in_list(
1029 get_virtual_link_profiles(nsd
),
1030 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1035 and nsd_vlp
.get("virtual-link-protocol-data")
1036 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1038 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1041 ip_profile_dest_data
= {}
1042 if "ip-version" in ip_profile_source_data
:
1043 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1046 if "cidr" in ip_profile_source_data
:
1047 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1050 if "gateway-ip" in ip_profile_source_data
:
1051 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1054 if "dhcp-enabled" in ip_profile_source_data
:
1055 ip_profile_dest_data
["dhcp-params"] = {
1056 "enabled": ip_profile_source_data
["dhcp-enabled"]
1058 vld_params
["ip-profile"] = ip_profile_dest_data
1060 # update vld_params with instantiation params
1061 vld_instantiation_params
= find_in_list(
1062 get_iterable(ns_params
, "vld"),
1063 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1065 if vld_instantiation_params
:
1066 vld_params
.update(vld_instantiation_params
)
1067 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1068 target
["ns"]["vld"].append(target_vld
)
1069 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1070 update_ns_vld_target(target
, ns_params
)
1072 for vnfr
in db_vnfrs
.values():
1073 vnfd
= find_in_list(
1074 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1076 vnf_params
= find_in_list(
1077 get_iterable(ns_params
, "vnf"),
1078 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1080 target_vnf
= deepcopy(vnfr
)
1081 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1082 for vld
in target_vnf
.get("vld", ()):
1083 # check if connected to a ns.vld, to fill target'
1084 vnf_cp
= find_in_list(
1085 vnfd
.get("int-virtual-link-desc", ()),
1086 lambda cpd
: cpd
.get("id") == vld
["id"],
1089 ns_cp
= "member_vnf:{}.{}".format(
1090 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1092 if cp2target
.get(ns_cp
):
1093 vld
["target"] = cp2target
[ns_cp
]
1096 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1098 # check if this network needs SDN assist
1100 if vld
.get("pci-interfaces"):
1101 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1102 sdnc_id
= db_vim
["config"].get("sdn-controller")
1104 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1105 target_sdn
= "sdn:{}".format(sdnc_id
)
1106 vld
["vim_info"][target_sdn
] = {
1108 "target_vim": target_vim
,
1110 "type": vld
.get("type"),
1113 # check at vnfd descriptor, if there is an ip-profile
1115 vnfd_vlp
= find_in_list(
1116 get_virtual_link_profiles(vnfd
),
1117 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1121 and vnfd_vlp
.get("virtual-link-protocol-data")
1122 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1124 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1127 ip_profile_dest_data
= {}
1128 if "ip-version" in ip_profile_source_data
:
1129 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1132 if "cidr" in ip_profile_source_data
:
1133 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1136 if "gateway-ip" in ip_profile_source_data
:
1137 ip_profile_dest_data
[
1139 ] = ip_profile_source_data
["gateway-ip"]
1140 if "dhcp-enabled" in ip_profile_source_data
:
1141 ip_profile_dest_data
["dhcp-params"] = {
1142 "enabled": ip_profile_source_data
["dhcp-enabled"]
1145 vld_params
["ip-profile"] = ip_profile_dest_data
1146 # update vld_params with instantiation params
1148 vld_instantiation_params
= find_in_list(
1149 get_iterable(vnf_params
, "internal-vld"),
1150 lambda i_vld
: i_vld
["name"] == vld
["id"],
1152 if vld_instantiation_params
:
1153 vld_params
.update(vld_instantiation_params
)
1154 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1157 for vdur
in target_vnf
.get("vdur", ()):
1158 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1159 continue # This vdu must not be created
1160 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1162 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1165 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1166 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1169 and vdu_configuration
.get("config-access")
1170 and vdu_configuration
.get("config-access").get("ssh-access")
1172 vdur
["ssh-keys"] = ssh_keys_all
1173 vdur
["ssh-access-required"] = vdu_configuration
[
1175 ]["ssh-access"]["required"]
1178 and vnf_configuration
.get("config-access")
1179 and vnf_configuration
.get("config-access").get("ssh-access")
1180 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1182 vdur
["ssh-keys"] = ssh_keys_all
1183 vdur
["ssh-access-required"] = vnf_configuration
[
1185 ]["ssh-access"]["required"]
1186 elif ssh_keys_instantiation
and find_in_list(
1187 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1189 vdur
["ssh-keys"] = ssh_keys_instantiation
1191 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1193 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1195 if vdud
.get("cloud-init-file"):
1196 vdur
["cloud-init"] = "{}:file:{}".format(
1197 vnfd
["_id"], vdud
.get("cloud-init-file")
1199 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1200 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1201 base_folder
= vnfd
["_admin"]["storage"]
1202 if base_folder
["pkg-dir"]:
1203 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1204 base_folder
["folder"],
1205 base_folder
["pkg-dir"],
1206 vdud
.get("cloud-init-file"),
1209 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1210 base_folder
["folder"],
1211 vdud
.get("cloud-init-file"),
1213 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1214 target
["cloud_init_content"][
1217 elif vdud
.get("cloud-init"):
1218 vdur
["cloud-init"] = "{}:vdu:{}".format(
1219 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1221 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1222 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1225 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1226 deploy_params_vdu
= self
._format
_additional
_params
(
1227 vdur
.get("additionalParams") or {}
1229 deploy_params_vdu
["OSM"] = get_osm_params(
1230 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1232 vdur
["additionalParams"] = deploy_params_vdu
1235 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1236 if target_vim
not in ns_flavor
["vim_info"]:
1237 ns_flavor
["vim_info"][target_vim
] = {}
1240 # in case alternative images are provided we must check if they should be applied
1241 # for the vim_type, modify the vim_type taking into account
1242 ns_image_id
= int(vdur
["ns-image-id"])
1243 if vdur
.get("alt-image-ids"):
1244 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1245 vim_type
= db_vim
["vim_type"]
1246 for alt_image_id
in vdur
.get("alt-image-ids"):
1247 ns_alt_image
= target
["image"][int(alt_image_id
)]
1248 if vim_type
== ns_alt_image
.get("vim-type"):
1249 # must use alternative image
1251 "use alternative image id: {}".format(alt_image_id
)
1253 ns_image_id
= alt_image_id
1254 vdur
["ns-image-id"] = ns_image_id
1256 ns_image
= target
["image"][int(ns_image_id
)]
1257 if target_vim
not in ns_image
["vim_info"]:
1258 ns_image
["vim_info"][target_vim
] = {}
1261 if vdur
.get("affinity-or-anti-affinity-group-id"):
1262 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1263 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1264 if target_vim
not in ns_ags
["vim_info"]:
1265 ns_ags
["vim_info"][target_vim
] = {}
1267 vdur
["vim_info"] = {target_vim
: {}}
1268 # instantiation parameters
1270 vdu_instantiation_params
= find_in_list(
1271 get_iterable(vnf_params
, "vdu"),
1272 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1274 if vdu_instantiation_params
:
1275 # Parse the vdu_volumes from the instantiation params
1276 vdu_volumes
= get_volumes_from_instantiation_params(
1277 vdu_instantiation_params
, vdud
1279 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1280 vdur_list
.append(vdur
)
1281 target_vnf
["vdur"] = vdur_list
1282 target
["vnf"].append(target_vnf
)
1284 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1285 desc
= await self
.RO
.deploy(nsr_id
, target
)
1286 self
.logger
.debug("RO return > {}".format(desc
))
1287 action_id
= desc
["action_id"]
1288 await self
._wait
_ng
_ro
(
1295 operation
="instantiation",
1300 "_admin.deployed.RO.operational-status": "running",
1301 "detailed-status": " ".join(stage
),
1303 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1304 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1305 self
._write
_op
_status
(nslcmop_id
, stage
)
1307 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
async def _wait_ng_ro(
    self,
    nsr_id,
    action_id,
    nslcmop_id=None,
    start_time=None,
    timeout=600,
    stage=None,
    operation=None,
):
    """
    Poll NG-RO every 15 seconds until the action is DONE, FAILED or times out.

    :param nsr_id: ns record identity, passed to the status callable
    :param action_id: RO action to follow
    :param nslcmop_id: operation id; progress is only written to the database
        when both this and ``stage`` are provided
    :param start_time: epoch seconds the timeout is counted from (default: now)
    :param timeout: maximum seconds to wait, counted from ``start_time``
    :param stage: list [general stage, tasks, vim_specific]; item 2 is overwritten
    :param operation: key of ``self.op_status_map`` selecting the status callable
    :return: None
    :raises NgRoException: when RO reports FAILED or the timeout expires
    :raises AssertionError: when RO reports a status this method does not know
    """
    detailed_status_old = None
    db_nsr_update = {}
    start_time = start_time or time()
    while time() <= start_time + timeout:
        desc_status = await self.op_status_map[operation](nsr_id, action_id)
        self.logger.debug("Wait NG RO > {}".format(desc_status))
        if desc_status["status"] == "FAILED":
            raise NgRoException(desc_status["details"])
        elif desc_status["status"] == "BUILD":
            if stage:
                stage[2] = "VIM: ({})".format(desc_status["details"])
        elif desc_status["status"] == "DONE":
            if stage:
                stage[2] = "Deployed at VIM"
            break
        else:
            # Raised explicitly (not `assert False`) so the check survives `python -O`
            raise AssertionError(
                "ROclient.check_ns_status returns unknown {}".format(
                    desc_status["status"]
                )
            )
        # only touch the database when the VIM-specific text has changed
        if stage and nslcmop_id and stage[2] != detailed_status_old:
            detailed_status_old = stage[2]
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
        # the `loop` argument of asyncio.sleep was removed in Python 3.10
        await asyncio.sleep(15)
    else:  # timeout_ns_deploy
        raise NgRoException("Timeout waiting ns to deploy")
async def _terminate_ng_ro(
    self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
):
    """
    Ask NG-RO to remove every deployed item of the NS and wait until it is done.

    An empty target is deployed (which makes RO delete what exists), the
    resulting action is followed with _wait_ng_ro and finally the ns is deleted
    at RO. Errors are collected rather than raised immediately, so the database
    is always updated before failing.

    :param logging_text: prefix text to use at logging
    :param nsr_deployed: not used by this method
    :param nsr_id: ns record identity
    :param nslcmop_id: operation id, also sent to RO as action_id
    :param stage: list [general stage, tasks, vim_specific]; item 2 is overwritten
    :return: None
    :raises LcmException: with the collected details when the deletion failed
    """
    changes = {}
    errors = []
    action_id = None
    started_at = time()
    try:
        empty_target = {
            "ns": {"vld": []},
            "vnf": [],
            "image": [],
            "flavor": [],
            "action_id": nslcmop_id,
        }
        reply = await self.RO.deploy(nsr_id, empty_target)
        action_id = reply["action_id"]
        changes["_admin.deployed.RO.nsr_delete_action_id"] = action_id
        changes["_admin.deployed.RO.nsr_status"] = "DELETING"
        self.logger.debug(
            logging_text
            + "ns terminate action at RO. action_id={}".format(action_id)
        )

        twenty_minutes = 20 * 60
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            started_at,
            twenty_minutes,
            stage,
            operation="termination",
        )

        changes["_admin.deployed.RO.nsr_delete_action_id"] = None
        changes["_admin.deployed.RO.nsr_status"] = "DELETED"
        await self.RO.delete(nsr_id)
    except Exception as e:
        # only NgRoException carries an http_code worth inspecting
        http_code = e.http_code if isinstance(e, NgRoException) else None
        if http_code == 404:  # not found
            changes["_admin.deployed.RO.nsr_id"] = None
            changes["_admin.deployed.RO.nsr_status"] = "DELETED"
            changes["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(
                logging_text + "RO_action_id={} already deleted".format(action_id)
            )
        elif http_code == 409:  # conflict
            errors.append("delete conflict: {}".format(e))
            self.logger.debug(
                logging_text
                + "RO_action_id={} delete conflict: {}".format(action_id, e)
            )
        else:
            errors.append("delete error: {}".format(e))
            self.logger.error(
                logging_text
                + "RO_action_id={} delete error: {}".format(action_id, e)
            )

    stage[2] = "Error deleting from VIM" if errors else "Deleted from VIM"
    changes["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, changes)
    self._write_op_status(nslcmop_id, stage)

    if errors:
        raise LcmException("; ".join(errors))
async def instantiate_RO(
    self,
    logging_text,
    nsr_id,
    nsd,
    db_nsr,
    db_nslcmop,
    db_vnfrs,
    db_vnfds,
    n2vc_key_list,
    stage,
):
    """
    Instantiate at RO
    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: database content of ns descriptor
    :param db_nsr: database content of ns record
    :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
    :param db_vnfrs: database content of vnfrs; may be modified by placement
    :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
    :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: None or exception
    """
    try:
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get(
                "ns_deploy", self.timeout_ns_deploy
            )

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
            # in case of placement, change ns_params["vimAccountId"] if it is not
            # present at any vnfr
            last_vnfr = None
            for last_vnfr in db_vnfrs.values():
                if ns_params["vimAccountId"] == last_vnfr["vim-account-id"]:
                    break
            else:
                # The original statement here used `==` and so never changed
                # anything; assign the vim account of the last vnfr instead.
                if last_vnfr is not None:
                    ns_params["vimAccountId"] = last_vnfr["vim-account-id"]

        return await self._instantiate_ng_ro(
            logging_text,
            nsr_id,
            nsd,
            db_nsr,
            db_nslcmop,
            db_vnfrs,
            db_vnfds,
            n2vc_key_list,
            stage,
            start_deploy,
            timeout_ns_deploy,
        )
    except Exception as e:
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        # a traceback is only logged for exception types that are not expected
        self.logger.error(
            "Error deploying at VIM {}".format(e),
            exc_info=not isinstance(
                e,
                (
                    ROclient.ROClientException,
                    LcmException,
                    DbException,
                    NgRoException,
                ),
            ),
        )
        raise
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address

    The vnfr is re-read from the database every 10 seconds, up to 360 times.

    :param logging_text: prefix use for logging
    :param nsr_id: not used by this method
    :param vnfr_id: _id of the vnfr that contains the kdu
    :param kdu_name: value of "kdu-name" of the wanted kdur
    :return: IP address, K8s services
    :raises LcmException: if the kdur is not found, if its status is set to
        anything other than READY or ENABLED, or on timeout
    """

    # self.logger.debug(logging_text + "Starting wait_kdu_up")
    nb_tries = 0

    while nb_tries < 360:
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next(
            (
                x
                for x in get_iterable(db_vnfr, "kdur")
                if x.get("kdu-name") == kdu_name
            ),
            None,
        )
        if not kdur:
            raise LcmException(
                "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
            )
        if kdur.get("status"):
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address"), kdur.get("services")
            else:
                raise LcmException(
                    "target KDU={} is in error state".format(kdu_name)
                )

        # the `loop` argument of asyncio.sleep was removed in Python 3.10
        await asyncio.sleep(10)
        nb_tries += 1
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
async def wait_vm_up_insert_key_ro(
    self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
):
    """
    Wait for ip address at RO, and optionally, insert public key in virtual machine
    :param logging_text: prefix use for logging
    :param nsr_id: ns record identity
    :param vnfr_id: _id of the vnfr that owns the target vdu
    :param vdu_id: "vdu-id-ref" of the target vdu; falsy to target the VNF management vdu
    :param vdu_index: "count-index" of the target vdu
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    :raises LcmException: on timeout, if the target is in error state, or when
        the key injection fails
    """

    self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0
    target_vdu_id = None
    ro_retries = 0

    while True:

        ro_retries += 1
        if ro_retries >= 360:  # 1 hour
            raise LcmException(
                "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
            )

        # the `loop` argument of asyncio.sleep was removed in Python 3.10
        await asyncio.sleep(10)

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                if db_vnfr.get("status") == "ERROR":
                    raise LcmException(
                        "Cannot inject ssh-key because target VNF is in error state"
                    )
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next(
                    (
                        x
                        for x in get_iterable(db_vnfr, "vdur")
                        if x.get("ip-address") == ip_address
                    ),
                    None,
                )
            else:  # VDU case
                vdur = next(
                    (
                        x
                        for x in get_iterable(db_vnfr, "vdur")
                        if x.get("vdu-id-ref") == vdu_id
                        and x.get("count-index") == vdu_index
                    ),
                    None,
                )

            if (
                not vdur and len(db_vnfr.get("vdur", ())) == 1
            ):  # If only one, this should be the target vdu
                vdur = db_vnfr["vdur"][0]
            if not vdur:
                raise LcmException(
                    "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                        vnfr_id, vdu_id, vdu_index
                    )
                )
            # New generation RO stores information at "vim_info"
            ng_ro_status = None
            target_vim = None
            if vdur.get("vim_info"):
                target_vim = next(
                    t for t in vdur["vim_info"]
                )  # there should be only one key
                ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
            if (
                vdur.get("pdu-type")
                or vdur.get("status") == "ACTIVE"
                or ng_ro_status == "ACTIVE"
            ):
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
                raise LcmException(
                    "Cannot inject ssh-key because target VM is in error state"
                )

        if not target_vdu_id:
            continue

        # inject public key into machine
        if pub_key and user:
            self.logger.debug(logging_text + "Inserting RO key")
            self.logger.debug("SSH > PubKey > {}".format(pub_key))
            if vdur.get("pdu-type"):
                self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                return ip_address
            try:
                ro_vm_id = "{}-{}".format(
                    db_vnfr["member-vnf-index-ref"], target_vdu_id
                )  # TODO add vdu_index
                if self.ng_ro:
                    target = {
                        "action": {
                            "action": "inject_ssh_key",
                            "key": pub_key,
                            "user": user,
                        },
                        "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
                    }
                    desc = await self.RO.deploy(nsr_id, target)
                    action_id = desc["action_id"]
                    await self._wait_ng_ro(
                        nsr_id, action_id, timeout=600, operation="instantiation"
                    )
                    break
                else:
                    # wait until NS is deployed at RO
                    if not ro_nsr_id:
                        db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                        ro_nsr_id = deep_get(
                            db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
                        )
                    if not ro_nsr_id:
                        continue
                    result_dict = await self.RO.create_action(
                        item="ns",
                        item_id_name=ro_nsr_id,
                        descriptor={
                            "add_public_key": pub_key,
                            "vms": [ro_vm_id],
                            "user": user,
                        },
                    )
                    # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                    if not result_dict or not isinstance(result_dict, dict):
                        raise LcmException(
                            "Unknown response from RO when injecting key"
                        )
                    # only the first entry is inspected: both branches leave the loop
                    for result in result_dict.values():
                        if result.get("vim_result") == 200:
                            break
                        else:
                            raise ROclient.ROClientException(
                                "error injecting key: {}".format(
                                    result.get("description")
                                )
                            )
                    break
            except NgRoException as e:
                raise LcmException(
                    "Reaching max tries injecting key. Error: {}".format(e)
                )
            except ROclient.ROClientException as e:
                # old RO: retry up to 20 times; the outer loop sleeps 10 s per try
                if not nb_tries:
                    self.logger.debug(
                        logging_text
                        + "error injecting key: {}. Retrying until {} seconds".format(
                            e, 20 * 10
                        )
                    )
                nb_tries += 1
                if nb_tries >= 20:
                    raise LcmException(
                        "Reaching max tries injecting key. Error: {}".format(e)
                    )
        else:
            break

    return ip_address
async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
    """
    Block until the VCAs this one depends on are READY.

    A VDU or KDU level VCA has no dependencies. A VNF level VCA waits for the
    entries of its same member-vnf-index; an NS level VCA waits for all entries.
    The ns record is re-read every 10 seconds.

    :param nsr_id: ns record identity
    :param vca_deployed_list: list of deployed VCA; only the entry at vca_index is read
    :param vca_index: position of this VCA
    :return: None
    :raises LcmException: if a dependency is BROKEN or the wait times out
    """
    own_vca = vca_deployed_list[vca_index]
    if own_vca.get("vdu_id") or own_vca.get("kdu_name"):
        # vdu or kdu: no dependencies
        return

    polls_left = 300
    while polls_left >= 0:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
        statuses = db_nsr["configurationStatus"]
        for position, entry in enumerate(statuses):
            if position == vca_index:
                # myself
                continue
            is_dependency = not own_vca.get("member-vnf-index") or (
                entry.get("member-vnf-index") == own_vca.get("member-vnf-index")
            )
            if not is_dependency:
                continue
            entry_status = statuses[position].get("status")
            if entry_status == "BROKEN":
                raise LcmException(
                    "Configuration aborted because dependent charm/s has failed"
                )
            if entry_status != "READY":
                # still pending: stop scanning and poll again
                break
        else:
            # no dependencies, return
            return
        await asyncio.sleep(10)
        polls_left -= 1

    raise LcmException("Configuration aborted because dependent charm/s timeout")
def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
    """
    Obtain the VCA identifier to use for an element.

    :param db_vnfr: vnf record; when truthy its "vca-id" is returned
    :param db_nsr: ns record; only used when db_vnfr is falsy, in which case the
        "vca" of the vim account at instantiate_params.vimAccountId is returned
    :return: the vca id, or None when both records are falsy
    """
    if db_vnfr:
        return deep_get(db_vnfr, ("vca-id",))
    if db_nsr:
        vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
        vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
        return vim_account.get("vca")
    return None
async def instantiate_N2VC(
    self,
    logging_text,
    vca_index,
    nsi_id,
    db_nsr,
    db_vnfr,
    vdu_id,
    kdu_name,
    vdu_index,
    config_descriptor,
    deploy_params,
    base_folder,
    nslcmop_id,
    stage,
    vca_type,
    vca_name,
    ee_config_descriptor,
):
    """
    Deploy and configure one execution environment (VCA) of the ns.

    Steps: create or register the execution environment, install the
    configuration software, add the VCA relations, obtain the management
    address (injecting the ssh key when required), run the initial config
    primitives and, for helm based environments, store the prometheus jobs.
    Progress is written at nsrs "configurationStatus.<vca_index>".

    :param logging_text: prefix text to use at logging
    :param vca_index: position of this VCA at db_nsr _admin.deployed.VCA
    :param nsi_id: network slice instance id, or falsy
    :param db_nsr: database content of ns record
    :param db_vnfr: database content of vnf record, falsy for an NS level VCA
    :param vdu_id: vdu identifier, falsy if the VCA is not for a VDU
    :param kdu_name: kdu name, falsy if the VCA is not for a KDU
    :param vdu_index: vdu count index
    :param config_descriptor: configuration section of the descriptor
    :param deploy_params: parameters for primitive mapping; "rw_mgmt_ip" (and
        "ns_config_info" for an NS level VCA) are added here
    :param base_folder: dict with "folder" and "pkg-dir" of the package
    :param nslcmop_id: operation id
    :param stage: list with 3 items; item 0 is overwritten before the primitives
    :param vca_type: one of native_charm, lxc_proxy_charm, k8s_proxy_charm, helm, helm-v3
    :param vca_name: charm or helm chart name inside the package
    :param ee_config_descriptor: execution environment section of the descriptor
    :return: None
    :raises LcmException: on any failure; configuration status is set to BROKEN
    """
    nsr_id = db_nsr["_id"]
    db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
    osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
    db_dict = {
        "collection": "nsrs",
        "filter": {"_id": nsr_id},
        "path": db_update_entry,
    }
    step = ""
    try:

        element_type = "NS"
        element_under_configuration = nsr_id

        vnfr_id = None
        if db_vnfr:
            vnfr_id = db_vnfr["_id"]
            osm_config["osm"]["vnf_id"] = vnfr_id

        namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)

        if vca_type == "native_charm":
            index_number = 0
        else:
            index_number = vdu_index or 0

        if vnfr_id:
            element_type = "VNF"
            element_under_configuration = vnfr_id
            namespace += ".{}-{}".format(vnfr_id, index_number)
            if vdu_id:
                namespace += ".{}-{}".format(vdu_id, index_number)
                element_type = "VDU"
                element_under_configuration = "{}-{}".format(vdu_id, index_number)
                osm_config["osm"]["vdu_id"] = vdu_id
            elif kdu_name:
                namespace += ".{}".format(kdu_name)
                element_type = "KDU"
                element_under_configuration = kdu_name
                osm_config["osm"]["kdu_name"] = kdu_name

        # Get artifact path
        if base_folder["pkg-dir"]:
            artifact_path = "{}/{}/{}/{}".format(
                base_folder["folder"],
                base_folder["pkg-dir"],
                "charms"
                if vca_type
                in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
                else "helm-charts",
                vca_name,
            )
        else:
            artifact_path = "{}/Scripts/{}/{}/".format(
                base_folder["folder"],
                "charms"
                if vca_type
                in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
                else "helm-charts",
                vca_name,
            )

        self.logger.debug("Artifact path > {}".format(artifact_path))

        # get initial_config_primitive_list that applies to this element
        initial_config_primitive_list = config_descriptor.get(
            "initial-config-primitive"
        )

        self.logger.debug(
            "Initial config primitive list > {}".format(
                initial_config_primitive_list
            )
        )

        # add config if not present for NS charm
        ee_descriptor_id = ee_config_descriptor.get("id")
        self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
        initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
            initial_config_primitive_list, vca_deployed, ee_descriptor_id
        )

        self.logger.debug(
            "Initial config primitive list #2 > {}".format(
                initial_config_primitive_list
            )
        )
        # n2vc_redesign STEP 3.1
        # find old ee_id if exists
        ee_id = vca_deployed.get("ee_id")

        vca_id = self.get_vca_id(db_vnfr, db_nsr)
        # create or register execution environment in VCA
        if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status="CREATING",
                element_under_configuration=element_under_configuration,
                element_type=element_type,
            )

            step = "create execution environment"
            self.logger.debug(logging_text + step)

            ee_id = None
            credentials = None
            if vca_type == "k8s_proxy_charm":
                ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
                    charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
                    namespace=namespace,
                    artifact_path=artifact_path,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )
            elif vca_type == "helm" or vca_type == "helm-v3":
                ee_id, credentials = await self.vca_map[
                    vca_type
                ].create_execution_environment(
                    namespace=namespace,
                    reuse_ee_id=ee_id,
                    db_dict=db_dict,
                    config=osm_config,
                    artifact_path=artifact_path,
                    vca_type=vca_type,
                )
            else:
                ee_id, credentials = await self.vca_map[
                    vca_type
                ].create_execution_environment(
                    namespace=namespace,
                    reuse_ee_id=ee_id,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )

        elif vca_type == "native_charm":
            step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)
            rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                logging_text,
                nsr_id,
                vnfr_id,
                vdu_id,
                vdu_index,
                user=None,
                pub_key=None,
            )
            credentials = {"hostname": rw_mgmt_ip}
            # get username
            username = deep_get(
                config_descriptor, ("config-access", "ssh-access", "default-user")
            )
            # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
            #  merged. Meanwhile let's get username from initial-config-primitive
            if not username and initial_config_primitive_list:
                for config_primitive in initial_config_primitive_list:
                    for param in config_primitive.get("parameter", ()):
                        if param["name"] == "ssh-username":
                            username = param["value"]
                            break
            if not username:
                raise LcmException(
                    "Cannot determine the username neither with 'initial-config-primitive' nor with "
                    "'config-access.ssh-access.default-user'"
                )
            credentials["username"] = username
            # n2vc_redesign STEP 3.2

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status="REGISTERING",
                element_under_configuration=element_under_configuration,
                element_type=element_type,
            )

            step = "register execution environment {}".format(credentials)
            self.logger.debug(logging_text + step)
            ee_id = await self.vca_map[vca_type].register_execution_environment(
                credentials=credentials,
                namespace=namespace,
                db_dict=db_dict,
                vca_id=vca_id,
            )

        # for compatibility with MON/POL modules, the need model and application name at database
        # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
        ee_id_parts = ee_id.split(".")
        db_nsr_update = {db_update_entry + "ee_id": ee_id}
        if len(ee_id_parts) >= 2:
            model_name = ee_id_parts[0]
            application_name = ee_id_parts[1]
            db_nsr_update[db_update_entry + "model"] = model_name
            db_nsr_update[db_update_entry + "application"] = application_name

        # n2vc_redesign STEP 3.3
        step = "Install configuration Software"

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status="INSTALLING SW",
            element_under_configuration=element_under_configuration,
            element_type=element_type,
            other_update=db_nsr_update,
        )

        # TODO check if already done
        self.logger.debug(logging_text + step)
        config = None
        if vca_type == "native_charm":
            config_primitive = next(
                (p for p in initial_config_primitive_list if p["name"] == "config"),
                None,
            )
            if config_primitive:
                config = self._map_primitive_params(
                    config_primitive, {}, deploy_params
                )
        num_units = 1
        if vca_type == "lxc_proxy_charm":
            if element_type == "NS":
                num_units = db_nsr.get("config-units") or 1
            elif element_type == "VNF":
                num_units = db_vnfr.get("config-units") or 1
            elif element_type == "VDU":
                for v in db_vnfr["vdur"]:
                    if vdu_id == v["vdu-id-ref"]:
                        num_units = v.get("config-units") or 1
                        break
        if vca_type != "k8s_proxy_charm":
            await self.vca_map[vca_type].install_configuration_sw(
                ee_id=ee_id,
                artifact_path=artifact_path,
                db_dict=db_dict,
                config=config,
                num_units=num_units,
                vca_id=vca_id,
                vca_type=vca_type,
            )

        # write in db flag of configuration_sw already installed
        self.update_db_2(
            "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
        )

        # add relations for this VCA (wait for other peers related with this VCA)
        await self._add_vca_relations(
            logging_text=logging_text,
            nsr_id=nsr_id,
            vca_type=vca_type,
            vca_index=vca_index,
        )

        # if SSH access is required, then get execution environment SSH public
        # if native charm we have waited already to VM be UP
        if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
            pub_key = None
            user = None
            # self.logger.debug("get ssh key block")
            if deep_get(
                config_descriptor, ("config-access", "ssh-access", "required")
            ):
                # self.logger.debug("ssh key needed")
                # Needed to inject a ssh key
                user = deep_get(
                    config_descriptor,
                    ("config-access", "ssh-access", "default-user"),
                )
                step = "Install configuration Software, getting public ssh key"
                pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
                    ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
                )

                step = "Insert public key into VM user={} ssh_key={}".format(
                    user, pub_key
                )
            else:
                # self.logger.debug("no need to get ssh key")
                step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)

            # default rw_mgmt_ip to None, avoiding the non definition of the variable
            rw_mgmt_ip = None

            # n2vc_redesign STEP 5.1
            # wait for RO (ip-address) Insert pub_key into VM
            if vnfr_id:
                if kdu_name:
                    rw_mgmt_ip, services = await self.wait_kdu_up(
                        logging_text, nsr_id, vnfr_id, kdu_name
                    )
                    vnfd = self.db.get_one(
                        "vnfds_revisions",
                        {"_id": f'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
                    )
                    kdu = get_kdu(vnfd, kdu_name)
                    kdu_services = [
                        service["name"] for service in get_kdu_services(kdu)
                    ]
                    exposed_services = []
                    for service in services:
                        if any(s in service["name"] for s in kdu_services):
                            exposed_services.append(service)
                    await self.vca_map[vca_type].exec_primitive(
                        ee_id=ee_id,
                        primitive_name="config",
                        params_dict={
                            "osm-config": json.dumps(
                                OsmConfigBuilder(
                                    k8s={"services": exposed_services}
                                ).build()
                            )
                        },
                        vca_id=vca_id,
                    )

                # This verification is needed in order to avoid trying to add a public key
                # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
                # for a KNF and not for its KDUs, the previous verification gives False, and the code
                # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
                # or a KNF)
                elif db_vnfr.get("vdur"):
                    rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                        logging_text,
                        nsr_id,
                        vnfr_id,
                        vdu_id,
                        vdu_index,
                        user=user,
                        pub_key=pub_key,
                    )

            self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))

        # store rw_mgmt_ip in deploy params for later replacement
        deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

        # n2vc_redesign STEP 6  Execute initial config primitive
        step = "execute initial config primitive"

        # wait for dependent primitives execution (NS -> VNF -> VDU)
        if initial_config_primitive_list:
            await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

        # stage, in function of element type: vdu, kdu, vnf or ns
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # VDU or KDU
            stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
        elif my_vca.get("member-vnf-index"):
            # VNF
            stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
        else:
            # NS
            stage[0] = "Stage 5/5: running Day-1 primitives for NS."

        self._write_configuration_status(
            nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
        )

        self._write_op_status(op_id=nslcmop_id, stage=stage)

        check_if_terminated_needed = True
        for initial_config_primitive in initial_config_primitive_list:
            # adding information on the vca_deployed if it is a NS execution environment
            # .get() as for my_vca above (same record): a missing key means NS level
            if not vca_deployed.get("member-vnf-index"):
                deploy_params["ns_config_info"] = json.dumps(
                    self._get_ns_config_info(nsr_id)
                )
            # TODO check if already done
            primitive_params_ = self._map_primitive_params(
                initial_config_primitive, {}, deploy_params
            )

            step = "execute primitive '{}' params '{}'".format(
                initial_config_primitive["name"], primitive_params_
            )
            self.logger.debug(logging_text + step)
            await self.vca_map[vca_type].exec_primitive(
                ee_id=ee_id,
                primitive_name=initial_config_primitive["name"],
                params_dict=primitive_params_,
                db_dict=db_dict,
                vca_id=vca_id,
                vca_type=vca_type,
            )
            # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
            if check_if_terminated_needed:
                if config_descriptor.get("terminate-config-primitive"):
                    self.update_db_2(
                        "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
                    )
                check_if_terminated_needed = False

            # TODO register in database that primitive is done

        # STEP 7 Configure metrics
        if vca_type == "helm" or vca_type == "helm-v3":
            prometheus_jobs = await self.extract_prometheus_scrape_jobs(
                ee_id=ee_id,
                artifact_path=artifact_path,
                ee_config_descriptor=ee_config_descriptor,
                vnfr_id=vnfr_id,
                nsr_id=nsr_id,
                target_ip=rw_mgmt_ip,
            )
            if prometheus_jobs:
                self.update_db_2(
                    "nsrs",
                    nsr_id,
                    {db_update_entry + "prometheus_jobs": prometheus_jobs},
                )

                for job in prometheus_jobs:
                    self.db.set_one(
                        "prometheus_jobs",
                        {"job_name": job["job_name"]},
                        job,
                        upsert=True,
                        fail_on_empty=False,
                    )

        step = "instantiated at VCA"
        self.logger.debug(logging_text + step)

        self._write_configuration_status(
            nsr_id=nsr_id, vca_index=vca_index, status="READY"
        )

    except Exception as e:  # TODO not use Exception but N2VC exception
        # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
        # a traceback is only logged for exception types that are not expected
        if not isinstance(
            e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
        ):
            self.logger.error(
                "Exception while {} : {}".format(step, e), exc_info=True
            )
        self._write_configuration_status(
            nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
        )
        raise LcmException("{} {}".format(step, e)) from e
def _write_ns_status(
    self,
    nsr_id: str,
    ns_state: str,
    current_operation: str,
    current_operation_id: str,
    error_description: str = None,
    error_detail: str = None,
    other_update: dict = None,
):
    """
    Update db_nsr fields.
    :param nsr_id: _id of the ns record to update
    :param ns_state: new "nsState"; left untouched when falsy
    :param current_operation: stored at "currentOperation"; also at
        "_admin.operation-type", where "IDLE" is stored as None
    :param current_operation_id: id of the operation in progress
    :param error_description: stored at "errorDescription"
    :param error_detail: stored at "errorDetail"
    :param other_update: Other required changes at database if provided, will be cleared
    :return: None. A DbException is logged as a warning, not raised
    """
    try:
        # other_update is extended in place and sent in the same database write
        db_dict = other_update or {}
        db_dict[
            "_admin.nslcmop"
        ] = current_operation_id  # for backward compatibility
        db_dict["_admin.current-operation"] = current_operation_id
        db_dict["_admin.operation-type"] = (
            current_operation if current_operation != "IDLE" else None
        )
        db_dict["currentOperation"] = current_operation
        db_dict["currentOperationID"] = current_operation_id
        db_dict["errorDescription"] = error_description
        db_dict["errorDetail"] = error_detail

        if ns_state:
            db_dict["nsState"] = ns_state
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias, removed in Python 3.13
        self.logger.warning(
            "Error writing NS status, ns={}: {}".format(nsr_id, e)
        )
def _write_op_status(
    self,
    op_id: str,
    stage: list = None,
    error_message: str = None,
    queuePosition: int = 0,
    operation_state: str = None,
    other_update: dict = None,
):
    """
    Update the status fields of an operation at "nslcmops".

    :param op_id: _id of the nslcmop record to update
    :param stage: when a list, item 0 is stored at "stage" and all items joined
        by spaces at "detailed-status"; any other non-None value is stored as
        str at "stage"
    :param error_message: stored at "errorMessage" when not None
    :param queuePosition: always stored at "queuePosition"
    :param operation_state: when not None, stored at "operationState" together
        with the current time at "statusEnteredTime"
    :param other_update: additional changes to write; this dict is extended in place
    :return: None. A DbException is logged as a warning, not raised
    """
    try:
        db_dict = other_update or {}
        db_dict["queuePosition"] = queuePosition
        if isinstance(stage, list):
            db_dict["stage"] = stage[0]
            db_dict["detailed-status"] = " ".join(stage)
        elif stage is not None:
            db_dict["stage"] = str(stage)

        if error_message is not None:
            db_dict["errorMessage"] = error_message
        if operation_state is not None:
            db_dict["operationState"] = operation_state
            db_dict["statusEnteredTime"] = time()
        self.update_db_2("nslcmops", op_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias, removed in Python 3.13
        self.logger.warning(
            "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
        )
def _write_all_config_status(self, db_nsr: dict, status: str):
    """
    Set the same status on every non-empty entry of "configurationStatus".

    :param db_nsr: database content of ns record; "_id" and
        "configurationStatus" are read
    :param status: value to store at "configurationStatus.<index>.status"
    :return: None. A DbException is logged as a warning, not raised
    """
    try:
        nsr_id = db_nsr["_id"]
        # configurationStatus
        config_status = db_nsr.get("configurationStatus")
        if config_status:
            # falsy entries are skipped
            db_nsr_update = {
                "configurationStatus.{}.status".format(index): status
                for index, v in enumerate(config_status)
                if v
            }
            # update status
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

    except DbException as e:
        # Logger.warn is a deprecated alias, removed in Python 3.13
        self.logger.warning(
            "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
        )
def _write_configuration_status(
    self,
    nsr_id: str,
    vca_index: int,
    status: str = None,
    element_under_configuration: str = None,
    element_type: str = None,
    other_update: dict = None,
):
    """
    Update the "configurationStatus.<vca_index>" entry of an ns record.

    Only the arguments with a truthy value are written.

    :param nsr_id: _id of the ns record to update
    :param vca_index: position of the VCA at "configurationStatus"
    :param status: stored at "status"
    :param element_under_configuration: stored at "elementUnderConfiguration"
    :param element_type: stored at "elementType"
    :param other_update: additional changes to write; this dict is extended in place
    :return: None. A DbException is logged as a warning, not raised
    """

    # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
    #                   .format(vca_index, status))

    try:
        db_path = "configurationStatus.{}.".format(vca_index)
        db_dict = other_update or {}
        if status:
            db_dict[db_path + "status"] = status
        if element_under_configuration:
            db_dict[
                db_path + "elementUnderConfiguration"
            ] = element_under_configuration
        if element_type:
            db_dict[db_path + "elementType"] = element_type
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias, removed in Python 3.13
        self.logger.warning(
            "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
                status, nsr_id, vca_index, e
            )
        )
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Compute the placement (vim account where to deploy) when it is delegated.

    If operationParams.placement-engine is "PLA", the request is sent via kafka
    and the result is polled at the database (nslcmops _admin.pla). The
    database is used because the result can be obtained by a different LCM
    worker in case of HA.

    :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
    :param db_nslcmop: database content of nslcmop
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
    :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs
        with the computed 'vim-account-id'
    :raises LcmException: if no placement result is available in time
    """
    nslcmop_id = db_nslcmop["_id"]
    engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
    if engine != "PLA":
        return False

    self.logger.debug(logging_text + "Invoke and wait for placement optimization")
    await self.msg.aiowrite(
        "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
    )

    poll_seconds = 5
    seconds_left = poll_seconds * 10
    placement = None
    while not placement and seconds_left >= 0:
        await asyncio.sleep(poll_seconds)
        seconds_left -= poll_seconds
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        placement = deep_get(db_nslcmop, ("_admin", "pla"))

    if not placement:
        raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))

    changed = False
    for placed_vnf in placement["vnf"]:
        vnfr = db_vnfrs.get(placed_vnf["member-vnf-index"])
        if placed_vnf.get("vimAccountId") and vnfr:
            changed = True
            self.db.set_one(
                "vnfrs",
                {"_id": vnfr["_id"]},
                {"vim-account-id": placed_vnf["vimAccountId"]},
            )
            # keep the in-memory copy aligned with the database
            vnfr["vim-account-id"] = placed_vnf["vimAccountId"]
    return changed
2398 def update_nsrs_with_pla_result(self
, params
):
2400 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2402 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2404 except Exception as e
:
2405 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2407 async def instantiate(self
, nsr_id
, nslcmop_id
):
2410 :param nsr_id: ns instance to deploy
2411 :param nslcmop_id: operation to run
2415 # Try to lock HA task here
2416 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2417 if not task_is_locked_by_me
:
2419 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2423 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2424 self
.logger
.debug(logging_text
+ "Enter")
2426 # get all needed from database
2428 # database nsrs record
2431 # database nslcmops record
2434 # update operation on nsrs
2436 # update operation on nslcmops
2437 db_nslcmop_update
= {}
2439 nslcmop_operation_state
= None
2440 db_vnfrs
= {} # vnf's info indexed by member-index
2442 tasks_dict_info
= {} # from task to info text
2446 "Stage 1/5: preparation of the environment.",
2447 "Waiting for previous operations to terminate.",
2450 # ^ stage, step, VIM progress
2452 # wait for any previous tasks in process
2453 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2455 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2456 stage
[1] = "Reading from database."
2457 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2458 db_nsr_update
["detailed-status"] = "creating"
2459 db_nsr_update
["operational-status"] = "init"
2460 self
._write
_ns
_status
(
2462 ns_state
="BUILDING",
2463 current_operation
="INSTANTIATING",
2464 current_operation_id
=nslcmop_id
,
2465 other_update
=db_nsr_update
,
2467 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2469 # read from db: operation
2470 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2471 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2472 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2473 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2474 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2476 ns_params
= db_nslcmop
.get("operationParams")
2477 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2478 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2480 timeout_ns_deploy
= self
.timeout
.get(
2481 "ns_deploy", self
.timeout_ns_deploy
2485 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2486 self
.logger
.debug(logging_text
+ stage
[1])
2487 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2488 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2489 self
.logger
.debug(logging_text
+ stage
[1])
2490 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2491 self
.fs
.sync(db_nsr
["nsd-id"])
2493 # nsr_name = db_nsr["name"] # TODO short-name??
2495 # read from db: vnf's of this ns
2496 stage
[1] = "Getting vnfrs from db."
2497 self
.logger
.debug(logging_text
+ stage
[1])
2498 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2500 # read from db: vnfd's for every vnf
2501 db_vnfds
= [] # every vnfd data
2503 # for each vnf in ns, read vnfd
2504 for vnfr
in db_vnfrs_list
:
2505 if vnfr
.get("kdur"):
2507 for kdur
in vnfr
["kdur"]:
2508 if kdur
.get("additionalParams"):
2509 kdur
["additionalParams"] = json
.loads(
2510 kdur
["additionalParams"]
2512 kdur_list
.append(kdur
)
2513 vnfr
["kdur"] = kdur_list
2515 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2516 vnfd_id
= vnfr
["vnfd-id"]
2517 vnfd_ref
= vnfr
["vnfd-ref"]
2518 self
.fs
.sync(vnfd_id
)
2520 # if we haven't this vnfd, read it from db
2521 if vnfd_id
not in db_vnfds
:
2523 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2526 self
.logger
.debug(logging_text
+ stage
[1])
2527 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2530 db_vnfds
.append(vnfd
)
2532 # Get or generates the _admin.deployed.VCA list
2533 vca_deployed_list
= None
2534 if db_nsr
["_admin"].get("deployed"):
2535 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2536 if vca_deployed_list
is None:
2537 vca_deployed_list
= []
2538 configuration_status_list
= []
2539 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2540 db_nsr_update
["configurationStatus"] = configuration_status_list
2541 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2542 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2543 elif isinstance(vca_deployed_list
, dict):
2544 # maintain backward compatibility. Change a dict to list at database
2545 vca_deployed_list
= list(vca_deployed_list
.values())
2546 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2547 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2550 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2552 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2553 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2555 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2556 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2557 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2559 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2562 # n2vc_redesign STEP 2 Deploy Network Scenario
2563 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2564 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2566 stage
[1] = "Deploying KDUs."
2567 # self.logger.debug(logging_text + "Before deploy_kdus")
2568 # Call to deploy_kdus in case exists the "vdu:kdu" param
2569 await self
.deploy_kdus(
2570 logging_text
=logging_text
,
2572 nslcmop_id
=nslcmop_id
,
2575 task_instantiation_info
=tasks_dict_info
,
2578 stage
[1] = "Getting VCA public key."
2579 # n2vc_redesign STEP 1 Get VCA public ssh-key
2580 # feature 1429. Add n2vc public key to needed VMs
2581 n2vc_key
= self
.n2vc
.get_public_key()
2582 n2vc_key_list
= [n2vc_key
]
2583 if self
.vca_config
.get("public_key"):
2584 n2vc_key_list
.append(self
.vca_config
["public_key"])
2586 stage
[1] = "Deploying NS at VIM."
2587 task_ro
= asyncio
.ensure_future(
2588 self
.instantiate_RO(
2589 logging_text
=logging_text
,
2593 db_nslcmop
=db_nslcmop
,
2596 n2vc_key_list
=n2vc_key_list
,
2600 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2601 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2603 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2604 stage
[1] = "Deploying Execution Environments."
2605 self
.logger
.debug(logging_text
+ stage
[1])
2607 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2608 for vnf_profile
in get_vnf_profiles(nsd
):
2609 vnfd_id
= vnf_profile
["vnfd-id"]
2610 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2611 member_vnf_index
= str(vnf_profile
["id"])
2612 db_vnfr
= db_vnfrs
[member_vnf_index
]
2613 base_folder
= vnfd
["_admin"]["storage"]
2619 # Get additional parameters
2620 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2621 if db_vnfr
.get("additionalParamsForVnf"):
2622 deploy_params
.update(
2623 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2626 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2627 if descriptor_config
:
2629 logging_text
=logging_text
2630 + "member_vnf_index={} ".format(member_vnf_index
),
2633 nslcmop_id
=nslcmop_id
,
2639 member_vnf_index
=member_vnf_index
,
2640 vdu_index
=vdu_index
,
2642 deploy_params
=deploy_params
,
2643 descriptor_config
=descriptor_config
,
2644 base_folder
=base_folder
,
2645 task_instantiation_info
=tasks_dict_info
,
2649 # Deploy charms for each VDU that supports one.
2650 for vdud
in get_vdu_list(vnfd
):
2652 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2653 vdur
= find_in_list(
2654 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2657 if vdur
.get("additionalParams"):
2658 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2660 deploy_params_vdu
= deploy_params
2661 deploy_params_vdu
["OSM"] = get_osm_params(
2662 db_vnfr
, vdu_id
, vdu_count_index
=0
2664 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2666 self
.logger
.debug("VDUD > {}".format(vdud
))
2668 "Descriptor config > {}".format(descriptor_config
)
2670 if descriptor_config
:
2673 for vdu_index
in range(vdud_count
):
2674 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2676 logging_text
=logging_text
2677 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2678 member_vnf_index
, vdu_id
, vdu_index
2682 nslcmop_id
=nslcmop_id
,
2688 member_vnf_index
=member_vnf_index
,
2689 vdu_index
=vdu_index
,
2691 deploy_params
=deploy_params_vdu
,
2692 descriptor_config
=descriptor_config
,
2693 base_folder
=base_folder
,
2694 task_instantiation_info
=tasks_dict_info
,
2697 for kdud
in get_kdu_list(vnfd
):
2698 kdu_name
= kdud
["name"]
2699 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2700 if descriptor_config
:
2705 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2707 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2708 if kdur
.get("additionalParams"):
2709 deploy_params_kdu
.update(
2710 parse_yaml_strings(kdur
["additionalParams"].copy())
2714 logging_text
=logging_text
,
2717 nslcmop_id
=nslcmop_id
,
2723 member_vnf_index
=member_vnf_index
,
2724 vdu_index
=vdu_index
,
2726 deploy_params
=deploy_params_kdu
,
2727 descriptor_config
=descriptor_config
,
2728 base_folder
=base_folder
,
2729 task_instantiation_info
=tasks_dict_info
,
2733 # Check if this NS has a charm configuration
2734 descriptor_config
= nsd
.get("ns-configuration")
2735 if descriptor_config
and descriptor_config
.get("juju"):
2738 member_vnf_index
= None
2744 # Get additional parameters
2745 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2746 if db_nsr
.get("additionalParamsForNs"):
2747 deploy_params
.update(
2748 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2750 base_folder
= nsd
["_admin"]["storage"]
2752 logging_text
=logging_text
,
2755 nslcmop_id
=nslcmop_id
,
2761 member_vnf_index
=member_vnf_index
,
2762 vdu_index
=vdu_index
,
2764 deploy_params
=deploy_params
,
2765 descriptor_config
=descriptor_config
,
2766 base_folder
=base_folder
,
2767 task_instantiation_info
=tasks_dict_info
,
2771 # rest of stuff will be done at finally
2774 ROclient
.ROClientException
,
2780 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2783 except asyncio
.CancelledError
:
2785 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2787 exc
= "Operation was cancelled"
2788 except Exception as e
:
2789 exc
= traceback
.format_exc()
2790 self
.logger
.critical(
2791 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2796 error_list
.append(str(exc
))
2798 # wait for pending tasks
2800 stage
[1] = "Waiting for instantiate pending tasks."
2801 self
.logger
.debug(logging_text
+ stage
[1])
2802 error_list
+= await self
._wait
_for
_tasks
(
2810 stage
[1] = stage
[2] = ""
2811 except asyncio
.CancelledError
:
2812 error_list
.append("Cancelled")
2813 # TODO cancel all tasks
2814 except Exception as exc
:
2815 error_list
.append(str(exc
))
2817 # update operation-status
2818 db_nsr_update
["operational-status"] = "running"
2819 # let's begin with VCA 'configured' status (later we can change it)
2820 db_nsr_update
["config-status"] = "configured"
2821 for task
, task_name
in tasks_dict_info
.items():
2822 if not task
.done() or task
.cancelled() or task
.exception():
2823 if task_name
.startswith(self
.task_name_deploy_vca
):
2824 # A N2VC task is pending
2825 db_nsr_update
["config-status"] = "failed"
2827 # RO or KDU task is pending
2828 db_nsr_update
["operational-status"] = "failed"
2830 # update status at database
2832 error_detail
= ". ".join(error_list
)
2833 self
.logger
.error(logging_text
+ error_detail
)
2834 error_description_nslcmop
= "{} Detail: {}".format(
2835 stage
[0], error_detail
2837 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2838 nslcmop_id
, stage
[0]
2841 db_nsr_update
["detailed-status"] = (
2842 error_description_nsr
+ " Detail: " + error_detail
2844 db_nslcmop_update
["detailed-status"] = error_detail
2845 nslcmop_operation_state
= "FAILED"
2849 error_description_nsr
= error_description_nslcmop
= None
2851 db_nsr_update
["detailed-status"] = "Done"
2852 db_nslcmop_update
["detailed-status"] = "Done"
2853 nslcmop_operation_state
= "COMPLETED"
2856 self
._write
_ns
_status
(
2859 current_operation
="IDLE",
2860 current_operation_id
=None,
2861 error_description
=error_description_nsr
,
2862 error_detail
=error_detail
,
2863 other_update
=db_nsr_update
,
2865 self
._write
_op
_status
(
2868 error_message
=error_description_nslcmop
,
2869 operation_state
=nslcmop_operation_state
,
2870 other_update
=db_nslcmop_update
,
2873 if nslcmop_operation_state
:
2875 await self
.msg
.aiowrite(
2880 "nslcmop_id": nslcmop_id
,
2881 "operationState": nslcmop_operation_state
,
2885 except Exception as e
:
2887 logging_text
+ "kafka_write notification Exception {}".format(e
)
2890 self
.logger
.debug(logging_text
+ "Exit")
2891 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2893 def _get_vnfd(self
, vnfd_id
: str, cached_vnfds
: Dict
[str, Any
]):
2894 if vnfd_id
not in cached_vnfds
:
2895 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
2896 return cached_vnfds
[vnfd_id
]
2898 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2899 if vnf_profile_id
not in cached_vnfrs
:
2900 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2903 "member-vnf-index-ref": vnf_profile_id
,
2904 "nsr-id-ref": nsr_id
,
2907 return cached_vnfrs
[vnf_profile_id
]
2909 def _is_deployed_vca_in_relation(
2910 self
, vca
: DeployedVCA
, relation
: Relation
2913 for endpoint
in (relation
.provider
, relation
.requirer
):
2914 if endpoint
["kdu-resource-profile-id"]:
2917 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2918 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2919 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2925 def _update_ee_relation_data_with_implicit_data(
2926 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2928 ee_relation_data
= safe_get_ee_relation(
2929 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2931 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2932 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2933 "execution-environment-ref"
2935 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2936 vnfd_id
= vnf_profile
["vnfd-id"]
2937 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2940 if ee_relation_level
== EELevel
.VNF
2941 else ee_relation_data
["vdu-profile-id"]
2943 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2946 f
"not execution environments found for ee_relation {ee_relation_data}"
2948 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2949 return ee_relation_data
2951 def _get_ns_relations(
2954 nsd
: Dict
[str, Any
],
2956 cached_vnfds
: Dict
[str, Any
],
2957 ) -> List
[Relation
]:
2959 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2960 for r
in db_ns_relations
:
2961 provider_dict
= None
2962 requirer_dict
= None
2963 if all(key
in r
for key
in ("provider", "requirer")):
2964 provider_dict
= r
["provider"]
2965 requirer_dict
= r
["requirer"]
2966 elif "entities" in r
:
2967 provider_id
= r
["entities"][0]["id"]
2970 "endpoint": r
["entities"][0]["endpoint"],
2972 if provider_id
!= nsd
["id"]:
2973 provider_dict
["vnf-profile-id"] = provider_id
2974 requirer_id
= r
["entities"][1]["id"]
2977 "endpoint": r
["entities"][1]["endpoint"],
2979 if requirer_id
!= nsd
["id"]:
2980 requirer_dict
["vnf-profile-id"] = requirer_id
2983 "provider/requirer or entities must be included in the relation."
2985 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2986 nsr_id
, nsd
, provider_dict
, cached_vnfds
2988 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2989 nsr_id
, nsd
, requirer_dict
, cached_vnfds
2991 provider
= EERelation(relation_provider
)
2992 requirer
= EERelation(relation_requirer
)
2993 relation
= Relation(r
["name"], provider
, requirer
)
2994 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2996 relations
.append(relation
)
2999 def _get_vnf_relations(
3002 nsd
: Dict
[str, Any
],
3004 cached_vnfds
: Dict
[str, Any
],
3005 ) -> List
[Relation
]:
3007 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
3008 vnf_profile_id
= vnf_profile
["id"]
3009 vnfd_id
= vnf_profile
["vnfd-id"]
3010 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
3011 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
3012 for r
in db_vnf_relations
:
3013 provider_dict
= None
3014 requirer_dict
= None
3015 if all(key
in r
for key
in ("provider", "requirer")):
3016 provider_dict
= r
["provider"]
3017 requirer_dict
= r
["requirer"]
3018 elif "entities" in r
:
3019 provider_id
= r
["entities"][0]["id"]
3022 "vnf-profile-id": vnf_profile_id
,
3023 "endpoint": r
["entities"][0]["endpoint"],
3025 if provider_id
!= vnfd_id
:
3026 provider_dict
["vdu-profile-id"] = provider_id
3027 requirer_id
= r
["entities"][1]["id"]
3030 "vnf-profile-id": vnf_profile_id
,
3031 "endpoint": r
["entities"][1]["endpoint"],
3033 if requirer_id
!= vnfd_id
:
3034 requirer_dict
["vdu-profile-id"] = requirer_id
3037 "provider/requirer or entities must be included in the relation."
3039 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3040 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3042 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3043 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3045 provider
= EERelation(relation_provider
)
3046 requirer
= EERelation(relation_requirer
)
3047 relation
= Relation(r
["name"], provider
, requirer
)
3048 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3050 relations
.append(relation
)
3053 def _get_kdu_resource_data(
3055 ee_relation
: EERelation
,
3056 db_nsr
: Dict
[str, Any
],
3057 cached_vnfds
: Dict
[str, Any
],
3058 ) -> DeployedK8sResource
:
3059 nsd
= get_nsd(db_nsr
)
3060 vnf_profiles
= get_vnf_profiles(nsd
)
3061 vnfd_id
= find_in_list(
3063 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
3065 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
3066 kdu_resource_profile
= get_kdu_resource_profile(
3067 db_vnfd
, ee_relation
.kdu_resource_profile_id
3069 kdu_name
= kdu_resource_profile
["kdu-name"]
3070 deployed_kdu
, _
= get_deployed_kdu(
3071 db_nsr
.get("_admin", ()).get("deployed", ()),
3073 ee_relation
.vnf_profile_id
,
3075 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
3078 def _get_deployed_component(
3080 ee_relation
: EERelation
,
3081 db_nsr
: Dict
[str, Any
],
3082 cached_vnfds
: Dict
[str, Any
],
3083 ) -> DeployedComponent
:
3084 nsr_id
= db_nsr
["_id"]
3085 deployed_component
= None
3086 ee_level
= EELevel
.get_level(ee_relation
)
3087 if ee_level
== EELevel
.NS
:
3088 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
3090 deployed_component
= DeployedVCA(nsr_id
, vca
)
3091 elif ee_level
== EELevel
.VNF
:
3092 vca
= get_deployed_vca(
3096 "member-vnf-index": ee_relation
.vnf_profile_id
,
3097 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3101 deployed_component
= DeployedVCA(nsr_id
, vca
)
3102 elif ee_level
== EELevel
.VDU
:
3103 vca
= get_deployed_vca(
3106 "vdu_id": ee_relation
.vdu_profile_id
,
3107 "member-vnf-index": ee_relation
.vnf_profile_id
,
3108 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3112 deployed_component
= DeployedVCA(nsr_id
, vca
)
3113 elif ee_level
== EELevel
.KDU
:
3114 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
3115 ee_relation
, db_nsr
, cached_vnfds
3117 if kdu_resource_data
:
3118 deployed_component
= DeployedK8sResource(kdu_resource_data
)
3119 return deployed_component
3121 async def _add_relation(
3125 db_nsr
: Dict
[str, Any
],
3126 cached_vnfds
: Dict
[str, Any
],
3127 cached_vnfrs
: Dict
[str, Any
],
3129 deployed_provider
= self
._get
_deployed
_component
(
3130 relation
.provider
, db_nsr
, cached_vnfds
3132 deployed_requirer
= self
._get
_deployed
_component
(
3133 relation
.requirer
, db_nsr
, cached_vnfds
3137 and deployed_requirer
3138 and deployed_provider
.config_sw_installed
3139 and deployed_requirer
.config_sw_installed
3141 provider_db_vnfr
= (
3143 relation
.provider
.nsr_id
,
3144 relation
.provider
.vnf_profile_id
,
3147 if relation
.provider
.vnf_profile_id
3150 requirer_db_vnfr
= (
3152 relation
.requirer
.nsr_id
,
3153 relation
.requirer
.vnf_profile_id
,
3156 if relation
.requirer
.vnf_profile_id
3159 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3160 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3161 provider_relation_endpoint
= RelationEndpoint(
3162 deployed_provider
.ee_id
,
3164 relation
.provider
.endpoint
,
3166 requirer_relation_endpoint
= RelationEndpoint(
3167 deployed_requirer
.ee_id
,
3169 relation
.requirer
.endpoint
,
3171 await self
.vca_map
[vca_type
].add_relation(
3172 provider
=provider_relation_endpoint
,
3173 requirer
=requirer_relation_endpoint
,
3175 # remove entry from relations list
3179 async def _add_vca_relations(
3185 timeout
: int = 3600,
3189 # 1. find all relations for this VCA
3190 # 2. wait for other peers related
3194 # STEP 1: find all relations for this VCA
3197 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3198 nsd
= get_nsd(db_nsr
)
3201 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3202 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3207 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3208 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3210 # if no relations, terminate
3212 self
.logger
.debug(logging_text
+ " No relations")
3215 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3222 if now
- start
>= timeout
:
3223 self
.logger
.error(logging_text
+ " : timeout adding relations")
3226 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3227 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3229 # for each relation, find the VCA's related
3230 for relation
in relations
.copy():
3231 added
= await self
._add
_relation
(
3239 relations
.remove(relation
)
3242 self
.logger
.debug("Relations added")
3244 await asyncio
.sleep(5.0)
3248 except Exception as e
:
3249 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3252 async def _install_kdu(
3260 k8s_instance_info
: dict,
3261 k8params
: dict = None,
3267 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3270 "collection": "nsrs",
3271 "filter": {"_id": nsr_id
},
3272 "path": nsr_db_path
,
3275 if k8s_instance_info
.get("kdu-deployment-name"):
3276 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3278 kdu_instance
= self
.k8scluster_map
[
3280 ].generate_kdu_instance_name(
3281 db_dict
=db_dict_install
,
3282 kdu_model
=k8s_instance_info
["kdu-model"],
3283 kdu_name
=k8s_instance_info
["kdu-name"],
3286 # Update the nsrs table with the kdu-instance value
3290 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3293 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3294 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3295 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3296 # namespace, this first verification could be removed, and the next step would be done for any kind
3298 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3299 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3300 if k8sclustertype
in ("juju", "juju-bundle"):
3301 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3302 # that the user passed a namespace in which they want the KDU to be deployed)
3308 "_admin.projects_write": k8s_instance_info
["namespace"],
3309 "_admin.projects_read": k8s_instance_info
["namespace"],
3315 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3320 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3322 k8s_instance_info
["namespace"] = kdu_instance
3324 await self
.k8scluster_map
[k8sclustertype
].install(
3325 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3326 kdu_model
=k8s_instance_info
["kdu-model"],
3329 db_dict
=db_dict_install
,
3331 kdu_name
=k8s_instance_info
["kdu-name"],
3332 namespace
=k8s_instance_info
["namespace"],
3333 kdu_instance
=kdu_instance
,
3337 # Obtain services to obtain management service ip
3338 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3339 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3340 kdu_instance
=kdu_instance
,
3341 namespace
=k8s_instance_info
["namespace"],
3344 # Obtain management service info (if exists)
3345 vnfr_update_dict
= {}
3346 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3348 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3353 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3356 for service
in kdud
.get("service", [])
3357 if service
.get("mgmt-service")
3359 for mgmt_service
in mgmt_services
:
3360 for service
in services
:
3361 if service
["name"].startswith(mgmt_service
["name"]):
3362 # Mgmt service found, Obtain service ip
3363 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3364 if isinstance(ip
, list) and len(ip
) == 1:
3368 "kdur.{}.ip-address".format(kdu_index
)
3371 # Check if must update also mgmt ip at the vnf
3372 service_external_cp
= mgmt_service
.get(
3373 "external-connection-point-ref"
3375 if service_external_cp
:
3377 deep_get(vnfd
, ("mgmt-interface", "cp"))
3378 == service_external_cp
3380 vnfr_update_dict
["ip-address"] = ip
3385 "external-connection-point-ref", ""
3387 == service_external_cp
,
3390 "kdur.{}.ip-address".format(kdu_index
)
3395 "Mgmt service name: {} not found".format(
3396 mgmt_service
["name"]
3400 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3401 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3403 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3406 and kdu_config
.get("initial-config-primitive")
3407 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3409 initial_config_primitive_list
= kdu_config
.get(
3410 "initial-config-primitive"
3412 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3414 for initial_config_primitive
in initial_config_primitive_list
:
3415 primitive_params_
= self
._map
_primitive
_params
(
3416 initial_config_primitive
, {}, {}
3419 await asyncio
.wait_for(
3420 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3421 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3422 kdu_instance
=kdu_instance
,
3423 primitive_name
=initial_config_primitive
["name"],
3424 params
=primitive_params_
,
3425 db_dict
=db_dict_install
,
3431 except Exception as e
:
3432 # Prepare update db with error and raise exception
3435 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3439 vnfr_data
.get("_id"),
3440 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3443 # ignore to keep original exception
3445 # reraise original error
3450 async def deploy_kdus(
3457 task_instantiation_info
,
3459 # Launch kdus if present in the descriptor
3461 k8scluster_id_2_uuic
= {
3462 "helm-chart-v3": {},
3467 async def _get_cluster_id(cluster_id
, cluster_type
):
3468 nonlocal k8scluster_id_2_uuic
3469 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3470 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3472 # check if the K8s cluster is still being created: look for previous related tasks in process and wait for them
3473 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3474 "k8scluster", cluster_id
3477 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3478 task_name
, cluster_id
3480 self
.logger
.debug(logging_text
+ text
)
3481 await asyncio
.wait(task_dependency
, timeout
=3600)
3483 db_k8scluster
= self
.db
.get_one(
3484 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3486 if not db_k8scluster
:
3487 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3489 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3491 if cluster_type
== "helm-chart-v3":
3493 # backward compatibility for existing clusters that have not been initialized for helm v3
3494 k8s_credentials
= yaml
.safe_dump(
3495 db_k8scluster
.get("credentials")
3497 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3498 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3500 db_k8scluster_update
= {}
3501 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3502 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3503 db_k8scluster_update
[
3504 "_admin.helm-chart-v3.created"
3506 db_k8scluster_update
[
3507 "_admin.helm-chart-v3.operationalState"
3510 "k8sclusters", cluster_id
, db_k8scluster_update
3512 except Exception as e
:
3515 + "error initializing helm-v3 cluster: {}".format(str(e
))
3518 "K8s cluster '{}' has not been initialized for '{}'".format(
3519 cluster_id
, cluster_type
3524 "K8s cluster '{}' has not been initialized for '{}'".format(
3525 cluster_id
, cluster_type
3528 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3531 logging_text
+= "Deploy kdus: "
3534 db_nsr_update
= {"_admin.deployed.K8s": []}
3535 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3538 updated_cluster_list
= []
3539 updated_v3_cluster_list
= []
3541 for vnfr_data
in db_vnfrs
.values():
3542 vca_id
= self
.get_vca_id(vnfr_data
, {})
3543 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3544 # Step 0: Prepare and set parameters
3545 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3546 vnfd_id
= vnfr_data
.get("vnfd-id")
3547 vnfd_with_id
= find_in_list(
3548 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3552 for kdud
in vnfd_with_id
["kdu"]
3553 if kdud
["name"] == kdur
["kdu-name"]
3555 namespace
= kdur
.get("k8s-namespace")
3556 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3557 if kdur
.get("helm-chart"):
3558 kdumodel
= kdur
["helm-chart"]
3559 # Default version: helm3, if helm-version is v2 assign v2
3560 k8sclustertype
= "helm-chart-v3"
3561 self
.logger
.debug("kdur: {}".format(kdur
))
3563 kdur
.get("helm-version")
3564 and kdur
.get("helm-version") == "v2"
3566 k8sclustertype
= "helm-chart"
3567 elif kdur
.get("juju-bundle"):
3568 kdumodel
= kdur
["juju-bundle"]
3569 k8sclustertype
= "juju-bundle"
3572 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3573 "juju-bundle. Maybe an old NBI version is running".format(
3574 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3577 # check if kdumodel is a file and exists
3579 vnfd_with_id
= find_in_list(
3580 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3582 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3583 if storage
: # may be not present if vnfd has not artifacts
3584 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3585 if storage
["pkg-dir"]:
3586 filename
= "{}/{}/{}s/{}".format(
3593 filename
= "{}/Scripts/{}s/{}".format(
3598 if self
.fs
.file_exists(
3599 filename
, mode
="file"
3600 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3601 kdumodel
= self
.fs
.path
+ filename
3602 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3604 except Exception: # it is not a file
3607 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3608 step
= "Synchronize repos for k8s cluster '{}'".format(
3611 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3615 k8sclustertype
== "helm-chart"
3616 and cluster_uuid
not in updated_cluster_list
3618 k8sclustertype
== "helm-chart-v3"
3619 and cluster_uuid
not in updated_v3_cluster_list
3621 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3622 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3623 cluster_uuid
=cluster_uuid
3626 if del_repo_list
or added_repo_dict
:
3627 if k8sclustertype
== "helm-chart":
3629 "_admin.helm_charts_added." + item
: None
3630 for item
in del_repo_list
3633 "_admin.helm_charts_added." + item
: name
3634 for item
, name
in added_repo_dict
.items()
3636 updated_cluster_list
.append(cluster_uuid
)
3637 elif k8sclustertype
== "helm-chart-v3":
3639 "_admin.helm_charts_v3_added." + item
: None
3640 for item
in del_repo_list
3643 "_admin.helm_charts_v3_added." + item
: name
3644 for item
, name
in added_repo_dict
.items()
3646 updated_v3_cluster_list
.append(cluster_uuid
)
3648 logging_text
+ "repos synchronized on k8s cluster "
3649 "'{}' to_delete: {}, to_add: {}".format(
3650 k8s_cluster_id
, del_repo_list
, added_repo_dict
3655 {"_id": k8s_cluster_id
},
3661 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3662 vnfr_data
["member-vnf-index-ref"],
3666 k8s_instance_info
= {
3667 "kdu-instance": None,
3668 "k8scluster-uuid": cluster_uuid
,
3669 "k8scluster-type": k8sclustertype
,
3670 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3671 "kdu-name": kdur
["kdu-name"],
3672 "kdu-model": kdumodel
,
3673 "namespace": namespace
,
3674 "kdu-deployment-name": kdu_deployment_name
,
3676 db_path
= "_admin.deployed.K8s.{}".format(index
)
3677 db_nsr_update
[db_path
] = k8s_instance_info
3678 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3679 vnfd_with_id
= find_in_list(
3680 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3682 task
= asyncio
.ensure_future(
3691 k8params
=desc_params
,
3696 self
.lcm_tasks
.register(
3700 "instantiate_KDU-{}".format(index
),
3703 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3709 except (LcmException
, asyncio
.CancelledError
):
3711 except Exception as e
:
3712 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3713 if isinstance(e
, (N2VCException
, DbException
)):
3714 self
.logger
.error(logging_text
+ msg
)
3716 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3717 raise LcmException(msg
)
3720 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3739 task_instantiation_info
,
3742 # launch instantiate_N2VC in a asyncio task and register task object
3743 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3744 # if not found, create one entry and update database
3745 # fill db_nsr._admin.deployed.VCA.<index>
3748 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3752 get_charm_name
= False
3753 if "execution-environment-list" in descriptor_config
:
3754 ee_list
= descriptor_config
.get("execution-environment-list", [])
3755 elif "juju" in descriptor_config
:
3756 ee_list
= [descriptor_config
] # ns charms
3757 if "execution-environment-list" not in descriptor_config
:
3758 # charm name is only required for ns charms
3759 get_charm_name
= True
3760 else: # other types as script are not supported
3763 for ee_item
in ee_list
:
3766 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3767 ee_item
.get("juju"), ee_item
.get("helm-chart")
3770 ee_descriptor_id
= ee_item
.get("id")
3771 if ee_item
.get("juju"):
3772 vca_name
= ee_item
["juju"].get("charm")
3774 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
3777 if ee_item
["juju"].get("charm") is not None
3780 if ee_item
["juju"].get("cloud") == "k8s":
3781 vca_type
= "k8s_proxy_charm"
3782 elif ee_item
["juju"].get("proxy") is False:
3783 vca_type
= "native_charm"
3784 elif ee_item
.get("helm-chart"):
3785 vca_name
= ee_item
["helm-chart"]
3786 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3789 vca_type
= "helm-v3"
3792 logging_text
+ "skipping non juju neither charm configuration"
3797 for vca_index
, vca_deployed
in enumerate(
3798 db_nsr
["_admin"]["deployed"]["VCA"]
3800 if not vca_deployed
:
3803 vca_deployed
.get("member-vnf-index") == member_vnf_index
3804 and vca_deployed
.get("vdu_id") == vdu_id
3805 and vca_deployed
.get("kdu_name") == kdu_name
3806 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3807 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3811 # not found, create one.
3813 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3816 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3818 target
+= "/kdu/{}".format(kdu_name
)
3820 "target_element": target
,
3821 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3822 "member-vnf-index": member_vnf_index
,
3824 "kdu_name": kdu_name
,
3825 "vdu_count_index": vdu_index
,
3826 "operational-status": "init", # TODO revise
3827 "detailed-status": "", # TODO revise
3828 "step": "initial-deploy", # TODO revise
3830 "vdu_name": vdu_name
,
3832 "ee_descriptor_id": ee_descriptor_id
,
3833 "charm_name": charm_name
,
3837 # create VCA and configurationStatus in db
3839 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3840 "configurationStatus.{}".format(vca_index
): dict(),
3842 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3844 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3846 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3847 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3848 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3851 task_n2vc
= asyncio
.ensure_future(
3852 self
.instantiate_N2VC(
3853 logging_text
=logging_text
,
3854 vca_index
=vca_index
,
3860 vdu_index
=vdu_index
,
3861 deploy_params
=deploy_params
,
3862 config_descriptor
=descriptor_config
,
3863 base_folder
=base_folder
,
3864 nslcmop_id
=nslcmop_id
,
3868 ee_config_descriptor
=ee_item
,
3871 self
.lcm_tasks
.register(
3875 "instantiate_N2VC-{}".format(vca_index
),
3878 task_instantiation_info
[
3880 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3881 member_vnf_index
or "", vdu_id
or ""
3885 def _create_nslcmop(nsr_id
, operation
, params
):
3887 Creates a ns-lcm-opp content to be stored at database.
3888 :param nsr_id: internal id of the instance
3889 :param operation: instantiate, terminate, scale, action, ...
3890 :param params: user parameters for the operation
3891 :return: dictionary following SOL005 format
3893 # Raise exception if invalid arguments
3894 if not (nsr_id
and operation
and params
):
3896 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3903 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3904 "operationState": "PROCESSING",
3905 "statusEnteredTime": now
,
3906 "nsInstanceId": nsr_id
,
3907 "lcmOperationType": operation
,
3909 "isAutomaticInvocation": False,
3910 "operationParams": params
,
3911 "isCancelPending": False,
3913 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3914 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3919 def _format_additional_params(self
, params
):
3920 params
= params
or {}
3921 for key
, value
in params
.items():
3922 if str(value
).startswith("!!yaml "):
3923 params
[key
] = yaml
.safe_load(value
[7:])
3926 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3927 primitive
= seq
.get("name")
3928 primitive_params
= {}
3930 "member_vnf_index": vnf_index
,
3931 "primitive": primitive
,
3932 "primitive_params": primitive_params
,
3935 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
3939 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
3940 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
3941 if op
.get("operationState") == "COMPLETED":
3942 # b. Skip sub-operation
3943 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3944 return self
.SUBOPERATION_STATUS_SKIP
3946 # c. retry executing sub-operation
3947 # The sub-operation exists, and operationState != 'COMPLETED'
3948 # Update operationState = 'PROCESSING' to indicate a retry.
3949 operationState
= "PROCESSING"
3950 detailed_status
= "In progress"
3951 self
._update
_suboperation
_status
(
3952 db_nslcmop
, op_index
, operationState
, detailed_status
3954 # Return the sub-operation index
3955 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3956 # with arguments extracted from the sub-operation
3959 # Find a sub-operation where all keys in a matching dictionary must match
3960 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3961 def _find_suboperation(self
, db_nslcmop
, match
):
3962 if db_nslcmop
and match
:
3963 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3964 for i
, op
in enumerate(op_list
):
3965 if all(op
.get(k
) == match
[k
] for k
in match
):
3967 return self
.SUBOPERATION_STATUS_NOT_FOUND
3969 # Update status for a sub-operation given its index
3970 def _update_suboperation_status(
3971 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3973 # Update DB for HA tasks
3974 q_filter
= {"_id": db_nslcmop
["_id"]}
3976 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3977 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3980 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3983 # Add sub-operation, return the index of the added sub-operation
3984 # Optionally, set operationState, detailed-status, and operationType
3985 # Status and type are currently set for 'scale' sub-operations:
3986 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3987 # 'detailed-status' : status message
3988 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3989 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3990 def _add_suboperation(
3998 mapped_primitive_params
,
3999 operationState
=None,
4000 detailed_status
=None,
4003 RO_scaling_info
=None,
4006 return self
.SUBOPERATION_STATUS_NOT_FOUND
4007 # Get the "_admin.operations" list, if it exists
4008 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
4009 op_list
= db_nslcmop_admin
.get("operations")
4010 # Create or append to the "_admin.operations" list
4012 "member_vnf_index": vnf_index
,
4014 "vdu_count_index": vdu_count_index
,
4015 "primitive": primitive
,
4016 "primitive_params": mapped_primitive_params
,
4019 new_op
["operationState"] = operationState
4021 new_op
["detailed-status"] = detailed_status
4023 new_op
["lcmOperationType"] = operationType
4025 new_op
["RO_nsr_id"] = RO_nsr_id
4027 new_op
["RO_scaling_info"] = RO_scaling_info
4029 # No existing operations, create key 'operations' with current operation as first list element
4030 db_nslcmop_admin
.update({"operations": [new_op
]})
4031 op_list
= db_nslcmop_admin
.get("operations")
4033 # Existing operations, append operation to list
4034 op_list
.append(new_op
)
4036 db_nslcmop_update
= {"_admin.operations": op_list
}
4037 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4038 op_index
= len(op_list
) - 1
4041 # Helper methods for scale() sub-operations
4043 # pre-scale/post-scale:
4044 # Check for 3 different cases:
4045 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4046 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4047 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4048 def _check_or_add_scale_suboperation(
4052 vnf_config_primitive
,
4056 RO_scaling_info
=None,
4058 # Find this sub-operation
4059 if RO_nsr_id
and RO_scaling_info
:
4060 operationType
= "SCALE-RO"
4062 "member_vnf_index": vnf_index
,
4063 "RO_nsr_id": RO_nsr_id
,
4064 "RO_scaling_info": RO_scaling_info
,
4068 "member_vnf_index": vnf_index
,
4069 "primitive": vnf_config_primitive
,
4070 "primitive_params": primitive_params
,
4071 "lcmOperationType": operationType
,
4073 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4074 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4075 # a. New sub-operation
4076 # The sub-operation does not exist, add it.
4077 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4078 # The following parameters are set to None for all kind of scaling:
4080 vdu_count_index
= None
4082 if RO_nsr_id
and RO_scaling_info
:
4083 vnf_config_primitive
= None
4084 primitive_params
= None
4087 RO_scaling_info
= None
4088 # Initial status for sub-operation
4089 operationState
= "PROCESSING"
4090 detailed_status
= "In progress"
4091 # Add sub-operation for pre/post-scaling (zero or more operations)
4092 self
._add
_suboperation
(
4098 vnf_config_primitive
,
4106 return self
.SUBOPERATION_STATUS_NEW
4108 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4109 # or op_index (operationState != 'COMPLETED')
4110 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4112 # Function to return execution_environment id
4114 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4115 # TODO vdu_index_count
4116 for vca
in vca_deployed_list
:
4117 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
    async def destroy_N2VC(
        self,
        logging_text,
        db_nslcmop,
        vca_deployed,
        config_descriptor,
        vca_index,
        destroy_ee=True,
        exec_primitives=True,
        scaling_in=False,
        vca_id: str = None,
    ):
        """
        Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
        :param logging_text: prefix added to the log messages
        :param db_nslcmop: nslcmop record; sub-operations are added to it and its "nsInstanceId" is used
            to update the nsr and to delete the prometheus jobs
        :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
        :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
        :param vca_index: index in the database _admin.deployed.VCA
        :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
        :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
                            not executed properly
        :param scaling_in: True destroys the application, False destroys the model
        :param vca_id: forwarded to the primitive execution and to the execution environment deletion
        :return: None or exception
        """

        self.logger.debug(
            logging_text
            + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
                vca_index, vca_deployed, config_descriptor, destroy_ee
            )
        )

        # Entries without an explicit type are handled as "lxc_proxy_charm"
        vca_type = vca_deployed.get("type", "lxc_proxy_charm")

        # execute terminate_primitives
        if exec_primitives:
            terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
                config_descriptor.get("terminate-config-primitive"),
                vca_deployed.get("ee_descriptor_id"),
            )
            vdu_id = vca_deployed.get("vdu_id")
            vdu_count_index = vca_deployed.get("vdu_count_index")
            vdu_name = vca_deployed.get("vdu_name")
            vnf_index = vca_deployed.get("member-vnf-index")
            # Primitives run only while the VCA is still flagged as "needed_terminate"
            if terminate_primitives and vca_deployed.get("needed_terminate"):
                for seq in terminate_primitives:
                    # For each sequence in list, get primitive and call _ns_execute_primitive()
                    step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                        vnf_index, seq.get("name")
                    )
                    self.logger.debug(logging_text + step)
                    # Create the primitive for each sequence, i.e. "primitive": "touch"
                    primitive = seq.get("name")
                    mapped_primitive_params = self._get_terminate_primitive_params(
                        seq, vnf_index
                    )

                    # Add sub-operation
                    self._add_suboperation(
                        db_nslcmop,
                        vnf_index,
                        vdu_id,
                        vdu_count_index,
                        vdu_name,
                        primitive,
                        mapped_primitive_params,
                    )
                    # Sub-operations: Call _ns_execute_primitive() instead of action()
                    try:
                        result, result_detail = await self._ns_execute_primitive(
                            vca_deployed["ee_id"],
                            primitive,
                            mapped_primitive_params,
                            vca_type=vca_type,
                            vca_id=vca_id,
                        )
                    except LcmException:
                        # this happens when VCA is not deployed. In this case it is not needed to terminate
                        continue
                    # Any other result aborts the termination of this VCA
                    result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
                    if result not in result_ok:
                        raise LcmException(
                            "terminate_primitive {} for vnf_member_index={} fails with "
                            "error {}".format(seq.get("name"), vnf_index, result_detail)
                        )
                # set that this VCA do not need terminated
                db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
                    vca_index
                )
                self.update_db_2(
                    "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
                )

        # Delete Prometheus Jobs if any
        # This uses NSR_ID, so it will destroy any jobs under this index
        self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})

        if destroy_ee:
            # The connector is selected by VCA type (presumably one connector per type - confirm at vca_map)
            await self.vca_map[vca_type].delete_execution_environment(
                vca_deployed["ee_id"],
                scaling_in=scaling_in,
                vca_type=vca_type,
                vca_id=vca_id,
            )
4226 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4227 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4228 namespace
= "." + db_nsr
["_id"]
4230 await self
.n2vc
.delete_namespace(
4231 namespace
=namespace
,
4232 total_timeout
=self
.timeout_charm_delete
,
4235 except N2VCNotFound
: # already deleted. Skip
4237 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4239 async def _terminate_RO(
4240 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4243 Terminates a deployment from RO
4244 :param logging_text:
4245 :param nsr_deployed: db_nsr._admin.deployed
4248 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4249 this method will update only the index 2, but it will write on database the concatenated content of the list
4254 ro_nsr_id
= ro_delete_action
= None
4255 if nsr_deployed
and nsr_deployed
.get("RO"):
4256 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4257 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4260 stage
[2] = "Deleting ns from VIM."
4261 db_nsr_update
["detailed-status"] = " ".join(stage
)
4262 self
._write
_op
_status
(nslcmop_id
, stage
)
4263 self
.logger
.debug(logging_text
+ stage
[2])
4264 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4265 self
._write
_op
_status
(nslcmop_id
, stage
)
4266 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4267 ro_delete_action
= desc
["action_id"]
4269 "_admin.deployed.RO.nsr_delete_action_id"
4270 ] = ro_delete_action
4271 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4272 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4273 if ro_delete_action
:
4274 # wait until NS is deleted from VIM
4275 stage
[2] = "Waiting ns deleted from VIM."
4276 detailed_status_old
= None
4280 + " RO_id={} ro_delete_action={}".format(
4281 ro_nsr_id
, ro_delete_action
4284 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4285 self
._write
_op
_status
(nslcmop_id
, stage
)
4287 delete_timeout
= 20 * 60 # 20 minutes
4288 while delete_timeout
> 0:
4289 desc
= await self
.RO
.show(
4291 item_id_name
=ro_nsr_id
,
4292 extra_item
="action",
4293 extra_item_id
=ro_delete_action
,
4297 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4299 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4300 if ns_status
== "ERROR":
4301 raise ROclient
.ROClientException(ns_status_info
)
4302 elif ns_status
== "BUILD":
4303 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4304 elif ns_status
== "ACTIVE":
4305 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4306 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4311 ), "ROclient.check_action_status returns unknown {}".format(
4314 if stage
[2] != detailed_status_old
:
4315 detailed_status_old
= stage
[2]
4316 db_nsr_update
["detailed-status"] = " ".join(stage
)
4317 self
._write
_op
_status
(nslcmop_id
, stage
)
4318 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4319 await asyncio
.sleep(5, loop
=self
.loop
)
4321 else: # delete_timeout <= 0:
4322 raise ROclient
.ROClientException(
4323 "Timeout waiting ns deleted from VIM"
4326 except Exception as e
:
4327 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4329 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4331 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4332 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4333 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4335 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4338 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4340 failed_detail
.append("delete conflict: {}".format(e
))
4343 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4346 failed_detail
.append("delete error: {}".format(e
))
4348 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4352 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4353 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4355 stage
[2] = "Deleting nsd from RO."
4356 db_nsr_update
["detailed-status"] = " ".join(stage
)
4357 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4358 self
._write
_op
_status
(nslcmop_id
, stage
)
4359 await self
.RO
.delete("nsd", ro_nsd_id
)
4361 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4363 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4364 except Exception as e
:
4366 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4368 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4370 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4373 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4375 failed_detail
.append(
4376 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4378 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4380 failed_detail
.append(
4381 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4383 self
.logger
.error(logging_text
+ failed_detail
[-1])
4385 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4386 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4387 if not vnf_deployed
or not vnf_deployed
["id"]:
4390 ro_vnfd_id
= vnf_deployed
["id"]
4393 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4394 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4396 db_nsr_update
["detailed-status"] = " ".join(stage
)
4397 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4398 self
._write
_op
_status
(nslcmop_id
, stage
)
4399 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4401 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4403 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4404 except Exception as e
:
4406 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4409 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4413 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4416 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4418 failed_detail
.append(
4419 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4421 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4423 failed_detail
.append(
4424 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4426 self
.logger
.error(logging_text
+ failed_detail
[-1])
4429 stage
[2] = "Error deleting from VIM"
4431 stage
[2] = "Deleted from VIM"
4432 db_nsr_update
["detailed-status"] = " ".join(stage
)
4433 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4434 self
._write
_op
_status
(nslcmop_id
, stage
)
4437 raise LcmException("; ".join(failed_detail
))
4439 async def terminate(self
, nsr_id
, nslcmop_id
):
4440 # Try to lock HA task here
4441 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4442 if not task_is_locked_by_me
:
4445 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4446 self
.logger
.debug(logging_text
+ "Enter")
4447 timeout_ns_terminate
= self
.timeout_ns_terminate
4450 operation_params
= None
4452 error_list
= [] # annotates all failed error messages
4453 db_nslcmop_update
= {}
4454 autoremove
= False # autoremove after terminated
4455 tasks_dict_info
= {}
4458 "Stage 1/3: Preparing task.",
4459 "Waiting for previous operations to terminate.",
4462 # ^ contains [stage, step, VIM-status]
4464 # wait for any previous tasks in process
4465 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4467 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4468 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4469 operation_params
= db_nslcmop
.get("operationParams") or {}
4470 if operation_params
.get("timeout_ns_terminate"):
4471 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4472 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4473 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4475 db_nsr_update
["operational-status"] = "terminating"
4476 db_nsr_update
["config-status"] = "terminating"
4477 self
._write
_ns
_status
(
4479 ns_state
="TERMINATING",
4480 current_operation
="TERMINATING",
4481 current_operation_id
=nslcmop_id
,
4482 other_update
=db_nsr_update
,
4484 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4485 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4486 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4489 stage
[1] = "Getting vnf descriptors from db."
4490 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4492 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4494 db_vnfds_from_id
= {}
4495 db_vnfds_from_member_index
= {}
4497 for vnfr
in db_vnfrs_list
:
4498 vnfd_id
= vnfr
["vnfd-id"]
4499 if vnfd_id
not in db_vnfds_from_id
:
4500 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4501 db_vnfds_from_id
[vnfd_id
] = vnfd
4502 db_vnfds_from_member_index
[
4503 vnfr
["member-vnf-index-ref"]
4504 ] = db_vnfds_from_id
[vnfd_id
]
4506 # Destroy individual execution environments when there are terminating primitives.
4507 # Rest of EE will be deleted at once
4508 # TODO - check before calling _destroy_N2VC
4509 # if not operation_params.get("skip_terminate_primitives"):#
4510 # or not vca.get("needed_terminate"):
4511 stage
[0] = "Stage 2/3 execute terminating primitives."
4512 self
.logger
.debug(logging_text
+ stage
[0])
4513 stage
[1] = "Looking execution environment that needs terminate."
4514 self
.logger
.debug(logging_text
+ stage
[1])
4516 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4517 config_descriptor
= None
4518 vca_member_vnf_index
= vca
.get("member-vnf-index")
4519 vca_id
= self
.get_vca_id(
4520 db_vnfrs_dict
.get(vca_member_vnf_index
)
4521 if vca_member_vnf_index
4525 if not vca
or not vca
.get("ee_id"):
4527 if not vca
.get("member-vnf-index"):
4529 config_descriptor
= db_nsr
.get("ns-configuration")
4530 elif vca
.get("vdu_id"):
4531 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4532 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4533 elif vca
.get("kdu_name"):
4534 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4535 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4537 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4538 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4539 vca_type
= vca
.get("type")
4540 exec_terminate_primitives
= not operation_params
.get(
4541 "skip_terminate_primitives"
4542 ) and vca
.get("needed_terminate")
4543 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4544 # pending native charms
4546 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4548 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4549 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4550 task
= asyncio
.ensure_future(
4558 exec_terminate_primitives
,
4562 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4564 # wait for pending tasks of terminate primitives
4568 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4570 error_list
= await self
._wait
_for
_tasks
(
4573 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4577 tasks_dict_info
.clear()
4579 return # raise LcmException("; ".join(error_list))
4581 # remove All execution environments at once
4582 stage
[0] = "Stage 3/3 delete all."
4584 if nsr_deployed
.get("VCA"):
4585 stage
[1] = "Deleting all execution environments."
4586 self
.logger
.debug(logging_text
+ stage
[1])
4587 vca_id
= self
.get_vca_id({}, db_nsr
)
4588 task_delete_ee
= asyncio
.ensure_future(
4590 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4591 timeout
=self
.timeout_charm_delete
,
4594 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4595 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4597 # Delete from k8scluster
4598 stage
[1] = "Deleting KDUs."
4599 self
.logger
.debug(logging_text
+ stage
[1])
4600 # print(nsr_deployed)
4601 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4602 if not kdu
or not kdu
.get("kdu-instance"):
4604 kdu_instance
= kdu
.get("kdu-instance")
4605 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4606 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4607 vca_id
= self
.get_vca_id({}, db_nsr
)
4608 task_delete_kdu_instance
= asyncio
.ensure_future(
4609 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4610 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4611 kdu_instance
=kdu_instance
,
4613 namespace
=kdu
.get("namespace"),
4619 + "Unknown k8s deployment type {}".format(
4620 kdu
.get("k8scluster-type")
4625 task_delete_kdu_instance
4626 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4629 stage
[1] = "Deleting ns from VIM."
4631 task_delete_ro
= asyncio
.ensure_future(
4632 self
._terminate
_ng
_ro
(
4633 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4637 task_delete_ro
= asyncio
.ensure_future(
4639 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4642 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4644 # rest of staff will be done at finally
4647 ROclient
.ROClientException
,
4652 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4654 except asyncio
.CancelledError
:
4656 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4658 exc
= "Operation was cancelled"
4659 except Exception as e
:
4660 exc
= traceback
.format_exc()
4661 self
.logger
.critical(
4662 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4667 error_list
.append(str(exc
))
4669 # wait for pending tasks
4671 stage
[1] = "Waiting for terminate pending tasks."
4672 self
.logger
.debug(logging_text
+ stage
[1])
4673 error_list
+= await self
._wait
_for
_tasks
(
4676 timeout_ns_terminate
,
4680 stage
[1] = stage
[2] = ""
4681 except asyncio
.CancelledError
:
4682 error_list
.append("Cancelled")
4683 # TODO cancell all tasks
4684 except Exception as exc
:
4685 error_list
.append(str(exc
))
4686 # update status at database
4688 error_detail
= "; ".join(error_list
)
4689 # self.logger.error(logging_text + error_detail)
4690 error_description_nslcmop
= "{} Detail: {}".format(
4691 stage
[0], error_detail
4693 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4694 nslcmop_id
, stage
[0]
4697 db_nsr_update
["operational-status"] = "failed"
4698 db_nsr_update
["detailed-status"] = (
4699 error_description_nsr
+ " Detail: " + error_detail
4701 db_nslcmop_update
["detailed-status"] = error_detail
4702 nslcmop_operation_state
= "FAILED"
4706 error_description_nsr
= error_description_nslcmop
= None
4707 ns_state
= "NOT_INSTANTIATED"
4708 db_nsr_update
["operational-status"] = "terminated"
4709 db_nsr_update
["detailed-status"] = "Done"
4710 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4711 db_nslcmop_update
["detailed-status"] = "Done"
4712 nslcmop_operation_state
= "COMPLETED"
4715 self
._write
_ns
_status
(
4718 current_operation
="IDLE",
4719 current_operation_id
=None,
4720 error_description
=error_description_nsr
,
4721 error_detail
=error_detail
,
4722 other_update
=db_nsr_update
,
4724 self
._write
_op
_status
(
4727 error_message
=error_description_nslcmop
,
4728 operation_state
=nslcmop_operation_state
,
4729 other_update
=db_nslcmop_update
,
4731 if ns_state
== "NOT_INSTANTIATED":
4735 {"nsr-id-ref": nsr_id
},
4736 {"_admin.nsState": "NOT_INSTANTIATED"},
4738 except DbException
as e
:
4741 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4745 if operation_params
:
4746 autoremove
= operation_params
.get("autoremove", False)
4747 if nslcmop_operation_state
:
4749 await self
.msg
.aiowrite(
4754 "nslcmop_id": nslcmop_id
,
4755 "operationState": nslcmop_operation_state
,
4756 "autoremove": autoremove
,
4760 except Exception as e
:
4762 logging_text
+ "kafka_write notification Exception {}".format(e
)
4765 self
.logger
.debug(logging_text
+ "Exit")
4766 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
async def _wait_for_tasks(
    self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
):
    """Wait for a group of tasks, publishing progress and collecting their errors.

    :param logging_text: prefix added to every log line
    :param created_tasks_info: dict whose keys are the awaited tasks and whose
        values are the descriptive text of each task
    :param timeout: seconds allowed for the whole group, counted from this call
    :param stage: list; item 1 is overwritten with "<done>/<total>." (plus the
        errors found so far) and stored through self._write_op_status
    :param nslcmop_id: operation id handed to self._write_op_status
    :param nsr_id: when given, errors are also stored at the "nsrs" record
    :return: list with one "<description>: <error>" text per failed task
    """
    started_at = time()
    error_detail_list = []
    error_list = []
    waiting = list(created_tasks_info.keys())
    total = len(waiting)
    finished = 0
    stage[1] = "{}/{}.".format(finished, total)
    self._write_op_status(nslcmop_id, stage)
    while waiting:
        new_error = None
        # what is left of the global timeout
        time_left = timeout + started_at - time()
        done, waiting = await asyncio.wait(
            waiting, timeout=time_left, return_when=asyncio.FIRST_COMPLETED
        )
        finished += len(done)
        if not done:  # Timeout
            # nothing completed in time: every task still waiting is reported
            for task in waiting:
                new_error = created_tasks_info[task] + ": Timeout"
                error_detail_list.append(new_error)
                error_list.append(new_error)
            break
        for task in done:
            description = created_tasks_info[task]
            exc = "Cancelled" if task.cancelled() else task.exception()
            if not exc:
                self.logger.debug(logging_text + description + ": Done")
                continue
            if isinstance(exc, asyncio.TimeoutError):
                exc = "Timeout"
            new_error = description + ": {}".format(exc)
            error_list.append(description)
            error_detail_list.append(new_error)
            # texts and the exceptions of the platform are logged as a plain
            # message; anything else is logged together with its traceback
            if isinstance(
                exc,
                (
                    str,
                    DbException,
                    N2VCException,
                    ROclient.ROClientException,
                    LcmException,
                    K8sException,
                    NgRoException,
                ),
            ):
                self.logger.error(logging_text + new_error)
            else:
                exc_traceback = "".join(
                    traceback.format_exception(None, exc, exc.__traceback__)
                )
                self.logger.error(
                    logging_text + description + " " + exc_traceback
                )
        stage[1] = "{}/{}.".format(finished, total)
        if new_error:
            stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
            if nsr_id:  # update also nsr
                self.update_db_2(
                    "nsrs",
                    nsr_id,
                    {
                        "errorDescription": "Error at: " + ", ".join(error_list),
                        "errorDetail": ". ".join(error_detail_list),
                    },
                )
        self._write_op_status(nslcmop_id, stage)
    return error_detail_list
4846 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4848 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4849 The default-value is used. If it is between < > it look for a value at instantiation_params
4850 :param primitive_desc: portion of VNFD/NSD that describes primitive
4851 :param params: Params provided by user
4852 :param instantiation_params: Instantiation params provided by user
4853 :return: a dictionary with the calculated params
4855 calculated_params
= {}
4856 for parameter
in primitive_desc
.get("parameter", ()):
4857 param_name
= parameter
["name"]
4858 if param_name
in params
:
4859 calculated_params
[param_name
] = params
[param_name
]
4860 elif "default-value" in parameter
or "value" in parameter
:
4861 if "value" in parameter
:
4862 calculated_params
[param_name
] = parameter
["value"]
4864 calculated_params
[param_name
] = parameter
["default-value"]
4866 isinstance(calculated_params
[param_name
], str)
4867 and calculated_params
[param_name
].startswith("<")
4868 and calculated_params
[param_name
].endswith(">")
4870 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4871 calculated_params
[param_name
] = instantiation_params
[
4872 calculated_params
[param_name
][1:-1]
4876 "Parameter {} needed to execute primitive {} not provided".format(
4877 calculated_params
[param_name
], primitive_desc
["name"]
4882 "Parameter {} needed to execute primitive {} not provided".format(
4883 param_name
, primitive_desc
["name"]
4887 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4888 calculated_params
[param_name
] = yaml
.safe_dump(
4889 calculated_params
[param_name
], default_flow_style
=True, width
=256
4891 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4893 ].startswith("!!yaml "):
4894 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4895 if parameter
.get("data-type") == "INTEGER":
4897 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4898 except ValueError: # error converting string to int
4900 "Parameter {} of primitive {} must be integer".format(
4901 param_name
, primitive_desc
["name"]
4904 elif parameter
.get("data-type") == "BOOLEAN":
4905 calculated_params
[param_name
] = not (
4906 (str(calculated_params
[param_name
])).lower() == "false"
4909 # add always ns_config_info if primitive name is config
4910 if primitive_desc
["name"] == "config":
4911 if "ns_config_info" in instantiation_params
:
4912 calculated_params
["ns_config_info"] = instantiation_params
[
4915 return calculated_params
4917 def _look_for_deployed_vca(
4924 ee_descriptor_id
=None,
4926 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4927 for vca
in deployed_vca
:
4930 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4933 vdu_count_index
is not None
4934 and vdu_count_index
!= vca
["vdu_count_index"]
4937 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4939 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4943 # vca_deployed not found
4945 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4946 " is not deployed".format(
4955 ee_id
= vca
.get("ee_id")
4957 "type", "lxc_proxy_charm"
4958 ) # default value for backward compatibility - proxy charm
4961 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4962 "execution environment".format(
4963 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4966 return ee_id
, vca_type
4968 async def _ns_execute_primitive(
4974 retries_interval
=30,
4981 if primitive
== "config":
4982 primitive_params
= {"params": primitive_params
}
4984 vca_type
= vca_type
or "lxc_proxy_charm"
4988 output
= await asyncio
.wait_for(
4989 self
.vca_map
[vca_type
].exec_primitive(
4991 primitive_name
=primitive
,
4992 params_dict
=primitive_params
,
4993 progress_timeout
=self
.timeout_progress_primitive
,
4994 total_timeout
=self
.timeout_primitive
,
4999 timeout
=timeout
or self
.timeout_primitive
,
5003 except asyncio
.CancelledError
:
5005 except Exception as e
:
5009 "Error executing action {} on {} -> {}".format(
5014 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
5016 if isinstance(e
, asyncio
.TimeoutError
):
5018 message
="Timed out waiting for action to complete"
5020 return "FAILED", getattr(e
, "message", repr(e
))
5022 return "COMPLETED", output
5024 except (LcmException
, asyncio
.CancelledError
):
5026 except Exception as e
:
5027 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
async def vca_status_refresh(self, nsr_id, nslcmop_id):
    """
    Refresh the status kept at the nsrs record: one update per deployed KDU when
    the NS has any, otherwise one update per deployed VCA
    :param: nsr_id: Id of the nsr
    :param: nslcmop_id: Id of the nslcmop
    :return: None
    """

    self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_id = self.get_vca_id({}, db_nsr)
    deployed_kdus = db_nsr["_admin"]["deployed"]["K8s"]
    if deployed_kdus:
        for k8s in deployed_kdus:
            cluster_uuid = k8s["k8scluster-uuid"]
            kdu_instance = k8s["kdu-instance"]
            cluster_type = k8s["k8scluster-type"]
            await self._on_update_k8s_db(
                cluster_uuid=cluster_uuid,
                kdu_instance=kdu_instance,
                filter={"_id": nsr_id},
                vca_id=vca_id,
                cluster_type=cluster_type,
            )
    else:
        # only the position of each VCA is needed to build its path
        for vca_index, _unused in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
            nsr_filter = {"_id": nsr_id}
            vca_path = "_admin.deployed.VCA.{}.".format(vca_index)
            await self._on_update_n2vc_db("nsrs", nsr_filter, vca_path, {})

    self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
    self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
async def action(self, nsr_id, nslcmop_id):
    """Run the primitive requested by an "action" operation on a deployed NS.

    The target is taken from the operationParams of the nslcmop record: a KDU
    (kdu_name), a VDU (vdu_id), a VNF (member_vnf_index) or, when none of them
    is given, the NS itself. The KDU primitives "upgrade", "rollback" and
    "status", and the KDU primitives declared at the descriptor when the
    cluster type is not helm, are sent to the connector selected at
    self.k8scluster_map; any other primitive is executed with
    self._ns_execute_primitive on the matching deployed VCA.

    The result is written to the nsr and nslcmop records and notified through
    self.msg inside the ``finally`` clause, also when the execution fails.

    :param nsr_id: _id of the "nsrs" record
    :param nslcmop_id: _id of the "nslcmops" record that holds operationParams
    :return: (nslcmop_operation_state, detailed_status); None if the HA lock
        of the operation is not obtained
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    error_description_nslcmop = None
    exc = None
    # Bound up-front: an NS-level action (no member_vnf_index) reads no vnfr
    # or vnfd, and a VCA list without a matching entry sets no db_dict. They
    # must still be readable below instead of raising UnboundLocalError
    db_vnfr = None
    db_vnfd = None
    db_nsd = None
    db_dict = None
    kdu_action = False
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id,
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # primitive_params is stored as a json text at the record
        if db_nslcmop["operationParams"].get("primitive_params"):
            db_nslcmop["operationParams"]["primitive_params"] = json.loads(
                db_nslcmop["operationParams"]["primitive_params"]
            )

        nsr_deployed = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        kdu_name = db_nslcmop["operationParams"].get("kdu_name")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]
        timeout_ns_action = db_nslcmop["operationParams"].get(
            "timeout_ns_action", self.timeout_primitive
        )

        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one(
                "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
            )
            if db_vnfr.get("kdur"):
                # additionalParams of each kdur is stored as a json text
                kdur_list = []
                for kdur in db_vnfr["kdur"]:
                    if kdur.get("additionalParams"):
                        kdur["additionalParams"] = json.loads(
                            kdur["additionalParams"]
                        )
                    kdur_list.append(kdur)
                db_vnfr["kdur"] = kdur_list
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

            # Sync filesystem before running a primitive
            self.fs.sync(db_vnfr["vnfd-id"])
        else:
            step = "Getting nsd from database"
            db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        vca_id = self.get_vca_id(db_vnfr, db_nsr)
        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # look for primitive
        config_primitive_desc = descriptor_configuration = None
        if vdu_id:
            descriptor_configuration = get_configuration(db_vnfd, vdu_id)
        elif kdu_name:
            descriptor_configuration = get_configuration(db_vnfd, kdu_name)
        elif vnf_index:
            descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
        else:
            descriptor_configuration = db_nsd.get("ns-configuration")

        if descriptor_configuration and descriptor_configuration.get(
            "config-primitive"
        ):
            for config_primitive in descriptor_configuration["config-primitive"]:
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            # only the built-in KDU primitives may lack a descriptor entry
            if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
                raise LcmException(
                    "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
                        primitive
                    )
                )
            primitive_name = primitive
            ee_descriptor_id = None
        else:
            primitive_name = config_primitive_desc.get(
                "execution-environment-primitive", primitive
            )
            ee_descriptor_id = config_primitive_desc.get(
                "execution-environment-ref"
            )

        if vnf_index:
            if vdu_id:
                vdur = next(
                    (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
                )
                if vdur is None:
                    # explicit error instead of AttributeError on None
                    raise LcmException(
                        "vdu '{}' not found at vnfr of vnf '{}'".format(
                            vdu_id, vnf_index
                        )
                    )
                desc_params = parse_yaml_strings(vdur.get("additionalParams"))
            elif kdu_name:
                kdur = next(
                    (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
                )
                if kdur is None:
                    # explicit error instead of AttributeError on None
                    raise LcmException(
                        "kdu '{}' not found at vnfr of vnf '{}'".format(
                            kdu_name, vnf_index
                        )
                    )
                desc_params = parse_yaml_strings(kdur.get("additionalParams"))
            else:
                desc_params = parse_yaml_strings(
                    db_vnfr.get("additionalParamsForVnf")
                )
        else:
            desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
        if kdu_name and get_configuration(db_vnfd, kdu_name):
            kdu_configuration = get_configuration(db_vnfd, kdu_name)
            # names of the primitives declared for the KDU. The loop variable
            # is not called "primitive" so that the requested name is kept
            actions = set()
            for declared in kdu_configuration.get("initial-config-primitive", []):
                actions.add(declared["name"])
            for declared in kdu_configuration.get("config-primitive", []):
                actions.add(declared["name"])
            kdu = find_in_list(
                nsr_deployed["K8s"],
                lambda kdu: kdu_name == kdu["kdu-name"]
                and kdu["member-vnf-index"] == vnf_index,
            )
            kdu_action = primitive_name in actions and kdu[
                "k8scluster-type"
            ] not in ("helm-chart", "helm-chart-v3")

        # TODO check if ns is in a proper status
        if kdu_name and (
            primitive_name in ("upgrade", "rollback", "status") or kdu_action
        ):
            # kdur and desc_params already set from before
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                if (
                    kdu_name == kdu["kdu-name"]
                    and kdu["member-vnf-index"] == vnf_index
                ):
                    break
            else:
                raise LcmException(
                    "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
                )

            if kdu.get("k8scluster-type") not in self.k8scluster_map:
                msg = "unknown k8scluster-type '{}'".format(
                    kdu.get("k8scluster-type")
                )
                raise LcmException(msg)

            db_dict = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": "_admin.deployed.K8s.{}".format(index),
            }
            self.logger.debug(
                logging_text
                + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
            )
            step = "Executing kdu {}".format(primitive_name)
            if primitive_name == "upgrade":
                if desc_params.get("kdu_model"):
                    kdu_model = desc_params.get("kdu_model")
                    del desc_params["kdu_model"]
                else:
                    kdu_model = kdu.get("kdu-model")
                    # "<model>:<something>" is cut down to "<model>"
                    parts = kdu_model.split(sep=":")
                    if len(parts) == 2:
                        kdu_model = parts[0]

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        atomic=True,
                        kdu_model=kdu_model,
                        params=desc_params,
                        db_dict=db_dict,
                        timeout=timeout_ns_action,
                    ),
                    timeout=timeout_ns_action + 10,
                )
                self.logger.debug(
                    logging_text + " Upgrade of kdu {} done".format(detailed_status)
                )
            elif primitive_name == "rollback":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].rollback(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        db_dict=db_dict,
                    ),
                    timeout=timeout_ns_action,
                )
            elif primitive_name == "status":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        vca_id=vca_id,
                    ),
                    timeout=timeout_ns_action,
                )
            else:
                kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
                    kdu["kdu-name"], nsr_id
                )
                params = self._map_primitive_params(
                    config_primitive_desc, primitive_params, desc_params
                )

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        primitive_name=primitive_name,
                        params=params,
                        db_dict=db_dict,
                        timeout=timeout_ns_action,
                        vca_id=vca_id,
                    ),
                    timeout=timeout_ns_action,
                )

            # an empty answer of the connector is taken as a failure
            if detailed_status:
                nslcmop_operation_state = "COMPLETED"
            else:
                detailed_status = ""
                nslcmop_operation_state = "FAILED"
        else:
            ee_id, vca_type = self._look_for_deployed_vca(
                nsr_deployed["VCA"],
                member_vnf_index=vnf_index,
                vdu_id=vdu_id,
                vdu_count_index=vdu_count_index,
                ee_descriptor_id=ee_descriptor_id,
            )
            for vca_index, vca_deployed in enumerate(
                db_nsr["_admin"]["deployed"]["VCA"]
            ):
                if vca_deployed.get("member-vnf-index") == vnf_index:
                    db_dict = {
                        "collection": "nsrs",
                        "filter": {"_id": nsr_id},
                        "path": "_admin.deployed.VCA.{}.".format(vca_index),
                    }
                    break
            (
                nslcmop_operation_state,
                detailed_status,
            ) = await self._ns_execute_primitive(
                ee_id,
                primitive=primitive_name,
                primitive_params=self._map_primitive_params(
                    config_primitive_desc, primitive_params, desc_params
                ),
                timeout=timeout_ns_action,
                vca_type=vca_type,
                db_dict=db_dict,
                vca_id=vca_id,
            )

        db_nslcmop_update["detailed-status"] = detailed_status
        error_description_nslcmop = (
            detailed_status if nslcmop_operation_state == "FAILED" else ""
        )
        self.logger.debug(
            logging_text
            + "Done with result {} {}".format(
                nslcmop_operation_state, detailed_status
            )
        )
        return  # database update is called inside finally

    except (DbException, LcmException, N2VCException, K8sException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(step)
        )
        exc = "Operation was cancelled"
    except asyncio.TimeoutError:
        self.logger.error(logging_text + "Timeout while '{}'".format(step))
        exc = "Timeout"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
            exc_info=True,
        )
    finally:
        if exc:
            db_nslcmop_update[
                "detailed-status"
            ] = (
                detailed_status
            ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=db_nsr[
                    "nsState"
                ],  # TODO check if degraded. For the moment use previous status
                current_operation="IDLE",
                current_operation_id=None,
                # error_description=error_description_nsr,
                # error_detail=error_detail,
                other_update=db_nsr_update,
            )

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite(
                    "ns",
                    "actioned",
                    {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                    },
                    loop=self.loop,
                )
            except Exception as e:
                # the notification is best effort: it is only logged
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        # this return replaces the bare "return" of the try block
        return nslcmop_operation_state, detailed_status
async def terminate_vdus(
    self, db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text
):
    """This method terminates VDUs

    Builds a scale-in request ("scaling_direction": "IN") that covers the vdur
    entries of the VNF record and hands it over to self._scale_ng_ro when the
    "ng" option of self.ro_config is set.

    Args:
        db_vnfr: VNF instance record
        member_vnf_index: VNF index to identify the VDUs to be removed
        db_nsr: NS instance record
        update_db_nslcmops: Nslcmop update record
        stage: list; item 2 is set to "Terminating VDUs"
        logging_text: prefix for the log messages, passed on to _scale_ng_ro
    """
    vca_scaling_info = []
    scaling_info = {
        "scaling_group_name": "vdu_autoscale",
        "vdu": [],
        "kdu": [],
        "scaling_direction": "IN",
        "vdu-delete": {},
        "kdu-delete": {},
    }
    vdur_list = copy(db_vnfr.get("vdur"))
    count_index = 0
    for vdu in vdur_list:
        vdu_ref = vdu["vdu-id-ref"]
        vca_scaling_info.append(
            {
                "osm_vdu_id": vdu_ref,
                "member-vnf-index": member_vnf_index,
                "type": "delete",
                "vdu_index": count_index,
            }
        )
        scaling_info["vdu-delete"][vdu_ref] = count_index
        vdu_entry = {
            "name": vdu.get("name") or vdu.get("vdu-name"),
            "vdu_id": vdu["vdu-id-ref"],
            "interface": [],
        }
        scaling_info["vdu"].append(vdu_entry)
        for interface in vdu["interfaces"]:
            vdu_entry["interface"].append(
                {
                    "name": interface["name"],
                    "ip_address": interface["ip-address"],
                    "mac_address": interface.get("mac-address"),
                }
            )
        # NOTE(review): the statements below are understood to belong to the
        # loop, so the request is sent once per VDU with the data gathered so
        # far - confirm against the original indentation
        self.logger.info("NS update scaling info{}".format(scaling_info))
        stage[2] = "Terminating VDUs"
        if not scaling_info.get("vdu-delete"):
            continue
        # scale_process = "RO"
        if self.ro_config.get("ng"):
            await self._scale_ng_ro(
                logging_text,
                db_nsr,
                update_db_nslcmops,
                db_vnfr,
                scaling_info,
                stage,
            )
async def remove_vnf(self, nsr_id, nslcmop_id, vnf_instance_id):
    """This method is to Remove VNF instances from NS.

    The VDUs of the VNF are terminated, its id is taken out of the
    "constituent-vnfr-ref" list of the NS record and its vnfr is deleted.
    The only VNF left in an NS is never removed.

    Args:
        nsr_id: NS instance id
        nslcmop_id: nslcmop id of update
        vnf_instance_id: id of the VNF instance to be removed

    Returns:
        result: (str, str) COMPLETED/FAILED, details

    Raises:
        LcmException: if the NS has no other VNF than the one to be removed
    """
    try:
        db_nsr_update = {}
        logging_text = "Task ns={} update ".format(nsr_id)
        check_vnfr_count = len(self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}))
        self.logger.info("check_vnfr_count {}".format(check_vnfr_count))
        if check_vnfr_count <= 1:
            raise LcmException(
                "{} Cannot terminate the last VNF in this NS.".format(
                    vnf_instance_id
                )
            )

        stage = ["", "", ""]
        self.logger.debug(
            "Getting nslcmop from database"
            " after having waited for previous tasks to be completed"
        )
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
        member_vnf_index = db_vnfr["member-vnf-index-ref"]

        update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        await self.terminate_vdus(
            db_vnfr,
            member_vnf_index,
            db_nsr,
            update_db_nslcmops,
            stage,
            logging_text,
        )

        vnfr_id = db_vnfr.get("_id")
        # the list of the NS record is edited in place, then stored
        db_nsr.get("constituent-vnfr-ref").remove(vnfr_id)
        db_nsr_update["constituent-vnfr-ref"] = db_nsr.get("constituent-vnfr-ref")
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self.db.del_one("vnfrs", {"_id": db_vnfr.get("_id")})
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        return "COMPLETED", "Done"
    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        self.logger.debug("Error removing VNF {}".format(e))
        return "FAILED", "Error removing VNF {}".format(e)
async def _ns_redeploy_vnf(
    self,
    nsr_id,
    nslcmop_id,
    db_vnfd,
    db_vnfr,
    db_nsr,
):
    """This method updates and redeploys VNF instances

    The VDUs of the VNF are terminated, the vnfr is rewritten (revision,
    connection points taken from "ext-cpd" of the descriptor and the "newVdur"
    found at the operationParams of the nslcmop) and a scale-out request
    ("scaling_direction": "OUT") with the VDUs of the descriptor is sent.

    Args:
        nsr_id: NS instance id
        nslcmop_id:   nslcmop id
        db_vnfd: VNF descriptor
        db_vnfr: VNF instance record
        db_nsr: NS instance record

    Returns:
        result: (str, str) COMPLETED/FAILED, details
    """
    try:
        count_index = 0
        stage = ["", "", ""]
        logging_text = "Task ns={} update ".format(nsr_id)
        latest_vnfd_revision = db_vnfd["_admin"].get("revision")
        member_vnf_index = db_vnfr["member-vnf-index-ref"]

        # Terminate old VNF resources
        update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        await self.terminate_vdus(
            db_vnfr,
            member_vnf_index,
            db_nsr,
            update_db_nslcmops,
            stage,
            logging_text,
        )

        # Create VDUR: one connection point per external connection point
        new_vnfr_cp = [
            {
                "name": cp.get("id"),
                "connection-point-id": cp.get("int-cpd", {}).get("cpd"),
                "connection-point-vdu-id": cp.get("int-cpd", {}).get("vdu-id"),
                "id": cp.get("id"),
            }
            for cp in db_vnfd.get("ext-cpd", ())
        ]
        new_vnfr_update = {
            "revision": latest_vnfd_revision,
            "connection-point": new_vnfr_cp,
            "vdur": update_db_nslcmops["operationParams"]["newVdur"],
            "ip-address": "",
        }
        self.update_db_2("vnfrs", db_vnfr["_id"], new_vnfr_update)
        # read the record again so that the changes just stored are seen
        updated_db_vnfr = self.db.get_one(
            "vnfrs",
            {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id},
        )

        # Instantiate new VNF resources
        vca_scaling_info = []
        scaling_info = {
            "scaling_group_name": "vdu_autoscale",
            "vdu": [],
            "kdu": [],
            "scaling_direction": "OUT",
            "vdu-create": {},
            "kdu-create": {},
        }
        for vdud in db_vnfd["vdu"]:
            cloud_init_text = self._get_vdu_cloud_init_content(vdud, db_vnfd)
            cloud_init_list = []
            if cloud_init_text:
                additional_params = (
                    self._get_vdu_additional_params(updated_db_vnfr, vdud["id"])
                    or {}
                )
                # TODO Information of its own ip is not available because db_vnfr is not updated.
                additional_params["OSM"] = get_osm_params(
                    updated_db_vnfr, vdud["id"], 1
                )
                cloud_init_list.append(
                    self._parse_cloud_init(
                        cloud_init_text,
                        additional_params,
                        db_vnfd["id"],
                        vdud["id"],
                    )
                )
                vca_scaling_info.append(
                    {
                        "osm_vdu_id": vdud["id"],
                        "member-vnf-index": member_vnf_index,
                        "type": "create",
                        "vdu_index": count_index,
                    }
                )
            scaling_info["vdu-create"][vdud["id"]] = count_index
        # NOTE(review): nothing is returned when the "ng" option is not set,
        # as understood from the original - confirm
        if self.ro_config.get("ng"):
            self.logger.debug(
                "New Resources to be deployed: {}".format(scaling_info)
            )
            await self._scale_ng_ro(
                logging_text,
                db_nsr,
                update_db_nslcmops,
                updated_db_vnfr,
                scaling_info,
                stage,
            )
            return "COMPLETED", "Done"
    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        self.logger.debug("Error updating VNF {}".format(e))
        return "FAILED", "Error updating VNF {}".format(e)
5672 async def _ns_charm_upgrade(
5678 timeout
: float = None,
5680 """This method upgrade charms in VNF instances
5683 ee_id: Execution environment id
5684 path: Local path to the charm
5686 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5687 timeout: (Float) Timeout for the ns update operation
5690 result: (str, str) COMPLETED/FAILED, details
5693 charm_type
= charm_type
or "lxc_proxy_charm"
5694 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5698 charm_type
=charm_type
,
5699 timeout
=timeout
or self
.timeout_ns_update
,
5703 return "COMPLETED", output
5705 except (LcmException
, asyncio
.CancelledError
):
5708 except Exception as e
:
5710 self
.logger
.debug("Error upgrading charm {}".format(path
))
5712 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5714 async def update(self
, nsr_id
, nslcmop_id
):
5715 """Update NS according to different update types
5717 This method performs upgrade of VNF instances then updates the revision
5718 number in VNF record
5721 nsr_id: Network service will be updated
5722 nslcmop_id: ns lcm operation id
5725 It may raise DbException, LcmException, N2VCException, K8sException
5728 # Try to lock HA task here
5729 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5730 if not task_is_locked_by_me
:
5733 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5734 self
.logger
.debug(logging_text
+ "Enter")
5736 # Set the required variables to be filled up later
5738 db_nslcmop_update
= {}
5740 nslcmop_operation_state
= None
5742 error_description_nslcmop
= ""
5744 change_type
= "updated"
5745 detailed_status
= ""
5748 # wait for any previous tasks in process
5749 step
= "Waiting for previous operations to terminate"
5750 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5751 self
._write
_ns
_status
(
5754 current_operation
="UPDATING",
5755 current_operation_id
=nslcmop_id
,
5758 step
= "Getting nslcmop from database"
5759 db_nslcmop
= self
.db
.get_one(
5760 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5762 update_type
= db_nslcmop
["operationParams"]["updateType"]
5764 step
= "Getting nsr from database"
5765 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5766 old_operational_status
= db_nsr
["operational-status"]
5767 db_nsr_update
["operational-status"] = "updating"
5768 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5769 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5771 if update_type
== "CHANGE_VNFPKG":
5773 # Get the input parameters given through update request
5774 vnf_instance_id
= db_nslcmop
["operationParams"][
5775 "changeVnfPackageData"
5776 ].get("vnfInstanceId")
5778 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5781 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5783 step
= "Getting vnfr from database"
5784 db_vnfr
= self
.db
.get_one(
5785 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5788 step
= "Getting vnfds from database"
5790 latest_vnfd
= self
.db
.get_one(
5791 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5793 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5796 current_vnf_revision
= db_vnfr
.get("revision", 1)
5797 current_vnfd
= self
.db
.get_one(
5799 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5800 fail_on_empty
=False,
5802 # Charm artifact paths will be filled up later
5804 current_charm_artifact_path
,
5805 target_charm_artifact_path
,
5806 charm_artifact_paths
,
5809 step
= "Checking if revision has changed in VNFD"
5810 if current_vnf_revision
!= latest_vnfd_revision
:
5812 change_type
= "policy_updated"
5814 # There is new revision of VNFD, update operation is required
5815 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5816 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5818 step
= "Removing the VNFD packages if they exist in the local path"
5819 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5820 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5822 step
= "Get the VNFD packages from FSMongo"
5823 self
.fs
.sync(from_path
=latest_vnfd_path
)
5824 self
.fs
.sync(from_path
=current_vnfd_path
)
5827 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5829 base_folder
= latest_vnfd
["_admin"]["storage"]
5831 for charm_index
, charm_deployed
in enumerate(
5832 get_iterable(nsr_deployed
, "VCA")
5834 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5836 # Getting charm-id and charm-type
5837 if charm_deployed
.get("member-vnf-index") == vnf_index
:
5838 charm_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5839 charm_type
= charm_deployed
.get("type")
5842 ee_id
= charm_deployed
.get("ee_id")
5844 step
= "Getting descriptor config"
5845 descriptor_config
= get_configuration(
5846 current_vnfd
, current_vnfd
["id"]
5849 if "execution-environment-list" in descriptor_config
:
5850 ee_list
= descriptor_config
.get(
5851 "execution-environment-list", []
5856 # There could be several charm used in the same VNF
5857 for ee_item
in ee_list
:
5858 if ee_item
.get("juju"):
5860 step
= "Getting charm name"
5861 charm_name
= ee_item
["juju"].get("charm")
5863 step
= "Setting Charm artifact paths"
5864 current_charm_artifact_path
.append(
5865 get_charm_artifact_path(
5869 current_vnf_revision
,
5872 target_charm_artifact_path
.append(
5873 get_charm_artifact_path(
5877 latest_vnfd_revision
,
5881 charm_artifact_paths
= zip(
5882 current_charm_artifact_path
, target_charm_artifact_path
5885 step
= "Checking if software version has changed in VNFD"
5886 if find_software_version(current_vnfd
) != find_software_version(
5890 step
= "Checking if existing VNF has charm"
5891 for current_charm_path
, target_charm_path
in list(
5892 charm_artifact_paths
5894 if current_charm_path
:
5896 "Software version change is not supported as VNF instance {} has charm.".format(
5901 # There is no change in the charm package, then redeploy the VNF
5902 # based on new descriptor
5903 step
= "Redeploying VNF"
5904 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5905 (result
, detailed_status
) = await self
._ns
_redeploy
_vnf
(
5906 nsr_id
, nslcmop_id
, latest_vnfd
, db_vnfr
, db_nsr
5908 if result
== "FAILED":
5909 nslcmop_operation_state
= result
5910 error_description_nslcmop
= detailed_status
5911 db_nslcmop_update
["detailed-status"] = detailed_status
5914 + " step {} Done with result {} {}".format(
5915 step
, nslcmop_operation_state
, detailed_status
5920 step
= "Checking if any charm package has changed or not"
5921 for current_charm_path
, target_charm_path
in list(
5922 charm_artifact_paths
5926 and target_charm_path
5927 and self
.check_charm_hash_changed(
5928 current_charm_path
, target_charm_path
5932 step
= "Checking whether VNF uses juju bundle"
5933 if check_juju_bundle_existence(current_vnfd
):
5936 "Charm upgrade is not supported for the instance which"
5937 " uses juju-bundle: {}".format(
5938 check_juju_bundle_existence(current_vnfd
)
5942 step
= "Upgrading Charm"
5946 ) = await self
._ns
_charm
_upgrade
(
5949 charm_type
=charm_type
,
5950 path
=self
.fs
.path
+ target_charm_path
,
5951 timeout
=timeout_seconds
,
5954 if result
== "FAILED":
5955 nslcmop_operation_state
= result
5956 error_description_nslcmop
= detailed_status
5958 db_nslcmop_update
["detailed-status"] = detailed_status
5961 + " step {} Done with result {} {}".format(
5962 step
, nslcmop_operation_state
, detailed_status
5966 step
= "Updating policies"
5967 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5968 result
= "COMPLETED"
5969 detailed_status
= "Done"
5970 db_nslcmop_update
["detailed-status"] = "Done"
5972 # If nslcmop_operation_state is None, so any operation is not failed.
5973 if not nslcmop_operation_state
:
5974 nslcmop_operation_state
= "COMPLETED"
5976 # If update CHANGE_VNFPKG nslcmop_operation is successful
5977 # vnf revision need to be updated
5978 vnfr_update
["revision"] = latest_vnfd_revision
5979 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
5983 + " task Done with result {} {}".format(
5984 nslcmop_operation_state
, detailed_status
5987 elif update_type
== "REMOVE_VNF":
5988 # This part is included in https://osm.etsi.org/gerrit/11876
5989 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
5990 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5991 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5992 step
= "Removing VNF"
5993 (result
, detailed_status
) = await self
.remove_vnf(
5994 nsr_id
, nslcmop_id
, vnf_instance_id
5996 if result
== "FAILED":
5997 nslcmop_operation_state
= result
5998 error_description_nslcmop
= detailed_status
5999 db_nslcmop_update
["detailed-status"] = detailed_status
6000 change_type
= "vnf_terminated"
6001 if not nslcmop_operation_state
:
6002 nslcmop_operation_state
= "COMPLETED"
6005 + " task Done with result {} {}".format(
6006 nslcmop_operation_state
, detailed_status
6010 elif update_type
== "OPERATE_VNF":
6011 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"][
6014 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"][
6017 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"][
6020 (result
, detailed_status
) = await self
.rebuild_start_stop(
6021 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
6023 if result
== "FAILED":
6024 nslcmop_operation_state
= result
6025 error_description_nslcmop
= detailed_status
6026 db_nslcmop_update
["detailed-status"] = detailed_status
6027 if not nslcmop_operation_state
:
6028 nslcmop_operation_state
= "COMPLETED"
6031 + " task Done with result {} {}".format(
6032 nslcmop_operation_state
, detailed_status
6036 # If nslcmop_operation_state is None, so any operation is not failed.
6037 # All operations are executed in overall.
6038 if not nslcmop_operation_state
:
6039 nslcmop_operation_state
= "COMPLETED"
6040 db_nsr_update
["operational-status"] = old_operational_status
6042 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
6043 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6045 except asyncio
.CancelledError
:
6047 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6049 exc
= "Operation was cancelled"
6050 except asyncio
.TimeoutError
:
6051 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
6053 except Exception as e
:
6054 exc
= traceback
.format_exc()
6055 self
.logger
.critical(
6056 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6065 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6066 nslcmop_operation_state
= "FAILED"
6067 db_nsr_update
["operational-status"] = old_operational_status
6069 self
._write
_ns
_status
(
6071 ns_state
=db_nsr
["nsState"],
6072 current_operation
="IDLE",
6073 current_operation_id
=None,
6074 other_update
=db_nsr_update
,
6077 self
._write
_op
_status
(
6080 error_message
=error_description_nslcmop
,
6081 operation_state
=nslcmop_operation_state
,
6082 other_update
=db_nslcmop_update
,
6085 if nslcmop_operation_state
:
6089 "nslcmop_id": nslcmop_id
,
6090 "operationState": nslcmop_operation_state
,
6092 if change_type
in ("vnf_terminated", "policy_updated"):
6093 msg
.update({"vnf_member_index": member_vnf_index
})
6094 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6095 except Exception as e
:
6097 logging_text
+ "kafka_write notification Exception {}".format(e
)
6099 self
.logger
.debug(logging_text
+ "Exit")
6100 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6101 return nslcmop_operation_state
, detailed_status
async def scale(self, nsr_id, nslcmop_id):
    """Run one NS scale operation (scale-out or scale-in of a VNF).

    The operation parameters are read from the ``nslcmops`` record: the
    target ``member-vnf-index``, the ``scaling-group-descriptor`` name and
    the ``scaleVnfType`` (``SCALE_OUT`` or ``SCALE_IN``).  The method then,
    in order:

    1. computes the VDU/KDU deltas and checks them against the max/min
       instance counts of the descriptor profiles;
    2. runs the matching ``pre-scale-*`` config primitives;
    3. for scale-in, destroys the execution environments of the removed
       VDU instances;
    4. calls ``_scale_ng_ro`` (only when ``ro_config["ng"]`` is set) and
       ``_scale_kdu``;
    5. for scale-out, deploys execution environments for the new VDU
       instances;
    6. runs the matching ``post-scale-*`` config primitives.

    Errors are not propagated to the caller: they are captured, written to
    the ``nslcmops``/``nsrs`` records through ``_write_op_status`` and
    ``_write_ns_status``, and the final operation state is published with
    ``self.msg.aiowrite("ns", "scaled", ...)``.

    :param nsr_id: ``_id`` of the NS record being scaled.
    :param nslcmop_id: ``_id`` of the NS LCM operation record.
    :return: None
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
    stage = ["", "", ""]
    tasks_dict_info = {}
    # ^ stage, step, VIM progress
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop_update = {}
    db_nsr_update = {}
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    scale_process = None
    old_operational_status = ""
    old_config_status = ""
    nsi_id = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="SCALING",
            current_operation_id=nslcmop_id,
        )

        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})

        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # Remembered so they can be restored once the operation ends.
        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]

        step = "Parsing scaling parameters"
        db_nsr_update["operational-status"] = "scaling"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        nsr_deployed = db_nsr["_admin"].get("deployed")

        vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
            "scaleByStepData"
        ]["member-vnf-index"]
        scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
            "scaleByStepData"
        ]["scaling-group-descriptor"]
        scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Getting vnfr from database"
        db_vnfr = self.db.get_one(
            "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
        )

        vca_id = self.get_vca_id(db_vnfr, db_nsr)

        step = "Getting vnfd from database"
        db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

        base_folder = db_vnfd["_admin"]["storage"]

        step = "Getting scaling-group-descriptor"
        scaling_descriptor = find_in_list(
            get_scaling_aspect(db_vnfd),
            lambda scale_desc: scale_desc["name"] == scaling_group,
        )
        if not scaling_descriptor:
            raise LcmException(
                "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
                "at vnfd:scaling-group-descriptor".format(scaling_group)
            )

        step = "Sending scale order to VIM"
        # TODO check if ns is in a proper status
        nb_scale_op = 0
        if not db_nsr["_admin"].get("scaling-group"):
            self.update_db_2(
                "nsrs",
                nsr_id,
                {
                    "_admin.scaling-group": [
                        {"name": scaling_group, "nb-scale-op": 0}
                    ]
                },
            )
            admin_scale_index = 0
        else:
            for admin_scale_index, admin_scale_info in enumerate(
                db_nsr["_admin"]["scaling-group"]
            ):
                if admin_scale_info["name"] == scaling_group:
                    nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
                    break
            else:  # not found, set index one plus last element and add new entry with the name
                admin_scale_index += 1
                db_nsr_update[
                    "_admin.scaling-group.{}.name".format(admin_scale_index)
                ] = scaling_group

        # One entry per VDU/KDU instance whose execution environment has to
        # be created or deleted later on.
        vca_scaling_info = []
        scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
        if scaling_type == "SCALE_OUT":
            if "aspect-delta-details" not in scaling_descriptor:
                raise LcmException(
                    "Aspect delta details not fount in scaling descriptor {}".format(
                        scaling_descriptor["name"]
                    )
                )
            # count if max-instance-count is reached
            deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]

            scaling_info["scaling_direction"] = "OUT"
            scaling_info["vdu-create"] = {}
            scaling_info["kdu-create"] = {}
            for delta in deltas:
                for vdu_delta in delta.get("vdu-delta", {}):
                    vdud = get_vdu(db_vnfd, vdu_delta["id"])
                    # vdu_index also provides the number of instance of the targeted vdu
                    vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
                    cloud_init_text = self._get_vdu_cloud_init_content(
                        vdud, db_vnfd
                    )
                    if cloud_init_text:
                        additional_params = (
                            self._get_vdu_additional_params(db_vnfr, vdud["id"])
                            or {}
                        )
                    cloud_init_list = []

                    vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
                    max_instance_count = 10
                    if vdu_profile and "max-number-of-instances" in vdu_profile:
                        max_instance_count = vdu_profile.get(
                            "max-number-of-instances", 10
                        )

                    default_instance_num = get_number_of_instances(
                        db_vnfd, vdud["id"]
                    )
                    instances_number = vdu_delta.get("number-of-instances", 1)
                    nb_scale_op += instances_number

                    new_instance_count = nb_scale_op + default_instance_num
                    # Control if new count is over max and vdu count is less than max.
                    # Then assign new instance count
                    if new_instance_count > max_instance_count > vdu_count:
                        instances_number = new_instance_count - max_instance_count

                    if new_instance_count > max_instance_count:
                        raise LcmException(
                            "reached the limit of {} (max-instance-count) "
                            "scaling-out operations for the "
                            "scaling-group-descriptor '{}'".format(
                                nb_scale_op, scaling_group
                            )
                        )
                    for x in range(vdu_delta.get("number-of-instances", 1)):
                        if cloud_init_text:
                            # TODO Information of its own ip is not available because db_vnfr is not updated.
                            additional_params["OSM"] = get_osm_params(
                                db_vnfr, vdu_delta["id"], vdu_index + x
                            )
                            cloud_init_list.append(
                                self._parse_cloud_init(
                                    cloud_init_text,
                                    additional_params,
                                    db_vnfd["id"],
                                    vdud["id"],
                                )
                            )
                            vca_scaling_info.append(
                                {
                                    "osm_vdu_id": vdu_delta["id"],
                                    "member-vnf-index": vnf_index,
                                    "type": "create",
                                    "vdu_index": vdu_index + x,
                                }
                            )
                    scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
                for kdu_delta in delta.get("kdu-resource-delta", {}):
                    kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
                    kdu_name = kdu_profile["kdu-name"]
                    resource_name = kdu_profile.get("resource-name", "")

                    # Might have different kdus in the same delta
                    # Should have list for each kdu
                    if not scaling_info["kdu-create"].get(kdu_name, None):
                        scaling_info["kdu-create"][kdu_name] = []

                    kdur = get_kdur(db_vnfr, kdu_name)
                    if kdur.get("helm-chart"):
                        # helm v3 unless the record explicitly says "v2"
                        k8s_cluster_type = "helm-chart-v3"
                        self.logger.debug("kdur: {}".format(kdur))
                        if (
                            kdur.get("helm-version")
                            and kdur.get("helm-version") == "v2"
                        ):
                            k8s_cluster_type = "helm-chart"
                    elif kdur.get("juju-bundle"):
                        k8s_cluster_type = "juju-bundle"
                    else:
                        raise LcmException(
                            "kdu type for kdu='{}.{}' is neither helm-chart nor "
                            "juju-bundle. Maybe an old NBI version is running".format(
                                db_vnfr["member-vnf-index-ref"], kdu_name
                            )
                        )

                    max_instance_count = 10
                    if kdu_profile and "max-number-of-instances" in kdu_profile:
                        max_instance_count = kdu_profile.get(
                            "max-number-of-instances", 10
                        )

                    nb_scale_op += kdu_delta.get("number-of-instances", 1)
                    deployed_kdu, _ = get_deployed_kdu(
                        nsr_deployed, kdu_name, vnf_index
                    )
                    if deployed_kdu is None:
                        raise LcmException(
                            "KDU '{}' for vnf '{}' not deployed".format(
                                kdu_name, vnf_index
                            )
                        )
                    kdu_instance = deployed_kdu.get("kdu-instance")
                    instance_num = await self.k8scluster_map[
                        k8s_cluster_type
                    ].get_scale_count(
                        resource_name,
                        kdu_instance,
                        vca_id=vca_id,
                        cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
                        kdu_model=deployed_kdu.get("kdu-model"),
                    )
                    kdu_replica_count = instance_num + kdu_delta.get(
                        "number-of-instances", 1
                    )

                    # Control if new count is over max and instance_num is less than max.
                    # Then assign max instance number to kdu replica count
                    if kdu_replica_count > max_instance_count > instance_num:
                        kdu_replica_count = max_instance_count
                    if kdu_replica_count > max_instance_count:
                        raise LcmException(
                            "reached the limit of {} (max-instance-count) "
                            "scaling-out operations for the "
                            "scaling-group-descriptor '{}'".format(
                                instance_num, scaling_group
                            )
                        )

                    for x in range(kdu_delta.get("number-of-instances", 1)):
                        vca_scaling_info.append(
                            {
                                "osm_kdu_id": kdu_name,
                                "member-vnf-index": vnf_index,
                                "type": "create",
                                "kdu_index": instance_num + x - 1,
                            }
                        )
                    scaling_info["kdu-create"][kdu_name].append(
                        {
                            "member-vnf-index": vnf_index,
                            "type": "create",
                            "k8s-cluster-type": k8s_cluster_type,
                            "resource-name": resource_name,
                            "scale": kdu_replica_count,
                        }
                    )
        elif scaling_type == "SCALE_IN":
            deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]

            scaling_info["scaling_direction"] = "IN"
            scaling_info["vdu-delete"] = {}
            scaling_info["kdu-delete"] = {}

            for delta in deltas:
                for vdu_delta in delta.get("vdu-delta", {}):
                    vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
                    min_instance_count = 0
                    vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
                    if vdu_profile and "min-number-of-instances" in vdu_profile:
                        min_instance_count = vdu_profile["min-number-of-instances"]

                    default_instance_num = get_number_of_instances(
                        db_vnfd, vdu_delta["id"]
                    )
                    instance_num = vdu_delta.get("number-of-instances", 1)
                    nb_scale_op -= instance_num

                    new_instance_count = nb_scale_op + default_instance_num

                    if new_instance_count < min_instance_count < vdu_count:
                        instances_number = min_instance_count - new_instance_count
                    else:
                        instances_number = instance_num

                    if new_instance_count < min_instance_count:
                        raise LcmException(
                            "reached the limit of {} (min-instance-count) scaling-in operations for the "
                            "scaling-group-descriptor '{}'".format(
                                nb_scale_op, scaling_group
                            )
                        )
                    for x in range(vdu_delta.get("number-of-instances", 1)):
                        vca_scaling_info.append(
                            {
                                "osm_vdu_id": vdu_delta["id"],
                                "member-vnf-index": vnf_index,
                                "type": "delete",
                                "vdu_index": vdu_index - 1 - x,
                            }
                        )
                    scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
                for kdu_delta in delta.get("kdu-resource-delta", {}):
                    kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
                    kdu_name = kdu_profile["kdu-name"]
                    resource_name = kdu_profile.get("resource-name", "")

                    if not scaling_info["kdu-delete"].get(kdu_name, None):
                        scaling_info["kdu-delete"][kdu_name] = []

                    kdur = get_kdur(db_vnfr, kdu_name)
                    if kdur.get("helm-chart"):
                        # helm v3 unless the record explicitly says "v2"
                        k8s_cluster_type = "helm-chart-v3"
                        self.logger.debug("kdur: {}".format(kdur))
                        if (
                            kdur.get("helm-version")
                            and kdur.get("helm-version") == "v2"
                        ):
                            k8s_cluster_type = "helm-chart"
                    elif kdur.get("juju-bundle"):
                        k8s_cluster_type = "juju-bundle"
                    else:
                        raise LcmException(
                            "kdu type for kdu='{}.{}' is neither helm-chart nor "
                            "juju-bundle. Maybe an old NBI version is running".format(
                                db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
                            )
                        )

                    min_instance_count = 0
                    if kdu_profile and "min-number-of-instances" in kdu_profile:
                        min_instance_count = kdu_profile["min-number-of-instances"]

                    nb_scale_op -= kdu_delta.get("number-of-instances", 1)
                    deployed_kdu, _ = get_deployed_kdu(
                        nsr_deployed, kdu_name, vnf_index
                    )
                    if deployed_kdu is None:
                        raise LcmException(
                            "KDU '{}' for vnf '{}' not deployed".format(
                                kdu_name, vnf_index
                            )
                        )
                    kdu_instance = deployed_kdu.get("kdu-instance")
                    instance_num = await self.k8scluster_map[
                        k8s_cluster_type
                    ].get_scale_count(
                        resource_name,
                        kdu_instance,
                        vca_id=vca_id,
                        cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
                        kdu_model=deployed_kdu.get("kdu-model"),
                    )
                    kdu_replica_count = instance_num - kdu_delta.get(
                        "number-of-instances", 1
                    )

                    if kdu_replica_count < min_instance_count < instance_num:
                        kdu_replica_count = min_instance_count
                    if kdu_replica_count < min_instance_count:
                        raise LcmException(
                            "reached the limit of {} (min-instance-count) scaling-in operations for the "
                            "scaling-group-descriptor '{}'".format(
                                instance_num, scaling_group
                            )
                        )

                    for x in range(kdu_delta.get("number-of-instances", 1)):
                        vca_scaling_info.append(
                            {
                                "osm_kdu_id": kdu_name,
                                "member-vnf-index": vnf_index,
                                "type": "delete",
                                "kdu_index": instance_num - x - 1,
                            }
                        )
                    scaling_info["kdu-delete"][kdu_name].append(
                        {
                            "member-vnf-index": vnf_index,
                            "type": "delete",
                            "k8s-cluster-type": k8s_cluster_type,
                            "resource-name": resource_name,
                            "scale": kdu_replica_count,
                        }
                    )
        else:
            # Fail with an explicit message instead of the KeyError that the
            # missing "scaling_direction" entry would raise further down.
            raise LcmException(
                "Unsupported scaleVnfType '{}'".format(scaling_type)
            )

        # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
        vdu_delete = copy(scaling_info.get("vdu-delete"))
        if scaling_info["scaling_direction"] == "IN":
            # Newest records first; the per-VDU counter in vdu_delete says
            # how many records of each vdu-id-ref are still to be listed.
            for vdur in reversed(db_vnfr["vdur"]):
                if vdu_delete.get(vdur["vdu-id-ref"]):
                    vdu_delete[vdur["vdu-id-ref"]] -= 1
                    scaling_info["vdu"].append(
                        {
                            "name": vdur.get("name") or vdur.get("vdu-name"),
                            "vdu_id": vdur["vdu-id-ref"],
                            "interface": [],
                        }
                    )
                    for interface in vdur["interfaces"]:
                        scaling_info["vdu"][-1]["interface"].append(
                            {
                                "name": interface["name"],
                                # An interface record may carry no address;
                                # report None rather than abort the scale-in.
                                "ip_address": interface.get("ip-address"),
                                "mac_address": interface.get("mac-address"),
                            }
                        )
            # vdu_delete = vdu_scaling_info.pop("vdu-delete")

        # PRE-SCALE BEGIN
        step = "Executing pre-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor[
                "scaling-config-action"
            ]:
                if (
                    scaling_config_action.get("trigger") == "pre-scale-in"
                    and scaling_type == "SCALE_IN"
                ) or (
                    scaling_config_action.get("trigger") == "pre-scale-out"
                    and scaling_type == "SCALE_OUT"
                ):
                    vnf_config_primitive = scaling_config_action[
                        "vnf-config-primitive-name-ref"
                    ]
                    step = db_nslcmop_update[
                        "detailed-status"
                    ] = "executing pre-scale scaling-config-action '{}'".format(
                        vnf_config_primitive
                    )

                    # look for primitive
                    for config_primitive in (
                        get_configuration(db_vnfd, db_vnfd["id"]) or {}
                    ).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        raise LcmException(
                            "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
                            "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
                            "primitive".format(scaling_group, vnf_config_primitive)
                        )

                    vnfr_params = {"VDU_SCALE_INFO": scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring pre-scaling"
                    primitive_params = self._map_primitive_params(
                        config_primitive, {}, vnfr_params
                    )

                    # Pre-scale retry check: Check if this sub-operation has been executed before
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop,
                        vnf_index,
                        vnf_config_primitive,
                        primitive_params,
                        "PRE-SCALE",
                    )
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = "COMPLETED"
                        result_detail = "Done"
                        self.logger.debug(
                            logging_text
                            + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
                                vnf_config_primitive, result, result_detail
                            )
                        )
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = (
                                len(db_nslcmop.get("_admin", {}).get("operations"))
                                - 1
                            )
                            self.logger.debug(
                                logging_text
                                + "vnf_config_primitive={} New sub-operation".format(
                                    vnf_config_primitive
                                )
                            )
                        else:
                            # retry:  Get registered params for this existing sub-operation
                            op = db_nslcmop.get("_admin", {}).get("operations", [])[
                                op_index
                            ]
                            vnf_index = op.get("member_vnf_index")
                            vnf_config_primitive = op.get("primitive")
                            primitive_params = op.get("primitive_params")
                            self.logger.debug(
                                logging_text
                                + "vnf_config_primitive={} Sub-operation retry".format(
                                    vnf_config_primitive
                                )
                            )
                        # Execute the primitive, either with new (first-time) or registered (reintent) args
                        ee_descriptor_id = config_primitive.get(
                            "execution-environment-ref"
                        )
                        primitive_name = config_primitive.get(
                            "execution-environment-primitive", vnf_config_primitive
                        )
                        ee_id, vca_type = self._look_for_deployed_vca(
                            nsr_deployed["VCA"],
                            member_vnf_index=vnf_index,
                            vdu_id=None,
                            vdu_count_index=None,
                            ee_descriptor_id=ee_descriptor_id,
                        )
                        result, result_detail = await self._ns_execute_primitive(
                            ee_id,
                            primitive_name,
                            primitive_params,
                            vca_type=vca_type,
                            vca_id=vca_id,
                        )
                        self.logger.debug(
                            logging_text
                            + "vnf_config_primitive={} Done with result {} {}".format(
                                vnf_config_primitive, result, result_detail
                            )
                        )
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail
                        )

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # PRE-SCALE END

        db_nsr_update[
            "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
        ] = nb_scale_op
        db_nsr_update[
            "_admin.scaling-group.{}.time".format(admin_scale_index)
        ] = time()

        # SCALE-IN VCA - BEGIN
        if vca_scaling_info:
            step = db_nslcmop_update[
                "detailed-status"
            ] = "Deleting the execution environments"
            scale_process = "VCA"
            for vca_info in vca_scaling_info:
                if vca_info["type"] == "delete" and not vca_info.get("osm_kdu_id"):
                    member_vnf_index = str(vca_info["member-vnf-index"])
                    self.logger.debug(
                        logging_text + "vdu info: {}".format(vca_info)
                    )
                    if vca_info.get("osm_vdu_id"):
                        vdu_id = vca_info["osm_vdu_id"]
                        vdu_index = int(vca_info["vdu_index"])
                        stage[
                            1
                        ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                            member_vnf_index, vdu_id, vdu_index
                        )
                    stage[2] = step = "Scaling in VCA"
                    self._write_op_status(op_id=nslcmop_id, stage=stage)
                    vca_update = db_nsr["_admin"]["deployed"]["VCA"]
                    config_update = db_nsr["configurationStatus"]
                    # Walk the list backwards: removing a matched entry then
                    # never shifts the entries still to be visited, so none
                    # is skipped and vca_index always is the position the
                    # entry had when the loop started.
                    for vca_index in reversed(range(len(vca_update))):
                        vca = vca_update[vca_index]
                        # Empty (None or {}) slots are ignored.
                        if (
                            vca
                            and vca["member-vnf-index"] == member_vnf_index
                            and vca["vdu_count_index"] == vdu_index
                        ):
                            if vca.get("vdu_id"):
                                config_descriptor = get_configuration(
                                    db_vnfd, vca.get("vdu_id")
                                )
                            elif vca.get("kdu_name"):
                                config_descriptor = get_configuration(
                                    db_vnfd, vca.get("kdu_name")
                                )
                            else:
                                config_descriptor = get_configuration(
                                    db_vnfd, db_vnfd["id"]
                                )
                            operation_params = (
                                db_nslcmop.get("operationParams") or {}
                            )
                            exec_terminate_primitives = not operation_params.get(
                                "skip_terminate_primitives"
                            ) and vca.get("needed_terminate")
                            task = asyncio.ensure_future(
                                asyncio.wait_for(
                                    self.destroy_N2VC(
                                        logging_text,
                                        db_nslcmop,
                                        vca,
                                        config_descriptor,
                                        vca_index,
                                        destroy_ee=True,
                                        exec_primitives=exec_terminate_primitives,
                                        scaling_in=True,
                                        vca_id=vca_id,
                                    ),
                                    timeout=self.timeout_charm_delete,
                                )
                            )
                            tasks_dict_info[task] = "Terminating VCA {}".format(
                                vca.get("ee_id")
                            )
                            del vca_update[vca_index]
                            del config_update[vca_index]
                    # wait for pending tasks of terminate primitives
                    if tasks_dict_info:
                        self.logger.debug(
                            logging_text
                            + "Waiting for tasks {}".format(
                                list(tasks_dict_info.keys())
                            )
                        )
                        error_list = await self._wait_for_tasks(
                            logging_text,
                            tasks_dict_info,
                            min(
                                self.timeout_charm_delete, self.timeout_ns_terminate
                            ),
                            stage,
                            nslcmop_id,
                        )
                        tasks_dict_info.clear()
                        if error_list:
                            raise LcmException("; ".join(error_list))

                        db_vca_and_config_update = {
                            "_admin.deployed.VCA": vca_update,
                            "configurationStatus": config_update,
                        }
                        self.update_db_2(
                            "nsrs", db_nsr["_id"], db_vca_and_config_update
                        )
        scale_process = None
        # SCALE-IN VCA - END

        # SCALE RO - BEGIN
        if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
            scale_process = "RO"
            if self.ro_config.get("ng"):
                await self._scale_ng_ro(
                    logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
                )
        scaling_info.pop("vdu-create", None)
        scaling_info.pop("vdu-delete", None)

        scale_process = None
        # SCALE RO - END

        # SCALE KDU - BEGIN
        if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
            scale_process = "KDU"
            await self._scale_kdu(
                logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
            )
        scaling_info.pop("kdu-create", None)
        scaling_info.pop("kdu-delete", None)

        scale_process = None
        # SCALE KDU - END

        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # SCALE-UP VCA - BEGIN
        if vca_scaling_info:
            step = db_nslcmop_update[
                "detailed-status"
            ] = "Creating new execution environments"
            scale_process = "VCA"
            for vca_info in vca_scaling_info:
                if vca_info["type"] == "create" and not vca_info.get("osm_kdu_id"):
                    member_vnf_index = str(vca_info["member-vnf-index"])
                    self.logger.debug(
                        logging_text + "vdu info: {}".format(vca_info)
                    )
                    vnfd_id = db_vnfr["vnfd-ref"]
                    if vca_info.get("osm_vdu_id"):
                        vdu_index = int(vca_info["vdu_index"])
                        deploy_params = {"OSM": get_osm_params(db_vnfr)}
                        if db_vnfr.get("additionalParamsForVnf"):
                            deploy_params.update(
                                parse_yaml_strings(
                                    db_vnfr["additionalParamsForVnf"].copy()
                                )
                            )
                        # First the VNF-level configuration, if there is one ...
                        descriptor_config = get_configuration(
                            db_vnfd, db_vnfd["id"]
                        )
                        if descriptor_config:
                            vdu_id = None
                            vdu_name = None
                            kdu_name = None
                            self._deploy_n2vc(
                                logging_text=logging_text
                                + "member_vnf_index={} ".format(member_vnf_index),
                                db_nsr=db_nsr,
                                db_vnfr=db_vnfr,
                                nslcmop_id=nslcmop_id,
                                nsr_id=nsr_id,
                                nsi_id=nsi_id,
                                vnfd_id=vnfd_id,
                                vdu_id=vdu_id,
                                kdu_name=kdu_name,
                                member_vnf_index=member_vnf_index,
                                vdu_index=vdu_index,
                                vdu_name=vdu_name,
                                deploy_params=deploy_params,
                                descriptor_config=descriptor_config,
                                base_folder=base_folder,
                                task_instantiation_info=tasks_dict_info,
                                stage=stage,
                            )
                        # ... then the configuration of the VDU itself.
                        vdu_id = vca_info["osm_vdu_id"]
                        vdur = find_in_list(
                            db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
                        )
                        descriptor_config = get_configuration(db_vnfd, vdu_id)
                        if vdur.get("additionalParams"):
                            deploy_params_vdu = parse_yaml_strings(
                                vdur["additionalParams"]
                            )
                        else:
                            deploy_params_vdu = deploy_params
                        deploy_params_vdu["OSM"] = get_osm_params(
                            db_vnfr, vdu_id, vdu_count_index=vdu_index
                        )
                        if descriptor_config:
                            vdu_name = None
                            kdu_name = None
                            stage[
                                1
                            ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                member_vnf_index, vdu_id, vdu_index
                            )
                            stage[2] = step = "Scaling out VCA"
                            self._write_op_status(op_id=nslcmop_id, stage=stage)
                            self._deploy_n2vc(
                                logging_text=logging_text
                                + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                    member_vnf_index, vdu_id, vdu_index
                                ),
                                db_nsr=db_nsr,
                                db_vnfr=db_vnfr,
                                nslcmop_id=nslcmop_id,
                                nsr_id=nsr_id,
                                nsi_id=nsi_id,
                                vnfd_id=vnfd_id,
                                vdu_id=vdu_id,
                                kdu_name=kdu_name,
                                member_vnf_index=member_vnf_index,
                                vdu_index=vdu_index,
                                vdu_name=vdu_name,
                                deploy_params=deploy_params_vdu,
                                descriptor_config=descriptor_config,
                                base_folder=base_folder,
                                task_instantiation_info=tasks_dict_info,
                                stage=stage,
                            )
        # SCALE-UP VCA - END
        scale_process = None

        # POST-SCALE BEGIN
        # execute primitive service POST-SCALING
        step = "Executing post-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor[
                "scaling-config-action"
            ]:
                if (
                    scaling_config_action.get("trigger") == "post-scale-in"
                    and scaling_type == "SCALE_IN"
                ) or (
                    scaling_config_action.get("trigger") == "post-scale-out"
                    and scaling_type == "SCALE_OUT"
                ):
                    vnf_config_primitive = scaling_config_action[
                        "vnf-config-primitive-name-ref"
                    ]
                    step = db_nslcmop_update[
                        "detailed-status"
                    ] = "executing post-scale scaling-config-action '{}'".format(
                        vnf_config_primitive
                    )

                    vnfr_params = {"VDU_SCALE_INFO": scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    # look for primitive
                    for config_primitive in (
                        get_configuration(db_vnfd, db_vnfd["id"]) or {}
                    ).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        raise LcmException(
                            "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
                            "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
                            "config-primitive".format(
                                scaling_group, vnf_config_primitive
                            )
                        )
                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring post-scaling"
                    primitive_params = self._map_primitive_params(
                        config_primitive, {}, vnfr_params
                    )

                    # Post-scale retry check: Check if this sub-operation has been executed before
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop,
                        vnf_index,
                        vnf_config_primitive,
                        primitive_params,
                        "POST-SCALE",
                    )
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = "COMPLETED"
                        result_detail = "Done"
                        self.logger.debug(
                            logging_text
                            + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
                                vnf_config_primitive, result, result_detail
                            )
                        )
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = (
                                len(db_nslcmop.get("_admin", {}).get("operations"))
                                - 1
                            )
                            self.logger.debug(
                                logging_text
                                + "vnf_config_primitive={} New sub-operation".format(
                                    vnf_config_primitive
                                )
                            )
                        else:
                            # retry:  Get registered params for this existing sub-operation
                            op = db_nslcmop.get("_admin", {}).get("operations", [])[
                                op_index
                            ]
                            vnf_index = op.get("member_vnf_index")
                            vnf_config_primitive = op.get("primitive")
                            primitive_params = op.get("primitive_params")
                            self.logger.debug(
                                logging_text
                                + "vnf_config_primitive={} Sub-operation retry".format(
                                    vnf_config_primitive
                                )
                            )
                        # Execute the primitive, either with new (first-time) or registered (reintent) args
                        ee_descriptor_id = config_primitive.get(
                            "execution-environment-ref"
                        )
                        primitive_name = config_primitive.get(
                            "execution-environment-primitive", vnf_config_primitive
                        )
                        ee_id, vca_type = self._look_for_deployed_vca(
                            nsr_deployed["VCA"],
                            member_vnf_index=vnf_index,
                            vdu_id=None,
                            vdu_count_index=None,
                            ee_descriptor_id=ee_descriptor_id,
                        )
                        result, result_detail = await self._ns_execute_primitive(
                            ee_id,
                            primitive_name,
                            primitive_params,
                            vca_type=vca_type,
                            vca_id=vca_id,
                        )
                        self.logger.debug(
                            logging_text
                            + "vnf_config_primitive={} Done with result {} {}".format(
                                vnf_config_primitive, result, result_detail
                            )
                        )
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail
                        )

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # POST-SCALE END

        db_nsr_update[
            "detailed-status"
        ] = ""  # "scaled {} {}".format(scaling_group, scaling_type)
        db_nsr_update["operational-status"] = (
            "running"
            if old_operational_status == "failed"
            else old_operational_status
        )
        db_nsr_update["config-status"] = old_config_status
        return
    except (
        ROclient.ROClientException,
        DbException,
        LcmException,
        NgRoException,
    ) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(step)
        )
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
            exc_info=True,
        )
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )
        if tasks_dict_info:
            stage[1] = "Waiting for instantiate pending tasks."
            self.logger.debug(logging_text + stage[1])
            pending_errors = await self._wait_for_tasks(
                logging_text,
                tasks_dict_info,
                self.timeout_ns_deploy,
                stage,
                nslcmop_id,
                nsr_id=nsr_id,
            )
            # Errors from the pending tasks take precedence, but a falsy
            # result must not wipe out an error captured by the handlers
            # above; otherwise a failed operation would end as COMPLETED.
            exc = pending_errors or exc
        if exc:
            db_nslcmop_update[
                "detailed-status"
            ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
            if db_nsr:
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update["detailed-status"] = ""
                # scale_process names the phase that was running when the
                # error happened; it is None between phases.
                if scale_process:
                    if "VCA" in scale_process:
                        db_nsr_update["config-status"] = "failed"
                    if "RO" in scale_process:
                        db_nsr_update["operational-status"] = "failed"
                    db_nsr_update[
                        "detailed-status"
                    ] = "FAILED scaling nslcmop={} {}: {}".format(
                        nslcmop_id, step, exc
                    )
        else:
            error_description_nslcmop = None
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="IDLE",
                current_operation_id=None,
                other_update=db_nsr_update,
            )

        if nslcmop_operation_state:
            try:
                msg = {
                    "nsr_id": nsr_id,
                    "nslcmop_id": nslcmop_id,
                    "operationState": nslcmop_operation_state,
                }
                await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
            except Exception as e:
                # Best effort: a failed notification is only logged.
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
7131 async def _scale_kdu(
7132 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7134 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7135 for kdu_name
in _scaling_info
:
7136 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7137 deployed_kdu
, index
= get_deployed_kdu(
7138 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7140 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7141 kdu_instance
= deployed_kdu
["kdu-instance"]
7142 kdu_model
= deployed_kdu
.get("kdu-model")
7143 scale
= int(kdu_scaling_info
["scale"])
7144 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7147 "collection": "nsrs",
7148 "filter": {"_id": nsr_id
},
7149 "path": "_admin.deployed.K8s.{}".format(index
),
7152 step
= "scaling application {}".format(
7153 kdu_scaling_info
["resource-name"]
7155 self
.logger
.debug(logging_text
+ step
)
7157 if kdu_scaling_info
["type"] == "delete":
7158 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7161 and kdu_config
.get("terminate-config-primitive")
7162 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7164 terminate_config_primitive_list
= kdu_config
.get(
7165 "terminate-config-primitive"
7167 terminate_config_primitive_list
.sort(
7168 key
=lambda val
: int(val
["seq"])
7172 terminate_config_primitive
7173 ) in terminate_config_primitive_list
:
7174 primitive_params_
= self
._map
_primitive
_params
(
7175 terminate_config_primitive
, {}, {}
7177 step
= "execute terminate config primitive"
7178 self
.logger
.debug(logging_text
+ step
)
7179 await asyncio
.wait_for(
7180 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7181 cluster_uuid
=cluster_uuid
,
7182 kdu_instance
=kdu_instance
,
7183 primitive_name
=terminate_config_primitive
["name"],
7184 params
=primitive_params_
,
7191 await asyncio
.wait_for(
7192 self
.k8scluster_map
[k8s_cluster_type
].scale(
7195 kdu_scaling_info
["resource-name"],
7197 cluster_uuid
=cluster_uuid
,
7198 kdu_model
=kdu_model
,
7202 timeout
=self
.timeout_vca_on_error
,
7205 if kdu_scaling_info
["type"] == "create":
7206 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7209 and kdu_config
.get("initial-config-primitive")
7210 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7212 initial_config_primitive_list
= kdu_config
.get(
7213 "initial-config-primitive"
7215 initial_config_primitive_list
.sort(
7216 key
=lambda val
: int(val
["seq"])
7219 for initial_config_primitive
in initial_config_primitive_list
:
7220 primitive_params_
= self
._map
_primitive
_params
(
7221 initial_config_primitive
, {}, {}
7223 step
= "execute initial config primitive"
7224 self
.logger
.debug(logging_text
+ step
)
7225 await asyncio
.wait_for(
7226 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7227 cluster_uuid
=cluster_uuid
,
7228 kdu_instance
=kdu_instance
,
7229 primitive_name
=initial_config_primitive
["name"],
7230 params
=primitive_params_
,
7237 async def _scale_ng_ro(
7238 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7240 nsr_id
= db_nslcmop
["nsInstanceId"]
7241 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7244 # read from db: vnfd's for every vnf
7247 # for each vnf in ns, read vnfd
7248 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7249 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7250 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7251 # if we haven't this vnfd, read it from db
7252 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7254 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7255 db_vnfds
.append(vnfd
)
7256 n2vc_key
= self
.n2vc
.get_public_key()
7257 n2vc_key_list
= [n2vc_key
]
7260 vdu_scaling_info
.get("vdu-create"),
7261 vdu_scaling_info
.get("vdu-delete"),
7264 # db_vnfr has been updated, update db_vnfrs to use it
7265 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7266 await self
._instantiate
_ng
_ro
(
7276 start_deploy
=time(),
7277 timeout_ns_deploy
=self
.timeout_ns_deploy
,
7279 if vdu_scaling_info
.get("vdu-delete"):
7281 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7284 async def extract_prometheus_scrape_jobs(
7285 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
7287 # look for a file called 'prometheus*.j2' in the artifact folder
7288 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7292 for f
in artifact_content
7293 if f
.startswith("prometheus") and f
.endswith(".j2")
7299 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7303 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7304 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7306 vnfr_id
= vnfr_id
.replace("-", "")
7308 "JOB_NAME": vnfr_id
,
7309 "TARGET_IP": target_ip
,
7310 "EXPORTER_POD_IP": host_name
,
7311 "EXPORTER_POD_PORT": host_port
,
7313 job_list
= parse_job(job_data
, variables
)
7314 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7315 for job
in job_list
:
7317 not isinstance(job
.get("job_name"), str)
7318 or vnfr_id
not in job
["job_name"]
7320 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
7321 job
["nsr_id"] = nsr_id
7322 job
["vnfr_id"] = vnfr_id
7325 async def rebuild_start_stop(
7326 self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
7328 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7329 self
.logger
.info(logging_text
+ "Enter")
7330 stage
= ["Preparing the environment", ""]
7331 # database nsrs record
7335 # in case of error, indicates what part of scale was failed to put nsr at error status
7336 start_deploy
= time()
7338 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7339 vim_account_id
= db_vnfr
.get("vim-account-id")
7340 vim_info_key
= "vim:" + vim_account_id
7341 vdu_id
= additional_param
["vdu_id"]
7342 vdurs
= [item
for item
in db_vnfr
["vdur"] if item
["vdu-id-ref"] == vdu_id
]
7343 vdur
= find_in_list(
7344 vdurs
, lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7347 vdu_vim_name
= vdur
["name"]
7348 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7349 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7351 raise LcmException("Target vdu is not found")
7352 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7353 # wait for any previous tasks in process
7354 stage
[1] = "Waiting for previous operations to terminate"
7355 self
.logger
.info(stage
[1])
7356 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7358 stage
[1] = "Reading from database."
7359 self
.logger
.info(stage
[1])
7360 self
._write
_ns
_status
(
7363 current_operation
=operation_type
.upper(),
7364 current_operation_id
=nslcmop_id
,
7366 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7369 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7370 db_nsr_update
["operational-status"] = operation_type
7371 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7375 "vim_vm_id": vim_vm_id
,
7377 "vdu_index": additional_param
["count-index"],
7378 "vdu_id": vdur
["id"],
7379 "target_vim": target_vim
,
7380 "vim_account_id": vim_account_id
,
7383 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7384 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7385 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7386 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7387 self
.logger
.info("response from RO: {}".format(result_dict
))
7388 action_id
= result_dict
["action_id"]
7389 await self
._wait
_ng
_ro
(
7394 self
.timeout_operate
,
7396 "start_stop_rebuild",
7398 return "COMPLETED", "Done"
7399 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7400 self
.logger
.error("Exit Exception {}".format(e
))
7402 except asyncio
.CancelledError
:
7403 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7404 exc
= "Operation was cancelled"
7405 except Exception as e
:
7406 exc
= traceback
.format_exc()
7407 self
.logger
.critical(
7408 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7410 return "FAILED", "Error in operate VNF {}".format(exc
)
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Look up the VCA cloud settings kept in a VIM account's configuration.

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential), read from the "vca_cloud" and
        "vca_cloud_credential" entries of the account "config"
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    # An account without a "config" section is treated as an empty config.
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_cloud")
    cloud_credential = vim_config.get("vca_cloud_credential")
    return cloud_name, cloud_credential
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Look up the VCA K8s cloud settings kept in a VIM account's configuration.

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential), read from the "vca_k8s_cloud" and
        "vca_k8s_cloud_credential" entries of the account "config"
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    # An account without a "config" section is treated as an empty config.
    vim_config = vim_account.get("config", {})
    return (
        vim_config.get("vca_k8s_cloud"),
        vim_config.get("vca_k8s_cloud_credential"),
    )
7434 async def migrate(self
, nsr_id
, nslcmop_id
):
7436 Migrate VNFs and VDUs instances in a NS
7438 :param: nsr_id: NS Instance ID
7439 :param: nslcmop_id: nslcmop ID of migrate
7442 # Try to lock HA task here
7443 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7444 if not task_is_locked_by_me
:
7446 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7447 self
.logger
.debug(logging_text
+ "Enter")
7448 # get all needed from database
7450 db_nslcmop_update
= {}
7451 nslcmop_operation_state
= None
7455 # in case of error, indicates what part of scale was failed to put nsr at error status
7456 start_deploy
= time()
7459 # wait for any previous tasks in process
7460 step
= "Waiting for previous operations to terminate"
7461 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7463 self
._write
_ns
_status
(
7466 current_operation
="MIGRATING",
7467 current_operation_id
=nslcmop_id
,
7469 step
= "Getting nslcmop from database"
7471 step
+ " after having waited for previous tasks to be completed"
7473 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7474 migrate_params
= db_nslcmop
.get("operationParams")
7477 target
.update(migrate_params
)
7478 desc
= await self
.RO
.migrate(nsr_id
, target
)
7479 self
.logger
.debug("RO return > {}".format(desc
))
7480 action_id
= desc
["action_id"]
7481 await self
._wait
_ng
_ro
(
7486 self
.timeout_migrate
,
7487 operation
="migrate",
7489 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7490 self
.logger
.error("Exit Exception {}".format(e
))
7492 except asyncio
.CancelledError
:
7493 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7494 exc
= "Operation was cancelled"
7495 except Exception as e
:
7496 exc
= traceback
.format_exc()
7497 self
.logger
.critical(
7498 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7501 self
._write
_ns
_status
(
7504 current_operation
="IDLE",
7505 current_operation_id
=None,
7508 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7509 nslcmop_operation_state
= "FAILED"
7511 nslcmop_operation_state
= "COMPLETED"
7512 db_nslcmop_update
["detailed-status"] = "Done"
7513 db_nsr_update
["detailed-status"] = "Done"
7515 self
._write
_op
_status
(
7519 operation_state
=nslcmop_operation_state
,
7520 other_update
=db_nslcmop_update
,
7522 if nslcmop_operation_state
:
7526 "nslcmop_id": nslcmop_id
,
7527 "operationState": nslcmop_operation_state
,
7529 await self
.msg
.aiowrite("ns", "migrated", msg
, loop
=self
.loop
)
7530 except Exception as e
:
7532 logging_text
+ "kafka_write notification Exception {}".format(e
)
7534 self
.logger
.debug(logging_text
+ "Exit")
7535 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
7537 async def heal(self
, nsr_id
, nslcmop_id
):
7541 :param nsr_id: ns instance to heal
7542 :param nslcmop_id: operation to run
7546 # Try to lock HA task here
7547 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7548 if not task_is_locked_by_me
:
7551 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7552 stage
= ["", "", ""]
7553 tasks_dict_info
= {}
7554 # ^ stage, step, VIM progress
7555 self
.logger
.debug(logging_text
+ "Enter")
7556 # get all needed from database
7558 db_nslcmop_update
= {}
7560 db_vnfrs
= {} # vnf's info indexed by _id
7562 old_operational_status
= ""
7563 old_config_status
= ""
7566 # wait for any previous tasks in process
7567 step
= "Waiting for previous operations to terminate"
7568 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7569 self
._write
_ns
_status
(
7572 current_operation
="HEALING",
7573 current_operation_id
=nslcmop_id
,
7576 step
= "Getting nslcmop from database"
7578 step
+ " after having waited for previous tasks to be completed"
7580 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7582 step
= "Getting nsr from database"
7583 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7584 old_operational_status
= db_nsr
["operational-status"]
7585 old_config_status
= db_nsr
["config-status"]
7588 "_admin.deployed.RO.operational-status": "healing",
7590 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7592 step
= "Sending heal order to VIM"
7593 task_ro
= asyncio
.ensure_future(
7595 logging_text
=logging_text
,
7597 db_nslcmop
=db_nslcmop
,
7601 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "heal_RO", task_ro
)
7602 tasks_dict_info
[task_ro
] = "Healing at VIM"
7606 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7607 self
.logger
.debug(logging_text
+ stage
[1])
7608 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7609 self
.fs
.sync(db_nsr
["nsd-id"])
7611 # read from db: vnfr's of this ns
7612 step
= "Getting vnfrs from db"
7613 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7614 for vnfr
in db_vnfrs_list
:
7615 db_vnfrs
[vnfr
["_id"]] = vnfr
7616 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7618 # Check for each target VNF
7619 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7620 for target_vnf
in target_list
:
7621 # Find this VNF in the list from DB
7622 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7624 db_vnfr
= db_vnfrs
[vnfr_id
]
7625 vnfd_id
= db_vnfr
.get("vnfd-id")
7626 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7627 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7628 base_folder
= vnfd
["_admin"]["storage"]
7633 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7634 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7636 # Check each target VDU and deploy N2VC
7637 target_vdu_list
= target_vnf
.get("additionalParams", {}).get(
7640 if not target_vdu_list
:
7641 # New code to build the dictionary
7642 target_vdu_list
= []
7643 for existing_vdu
in db_vnfr
.get("vdur"):
7644 vdu_name
= existing_vdu
.get("vdu-name", None)
7645 vdu_index
= existing_vdu
.get("count-index", 0)
7646 vdu_run_day1
= target_vnf
.get("additionalParams", {}).get(
7649 vdu_to_be_healed
= {
7651 "count-index": vdu_index
,
7652 "run-day1": vdu_run_day1
,
7654 target_vdu_list
.append(vdu_to_be_healed
)
7655 for target_vdu
in target_vdu_list
:
7656 deploy_params_vdu
= target_vdu
7657 # Set run-day1 vnf level value if not vdu level value exists
7658 if not deploy_params_vdu
.get("run-day1") and target_vnf
[
7661 deploy_params_vdu
["run-day1"] = target_vnf
[
7664 vdu_name
= target_vdu
.get("vdu-id", None)
7665 # TODO: Get vdu_id from vdud.
7667 # For multi instance VDU count-index is mandatory
7668 # For single session VDU count-index is 0
7669 vdu_index
= target_vdu
.get("count-index", 0)
7671 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7672 stage
[1] = "Deploying Execution Environments."
7673 self
.logger
.debug(logging_text
+ stage
[1])
7675 # VNF Level charm. Normal case when proxy charms.
7676 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7677 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7678 if descriptor_config
:
7679 # Continue if healed machine is management machine
7680 vnf_ip_address
= db_vnfr
.get("ip-address")
7681 target_instance
= None
7682 for instance
in db_vnfr
.get("vdur", None):
7684 instance
["vdu-name"] == vdu_name
7685 and instance
["count-index"] == vdu_index
7687 target_instance
= instance
7689 if vnf_ip_address
== target_instance
.get("ip-address"):
7691 logging_text
=logging_text
7692 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7693 member_vnf_index
, vdu_name
, vdu_index
7697 nslcmop_id
=nslcmop_id
,
7703 member_vnf_index
=member_vnf_index
,
7706 deploy_params
=deploy_params_vdu
,
7707 descriptor_config
=descriptor_config
,
7708 base_folder
=base_folder
,
7709 task_instantiation_info
=tasks_dict_info
,
7713 # VDU Level charm. Normal case with native charms.
7714 descriptor_config
= get_configuration(vnfd
, vdu_name
)
7715 if descriptor_config
:
7717 logging_text
=logging_text
7718 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7719 member_vnf_index
, vdu_name
, vdu_index
7723 nslcmop_id
=nslcmop_id
,
7729 member_vnf_index
=member_vnf_index
,
7730 vdu_index
=vdu_index
,
7732 deploy_params
=deploy_params_vdu
,
7733 descriptor_config
=descriptor_config
,
7734 base_folder
=base_folder
,
7735 task_instantiation_info
=tasks_dict_info
,
7740 ROclient
.ROClientException
,
7745 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7747 except asyncio
.CancelledError
:
7749 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7751 exc
= "Operation was cancelled"
7752 except Exception as e
:
7753 exc
= traceback
.format_exc()
7754 self
.logger
.critical(
7755 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7760 stage
[1] = "Waiting for healing pending tasks."
7761 self
.logger
.debug(logging_text
+ stage
[1])
7762 exc
= await self
._wait
_for
_tasks
(
7765 self
.timeout_ns_deploy
,
7773 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7774 nslcmop_operation_state
= "FAILED"
7776 db_nsr_update
["operational-status"] = old_operational_status
7777 db_nsr_update
["config-status"] = old_config_status
7780 ] = "FAILED healing nslcmop={} {}: {}".format(nslcmop_id
, step
, exc
)
7781 for task
, task_name
in tasks_dict_info
.items():
7782 if not task
.done() or task
.cancelled() or task
.exception():
7783 if task_name
.startswith(self
.task_name_deploy_vca
):
7784 # A N2VC task is pending
7785 db_nsr_update
["config-status"] = "failed"
7787 # RO task is pending
7788 db_nsr_update
["operational-status"] = "failed"
7790 error_description_nslcmop
= None
7791 nslcmop_operation_state
= "COMPLETED"
7792 db_nslcmop_update
["detailed-status"] = "Done"
7793 db_nsr_update
["detailed-status"] = "Done"
7794 db_nsr_update
["operational-status"] = "running"
7795 db_nsr_update
["config-status"] = "configured"
7797 self
._write
_op
_status
(
7800 error_message
=error_description_nslcmop
,
7801 operation_state
=nslcmop_operation_state
,
7802 other_update
=db_nslcmop_update
,
7805 self
._write
_ns
_status
(
7808 current_operation
="IDLE",
7809 current_operation_id
=None,
7810 other_update
=db_nsr_update
,
7813 if nslcmop_operation_state
:
7817 "nslcmop_id": nslcmop_id
,
7818 "operationState": nslcmop_operation_state
,
7820 await self
.msg
.aiowrite("ns", "healed", msg
, loop
=self
.loop
)
7821 except Exception as e
:
7823 logging_text
+ "kafka_write notification Exception {}".format(e
)
7825 self
.logger
.debug(logging_text
+ "Exit")
7826 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
7837 :param logging_text: prefix text to use at logging
7838 :param nsr_id: nsr identity
7839 :param db_nslcmop: database content of ns operation, in this case, 'heal'
7840 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
7841 :return: None or exception
7844 def get_vim_account(vim_account_id
):
7846 if vim_account_id
in db_vims
:
7847 return db_vims
[vim_account_id
]
7848 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
7849 db_vims
[vim_account_id
] = db_vim
7854 ns_params
= db_nslcmop
.get("operationParams")
7855 if ns_params
and ns_params
.get("timeout_ns_heal"):
7856 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
7858 timeout_ns_heal
= self
.timeout
.get("ns_heal", self
.timeout_ns_heal
)
7862 nslcmop_id
= db_nslcmop
["_id"]
7864 "action_id": nslcmop_id
,
7866 self
.logger
.warning(
7867 "db_nslcmop={} and timeout_ns_heal={}".format(
7868 db_nslcmop
, timeout_ns_heal
7871 target
.update(db_nslcmop
.get("operationParams", {}))
7873 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
7874 desc
= await self
.RO
.recreate(nsr_id
, target
)
7875 self
.logger
.debug("RO return > {}".format(desc
))
7876 action_id
= desc
["action_id"]
7877 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
7878 await self
._wait
_ng
_ro
(
7885 operation
="healing",
7890 "_admin.deployed.RO.operational-status": "running",
7891 "detailed-status": " ".join(stage
),
7893 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7894 self
._write
_op
_status
(nslcmop_id
, stage
)
7896 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
7899 except Exception as e
:
7900 stage
[2] = "ERROR healing at VIM"
7901 # self.set_vnfr_at_error(db_vnfrs, str(e))
7903 "Error healing at VIM {}".format(e
),
7904 exc_info
=not isinstance(
7907 ROclient
.ROClientException
,
7933 task_instantiation_info
,
7936 # launch instantiate_N2VC in a asyncio task and register task object
7937 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
7938 # if not found, create one entry and update database
7939 # fill db_nsr._admin.deployed.VCA.<index>
7942 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
7946 get_charm_name
= False
7947 if "execution-environment-list" in descriptor_config
:
7948 ee_list
= descriptor_config
.get("execution-environment-list", [])
7949 elif "juju" in descriptor_config
:
7950 ee_list
= [descriptor_config
] # ns charms
7951 if "execution-environment-list" not in descriptor_config
:
7952 # charm name is only required for ns charms
7953 get_charm_name
= True
7954 else: # other types as script are not supported
7957 for ee_item
in ee_list
:
7960 + "_deploy_n2vc ee_item juju={}, helm={}".format(
7961 ee_item
.get("juju"), ee_item
.get("helm-chart")
7964 ee_descriptor_id
= ee_item
.get("id")
7965 if ee_item
.get("juju"):
7966 vca_name
= ee_item
["juju"].get("charm")
7968 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
7971 if ee_item
["juju"].get("charm") is not None
7974 if ee_item
["juju"].get("cloud") == "k8s":
7975 vca_type
= "k8s_proxy_charm"
7976 elif ee_item
["juju"].get("proxy") is False:
7977 vca_type
= "native_charm"
7978 elif ee_item
.get("helm-chart"):
7979 vca_name
= ee_item
["helm-chart"]
7980 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
7983 vca_type
= "helm-v3"
7986 logging_text
+ "skipping non juju neither charm configuration"
7991 for vca_index
, vca_deployed
in enumerate(
7992 db_nsr
["_admin"]["deployed"]["VCA"]
7994 if not vca_deployed
:
7997 vca_deployed
.get("member-vnf-index") == member_vnf_index
7998 and vca_deployed
.get("vdu_id") == vdu_id
7999 and vca_deployed
.get("kdu_name") == kdu_name
8000 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
8001 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
8005 # not found, create one.
8007 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
8010 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
8012 target
+= "/kdu/{}".format(kdu_name
)
8014 "target_element": target
,
8015 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
8016 "member-vnf-index": member_vnf_index
,
8018 "kdu_name": kdu_name
,
8019 "vdu_count_index": vdu_index
,
8020 "operational-status": "init", # TODO revise
8021 "detailed-status": "", # TODO revise
8022 "step": "initial-deploy", # TODO revise
8024 "vdu_name": vdu_name
,
8026 "ee_descriptor_id": ee_descriptor_id
,
8027 "charm_name": charm_name
,
8031 # create VCA and configurationStatus in db
8033 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
8034 "configurationStatus.{}".format(vca_index
): dict(),
8036 self
.update_db_2("nsrs", nsr_id
, db_dict
)
8038 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
8040 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
8041 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
8042 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
8045 task_n2vc
= asyncio
.ensure_future(
8047 logging_text
=logging_text
,
8048 vca_index
=vca_index
,
8054 vdu_index
=vdu_index
,
8055 deploy_params
=deploy_params
,
8056 config_descriptor
=descriptor_config
,
8057 base_folder
=base_folder
,
8058 nslcmop_id
=nslcmop_id
,
8062 ee_config_descriptor
=ee_item
,
8065 self
.lcm_tasks
.register(
8069 "instantiate_N2VC-{}".format(vca_index
),
8072 task_instantiation_info
[
8074 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
8075 member_vnf_index
or "", vdu_id
or ""
8078 async def heal_N2VC(
8095 ee_config_descriptor
,
8097 nsr_id
= db_nsr
["_id"]
8098 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
8099 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
8100 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
8101 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
8103 "collection": "nsrs",
8104 "filter": {"_id": nsr_id
},
8105 "path": db_update_entry
,
8111 element_under_configuration
= nsr_id
8115 vnfr_id
= db_vnfr
["_id"]
8116 osm_config
["osm"]["vnf_id"] = vnfr_id
8118 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8120 if vca_type
== "native_charm":
8123 index_number
= vdu_index
or 0
8126 element_type
= "VNF"
8127 element_under_configuration
= vnfr_id
8128 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8130 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8131 element_type
= "VDU"
8132 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8133 osm_config
["osm"]["vdu_id"] = vdu_id
8135 namespace
+= ".{}".format(kdu_name
)
8136 element_type
= "KDU"
8137 element_under_configuration
= kdu_name
8138 osm_config
["osm"]["kdu_name"] = kdu_name
8141 if base_folder
["pkg-dir"]:
8142 artifact_path
= "{}/{}/{}/{}".format(
8143 base_folder
["folder"],
8144 base_folder
["pkg-dir"],
8147 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8152 artifact_path
= "{}/Scripts/{}/{}/".format(
8153 base_folder
["folder"],
8156 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8161 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8163 # get initial_config_primitive_list that applies to this element
8164 initial_config_primitive_list
= config_descriptor
.get(
8165 "initial-config-primitive"
8169 "Initial config primitive list > {}".format(
8170 initial_config_primitive_list
8174 # add config if not present for NS charm
8175 ee_descriptor_id
= ee_config_descriptor
.get("id")
8176 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8177 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8178 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8182 "Initial config primitive list #2 > {}".format(
8183 initial_config_primitive_list
8186 # n2vc_redesign STEP 3.1
8187 # find old ee_id if exists
8188 ee_id
= vca_deployed
.get("ee_id")
8190 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8191 # create or register execution environment in VCA. Only for native charms when healing
8192 if vca_type
== "native_charm":
8193 step
= "Waiting to VM being up and getting IP address"
8194 self
.logger
.debug(logging_text
+ step
)
8195 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8204 credentials
= {"hostname": rw_mgmt_ip
}
8206 username
= deep_get(
8207 config_descriptor
, ("config-access", "ssh-access", "default-user")
8209 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8210 # merged. Meanwhile let's get username from initial-config-primitive
8211 if not username
and initial_config_primitive_list
:
8212 for config_primitive
in initial_config_primitive_list
:
8213 for param
in config_primitive
.get("parameter", ()):
8214 if param
["name"] == "ssh-username":
8215 username
= param
["value"]
8219 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8220 "'config-access.ssh-access.default-user'"
8222 credentials
["username"] = username
8224 # n2vc_redesign STEP 3.2
8225 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8226 self
._write
_configuration
_status
(
8228 vca_index
=vca_index
,
8229 status
="REGISTERING",
8230 element_under_configuration
=element_under_configuration
,
8231 element_type
=element_type
,
8234 step
= "register execution environment {}".format(credentials
)
8235 self
.logger
.debug(logging_text
+ step
)
8236 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8237 credentials
=credentials
,
8238 namespace
=namespace
,
8243 # update ee_id in db
8245 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8247 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8249 # for compatibility with MON/POL modules, they need model and application name at database
8250 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8251 # Not sure if this need to be done when healing
8253 ee_id_parts = ee_id.split(".")
8254 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8255 if len(ee_id_parts) >= 2:
8256 model_name = ee_id_parts[0]
8257 application_name = ee_id_parts[1]
8258 db_nsr_update[db_update_entry + "model"] = model_name
8259 db_nsr_update[db_update_entry + "application"] = application_name
8262 # n2vc_redesign STEP 3.3
8263 # Install configuration software. Only for native charms.
8264 step
= "Install configuration Software"
8266 self
._write
_configuration
_status
(
8268 vca_index
=vca_index
,
8269 status
="INSTALLING SW",
8270 element_under_configuration
=element_under_configuration
,
8271 element_type
=element_type
,
8272 # other_update=db_nsr_update,
8276 # TODO check if already done
8277 self
.logger
.debug(logging_text
+ step
)
8279 if vca_type
== "native_charm":
8280 config_primitive
= next(
8281 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8284 if config_primitive
:
8285 config
= self
._map
_primitive
_params
(
8286 config_primitive
, {}, deploy_params
8288 await self
.vca_map
[vca_type
].install_configuration_sw(
8290 artifact_path
=artifact_path
,
8298 # write in db flag of configuration_sw already installed
8300 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8303 # Not sure if this need to be done when healing
8305 # add relations for this VCA (wait for other peers related with this VCA)
8306 await self._add_vca_relations(
8307 logging_text=logging_text,
8310 vca_index=vca_index,
8314 # if SSH access is required, then get the execution environment SSH public key
8315 # if native charm, we have already waited for the VM to be up
8316 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8319 # self.logger.debug("get ssh key block")
8321 config_descriptor
, ("config-access", "ssh-access", "required")
8323 # self.logger.debug("ssh key needed")
8324 # Needed to inject a ssh key
8327 ("config-access", "ssh-access", "default-user"),
8329 step
= "Install configuration Software, getting public ssh key"
8330 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8331 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8334 step
= "Insert public key into VM user={} ssh_key={}".format(
8338 # self.logger.debug("no need to get ssh key")
8339 step
= "Waiting to VM being up and getting IP address"
8340 self
.logger
.debug(logging_text
+ step
)
8342 # n2vc_redesign STEP 5.1
8343 # wait for RO (ip-address) Insert pub_key into VM
8344 # IMPORTANT: We need to wait for RO to complete the healing operation.
8345 await self
._wait
_heal
_ro
(nsr_id
, self
.timeout_ns_heal
)
8348 rw_mgmt_ip
= await self
.wait_kdu_up(
8349 logging_text
, nsr_id
, vnfr_id
, kdu_name
8352 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8362 rw_mgmt_ip
= None # This is for a NS configuration
8364 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8366 # store rw_mgmt_ip in deploy params for later replacement
8367 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8370 # get run-day1 operation parameter
8371 runDay1
= deploy_params
.get("run-day1", False)
8373 "Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
, vdu_id
, runDay1
)
8376 # n2vc_redesign STEP 6 Execute initial config primitive
8377 step
= "execute initial config primitive"
8379 # wait for dependent primitives execution (NS -> VNF -> VDU)
8380 if initial_config_primitive_list
:
8381 await self
._wait
_dependent
_n
2vc
(
8382 nsr_id
, vca_deployed_list
, vca_index
8385 # stage, in function of element type: vdu, kdu, vnf or ns
8386 my_vca
= vca_deployed_list
[vca_index
]
8387 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8389 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8390 elif my_vca
.get("member-vnf-index"):
8392 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8395 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8397 self
._write
_configuration
_status
(
8398 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8401 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8403 check_if_terminated_needed
= True
8404 for initial_config_primitive
in initial_config_primitive_list
:
8405 # adding information on the vca_deployed if it is a NS execution environment
8406 if not vca_deployed
["member-vnf-index"]:
8407 deploy_params
["ns_config_info"] = json
.dumps(
8408 self
._get
_ns
_config
_info
(nsr_id
)
8410 # TODO check if already done
8411 primitive_params_
= self
._map
_primitive
_params
(
8412 initial_config_primitive
, {}, deploy_params
8415 step
= "execute primitive '{}' params '{}'".format(
8416 initial_config_primitive
["name"], primitive_params_
8418 self
.logger
.debug(logging_text
+ step
)
8419 await self
.vca_map
[vca_type
].exec_primitive(
8421 primitive_name
=initial_config_primitive
["name"],
8422 params_dict
=primitive_params_
,
8427 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8428 if check_if_terminated_needed
:
8429 if config_descriptor
.get("terminate-config-primitive"):
8433 {db_update_entry
+ "needed_terminate": True},
8435 check_if_terminated_needed
= False
8437 # TODO register in database that primitive is done
8439 # STEP 7 Configure metrics
8440 # Not sure if this need to be done when healing
8442 if vca_type == "helm" or vca_type == "helm-v3":
8443 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8445 artifact_path=artifact_path,
8446 ee_config_descriptor=ee_config_descriptor,
8449 target_ip=rw_mgmt_ip,
8455 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8458 for job in prometheus_jobs:
8461 {"job_name": job["job_name"]},
8464 fail_on_empty=False,
8468 step
= "instantiated at VCA"
8469 self
.logger
.debug(logging_text
+ step
)
8471 self
._write
_configuration
_status
(
8472 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8475 except Exception as e
: # TODO not use Exception but N2VC exception
8476 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8478 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8481 "Exception while {} : {}".format(step
, e
), exc_info
=True
8483 self
._write
_configuration
_status
(
8484 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8486 raise LcmException("{} {}".format(step
, e
)) from e
async def _wait_heal_ro(
    self,
    nsr_id,
    timeout=600,
    poll_interval=15,
):
    """
    Wait until RO has finished the healing operation of a NS.

    The "nsrs" record is read repeatedly; the method returns as soon as
    _admin.deployed.RO.operational-status is different from "healing".

    :param nsr_id: NS instance ID (value of "_id" at the "nsrs" collection)
    :param timeout: maximum time, in seconds, to keep polling
    :param poll_interval: time, in seconds, to sleep between two polls
    :return: None
    :raises NgRoException: if the status is still "healing" when timeout expires
    """
    start_time = time()
    while time() <= start_time + timeout:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        operational_status_ro = db_nsr["_admin"]["deployed"]["RO"][
            "operational-status"
        ]
        self.logger.debug("Wait Heal RO > {}".format(operational_status_ro))
        if operational_status_ro != "healing":
            break
        # 'loop' is intentionally not passed: asyncio.sleep() lost that argument
        # in Python 3.10 and always uses the running event loop
        await asyncio.sleep(poll_interval)
    else:  # the loop ended without 'break': timeout expired while still healing
        raise NgRoException("Timeout waiting ns to heal")
async def vertical_scale(self, nsr_id, nslcmop_id):
    """
    Vertical Scale the VDUs in a NS

    Sends the operationParams stored at the nslcmop record to RO, waits for the
    RO action to finish, writes the final operation state at database and
    notifies it through the message bus ("ns" topic, "verticalscaled" key).
    Errors are not raised to the caller: they are logged and reflected as a
    "FAILED" operation state.

    :param nsr_id: NS Instance ID
    :param nslcmop_id: nslcmop ID of the vertical scale operation
    :return: None
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return
    logging_text = "Task ns={} vertical scale ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")
    db_nslcmop_update = {}
    # falsy while everything goes well; otherwise the error to report
    exc = None
    # reference time handed over to _wait_ng_ro together with the timeout
    start_deploy = time()

    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="VerticalScale",
            current_operation_id=nslcmop_id,
        )
        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operationParams = db_nslcmop.get("operationParams")
        target = {}
        target.update(operationParams)

        # keep 'step' up to date: it is the text stored at detailed-status on failure
        step = "Sending vertical scale request to RO"
        desc = await self.RO.vertical_scale(nsr_id, target)
        self.logger.debug("RO return > {}".format(desc))
        action_id = desc["action_id"]

        step = "Waiting for RO to complete the vertical scale"
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            self.timeout_verticalscale,
            operation="verticalscale",
        )
    except (
        NgRoException,
        ROclient.ROClientException,
        DbException,
        LcmException,
    ) as e:
        # expected failures: report the message only, without traceback
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )
        if exc:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        else:
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"
            # NOTE(review): the detailed-status of the NS record is not updated
            # here, only the one of the nslcmop - confirm this is intended

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message="",
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        # the notification is best effort: a failure here must not change the
        # result of the operation, so it is only logged
        try:
            msg = {
                "nsr_id": nsr_id,
                "nslcmop_id": nslcmop_id,
                "operationState": nslcmop_operation_state,
            }
            await self.msg.aiowrite("ns", "verticalscaled", msg, loop=self.loop)
        except Exception as e:
            self.logger.error(
                logging_text + "kafka_write notification Exception {}".format(e)
            )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_verticalscale")