1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
36 from osm_lcm
import ROclient
37 from osm_lcm
.data_utils
.nsr
import (
40 get_deployed_vca_list
,
43 from osm_lcm
.data_utils
.vca
import (
52 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
53 from osm_lcm
.lcm_utils
import (
60 check_juju_bundle_existence
,
61 get_charm_artifact_path
,
63 from osm_lcm
.data_utils
.nsd
import (
64 get_ns_configuration_relation_list
,
68 from osm_lcm
.data_utils
.vnfd
import (
74 get_ee_sorted_initial_config_primitive_list
,
75 get_ee_sorted_terminate_config_primitive_list
,
77 get_virtual_link_profiles
,
82 get_number_of_instances
,
84 get_kdu_resource_profile
,
85 find_software_version
,
87 from osm_lcm
.data_utils
.list_utils
import find_in_list
88 from osm_lcm
.data_utils
.vnfr
import (
92 get_volumes_from_instantiation_params
,
94 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
95 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
96 from n2vc
.definitions
import RelationEndpoint
97 from n2vc
.k8s_helm_conn
import K8sHelmConnector
98 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
99 from n2vc
.k8s_juju_conn
import K8sJujuConnector
101 from osm_common
.dbbase
import DbException
102 from osm_common
.fsbase
import FsException
104 from osm_lcm
.data_utils
.database
.database
import Database
105 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
107 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
108 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
110 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
111 from osm_lcm
.osm_config
import OsmConfigBuilder
112 from osm_lcm
.prometheus
import parse_job
114 from copy
import copy
, deepcopy
115 from time
import time
116 from uuid
import uuid4
118 from random
import randint
120 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
123 class NsLcm(LcmBase
):
124 timeout_vca_on_error
= (
126 ) # Time for charm from first time at blocked,error status to mark as failed
127 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
128 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
129 timeout_ns_heal
= 1800 # default global timeout for un deployment a ns
130 timeout_charm_delete
= 10 * 60
131 timeout_primitive
= 30 * 60 # timeout for primitive execution
132 timeout_ns_update
= 30 * 60 # timeout for ns update
133 timeout_progress_primitive
= (
135 ) # timeout for some progress in a primitive execution
136 timeout_migrate
= 1800 # default global timeout for migrating vnfs
137 timeout_operate
= 1800 # default global timeout for migrating vnfs
138 timeout_verticalscale
= 1800 # default global timeout for Vertical Sclaing
139 SUBOPERATION_STATUS_NOT_FOUND
= -1
140 SUBOPERATION_STATUS_NEW
= -2
141 SUBOPERATION_STATUS_SKIP
= -3
142 task_name_deploy_vca
= "Deploying VCA"
144 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
146 Init, Connect to database, filesystem storage, and messaging
147 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
150 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
152 self
.db
= Database().instance
.db
153 self
.fs
= Filesystem().instance
.fs
155 self
.lcm_tasks
= lcm_tasks
156 self
.timeout
= config
["timeout"]
157 self
.ro_config
= config
["ro_config"]
158 self
.ng_ro
= config
["ro_config"].get("ng")
159 self
.vca_config
= config
["VCA"].copy()
161 # create N2VC connector
162 self
.n2vc
= N2VCJujuConnector(
165 on_update_db
=self
._on
_update
_n
2vc
_db
,
170 self
.conn_helm_ee
= LCMHelmConn(
173 vca_config
=self
.vca_config
,
174 on_update_db
=self
._on
_update
_n
2vc
_db
,
177 self
.k8sclusterhelm2
= K8sHelmConnector(
178 kubectl_command
=self
.vca_config
.get("kubectlpath"),
179 helm_command
=self
.vca_config
.get("helmpath"),
186 self
.k8sclusterhelm3
= K8sHelm3Connector(
187 kubectl_command
=self
.vca_config
.get("kubectlpath"),
188 helm_command
=self
.vca_config
.get("helm3path"),
195 self
.k8sclusterjuju
= K8sJujuConnector(
196 kubectl_command
=self
.vca_config
.get("kubectlpath"),
197 juju_command
=self
.vca_config
.get("jujupath"),
200 on_update_db
=self
._on
_update
_k
8s
_db
,
205 self
.k8scluster_map
= {
206 "helm-chart": self
.k8sclusterhelm2
,
207 "helm-chart-v3": self
.k8sclusterhelm3
,
208 "chart": self
.k8sclusterhelm3
,
209 "juju-bundle": self
.k8sclusterjuju
,
210 "juju": self
.k8sclusterjuju
,
214 "lxc_proxy_charm": self
.n2vc
,
215 "native_charm": self
.n2vc
,
216 "k8s_proxy_charm": self
.n2vc
,
217 "helm": self
.conn_helm_ee
,
218 "helm-v3": self
.conn_helm_ee
,
222 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
224 self
.op_status_map
= {
225 "instantiation": self
.RO
.status
,
226 "termination": self
.RO
.status
,
227 "migrate": self
.RO
.status
,
228 "healing": self
.RO
.recreate_status
,
229 "verticalscale": self
.RO
.status
,
230 "start_stop_rebuild": self
.RO
.status
,
234 def increment_ip_mac(ip_mac
, vm_index
=1):
235 if not isinstance(ip_mac
, str):
238 # try with ipv4 look for last dot
239 i
= ip_mac
.rfind(".")
242 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
243 # try with ipv6 or mac look for last colon. Operate in hex
244 i
= ip_mac
.rfind(":")
247 # format in hex, len can be 2 for mac or 4 for ipv6
248 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
249 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
255 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
257 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
260 # TODO filter RO descriptor fields...
264 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
265 db_dict
["deploymentStatus"] = ro_descriptor
266 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
268 except Exception as e
:
270 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
273 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
275 # remove last dot from path (if exists)
276 if path
.endswith("."):
279 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
280 # .format(table, filter, path, updated_data))
283 nsr_id
= filter.get("_id")
285 # read ns record from database
286 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
287 current_ns_status
= nsr
.get("nsState")
289 # get vca status for NS
290 status_dict
= await self
.n2vc
.get_status(
291 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
296 db_dict
["vcaStatus"] = status_dict
298 # update configurationStatus for this VCA
300 vca_index
= int(path
[path
.rfind(".") + 1 :])
303 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
305 vca_status
= vca_list
[vca_index
].get("status")
307 configuration_status_list
= nsr
.get("configurationStatus")
308 config_status
= configuration_status_list
[vca_index
].get("status")
310 if config_status
== "BROKEN" and vca_status
!= "failed":
311 db_dict
["configurationStatus"][vca_index
] = "READY"
312 elif config_status
!= "BROKEN" and vca_status
== "failed":
313 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
314 except Exception as e
:
315 # not update configurationStatus
316 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
318 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
319 # if nsState = 'DEGRADED' check if all is OK
321 if current_ns_status
in ("READY", "DEGRADED"):
322 error_description
= ""
324 if status_dict
.get("machines"):
325 for machine_id
in status_dict
.get("machines"):
326 machine
= status_dict
.get("machines").get(machine_id
)
327 # check machine agent-status
328 if machine
.get("agent-status"):
329 s
= machine
.get("agent-status").get("status")
332 error_description
+= (
333 "machine {} agent-status={} ; ".format(
337 # check machine instance status
338 if machine
.get("instance-status"):
339 s
= machine
.get("instance-status").get("status")
342 error_description
+= (
343 "machine {} instance-status={} ; ".format(
348 if status_dict
.get("applications"):
349 for app_id
in status_dict
.get("applications"):
350 app
= status_dict
.get("applications").get(app_id
)
351 # check application status
352 if app
.get("status"):
353 s
= app
.get("status").get("status")
356 error_description
+= (
357 "application {} status={} ; ".format(app_id
, s
)
360 if error_description
:
361 db_dict
["errorDescription"] = error_description
362 if current_ns_status
== "READY" and is_degraded
:
363 db_dict
["nsState"] = "DEGRADED"
364 if current_ns_status
== "DEGRADED" and not is_degraded
:
365 db_dict
["nsState"] = "READY"
368 self
.update_db_2("nsrs", nsr_id
, db_dict
)
370 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
372 except Exception as e
:
373 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
375 async def _on_update_k8s_db(
376 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
379 Updating vca status in NSR record
380 :param cluster_uuid: UUID of a k8s cluster
381 :param kdu_instance: The unique name of the KDU instance
382 :param filter: To get nsr_id
383 :cluster_type: The cluster type (juju, k8s)
387 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
388 # .format(cluster_uuid, kdu_instance, filter))
390 nsr_id
= filter.get("_id")
392 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
393 cluster_uuid
=cluster_uuid
,
394 kdu_instance
=kdu_instance
,
396 complete_status
=True,
402 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
405 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
409 self
.update_db_2("nsrs", nsr_id
, db_dict
)
410 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
412 except Exception as e
:
413 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
416 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
419 undefined
=StrictUndefined
,
420 autoescape
=select_autoescape(default_for_string
=True, default
=True),
422 template
= env
.from_string(cloud_init_text
)
423 return template
.render(additional_params
or {})
424 except UndefinedError
as e
:
426 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
427 "file, must be provided in the instantiation parameters inside the "
428 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
430 except (TemplateError
, TemplateNotFound
) as e
:
432 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
437 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
438 cloud_init_content
= cloud_init_file
= None
440 if vdu
.get("cloud-init-file"):
441 base_folder
= vnfd
["_admin"]["storage"]
442 if base_folder
["pkg-dir"]:
443 cloud_init_file
= "{}/{}/cloud_init/{}".format(
444 base_folder
["folder"],
445 base_folder
["pkg-dir"],
446 vdu
["cloud-init-file"],
449 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
450 base_folder
["folder"],
451 vdu
["cloud-init-file"],
453 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
454 cloud_init_content
= ci_file
.read()
455 elif vdu
.get("cloud-init"):
456 cloud_init_content
= vdu
["cloud-init"]
458 return cloud_init_content
459 except FsException
as e
:
461 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
462 vnfd
["id"], vdu
["id"], cloud_init_file
, e
466 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
468 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
470 additional_params
= vdur
.get("additionalParams")
471 return parse_yaml_strings(additional_params
)
473 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
475 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
476 :param vnfd: input vnfd
477 :param new_id: overrides vnf id if provided
478 :param additionalParams: Instantiation params for VNFs provided
479 :param nsrId: Id of the NSR
480 :return: copy of vnfd
482 vnfd_RO
= deepcopy(vnfd
)
483 # remove unused by RO configuration, monitoring, scaling and internal keys
484 vnfd_RO
.pop("_id", None)
485 vnfd_RO
.pop("_admin", None)
486 vnfd_RO
.pop("monitoring-param", None)
487 vnfd_RO
.pop("scaling-group-descriptor", None)
488 vnfd_RO
.pop("kdu", None)
489 vnfd_RO
.pop("k8s-cluster", None)
491 vnfd_RO
["id"] = new_id
493 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
494 for vdu
in get_iterable(vnfd_RO
, "vdu"):
495 vdu
.pop("cloud-init-file", None)
496 vdu
.pop("cloud-init", None)
500 def ip_profile_2_RO(ip_profile
):
501 RO_ip_profile
= deepcopy(ip_profile
)
502 if "dns-server" in RO_ip_profile
:
503 if isinstance(RO_ip_profile
["dns-server"], list):
504 RO_ip_profile
["dns-address"] = []
505 for ds
in RO_ip_profile
.pop("dns-server"):
506 RO_ip_profile
["dns-address"].append(ds
["address"])
508 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
509 if RO_ip_profile
.get("ip-version") == "ipv4":
510 RO_ip_profile
["ip-version"] = "IPv4"
511 if RO_ip_profile
.get("ip-version") == "ipv6":
512 RO_ip_profile
["ip-version"] = "IPv6"
513 if "dhcp-params" in RO_ip_profile
:
514 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
517 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
518 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
519 if db_vim
["_admin"]["operationalState"] != "ENABLED":
521 "VIM={} is not available. operationalState={}".format(
522 vim_account
, db_vim
["_admin"]["operationalState"]
525 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
528 def get_ro_wim_id_for_wim_account(self
, wim_account
):
529 if isinstance(wim_account
, str):
530 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
531 if db_wim
["_admin"]["operationalState"] != "ENABLED":
533 "WIM={} is not available. operationalState={}".format(
534 wim_account
, db_wim
["_admin"]["operationalState"]
537 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
542 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
544 db_vdu_push_list
= []
546 db_update
= {"_admin.modified": time()}
548 for vdu_id
, vdu_count
in vdu_create
.items():
552 for vdur
in reversed(db_vnfr
["vdur"])
553 if vdur
["vdu-id-ref"] == vdu_id
558 # Read the template saved in the db:
560 "No vdur in the database. Using the vdur-template to scale"
562 vdur_template
= db_vnfr
.get("vdur-template")
563 if not vdur_template
:
565 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
569 vdur
= vdur_template
[0]
570 # Delete a template from the database after using it
573 {"_id": db_vnfr
["_id"]},
575 pull
={"vdur-template": {"_id": vdur
["_id"]}},
577 for count
in range(vdu_count
):
578 vdur_copy
= deepcopy(vdur
)
579 vdur_copy
["status"] = "BUILD"
580 vdur_copy
["status-detailed"] = None
581 vdur_copy
["ip-address"] = None
582 vdur_copy
["_id"] = str(uuid4())
583 vdur_copy
["count-index"] += count
+ 1
584 vdur_copy
["id"] = "{}-{}".format(
585 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
587 vdur_copy
.pop("vim_info", None)
588 for iface
in vdur_copy
["interfaces"]:
589 if iface
.get("fixed-ip"):
590 iface
["ip-address"] = self
.increment_ip_mac(
591 iface
["ip-address"], count
+ 1
594 iface
.pop("ip-address", None)
595 if iface
.get("fixed-mac"):
596 iface
["mac-address"] = self
.increment_ip_mac(
597 iface
["mac-address"], count
+ 1
600 iface
.pop("mac-address", None)
604 ) # only first vdu can be managment of vnf
605 db_vdu_push_list
.append(vdur_copy
)
606 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
608 if len(db_vnfr
["vdur"]) == 1:
609 # The scale will move to 0 instances
611 "Scaling to 0 !, creating the template with the last vdur"
613 template_vdur
= [db_vnfr
["vdur"][0]]
614 for vdu_id
, vdu_count
in vdu_delete
.items():
616 indexes_to_delete
= [
618 for iv
in enumerate(db_vnfr
["vdur"])
619 if iv
[1]["vdu-id-ref"] == vdu_id
623 "vdur.{}.status".format(i
): "DELETING"
624 for i
in indexes_to_delete
[-vdu_count
:]
628 # it must be deleted one by one because common.db does not allow otherwise
631 for v
in reversed(db_vnfr
["vdur"])
632 if v
["vdu-id-ref"] == vdu_id
634 for vdu
in vdus_to_delete
[:vdu_count
]:
637 {"_id": db_vnfr
["_id"]},
639 pull
={"vdur": {"_id": vdu
["_id"]}},
643 db_push
["vdur"] = db_vdu_push_list
645 db_push
["vdur-template"] = template_vdur
648 db_vnfr
["vdur-template"] = template_vdur
649 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
650 # modify passed dictionary db_vnfr
651 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
652 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
654 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
656 Updates database nsr with the RO info for the created vld
657 :param ns_update_nsr: dictionary to be filled with the updated info
658 :param db_nsr: content of db_nsr. This is also modified
659 :param nsr_desc_RO: nsr descriptor from RO
660 :return: Nothing, LcmException is raised on errors
663 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
664 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
665 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
667 vld
["vim-id"] = net_RO
.get("vim_net_id")
668 vld
["name"] = net_RO
.get("vim_name")
669 vld
["status"] = net_RO
.get("status")
670 vld
["status-detailed"] = net_RO
.get("error_msg")
671 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
675 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
678 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
680 for db_vnfr
in db_vnfrs
.values():
681 vnfr_update
= {"status": "ERROR"}
682 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
683 if "status" not in vdur
:
684 vdur
["status"] = "ERROR"
685 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
687 vdur
["status-detailed"] = str(error_text
)
689 "vdur.{}.status-detailed".format(vdu_index
)
691 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
692 except DbException
as e
:
693 self
.logger
.error("Cannot update vnf. {}".format(e
))
695 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
697 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
698 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
699 :param nsr_desc_RO: nsr descriptor from RO
700 :return: Nothing, LcmException is raised on errors
702 for vnf_index
, db_vnfr
in db_vnfrs
.items():
703 for vnf_RO
in nsr_desc_RO
["vnfs"]:
704 if vnf_RO
["member_vnf_index"] != vnf_index
:
707 if vnf_RO
.get("ip_address"):
708 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
711 elif not db_vnfr
.get("ip-address"):
712 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
713 raise LcmExceptionNoMgmtIP(
714 "ns member_vnf_index '{}' has no IP address".format(
719 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
720 vdur_RO_count_index
= 0
721 if vdur
.get("pdu-type"):
723 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
724 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
726 if vdur
["count-index"] != vdur_RO_count_index
:
727 vdur_RO_count_index
+= 1
729 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
730 if vdur_RO
.get("ip_address"):
731 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
733 vdur
["ip-address"] = None
734 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
735 vdur
["name"] = vdur_RO
.get("vim_name")
736 vdur
["status"] = vdur_RO
.get("status")
737 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
738 for ifacer
in get_iterable(vdur
, "interfaces"):
739 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
740 if ifacer
["name"] == interface_RO
.get("internal_name"):
741 ifacer
["ip-address"] = interface_RO
.get(
744 ifacer
["mac-address"] = interface_RO
.get(
750 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
751 "from VIM info".format(
752 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
755 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
759 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
761 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
765 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
766 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
767 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
769 vld
["vim-id"] = net_RO
.get("vim_net_id")
770 vld
["name"] = net_RO
.get("vim_name")
771 vld
["status"] = net_RO
.get("status")
772 vld
["status-detailed"] = net_RO
.get("error_msg")
773 vnfr_update
["vld.{}".format(vld_index
)] = vld
777 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
782 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
787 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
792 def _get_ns_config_info(self
, nsr_id
):
794 Generates a mapping between vnf,vdu elements and the N2VC id
795 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
796 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
797 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
798 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
800 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
801 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
803 ns_config_info
= {"osm-config-mapping": mapping
}
804 for vca
in vca_deployed_list
:
805 if not vca
["member-vnf-index"]:
807 if not vca
["vdu_id"]:
808 mapping
[vca
["member-vnf-index"]] = vca
["application"]
812 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
814 ] = vca
["application"]
815 return ns_config_info
817 async def _instantiate_ng_ro(
834 def get_vim_account(vim_account_id
):
836 if vim_account_id
in db_vims
:
837 return db_vims
[vim_account_id
]
838 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
839 db_vims
[vim_account_id
] = db_vim
842 # modify target_vld info with instantiation parameters
843 def parse_vld_instantiation_params(
844 target_vim
, target_vld
, vld_params
, target_sdn
846 if vld_params
.get("ip-profile"):
847 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
850 if vld_params
.get("provider-network"):
851 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
854 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
855 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
858 if vld_params
.get("wimAccountId"):
859 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
860 target_vld
["vim_info"][target_wim
] = {}
861 for param
in ("vim-network-name", "vim-network-id"):
862 if vld_params
.get(param
):
863 if isinstance(vld_params
[param
], dict):
864 for vim
, vim_net
in vld_params
[param
].items():
865 other_target_vim
= "vim:" + vim
867 target_vld
["vim_info"],
868 (other_target_vim
, param
.replace("-", "_")),
871 else: # isinstance str
872 target_vld
["vim_info"][target_vim
][
873 param
.replace("-", "_")
874 ] = vld_params
[param
]
875 if vld_params
.get("common_id"):
876 target_vld
["common_id"] = vld_params
.get("common_id")
878 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
879 def update_ns_vld_target(target
, ns_params
):
880 for vnf_params
in ns_params
.get("vnf", ()):
881 if vnf_params
.get("vimAccountId"):
885 for vnfr
in db_vnfrs
.values()
886 if vnf_params
["member-vnf-index"]
887 == vnfr
["member-vnf-index-ref"]
891 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
892 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
893 target_vld
= find_in_list(
894 get_iterable(vdur
, "interfaces"),
895 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
898 vld_params
= find_in_list(
899 get_iterable(ns_params
, "vld"),
900 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
904 if vnf_params
.get("vimAccountId") not in a_vld
.get(
907 target_vim_network_list
= [
908 v
for _
, v
in a_vld
.get("vim_info").items()
910 target_vim_network_name
= next(
912 item
.get("vim_network_name", "")
913 for item
in target_vim_network_list
918 target
["ns"]["vld"][a_index
].get("vim_info").update(
920 "vim:{}".format(vnf_params
["vimAccountId"]): {
921 "vim_network_name": target_vim_network_name
,
927 for param
in ("vim-network-name", "vim-network-id"):
928 if vld_params
.get(param
) and isinstance(
929 vld_params
[param
], dict
931 for vim
, vim_net
in vld_params
[
934 other_target_vim
= "vim:" + vim
936 target
["ns"]["vld"][a_index
].get(
941 param
.replace("-", "_"),
946 nslcmop_id
= db_nslcmop
["_id"]
948 "name": db_nsr
["name"],
951 "image": deepcopy(db_nsr
["image"]),
952 "flavor": deepcopy(db_nsr
["flavor"]),
953 "action_id": nslcmop_id
,
954 "cloud_init_content": {},
956 for image
in target
["image"]:
957 image
["vim_info"] = {}
958 for flavor
in target
["flavor"]:
959 flavor
["vim_info"] = {}
960 if db_nsr
.get("affinity-or-anti-affinity-group"):
961 target
["affinity-or-anti-affinity-group"] = deepcopy(
962 db_nsr
["affinity-or-anti-affinity-group"]
964 for affinity_or_anti_affinity_group
in target
[
965 "affinity-or-anti-affinity-group"
967 affinity_or_anti_affinity_group
["vim_info"] = {}
969 if db_nslcmop
.get("lcmOperationType") != "instantiate":
970 # get parameters of instantiation:
971 db_nslcmop_instantiate
= self
.db
.get_list(
974 "nsInstanceId": db_nslcmop
["nsInstanceId"],
975 "lcmOperationType": "instantiate",
978 ns_params
= db_nslcmop_instantiate
.get("operationParams")
980 ns_params
= db_nslcmop
.get("operationParams")
981 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
982 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
985 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
986 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
990 "mgmt-network": vld
.get("mgmt-network", False),
991 "type": vld
.get("type"),
994 "vim_network_name": vld
.get("vim-network-name"),
995 "vim_account_id": ns_params
["vimAccountId"],
999 # check if this network needs SDN assist
1000 if vld
.get("pci-interfaces"):
1001 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1002 sdnc_id
= db_vim
["config"].get("sdn-controller")
1004 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1005 target_sdn
= "sdn:{}".format(sdnc_id
)
1006 target_vld
["vim_info"][target_sdn
] = {
1008 "target_vim": target_vim
,
1010 "type": vld
.get("type"),
1013 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1014 for nsd_vnf_profile
in nsd_vnf_profiles
:
1015 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1016 if cp
["virtual-link-profile-id"] == vld
["id"]:
1018 "member_vnf:{}.{}".format(
1019 cp
["constituent-cpd-id"][0][
1020 "constituent-base-element-id"
1022 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1024 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1026 # check at nsd descriptor, if there is an ip-profile
1028 nsd_vlp
= find_in_list(
1029 get_virtual_link_profiles(nsd
),
1030 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1035 and nsd_vlp
.get("virtual-link-protocol-data")
1036 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1038 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1041 ip_profile_dest_data
= {}
1042 if "ip-version" in ip_profile_source_data
:
1043 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1046 if "cidr" in ip_profile_source_data
:
1047 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1050 if "gateway-ip" in ip_profile_source_data
:
1051 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1054 if "dhcp-enabled" in ip_profile_source_data
:
1055 ip_profile_dest_data
["dhcp-params"] = {
1056 "enabled": ip_profile_source_data
["dhcp-enabled"]
1058 vld_params
["ip-profile"] = ip_profile_dest_data
1060 # update vld_params with instantiation params
1061 vld_instantiation_params
= find_in_list(
1062 get_iterable(ns_params
, "vld"),
1063 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1065 if vld_instantiation_params
:
1066 vld_params
.update(vld_instantiation_params
)
1067 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1068 target
["ns"]["vld"].append(target_vld
)
1069 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1070 update_ns_vld_target(target
, ns_params
)
1072 for vnfr
in db_vnfrs
.values():
1073 vnfd
= find_in_list(
1074 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1076 vnf_params
= find_in_list(
1077 get_iterable(ns_params
, "vnf"),
1078 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1080 target_vnf
= deepcopy(vnfr
)
1081 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1082 for vld
in target_vnf
.get("vld", ()):
1083 # check if connected to a ns.vld, to fill target'
1084 vnf_cp
= find_in_list(
1085 vnfd
.get("int-virtual-link-desc", ()),
1086 lambda cpd
: cpd
.get("id") == vld
["id"],
1089 ns_cp
= "member_vnf:{}.{}".format(
1090 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1092 if cp2target
.get(ns_cp
):
1093 vld
["target"] = cp2target
[ns_cp
]
1096 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1098 # check if this network needs SDN assist
1100 if vld
.get("pci-interfaces"):
1101 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1102 sdnc_id
= db_vim
["config"].get("sdn-controller")
1104 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1105 target_sdn
= "sdn:{}".format(sdnc_id
)
1106 vld
["vim_info"][target_sdn
] = {
1108 "target_vim": target_vim
,
1110 "type": vld
.get("type"),
1113 # check at vnfd descriptor, if there is an ip-profile
1115 vnfd_vlp
= find_in_list(
1116 get_virtual_link_profiles(vnfd
),
1117 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1121 and vnfd_vlp
.get("virtual-link-protocol-data")
1122 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1124 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1127 ip_profile_dest_data
= {}
1128 if "ip-version" in ip_profile_source_data
:
1129 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1132 if "cidr" in ip_profile_source_data
:
1133 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1136 if "gateway-ip" in ip_profile_source_data
:
1137 ip_profile_dest_data
[
1139 ] = ip_profile_source_data
["gateway-ip"]
1140 if "dhcp-enabled" in ip_profile_source_data
:
1141 ip_profile_dest_data
["dhcp-params"] = {
1142 "enabled": ip_profile_source_data
["dhcp-enabled"]
1145 vld_params
["ip-profile"] = ip_profile_dest_data
1146 # update vld_params with instantiation params
1148 vld_instantiation_params
= find_in_list(
1149 get_iterable(vnf_params
, "internal-vld"),
1150 lambda i_vld
: i_vld
["name"] == vld
["id"],
1152 if vld_instantiation_params
:
1153 vld_params
.update(vld_instantiation_params
)
1154 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1157 for vdur
in target_vnf
.get("vdur", ()):
1158 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1159 continue # This vdu must not be created
1160 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1162 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1165 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1166 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1169 and vdu_configuration
.get("config-access")
1170 and vdu_configuration
.get("config-access").get("ssh-access")
1172 vdur
["ssh-keys"] = ssh_keys_all
1173 vdur
["ssh-access-required"] = vdu_configuration
[
1175 ]["ssh-access"]["required"]
1178 and vnf_configuration
.get("config-access")
1179 and vnf_configuration
.get("config-access").get("ssh-access")
1180 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1182 vdur
["ssh-keys"] = ssh_keys_all
1183 vdur
["ssh-access-required"] = vnf_configuration
[
1185 ]["ssh-access"]["required"]
1186 elif ssh_keys_instantiation
and find_in_list(
1187 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1189 vdur
["ssh-keys"] = ssh_keys_instantiation
1191 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1193 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1195 if vdud
.get("cloud-init-file"):
1196 vdur
["cloud-init"] = "{}:file:{}".format(
1197 vnfd
["_id"], vdud
.get("cloud-init-file")
1199 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1200 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1201 base_folder
= vnfd
["_admin"]["storage"]
1202 if base_folder
["pkg-dir"]:
1203 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1204 base_folder
["folder"],
1205 base_folder
["pkg-dir"],
1206 vdud
.get("cloud-init-file"),
1209 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1210 base_folder
["folder"],
1211 vdud
.get("cloud-init-file"),
1213 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1214 target
["cloud_init_content"][
1217 elif vdud
.get("cloud-init"):
1218 vdur
["cloud-init"] = "{}:vdu:{}".format(
1219 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1221 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1222 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1225 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1226 deploy_params_vdu
= self
._format
_additional
_params
(
1227 vdur
.get("additionalParams") or {}
1229 deploy_params_vdu
["OSM"] = get_osm_params(
1230 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1232 vdur
["additionalParams"] = deploy_params_vdu
1235 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1236 if target_vim
not in ns_flavor
["vim_info"]:
1237 ns_flavor
["vim_info"][target_vim
] = {}
1240 # in case alternative images are provided we must check if they should be applied
1241 # for the vim_type, modify the vim_type taking into account
1242 ns_image_id
= int(vdur
["ns-image-id"])
1243 if vdur
.get("alt-image-ids"):
1244 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1245 vim_type
= db_vim
["vim_type"]
1246 for alt_image_id
in vdur
.get("alt-image-ids"):
1247 ns_alt_image
= target
["image"][int(alt_image_id
)]
1248 if vim_type
== ns_alt_image
.get("vim-type"):
1249 # must use alternative image
1251 "use alternative image id: {}".format(alt_image_id
)
1253 ns_image_id
= alt_image_id
1254 vdur
["ns-image-id"] = ns_image_id
1256 ns_image
= target
["image"][int(ns_image_id
)]
1257 if target_vim
not in ns_image
["vim_info"]:
1258 ns_image
["vim_info"][target_vim
] = {}
1261 if vdur
.get("affinity-or-anti-affinity-group-id"):
1262 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1263 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1264 if target_vim
not in ns_ags
["vim_info"]:
1265 ns_ags
["vim_info"][target_vim
] = {}
1267 vdur
["vim_info"] = {target_vim
: {}}
1268 # instantiation parameters
1270 vdu_instantiation_params
= find_in_list(
1271 get_iterable(vnf_params
, "vdu"),
1272 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1274 if vdu_instantiation_params
:
1275 # Parse the vdu_volumes from the instantiation params
1276 vdu_volumes
= get_volumes_from_instantiation_params(
1277 vdu_instantiation_params
, vdud
1279 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1280 vdur_list
.append(vdur
)
1281 target_vnf
["vdur"] = vdur_list
1282 target
["vnf"].append(target_vnf
)
1284 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1285 desc
= await self
.RO
.deploy(nsr_id
, target
)
1286 self
.logger
.debug("RO return > {}".format(desc
))
1287 action_id
= desc
["action_id"]
1288 await self
._wait
_ng
_ro
(
1289 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
,
1290 operation
="instantiation"
1295 "_admin.deployed.RO.operational-status": "running",
1296 "detailed-status": " ".join(stage
),
1298 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1299 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1300 self
._write
_op
_status
(nslcmop_id
, stage
)
1302 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1306 async def _wait_ng_ro(
1316 detailed_status_old
= None
1318 start_time
= start_time
or time()
1319 while time() <= start_time
+ timeout
:
1320 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1321 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1322 if desc_status
["status"] == "FAILED":
1323 raise NgRoException(desc_status
["details"])
1324 elif desc_status
["status"] == "BUILD":
1326 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1327 elif desc_status
["status"] == "DONE":
1329 stage
[2] = "Deployed at VIM"
1332 assert False, "ROclient.check_ns_status returns unknown {}".format(
1333 desc_status
["status"]
1335 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1336 detailed_status_old
= stage
[2]
1337 db_nsr_update
["detailed-status"] = " ".join(stage
)
1338 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1339 self
._write
_op
_status
(nslcmop_id
, stage
)
1340 await asyncio
.sleep(15, loop
=self
.loop
)
1341 else: # timeout_ns_deploy
1342 raise NgRoException("Timeout waiting ns to deploy")
1344 async def _terminate_ng_ro(
1345 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1350 start_deploy
= time()
1357 "action_id": nslcmop_id
,
1359 desc
= await self
.RO
.deploy(nsr_id
, target
)
1360 action_id
= desc
["action_id"]
1361 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1362 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1365 + "ns terminate action at RO. action_id={}".format(action_id
)
1369 delete_timeout
= 20 * 60 # 20 minutes
1370 await self
._wait
_ng
_ro
(
1371 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
,
1372 operation
="termination"
1375 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1376 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1378 await self
.RO
.delete(nsr_id
)
1379 except Exception as e
:
1380 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1381 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1382 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1383 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1385 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1387 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1388 failed_detail
.append("delete conflict: {}".format(e
))
1391 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1394 failed_detail
.append("delete error: {}".format(e
))
1397 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1401 stage
[2] = "Error deleting from VIM"
1403 stage
[2] = "Deleted from VIM"
1404 db_nsr_update
["detailed-status"] = " ".join(stage
)
1405 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1406 self
._write
_op
_status
(nslcmop_id
, stage
)
1409 raise LcmException("; ".join(failed_detail
))
1412 async def instantiate_RO(
1426 :param logging_text: preffix text to use at logging
1427 :param nsr_id: nsr identity
1428 :param nsd: database content of ns descriptor
1429 :param db_nsr: database content of ns record
1430 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1432 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1433 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1434 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1435 :return: None or exception
1438 start_deploy
= time()
1439 ns_params
= db_nslcmop
.get("operationParams")
1440 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1441 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1443 timeout_ns_deploy
= self
.timeout
.get(
1444 "ns_deploy", self
.timeout_ns_deploy
1447 # Check for and optionally request placement optimization. Database will be updated if placement activated
1448 stage
[2] = "Waiting for Placement."
1449 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1450 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1451 for vnfr
in db_vnfrs
.values():
1452 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1455 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1457 return await self
._instantiate
_ng
_ro
(
1470 except Exception as e
:
1471 stage
[2] = "ERROR deploying at VIM"
1472 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1474 "Error deploying at VIM {}".format(e
),
1475 exc_info
=not isinstance(
1478 ROclient
.ROClientException
,
1487 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1489 Wait for kdu to be up, get ip address
1490 :param logging_text: prefix use for logging
1494 :return: IP address, K8s services
1497 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1500 while nb_tries
< 360:
1501 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1505 for x
in get_iterable(db_vnfr
, "kdur")
1506 if x
.get("kdu-name") == kdu_name
1512 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1514 if kdur
.get("status"):
1515 if kdur
["status"] in ("READY", "ENABLED"):
1516 return kdur
.get("ip-address"), kdur
.get("services")
1519 "target KDU={} is in error state".format(kdu_name
)
1522 await asyncio
.sleep(10, loop
=self
.loop
)
1524 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1526 async def wait_vm_up_insert_key_ro(
1527 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1530 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1531 :param logging_text: prefix use for logging
1536 :param pub_key: public ssh key to inject, None to skip
1537 :param user: user to apply the public ssh key
1541 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1545 target_vdu_id
= None
1551 if ro_retries
>= 360: # 1 hour
1553 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1556 await asyncio
.sleep(10, loop
=self
.loop
)
1559 if not target_vdu_id
:
1560 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1562 if not vdu_id
: # for the VNF case
1563 if db_vnfr
.get("status") == "ERROR":
1565 "Cannot inject ssh-key because target VNF is in error state"
1567 ip_address
= db_vnfr
.get("ip-address")
1573 for x
in get_iterable(db_vnfr
, "vdur")
1574 if x
.get("ip-address") == ip_address
1582 for x
in get_iterable(db_vnfr
, "vdur")
1583 if x
.get("vdu-id-ref") == vdu_id
1584 and x
.get("count-index") == vdu_index
1590 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1591 ): # If only one, this should be the target vdu
1592 vdur
= db_vnfr
["vdur"][0]
1595 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1596 vnfr_id
, vdu_id
, vdu_index
1599 # New generation RO stores information at "vim_info"
1602 if vdur
.get("vim_info"):
1604 t
for t
in vdur
["vim_info"]
1605 ) # there should be only one key
1606 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1608 vdur
.get("pdu-type")
1609 or vdur
.get("status") == "ACTIVE"
1610 or ng_ro_status
== "ACTIVE"
1612 ip_address
= vdur
.get("ip-address")
1615 target_vdu_id
= vdur
["vdu-id-ref"]
1616 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1618 "Cannot inject ssh-key because target VM is in error state"
1621 if not target_vdu_id
:
1624 # inject public key into machine
1625 if pub_key
and user
:
1626 self
.logger
.debug(logging_text
+ "Inserting RO key")
1627 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1628 if vdur
.get("pdu-type"):
1629 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1632 ro_vm_id
= "{}-{}".format(
1633 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1634 ) # TODO add vdu_index
1638 "action": "inject_ssh_key",
1642 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1644 desc
= await self
.RO
.deploy(nsr_id
, target
)
1645 action_id
= desc
["action_id"]
1646 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600, operation
="instantiation")
1649 # wait until NS is deployed at RO
1651 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1652 ro_nsr_id
= deep_get(
1653 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1657 result_dict
= await self
.RO
.create_action(
1659 item_id_name
=ro_nsr_id
,
1661 "add_public_key": pub_key
,
1666 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1667 if not result_dict
or not isinstance(result_dict
, dict):
1669 "Unknown response from RO when injecting key"
1671 for result
in result_dict
.values():
1672 if result
.get("vim_result") == 200:
1675 raise ROclient
.ROClientException(
1676 "error injecting key: {}".format(
1677 result
.get("description")
1681 except NgRoException
as e
:
1683 "Reaching max tries injecting key. Error: {}".format(e
)
1685 except ROclient
.ROClientException
as e
:
1689 + "error injecting key: {}. Retrying until {} seconds".format(
1696 "Reaching max tries injecting key. Error: {}".format(e
)
1703 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1705 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1707 my_vca
= vca_deployed_list
[vca_index
]
1708 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1709 # vdu or kdu: no dependencies
1713 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1714 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1715 configuration_status_list
= db_nsr
["configurationStatus"]
1716 for index
, vca_deployed
in enumerate(configuration_status_list
):
1717 if index
== vca_index
:
1720 if not my_vca
.get("member-vnf-index") or (
1721 vca_deployed
.get("member-vnf-index")
1722 == my_vca
.get("member-vnf-index")
1724 internal_status
= configuration_status_list
[index
].get("status")
1725 if internal_status
== "READY":
1727 elif internal_status
== "BROKEN":
1729 "Configuration aborted because dependent charm/s has failed"
1734 # no dependencies, return
1736 await asyncio
.sleep(10)
1739 raise LcmException("Configuration aborted because dependent charm/s timeout")
1741 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1744 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1746 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1747 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1750 async def instantiate_N2VC(
1767 ee_config_descriptor
,
1769 nsr_id
= db_nsr
["_id"]
1770 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1771 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1772 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1773 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1775 "collection": "nsrs",
1776 "filter": {"_id": nsr_id
},
1777 "path": db_update_entry
,
1783 element_under_configuration
= nsr_id
1787 vnfr_id
= db_vnfr
["_id"]
1788 osm_config
["osm"]["vnf_id"] = vnfr_id
1790 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1792 if vca_type
== "native_charm":
1795 index_number
= vdu_index
or 0
1798 element_type
= "VNF"
1799 element_under_configuration
= vnfr_id
1800 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1802 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1803 element_type
= "VDU"
1804 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1805 osm_config
["osm"]["vdu_id"] = vdu_id
1807 namespace
+= ".{}".format(kdu_name
)
1808 element_type
= "KDU"
1809 element_under_configuration
= kdu_name
1810 osm_config
["osm"]["kdu_name"] = kdu_name
1813 if base_folder
["pkg-dir"]:
1814 artifact_path
= "{}/{}/{}/{}".format(
1815 base_folder
["folder"],
1816 base_folder
["pkg-dir"],
1819 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1824 artifact_path
= "{}/Scripts/{}/{}/".format(
1825 base_folder
["folder"],
1828 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1833 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1835 # get initial_config_primitive_list that applies to this element
1836 initial_config_primitive_list
= config_descriptor
.get(
1837 "initial-config-primitive"
1841 "Initial config primitive list > {}".format(
1842 initial_config_primitive_list
1846 # add config if not present for NS charm
1847 ee_descriptor_id
= ee_config_descriptor
.get("id")
1848 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1849 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1850 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1854 "Initial config primitive list #2 > {}".format(
1855 initial_config_primitive_list
1858 # n2vc_redesign STEP 3.1
1859 # find old ee_id if exists
1860 ee_id
= vca_deployed
.get("ee_id")
1862 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1863 # create or register execution environment in VCA
1864 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1866 self
._write
_configuration
_status
(
1868 vca_index
=vca_index
,
1870 element_under_configuration
=element_under_configuration
,
1871 element_type
=element_type
,
1874 step
= "create execution environment"
1875 self
.logger
.debug(logging_text
+ step
)
1879 if vca_type
== "k8s_proxy_charm":
1880 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1881 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1882 namespace
=namespace
,
1883 artifact_path
=artifact_path
,
1887 elif vca_type
== "helm" or vca_type
== "helm-v3":
1888 ee_id
, credentials
= await self
.vca_map
[
1890 ].create_execution_environment(
1891 namespace
=namespace
,
1895 artifact_path
=artifact_path
,
1899 ee_id
, credentials
= await self
.vca_map
[
1901 ].create_execution_environment(
1902 namespace
=namespace
,
1908 elif vca_type
== "native_charm":
1909 step
= "Waiting to VM being up and getting IP address"
1910 self
.logger
.debug(logging_text
+ step
)
1911 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1920 credentials
= {"hostname": rw_mgmt_ip
}
1922 username
= deep_get(
1923 config_descriptor
, ("config-access", "ssh-access", "default-user")
1925 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1926 # merged. Meanwhile let's get username from initial-config-primitive
1927 if not username
and initial_config_primitive_list
:
1928 for config_primitive
in initial_config_primitive_list
:
1929 for param
in config_primitive
.get("parameter", ()):
1930 if param
["name"] == "ssh-username":
1931 username
= param
["value"]
1935 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1936 "'config-access.ssh-access.default-user'"
1938 credentials
["username"] = username
1939 # n2vc_redesign STEP 3.2
1941 self
._write
_configuration
_status
(
1943 vca_index
=vca_index
,
1944 status
="REGISTERING",
1945 element_under_configuration
=element_under_configuration
,
1946 element_type
=element_type
,
1949 step
= "register execution environment {}".format(credentials
)
1950 self
.logger
.debug(logging_text
+ step
)
1951 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1952 credentials
=credentials
,
1953 namespace
=namespace
,
1958 # for compatibility with MON/POL modules, the need model and application name at database
1959 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1960 ee_id_parts
= ee_id
.split(".")
1961 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1962 if len(ee_id_parts
) >= 2:
1963 model_name
= ee_id_parts
[0]
1964 application_name
= ee_id_parts
[1]
1965 db_nsr_update
[db_update_entry
+ "model"] = model_name
1966 db_nsr_update
[db_update_entry
+ "application"] = application_name
1968 # n2vc_redesign STEP 3.3
1969 step
= "Install configuration Software"
1971 self
._write
_configuration
_status
(
1973 vca_index
=vca_index
,
1974 status
="INSTALLING SW",
1975 element_under_configuration
=element_under_configuration
,
1976 element_type
=element_type
,
1977 other_update
=db_nsr_update
,
1980 # TODO check if already done
1981 self
.logger
.debug(logging_text
+ step
)
1983 if vca_type
== "native_charm":
1984 config_primitive
= next(
1985 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1988 if config_primitive
:
1989 config
= self
._map
_primitive
_params
(
1990 config_primitive
, {}, deploy_params
1993 if vca_type
== "lxc_proxy_charm":
1994 if element_type
== "NS":
1995 num_units
= db_nsr
.get("config-units") or 1
1996 elif element_type
== "VNF":
1997 num_units
= db_vnfr
.get("config-units") or 1
1998 elif element_type
== "VDU":
1999 for v
in db_vnfr
["vdur"]:
2000 if vdu_id
== v
["vdu-id-ref"]:
2001 num_units
= v
.get("config-units") or 1
2003 if vca_type
!= "k8s_proxy_charm":
2004 await self
.vca_map
[vca_type
].install_configuration_sw(
2006 artifact_path
=artifact_path
,
2009 num_units
=num_units
,
2014 # write in db flag of configuration_sw already installed
2016 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
2019 # add relations for this VCA (wait for other peers related with this VCA)
2020 await self
._add
_vca
_relations
(
2021 logging_text
=logging_text
,
2024 vca_index
=vca_index
,
2027 # if SSH access is required, then get execution environment SSH public
2028 # if native charm we have waited already to VM be UP
2029 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
2032 # self.logger.debug("get ssh key block")
2034 config_descriptor
, ("config-access", "ssh-access", "required")
2036 # self.logger.debug("ssh key needed")
2037 # Needed to inject a ssh key
2040 ("config-access", "ssh-access", "default-user"),
2042 step
= "Install configuration Software, getting public ssh key"
2043 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
2044 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
2047 step
= "Insert public key into VM user={} ssh_key={}".format(
2051 # self.logger.debug("no need to get ssh key")
2052 step
= "Waiting to VM being up and getting IP address"
2053 self
.logger
.debug(logging_text
+ step
)
2055 # default rw_mgmt_ip to None, avoiding the non definition of the variable
2058 # n2vc_redesign STEP 5.1
2059 # wait for RO (ip-address) Insert pub_key into VM
2062 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
2063 logging_text
, nsr_id
, vnfr_id
, kdu_name
2065 vnfd
= self
.db
.get_one(
2067 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2069 kdu
= get_kdu(vnfd
, kdu_name
)
2071 service
["name"] for service
in get_kdu_services(kdu
)
2073 exposed_services
= []
2074 for service
in services
:
2075 if any(s
in service
["name"] for s
in kdu_services
):
2076 exposed_services
.append(service
)
2077 await self
.vca_map
[vca_type
].exec_primitive(
2079 primitive_name
="config",
2081 "osm-config": json
.dumps(
2083 k8s
={"services": exposed_services
}
2090 # This verification is needed in order to avoid trying to add a public key
2091 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2092 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2093 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2095 elif db_vnfr
.get('vdur'):
2096 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2106 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2108 # store rw_mgmt_ip in deploy params for later replacement
2109 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2111 # n2vc_redesign STEP 6 Execute initial config primitive
2112 step
= "execute initial config primitive"
2114 # wait for dependent primitives execution (NS -> VNF -> VDU)
2115 if initial_config_primitive_list
:
2116 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2118 # stage, in function of element type: vdu, kdu, vnf or ns
2119 my_vca
= vca_deployed_list
[vca_index
]
2120 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2122 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2123 elif my_vca
.get("member-vnf-index"):
2125 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2128 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2130 self
._write
_configuration
_status
(
2131 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2134 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2136 check_if_terminated_needed
= True
2137 for initial_config_primitive
in initial_config_primitive_list
:
2138 # adding information on the vca_deployed if it is a NS execution environment
2139 if not vca_deployed
["member-vnf-index"]:
2140 deploy_params
["ns_config_info"] = json
.dumps(
2141 self
._get
_ns
_config
_info
(nsr_id
)
2143 # TODO check if already done
2144 primitive_params_
= self
._map
_primitive
_params
(
2145 initial_config_primitive
, {}, deploy_params
2148 step
= "execute primitive '{}' params '{}'".format(
2149 initial_config_primitive
["name"], primitive_params_
2151 self
.logger
.debug(logging_text
+ step
)
2152 await self
.vca_map
[vca_type
].exec_primitive(
2154 primitive_name
=initial_config_primitive
["name"],
2155 params_dict
=primitive_params_
,
2160 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2161 if check_if_terminated_needed
:
2162 if config_descriptor
.get("terminate-config-primitive"):
2164 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2166 check_if_terminated_needed
= False
2168 # TODO register in database that primitive is done
2170 # STEP 7 Configure metrics
2171 if vca_type
== "helm" or vca_type
== "helm-v3":
2172 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2174 artifact_path
=artifact_path
,
2175 ee_config_descriptor
=ee_config_descriptor
,
2178 target_ip
=rw_mgmt_ip
,
2184 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2187 for job
in prometheus_jobs
:
2190 {"job_name": job
["job_name"]},
2193 fail_on_empty
=False,
2196 step
= "instantiated at VCA"
2197 self
.logger
.debug(logging_text
+ step
)
2199 self
._write
_configuration
_status
(
2200 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2203 except Exception as e
: # TODO not use Exception but N2VC exception
2204 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2206 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2209 "Exception while {} : {}".format(step
, e
), exc_info
=True
2211 self
._write
_configuration
_status
(
2212 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2214 raise LcmException("{} {}".format(step
, e
)) from e
2216 def _write_ns_status(
2220 current_operation
: str,
2221 current_operation_id
: str,
2222 error_description
: str = None,
2223 error_detail
: str = None,
2224 other_update
: dict = None,
2227 Update db_nsr fields.
2230 :param current_operation:
2231 :param current_operation_id:
2232 :param error_description:
2233 :param error_detail:
2234 :param other_update: Other required changes at database if provided, will be cleared
2238 db_dict
= other_update
or {}
2241 ] = current_operation_id
# for backward compatibility
2242 db_dict
["_admin.current-operation"] = current_operation_id
2243 db_dict
["_admin.operation-type"] = (
2244 current_operation
if current_operation
!= "IDLE" else None
2246 db_dict
["currentOperation"] = current_operation
2247 db_dict
["currentOperationID"] = current_operation_id
2248 db_dict
["errorDescription"] = error_description
2249 db_dict
["errorDetail"] = error_detail
2252 db_dict
["nsState"] = ns_state
2253 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2254 except DbException
as e
:
2255 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2257 def _write_op_status(
2261 error_message
: str = None,
2262 queuePosition
: int = 0,
2263 operation_state
: str = None,
2264 other_update
: dict = None,
2267 db_dict
= other_update
or {}
2268 db_dict
["queuePosition"] = queuePosition
2269 if isinstance(stage
, list):
2270 db_dict
["stage"] = stage
[0]
2271 db_dict
["detailed-status"] = " ".join(stage
)
2272 elif stage
is not None:
2273 db_dict
["stage"] = str(stage
)
2275 if error_message
is not None:
2276 db_dict
["errorMessage"] = error_message
2277 if operation_state
is not None:
2278 db_dict
["operationState"] = operation_state
2279 db_dict
["statusEnteredTime"] = time()
2280 self
.update_db_2("nslcmops", op_id
, db_dict
)
2281 except DbException
as e
:
2283 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2286 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2288 nsr_id
= db_nsr
["_id"]
2289 # configurationStatus
2290 config_status
= db_nsr
.get("configurationStatus")
2293 "configurationStatus.{}.status".format(index
): status
2294 for index
, v
in enumerate(config_status
)
2298 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2300 except DbException
as e
:
2302 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2305 def _write_configuration_status(
2310 element_under_configuration
: str = None,
2311 element_type
: str = None,
2312 other_update
: dict = None,
2315 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2316 # .format(vca_index, status))
2319 db_path
= "configurationStatus.{}.".format(vca_index
)
2320 db_dict
= other_update
or {}
2322 db_dict
[db_path
+ "status"] = status
2323 if element_under_configuration
:
2325 db_path
+ "elementUnderConfiguration"
2326 ] = element_under_configuration
2328 db_dict
[db_path
+ "elementType"] = element_type
2329 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2330 except DbException
as e
:
2332 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2333 status
, nsr_id
, vca_index
, e
2337 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2339 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2340 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2341 Database is used because the result can be obtained from a different LCM worker in case of HA.
2342 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2343 :param db_nslcmop: database content of nslcmop
2344 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2345 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2346 computed 'vim-account-id'
2349 nslcmop_id
= db_nslcmop
["_id"]
2350 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2351 if placement_engine
== "PLA":
2353 logging_text
+ "Invoke and wait for placement optimization"
2355 await self
.msg
.aiowrite(
2356 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2358 db_poll_interval
= 5
2359 wait
= db_poll_interval
* 10
2361 while not pla_result
and wait
>= 0:
2362 await asyncio
.sleep(db_poll_interval
)
2363 wait
-= db_poll_interval
2364 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2365 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2369 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2372 for pla_vnf
in pla_result
["vnf"]:
2373 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2374 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2379 {"_id": vnfr
["_id"]},
2380 {"vim-account-id": pla_vnf
["vimAccountId"]},
2383 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2386 def update_nsrs_with_pla_result(self
, params
):
2388 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2390 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2392 except Exception as e
:
2393 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2395 async def instantiate(self
, nsr_id
, nslcmop_id
):
2398 :param nsr_id: ns instance to deploy
2399 :param nslcmop_id: operation to run
2403 # Try to lock HA task here
2404 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2405 if not task_is_locked_by_me
:
2407 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2411 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2412 self
.logger
.debug(logging_text
+ "Enter")
2414 # get all needed from database
2416 # database nsrs record
2419 # database nslcmops record
2422 # update operation on nsrs
2424 # update operation on nslcmops
2425 db_nslcmop_update
= {}
2427 nslcmop_operation_state
= None
2428 db_vnfrs
= {} # vnf's info indexed by member-index
2430 tasks_dict_info
= {} # from task to info text
2434 "Stage 1/5: preparation of the environment.",
2435 "Waiting for previous operations to terminate.",
2438 # ^ stage, step, VIM progress
2440 # wait for any previous tasks in process
2441 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2443 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2444 stage
[1] = "Reading from database."
2445 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2446 db_nsr_update
["detailed-status"] = "creating"
2447 db_nsr_update
["operational-status"] = "init"
2448 self
._write
_ns
_status
(
2450 ns_state
="BUILDING",
2451 current_operation
="INSTANTIATING",
2452 current_operation_id
=nslcmop_id
,
2453 other_update
=db_nsr_update
,
2455 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2457 # read from db: operation
2458 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2459 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2460 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2461 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2462 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2464 ns_params
= db_nslcmop
.get("operationParams")
2465 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2466 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2468 timeout_ns_deploy
= self
.timeout
.get(
2469 "ns_deploy", self
.timeout_ns_deploy
2473 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2474 self
.logger
.debug(logging_text
+ stage
[1])
2475 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2476 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2477 self
.logger
.debug(logging_text
+ stage
[1])
2478 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2479 self
.fs
.sync(db_nsr
["nsd-id"])
2481 # nsr_name = db_nsr["name"] # TODO short-name??
2483 # read from db: vnf's of this ns
2484 stage
[1] = "Getting vnfrs from db."
2485 self
.logger
.debug(logging_text
+ stage
[1])
2486 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2488 # read from db: vnfd's for every vnf
2489 db_vnfds
= [] # every vnfd data
2491 # for each vnf in ns, read vnfd
2492 for vnfr
in db_vnfrs_list
:
2493 if vnfr
.get("kdur"):
2495 for kdur
in vnfr
["kdur"]:
2496 if kdur
.get("additionalParams"):
2497 kdur
["additionalParams"] = json
.loads(
2498 kdur
["additionalParams"]
2500 kdur_list
.append(kdur
)
2501 vnfr
["kdur"] = kdur_list
2503 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2504 vnfd_id
= vnfr
["vnfd-id"]
2505 vnfd_ref
= vnfr
["vnfd-ref"]
2506 self
.fs
.sync(vnfd_id
)
2508 # if we haven't this vnfd, read it from db
2509 if vnfd_id
not in db_vnfds
:
2511 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2514 self
.logger
.debug(logging_text
+ stage
[1])
2515 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2518 db_vnfds
.append(vnfd
)
2520 # Get or generates the _admin.deployed.VCA list
2521 vca_deployed_list
= None
2522 if db_nsr
["_admin"].get("deployed"):
2523 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2524 if vca_deployed_list
is None:
2525 vca_deployed_list
= []
2526 configuration_status_list
= []
2527 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2528 db_nsr_update
["configurationStatus"] = configuration_status_list
2529 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2530 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2531 elif isinstance(vca_deployed_list
, dict):
2532 # maintain backward compatibility. Change a dict to list at database
2533 vca_deployed_list
= list(vca_deployed_list
.values())
2534 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2535 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2538 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2540 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2541 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2543 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2544 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2545 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2547 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2550 # n2vc_redesign STEP 2 Deploy Network Scenario
2551 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2552 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2554 stage
[1] = "Deploying KDUs."
2555 # self.logger.debug(logging_text + "Before deploy_kdus")
2556 # Call to deploy_kdus in case exists the "vdu:kdu" param
2557 await self
.deploy_kdus(
2558 logging_text
=logging_text
,
2560 nslcmop_id
=nslcmop_id
,
2563 task_instantiation_info
=tasks_dict_info
,
2566 stage
[1] = "Getting VCA public key."
2567 # n2vc_redesign STEP 1 Get VCA public ssh-key
2568 # feature 1429. Add n2vc public key to needed VMs
2569 n2vc_key
= self
.n2vc
.get_public_key()
2570 n2vc_key_list
= [n2vc_key
]
2571 if self
.vca_config
.get("public_key"):
2572 n2vc_key_list
.append(self
.vca_config
["public_key"])
2574 stage
[1] = "Deploying NS at VIM."
2575 task_ro
= asyncio
.ensure_future(
2576 self
.instantiate_RO(
2577 logging_text
=logging_text
,
2581 db_nslcmop
=db_nslcmop
,
2584 n2vc_key_list
=n2vc_key_list
,
2588 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2589 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2591 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2592 stage
[1] = "Deploying Execution Environments."
2593 self
.logger
.debug(logging_text
+ stage
[1])
2595 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2596 for vnf_profile
in get_vnf_profiles(nsd
):
2597 vnfd_id
= vnf_profile
["vnfd-id"]
2598 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2599 member_vnf_index
= str(vnf_profile
["id"])
2600 db_vnfr
= db_vnfrs
[member_vnf_index
]
2601 base_folder
= vnfd
["_admin"]["storage"]
2607 # Get additional parameters
2608 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2609 if db_vnfr
.get("additionalParamsForVnf"):
2610 deploy_params
.update(
2611 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2614 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2615 if descriptor_config
:
2617 logging_text
=logging_text
2618 + "member_vnf_index={} ".format(member_vnf_index
),
2621 nslcmop_id
=nslcmop_id
,
2627 member_vnf_index
=member_vnf_index
,
2628 vdu_index
=vdu_index
,
2630 deploy_params
=deploy_params
,
2631 descriptor_config
=descriptor_config
,
2632 base_folder
=base_folder
,
2633 task_instantiation_info
=tasks_dict_info
,
2637 # Deploy charms for each VDU that supports one.
2638 for vdud
in get_vdu_list(vnfd
):
2640 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2641 vdur
= find_in_list(
2642 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2645 if vdur
.get("additionalParams"):
2646 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2648 deploy_params_vdu
= deploy_params
2649 deploy_params_vdu
["OSM"] = get_osm_params(
2650 db_vnfr
, vdu_id
, vdu_count_index
=0
2652 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2654 self
.logger
.debug("VDUD > {}".format(vdud
))
2656 "Descriptor config > {}".format(descriptor_config
)
2658 if descriptor_config
:
2661 for vdu_index
in range(vdud_count
):
2662 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2664 logging_text
=logging_text
2665 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2666 member_vnf_index
, vdu_id
, vdu_index
2670 nslcmop_id
=nslcmop_id
,
2676 member_vnf_index
=member_vnf_index
,
2677 vdu_index
=vdu_index
,
2679 deploy_params
=deploy_params_vdu
,
2680 descriptor_config
=descriptor_config
,
2681 base_folder
=base_folder
,
2682 task_instantiation_info
=tasks_dict_info
,
2685 for kdud
in get_kdu_list(vnfd
):
2686 kdu_name
= kdud
["name"]
2687 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2688 if descriptor_config
:
2693 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2695 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2696 if kdur
.get("additionalParams"):
2697 deploy_params_kdu
.update(
2698 parse_yaml_strings(kdur
["additionalParams"].copy())
2702 logging_text
=logging_text
,
2705 nslcmop_id
=nslcmop_id
,
2711 member_vnf_index
=member_vnf_index
,
2712 vdu_index
=vdu_index
,
2714 deploy_params
=deploy_params_kdu
,
2715 descriptor_config
=descriptor_config
,
2716 base_folder
=base_folder
,
2717 task_instantiation_info
=tasks_dict_info
,
2721 # Check if this NS has a charm configuration
2722 descriptor_config
= nsd
.get("ns-configuration")
2723 if descriptor_config
and descriptor_config
.get("juju"):
2726 member_vnf_index
= None
2732 # Get additional parameters
2733 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2734 if db_nsr
.get("additionalParamsForNs"):
2735 deploy_params
.update(
2736 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2738 base_folder
= nsd
["_admin"]["storage"]
2740 logging_text
=logging_text
,
2743 nslcmop_id
=nslcmop_id
,
2749 member_vnf_index
=member_vnf_index
,
2750 vdu_index
=vdu_index
,
2752 deploy_params
=deploy_params
,
2753 descriptor_config
=descriptor_config
,
2754 base_folder
=base_folder
,
2755 task_instantiation_info
=tasks_dict_info
,
2759 # rest of staff will be done at finally
2762 ROclient
.ROClientException
,
2768 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2771 except asyncio
.CancelledError
:
2773 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2775 exc
= "Operation was cancelled"
2776 except Exception as e
:
2777 exc
= traceback
.format_exc()
2778 self
.logger
.critical(
2779 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2784 error_list
.append(str(exc
))
2786 # wait for pending tasks
2788 stage
[1] = "Waiting for instantiate pending tasks."
2789 self
.logger
.debug(logging_text
+ stage
[1])
2790 error_list
+= await self
._wait
_for
_tasks
(
2798 stage
[1] = stage
[2] = ""
2799 except asyncio
.CancelledError
:
2800 error_list
.append("Cancelled")
2801 # TODO cancel all tasks
2802 except Exception as exc
:
2803 error_list
.append(str(exc
))
2805 # update operation-status
2806 db_nsr_update
["operational-status"] = "running"
2807 # let's begin with VCA 'configured' status (later we can change it)
2808 db_nsr_update
["config-status"] = "configured"
2809 for task
, task_name
in tasks_dict_info
.items():
2810 if not task
.done() or task
.cancelled() or task
.exception():
2811 if task_name
.startswith(self
.task_name_deploy_vca
):
2812 # A N2VC task is pending
2813 db_nsr_update
["config-status"] = "failed"
2815 # RO or KDU task is pending
2816 db_nsr_update
["operational-status"] = "failed"
2818 # update status at database
2820 error_detail
= ". ".join(error_list
)
2821 self
.logger
.error(logging_text
+ error_detail
)
2822 error_description_nslcmop
= "{} Detail: {}".format(
2823 stage
[0], error_detail
2825 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2826 nslcmop_id
, stage
[0]
2829 db_nsr_update
["detailed-status"] = (
2830 error_description_nsr
+ " Detail: " + error_detail
2832 db_nslcmop_update
["detailed-status"] = error_detail
2833 nslcmop_operation_state
= "FAILED"
2837 error_description_nsr
= error_description_nslcmop
= None
2839 db_nsr_update
["detailed-status"] = "Done"
2840 db_nslcmop_update
["detailed-status"] = "Done"
2841 nslcmop_operation_state
= "COMPLETED"
2844 self
._write
_ns
_status
(
2847 current_operation
="IDLE",
2848 current_operation_id
=None,
2849 error_description
=error_description_nsr
,
2850 error_detail
=error_detail
,
2851 other_update
=db_nsr_update
,
2853 self
._write
_op
_status
(
2856 error_message
=error_description_nslcmop
,
2857 operation_state
=nslcmop_operation_state
,
2858 other_update
=db_nslcmop_update
,
2861 if nslcmop_operation_state
:
2863 await self
.msg
.aiowrite(
2868 "nslcmop_id": nslcmop_id
,
2869 "operationState": nslcmop_operation_state
,
2873 except Exception as e
:
2875 logging_text
+ "kafka_write notification Exception {}".format(e
)
2878 self
.logger
.debug(logging_text
+ "Exit")
2879 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2881 def _get_vnfd(self
, vnfd_id
: str, cached_vnfds
: Dict
[str, Any
]):
2882 if vnfd_id
not in cached_vnfds
:
2883 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
2884 return cached_vnfds
[vnfd_id
]
2886 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2887 if vnf_profile_id
not in cached_vnfrs
:
2888 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2891 "member-vnf-index-ref": vnf_profile_id
,
2892 "nsr-id-ref": nsr_id
,
2895 return cached_vnfrs
[vnf_profile_id
]
2897 def _is_deployed_vca_in_relation(
2898 self
, vca
: DeployedVCA
, relation
: Relation
2901 for endpoint
in (relation
.provider
, relation
.requirer
):
2902 if endpoint
["kdu-resource-profile-id"]:
2905 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2906 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2907 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2913 def _update_ee_relation_data_with_implicit_data(
2914 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2916 ee_relation_data
= safe_get_ee_relation(
2917 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2919 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2920 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2921 "execution-environment-ref"
2923 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2924 vnfd_id
= vnf_profile
["vnfd-id"]
2925 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2928 if ee_relation_level
== EELevel
.VNF
2929 else ee_relation_data
["vdu-profile-id"]
2931 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2934 f
"not execution environments found for ee_relation {ee_relation_data}"
2936 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2937 return ee_relation_data
2939 def _get_ns_relations(
2942 nsd
: Dict
[str, Any
],
2944 cached_vnfds
: Dict
[str, Any
],
2945 ) -> List
[Relation
]:
2947 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2948 for r
in db_ns_relations
:
2949 provider_dict
= None
2950 requirer_dict
= None
2951 if all(key
in r
for key
in ("provider", "requirer")):
2952 provider_dict
= r
["provider"]
2953 requirer_dict
= r
["requirer"]
2954 elif "entities" in r
:
2955 provider_id
= r
["entities"][0]["id"]
2958 "endpoint": r
["entities"][0]["endpoint"],
2960 if provider_id
!= nsd
["id"]:
2961 provider_dict
["vnf-profile-id"] = provider_id
2962 requirer_id
= r
["entities"][1]["id"]
2965 "endpoint": r
["entities"][1]["endpoint"],
2967 if requirer_id
!= nsd
["id"]:
2968 requirer_dict
["vnf-profile-id"] = requirer_id
2971 "provider/requirer or entities must be included in the relation."
2973 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2974 nsr_id
, nsd
, provider_dict
, cached_vnfds
2976 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2977 nsr_id
, nsd
, requirer_dict
, cached_vnfds
2979 provider
= EERelation(relation_provider
)
2980 requirer
= EERelation(relation_requirer
)
2981 relation
= Relation(r
["name"], provider
, requirer
)
2982 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2984 relations
.append(relation
)
2987 def _get_vnf_relations(
2990 nsd
: Dict
[str, Any
],
2992 cached_vnfds
: Dict
[str, Any
],
2993 ) -> List
[Relation
]:
2995 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
2996 vnf_profile_id
= vnf_profile
["id"]
2997 vnfd_id
= vnf_profile
["vnfd-id"]
2998 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2999 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
3000 for r
in db_vnf_relations
:
3001 provider_dict
= None
3002 requirer_dict
= None
3003 if all(key
in r
for key
in ("provider", "requirer")):
3004 provider_dict
= r
["provider"]
3005 requirer_dict
= r
["requirer"]
3006 elif "entities" in r
:
3007 provider_id
= r
["entities"][0]["id"]
3010 "vnf-profile-id": vnf_profile_id
,
3011 "endpoint": r
["entities"][0]["endpoint"],
3013 if provider_id
!= vnfd_id
:
3014 provider_dict
["vdu-profile-id"] = provider_id
3015 requirer_id
= r
["entities"][1]["id"]
3018 "vnf-profile-id": vnf_profile_id
,
3019 "endpoint": r
["entities"][1]["endpoint"],
3021 if requirer_id
!= vnfd_id
:
3022 requirer_dict
["vdu-profile-id"] = requirer_id
3025 "provider/requirer or entities must be included in the relation."
3027 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3028 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3030 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3031 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3033 provider
= EERelation(relation_provider
)
3034 requirer
= EERelation(relation_requirer
)
3035 relation
= Relation(r
["name"], provider
, requirer
)
3036 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3038 relations
.append(relation
)
3041 def _get_kdu_resource_data(
3043 ee_relation
: EERelation
,
3044 db_nsr
: Dict
[str, Any
],
3045 cached_vnfds
: Dict
[str, Any
],
3046 ) -> DeployedK8sResource
:
3047 nsd
= get_nsd(db_nsr
)
3048 vnf_profiles
= get_vnf_profiles(nsd
)
3049 vnfd_id
= find_in_list(
3051 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
3053 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
3054 kdu_resource_profile
= get_kdu_resource_profile(
3055 db_vnfd
, ee_relation
.kdu_resource_profile_id
3057 kdu_name
= kdu_resource_profile
["kdu-name"]
3058 deployed_kdu
, _
= get_deployed_kdu(
3059 db_nsr
.get("_admin", ()).get("deployed", ()),
3061 ee_relation
.vnf_profile_id
,
3063 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
3066 def _get_deployed_component(
3068 ee_relation
: EERelation
,
3069 db_nsr
: Dict
[str, Any
],
3070 cached_vnfds
: Dict
[str, Any
],
3071 ) -> DeployedComponent
:
3072 nsr_id
= db_nsr
["_id"]
3073 deployed_component
= None
3074 ee_level
= EELevel
.get_level(ee_relation
)
3075 if ee_level
== EELevel
.NS
:
3076 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
3078 deployed_component
= DeployedVCA(nsr_id
, vca
)
3079 elif ee_level
== EELevel
.VNF
:
3080 vca
= get_deployed_vca(
3084 "member-vnf-index": ee_relation
.vnf_profile_id
,
3085 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3089 deployed_component
= DeployedVCA(nsr_id
, vca
)
3090 elif ee_level
== EELevel
.VDU
:
3091 vca
= get_deployed_vca(
3094 "vdu_id": ee_relation
.vdu_profile_id
,
3095 "member-vnf-index": ee_relation
.vnf_profile_id
,
3096 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3100 deployed_component
= DeployedVCA(nsr_id
, vca
)
3101 elif ee_level
== EELevel
.KDU
:
3102 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
3103 ee_relation
, db_nsr
, cached_vnfds
3105 if kdu_resource_data
:
3106 deployed_component
= DeployedK8sResource(kdu_resource_data
)
3107 return deployed_component
3109 async def _add_relation(
3113 db_nsr
: Dict
[str, Any
],
3114 cached_vnfds
: Dict
[str, Any
],
3115 cached_vnfrs
: Dict
[str, Any
],
3117 deployed_provider
= self
._get
_deployed
_component
(
3118 relation
.provider
, db_nsr
, cached_vnfds
3120 deployed_requirer
= self
._get
_deployed
_component
(
3121 relation
.requirer
, db_nsr
, cached_vnfds
3125 and deployed_requirer
3126 and deployed_provider
.config_sw_installed
3127 and deployed_requirer
.config_sw_installed
3129 provider_db_vnfr
= (
3131 relation
.provider
.nsr_id
,
3132 relation
.provider
.vnf_profile_id
,
3135 if relation
.provider
.vnf_profile_id
3138 requirer_db_vnfr
= (
3140 relation
.requirer
.nsr_id
,
3141 relation
.requirer
.vnf_profile_id
,
3144 if relation
.requirer
.vnf_profile_id
3147 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3148 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3149 provider_relation_endpoint
= RelationEndpoint(
3150 deployed_provider
.ee_id
,
3152 relation
.provider
.endpoint
,
3154 requirer_relation_endpoint
= RelationEndpoint(
3155 deployed_requirer
.ee_id
,
3157 relation
.requirer
.endpoint
,
3159 await self
.vca_map
[vca_type
].add_relation(
3160 provider
=provider_relation_endpoint
,
3161 requirer
=requirer_relation_endpoint
,
3163 # remove entry from relations list
3167 async def _add_vca_relations(
3173 timeout
: int = 3600,
3177 # 1. find all relations for this VCA
3178 # 2. wait for other peers related
3182 # STEP 1: find all relations for this VCA
3185 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3186 nsd
= get_nsd(db_nsr
)
3189 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3190 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3195 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3196 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3198 # if no relations, terminate
3200 self
.logger
.debug(logging_text
+ " No relations")
3203 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3210 if now
- start
>= timeout
:
3211 self
.logger
.error(logging_text
+ " : timeout adding relations")
3214 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3215 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3217 # for each relation, find the VCA's related
3218 for relation
in relations
.copy():
3219 added
= await self
._add
_relation
(
3227 relations
.remove(relation
)
3230 self
.logger
.debug("Relations added")
3232 await asyncio
.sleep(5.0)
3236 except Exception as e
:
3237 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3240 async def _install_kdu(
3248 k8s_instance_info
: dict,
3249 k8params
: dict = None,
3255 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3258 "collection": "nsrs",
3259 "filter": {"_id": nsr_id
},
3260 "path": nsr_db_path
,
3263 if k8s_instance_info
.get("kdu-deployment-name"):
3264 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3266 kdu_instance
= self
.k8scluster_map
[
3268 ].generate_kdu_instance_name(
3269 db_dict
=db_dict_install
,
3270 kdu_model
=k8s_instance_info
["kdu-model"],
3271 kdu_name
=k8s_instance_info
["kdu-name"],
3274 # Update the nsrs table with the kdu-instance value
3278 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3281 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3282 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3283 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3284 # namespace, this first verification could be removed, and the next step would be done for any kind
3286 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3287 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3288 if k8sclustertype
in ("juju", "juju-bundle"):
3289 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3290 # that the user passed a namespace which he wants its KDU to be deployed in)
3296 "_admin.projects_write": k8s_instance_info
["namespace"],
3297 "_admin.projects_read": k8s_instance_info
["namespace"],
3303 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3308 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3310 k8s_instance_info
["namespace"] = kdu_instance
3312 await self
.k8scluster_map
[k8sclustertype
].install(
3313 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3314 kdu_model
=k8s_instance_info
["kdu-model"],
3317 db_dict
=db_dict_install
,
3319 kdu_name
=k8s_instance_info
["kdu-name"],
3320 namespace
=k8s_instance_info
["namespace"],
3321 kdu_instance
=kdu_instance
,
3325 # Obtain services to obtain management service ip
3326 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3327 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3328 kdu_instance
=kdu_instance
,
3329 namespace
=k8s_instance_info
["namespace"],
3332 # Obtain management service info (if exists)
3333 vnfr_update_dict
= {}
3334 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3336 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3341 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3344 for service
in kdud
.get("service", [])
3345 if service
.get("mgmt-service")
3347 for mgmt_service
in mgmt_services
:
3348 for service
in services
:
3349 if service
["name"].startswith(mgmt_service
["name"]):
3350 # Mgmt service found, Obtain service ip
3351 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3352 if isinstance(ip
, list) and len(ip
) == 1:
3356 "kdur.{}.ip-address".format(kdu_index
)
3359 # Check if must update also mgmt ip at the vnf
3360 service_external_cp
= mgmt_service
.get(
3361 "external-connection-point-ref"
3363 if service_external_cp
:
3365 deep_get(vnfd
, ("mgmt-interface", "cp"))
3366 == service_external_cp
3368 vnfr_update_dict
["ip-address"] = ip
3373 "external-connection-point-ref", ""
3375 == service_external_cp
,
3378 "kdur.{}.ip-address".format(kdu_index
)
3383 "Mgmt service name: {} not found".format(
3384 mgmt_service
["name"]
3388 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3389 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3391 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3394 and kdu_config
.get("initial-config-primitive")
3395 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3397 initial_config_primitive_list
= kdu_config
.get(
3398 "initial-config-primitive"
3400 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3402 for initial_config_primitive
in initial_config_primitive_list
:
3403 primitive_params_
= self
._map
_primitive
_params
(
3404 initial_config_primitive
, {}, {}
3407 await asyncio
.wait_for(
3408 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3409 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3410 kdu_instance
=kdu_instance
,
3411 primitive_name
=initial_config_primitive
["name"],
3412 params
=primitive_params_
,
3413 db_dict
=db_dict_install
,
3419 except Exception as e
:
3420 # Prepare update db with error and raise exception
3423 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3427 vnfr_data
.get("_id"),
3428 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3431 # ignore to keep original exception
3433 # reraise original error
3438 async def deploy_kdus(
3445 task_instantiation_info
,
3447 # Launch kdus if present in the descriptor
3449 k8scluster_id_2_uuic
= {
3450 "helm-chart-v3": {},
3455 async def _get_cluster_id(cluster_id
, cluster_type
):
3456 nonlocal k8scluster_id_2_uuic
3457 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3458 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3460 # check if K8scluster is creating and wait look if previous tasks in process
3461 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3462 "k8scluster", cluster_id
3465 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3466 task_name
, cluster_id
3468 self
.logger
.debug(logging_text
+ text
)
3469 await asyncio
.wait(task_dependency
, timeout
=3600)
3471 db_k8scluster
= self
.db
.get_one(
3472 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3474 if not db_k8scluster
:
3475 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3477 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3479 if cluster_type
== "helm-chart-v3":
3481 # backward compatibility for existing clusters that have not been initialized for helm v3
3482 k8s_credentials
= yaml
.safe_dump(
3483 db_k8scluster
.get("credentials")
3485 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3486 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3488 db_k8scluster_update
= {}
3489 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3490 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3491 db_k8scluster_update
[
3492 "_admin.helm-chart-v3.created"
3494 db_k8scluster_update
[
3495 "_admin.helm-chart-v3.operationalState"
3498 "k8sclusters", cluster_id
, db_k8scluster_update
3500 except Exception as e
:
3503 + "error initializing helm-v3 cluster: {}".format(str(e
))
3506 "K8s cluster '{}' has not been initialized for '{}'".format(
3507 cluster_id
, cluster_type
3512 "K8s cluster '{}' has not been initialized for '{}'".format(
3513 cluster_id
, cluster_type
3516 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3519 logging_text
+= "Deploy kdus: "
3522 db_nsr_update
= {"_admin.deployed.K8s": []}
3523 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3526 updated_cluster_list
= []
3527 updated_v3_cluster_list
= []
3529 for vnfr_data
in db_vnfrs
.values():
3530 vca_id
= self
.get_vca_id(vnfr_data
, {})
3531 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3532 # Step 0: Prepare and set parameters
3533 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3534 vnfd_id
= vnfr_data
.get("vnfd-id")
3535 vnfd_with_id
= find_in_list(
3536 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3540 for kdud
in vnfd_with_id
["kdu"]
3541 if kdud
["name"] == kdur
["kdu-name"]
3543 namespace
= kdur
.get("k8s-namespace")
3544 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3545 if kdur
.get("helm-chart"):
3546 kdumodel
= kdur
["helm-chart"]
3547 # Default version: helm3, if helm-version is v2 assign v2
3548 k8sclustertype
= "helm-chart-v3"
3549 self
.logger
.debug("kdur: {}".format(kdur
))
3551 kdur
.get("helm-version")
3552 and kdur
.get("helm-version") == "v2"
3554 k8sclustertype
= "helm-chart"
3555 elif kdur
.get("juju-bundle"):
3556 kdumodel
= kdur
["juju-bundle"]
3557 k8sclustertype
= "juju-bundle"
3560 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3561 "juju-bundle. Maybe an old NBI version is running".format(
3562 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3565 # check if kdumodel is a file and exists
3567 vnfd_with_id
= find_in_list(
3568 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3570 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3571 if storage
: # may be not present if vnfd has not artifacts
3572 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3573 if storage
["pkg-dir"]:
3574 filename
= "{}/{}/{}s/{}".format(
3581 filename
= "{}/Scripts/{}s/{}".format(
3586 if self
.fs
.file_exists(
3587 filename
, mode
="file"
3588 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3589 kdumodel
= self
.fs
.path
+ filename
3590 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3592 except Exception: # it is not a file
3595 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3596 step
= "Synchronize repos for k8s cluster '{}'".format(
3599 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3603 k8sclustertype
== "helm-chart"
3604 and cluster_uuid
not in updated_cluster_list
3606 k8sclustertype
== "helm-chart-v3"
3607 and cluster_uuid
not in updated_v3_cluster_list
3609 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3610 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3611 cluster_uuid
=cluster_uuid
3614 if del_repo_list
or added_repo_dict
:
3615 if k8sclustertype
== "helm-chart":
3617 "_admin.helm_charts_added." + item
: None
3618 for item
in del_repo_list
3621 "_admin.helm_charts_added." + item
: name
3622 for item
, name
in added_repo_dict
.items()
3624 updated_cluster_list
.append(cluster_uuid
)
3625 elif k8sclustertype
== "helm-chart-v3":
3627 "_admin.helm_charts_v3_added." + item
: None
3628 for item
in del_repo_list
3631 "_admin.helm_charts_v3_added." + item
: name
3632 for item
, name
in added_repo_dict
.items()
3634 updated_v3_cluster_list
.append(cluster_uuid
)
3636 logging_text
+ "repos synchronized on k8s cluster "
3637 "'{}' to_delete: {}, to_add: {}".format(
3638 k8s_cluster_id
, del_repo_list
, added_repo_dict
3643 {"_id": k8s_cluster_id
},
3649 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3650 vnfr_data
["member-vnf-index-ref"],
3654 k8s_instance_info
= {
3655 "kdu-instance": None,
3656 "k8scluster-uuid": cluster_uuid
,
3657 "k8scluster-type": k8sclustertype
,
3658 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3659 "kdu-name": kdur
["kdu-name"],
3660 "kdu-model": kdumodel
,
3661 "namespace": namespace
,
3662 "kdu-deployment-name": kdu_deployment_name
,
3664 db_path
= "_admin.deployed.K8s.{}".format(index
)
3665 db_nsr_update
[db_path
] = k8s_instance_info
3666 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3667 vnfd_with_id
= find_in_list(
3668 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3670 task
= asyncio
.ensure_future(
3679 k8params
=desc_params
,
3684 self
.lcm_tasks
.register(
3688 "instantiate_KDU-{}".format(index
),
3691 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3697 except (LcmException
, asyncio
.CancelledError
):
3699 except Exception as e
:
3700 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3701 if isinstance(e
, (N2VCException
, DbException
)):
3702 self
.logger
.error(logging_text
+ msg
)
3704 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3705 raise LcmException(msg
)
3708 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3727 task_instantiation_info
,
3730 # launch instantiate_N2VC in a asyncio task and register task object
3731 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3732 # if not found, create one entry and update database
3733 # fill db_nsr._admin.deployed.VCA.<index>
3736 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3738 if "execution-environment-list" in descriptor_config
:
3739 ee_list
= descriptor_config
.get("execution-environment-list", [])
3740 elif "juju" in descriptor_config
:
3741 ee_list
= [descriptor_config
] # ns charms
3742 else: # other types as script are not supported
3745 for ee_item
in ee_list
:
3748 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3749 ee_item
.get("juju"), ee_item
.get("helm-chart")
3752 ee_descriptor_id
= ee_item
.get("id")
3753 if ee_item
.get("juju"):
3754 vca_name
= ee_item
["juju"].get("charm")
3757 if ee_item
["juju"].get("charm") is not None
3760 if ee_item
["juju"].get("cloud") == "k8s":
3761 vca_type
= "k8s_proxy_charm"
3762 elif ee_item
["juju"].get("proxy") is False:
3763 vca_type
= "native_charm"
3764 elif ee_item
.get("helm-chart"):
3765 vca_name
= ee_item
["helm-chart"]
3766 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3769 vca_type
= "helm-v3"
3772 logging_text
+ "skipping non juju neither charm configuration"
3777 for vca_index
, vca_deployed
in enumerate(
3778 db_nsr
["_admin"]["deployed"]["VCA"]
3780 if not vca_deployed
:
3783 vca_deployed
.get("member-vnf-index") == member_vnf_index
3784 and vca_deployed
.get("vdu_id") == vdu_id
3785 and vca_deployed
.get("kdu_name") == kdu_name
3786 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3787 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3791 # not found, create one.
3793 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3796 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3798 target
+= "/kdu/{}".format(kdu_name
)
3800 "target_element": target
,
3801 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3802 "member-vnf-index": member_vnf_index
,
3804 "kdu_name": kdu_name
,
3805 "vdu_count_index": vdu_index
,
3806 "operational-status": "init", # TODO revise
3807 "detailed-status": "", # TODO revise
3808 "step": "initial-deploy", # TODO revise
3810 "vdu_name": vdu_name
,
3812 "ee_descriptor_id": ee_descriptor_id
,
3816 # create VCA and configurationStatus in db
3818 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3819 "configurationStatus.{}".format(vca_index
): dict(),
3821 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3823 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3825 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3826 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3827 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3830 task_n2vc
= asyncio
.ensure_future(
3831 self
.instantiate_N2VC(
3832 logging_text
=logging_text
,
3833 vca_index
=vca_index
,
3839 vdu_index
=vdu_index
,
3840 deploy_params
=deploy_params
,
3841 config_descriptor
=descriptor_config
,
3842 base_folder
=base_folder
,
3843 nslcmop_id
=nslcmop_id
,
3847 ee_config_descriptor
=ee_item
,
3850 self
.lcm_tasks
.register(
3854 "instantiate_N2VC-{}".format(vca_index
),
3857 task_instantiation_info
[
3859 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3860 member_vnf_index
or "", vdu_id
or ""
3864 def _create_nslcmop(nsr_id
, operation
, params
):
3866 Creates a ns-lcm-opp content to be stored at database.
3867 :param nsr_id: internal id of the instance
3868 :param operation: instantiate, terminate, scale, action, ...
3869 :param params: user parameters for the operation
3870 :return: dictionary following SOL005 format
3872 # Raise exception if invalid arguments
3873 if not (nsr_id
and operation
and params
):
3875 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3882 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3883 "operationState": "PROCESSING",
3884 "statusEnteredTime": now
,
3885 "nsInstanceId": nsr_id
,
3886 "lcmOperationType": operation
,
3888 "isAutomaticInvocation": False,
3889 "operationParams": params
,
3890 "isCancelPending": False,
3892 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3893 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3898 def _format_additional_params(self
, params
):
3899 params
= params
or {}
3900 for key
, value
in params
.items():
3901 if str(value
).startswith("!!yaml "):
3902 params
[key
] = yaml
.safe_load(value
[7:])
3905 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3906 primitive
= seq
.get("name")
3907 primitive_params
= {}
3909 "member_vnf_index": vnf_index
,
3910 "primitive": primitive
,
3911 "primitive_params": primitive_params
,
3914 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
3918 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
3919 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
3920 if op
.get("operationState") == "COMPLETED":
3921 # b. Skip sub-operation
3922 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3923 return self
.SUBOPERATION_STATUS_SKIP
3925 # c. retry executing sub-operation
3926 # The sub-operation exists, and operationState != 'COMPLETED'
3927 # Update operationState = 'PROCESSING' to indicate a retry.
3928 operationState
= "PROCESSING"
3929 detailed_status
= "In progress"
3930 self
._update
_suboperation
_status
(
3931 db_nslcmop
, op_index
, operationState
, detailed_status
3933 # Return the sub-operation index
3934 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3935 # with arguments extracted from the sub-operation
3938 # Find a sub-operation where all keys in a matching dictionary must match
3939 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3940 def _find_suboperation(self
, db_nslcmop
, match
):
3941 if db_nslcmop
and match
:
3942 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3943 for i
, op
in enumerate(op_list
):
3944 if all(op
.get(k
) == match
[k
] for k
in match
):
3946 return self
.SUBOPERATION_STATUS_NOT_FOUND
3948 # Update status for a sub-operation given its index
3949 def _update_suboperation_status(
3950 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3952 # Update DB for HA tasks
3953 q_filter
= {"_id": db_nslcmop
["_id"]}
3955 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3956 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3959 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3962 # Add sub-operation, return the index of the added sub-operation
3963 # Optionally, set operationState, detailed-status, and operationType
3964 # Status and type are currently set for 'scale' sub-operations:
3965 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3966 # 'detailed-status' : status message
3967 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3968 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3969 def _add_suboperation(
3977 mapped_primitive_params
,
3978 operationState
=None,
3979 detailed_status
=None,
3982 RO_scaling_info
=None,
3985 return self
.SUBOPERATION_STATUS_NOT_FOUND
3986 # Get the "_admin.operations" list, if it exists
3987 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3988 op_list
= db_nslcmop_admin
.get("operations")
3989 # Create or append to the "_admin.operations" list
3991 "member_vnf_index": vnf_index
,
3993 "vdu_count_index": vdu_count_index
,
3994 "primitive": primitive
,
3995 "primitive_params": mapped_primitive_params
,
3998 new_op
["operationState"] = operationState
4000 new_op
["detailed-status"] = detailed_status
4002 new_op
["lcmOperationType"] = operationType
4004 new_op
["RO_nsr_id"] = RO_nsr_id
4006 new_op
["RO_scaling_info"] = RO_scaling_info
4008 # No existing operations, create key 'operations' with current operation as first list element
4009 db_nslcmop_admin
.update({"operations": [new_op
]})
4010 op_list
= db_nslcmop_admin
.get("operations")
4012 # Existing operations, append operation to list
4013 op_list
.append(new_op
)
4015 db_nslcmop_update
= {"_admin.operations": op_list
}
4016 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4017 op_index
= len(op_list
) - 1
4020 # Helper methods for scale() sub-operations
4022 # pre-scale/post-scale:
4023 # Check for 3 different cases:
4024 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4025 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4026 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4027 def _check_or_add_scale_suboperation(
4031 vnf_config_primitive
,
4035 RO_scaling_info
=None,
4037 # Find this sub-operation
4038 if RO_nsr_id
and RO_scaling_info
:
4039 operationType
= "SCALE-RO"
4041 "member_vnf_index": vnf_index
,
4042 "RO_nsr_id": RO_nsr_id
,
4043 "RO_scaling_info": RO_scaling_info
,
4047 "member_vnf_index": vnf_index
,
4048 "primitive": vnf_config_primitive
,
4049 "primitive_params": primitive_params
,
4050 "lcmOperationType": operationType
,
4052 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4053 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4054 # a. New sub-operation
4055 # The sub-operation does not exist, add it.
4056 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4057 # The following parameters are set to None for all kind of scaling:
4059 vdu_count_index
= None
4061 if RO_nsr_id
and RO_scaling_info
:
4062 vnf_config_primitive
= None
4063 primitive_params
= None
4066 RO_scaling_info
= None
4067 # Initial status for sub-operation
4068 operationState
= "PROCESSING"
4069 detailed_status
= "In progress"
4070 # Add sub-operation for pre/post-scaling (zero or more operations)
4071 self
._add
_suboperation
(
4077 vnf_config_primitive
,
4085 return self
.SUBOPERATION_STATUS_NEW
4087 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4088 # or op_index (operationState != 'COMPLETED')
4089 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4091 # Function to return execution_environment id
4093 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4094 # TODO vdu_index_count
4095 for vca
in vca_deployed_list
:
4096 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
4099 async def destroy_N2VC(
4107 exec_primitives
=True,
4112 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4113 :param logging_text:
4115 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4116 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4117 :param vca_index: index in the database _admin.deployed.VCA
4118 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4119 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4120 not executed properly
4121 :param scaling_in: True destroys the application, False destroys the model
4122 :return: None or exception
4127 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4128 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4132 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4134 # execute terminate_primitives
4136 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4137 config_descriptor
.get("terminate-config-primitive"),
4138 vca_deployed
.get("ee_descriptor_id"),
4140 vdu_id
= vca_deployed
.get("vdu_id")
4141 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4142 vdu_name
= vca_deployed
.get("vdu_name")
4143 vnf_index
= vca_deployed
.get("member-vnf-index")
4144 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4145 for seq
in terminate_primitives
:
4146 # For each sequence in list, get primitive and call _ns_execute_primitive()
4147 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4148 vnf_index
, seq
.get("name")
4150 self
.logger
.debug(logging_text
+ step
)
4151 # Create the primitive for each sequence, i.e. "primitive": "touch"
4152 primitive
= seq
.get("name")
4153 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4158 self
._add
_suboperation
(
4165 mapped_primitive_params
,
4167 # Sub-operations: Call _ns_execute_primitive() instead of action()
4169 result
, result_detail
= await self
._ns
_execute
_primitive
(
4170 vca_deployed
["ee_id"],
4172 mapped_primitive_params
,
4176 except LcmException
:
4177 # this happens when VCA is not deployed. In this case it is not needed to terminate
4179 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4180 if result
not in result_ok
:
4182 "terminate_primitive {} for vnf_member_index={} fails with "
4183 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4185 # set that this VCA do not need terminated
4186 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4190 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4193 # Delete Prometheus Jobs if any
4194 # This uses NSR_ID, so it will destroy any jobs under this index
4195 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4198 await self
.vca_map
[vca_type
].delete_execution_environment(
4199 vca_deployed
["ee_id"],
4200 scaling_in
=scaling_in
,
4205 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4206 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4207 namespace
= "." + db_nsr
["_id"]
4209 await self
.n2vc
.delete_namespace(
4210 namespace
=namespace
,
4211 total_timeout
=self
.timeout_charm_delete
,
4214 except N2VCNotFound
: # already deleted. Skip
4216 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4218 async def _terminate_RO(
4219 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4222 Terminates a deployment from RO
4223 :param logging_text:
4224 :param nsr_deployed: db_nsr._admin.deployed
4227 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4228 this method will update only the index 2, but it will write on database the concatenated content of the list
4233 ro_nsr_id
= ro_delete_action
= None
4234 if nsr_deployed
and nsr_deployed
.get("RO"):
4235 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4236 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4239 stage
[2] = "Deleting ns from VIM."
4240 db_nsr_update
["detailed-status"] = " ".join(stage
)
4241 self
._write
_op
_status
(nslcmop_id
, stage
)
4242 self
.logger
.debug(logging_text
+ stage
[2])
4243 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4244 self
._write
_op
_status
(nslcmop_id
, stage
)
4245 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4246 ro_delete_action
= desc
["action_id"]
4248 "_admin.deployed.RO.nsr_delete_action_id"
4249 ] = ro_delete_action
4250 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4251 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4252 if ro_delete_action
:
4253 # wait until NS is deleted from VIM
4254 stage
[2] = "Waiting ns deleted from VIM."
4255 detailed_status_old
= None
4259 + " RO_id={} ro_delete_action={}".format(
4260 ro_nsr_id
, ro_delete_action
4263 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4264 self
._write
_op
_status
(nslcmop_id
, stage
)
4266 delete_timeout
= 20 * 60 # 20 minutes
4267 while delete_timeout
> 0:
4268 desc
= await self
.RO
.show(
4270 item_id_name
=ro_nsr_id
,
4271 extra_item
="action",
4272 extra_item_id
=ro_delete_action
,
4276 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4278 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4279 if ns_status
== "ERROR":
4280 raise ROclient
.ROClientException(ns_status_info
)
4281 elif ns_status
== "BUILD":
4282 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4283 elif ns_status
== "ACTIVE":
4284 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4285 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4290 ), "ROclient.check_action_status returns unknown {}".format(
4293 if stage
[2] != detailed_status_old
:
4294 detailed_status_old
= stage
[2]
4295 db_nsr_update
["detailed-status"] = " ".join(stage
)
4296 self
._write
_op
_status
(nslcmop_id
, stage
)
4297 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4298 await asyncio
.sleep(5, loop
=self
.loop
)
4300 else: # delete_timeout <= 0:
4301 raise ROclient
.ROClientException(
4302 "Timeout waiting ns deleted from VIM"
4305 except Exception as e
:
4306 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4308 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4310 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4311 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4312 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4314 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4317 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4319 failed_detail
.append("delete conflict: {}".format(e
))
4322 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4325 failed_detail
.append("delete error: {}".format(e
))
4327 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4331 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4332 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4334 stage
[2] = "Deleting nsd from RO."
4335 db_nsr_update
["detailed-status"] = " ".join(stage
)
4336 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4337 self
._write
_op
_status
(nslcmop_id
, stage
)
4338 await self
.RO
.delete("nsd", ro_nsd_id
)
4340 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4342 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4343 except Exception as e
:
4345 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4347 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4349 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4352 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4354 failed_detail
.append(
4355 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4357 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4359 failed_detail
.append(
4360 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4362 self
.logger
.error(logging_text
+ failed_detail
[-1])
4364 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4365 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4366 if not vnf_deployed
or not vnf_deployed
["id"]:
4369 ro_vnfd_id
= vnf_deployed
["id"]
4372 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4373 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4375 db_nsr_update
["detailed-status"] = " ".join(stage
)
4376 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4377 self
._write
_op
_status
(nslcmop_id
, stage
)
4378 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4380 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4382 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4383 except Exception as e
:
4385 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4388 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4392 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4395 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4397 failed_detail
.append(
4398 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4400 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4402 failed_detail
.append(
4403 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4405 self
.logger
.error(logging_text
+ failed_detail
[-1])
4408 stage
[2] = "Error deleting from VIM"
4410 stage
[2] = "Deleted from VIM"
4411 db_nsr_update
["detailed-status"] = " ".join(stage
)
4412 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4413 self
._write
_op
_status
(nslcmop_id
, stage
)
4416 raise LcmException("; ".join(failed_detail
))
4418 async def terminate(self
, nsr_id
, nslcmop_id
):
4419 # Try to lock HA task here
4420 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4421 if not task_is_locked_by_me
:
4424 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4425 self
.logger
.debug(logging_text
+ "Enter")
4426 timeout_ns_terminate
= self
.timeout_ns_terminate
4429 operation_params
= None
4431 error_list
= [] # annotates all failed error messages
4432 db_nslcmop_update
= {}
4433 autoremove
= False # autoremove after terminated
4434 tasks_dict_info
= {}
4437 "Stage 1/3: Preparing task.",
4438 "Waiting for previous operations to terminate.",
4441 # ^ contains [stage, step, VIM-status]
4443 # wait for any previous tasks in process
4444 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4446 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4447 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4448 operation_params
= db_nslcmop
.get("operationParams") or {}
4449 if operation_params
.get("timeout_ns_terminate"):
4450 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4451 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4452 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4454 db_nsr_update
["operational-status"] = "terminating"
4455 db_nsr_update
["config-status"] = "terminating"
4456 self
._write
_ns
_status
(
4458 ns_state
="TERMINATING",
4459 current_operation
="TERMINATING",
4460 current_operation_id
=nslcmop_id
,
4461 other_update
=db_nsr_update
,
4463 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4464 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4465 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4468 stage
[1] = "Getting vnf descriptors from db."
4469 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4471 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4473 db_vnfds_from_id
= {}
4474 db_vnfds_from_member_index
= {}
4476 for vnfr
in db_vnfrs_list
:
4477 vnfd_id
= vnfr
["vnfd-id"]
4478 if vnfd_id
not in db_vnfds_from_id
:
4479 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4480 db_vnfds_from_id
[vnfd_id
] = vnfd
4481 db_vnfds_from_member_index
[
4482 vnfr
["member-vnf-index-ref"]
4483 ] = db_vnfds_from_id
[vnfd_id
]
4485 # Destroy individual execution environments when there are terminating primitives.
4486 # Rest of EE will be deleted at once
4487 # TODO - check before calling _destroy_N2VC
4488 # if not operation_params.get("skip_terminate_primitives"):#
4489 # or not vca.get("needed_terminate"):
4490 stage
[0] = "Stage 2/3 execute terminating primitives."
4491 self
.logger
.debug(logging_text
+ stage
[0])
4492 stage
[1] = "Looking execution environment that needs terminate."
4493 self
.logger
.debug(logging_text
+ stage
[1])
4495 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4496 config_descriptor
= None
4497 vca_member_vnf_index
= vca
.get("member-vnf-index")
4498 vca_id
= self
.get_vca_id(
4499 db_vnfrs_dict
.get(vca_member_vnf_index
)
4500 if vca_member_vnf_index
4504 if not vca
or not vca
.get("ee_id"):
4506 if not vca
.get("member-vnf-index"):
4508 config_descriptor
= db_nsr
.get("ns-configuration")
4509 elif vca
.get("vdu_id"):
4510 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4511 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4512 elif vca
.get("kdu_name"):
4513 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4514 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4516 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4517 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4518 vca_type
= vca
.get("type")
4519 exec_terminate_primitives
= not operation_params
.get(
4520 "skip_terminate_primitives"
4521 ) and vca
.get("needed_terminate")
4522 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4523 # pending native charms
4525 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4527 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4528 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4529 task
= asyncio
.ensure_future(
4537 exec_terminate_primitives
,
4541 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4543 # wait for pending tasks of terminate primitives
4547 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4549 error_list
= await self
._wait
_for
_tasks
(
4552 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4556 tasks_dict_info
.clear()
4558 return # raise LcmException("; ".join(error_list))
4560 # remove All execution environments at once
4561 stage
[0] = "Stage 3/3 delete all."
4563 if nsr_deployed
.get("VCA"):
4564 stage
[1] = "Deleting all execution environments."
4565 self
.logger
.debug(logging_text
+ stage
[1])
4566 vca_id
= self
.get_vca_id({}, db_nsr
)
4567 task_delete_ee
= asyncio
.ensure_future(
4569 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4570 timeout
=self
.timeout_charm_delete
,
4573 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4574 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4576 # Delete from k8scluster
4577 stage
[1] = "Deleting KDUs."
4578 self
.logger
.debug(logging_text
+ stage
[1])
4579 # print(nsr_deployed)
4580 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4581 if not kdu
or not kdu
.get("kdu-instance"):
4583 kdu_instance
= kdu
.get("kdu-instance")
4584 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4585 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4586 vca_id
= self
.get_vca_id({}, db_nsr
)
4587 task_delete_kdu_instance
= asyncio
.ensure_future(
4588 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4589 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4590 kdu_instance
=kdu_instance
,
4592 namespace
=kdu
.get("namespace"),
4598 + "Unknown k8s deployment type {}".format(
4599 kdu
.get("k8scluster-type")
4604 task_delete_kdu_instance
4605 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4608 stage
[1] = "Deleting ns from VIM."
4610 task_delete_ro
= asyncio
.ensure_future(
4611 self
._terminate
_ng
_ro
(
4612 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4616 task_delete_ro
= asyncio
.ensure_future(
4618 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4621 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4623 # rest of staff will be done at finally
4626 ROclient
.ROClientException
,
4631 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4633 except asyncio
.CancelledError
:
4635 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4637 exc
= "Operation was cancelled"
4638 except Exception as e
:
4639 exc
= traceback
.format_exc()
4640 self
.logger
.critical(
4641 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4646 error_list
.append(str(exc
))
4648 # wait for pending tasks
4650 stage
[1] = "Waiting for terminate pending tasks."
4651 self
.logger
.debug(logging_text
+ stage
[1])
4652 error_list
+= await self
._wait
_for
_tasks
(
4655 timeout_ns_terminate
,
4659 stage
[1] = stage
[2] = ""
4660 except asyncio
.CancelledError
:
4661 error_list
.append("Cancelled")
4662 # TODO cancell all tasks
4663 except Exception as exc
:
4664 error_list
.append(str(exc
))
4665 # update status at database
4667 error_detail
= "; ".join(error_list
)
4668 # self.logger.error(logging_text + error_detail)
4669 error_description_nslcmop
= "{} Detail: {}".format(
4670 stage
[0], error_detail
4672 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4673 nslcmop_id
, stage
[0]
4676 db_nsr_update
["operational-status"] = "failed"
4677 db_nsr_update
["detailed-status"] = (
4678 error_description_nsr
+ " Detail: " + error_detail
4680 db_nslcmop_update
["detailed-status"] = error_detail
4681 nslcmop_operation_state
= "FAILED"
4685 error_description_nsr
= error_description_nslcmop
= None
4686 ns_state
= "NOT_INSTANTIATED"
4687 db_nsr_update
["operational-status"] = "terminated"
4688 db_nsr_update
["detailed-status"] = "Done"
4689 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4690 db_nslcmop_update
["detailed-status"] = "Done"
4691 nslcmop_operation_state
= "COMPLETED"
4694 self
._write
_ns
_status
(
4697 current_operation
="IDLE",
4698 current_operation_id
=None,
4699 error_description
=error_description_nsr
,
4700 error_detail
=error_detail
,
4701 other_update
=db_nsr_update
,
4703 self
._write
_op
_status
(
4706 error_message
=error_description_nslcmop
,
4707 operation_state
=nslcmop_operation_state
,
4708 other_update
=db_nslcmop_update
,
4710 if ns_state
== "NOT_INSTANTIATED":
4714 {"nsr-id-ref": nsr_id
},
4715 {"_admin.nsState": "NOT_INSTANTIATED"},
4717 except DbException
as e
:
4720 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4724 if operation_params
:
4725 autoremove
= operation_params
.get("autoremove", False)
4726 if nslcmop_operation_state
:
4728 await self
.msg
.aiowrite(
4733 "nslcmop_id": nslcmop_id
,
4734 "operationState": nslcmop_operation_state
,
4735 "autoremove": autoremove
,
4739 except Exception as e
:
4741 logging_text
+ "kafka_write notification Exception {}".format(e
)
4744 self
.logger
.debug(logging_text
+ "Exit")
4745 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4747 async def _wait_for_tasks(
4748 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4751 error_detail_list
= []
4753 pending_tasks
= list(created_tasks_info
.keys())
4754 num_tasks
= len(pending_tasks
)
4756 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4757 self
._write
_op
_status
(nslcmop_id
, stage
)
4758 while pending_tasks
:
4760 _timeout
= timeout
+ time_start
- time()
4761 done
, pending_tasks
= await asyncio
.wait(
4762 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4764 num_done
+= len(done
)
4765 if not done
: # Timeout
4766 for task
in pending_tasks
:
4767 new_error
= created_tasks_info
[task
] + ": Timeout"
4768 error_detail_list
.append(new_error
)
4769 error_list
.append(new_error
)
4772 if task
.cancelled():
4775 exc
= task
.exception()
4777 if isinstance(exc
, asyncio
.TimeoutError
):
4779 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4780 error_list
.append(created_tasks_info
[task
])
4781 error_detail_list
.append(new_error
)
4788 ROclient
.ROClientException
,
4794 self
.logger
.error(logging_text
+ new_error
)
4796 exc_traceback
= "".join(
4797 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4801 + created_tasks_info
[task
]
4807 logging_text
+ created_tasks_info
[task
] + ": Done"
4809 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4811 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4812 if nsr_id
: # update also nsr
4817 "errorDescription": "Error at: " + ", ".join(error_list
),
4818 "errorDetail": ". ".join(error_detail_list
),
4821 self
._write
_op
_status
(nslcmop_id
, stage
)
4822 return error_detail_list
4825 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4827 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4828 The default-value is used. If it is between < > it look for a value at instantiation_params
4829 :param primitive_desc: portion of VNFD/NSD that describes primitive
4830 :param params: Params provided by user
4831 :param instantiation_params: Instantiation params provided by user
4832 :return: a dictionary with the calculated params
4834 calculated_params
= {}
4835 for parameter
in primitive_desc
.get("parameter", ()):
4836 param_name
= parameter
["name"]
4837 if param_name
in params
:
4838 calculated_params
[param_name
] = params
[param_name
]
4839 elif "default-value" in parameter
or "value" in parameter
:
4840 if "value" in parameter
:
4841 calculated_params
[param_name
] = parameter
["value"]
4843 calculated_params
[param_name
] = parameter
["default-value"]
4845 isinstance(calculated_params
[param_name
], str)
4846 and calculated_params
[param_name
].startswith("<")
4847 and calculated_params
[param_name
].endswith(">")
4849 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4850 calculated_params
[param_name
] = instantiation_params
[
4851 calculated_params
[param_name
][1:-1]
4855 "Parameter {} needed to execute primitive {} not provided".format(
4856 calculated_params
[param_name
], primitive_desc
["name"]
4861 "Parameter {} needed to execute primitive {} not provided".format(
4862 param_name
, primitive_desc
["name"]
4866 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4867 calculated_params
[param_name
] = yaml
.safe_dump(
4868 calculated_params
[param_name
], default_flow_style
=True, width
=256
4870 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4872 ].startswith("!!yaml "):
4873 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4874 if parameter
.get("data-type") == "INTEGER":
4876 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4877 except ValueError: # error converting string to int
4879 "Parameter {} of primitive {} must be integer".format(
4880 param_name
, primitive_desc
["name"]
4883 elif parameter
.get("data-type") == "BOOLEAN":
4884 calculated_params
[param_name
] = not (
4885 (str(calculated_params
[param_name
])).lower() == "false"
4888 # add always ns_config_info if primitive name is config
4889 if primitive_desc
["name"] == "config":
4890 if "ns_config_info" in instantiation_params
:
4891 calculated_params
["ns_config_info"] = instantiation_params
[
4894 return calculated_params
4896 def _look_for_deployed_vca(
4903 ee_descriptor_id
=None,
4905 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4906 for vca
in deployed_vca
:
4909 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4912 vdu_count_index
is not None
4913 and vdu_count_index
!= vca
["vdu_count_index"]
4916 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4918 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4922 # vca_deployed not found
4924 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4925 " is not deployed".format(
4934 ee_id
= vca
.get("ee_id")
4936 "type", "lxc_proxy_charm"
4937 ) # default value for backward compatibility - proxy charm
4940 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4941 "execution environment".format(
4942 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4945 return ee_id
, vca_type
4947 async def _ns_execute_primitive(
4953 retries_interval
=30,
4960 if primitive
== "config":
4961 primitive_params
= {"params": primitive_params
}
4963 vca_type
= vca_type
or "lxc_proxy_charm"
4967 output
= await asyncio
.wait_for(
4968 self
.vca_map
[vca_type
].exec_primitive(
4970 primitive_name
=primitive
,
4971 params_dict
=primitive_params
,
4972 progress_timeout
=self
.timeout_progress_primitive
,
4973 total_timeout
=self
.timeout_primitive
,
4978 timeout
=timeout
or self
.timeout_primitive
,
4982 except asyncio
.CancelledError
:
4984 except Exception as e
: # asyncio.TimeoutError
4985 if isinstance(e
, asyncio
.TimeoutError
):
4990 "Error executing action {} on {} -> {}".format(
4995 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
4997 return "FAILED", str(e
)
4999 return "COMPLETED", output
5001 except (LcmException
, asyncio
.CancelledError
):
5003 except Exception as e
:
5004 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
5006 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
5008 Updating the vca_status with latest juju information in nsrs record
5009 :param: nsr_id: Id of the nsr
5010 :param: nslcmop_id: Id of the nslcmop
5014 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
5015 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5016 vca_id
= self
.get_vca_id({}, db_nsr
)
5017 if db_nsr
["_admin"]["deployed"]["K8s"]:
5018 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
5019 cluster_uuid
, kdu_instance
, cluster_type
= (
5020 k8s
["k8scluster-uuid"],
5021 k8s
["kdu-instance"],
5022 k8s
["k8scluster-type"],
5024 await self
._on
_update
_k
8s
_db
(
5025 cluster_uuid
=cluster_uuid
,
5026 kdu_instance
=kdu_instance
,
5027 filter={"_id": nsr_id
},
5029 cluster_type
=cluster_type
,
5032 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
5033 table
, filter = "nsrs", {"_id": nsr_id
}
5034 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
5035 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
5037 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
5038 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
5040 async def action(self
, nsr_id
, nslcmop_id
):
5041 # Try to lock HA task here
5042 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5043 if not task_is_locked_by_me
:
5046 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
5047 self
.logger
.debug(logging_text
+ "Enter")
5048 # get all needed from database
5052 db_nslcmop_update
= {}
5053 nslcmop_operation_state
= None
5054 error_description_nslcmop
= None
5057 # wait for any previous tasks in process
5058 step
= "Waiting for previous operations to terminate"
5059 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5061 self
._write
_ns
_status
(
5064 current_operation
="RUNNING ACTION",
5065 current_operation_id
=nslcmop_id
,
5068 step
= "Getting information from database"
5069 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5070 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5071 if db_nslcmop
["operationParams"].get("primitive_params"):
5072 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
5073 db_nslcmop
["operationParams"]["primitive_params"]
5076 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5077 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
5078 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
5079 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
5080 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
5081 primitive
= db_nslcmop
["operationParams"]["primitive"]
5082 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
5083 timeout_ns_action
= db_nslcmop
["operationParams"].get(
5084 "timeout_ns_action", self
.timeout_primitive
5088 step
= "Getting vnfr from database"
5089 db_vnfr
= self
.db
.get_one(
5090 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5092 if db_vnfr
.get("kdur"):
5094 for kdur
in db_vnfr
["kdur"]:
5095 if kdur
.get("additionalParams"):
5096 kdur
["additionalParams"] = json
.loads(
5097 kdur
["additionalParams"]
5099 kdur_list
.append(kdur
)
5100 db_vnfr
["kdur"] = kdur_list
5101 step
= "Getting vnfd from database"
5102 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5104 # Sync filesystem before running a primitive
5105 self
.fs
.sync(db_vnfr
["vnfd-id"])
5107 step
= "Getting nsd from database"
5108 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5110 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5111 # for backward compatibility
5112 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5113 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5114 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5115 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5117 # look for primitive
5118 config_primitive_desc
= descriptor_configuration
= None
5120 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5122 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5124 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5126 descriptor_configuration
= db_nsd
.get("ns-configuration")
5128 if descriptor_configuration
and descriptor_configuration
.get(
5131 for config_primitive
in descriptor_configuration
["config-primitive"]:
5132 if config_primitive
["name"] == primitive
:
5133 config_primitive_desc
= config_primitive
5136 if not config_primitive_desc
:
5137 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5139 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5143 primitive_name
= primitive
5144 ee_descriptor_id
= None
5146 primitive_name
= config_primitive_desc
.get(
5147 "execution-environment-primitive", primitive
5149 ee_descriptor_id
= config_primitive_desc
.get(
5150 "execution-environment-ref"
5156 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5158 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5161 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5163 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5165 desc_params
= parse_yaml_strings(
5166 db_vnfr
.get("additionalParamsForVnf")
5169 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5170 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5171 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5173 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5174 actions
.add(primitive
["name"])
5175 for primitive
in kdu_configuration
.get("config-primitive", []):
5176 actions
.add(primitive
["name"])
5178 nsr_deployed
["K8s"],
5179 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5180 and kdu
["member-vnf-index"] == vnf_index
,
5184 if primitive_name
in actions
5185 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5189 # TODO check if ns is in a proper status
5191 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5193 # kdur and desc_params already set from before
5194 if primitive_params
:
5195 desc_params
.update(primitive_params
)
5196 # TODO Check if we will need something at vnf level
5197 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5199 kdu_name
== kdu
["kdu-name"]
5200 and kdu
["member-vnf-index"] == vnf_index
5205 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5208 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5209 msg
= "unknown k8scluster-type '{}'".format(
5210 kdu
.get("k8scluster-type")
5212 raise LcmException(msg
)
5215 "collection": "nsrs",
5216 "filter": {"_id": nsr_id
},
5217 "path": "_admin.deployed.K8s.{}".format(index
),
5221 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5223 step
= "Executing kdu {}".format(primitive_name
)
5224 if primitive_name
== "upgrade":
5225 if desc_params
.get("kdu_model"):
5226 kdu_model
= desc_params
.get("kdu_model")
5227 del desc_params
["kdu_model"]
5229 kdu_model
= kdu
.get("kdu-model")
5230 parts
= kdu_model
.split(sep
=":")
5232 kdu_model
= parts
[0]
5234 detailed_status
= await asyncio
.wait_for(
5235 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5236 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5237 kdu_instance
=kdu
.get("kdu-instance"),
5239 kdu_model
=kdu_model
,
5242 timeout
=timeout_ns_action
,
5244 timeout
=timeout_ns_action
+ 10,
5247 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5249 elif primitive_name
== "rollback":
5250 detailed_status
= await asyncio
.wait_for(
5251 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5252 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5253 kdu_instance
=kdu
.get("kdu-instance"),
5256 timeout
=timeout_ns_action
,
5258 elif primitive_name
== "status":
5259 detailed_status
= await asyncio
.wait_for(
5260 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5261 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5262 kdu_instance
=kdu
.get("kdu-instance"),
5265 timeout
=timeout_ns_action
,
5268 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5269 kdu
["kdu-name"], nsr_id
5271 params
= self
._map
_primitive
_params
(
5272 config_primitive_desc
, primitive_params
, desc_params
5275 detailed_status
= await asyncio
.wait_for(
5276 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5277 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5278 kdu_instance
=kdu_instance
,
5279 primitive_name
=primitive_name
,
5282 timeout
=timeout_ns_action
,
5285 timeout
=timeout_ns_action
,
5289 nslcmop_operation_state
= "COMPLETED"
5291 detailed_status
= ""
5292 nslcmop_operation_state
= "FAILED"
5294 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5295 nsr_deployed
["VCA"],
5296 member_vnf_index
=vnf_index
,
5298 vdu_count_index
=vdu_count_index
,
5299 ee_descriptor_id
=ee_descriptor_id
,
5301 for vca_index
, vca_deployed
in enumerate(
5302 db_nsr
["_admin"]["deployed"]["VCA"]
5304 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5306 "collection": "nsrs",
5307 "filter": {"_id": nsr_id
},
5308 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5312 nslcmop_operation_state
,
5314 ) = await self
._ns
_execute
_primitive
(
5316 primitive
=primitive_name
,
5317 primitive_params
=self
._map
_primitive
_params
(
5318 config_primitive_desc
, primitive_params
, desc_params
5320 timeout
=timeout_ns_action
,
5326 db_nslcmop_update
["detailed-status"] = detailed_status
5327 error_description_nslcmop
= (
5328 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5332 + " task Done with result {} {}".format(
5333 nslcmop_operation_state
, detailed_status
5336 return # database update is called inside finally
5338 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5339 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5341 except asyncio
.CancelledError
:
5343 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5345 exc
= "Operation was cancelled"
5346 except asyncio
.TimeoutError
:
5347 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5349 except Exception as e
:
5350 exc
= traceback
.format_exc()
5351 self
.logger
.critical(
5352 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5361 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5362 nslcmop_operation_state
= "FAILED"
5364 self
._write
_ns
_status
(
5368 ], # TODO check if degraded. For the moment use previous status
5369 current_operation
="IDLE",
5370 current_operation_id
=None,
5371 # error_description=error_description_nsr,
5372 # error_detail=error_detail,
5373 other_update
=db_nsr_update
,
5376 self
._write
_op
_status
(
5379 error_message
=error_description_nslcmop
,
5380 operation_state
=nslcmop_operation_state
,
5381 other_update
=db_nslcmop_update
,
5384 if nslcmop_operation_state
:
5386 await self
.msg
.aiowrite(
5391 "nslcmop_id": nslcmop_id
,
5392 "operationState": nslcmop_operation_state
,
5396 except Exception as e
:
5398 logging_text
+ "kafka_write notification Exception {}".format(e
)
5400 self
.logger
.debug(logging_text
+ "Exit")
5401 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5402 return nslcmop_operation_state
, detailed_status
5404 async def terminate_vdus(
5405 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5407 """This method terminates VDUs
5410 db_vnfr: VNF instance record
5411 member_vnf_index: VNF index to identify the VDUs to be removed
5412 db_nsr: NS instance record
5413 update_db_nslcmops: Nslcmop update record
5415 vca_scaling_info
= []
5416 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5417 scaling_info
["scaling_direction"] = "IN"
5418 scaling_info
["vdu-delete"] = {}
5419 scaling_info
["kdu-delete"] = {}
5420 db_vdur
= db_vnfr
.get("vdur")
5421 vdur_list
= copy(db_vdur
)
5423 for index
, vdu
in enumerate(vdur_list
):
5424 vca_scaling_info
.append(
5426 "osm_vdu_id": vdu
["vdu-id-ref"],
5427 "member-vnf-index": member_vnf_index
,
5429 "vdu_index": count_index
,
5431 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5432 scaling_info
["vdu"].append(
5434 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5435 "vdu_id": vdu
["vdu-id-ref"],
5438 for interface
in vdu
["interfaces"]:
5439 scaling_info
["vdu"][index
]["interface"].append(
5441 "name": interface
["name"],
5442 "ip_address": interface
["ip-address"],
5443 "mac_address": interface
.get("mac-address"),
5445 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5446 stage
[2] = "Terminating VDUs"
5447 if scaling_info
.get("vdu-delete"):
5448 # scale_process = "RO"
5449 if self
.ro_config
.get("ng"):
5450 await self
._scale
_ng
_ro
(
5451 logging_text
, db_nsr
, update_db_nslcmops
, db_vnfr
, scaling_info
, stage
5454 async def remove_vnf(
5455 self
, nsr_id
, nslcmop_id
, vnf_instance_id
5457 """This method is to Remove VNF instances from NS.
5460 nsr_id: NS instance id
5461 nslcmop_id: nslcmop id of update
5462 vnf_instance_id: id of the VNF instance to be removed
5465 result: (str, str) COMPLETED/FAILED, details
5469 logging_text
= "Task ns={} update ".format(nsr_id
)
5470 check_vnfr_count
= len(self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}))
5471 self
.logger
.info("check_vnfr_count {}".format(check_vnfr_count
))
5472 if check_vnfr_count
> 1:
5473 stage
= ["", "", ""]
5474 step
= "Getting nslcmop from database"
5475 self
.logger
.debug(step
+ " after having waited for previous tasks to be completed")
5476 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5477 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5478 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5479 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5480 """ db_vnfr = self.db.get_one(
5481 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5483 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5484 await self
.terminate_vdus(db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
)
5486 constituent_vnfr
= db_nsr
.get("constituent-vnfr-ref")
5487 constituent_vnfr
.remove(db_vnfr
.get("_id"))
5488 db_nsr_update
["constituent-vnfr-ref"] = db_nsr
.get("constituent-vnfr-ref")
5489 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5490 self
.db
.del_one("vnfrs", {"_id": db_vnfr
.get("_id")})
5491 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5492 return "COMPLETED", "Done"
5494 step
= "Terminate VNF Failed with"
5495 raise LcmException("{} Cannot terminate the last VNF in this NS.".format(
5497 except (LcmException
, asyncio
.CancelledError
):
5499 except Exception as e
:
5500 self
.logger
.debug("Error removing VNF {}".format(e
))
5501 return "FAILED", "Error removing VNF {}".format(e
)
5503 async def _ns_redeploy_vnf(
5504 self
, nsr_id
, nslcmop_id
, db_vnfd
, db_vnfr
, db_nsr
,
5506 """This method updates and redeploys VNF instances
5509 nsr_id: NS instance id
5510 nslcmop_id: nslcmop id
5511 db_vnfd: VNF descriptor
5512 db_vnfr: VNF instance record
5513 db_nsr: NS instance record
5516 result: (str, str) COMPLETED/FAILED, details
5520 stage
= ["", "", ""]
5521 logging_text
= "Task ns={} update ".format(nsr_id
)
5522 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5523 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5525 # Terminate old VNF resources
5526 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5527 await self
.terminate_vdus(db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
)
5529 # old_vnfd_id = db_vnfr["vnfd-id"]
5530 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5531 new_db_vnfd
= db_vnfd
5532 # new_vnfd_ref = new_db_vnfd["id"]
5533 # new_vnfd_id = vnfd_id
5537 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5539 "name": cp
.get("id"),
5540 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5541 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5544 new_vnfr_cp
.append(vnf_cp
)
5545 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5546 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5547 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5548 new_vnfr_update
= {"revision": latest_vnfd_revision
, "connection-point": new_vnfr_cp
, "vdur": new_vdur
, "ip-address": ""}
5549 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5550 updated_db_vnfr
= self
.db
.get_one(
5551 "vnfrs", {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
}
5554 # Instantiate new VNF resources
5555 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5556 vca_scaling_info
= []
5557 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5558 scaling_info
["scaling_direction"] = "OUT"
5559 scaling_info
["vdu-create"] = {}
5560 scaling_info
["kdu-create"] = {}
5561 vdud_instantiate_list
= db_vnfd
["vdu"]
5562 for index
, vdud
in enumerate(vdud_instantiate_list
):
5563 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5567 additional_params
= (
5568 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5571 cloud_init_list
= []
5573 # TODO Information of its own ip is not available because db_vnfr is not updated.
5574 additional_params
["OSM"] = get_osm_params(
5575 updated_db_vnfr
, vdud
["id"], 1
5577 cloud_init_list
.append(
5578 self
._parse
_cloud
_init
(
5585 vca_scaling_info
.append(
5587 "osm_vdu_id": vdud
["id"],
5588 "member-vnf-index": member_vnf_index
,
5590 "vdu_index": count_index
,
5593 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5594 if self
.ro_config
.get("ng"):
5596 "New Resources to be deployed: {}".format(scaling_info
))
5597 await self
._scale
_ng
_ro
(
5598 logging_text
, db_nsr
, update_db_nslcmops
, updated_db_vnfr
, scaling_info
, stage
5600 return "COMPLETED", "Done"
5601 except (LcmException
, asyncio
.CancelledError
):
5603 except Exception as e
:
5604 self
.logger
.debug("Error updating VNF {}".format(e
))
5605 return "FAILED", "Error updating VNF {}".format(e
)
5607 async def _ns_charm_upgrade(
5613 timeout
: float = None,
5615 """This method upgrade charms in VNF instances
5618 ee_id: Execution environment id
5619 path: Local path to the charm
5621 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5622 timeout: (Float) Timeout for the ns update operation
5625 result: (str, str) COMPLETED/FAILED, details
5628 charm_type
= charm_type
or "lxc_proxy_charm"
5629 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5633 charm_type
=charm_type
,
5634 timeout
=timeout
or self
.timeout_ns_update
,
5638 return "COMPLETED", output
5640 except (LcmException
, asyncio
.CancelledError
):
5643 except Exception as e
:
5645 self
.logger
.debug("Error upgrading charm {}".format(path
))
5647 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5649 async def update(self
, nsr_id
, nslcmop_id
):
5650 """Update NS according to different update types
5652 This method performs upgrade of VNF instances then updates the revision
5653 number in VNF record
5656 nsr_id: Network service will be updated
5657 nslcmop_id: ns lcm operation id
5660 It may raise DbException, LcmException, N2VCException, K8sException
5663 # Try to lock HA task here
5664 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5665 if not task_is_locked_by_me
:
5668 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5669 self
.logger
.debug(logging_text
+ "Enter")
5671 # Set the required variables to be filled up later
5673 db_nslcmop_update
= {}
5675 nslcmop_operation_state
= None
5677 error_description_nslcmop
= ""
5679 change_type
= "updated"
5680 detailed_status
= ""
5683 # wait for any previous tasks in process
5684 step
= "Waiting for previous operations to terminate"
5685 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5686 self
._write
_ns
_status
(
5689 current_operation
="UPDATING",
5690 current_operation_id
=nslcmop_id
,
5693 step
= "Getting nslcmop from database"
5694 db_nslcmop
= self
.db
.get_one(
5695 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5697 update_type
= db_nslcmop
["operationParams"]["updateType"]
5699 step
= "Getting nsr from database"
5700 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5701 old_operational_status
= db_nsr
["operational-status"]
5702 db_nsr_update
["operational-status"] = "updating"
5703 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5704 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5706 if update_type
== "CHANGE_VNFPKG":
5708 # Get the input parameters given through update request
5709 vnf_instance_id
= db_nslcmop
["operationParams"][
5710 "changeVnfPackageData"
5711 ].get("vnfInstanceId")
5713 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5716 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5718 step
= "Getting vnfr from database"
5719 db_vnfr
= self
.db
.get_one(
5720 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5723 step
= "Getting vnfds from database"
5725 latest_vnfd
= self
.db
.get_one(
5726 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5728 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5731 current_vnf_revision
= db_vnfr
.get("revision", 1)
5732 current_vnfd
= self
.db
.get_one(
5734 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5735 fail_on_empty
=False,
5737 # Charm artifact paths will be filled up later
5739 current_charm_artifact_path
,
5740 target_charm_artifact_path
,
5741 charm_artifact_paths
,
5744 step
= "Checking if revision has changed in VNFD"
5745 if current_vnf_revision
!= latest_vnfd_revision
:
5747 change_type
= "policy_updated"
5749 # There is new revision of VNFD, update operation is required
5750 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5751 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5753 step
= "Removing the VNFD packages if they exist in the local path"
5754 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5755 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5757 step
= "Get the VNFD packages from FSMongo"
5758 self
.fs
.sync(from_path
=latest_vnfd_path
)
5759 self
.fs
.sync(from_path
=current_vnfd_path
)
5762 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5764 base_folder
= latest_vnfd
["_admin"]["storage"]
5766 for charm_index
, charm_deployed
in enumerate(
5767 get_iterable(nsr_deployed
, "VCA")
5769 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5771 # Getting charm-id and charm-type
5772 if charm_deployed
.get("member-vnf-index") == vnf_index
:
5773 charm_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5774 charm_type
= charm_deployed
.get("type")
5777 ee_id
= charm_deployed
.get("ee_id")
5779 step
= "Getting descriptor config"
5780 descriptor_config
= get_configuration(
5781 current_vnfd
, current_vnfd
["id"]
5784 if "execution-environment-list" in descriptor_config
:
5785 ee_list
= descriptor_config
.get(
5786 "execution-environment-list", []
5791 # There could be several charm used in the same VNF
5792 for ee_item
in ee_list
:
5793 if ee_item
.get("juju"):
5795 step
= "Getting charm name"
5796 charm_name
= ee_item
["juju"].get("charm")
5798 step
= "Setting Charm artifact paths"
5799 current_charm_artifact_path
.append(
5800 get_charm_artifact_path(
5804 current_vnf_revision
,
5807 target_charm_artifact_path
.append(
5808 get_charm_artifact_path(
5812 latest_vnfd_revision
,
5816 charm_artifact_paths
= zip(
5817 current_charm_artifact_path
, target_charm_artifact_path
5820 step
= "Checking if software version has changed in VNFD"
5821 if find_software_version(current_vnfd
) != find_software_version(
5825 step
= "Checking if existing VNF has charm"
5826 for current_charm_path
, target_charm_path
in list(
5827 charm_artifact_paths
5829 if current_charm_path
:
5831 "Software version change is not supported as VNF instance {} has charm.".format(
5836 # There is no change in the charm package, then redeploy the VNF
5837 # based on new descriptor
5838 step
= "Redeploying VNF"
5839 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5843 ) = await self
._ns
_redeploy
_vnf
(
5850 if result
== "FAILED":
5851 nslcmop_operation_state
= result
5852 error_description_nslcmop
= detailed_status
5853 db_nslcmop_update
["detailed-status"] = detailed_status
5856 + " step {} Done with result {} {}".format(
5857 step
, nslcmop_operation_state
, detailed_status
5862 step
= "Checking if any charm package has changed or not"
5863 for current_charm_path
, target_charm_path
in list(
5864 charm_artifact_paths
5868 and target_charm_path
5869 and self
.check_charm_hash_changed(
5870 current_charm_path
, target_charm_path
5874 step
= "Checking whether VNF uses juju bundle"
5875 if check_juju_bundle_existence(current_vnfd
):
5878 "Charm upgrade is not supported for the instance which"
5879 " uses juju-bundle: {}".format(
5880 check_juju_bundle_existence(current_vnfd
)
5884 step
= "Upgrading Charm"
5888 ) = await self
._ns
_charm
_upgrade
(
5891 charm_type
=charm_type
,
5892 path
=self
.fs
.path
+ target_charm_path
,
5893 timeout
=timeout_seconds
,
5896 if result
== "FAILED":
5897 nslcmop_operation_state
= result
5898 error_description_nslcmop
= detailed_status
5900 db_nslcmop_update
["detailed-status"] = detailed_status
5903 + " step {} Done with result {} {}".format(
5904 step
, nslcmop_operation_state
, detailed_status
5908 step
= "Updating policies"
5909 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5910 result
= "COMPLETED"
5911 detailed_status
= "Done"
5912 db_nslcmop_update
["detailed-status"] = "Done"
5914 # If nslcmop_operation_state is None, so any operation is not failed.
5915 if not nslcmop_operation_state
:
5916 nslcmop_operation_state
= "COMPLETED"
5918 # If update CHANGE_VNFPKG nslcmop_operation is successful
5919 # vnf revision need to be updated
5920 vnfr_update
["revision"] = latest_vnfd_revision
5921 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
5925 + " task Done with result {} {}".format(
5926 nslcmop_operation_state
, detailed_status
5929 elif update_type
== "REMOVE_VNF":
5930 # This part is included in https://osm.etsi.org/gerrit/11876
5931 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
5932 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5933 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5934 step
= "Removing VNF"
5935 (result
, detailed_status
) = await self
.remove_vnf(nsr_id
, nslcmop_id
, vnf_instance_id
)
5936 if result
== "FAILED":
5937 nslcmop_operation_state
= result
5938 error_description_nslcmop
= detailed_status
5939 db_nslcmop_update
["detailed-status"] = detailed_status
5940 change_type
= "vnf_terminated"
5941 if not nslcmop_operation_state
:
5942 nslcmop_operation_state
= "COMPLETED"
5945 + " task Done with result {} {}".format(
5946 nslcmop_operation_state
, detailed_status
5950 elif update_type
== "OPERATE_VNF":
5951 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"]["vnfInstanceId"]
5952 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"]["changeStateTo"]
5953 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"]["additionalParam"]
5954 (result
, detailed_status
) = await self
.rebuild_start_stop(
5955 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
5957 if result
== "FAILED":
5958 nslcmop_operation_state
= result
5959 error_description_nslcmop
= detailed_status
5960 db_nslcmop_update
["detailed-status"] = detailed_status
5961 if not nslcmop_operation_state
:
5962 nslcmop_operation_state
= "COMPLETED"
5965 + " task Done with result {} {}".format(
5966 nslcmop_operation_state
, detailed_status
5970 # If nslcmop_operation_state is None, so any operation is not failed.
5971 # All operations are executed in overall.
5972 if not nslcmop_operation_state
:
5973 nslcmop_operation_state
= "COMPLETED"
5974 db_nsr_update
["operational-status"] = old_operational_status
5976 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5977 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5979 except asyncio
.CancelledError
:
5981 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5983 exc
= "Operation was cancelled"
5984 except asyncio
.TimeoutError
:
5985 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5987 except Exception as e
:
5988 exc
= traceback
.format_exc()
5989 self
.logger
.critical(
5990 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5999 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6000 nslcmop_operation_state
= "FAILED"
6001 db_nsr_update
["operational-status"] = old_operational_status
6003 self
._write
_ns
_status
(
6005 ns_state
=db_nsr
["nsState"],
6006 current_operation
="IDLE",
6007 current_operation_id
=None,
6008 other_update
=db_nsr_update
,
6011 self
._write
_op
_status
(
6014 error_message
=error_description_nslcmop
,
6015 operation_state
=nslcmop_operation_state
,
6016 other_update
=db_nslcmop_update
,
6019 if nslcmop_operation_state
:
6023 "nslcmop_id": nslcmop_id
,
6024 "operationState": nslcmop_operation_state
,
6026 if change_type
in ("vnf_terminated", "policy_updated"):
6027 msg
.update({"vnf_member_index": member_vnf_index
})
6028 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6029 except Exception as e
:
6031 logging_text
+ "kafka_write notification Exception {}".format(e
)
6033 self
.logger
.debug(logging_text
+ "Exit")
6034 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6035 return nslcmop_operation_state
, detailed_status
6037 async def scale(self
, nsr_id
, nslcmop_id
):
6038 # Try to lock HA task here
6039 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6040 if not task_is_locked_by_me
:
6043 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6044 stage
= ["", "", ""]
6045 tasks_dict_info
= {}
6046 # ^ stage, step, VIM progress
6047 self
.logger
.debug(logging_text
+ "Enter")
6048 # get all needed from database
6050 db_nslcmop_update
= {}
6053 # in case of error, indicates what part of scale was failed to put nsr at error status
6054 scale_process
= None
6055 old_operational_status
= ""
6056 old_config_status
= ""
6059 # wait for any previous tasks in process
6060 step
= "Waiting for previous operations to terminate"
6061 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6062 self
._write
_ns
_status
(
6065 current_operation
="SCALING",
6066 current_operation_id
=nslcmop_id
,
6069 step
= "Getting nslcmop from database"
6071 step
+ " after having waited for previous tasks to be completed"
6073 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6075 step
= "Getting nsr from database"
6076 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6077 old_operational_status
= db_nsr
["operational-status"]
6078 old_config_status
= db_nsr
["config-status"]
6080 step
= "Parsing scaling parameters"
6081 db_nsr_update
["operational-status"] = "scaling"
6082 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6083 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6085 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6087 ]["member-vnf-index"]
6088 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6090 ]["scaling-group-descriptor"]
6091 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6092 # for backward compatibility
6093 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6094 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6095 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6096 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6098 step
= "Getting vnfr from database"
6099 db_vnfr
= self
.db
.get_one(
6100 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6103 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6105 step
= "Getting vnfd from database"
6106 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6108 base_folder
= db_vnfd
["_admin"]["storage"]
6110 step
= "Getting scaling-group-descriptor"
6111 scaling_descriptor
= find_in_list(
6112 get_scaling_aspect(db_vnfd
),
6113 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6115 if not scaling_descriptor
:
6117 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6118 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6121 step
= "Sending scale order to VIM"
6122 # TODO check if ns is in a proper status
6124 if not db_nsr
["_admin"].get("scaling-group"):
6129 "_admin.scaling-group": [
6130 {"name": scaling_group
, "nb-scale-op": 0}
6134 admin_scale_index
= 0
6136 for admin_scale_index
, admin_scale_info
in enumerate(
6137 db_nsr
["_admin"]["scaling-group"]
6139 if admin_scale_info
["name"] == scaling_group
:
6140 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6142 else: # not found, set index one plus last element and add new entry with the name
6143 admin_scale_index
+= 1
6145 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6148 vca_scaling_info
= []
6149 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6150 if scaling_type
== "SCALE_OUT":
6151 if "aspect-delta-details" not in scaling_descriptor
:
6153 "Aspect delta details not fount in scaling descriptor {}".format(
6154 scaling_descriptor
["name"]
6157 # count if max-instance-count is reached
6158 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6160 scaling_info
["scaling_direction"] = "OUT"
6161 scaling_info
["vdu-create"] = {}
6162 scaling_info
["kdu-create"] = {}
6163 for delta
in deltas
:
6164 for vdu_delta
in delta
.get("vdu-delta", {}):
6165 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6166 # vdu_index also provides the number of instance of the targeted vdu
6167 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6168 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6172 additional_params
= (
6173 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6176 cloud_init_list
= []
6178 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6179 max_instance_count
= 10
6180 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6181 max_instance_count
= vdu_profile
.get(
6182 "max-number-of-instances", 10
6185 default_instance_num
= get_number_of_instances(
6188 instances_number
= vdu_delta
.get("number-of-instances", 1)
6189 nb_scale_op
+= instances_number
6191 new_instance_count
= nb_scale_op
+ default_instance_num
6192 # Control if new count is over max and vdu count is less than max.
6193 # Then assign new instance count
6194 if new_instance_count
> max_instance_count
> vdu_count
:
6195 instances_number
= new_instance_count
- max_instance_count
6197 instances_number
= instances_number
6199 if new_instance_count
> max_instance_count
:
6201 "reached the limit of {} (max-instance-count) "
6202 "scaling-out operations for the "
6203 "scaling-group-descriptor '{}'".format(
6204 nb_scale_op
, scaling_group
6207 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6209 # TODO Information of its own ip is not available because db_vnfr is not updated.
6210 additional_params
["OSM"] = get_osm_params(
6211 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6213 cloud_init_list
.append(
6214 self
._parse
_cloud
_init
(
6221 vca_scaling_info
.append(
6223 "osm_vdu_id": vdu_delta
["id"],
6224 "member-vnf-index": vnf_index
,
6226 "vdu_index": vdu_index
+ x
,
6229 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6230 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6231 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6232 kdu_name
= kdu_profile
["kdu-name"]
6233 resource_name
= kdu_profile
.get("resource-name", "")
6235 # Might have different kdus in the same delta
6236 # Should have list for each kdu
6237 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6238 scaling_info
["kdu-create"][kdu_name
] = []
6240 kdur
= get_kdur(db_vnfr
, kdu_name
)
6241 if kdur
.get("helm-chart"):
6242 k8s_cluster_type
= "helm-chart-v3"
6243 self
.logger
.debug("kdur: {}".format(kdur
))
6245 kdur
.get("helm-version")
6246 and kdur
.get("helm-version") == "v2"
6248 k8s_cluster_type
= "helm-chart"
6249 elif kdur
.get("juju-bundle"):
6250 k8s_cluster_type
= "juju-bundle"
6253 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6254 "juju-bundle. Maybe an old NBI version is running".format(
6255 db_vnfr
["member-vnf-index-ref"], kdu_name
6259 max_instance_count
= 10
6260 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6261 max_instance_count
= kdu_profile
.get(
6262 "max-number-of-instances", 10
6265 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6266 deployed_kdu
, _
= get_deployed_kdu(
6267 nsr_deployed
, kdu_name
, vnf_index
6269 if deployed_kdu
is None:
6271 "KDU '{}' for vnf '{}' not deployed".format(
6275 kdu_instance
= deployed_kdu
.get("kdu-instance")
6276 instance_num
= await self
.k8scluster_map
[
6282 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6283 kdu_model
=deployed_kdu
.get("kdu-model"),
6285 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6286 "number-of-instances", 1
6289 # Control if new count is over max and instance_num is less than max.
6290 # Then assign max instance number to kdu replica count
6291 if kdu_replica_count
> max_instance_count
> instance_num
:
6292 kdu_replica_count
= max_instance_count
6293 if kdu_replica_count
> max_instance_count
:
6295 "reached the limit of {} (max-instance-count) "
6296 "scaling-out operations for the "
6297 "scaling-group-descriptor '{}'".format(
6298 instance_num
, scaling_group
6302 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6303 vca_scaling_info
.append(
6305 "osm_kdu_id": kdu_name
,
6306 "member-vnf-index": vnf_index
,
6308 "kdu_index": instance_num
+ x
- 1,
6311 scaling_info
["kdu-create"][kdu_name
].append(
6313 "member-vnf-index": vnf_index
,
6315 "k8s-cluster-type": k8s_cluster_type
,
6316 "resource-name": resource_name
,
6317 "scale": kdu_replica_count
,
6320 elif scaling_type
== "SCALE_IN":
6321 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6323 scaling_info
["scaling_direction"] = "IN"
6324 scaling_info
["vdu-delete"] = {}
6325 scaling_info
["kdu-delete"] = {}
6327 for delta
in deltas
:
6328 for vdu_delta
in delta
.get("vdu-delta", {}):
6329 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6330 min_instance_count
= 0
6331 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6332 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6333 min_instance_count
= vdu_profile
["min-number-of-instances"]
6335 default_instance_num
= get_number_of_instances(
6336 db_vnfd
, vdu_delta
["id"]
6338 instance_num
= vdu_delta
.get("number-of-instances", 1)
6339 nb_scale_op
-= instance_num
6341 new_instance_count
= nb_scale_op
+ default_instance_num
6343 if new_instance_count
< min_instance_count
< vdu_count
:
6344 instances_number
= min_instance_count
- new_instance_count
6346 instances_number
= instance_num
6348 if new_instance_count
< min_instance_count
:
6350 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6351 "scaling-group-descriptor '{}'".format(
6352 nb_scale_op
, scaling_group
6355 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6356 vca_scaling_info
.append(
6358 "osm_vdu_id": vdu_delta
["id"],
6359 "member-vnf-index": vnf_index
,
6361 "vdu_index": vdu_index
- 1 - x
,
6364 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6365 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6366 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6367 kdu_name
= kdu_profile
["kdu-name"]
6368 resource_name
= kdu_profile
.get("resource-name", "")
6370 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6371 scaling_info
["kdu-delete"][kdu_name
] = []
6373 kdur
= get_kdur(db_vnfr
, kdu_name
)
6374 if kdur
.get("helm-chart"):
6375 k8s_cluster_type
= "helm-chart-v3"
6376 self
.logger
.debug("kdur: {}".format(kdur
))
6378 kdur
.get("helm-version")
6379 and kdur
.get("helm-version") == "v2"
6381 k8s_cluster_type
= "helm-chart"
6382 elif kdur
.get("juju-bundle"):
6383 k8s_cluster_type
= "juju-bundle"
6386 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6387 "juju-bundle. Maybe an old NBI version is running".format(
6388 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6392 min_instance_count
= 0
6393 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6394 min_instance_count
= kdu_profile
["min-number-of-instances"]
6396 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6397 deployed_kdu
, _
= get_deployed_kdu(
6398 nsr_deployed
, kdu_name
, vnf_index
6400 if deployed_kdu
is None:
6402 "KDU '{}' for vnf '{}' not deployed".format(
6406 kdu_instance
= deployed_kdu
.get("kdu-instance")
6407 instance_num
= await self
.k8scluster_map
[
6413 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6414 kdu_model
=deployed_kdu
.get("kdu-model"),
6416 kdu_replica_count
= instance_num
- kdu_delta
.get(
6417 "number-of-instances", 1
6420 if kdu_replica_count
< min_instance_count
< instance_num
:
6421 kdu_replica_count
= min_instance_count
6422 if kdu_replica_count
< min_instance_count
:
6424 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6425 "scaling-group-descriptor '{}'".format(
6426 instance_num
, scaling_group
6430 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6431 vca_scaling_info
.append(
6433 "osm_kdu_id": kdu_name
,
6434 "member-vnf-index": vnf_index
,
6436 "kdu_index": instance_num
- x
- 1,
6439 scaling_info
["kdu-delete"][kdu_name
].append(
6441 "member-vnf-index": vnf_index
,
6443 "k8s-cluster-type": k8s_cluster_type
,
6444 "resource-name": resource_name
,
6445 "scale": kdu_replica_count
,
6449 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6450 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6451 if scaling_info
["scaling_direction"] == "IN":
6452 for vdur
in reversed(db_vnfr
["vdur"]):
6453 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6454 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6455 scaling_info
["vdu"].append(
6457 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6458 "vdu_id": vdur
["vdu-id-ref"],
6462 for interface
in vdur
["interfaces"]:
6463 scaling_info
["vdu"][-1]["interface"].append(
6465 "name": interface
["name"],
6466 "ip_address": interface
["ip-address"],
6467 "mac_address": interface
.get("mac-address"),
6470 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6473 step
= "Executing pre-scale vnf-config-primitive"
6474 if scaling_descriptor
.get("scaling-config-action"):
6475 for scaling_config_action
in scaling_descriptor
[
6476 "scaling-config-action"
6479 scaling_config_action
.get("trigger") == "pre-scale-in"
6480 and scaling_type
== "SCALE_IN"
6482 scaling_config_action
.get("trigger") == "pre-scale-out"
6483 and scaling_type
== "SCALE_OUT"
6485 vnf_config_primitive
= scaling_config_action
[
6486 "vnf-config-primitive-name-ref"
6488 step
= db_nslcmop_update
[
6490 ] = "executing pre-scale scaling-config-action '{}'".format(
6491 vnf_config_primitive
6494 # look for primitive
6495 for config_primitive
in (
6496 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6497 ).get("config-primitive", ()):
6498 if config_primitive
["name"] == vnf_config_primitive
:
6502 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6503 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6504 "primitive".format(scaling_group
, vnf_config_primitive
)
6507 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6508 if db_vnfr
.get("additionalParamsForVnf"):
6509 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6511 scale_process
= "VCA"
6512 db_nsr_update
["config-status"] = "configuring pre-scaling"
6513 primitive_params
= self
._map
_primitive
_params
(
6514 config_primitive
, {}, vnfr_params
6517 # Pre-scale retry check: Check if this sub-operation has been executed before
6518 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6521 vnf_config_primitive
,
6525 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6526 # Skip sub-operation
6527 result
= "COMPLETED"
6528 result_detail
= "Done"
6531 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6532 vnf_config_primitive
, result
, result_detail
6536 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6537 # New sub-operation: Get index of this sub-operation
6539 len(db_nslcmop
.get("_admin", {}).get("operations"))
6544 + "vnf_config_primitive={} New sub-operation".format(
6545 vnf_config_primitive
6549 # retry: Get registered params for this existing sub-operation
6550 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6553 vnf_index
= op
.get("member_vnf_index")
6554 vnf_config_primitive
= op
.get("primitive")
6555 primitive_params
= op
.get("primitive_params")
6558 + "vnf_config_primitive={} Sub-operation retry".format(
6559 vnf_config_primitive
6562 # Execute the primitive, either with new (first-time) or registered (reintent) args
6563 ee_descriptor_id
= config_primitive
.get(
6564 "execution-environment-ref"
6566 primitive_name
= config_primitive
.get(
6567 "execution-environment-primitive", vnf_config_primitive
6569 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6570 nsr_deployed
["VCA"],
6571 member_vnf_index
=vnf_index
,
6573 vdu_count_index
=None,
6574 ee_descriptor_id
=ee_descriptor_id
,
6576 result
, result_detail
= await self
._ns
_execute
_primitive
(
6585 + "vnf_config_primitive={} Done with result {} {}".format(
6586 vnf_config_primitive
, result
, result_detail
6589 # Update operationState = COMPLETED | FAILED
6590 self
._update
_suboperation
_status
(
6591 db_nslcmop
, op_index
, result
, result_detail
6594 if result
== "FAILED":
6595 raise LcmException(result_detail
)
6596 db_nsr_update
["config-status"] = old_config_status
6597 scale_process
= None
6601 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6604 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6607 # SCALE-IN VCA - BEGIN
6608 if vca_scaling_info
:
6609 step
= db_nslcmop_update
[
6611 ] = "Deleting the execution environments"
6612 scale_process
= "VCA"
6613 for vca_info
in vca_scaling_info
:
6614 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6615 member_vnf_index
= str(vca_info
["member-vnf-index"])
6617 logging_text
+ "vdu info: {}".format(vca_info
)
6619 if vca_info
.get("osm_vdu_id"):
6620 vdu_id
= vca_info
["osm_vdu_id"]
6621 vdu_index
= int(vca_info
["vdu_index"])
6624 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6625 member_vnf_index
, vdu_id
, vdu_index
6627 stage
[2] = step
= "Scaling in VCA"
6628 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6629 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6630 config_update
= db_nsr
["configurationStatus"]
6631 for vca_index
, vca
in enumerate(vca_update
):
6633 (vca
or vca
.get("ee_id"))
6634 and vca
["member-vnf-index"] == member_vnf_index
6635 and vca
["vdu_count_index"] == vdu_index
6637 if vca
.get("vdu_id"):
6638 config_descriptor
= get_configuration(
6639 db_vnfd
, vca
.get("vdu_id")
6641 elif vca
.get("kdu_name"):
6642 config_descriptor
= get_configuration(
6643 db_vnfd
, vca
.get("kdu_name")
6646 config_descriptor
= get_configuration(
6647 db_vnfd
, db_vnfd
["id"]
6649 operation_params
= (
6650 db_nslcmop
.get("operationParams") or {}
6652 exec_terminate_primitives
= not operation_params
.get(
6653 "skip_terminate_primitives"
6654 ) and vca
.get("needed_terminate")
6655 task
= asyncio
.ensure_future(
6664 exec_primitives
=exec_terminate_primitives
,
6668 timeout
=self
.timeout_charm_delete
,
6671 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6674 del vca_update
[vca_index
]
6675 del config_update
[vca_index
]
6676 # wait for pending tasks of terminate primitives
6680 + "Waiting for tasks {}".format(
6681 list(tasks_dict_info
.keys())
6684 error_list
= await self
._wait
_for
_tasks
(
6688 self
.timeout_charm_delete
, self
.timeout_ns_terminate
6693 tasks_dict_info
.clear()
6695 raise LcmException("; ".join(error_list
))
6697 db_vca_and_config_update
= {
6698 "_admin.deployed.VCA": vca_update
,
6699 "configurationStatus": config_update
,
6702 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6704 scale_process
= None
6705 # SCALE-IN VCA - END
6708 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6709 scale_process
= "RO"
6710 if self
.ro_config
.get("ng"):
6711 await self
._scale
_ng
_ro
(
6712 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6714 scaling_info
.pop("vdu-create", None)
6715 scaling_info
.pop("vdu-delete", None)
6717 scale_process
= None
6721 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6722 scale_process
= "KDU"
6723 await self
._scale
_kdu
(
6724 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6726 scaling_info
.pop("kdu-create", None)
6727 scaling_info
.pop("kdu-delete", None)
6729 scale_process
= None
6733 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6735 # SCALE-UP VCA - BEGIN
6736 if vca_scaling_info
:
6737 step
= db_nslcmop_update
[
6739 ] = "Creating new execution environments"
6740 scale_process
= "VCA"
6741 for vca_info
in vca_scaling_info
:
6742 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6743 member_vnf_index
= str(vca_info
["member-vnf-index"])
6745 logging_text
+ "vdu info: {}".format(vca_info
)
6747 vnfd_id
= db_vnfr
["vnfd-ref"]
6748 if vca_info
.get("osm_vdu_id"):
6749 vdu_index
= int(vca_info
["vdu_index"])
6750 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6751 if db_vnfr
.get("additionalParamsForVnf"):
6752 deploy_params
.update(
6754 db_vnfr
["additionalParamsForVnf"].copy()
6757 descriptor_config
= get_configuration(
6758 db_vnfd
, db_vnfd
["id"]
6760 if descriptor_config
:
6765 logging_text
=logging_text
6766 + "member_vnf_index={} ".format(member_vnf_index
),
6769 nslcmop_id
=nslcmop_id
,
6775 member_vnf_index
=member_vnf_index
,
6776 vdu_index
=vdu_index
,
6778 deploy_params
=deploy_params
,
6779 descriptor_config
=descriptor_config
,
6780 base_folder
=base_folder
,
6781 task_instantiation_info
=tasks_dict_info
,
6784 vdu_id
= vca_info
["osm_vdu_id"]
6785 vdur
= find_in_list(
6786 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6788 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6789 if vdur
.get("additionalParams"):
6790 deploy_params_vdu
= parse_yaml_strings(
6791 vdur
["additionalParams"]
6794 deploy_params_vdu
= deploy_params
6795 deploy_params_vdu
["OSM"] = get_osm_params(
6796 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6798 if descriptor_config
:
6803 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6804 member_vnf_index
, vdu_id
, vdu_index
6806 stage
[2] = step
= "Scaling out VCA"
6807 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6809 logging_text
=logging_text
6810 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6811 member_vnf_index
, vdu_id
, vdu_index
6815 nslcmop_id
=nslcmop_id
,
6821 member_vnf_index
=member_vnf_index
,
6822 vdu_index
=vdu_index
,
6824 deploy_params
=deploy_params_vdu
,
6825 descriptor_config
=descriptor_config
,
6826 base_folder
=base_folder
,
6827 task_instantiation_info
=tasks_dict_info
,
6830 # SCALE-UP VCA - END
6831 scale_process
= None
6834 # execute primitive service POST-SCALING
6835 step
= "Executing post-scale vnf-config-primitive"
6836 if scaling_descriptor
.get("scaling-config-action"):
6837 for scaling_config_action
in scaling_descriptor
[
6838 "scaling-config-action"
6841 scaling_config_action
.get("trigger") == "post-scale-in"
6842 and scaling_type
== "SCALE_IN"
6844 scaling_config_action
.get("trigger") == "post-scale-out"
6845 and scaling_type
== "SCALE_OUT"
6847 vnf_config_primitive
= scaling_config_action
[
6848 "vnf-config-primitive-name-ref"
6850 step
= db_nslcmop_update
[
6852 ] = "executing post-scale scaling-config-action '{}'".format(
6853 vnf_config_primitive
6856 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6857 if db_vnfr
.get("additionalParamsForVnf"):
6858 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6860 # look for primitive
6861 for config_primitive
in (
6862 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6863 ).get("config-primitive", ()):
6864 if config_primitive
["name"] == vnf_config_primitive
:
6868 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6869 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6870 "config-primitive".format(
6871 scaling_group
, vnf_config_primitive
6874 scale_process
= "VCA"
6875 db_nsr_update
["config-status"] = "configuring post-scaling"
6876 primitive_params
= self
._map
_primitive
_params
(
6877 config_primitive
, {}, vnfr_params
6880 # Post-scale retry check: Check if this sub-operation has been executed before
6881 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6884 vnf_config_primitive
,
6888 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6889 # Skip sub-operation
6890 result
= "COMPLETED"
6891 result_detail
= "Done"
6894 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6895 vnf_config_primitive
, result
, result_detail
6899 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6900 # New sub-operation: Get index of this sub-operation
6902 len(db_nslcmop
.get("_admin", {}).get("operations"))
6907 + "vnf_config_primitive={} New sub-operation".format(
6908 vnf_config_primitive
6912 # retry: Get registered params for this existing sub-operation
6913 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6916 vnf_index
= op
.get("member_vnf_index")
6917 vnf_config_primitive
= op
.get("primitive")
6918 primitive_params
= op
.get("primitive_params")
6921 + "vnf_config_primitive={} Sub-operation retry".format(
6922 vnf_config_primitive
6925 # Execute the primitive, either with new (first-time) or registered (reintent) args
6926 ee_descriptor_id
= config_primitive
.get(
6927 "execution-environment-ref"
6929 primitive_name
= config_primitive
.get(
6930 "execution-environment-primitive", vnf_config_primitive
6932 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6933 nsr_deployed
["VCA"],
6934 member_vnf_index
=vnf_index
,
6936 vdu_count_index
=None,
6937 ee_descriptor_id
=ee_descriptor_id
,
6939 result
, result_detail
= await self
._ns
_execute
_primitive
(
6948 + "vnf_config_primitive={} Done with result {} {}".format(
6949 vnf_config_primitive
, result
, result_detail
6952 # Update operationState = COMPLETED | FAILED
6953 self
._update
_suboperation
_status
(
6954 db_nslcmop
, op_index
, result
, result_detail
6957 if result
== "FAILED":
6958 raise LcmException(result_detail
)
6959 db_nsr_update
["config-status"] = old_config_status
6960 scale_process
= None
6965 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6966 db_nsr_update
["operational-status"] = (
6968 if old_operational_status
== "failed"
6969 else old_operational_status
6971 db_nsr_update
["config-status"] = old_config_status
6974 ROclient
.ROClientException
,
6979 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6981 except asyncio
.CancelledError
:
6983 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6985 exc
= "Operation was cancelled"
6986 except Exception as e
:
6987 exc
= traceback
.format_exc()
6988 self
.logger
.critical(
6989 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6993 self
._write
_ns
_status
(
6996 current_operation
="IDLE",
6997 current_operation_id
=None,
7000 stage
[1] = "Waiting for instantiate pending tasks."
7001 self
.logger
.debug(logging_text
+ stage
[1])
7002 exc
= await self
._wait
_for
_tasks
(
7005 self
.timeout_ns_deploy
,
7013 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7014 nslcmop_operation_state
= "FAILED"
7016 db_nsr_update
["operational-status"] = old_operational_status
7017 db_nsr_update
["config-status"] = old_config_status
7018 db_nsr_update
["detailed-status"] = ""
7020 if "VCA" in scale_process
:
7021 db_nsr_update
["config-status"] = "failed"
7022 if "RO" in scale_process
:
7023 db_nsr_update
["operational-status"] = "failed"
7026 ] = "FAILED scaling nslcmop={} {}: {}".format(
7027 nslcmop_id
, step
, exc
7030 error_description_nslcmop
= None
7031 nslcmop_operation_state
= "COMPLETED"
7032 db_nslcmop_update
["detailed-status"] = "Done"
7034 self
._write
_op
_status
(
7037 error_message
=error_description_nslcmop
,
7038 operation_state
=nslcmop_operation_state
,
7039 other_update
=db_nslcmop_update
,
7042 self
._write
_ns
_status
(
7045 current_operation
="IDLE",
7046 current_operation_id
=None,
7047 other_update
=db_nsr_update
,
7050 if nslcmop_operation_state
:
7054 "nslcmop_id": nslcmop_id
,
7055 "operationState": nslcmop_operation_state
,
7057 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7058 except Exception as e
:
7060 logging_text
+ "kafka_write notification Exception {}".format(e
)
7062 self
.logger
.debug(logging_text
+ "Exit")
7063 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7065 async def _scale_kdu(
7066 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7068 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7069 for kdu_name
in _scaling_info
:
7070 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7071 deployed_kdu
, index
= get_deployed_kdu(
7072 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7074 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7075 kdu_instance
= deployed_kdu
["kdu-instance"]
7076 kdu_model
= deployed_kdu
.get("kdu-model")
7077 scale
= int(kdu_scaling_info
["scale"])
7078 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7081 "collection": "nsrs",
7082 "filter": {"_id": nsr_id
},
7083 "path": "_admin.deployed.K8s.{}".format(index
),
7086 step
= "scaling application {}".format(
7087 kdu_scaling_info
["resource-name"]
7089 self
.logger
.debug(logging_text
+ step
)
7091 if kdu_scaling_info
["type"] == "delete":
7092 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7095 and kdu_config
.get("terminate-config-primitive")
7096 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7098 terminate_config_primitive_list
= kdu_config
.get(
7099 "terminate-config-primitive"
7101 terminate_config_primitive_list
.sort(
7102 key
=lambda val
: int(val
["seq"])
7106 terminate_config_primitive
7107 ) in terminate_config_primitive_list
:
7108 primitive_params_
= self
._map
_primitive
_params
(
7109 terminate_config_primitive
, {}, {}
7111 step
= "execute terminate config primitive"
7112 self
.logger
.debug(logging_text
+ step
)
7113 await asyncio
.wait_for(
7114 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7115 cluster_uuid
=cluster_uuid
,
7116 kdu_instance
=kdu_instance
,
7117 primitive_name
=terminate_config_primitive
["name"],
7118 params
=primitive_params_
,
7125 await asyncio
.wait_for(
7126 self
.k8scluster_map
[k8s_cluster_type
].scale(
7129 kdu_scaling_info
["resource-name"],
7131 cluster_uuid
=cluster_uuid
,
7132 kdu_model
=kdu_model
,
7136 timeout
=self
.timeout_vca_on_error
,
7139 if kdu_scaling_info
["type"] == "create":
7140 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7143 and kdu_config
.get("initial-config-primitive")
7144 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7146 initial_config_primitive_list
= kdu_config
.get(
7147 "initial-config-primitive"
7149 initial_config_primitive_list
.sort(
7150 key
=lambda val
: int(val
["seq"])
7153 for initial_config_primitive
in initial_config_primitive_list
:
7154 primitive_params_
= self
._map
_primitive
_params
(
7155 initial_config_primitive
, {}, {}
7157 step
= "execute initial config primitive"
7158 self
.logger
.debug(logging_text
+ step
)
7159 await asyncio
.wait_for(
7160 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7161 cluster_uuid
=cluster_uuid
,
7162 kdu_instance
=kdu_instance
,
7163 primitive_name
=initial_config_primitive
["name"],
7164 params
=primitive_params_
,
7171 async def _scale_ng_ro(
7172 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7174 nsr_id
= db_nslcmop
["nsInstanceId"]
7175 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7178 # read from db: vnfd's for every vnf
7181 # for each vnf in ns, read vnfd
7182 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7183 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7184 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7185 # if we haven't this vnfd, read it from db
7186 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7188 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7189 db_vnfds
.append(vnfd
)
7190 n2vc_key
= self
.n2vc
.get_public_key()
7191 n2vc_key_list
= [n2vc_key
]
7194 vdu_scaling_info
.get("vdu-create"),
7195 vdu_scaling_info
.get("vdu-delete"),
7198 # db_vnfr has been updated, update db_vnfrs to use it
7199 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7200 await self
._instantiate
_ng
_ro
(
7210 start_deploy
=time(),
7211 timeout_ns_deploy
=self
.timeout_ns_deploy
,
7213 if vdu_scaling_info
.get("vdu-delete"):
7215 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7218 async def extract_prometheus_scrape_jobs(
7219 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
7221 # look if exist a file called 'prometheus*.j2' and
7222 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7226 for f
in artifact_content
7227 if f
.startswith("prometheus") and f
.endswith(".j2")
7233 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7237 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7238 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7240 vnfr_id
= vnfr_id
.replace("-", "")
7242 "JOB_NAME": vnfr_id
,
7243 "TARGET_IP": target_ip
,
7244 "EXPORTER_POD_IP": host_name
,
7245 "EXPORTER_POD_PORT": host_port
,
7247 job_list
= parse_job(job_data
, variables
)
7248 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7249 for job
in job_list
:
7251 not isinstance(job
.get("job_name"), str)
7252 or vnfr_id
not in job
["job_name"]
7254 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
7255 job
["nsr_id"] = nsr_id
7256 job
["vnfr_id"] = vnfr_id
7259 async def rebuild_start_stop(self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
):
7260 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7261 self
.logger
.info(logging_text
+ "Enter")
7262 stage
= ["Preparing the environment", ""]
7263 # database nsrs record
7267 # in case of error, indicates what part of scale was failed to put nsr at error status
7268 start_deploy
= time()
7270 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7271 vim_account_id
= db_vnfr
.get("vim-account-id")
7272 vim_info_key
= "vim:" + vim_account_id
7273 vdu_id
= additional_param
["vdu_id"]
7274 vdurs
= [item
for item
in db_vnfr
["vdur"] if item
["vdu-id-ref"] == vdu_id
]
7275 vdur
= find_in_list(
7276 vdurs
, lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7279 vdu_vim_name
= vdur
["name"]
7280 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7281 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7283 raise LcmException("Target vdu is not found")
7284 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7285 # wait for any previous tasks in process
7286 stage
[1] = "Waiting for previous operations to terminate"
7287 self
.logger
.info(stage
[1])
7288 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
7290 stage
[1] = "Reading from database."
7291 self
.logger
.info(stage
[1])
7292 self
._write
_ns
_status
(
7295 current_operation
=operation_type
.upper(),
7296 current_operation_id
=nslcmop_id
7298 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7301 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7302 db_nsr_update
["operational-status"] = operation_type
7303 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7307 "vim_vm_id": vim_vm_id
,
7309 "vdu_index": additional_param
["count-index"],
7310 "vdu_id": vdur
["id"],
7311 "target_vim": target_vim
,
7312 "vim_account_id": vim_account_id
7315 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7316 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7317 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7318 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7319 self
.logger
.info("response from RO: {}".format(result_dict
))
7320 action_id
= result_dict
["action_id"]
7321 await self
._wait
_ng
_ro
(
7322 nsr_id
, action_id
, nslcmop_id
, start_deploy
,
7323 self
.timeout_operate
, None, "start_stop_rebuild",
7325 return "COMPLETED", "Done"
7326 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7327 self
.logger
.error("Exit Exception {}".format(e
))
7329 except asyncio
.CancelledError
:
7330 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7331 exc
= "Operation was cancelled"
7332 except Exception as e
:
7333 exc
= traceback
.format_exc()
7334 self
.logger
.critical("Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
7335 return "FAILED", "Error in operate VNF {}".format(exc
)
7337 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7339 Get VCA Cloud and VCA Cloud Credentials for the VIM account
7341 :param: vim_account_id: VIM Account ID
7343 :return: (cloud_name, cloud_credential)
7345 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7346 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
7348 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7350 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
7352 :param: vim_account_id: VIM Account ID
7354 :return: (cloud_name, cloud_credential)
7356 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7357 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")
7359 async def migrate(self
, nsr_id
, nslcmop_id
):
7361 Migrate VNFs and VDUs instances in a NS
7363 :param: nsr_id: NS Instance ID
7364 :param: nslcmop_id: nslcmop ID of migrate
7367 # Try to lock HA task here
7368 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7369 if not task_is_locked_by_me
:
7371 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7372 self
.logger
.debug(logging_text
+ "Enter")
7373 # get all needed from database
7375 db_nslcmop_update
= {}
7376 nslcmop_operation_state
= None
7380 # in case of error, indicates what part of scale was failed to put nsr at error status
7381 start_deploy
= time()
7384 # wait for any previous tasks in process
7385 step
= "Waiting for previous operations to terminate"
7386 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7388 self
._write
_ns
_status
(
7391 current_operation
="MIGRATING",
7392 current_operation_id
=nslcmop_id
,
7394 step
= "Getting nslcmop from database"
7396 step
+ " after having waited for previous tasks to be completed"
7398 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7399 migrate_params
= db_nslcmop
.get("operationParams")
7402 target
.update(migrate_params
)
7403 desc
= await self
.RO
.migrate(nsr_id
, target
)
7404 self
.logger
.debug("RO return > {}".format(desc
))
7405 action_id
= desc
["action_id"]
7406 await self
._wait
_ng
_ro
(
7407 nsr_id
, action_id
, nslcmop_id
, start_deploy
, self
.timeout_migrate
,
7410 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7411 self
.logger
.error("Exit Exception {}".format(e
))
7413 except asyncio
.CancelledError
:
7414 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7415 exc
= "Operation was cancelled"
7416 except Exception as e
:
7417 exc
= traceback
.format_exc()
7418 self
.logger
.critical(
7419 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7422 self
._write
_ns
_status
(
7425 current_operation
="IDLE",
7426 current_operation_id
=None,
7429 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7430 nslcmop_operation_state
= "FAILED"
7432 nslcmop_operation_state
= "COMPLETED"
7433 db_nslcmop_update
["detailed-status"] = "Done"
7434 db_nsr_update
["detailed-status"] = "Done"
7436 self
._write
_op
_status
(
7440 operation_state
=nslcmop_operation_state
,
7441 other_update
=db_nslcmop_update
,
7443 if nslcmop_operation_state
:
7447 "nslcmop_id": nslcmop_id
,
7448 "operationState": nslcmop_operation_state
,
7450 await self
.msg
.aiowrite("ns", "migrated", msg
, loop
=self
.loop
)
7451 except Exception as e
:
7453 logging_text
+ "kafka_write notification Exception {}".format(e
)
7455 self
.logger
.debug(logging_text
+ "Exit")
7456 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
7459 async def heal(self
, nsr_id
, nslcmop_id
):
7463 :param nsr_id: ns instance to heal
7464 :param nslcmop_id: operation to run
7468 # Try to lock HA task here
7469 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7470 if not task_is_locked_by_me
:
7473 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7474 stage
= ["", "", ""]
7475 tasks_dict_info
= {}
7476 # ^ stage, step, VIM progress
7477 self
.logger
.debug(logging_text
+ "Enter")
7478 # get all needed from database
7480 db_nslcmop_update
= {}
7482 db_vnfrs
= {} # vnf's info indexed by _id
7484 old_operational_status
= ""
7485 old_config_status
= ""
7488 # wait for any previous tasks in process
7489 step
= "Waiting for previous operations to terminate"
7490 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7491 self
._write
_ns
_status
(
7494 current_operation
="HEALING",
7495 current_operation_id
=nslcmop_id
,
7498 step
= "Getting nslcmop from database"
7500 step
+ " after having waited for previous tasks to be completed"
7502 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7504 step
= "Getting nsr from database"
7505 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7506 old_operational_status
= db_nsr
["operational-status"]
7507 old_config_status
= db_nsr
["config-status"]
7510 "_admin.deployed.RO.operational-status": "healing",
7512 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7514 step
= "Sending heal order to VIM"
7515 task_ro
= asyncio
.ensure_future(
7517 logging_text
=logging_text
,
7519 db_nslcmop
=db_nslcmop
,
7523 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "heal_RO", task_ro
)
7524 tasks_dict_info
[task_ro
] = "Healing at VIM"
7528 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7529 self
.logger
.debug(logging_text
+ stage
[1])
7530 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7531 self
.fs
.sync(db_nsr
["nsd-id"])
7533 # read from db: vnfr's of this ns
7534 step
= "Getting vnfrs from db"
7535 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7536 for vnfr
in db_vnfrs_list
:
7537 db_vnfrs
[vnfr
["_id"]] = vnfr
7538 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7540 # Check for each target VNF
7541 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7542 for target_vnf
in target_list
:
7543 # Find this VNF in the list from DB
7544 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7546 db_vnfr
= db_vnfrs
[vnfr_id
]
7547 vnfd_id
= db_vnfr
.get("vnfd-id")
7548 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7549 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7550 base_folder
= vnfd
["_admin"]["storage"]
7555 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7556 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7558 # Check each target VDU and deploy N2VC
7559 for target_vdu
in target_vnf
["additionalParams"].get("vdu", None):
7560 deploy_params_vdu
= target_vdu
7561 # Set run-day1 vnf level value if not vdu level value exists
7562 if not deploy_params_vdu
.get("run-day1") and target_vnf
["additionalParams"].get("run-day1"):
7563 deploy_params_vdu
["run-day1"] = target_vnf
["additionalParams"].get("run-day1")
7564 vdu_name
= target_vdu
.get("vdu-id", None)
7565 # TODO: Get vdu_id from vdud.
7567 # For multi instance VDU count-index is mandatory
7568 # For single session VDU count-indes is 0
7569 vdu_index
= target_vdu
.get("count-index",0)
7571 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7572 stage
[1] = "Deploying Execution Environments."
7573 self
.logger
.debug(logging_text
+ stage
[1])
7575 # VNF Level charm. Normal case when proxy charms.
7576 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7577 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7578 if descriptor_config
:
7579 # Continue if healed machine is management machine
7580 vnf_ip_address
= db_vnfr
.get("ip-address")
7581 target_instance
= None
7582 for instance
in db_vnfr
.get("vdur", None):
7583 if ( instance
["vdu-name"] == vdu_name
and instance
["count-index"] == vdu_index
):
7584 target_instance
= instance
7586 if vnf_ip_address
== target_instance
.get("ip-address"):
7588 logging_text
=logging_text
7589 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7590 member_vnf_index
, vdu_name
, vdu_index
7594 nslcmop_id
=nslcmop_id
,
7600 member_vnf_index
=member_vnf_index
,
7603 deploy_params
=deploy_params_vdu
,
7604 descriptor_config
=descriptor_config
,
7605 base_folder
=base_folder
,
7606 task_instantiation_info
=tasks_dict_info
,
7610 # VDU Level charm. Normal case with native charms.
7611 descriptor_config
= get_configuration(vnfd
, vdu_name
)
7612 if descriptor_config
:
7614 logging_text
=logging_text
7615 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7616 member_vnf_index
, vdu_name
, vdu_index
7620 nslcmop_id
=nslcmop_id
,
7626 member_vnf_index
=member_vnf_index
,
7627 vdu_index
=vdu_index
,
7629 deploy_params
=deploy_params_vdu
,
7630 descriptor_config
=descriptor_config
,
7631 base_folder
=base_folder
,
7632 task_instantiation_info
=tasks_dict_info
,
7637 ROclient
.ROClientException
,
7642 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7644 except asyncio
.CancelledError
:
7646 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7648 exc
= "Operation was cancelled"
7649 except Exception as e
:
7650 exc
= traceback
.format_exc()
7651 self
.logger
.critical(
7652 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7657 stage
[1] = "Waiting for healing pending tasks."
7658 self
.logger
.debug(logging_text
+ stage
[1])
7659 exc
= await self
._wait
_for
_tasks
(
7662 self
.timeout_ns_deploy
,
7670 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7671 nslcmop_operation_state
= "FAILED"
7673 db_nsr_update
["operational-status"] = old_operational_status
7674 db_nsr_update
["config-status"] = old_config_status
7677 ] = "FAILED healing nslcmop={} {}: {}".format(
7678 nslcmop_id
, step
, exc
7680 for task
, task_name
in tasks_dict_info
.items():
7681 if not task
.done() or task
.cancelled() or task
.exception():
7682 if task_name
.startswith(self
.task_name_deploy_vca
):
7683 # A N2VC task is pending
7684 db_nsr_update
["config-status"] = "failed"
7686 # RO task is pending
7687 db_nsr_update
["operational-status"] = "failed"
7689 error_description_nslcmop
= None
7690 nslcmop_operation_state
= "COMPLETED"
7691 db_nslcmop_update
["detailed-status"] = "Done"
7692 db_nsr_update
["detailed-status"] = "Done"
7693 db_nsr_update
["operational-status"] = "running"
7694 db_nsr_update
["config-status"] = "configured"
7696 self
._write
_op
_status
(
7699 error_message
=error_description_nslcmop
,
7700 operation_state
=nslcmop_operation_state
,
7701 other_update
=db_nslcmop_update
,
7704 self
._write
_ns
_status
(
7707 current_operation
="IDLE",
7708 current_operation_id
=None,
7709 other_update
=db_nsr_update
,
7712 if nslcmop_operation_state
:
7716 "nslcmop_id": nslcmop_id
,
7717 "operationState": nslcmop_operation_state
,
7719 await self
.msg
.aiowrite("ns", "healed", msg
, loop
=self
.loop
)
7720 except Exception as e
:
7722 logging_text
+ "kafka_write notification Exception {}".format(e
)
7724 self
.logger
.debug(logging_text
+ "Exit")
7725 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
7736 :param logging_text: preffix text to use at logging
7737 :param nsr_id: nsr identity
7738 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
7739 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
7740 :return: None or exception
7742 def get_vim_account(vim_account_id
):
7744 if vim_account_id
in db_vims
:
7745 return db_vims
[vim_account_id
]
7746 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
7747 db_vims
[vim_account_id
] = db_vim
7752 ns_params
= db_nslcmop
.get("operationParams")
7753 if ns_params
and ns_params
.get("timeout_ns_heal"):
7754 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
7756 timeout_ns_heal
= self
.timeout
.get(
7757 "ns_heal", self
.timeout_ns_heal
7762 nslcmop_id
= db_nslcmop
["_id"]
7764 "action_id": nslcmop_id
,
7766 self
.logger
.warning("db_nslcmop={} and timeout_ns_heal={}".format(db_nslcmop
,timeout_ns_heal
))
7767 target
.update(db_nslcmop
.get("operationParams", {}))
7769 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
7770 desc
= await self
.RO
.recreate(nsr_id
, target
)
7771 self
.logger
.debug("RO return > {}".format(desc
))
7772 action_id
= desc
["action_id"]
7773 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
7774 await self
._wait
_ng
_ro
(
7775 nsr_id
, action_id
, nslcmop_id
, start_heal
, timeout_ns_heal
, stage
,
7781 "_admin.deployed.RO.operational-status": "running",
7782 "detailed-status": " ".join(stage
),
7784 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7785 self
._write
_op
_status
(nslcmop_id
, stage
)
7787 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
7790 except Exception as e
:
7791 stage
[2] = "ERROR healing at VIM"
7792 #self.set_vnfr_at_error(db_vnfrs, str(e))
7794 "Error healing at VIM {}".format(e
),
7795 exc_info
=not isinstance(
7798 ROclient
.ROClientException
,
7824 task_instantiation_info
,
7827 # launch instantiate_N2VC in a asyncio task and register task object
7828 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
7829 # if not found, create one entry and update database
7830 # fill db_nsr._admin.deployed.VCA.<index>
7833 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
7835 if "execution-environment-list" in descriptor_config
:
7836 ee_list
= descriptor_config
.get("execution-environment-list", [])
7837 elif "juju" in descriptor_config
:
7838 ee_list
= [descriptor_config
] # ns charms
7839 else: # other types as script are not supported
7842 for ee_item
in ee_list
:
7845 + "_deploy_n2vc ee_item juju={}, helm={}".format(
7846 ee_item
.get("juju"), ee_item
.get("helm-chart")
7849 ee_descriptor_id
= ee_item
.get("id")
7850 if ee_item
.get("juju"):
7851 vca_name
= ee_item
["juju"].get("charm")
7854 if ee_item
["juju"].get("charm") is not None
7857 if ee_item
["juju"].get("cloud") == "k8s":
7858 vca_type
= "k8s_proxy_charm"
7859 elif ee_item
["juju"].get("proxy") is False:
7860 vca_type
= "native_charm"
7861 elif ee_item
.get("helm-chart"):
7862 vca_name
= ee_item
["helm-chart"]
7863 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
7866 vca_type
= "helm-v3"
7869 logging_text
+ "skipping non juju neither charm configuration"
7874 for vca_index
, vca_deployed
in enumerate(
7875 db_nsr
["_admin"]["deployed"]["VCA"]
7877 if not vca_deployed
:
7880 vca_deployed
.get("member-vnf-index") == member_vnf_index
7881 and vca_deployed
.get("vdu_id") == vdu_id
7882 and vca_deployed
.get("kdu_name") == kdu_name
7883 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
7884 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
7888 # not found, create one.
7890 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
7893 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
7895 target
+= "/kdu/{}".format(kdu_name
)
7897 "target_element": target
,
7898 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
7899 "member-vnf-index": member_vnf_index
,
7901 "kdu_name": kdu_name
,
7902 "vdu_count_index": vdu_index
,
7903 "operational-status": "init", # TODO revise
7904 "detailed-status": "", # TODO revise
7905 "step": "initial-deploy", # TODO revise
7907 "vdu_name": vdu_name
,
7909 "ee_descriptor_id": ee_descriptor_id
,
7913 # create VCA and configurationStatus in db
7915 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
7916 "configurationStatus.{}".format(vca_index
): dict(),
7918 self
.update_db_2("nsrs", nsr_id
, db_dict
)
7920 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
7922 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
7923 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
7924 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
7927 task_n2vc
= asyncio
.ensure_future(
7929 logging_text
=logging_text
,
7930 vca_index
=vca_index
,
7936 vdu_index
=vdu_index
,
7937 deploy_params
=deploy_params
,
7938 config_descriptor
=descriptor_config
,
7939 base_folder
=base_folder
,
7940 nslcmop_id
=nslcmop_id
,
7944 ee_config_descriptor
=ee_item
,
7947 self
.lcm_tasks
.register(
7951 "instantiate_N2VC-{}".format(vca_index
),
7954 task_instantiation_info
[
7956 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
7957 member_vnf_index
or "", vdu_id
or ""
7960 async def heal_N2VC(
7977 ee_config_descriptor
,
7979 nsr_id
= db_nsr
["_id"]
7980 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
7981 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
7982 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
7983 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
7985 "collection": "nsrs",
7986 "filter": {"_id": nsr_id
},
7987 "path": db_update_entry
,
7993 element_under_configuration
= nsr_id
7997 vnfr_id
= db_vnfr
["_id"]
7998 osm_config
["osm"]["vnf_id"] = vnfr_id
8000 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8002 if vca_type
== "native_charm":
8005 index_number
= vdu_index
or 0
8008 element_type
= "VNF"
8009 element_under_configuration
= vnfr_id
8010 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8012 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8013 element_type
= "VDU"
8014 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8015 osm_config
["osm"]["vdu_id"] = vdu_id
8017 namespace
+= ".{}".format(kdu_name
)
8018 element_type
= "KDU"
8019 element_under_configuration
= kdu_name
8020 osm_config
["osm"]["kdu_name"] = kdu_name
8023 if base_folder
["pkg-dir"]:
8024 artifact_path
= "{}/{}/{}/{}".format(
8025 base_folder
["folder"],
8026 base_folder
["pkg-dir"],
8029 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8034 artifact_path
= "{}/Scripts/{}/{}/".format(
8035 base_folder
["folder"],
8038 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8043 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8045 # get initial_config_primitive_list that applies to this element
8046 initial_config_primitive_list
= config_descriptor
.get(
8047 "initial-config-primitive"
8051 "Initial config primitive list > {}".format(
8052 initial_config_primitive_list
8056 # add config if not present for NS charm
8057 ee_descriptor_id
= ee_config_descriptor
.get("id")
8058 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8059 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8060 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8064 "Initial config primitive list #2 > {}".format(
8065 initial_config_primitive_list
8068 # n2vc_redesign STEP 3.1
8069 # find old ee_id if exists
8070 ee_id
= vca_deployed
.get("ee_id")
8072 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8073 # create or register execution environment in VCA. Only for native charms when healing
8074 if vca_type
== "native_charm":
8075 step
= "Waiting to VM being up and getting IP address"
8076 self
.logger
.debug(logging_text
+ step
)
8077 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8086 credentials
= {"hostname": rw_mgmt_ip
}
8088 username
= deep_get(
8089 config_descriptor
, ("config-access", "ssh-access", "default-user")
8091 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8092 # merged. Meanwhile let's get username from initial-config-primitive
8093 if not username
and initial_config_primitive_list
:
8094 for config_primitive
in initial_config_primitive_list
:
8095 for param
in config_primitive
.get("parameter", ()):
8096 if param
["name"] == "ssh-username":
8097 username
= param
["value"]
8101 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8102 "'config-access.ssh-access.default-user'"
8104 credentials
["username"] = username
8106 # n2vc_redesign STEP 3.2
8107 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8108 self
._write
_configuration
_status
(
8110 vca_index
=vca_index
,
8111 status
="REGISTERING",
8112 element_under_configuration
=element_under_configuration
,
8113 element_type
=element_type
,
8116 step
= "register execution environment {}".format(credentials
)
8117 self
.logger
.debug(logging_text
+ step
)
8118 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8119 credentials
=credentials
,
8120 namespace
=namespace
,
8125 # update ee_id en db
8127 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8129 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8131 # for compatibility with MON/POL modules, the need model and application name at database
8132 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8133 # Not sure if this need to be done when healing
8135 ee_id_parts = ee_id.split(".")
8136 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8137 if len(ee_id_parts) >= 2:
8138 model_name = ee_id_parts[0]
8139 application_name = ee_id_parts[1]
8140 db_nsr_update[db_update_entry + "model"] = model_name
8141 db_nsr_update[db_update_entry + "application"] = application_name
8144 # n2vc_redesign STEP 3.3
8145 # Install configuration software. Only for native charms.
8146 step
= "Install configuration Software"
8148 self
._write
_configuration
_status
(
8150 vca_index
=vca_index
,
8151 status
="INSTALLING SW",
8152 element_under_configuration
=element_under_configuration
,
8153 element_type
=element_type
,
8154 #other_update=db_nsr_update,
8158 # TODO check if already done
8159 self
.logger
.debug(logging_text
+ step
)
8161 if vca_type
== "native_charm":
8162 config_primitive
= next(
8163 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8166 if config_primitive
:
8167 config
= self
._map
_primitive
_params
(
8168 config_primitive
, {}, deploy_params
8170 await self
.vca_map
[vca_type
].install_configuration_sw(
8172 artifact_path
=artifact_path
,
8180 # write in db flag of configuration_sw already installed
8182 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8185 # Not sure if this need to be done when healing
8187 # add relations for this VCA (wait for other peers related with this VCA)
8188 await self._add_vca_relations(
8189 logging_text=logging_text,
8192 vca_index=vca_index,
8196 # if SSH access is required, then get execution environment SSH public
8197 # if native charm we have waited already to VM be UP
8198 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8201 # self.logger.debug("get ssh key block")
8203 config_descriptor
, ("config-access", "ssh-access", "required")
8205 # self.logger.debug("ssh key needed")
8206 # Needed to inject a ssh key
8209 ("config-access", "ssh-access", "default-user"),
8211 step
= "Install configuration Software, getting public ssh key"
8212 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8213 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8216 step
= "Insert public key into VM user={} ssh_key={}".format(
8220 # self.logger.debug("no need to get ssh key")
8221 step
= "Waiting to VM being up and getting IP address"
8222 self
.logger
.debug(logging_text
+ step
)
8224 # n2vc_redesign STEP 5.1
8225 # wait for RO (ip-address) Insert pub_key into VM
8226 # IMPORTANT: We need do wait for RO to complete healing operation.
8227 await self
._wait
_heal
_ro
(nsr_id
,self
.timeout_ns_heal
)
8230 rw_mgmt_ip
= await self
.wait_kdu_up(
8231 logging_text
, nsr_id
, vnfr_id
, kdu_name
8234 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8244 rw_mgmt_ip
= None # This is for a NS configuration
8246 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8248 # store rw_mgmt_ip in deploy params for later replacement
8249 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8252 # get run-day1 operation parameter
8253 runDay1
= deploy_params
.get("run-day1",False)
8254 self
.logger
.debug(" Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
,vdu_id
,runDay1
))
8256 # n2vc_redesign STEP 6 Execute initial config primitive
8257 step
= "execute initial config primitive"
8259 # wait for dependent primitives execution (NS -> VNF -> VDU)
8260 if initial_config_primitive_list
:
8261 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
8263 # stage, in function of element type: vdu, kdu, vnf or ns
8264 my_vca
= vca_deployed_list
[vca_index
]
8265 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8267 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8268 elif my_vca
.get("member-vnf-index"):
8270 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8273 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8275 self
._write
_configuration
_status
(
8276 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8279 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8281 check_if_terminated_needed
= True
8282 for initial_config_primitive
in initial_config_primitive_list
:
8283 # adding information on the vca_deployed if it is a NS execution environment
8284 if not vca_deployed
["member-vnf-index"]:
8285 deploy_params
["ns_config_info"] = json
.dumps(
8286 self
._get
_ns
_config
_info
(nsr_id
)
8288 # TODO check if already done
8289 primitive_params_
= self
._map
_primitive
_params
(
8290 initial_config_primitive
, {}, deploy_params
8293 step
= "execute primitive '{}' params '{}'".format(
8294 initial_config_primitive
["name"], primitive_params_
8296 self
.logger
.debug(logging_text
+ step
)
8297 await self
.vca_map
[vca_type
].exec_primitive(
8299 primitive_name
=initial_config_primitive
["name"],
8300 params_dict
=primitive_params_
,
8305 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8306 if check_if_terminated_needed
:
8307 if config_descriptor
.get("terminate-config-primitive"):
8309 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
8311 check_if_terminated_needed
= False
8313 # TODO register in database that primitive is done
8315 # STEP 7 Configure metrics
8316 # Not sure if this need to be done when healing
8318 if vca_type == "helm" or vca_type == "helm-v3":
8319 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8321 artifact_path=artifact_path,
8322 ee_config_descriptor=ee_config_descriptor,
8325 target_ip=rw_mgmt_ip,
8331 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8334 for job in prometheus_jobs:
8337 {"job_name": job["job_name"]},
8340 fail_on_empty=False,
8344 step
= "instantiated at VCA"
8345 self
.logger
.debug(logging_text
+ step
)
8347 self
._write
_configuration
_status
(
8348 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8351 except Exception as e
: # TODO not use Exception but N2VC exception
8352 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8354 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8357 "Exception while {} : {}".format(step
, e
), exc_info
=True
8359 self
._write
_configuration
_status
(
8360 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8362 raise LcmException("{} {}".format(step
, e
)) from e
8364 async def _wait_heal_ro(
8370 while time() <= start_time
+ timeout
:
8371 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
8372 operational_status_ro
= db_nsr
["_admin"]["deployed"]["RO"]["operational-status"]
8373 self
.logger
.debug("Wait Heal RO > {}".format(operational_status_ro
))
8374 if operational_status_ro
!= "healing":
8376 await asyncio
.sleep(15, loop
=self
.loop
)
8377 else: # timeout_ns_deploy
8378 raise NgRoException("Timeout waiting ns to deploy")
8380 async def vertical_scale(self
, nsr_id
, nslcmop_id
):
8382 Vertical Scale the VDUs in a NS
8384 :param: nsr_id: NS Instance ID
8385 :param: nslcmop_id: nslcmop ID of migrate
8388 # Try to lock HA task here
8389 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
8390 if not task_is_locked_by_me
:
8392 logging_text
= "Task ns={} vertical scale ".format(nsr_id
)
8393 self
.logger
.debug(logging_text
+ "Enter")
8394 # get all needed from database
8396 db_nslcmop_update
= {}
8397 nslcmop_operation_state
= None
8401 # in case of error, indicates what part of scale was failed to put nsr at error status
8402 start_deploy
= time()
8405 # wait for any previous tasks in process
8406 step
= "Waiting for previous operations to terminate"
8407 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
8409 self
._write
_ns
_status
(
8412 current_operation
="VerticalScale",
8413 current_operation_id
=nslcmop_id
8415 step
= "Getting nslcmop from database"
8416 self
.logger
.debug(step
+ " after having waited for previous tasks to be completed")
8417 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
8418 operationParams
= db_nslcmop
.get("operationParams")
8420 target
.update(operationParams
)
8421 desc
= await self
.RO
.vertical_scale(nsr_id
, target
)
8422 self
.logger
.debug("RO return > {}".format(desc
))
8423 action_id
= desc
["action_id"]
8424 await self
._wait
_ng
_ro
(
8425 nsr_id
, action_id
, nslcmop_id
, start_deploy
, self
.timeout_verticalscale
,
8426 operation
="verticalscale"
8428 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
8429 self
.logger
.error("Exit Exception {}".format(e
))
8431 except asyncio
.CancelledError
:
8432 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
8433 exc
= "Operation was cancelled"
8434 except Exception as e
:
8435 exc
= traceback
.format_exc()
8436 self
.logger
.critical("Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
8438 self
._write
_ns
_status
(
8441 current_operation
="IDLE",
8442 current_operation_id
=None,
8447 ] = "FAILED {}: {}".format(step
, exc
)
8448 nslcmop_operation_state
= "FAILED"
8450 nslcmop_operation_state
= "COMPLETED"
8451 db_nslcmop_update
["detailed-status"] = "Done"
8452 db_nsr_update
["detailed-status"] = "Done"
8454 self
._write
_op
_status
(
8458 operation_state
=nslcmop_operation_state
,
8459 other_update
=db_nslcmop_update
,
8461 if nslcmop_operation_state
:
8465 "nslcmop_id": nslcmop_id
,
8466 "operationState": nslcmop_operation_state
,
8468 await self
.msg
.aiowrite("ns", "verticalscaled", msg
, loop
=self
.loop
)
8469 except Exception as e
:
8471 logging_text
+ "kafka_write notification Exception {}".format(e
)
8473 self
.logger
.debug(logging_text
+ "Exit")
8474 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_verticalscale")