1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
36 from osm_lcm
import ROclient
37 from osm_lcm
.data_utils
.nsr
import (
40 get_deployed_vca_list
,
43 from osm_lcm
.data_utils
.vca
import (
52 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
53 from osm_lcm
.lcm_utils
import (
60 check_juju_bundle_existence
,
61 get_charm_artifact_path
,
63 from osm_lcm
.data_utils
.nsd
import (
64 get_ns_configuration_relation_list
,
68 from osm_lcm
.data_utils
.vnfd
import (
74 get_ee_sorted_initial_config_primitive_list
,
75 get_ee_sorted_terminate_config_primitive_list
,
77 get_virtual_link_profiles
,
82 get_number_of_instances
,
84 get_kdu_resource_profile
,
85 find_software_version
,
87 from osm_lcm
.data_utils
.list_utils
import find_in_list
88 from osm_lcm
.data_utils
.vnfr
import (
92 get_volumes_from_instantiation_params
,
94 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
95 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
96 from n2vc
.definitions
import RelationEndpoint
97 from n2vc
.k8s_helm_conn
import K8sHelmConnector
98 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
99 from n2vc
.k8s_juju_conn
import K8sJujuConnector
101 from osm_common
.dbbase
import DbException
102 from osm_common
.fsbase
import FsException
104 from osm_lcm
.data_utils
.database
.database
import Database
105 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
107 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
108 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
110 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
111 from osm_lcm
.osm_config
import OsmConfigBuilder
112 from osm_lcm
.prometheus
import parse_job
114 from copy
import copy
, deepcopy
115 from time
import time
116 from uuid
import uuid4
118 from random
import randint
120 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
123 class NsLcm(LcmBase
):
124 timeout_vca_on_error
= (
126 ) # Time for charm from first time at blocked,error status to mark as failed
127 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
128 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
129 timeout_ns_heal
= 1800 # default global timeout for healing a ns
130 timeout_charm_delete
= 10 * 60
131 timeout_primitive
= 30 * 60 # timeout for primitive execution
132 timeout_ns_update
= 30 * 60 # timeout for ns update
133 timeout_progress_primitive
= (
135 ) # timeout for some progress in a primitive execution
136 timeout_migrate
= 1800 # default global timeout for migrating vnfs
137 timeout_operate
= 1800 # default global timeout for operate actions on vnfs
138 timeout_verticalscale
= 1800 # default global timeout for vertical scaling
139 SUBOPERATION_STATUS_NOT_FOUND
= -1
140 SUBOPERATION_STATUS_NEW
= -2
141 SUBOPERATION_STATUS_SKIP
= -3
142 task_name_deploy_vca
= "Deploying VCA"
144 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
146 Init, Connect to database, filesystem storage, and messaging
147 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
150 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
152 self
.db
= Database().instance
.db
153 self
.fs
= Filesystem().instance
.fs
155 self
.lcm_tasks
= lcm_tasks
156 self
.timeout
= config
["timeout"]
157 self
.ro_config
= config
["ro_config"]
158 self
.ng_ro
= config
["ro_config"].get("ng")
159 self
.vca_config
= config
["VCA"].copy()
161 # create N2VC connector
162 self
.n2vc
= N2VCJujuConnector(
165 on_update_db
=self
._on
_update
_n
2vc
_db
,
170 self
.conn_helm_ee
= LCMHelmConn(
173 vca_config
=self
.vca_config
,
174 on_update_db
=self
._on
_update
_n
2vc
_db
,
177 self
.k8sclusterhelm2
= K8sHelmConnector(
178 kubectl_command
=self
.vca_config
.get("kubectlpath"),
179 helm_command
=self
.vca_config
.get("helmpath"),
186 self
.k8sclusterhelm3
= K8sHelm3Connector(
187 kubectl_command
=self
.vca_config
.get("kubectlpath"),
188 helm_command
=self
.vca_config
.get("helm3path"),
195 self
.k8sclusterjuju
= K8sJujuConnector(
196 kubectl_command
=self
.vca_config
.get("kubectlpath"),
197 juju_command
=self
.vca_config
.get("jujupath"),
200 on_update_db
=self
._on
_update
_k
8s
_db
,
205 self
.k8scluster_map
= {
206 "helm-chart": self
.k8sclusterhelm2
,
207 "helm-chart-v3": self
.k8sclusterhelm3
,
208 "chart": self
.k8sclusterhelm3
,
209 "juju-bundle": self
.k8sclusterjuju
,
210 "juju": self
.k8sclusterjuju
,
214 "lxc_proxy_charm": self
.n2vc
,
215 "native_charm": self
.n2vc
,
216 "k8s_proxy_charm": self
.n2vc
,
217 "helm": self
.conn_helm_ee
,
218 "helm-v3": self
.conn_helm_ee
,
222 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
224 self
.op_status_map
= {
225 "instantiation": self
.RO
.status
,
226 "termination": self
.RO
.status
,
227 "migrate": self
.RO
.status
,
228 "healing": self
.RO
.recreate_status
,
229 "verticalscale": self
.RO
.status
,
230 "start_stop_rebuild": self
.RO
.status
,
@staticmethod
def increment_ip_mac(ip_mac, vm_index=1):
    """
    Builds a new IPv4, IPv6 or MAC address text by adding an offset to its last group
    :param ip_mac: address as text. A value that is not a string is returned untouched
    :param vm_index: amount added to the last group of the address
    :return: the resulting address text, or None if the last group cannot be parsed
    """
    if not isinstance(ip_mac, str):
        return ip_mac
    try:
        # try with ipv4 look for last dot
        i = ip_mac.rfind(".")
        if i > 0:
            # move past the separator so that ip_mac[i:] is only the last group
            i += 1
            return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
        # try with ipv6 or mac look for last colon. Operate in hex
        i = ip_mac.rfind(":")
        if i > 0:
            i += 1
            # format in hex, len can be 2 for mac or 4 for ipv6
            return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
                ip_mac[:i], int(ip_mac[i:], 16) + vm_index
            )
    except ValueError:
        # last group is not a number in the expected base
        pass
    return None
255 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
257 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
260 # TODO filter RO descriptor fields...
264 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
265 db_dict
["deploymentStatus"] = ro_descriptor
266 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
268 except Exception as e
:
270 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
273 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
275 # remove last dot from path (if exists)
276 if path
.endswith("."):
279 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
280 # .format(table, filter, path, updated_data))
283 nsr_id
= filter.get("_id")
285 # read ns record from database
286 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
287 current_ns_status
= nsr
.get("nsState")
289 # get vca status for NS
290 status_dict
= await self
.n2vc
.get_status(
291 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
296 db_dict
["vcaStatus"] = status_dict
298 # update configurationStatus for this VCA
300 vca_index
= int(path
[path
.rfind(".") + 1 :])
303 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
305 vca_status
= vca_list
[vca_index
].get("status")
307 configuration_status_list
= nsr
.get("configurationStatus")
308 config_status
= configuration_status_list
[vca_index
].get("status")
310 if config_status
== "BROKEN" and vca_status
!= "failed":
311 db_dict
["configurationStatus"][vca_index
] = "READY"
312 elif config_status
!= "BROKEN" and vca_status
== "failed":
313 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
314 except Exception as e
:
315 # not update configurationStatus
316 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
318 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
319 # if nsState = 'DEGRADED' check if all is OK
321 if current_ns_status
in ("READY", "DEGRADED"):
322 error_description
= ""
324 if status_dict
.get("machines"):
325 for machine_id
in status_dict
.get("machines"):
326 machine
= status_dict
.get("machines").get(machine_id
)
327 # check machine agent-status
328 if machine
.get("agent-status"):
329 s
= machine
.get("agent-status").get("status")
332 error_description
+= (
333 "machine {} agent-status={} ; ".format(
337 # check machine instance status
338 if machine
.get("instance-status"):
339 s
= machine
.get("instance-status").get("status")
342 error_description
+= (
343 "machine {} instance-status={} ; ".format(
348 if status_dict
.get("applications"):
349 for app_id
in status_dict
.get("applications"):
350 app
= status_dict
.get("applications").get(app_id
)
351 # check application status
352 if app
.get("status"):
353 s
= app
.get("status").get("status")
356 error_description
+= (
357 "application {} status={} ; ".format(app_id
, s
)
360 if error_description
:
361 db_dict
["errorDescription"] = error_description
362 if current_ns_status
== "READY" and is_degraded
:
363 db_dict
["nsState"] = "DEGRADED"
364 if current_ns_status
== "DEGRADED" and not is_degraded
:
365 db_dict
["nsState"] = "READY"
368 self
.update_db_2("nsrs", nsr_id
, db_dict
)
370 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
372 except Exception as e
:
373 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
375 async def _on_update_k8s_db(
376 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
379 Updating vca status in NSR record
380 :param cluster_uuid: UUID of a k8s cluster
381 :param kdu_instance: The unique name of the KDU instance
382 :param filter: To get nsr_id
383 :cluster_type: The cluster type (juju, k8s)
387 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
388 # .format(cluster_uuid, kdu_instance, filter))
390 nsr_id
= filter.get("_id")
392 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
393 cluster_uuid
=cluster_uuid
,
394 kdu_instance
=kdu_instance
,
396 complete_status
=True,
402 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
405 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
409 self
.update_db_2("nsrs", nsr_id
, db_dict
)
410 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
412 except Exception as e
:
413 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
@staticmethod
def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
    """
    Renders a cloud-init text as a Jinja2 template
    :param cloud_init_text: template content
    :param additional_params: variables for the template. None is treated as no variables
    :param vnfd_id: vnfd id, only used to compose the error text
    :param vdu_id: vdu id, only used to compose the error text
    :return: the rendered text. LcmException is raised if a variable is missing or the
        template cannot be processed
    """
    try:
        # StrictUndefined makes a missing variable fail instead of rendering empty
        env = Environment(
            undefined=StrictUndefined,
            autoescape=select_autoescape(default_for_string=True, default=True),
        )
        template = env.from_string(cloud_init_text)
        return template.render(additional_params or {})
    except UndefinedError as e:
        raise LcmException(
            "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
            "file, must be provided in the instantiation parameters inside the "
            "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
        ) from e
    except (TemplateError, TemplateNotFound) as e:
        raise LcmException(
            "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
                vnfd_id, vdu_id, e
            )
        ) from e
437 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
438 cloud_init_content
= cloud_init_file
= None
440 if vdu
.get("cloud-init-file"):
441 base_folder
= vnfd
["_admin"]["storage"]
442 if base_folder
["pkg-dir"]:
443 cloud_init_file
= "{}/{}/cloud_init/{}".format(
444 base_folder
["folder"],
445 base_folder
["pkg-dir"],
446 vdu
["cloud-init-file"],
449 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
450 base_folder
["folder"],
451 vdu
["cloud-init-file"],
453 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
454 cloud_init_content
= ci_file
.read()
455 elif vdu
.get("cloud-init"):
456 cloud_init_content
= vdu
["cloud-init"]
458 return cloud_init_content
459 except FsException
as e
:
461 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
462 vnfd
["id"], vdu
["id"], cloud_init_file
, e
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
    """
    Obtains the additionalParams of the first vdur of a vnfr that references a vdu
    :param db_vnfr: vnfr content
    :param vdu_id: value to match against "vdu-id-ref" of each vdur
    :return: result of parse_yaml_strings over the "additionalParams" of the matching
        vdur. If no vdur matches, parse_yaml_strings receives None
    """
    # "vdur" can be missing or None at the vnfr; treat it as no vdur at all
    vdur = next(
        (
            vdur
            for vdur in db_vnfr.get("vdur") or ()
            if vdu_id == vdur["vdu-id-ref"]
        ),
        {},
    )
    additional_params = vdur.get("additionalParams")
    return parse_yaml_strings(additional_params)
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
    :param vnfd: input vnfd. It is not modified
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided. Not used here
    :param nsrId: Id of the NSR. Not used here
    :return: copy of vnfd
    """
    vnfd_RO = deepcopy(vnfd)
    # remove unused by RO configuration, monitoring, scaling and internal keys
    for unused_key in (
        "_id",
        "_admin",
        "monitoring-param",
        "scaling-group-descriptor",
        "kdu",
        "k8s-cluster",
    ):
        vnfd_RO.pop(unused_key, None)
    if new_id:
        vnfd_RO["id"] = new_id

    # cloud-init and cloud-init-file are not sent inside the descriptor copy
    for vdu in get_iterable(vnfd_RO, "vdu"):
        vdu.pop("cloud-init-file", None)
        vdu.pop("cloud-init", None)
    return vnfd_RO
@staticmethod
def ip_profile_2_RO(ip_profile):
    """
    Translates an ip-profile to the key names and values used for RO
    :param ip_profile: input ip-profile dictionary. It is not modified
    :return: a deep copy where "dns-server" becomes "dns-address" (a list of the
        "address" of each item when it was a list), "ip-version" ipv4/ipv6 becomes
        IPv4/IPv6 and "dhcp-params" becomes "dhcp"
    """
    RO_ip_profile = deepcopy(ip_profile)
    if "dns-server" in RO_ip_profile:
        dns_server = RO_ip_profile.pop("dns-server")
        if isinstance(dns_server, list):
            RO_ip_profile["dns-address"] = [ds["address"] for ds in dns_server]
        else:
            RO_ip_profile["dns-address"] = dns_server
    if RO_ip_profile.get("ip-version") == "ipv4":
        RO_ip_profile["ip-version"] = "IPv4"
    if RO_ip_profile.get("ip-version") == "ipv6":
        RO_ip_profile["ip-version"] = "IPv6"
    if "dhcp-params" in RO_ip_profile:
        RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
    return RO_ip_profile
517 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
518 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
519 if db_vim
["_admin"]["operationalState"] != "ENABLED":
521 "VIM={} is not available. operationalState={}".format(
522 vim_account
, db_vim
["_admin"]["operationalState"]
525 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
def get_ro_wim_id_for_wim_account(self, wim_account):
    """
    Obtains the RO identifier stored at a wim account
    :param wim_account: _id of the entry at "wim_accounts". A value that is not a
        string is not looked up
    :return: content of "_admin.deployed.RO-account" of that entry, or the same
        wim_account received when it is not a string. LcmException is raised if
        "_admin.operationalState" of the entry is not "ENABLED"
    """
    if not isinstance(wim_account, str):
        return wim_account
    db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
    operational_state = db_wim["_admin"]["operationalState"]
    if operational_state != "ENABLED":
        raise LcmException(
            "WIM={} is not available. operationalState={}".format(
                wim_account, operational_state
            )
        )
    RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
    return RO_wim_id
542 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
544 db_vdu_push_list
= []
546 db_update
= {"_admin.modified": time()}
548 for vdu_id
, vdu_count
in vdu_create
.items():
552 for vdur
in reversed(db_vnfr
["vdur"])
553 if vdur
["vdu-id-ref"] == vdu_id
558 # Read the template saved in the db:
560 "No vdur in the database. Using the vdur-template to scale"
562 vdur_template
= db_vnfr
.get("vdur-template")
563 if not vdur_template
:
565 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
569 vdur
= vdur_template
[0]
570 # Delete a template from the database after using it
573 {"_id": db_vnfr
["_id"]},
575 pull
={"vdur-template": {"_id": vdur
["_id"]}},
577 for count
in range(vdu_count
):
578 vdur_copy
= deepcopy(vdur
)
579 vdur_copy
["status"] = "BUILD"
580 vdur_copy
["status-detailed"] = None
581 vdur_copy
["ip-address"] = None
582 vdur_copy
["_id"] = str(uuid4())
583 vdur_copy
["count-index"] += count
+ 1
584 vdur_copy
["id"] = "{}-{}".format(
585 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
587 vdur_copy
.pop("vim_info", None)
588 for iface
in vdur_copy
["interfaces"]:
589 if iface
.get("fixed-ip"):
590 iface
["ip-address"] = self
.increment_ip_mac(
591 iface
["ip-address"], count
+ 1
594 iface
.pop("ip-address", None)
595 if iface
.get("fixed-mac"):
596 iface
["mac-address"] = self
.increment_ip_mac(
597 iface
["mac-address"], count
+ 1
600 iface
.pop("mac-address", None)
604 ) # only first vdu can be management of vnf
605 db_vdu_push_list
.append(vdur_copy
)
606 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
608 if len(db_vnfr
["vdur"]) == 1:
609 # The scale will move to 0 instances
611 "Scaling to 0 !, creating the template with the last vdur"
613 template_vdur
= [db_vnfr
["vdur"][0]]
614 for vdu_id
, vdu_count
in vdu_delete
.items():
616 indexes_to_delete
= [
618 for iv
in enumerate(db_vnfr
["vdur"])
619 if iv
[1]["vdu-id-ref"] == vdu_id
623 "vdur.{}.status".format(i
): "DELETING"
624 for i
in indexes_to_delete
[-vdu_count
:]
628 # it must be deleted one by one because common.db does not allow otherwise
631 for v
in reversed(db_vnfr
["vdur"])
632 if v
["vdu-id-ref"] == vdu_id
634 for vdu
in vdus_to_delete
[:vdu_count
]:
637 {"_id": db_vnfr
["_id"]},
639 pull
={"vdur": {"_id": vdu
["_id"]}},
643 db_push
["vdur"] = db_vdu_push_list
645 db_push
["vdur-template"] = template_vdur
648 db_vnfr
["vdur-template"] = template_vdur
649 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
650 # modify passed dictionary db_vnfr
651 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
652 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Updates database nsr with the RO info for the created vld
    :param ns_update_nsr: dictionary to be filled with the updated info
    :param db_nsr: content of db_nsr. This is also modified
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised on errors
    """
    for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
        # first RO net that declares this vld as its ns_net_osm_id
        net_RO = next(
            (
                net
                for net in get_iterable(nsr_desc_RO, "nets")
                if vld["id"] == net.get("ns_net_osm_id")
            ),
            None,
        )
        if net_RO is None:
            raise LcmException(
                "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
            )
        vld["vim-id"] = net_RO.get("vim_net_id")
        vld["name"] = net_RO.get("vim_name")
        vld["status"] = net_RO.get("status")
        vld["status-detailed"] = net_RO.get("error_msg")
        ns_update_nsr["vld.{}".format(vld_index)] = vld
678 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
680 for db_vnfr
in db_vnfrs
.values():
681 vnfr_update
= {"status": "ERROR"}
682 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
683 if "status" not in vdur
:
684 vdur
["status"] = "ERROR"
685 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
687 vdur
["status-detailed"] = str(error_text
)
689 "vdur.{}.status-detailed".format(vdu_index
)
691 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
692 except DbException
as e
:
693 self
.logger
.error("Cannot update vnf. {}".format(e
))
695 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
697 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
698 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
699 :param nsr_desc_RO: nsr descriptor from RO
700 :return: Nothing, LcmException is raised on errors
702 for vnf_index
, db_vnfr
in db_vnfrs
.items():
703 for vnf_RO
in nsr_desc_RO
["vnfs"]:
704 if vnf_RO
["member_vnf_index"] != vnf_index
:
707 if vnf_RO
.get("ip_address"):
708 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
711 elif not db_vnfr
.get("ip-address"):
712 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
713 raise LcmExceptionNoMgmtIP(
714 "ns member_vnf_index '{}' has no IP address".format(
719 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
720 vdur_RO_count_index
= 0
721 if vdur
.get("pdu-type"):
723 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
724 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
726 if vdur
["count-index"] != vdur_RO_count_index
:
727 vdur_RO_count_index
+= 1
729 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
730 if vdur_RO
.get("ip_address"):
731 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
733 vdur
["ip-address"] = None
734 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
735 vdur
["name"] = vdur_RO
.get("vim_name")
736 vdur
["status"] = vdur_RO
.get("status")
737 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
738 for ifacer
in get_iterable(vdur
, "interfaces"):
739 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
740 if ifacer
["name"] == interface_RO
.get("internal_name"):
741 ifacer
["ip-address"] = interface_RO
.get(
744 ifacer
["mac-address"] = interface_RO
.get(
750 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
751 "from VIM info".format(
752 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
755 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
759 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
761 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
765 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
766 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
767 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
769 vld
["vim-id"] = net_RO
.get("vim_net_id")
770 vld
["name"] = net_RO
.get("vim_name")
771 vld
["status"] = net_RO
.get("status")
772 vld
["status-detailed"] = net_RO
.get("error_msg")
773 vnfr_update
["vld.{}".format(vld_index
)] = vld
777 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
782 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
787 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
792 def _get_ns_config_info(self
, nsr_id
):
794 Generates a mapping between vnf,vdu elements and the N2VC id
795 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
796 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
797 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
798 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
800 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
801 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
803 ns_config_info
= {"osm-config-mapping": mapping
}
804 for vca
in vca_deployed_list
:
805 if not vca
["member-vnf-index"]:
807 if not vca
["vdu_id"]:
808 mapping
[vca
["member-vnf-index"]] = vca
["application"]
812 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
814 ] = vca
["application"]
815 return ns_config_info
817 async def _instantiate_ng_ro(
834 def get_vim_account(vim_account_id
):
836 if vim_account_id
in db_vims
:
837 return db_vims
[vim_account_id
]
838 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
839 db_vims
[vim_account_id
] = db_vim
842 # modify target_vld info with instantiation parameters
843 def parse_vld_instantiation_params(
844 target_vim
, target_vld
, vld_params
, target_sdn
846 if vld_params
.get("ip-profile"):
847 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
850 if vld_params
.get("provider-network"):
851 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
854 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
855 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
858 if vld_params
.get("wimAccountId"):
859 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
860 target_vld
["vim_info"][target_wim
] = {}
861 for param
in ("vim-network-name", "vim-network-id"):
862 if vld_params
.get(param
):
863 if isinstance(vld_params
[param
], dict):
864 for vim
, vim_net
in vld_params
[param
].items():
865 other_target_vim
= "vim:" + vim
867 target_vld
["vim_info"],
868 (other_target_vim
, param
.replace("-", "_")),
871 else: # isinstance str
872 target_vld
["vim_info"][target_vim
][
873 param
.replace("-", "_")
874 ] = vld_params
[param
]
875 if vld_params
.get("common_id"):
876 target_vld
["common_id"] = vld_params
.get("common_id")
878 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
879 def update_ns_vld_target(target
, ns_params
):
880 for vnf_params
in ns_params
.get("vnf", ()):
881 if vnf_params
.get("vimAccountId"):
885 for vnfr
in db_vnfrs
.values()
886 if vnf_params
["member-vnf-index"]
887 == vnfr
["member-vnf-index-ref"]
891 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
894 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
895 target_vld
= find_in_list(
896 get_iterable(vdur
, "interfaces"),
897 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
900 vld_params
= find_in_list(
901 get_iterable(ns_params
, "vld"),
902 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
906 if vnf_params
.get("vimAccountId") not in a_vld
.get(
909 target_vim_network_list
= [
910 v
for _
, v
in a_vld
.get("vim_info").items()
912 target_vim_network_name
= next(
914 item
.get("vim_network_name", "")
915 for item
in target_vim_network_list
920 target
["ns"]["vld"][a_index
].get("vim_info").update(
922 "vim:{}".format(vnf_params
["vimAccountId"]): {
923 "vim_network_name": target_vim_network_name
,
929 for param
in ("vim-network-name", "vim-network-id"):
930 if vld_params
.get(param
) and isinstance(
931 vld_params
[param
], dict
933 for vim
, vim_net
in vld_params
[
936 other_target_vim
= "vim:" + vim
938 target
["ns"]["vld"][a_index
].get(
943 param
.replace("-", "_"),
948 nslcmop_id
= db_nslcmop
["_id"]
950 "name": db_nsr
["name"],
953 "image": deepcopy(db_nsr
["image"]),
954 "flavor": deepcopy(db_nsr
["flavor"]),
955 "action_id": nslcmop_id
,
956 "cloud_init_content": {},
958 for image
in target
["image"]:
959 image
["vim_info"] = {}
960 for flavor
in target
["flavor"]:
961 flavor
["vim_info"] = {}
962 if db_nsr
.get("affinity-or-anti-affinity-group"):
963 target
["affinity-or-anti-affinity-group"] = deepcopy(
964 db_nsr
["affinity-or-anti-affinity-group"]
966 for affinity_or_anti_affinity_group
in target
[
967 "affinity-or-anti-affinity-group"
969 affinity_or_anti_affinity_group
["vim_info"] = {}
971 if db_nslcmop
.get("lcmOperationType") != "instantiate":
972 # get parameters of instantiation:
973 db_nslcmop_instantiate
= self
.db
.get_list(
976 "nsInstanceId": db_nslcmop
["nsInstanceId"],
977 "lcmOperationType": "instantiate",
980 ns_params
= db_nslcmop_instantiate
.get("operationParams")
982 ns_params
= db_nslcmop
.get("operationParams")
983 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
984 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
987 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
988 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
992 "mgmt-network": vld
.get("mgmt-network", False),
993 "type": vld
.get("type"),
996 "vim_network_name": vld
.get("vim-network-name"),
997 "vim_account_id": ns_params
["vimAccountId"],
1001 # check if this network needs SDN assist
1002 if vld
.get("pci-interfaces"):
1003 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1004 sdnc_id
= db_vim
["config"].get("sdn-controller")
1006 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1007 target_sdn
= "sdn:{}".format(sdnc_id
)
1008 target_vld
["vim_info"][target_sdn
] = {
1010 "target_vim": target_vim
,
1012 "type": vld
.get("type"),
1015 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1016 for nsd_vnf_profile
in nsd_vnf_profiles
:
1017 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1018 if cp
["virtual-link-profile-id"] == vld
["id"]:
1020 "member_vnf:{}.{}".format(
1021 cp
["constituent-cpd-id"][0][
1022 "constituent-base-element-id"
1024 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1026 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1028 # check at nsd descriptor, if there is an ip-profile
1030 nsd_vlp
= find_in_list(
1031 get_virtual_link_profiles(nsd
),
1032 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1037 and nsd_vlp
.get("virtual-link-protocol-data")
1038 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1040 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1043 ip_profile_dest_data
= {}
1044 if "ip-version" in ip_profile_source_data
:
1045 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1048 if "cidr" in ip_profile_source_data
:
1049 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1052 if "gateway-ip" in ip_profile_source_data
:
1053 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1056 if "dhcp-enabled" in ip_profile_source_data
:
1057 ip_profile_dest_data
["dhcp-params"] = {
1058 "enabled": ip_profile_source_data
["dhcp-enabled"]
1060 vld_params
["ip-profile"] = ip_profile_dest_data
1062 # update vld_params with instantiation params
1063 vld_instantiation_params
= find_in_list(
1064 get_iterable(ns_params
, "vld"),
1065 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1067 if vld_instantiation_params
:
1068 vld_params
.update(vld_instantiation_params
)
1069 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1070 target
["ns"]["vld"].append(target_vld
)
1071 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1072 update_ns_vld_target(target
, ns_params
)
1074 for vnfr
in db_vnfrs
.values():
1075 vnfd
= find_in_list(
1076 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1078 vnf_params
= find_in_list(
1079 get_iterable(ns_params
, "vnf"),
1080 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1082 target_vnf
= deepcopy(vnfr
)
1083 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1084 for vld
in target_vnf
.get("vld", ()):
1085 # check if connected to a ns.vld, to fill target'
1086 vnf_cp
= find_in_list(
1087 vnfd
.get("int-virtual-link-desc", ()),
1088 lambda cpd
: cpd
.get("id") == vld
["id"],
1091 ns_cp
= "member_vnf:{}.{}".format(
1092 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1094 if cp2target
.get(ns_cp
):
1095 vld
["target"] = cp2target
[ns_cp
]
1098 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1100 # check if this network needs SDN assist
1102 if vld
.get("pci-interfaces"):
1103 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1104 sdnc_id
= db_vim
["config"].get("sdn-controller")
1106 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1107 target_sdn
= "sdn:{}".format(sdnc_id
)
1108 vld
["vim_info"][target_sdn
] = {
1110 "target_vim": target_vim
,
1112 "type": vld
.get("type"),
1115 # check at vnfd descriptor, if there is an ip-profile
1117 vnfd_vlp
= find_in_list(
1118 get_virtual_link_profiles(vnfd
),
1119 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1123 and vnfd_vlp
.get("virtual-link-protocol-data")
1124 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1126 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1129 ip_profile_dest_data
= {}
1130 if "ip-version" in ip_profile_source_data
:
1131 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1134 if "cidr" in ip_profile_source_data
:
1135 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1138 if "gateway-ip" in ip_profile_source_data
:
1139 ip_profile_dest_data
[
1141 ] = ip_profile_source_data
["gateway-ip"]
1142 if "dhcp-enabled" in ip_profile_source_data
:
1143 ip_profile_dest_data
["dhcp-params"] = {
1144 "enabled": ip_profile_source_data
["dhcp-enabled"]
1147 vld_params
["ip-profile"] = ip_profile_dest_data
1148 # update vld_params with instantiation params
1150 vld_instantiation_params
= find_in_list(
1151 get_iterable(vnf_params
, "internal-vld"),
1152 lambda i_vld
: i_vld
["name"] == vld
["id"],
1154 if vld_instantiation_params
:
1155 vld_params
.update(vld_instantiation_params
)
1156 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1159 for vdur
in target_vnf
.get("vdur", ()):
1160 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1161 continue # This vdu must not be created
1162 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1164 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1167 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1168 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1171 and vdu_configuration
.get("config-access")
1172 and vdu_configuration
.get("config-access").get("ssh-access")
1174 vdur
["ssh-keys"] = ssh_keys_all
1175 vdur
["ssh-access-required"] = vdu_configuration
[
1177 ]["ssh-access"]["required"]
1180 and vnf_configuration
.get("config-access")
1181 and vnf_configuration
.get("config-access").get("ssh-access")
1182 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1184 vdur
["ssh-keys"] = ssh_keys_all
1185 vdur
["ssh-access-required"] = vnf_configuration
[
1187 ]["ssh-access"]["required"]
1188 elif ssh_keys_instantiation
and find_in_list(
1189 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1191 vdur
["ssh-keys"] = ssh_keys_instantiation
1193 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1195 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1197 if vdud
.get("cloud-init-file"):
1198 vdur
["cloud-init"] = "{}:file:{}".format(
1199 vnfd
["_id"], vdud
.get("cloud-init-file")
1201 # read the file and put its content at target.cloud_init_content, so that ng_ro does not need the shared package system
1202 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1203 base_folder
= vnfd
["_admin"]["storage"]
1204 if base_folder
["pkg-dir"]:
1205 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1206 base_folder
["folder"],
1207 base_folder
["pkg-dir"],
1208 vdud
.get("cloud-init-file"),
1211 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1212 base_folder
["folder"],
1213 vdud
.get("cloud-init-file"),
1215 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1216 target
["cloud_init_content"][
1219 elif vdud
.get("cloud-init"):
1220 vdur
["cloud-init"] = "{}:vdu:{}".format(
1221 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1223 # put content at target.cloud_init_content, so that ng_ro does not need to read the vnfd descriptor
1224 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1227 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1228 deploy_params_vdu
= self
._format
_additional
_params
(
1229 vdur
.get("additionalParams") or {}
1231 deploy_params_vdu
["OSM"] = get_osm_params(
1232 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1234 vdur
["additionalParams"] = deploy_params_vdu
1237 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1238 if target_vim
not in ns_flavor
["vim_info"]:
1239 ns_flavor
["vim_info"][target_vim
] = {}
1242 # in case alternative images are provided we must check if they should be applied
1243 # for the vim_type, modify the vim_type taking into account
1244 ns_image_id
= int(vdur
["ns-image-id"])
1245 if vdur
.get("alt-image-ids"):
1246 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1247 vim_type
= db_vim
["vim_type"]
1248 for alt_image_id
in vdur
.get("alt-image-ids"):
1249 ns_alt_image
= target
["image"][int(alt_image_id
)]
1250 if vim_type
== ns_alt_image
.get("vim-type"):
1251 # must use alternative image
1253 "use alternative image id: {}".format(alt_image_id
)
1255 ns_image_id
= alt_image_id
1256 vdur
["ns-image-id"] = ns_image_id
1258 ns_image
= target
["image"][int(ns_image_id
)]
1259 if target_vim
not in ns_image
["vim_info"]:
1260 ns_image
["vim_info"][target_vim
] = {}
1263 if vdur
.get("affinity-or-anti-affinity-group-id"):
1264 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1265 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1266 if target_vim
not in ns_ags
["vim_info"]:
1267 ns_ags
["vim_info"][target_vim
] = {}
1269 vdur
["vim_info"] = {target_vim
: {}}
1270 # instantiation parameters
1272 vdu_instantiation_params
= find_in_list(
1273 get_iterable(vnf_params
, "vdu"),
1274 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1276 if vdu_instantiation_params
:
1277 # Parse the vdu_volumes from the instantiation params
1278 vdu_volumes
= get_volumes_from_instantiation_params(
1279 vdu_instantiation_params
, vdud
1281 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1282 vdur_list
.append(vdur
)
1283 target_vnf
["vdur"] = vdur_list
1284 target
["vnf"].append(target_vnf
)
1286 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1287 desc
= await self
.RO
.deploy(nsr_id
, target
)
1288 self
.logger
.debug("RO return > {}".format(desc
))
1289 action_id
= desc
["action_id"]
1290 await self
._wait
_ng
_ro
(
1297 operation
="instantiation",
1302 "_admin.deployed.RO.operational-status": "running",
1303 "detailed-status": " ".join(stage
),
1305 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1306 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1307 self
._write
_op
_status
(nslcmop_id
, stage
)
1309 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1313 async def _wait_ng_ro(
1323 detailed_status_old
= None
1325 start_time
= start_time
or time()
1326 while time() <= start_time
+ timeout
:
1327 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1328 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1329 if desc_status
["status"] == "FAILED":
1330 raise NgRoException(desc_status
["details"])
1331 elif desc_status
["status"] == "BUILD":
1333 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1334 elif desc_status
["status"] == "DONE":
1336 stage
[2] = "Deployed at VIM"
1339 assert False, "ROclient.check_ns_status returns unknown {}".format(
1340 desc_status
["status"]
1342 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1343 detailed_status_old
= stage
[2]
1344 db_nsr_update
["detailed-status"] = " ".join(stage
)
1345 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1346 self
._write
_op
_status
(nslcmop_id
, stage
)
1347 await asyncio
.sleep(15, loop
=self
.loop
)
1348 else: # timeout_ns_deploy
1349 raise NgRoException("Timeout waiting ns to deploy")
1351 async def _terminate_ng_ro(
1352 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1357 start_deploy
= time()
1364 "action_id": nslcmop_id
,
1366 desc
= await self
.RO
.deploy(nsr_id
, target
)
1367 action_id
= desc
["action_id"]
1368 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1369 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1372 + "ns terminate action at RO. action_id={}".format(action_id
)
1376 delete_timeout
= 20 * 60 # 20 minutes
1377 await self
._wait
_ng
_ro
(
1384 operation
="termination",
1387 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1388 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1390 await self
.RO
.delete(nsr_id
)
1391 except Exception as e
:
1392 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1393 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1394 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1395 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1397 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1399 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1400 failed_detail
.append("delete conflict: {}".format(e
))
1403 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1406 failed_detail
.append("delete error: {}".format(e
))
1409 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1413 stage
[2] = "Error deleting from VIM"
1415 stage
[2] = "Deleted from VIM"
1416 db_nsr_update
["detailed-status"] = " ".join(stage
)
1417 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1418 self
._write
_op
_status
(nslcmop_id
, stage
)
1421 raise LcmException("; ".join(failed_detail
))
1424 async def instantiate_RO(
1438 :param logging_text: prefix text to use at logging
1439 :param nsr_id: nsr identity
1440 :param nsd: database content of ns descriptor
1441 :param db_nsr: database content of ns record
1442 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1444 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1445 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1446 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1447 :return: None or exception
1450 start_deploy
= time()
1451 ns_params
= db_nslcmop
.get("operationParams")
1452 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1453 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1455 timeout_ns_deploy
= self
.timeout
.get(
1456 "ns_deploy", self
.timeout_ns_deploy
1459 # Check for and optionally request placement optimization. Database will be updated if placement activated
1460 stage
[2] = "Waiting for Placement."
1461 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1462 # in case of placement, change ns_params["vimAccountId"] if it is not present at any vnfr
1463 for vnfr
in db_vnfrs
.values():
1464 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1467 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1469 return await self
._instantiate
_ng
_ro
(
1482 except Exception as e
:
1483 stage
[2] = "ERROR deploying at VIM"
1484 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1486 "Error deploying at VIM {}".format(e
),
1487 exc_info
=not isinstance(
1490 ROclient
.ROClientException
,
1499 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1501 Wait for kdu to be up, get ip address
1502 :param logging_text: prefix use for logging
1506 :return: IP address, K8s services
1509 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1512 while nb_tries
< 360:
1513 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1517 for x
in get_iterable(db_vnfr
, "kdur")
1518 if x
.get("kdu-name") == kdu_name
1524 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1526 if kdur
.get("status"):
1527 if kdur
["status"] in ("READY", "ENABLED"):
1528 return kdur
.get("ip-address"), kdur
.get("services")
1531 "target KDU={} is in error state".format(kdu_name
)
1534 await asyncio
.sleep(10, loop
=self
.loop
)
1536 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1538 async def wait_vm_up_insert_key_ro(
1539 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1542 Wait for the IP address at RO and, optionally, insert a public key in the virtual machine
1543 :param logging_text: prefix use for logging
1548 :param pub_key: public ssh key to inject, None to skip
1549 :param user: user to apply the public ssh key
1553 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1557 target_vdu_id
= None
1563 if ro_retries
>= 360: # 1 hour
1565 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1568 await asyncio
.sleep(10, loop
=self
.loop
)
1571 if not target_vdu_id
:
1572 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1574 if not vdu_id
: # for the VNF case
1575 if db_vnfr
.get("status") == "ERROR":
1577 "Cannot inject ssh-key because target VNF is in error state"
1579 ip_address
= db_vnfr
.get("ip-address")
1585 for x
in get_iterable(db_vnfr
, "vdur")
1586 if x
.get("ip-address") == ip_address
1594 for x
in get_iterable(db_vnfr
, "vdur")
1595 if x
.get("vdu-id-ref") == vdu_id
1596 and x
.get("count-index") == vdu_index
1602 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1603 ): # If only one, this should be the target vdu
1604 vdur
= db_vnfr
["vdur"][0]
1607 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1608 vnfr_id
, vdu_id
, vdu_index
1611 # New generation RO stores information at "vim_info"
1614 if vdur
.get("vim_info"):
1616 t
for t
in vdur
["vim_info"]
1617 ) # there should be only one key
1618 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1620 vdur
.get("pdu-type")
1621 or vdur
.get("status") == "ACTIVE"
1622 or ng_ro_status
== "ACTIVE"
1624 ip_address
= vdur
.get("ip-address")
1627 target_vdu_id
= vdur
["vdu-id-ref"]
1628 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1630 "Cannot inject ssh-key because target VM is in error state"
1633 if not target_vdu_id
:
1636 # inject public key into machine
1637 if pub_key
and user
:
1638 self
.logger
.debug(logging_text
+ "Inserting RO key")
1639 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1640 if vdur
.get("pdu-type"):
1641 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1644 ro_vm_id
= "{}-{}".format(
1645 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1646 ) # TODO add vdu_index
1650 "action": "inject_ssh_key",
1654 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1656 desc
= await self
.RO
.deploy(nsr_id
, target
)
1657 action_id
= desc
["action_id"]
1658 await self
._wait
_ng
_ro
(
1659 nsr_id
, action_id
, timeout
=600, operation
="instantiation"
1663 # wait until NS is deployed at RO
1665 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1666 ro_nsr_id
= deep_get(
1667 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1671 result_dict
= await self
.RO
.create_action(
1673 item_id_name
=ro_nsr_id
,
1675 "add_public_key": pub_key
,
1680 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1681 if not result_dict
or not isinstance(result_dict
, dict):
1683 "Unknown response from RO when injecting key"
1685 for result
in result_dict
.values():
1686 if result
.get("vim_result") == 200:
1689 raise ROclient
.ROClientException(
1690 "error injecting key: {}".format(
1691 result
.get("description")
1695 except NgRoException
as e
:
1697 "Reaching max tries injecting key. Error: {}".format(e
)
1699 except ROclient
.ROClientException
as e
:
1703 + "error injecting key: {}. Retrying until {} seconds".format(
1710 "Reaching max tries injecting key. Error: {}".format(e
)
1717 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1719 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1721 my_vca
= vca_deployed_list
[vca_index
]
1722 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1723 # vdu or kdu: no dependencies
1727 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1728 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1729 configuration_status_list
= db_nsr
["configurationStatus"]
1730 for index
, vca_deployed
in enumerate(configuration_status_list
):
1731 if index
== vca_index
:
1734 if not my_vca
.get("member-vnf-index") or (
1735 vca_deployed
.get("member-vnf-index")
1736 == my_vca
.get("member-vnf-index")
1738 internal_status
= configuration_status_list
[index
].get("status")
1739 if internal_status
== "READY":
1741 elif internal_status
== "BROKEN":
1743 "Configuration aborted because dependent charm/s has failed"
1748 # no dependencies, return
1750 await asyncio
.sleep(10)
1753 raise LcmException("Configuration aborted because dependent charm/s timeout")
1755 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1758 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1760 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1761 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1764 async def instantiate_N2VC(
1781 ee_config_descriptor
,
1783 nsr_id
= db_nsr
["_id"]
1784 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1785 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1786 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1787 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1789 "collection": "nsrs",
1790 "filter": {"_id": nsr_id
},
1791 "path": db_update_entry
,
1797 element_under_configuration
= nsr_id
1801 vnfr_id
= db_vnfr
["_id"]
1802 osm_config
["osm"]["vnf_id"] = vnfr_id
1804 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1806 if vca_type
== "native_charm":
1809 index_number
= vdu_index
or 0
1812 element_type
= "VNF"
1813 element_under_configuration
= vnfr_id
1814 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1816 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1817 element_type
= "VDU"
1818 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1819 osm_config
["osm"]["vdu_id"] = vdu_id
1821 namespace
+= ".{}".format(kdu_name
)
1822 element_type
= "KDU"
1823 element_under_configuration
= kdu_name
1824 osm_config
["osm"]["kdu_name"] = kdu_name
1827 if base_folder
["pkg-dir"]:
1828 artifact_path
= "{}/{}/{}/{}".format(
1829 base_folder
["folder"],
1830 base_folder
["pkg-dir"],
1833 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1838 artifact_path
= "{}/Scripts/{}/{}/".format(
1839 base_folder
["folder"],
1842 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1847 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1849 # get initial_config_primitive_list that applies to this element
1850 initial_config_primitive_list
= config_descriptor
.get(
1851 "initial-config-primitive"
1855 "Initial config primitive list > {}".format(
1856 initial_config_primitive_list
1860 # add config if not present for NS charm
1861 ee_descriptor_id
= ee_config_descriptor
.get("id")
1862 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1863 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1864 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1868 "Initial config primitive list #2 > {}".format(
1869 initial_config_primitive_list
1872 # n2vc_redesign STEP 3.1
1873 # find old ee_id if exists
1874 ee_id
= vca_deployed
.get("ee_id")
1876 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1877 # create or register execution environment in VCA
1878 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1880 self
._write
_configuration
_status
(
1882 vca_index
=vca_index
,
1884 element_under_configuration
=element_under_configuration
,
1885 element_type
=element_type
,
1888 step
= "create execution environment"
1889 self
.logger
.debug(logging_text
+ step
)
1893 if vca_type
== "k8s_proxy_charm":
1894 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1895 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1896 namespace
=namespace
,
1897 artifact_path
=artifact_path
,
1901 elif vca_type
== "helm" or vca_type
== "helm-v3":
1902 ee_id
, credentials
= await self
.vca_map
[
1904 ].create_execution_environment(
1905 namespace
=namespace
,
1909 artifact_path
=artifact_path
,
1910 chart_model
=vca_name
,
1914 ee_id
, credentials
= await self
.vca_map
[
1916 ].create_execution_environment(
1917 namespace
=namespace
,
1923 elif vca_type
== "native_charm":
1924 step
= "Waiting to VM being up and getting IP address"
1925 self
.logger
.debug(logging_text
+ step
)
1926 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1935 credentials
= {"hostname": rw_mgmt_ip
}
1937 username
= deep_get(
1938 config_descriptor
, ("config-access", "ssh-access", "default-user")
1940 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1941 # merged. Meanwhile let's get username from initial-config-primitive
1942 if not username
and initial_config_primitive_list
:
1943 for config_primitive
in initial_config_primitive_list
:
1944 for param
in config_primitive
.get("parameter", ()):
1945 if param
["name"] == "ssh-username":
1946 username
= param
["value"]
1950 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1951 "'config-access.ssh-access.default-user'"
1953 credentials
["username"] = username
1954 # n2vc_redesign STEP 3.2
1956 self
._write
_configuration
_status
(
1958 vca_index
=vca_index
,
1959 status
="REGISTERING",
1960 element_under_configuration
=element_under_configuration
,
1961 element_type
=element_type
,
1964 step
= "register execution environment {}".format(credentials
)
1965 self
.logger
.debug(logging_text
+ step
)
1966 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1967 credentials
=credentials
,
1968 namespace
=namespace
,
1973 # for compatibility with MON/POL modules, they need the model and application name at database
1974 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1975 ee_id_parts
= ee_id
.split(".")
1976 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1977 if len(ee_id_parts
) >= 2:
1978 model_name
= ee_id_parts
[0]
1979 application_name
= ee_id_parts
[1]
1980 db_nsr_update
[db_update_entry
+ "model"] = model_name
1981 db_nsr_update
[db_update_entry
+ "application"] = application_name
1983 # n2vc_redesign STEP 3.3
1984 step
= "Install configuration Software"
1986 self
._write
_configuration
_status
(
1988 vca_index
=vca_index
,
1989 status
="INSTALLING SW",
1990 element_under_configuration
=element_under_configuration
,
1991 element_type
=element_type
,
1992 other_update
=db_nsr_update
,
1995 # TODO check if already done
1996 self
.logger
.debug(logging_text
+ step
)
1998 if vca_type
== "native_charm":
1999 config_primitive
= next(
2000 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
2003 if config_primitive
:
2004 config
= self
._map
_primitive
_params
(
2005 config_primitive
, {}, deploy_params
2008 if vca_type
== "lxc_proxy_charm":
2009 if element_type
== "NS":
2010 num_units
= db_nsr
.get("config-units") or 1
2011 elif element_type
== "VNF":
2012 num_units
= db_vnfr
.get("config-units") or 1
2013 elif element_type
== "VDU":
2014 for v
in db_vnfr
["vdur"]:
2015 if vdu_id
== v
["vdu-id-ref"]:
2016 num_units
= v
.get("config-units") or 1
2018 if vca_type
!= "k8s_proxy_charm":
2019 await self
.vca_map
[vca_type
].install_configuration_sw(
2021 artifact_path
=artifact_path
,
2024 num_units
=num_units
,
2029 # write in db flag of configuration_sw already installed
2031 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
2034 # add relations for this VCA (wait for other peers related with this VCA)
2035 await self
._add
_vca
_relations
(
2036 logging_text
=logging_text
,
2039 vca_index
=vca_index
,
2042 # if SSH access is required, then get execution environment SSH public
2043 # for a native charm we have already waited for the VM to be up
2044 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
2047 # self.logger.debug("get ssh key block")
2049 config_descriptor
, ("config-access", "ssh-access", "required")
2051 # self.logger.debug("ssh key needed")
2052 # Needed to inject a ssh key
2055 ("config-access", "ssh-access", "default-user"),
2057 step
= "Install configuration Software, getting public ssh key"
2058 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
2059 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
2062 step
= "Insert public key into VM user={} ssh_key={}".format(
2066 # self.logger.debug("no need to get ssh key")
2067 step
= "Waiting to VM being up and getting IP address"
2068 self
.logger
.debug(logging_text
+ step
)
2070 # default rw_mgmt_ip to None, avoiding the non definition of the variable
2073 # n2vc_redesign STEP 5.1
2074 # wait for RO (ip-address) Insert pub_key into VM
2077 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
2078 logging_text
, nsr_id
, vnfr_id
, kdu_name
2080 vnfd
= self
.db
.get_one(
2082 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2084 kdu
= get_kdu(vnfd
, kdu_name
)
2086 service
["name"] for service
in get_kdu_services(kdu
)
2088 exposed_services
= []
2089 for service
in services
:
2090 if any(s
in service
["name"] for s
in kdu_services
):
2091 exposed_services
.append(service
)
2092 await self
.vca_map
[vca_type
].exec_primitive(
2094 primitive_name
="config",
2096 "osm-config": json
.dumps(
2098 k8s
={"services": exposed_services
}
2105 # This verification is needed in order to avoid trying to add a public key
2106 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2107 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2108 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2110 elif db_vnfr
.get("vdur"):
2111 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2121 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2123 # store rw_mgmt_ip in deploy params for later replacement
2124 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2126 # n2vc_redesign STEP 6 Execute initial config primitive
2127 step
= "execute initial config primitive"
2129 # wait for dependent primitives execution (NS -> VNF -> VDU)
2130 if initial_config_primitive_list
:
2131 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2133 # stage, in function of element type: vdu, kdu, vnf or ns
2134 my_vca
= vca_deployed_list
[vca_index
]
2135 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2137 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2138 elif my_vca
.get("member-vnf-index"):
2140 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2143 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2145 self
._write
_configuration
_status
(
2146 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2149 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2151 check_if_terminated_needed
= True
2152 for initial_config_primitive
in initial_config_primitive_list
:
2153 # adding information on the vca_deployed if it is a NS execution environment
2154 if not vca_deployed
["member-vnf-index"]:
2155 deploy_params
["ns_config_info"] = json
.dumps(
2156 self
._get
_ns
_config
_info
(nsr_id
)
2158 # TODO check if already done
2159 primitive_params_
= self
._map
_primitive
_params
(
2160 initial_config_primitive
, {}, deploy_params
2163 step
= "execute primitive '{}' params '{}'".format(
2164 initial_config_primitive
["name"], primitive_params_
2166 self
.logger
.debug(logging_text
+ step
)
2167 await self
.vca_map
[vca_type
].exec_primitive(
2169 primitive_name
=initial_config_primitive
["name"],
2170 params_dict
=primitive_params_
,
2175 # Once some primitive has been executed, check and write at db whether terminate primitives will need to be executed
2176 if check_if_terminated_needed
:
2177 if config_descriptor
.get("terminate-config-primitive"):
2179 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2181 check_if_terminated_needed
= False
2183 # TODO register in database that primitive is done
2185 # STEP 7 Configure metrics
2186 if vca_type
== "helm" or vca_type
== "helm-v3":
2187 # TODO: review for those cases where the helm chart is a reference and
2188 # is not part of the NF package
2189 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2191 artifact_path
=artifact_path
,
2192 ee_config_descriptor
=ee_config_descriptor
,
2195 target_ip
=rw_mgmt_ip
,
2201 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2204 for job
in prometheus_jobs
:
2207 {"job_name": job
["job_name"]},
2210 fail_on_empty
=False,
2213 step
= "instantiated at VCA"
2214 self
.logger
.debug(logging_text
+ step
)
2216 self
._write
_configuration
_status
(
2217 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2220 except Exception as e
: # TODO not use Exception but N2VC exception
2221 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2223 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2226 "Exception while {} : {}".format(step
, e
), exc_info
=True
2228 self
._write
_configuration
_status
(
2229 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2231 raise LcmException("{} {}".format(step
, e
)) from e
2233 def _write_ns_status(
2237 current_operation
: str,
2238 current_operation_id
: str,
2239 error_description
: str = None,
2240 error_detail
: str = None,
2241 other_update
: dict = None,
2244 Update db_nsr fields.
2247 :param current_operation:
2248 :param current_operation_id:
2249 :param error_description:
2250 :param error_detail:
2251 :param other_update: Other required changes at database if provided, will be cleared
2255 db_dict
= other_update
or {}
2258 ] = current_operation_id
# for backward compatibility
2259 db_dict
["_admin.current-operation"] = current_operation_id
2260 db_dict
["_admin.operation-type"] = (
2261 current_operation
if current_operation
!= "IDLE" else None
2263 db_dict
["currentOperation"] = current_operation
2264 db_dict
["currentOperationID"] = current_operation_id
2265 db_dict
["errorDescription"] = error_description
2266 db_dict
["errorDetail"] = error_detail
2269 db_dict
["nsState"] = ns_state
2270 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2271 except DbException
as e
:
2272 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2274 def _write_op_status(
2278 error_message
: str = None,
2279 queuePosition
: int = 0,
2280 operation_state
: str = None,
2281 other_update
: dict = None,
2284 db_dict
= other_update
or {}
2285 db_dict
["queuePosition"] = queuePosition
2286 if isinstance(stage
, list):
2287 db_dict
["stage"] = stage
[0]
2288 db_dict
["detailed-status"] = " ".join(stage
)
2289 elif stage
is not None:
2290 db_dict
["stage"] = str(stage
)
2292 if error_message
is not None:
2293 db_dict
["errorMessage"] = error_message
2294 if operation_state
is not None:
2295 db_dict
["operationState"] = operation_state
2296 db_dict
["statusEnteredTime"] = time()
2297 self
.update_db_2("nslcmops", op_id
, db_dict
)
2298 except DbException
as e
:
2300 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2303 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2305 nsr_id
= db_nsr
["_id"]
2306 # configurationStatus
2307 config_status
= db_nsr
.get("configurationStatus")
2310 "configurationStatus.{}.status".format(index
): status
2311 for index
, v
in enumerate(config_status
)
2315 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2317 except DbException
as e
:
2319 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2322 def _write_configuration_status(
2327 element_under_configuration
: str = None,
2328 element_type
: str = None,
2329 other_update
: dict = None,
2332 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2333 # .format(vca_index, status))
2336 db_path
= "configurationStatus.{}.".format(vca_index
)
2337 db_dict
= other_update
or {}
2339 db_dict
[db_path
+ "status"] = status
2340 if element_under_configuration
:
2342 db_path
+ "elementUnderConfiguration"
2343 ] = element_under_configuration
2345 db_dict
[db_path
+ "elementType"] = element_type
2346 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2347 except DbException
as e
:
2349 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2350 status
, nsr_id
, vca_index
, e
2354 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2356 Checks and computes the placement (vim account where to deploy). If it is decided by an external tool, it
2357 sends the request via kafka and waits until the result is written to the database (nslcmops _admin.pla).
2358 Database is used because the result can be obtained from a different LCM worker in case of HA.
2359 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2360 :param db_nslcmop: database content of nslcmop
2361 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2362 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
2363 computed 'vim-account-id'
2366 nslcmop_id
= db_nslcmop
["_id"]
2367 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2368 if placement_engine
== "PLA":
2370 logging_text
+ "Invoke and wait for placement optimization"
2372 await self
.msg
.aiowrite(
2373 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2375 db_poll_interval
= 5
2376 wait
= db_poll_interval
* 10
2378 while not pla_result
and wait
>= 0:
2379 await asyncio
.sleep(db_poll_interval
)
2380 wait
-= db_poll_interval
2381 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2382 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2386 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2389 for pla_vnf
in pla_result
["vnf"]:
2390 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2391 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2396 {"_id": vnfr
["_id"]},
2397 {"vim-account-id": pla_vnf
["vimAccountId"]},
2400 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
def update_nsrs_with_pla_result(self, params):
    """
    Store the placement result received from the placement engine into the nslcmop record.
    The content of params["placement"] is written at "_admin.pla" of the nslcmop identified by
    params["placement"]["nslcmopId"]. This is best effort: any failure is logged, never raised.
    :param params: message content; expected to carry a "placement" dictionary with "nslcmopId"
    :return: None
    """
    # bound before the try so the error handler can always format it, even when the
    # very first statement fails
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
        self.update_db_2(
            "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
        )
    except Exception as e:
        # Logger.warn is a deprecated alias; warning() is the supported spelling
        self.logger.warning(
            "Update failed for nslcmop_id={}:{}".format(nslcmop_id, e)
        )
async def instantiate(self, nsr_id, nslcmop_id):
    """
    Run the instantiation operation of a network service.
    Reads the needed records from database, launches the deployment of KDUs, of the NS at VIM and of
    the execution environments, waits for the launched tasks and finally writes the resulting status
    at database and notifies it through kafka.
    :param nsr_id: ns instance to deploy
    :param nslcmop_id: operation to run
    :return: None. Errors are not raised; they are recorded at the nsr and nslcmop records
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        self.logger.debug(
            "instantiate() task is not locked by me, ns={}".format(nsr_id)
        )
        return

    logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")

    # get all needed from database

    # database nsrs record
    db_nsr = None

    # database nslcmops record
    db_nslcmop = None

    # update operation on nsrs
    db_nsr_update = {}
    # update operation on nslcmops
    db_nslcmop_update = {}

    nslcmop_operation_state = None
    db_vnfrs = {}  # vnf's info indexed by member-index
    # n2vc_info = {}
    tasks_dict_info = {}  # from task to info text
    exc = None
    error_list = []
    stage = [
        "Stage 1/5: preparation of the environment.",
        "Waiting for previous operations to terminate.",
        "",
    ]
    # ^ stage, step, VIM progress
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
        stage[1] = "Reading from database."
        # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
        db_nsr_update["detailed-status"] = "creating"
        db_nsr_update["operational-status"] = "init"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="BUILDING",
            current_operation="INSTANTIATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update,
        )
        self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)

        # read from db: operation
        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
            # stored as a json string; decode it in place
            db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
                db_nslcmop["operationParams"]["additionalParamsForVnf"]
            )
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get(
                "ns_deploy", self.timeout_ns_deploy
            )

        # read from db: ns
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        self.logger.debug(logging_text + stage[1])
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
        self.logger.debug(logging_text + stage[1])
        nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
        self.fs.sync(db_nsr["nsd-id"])
        db_nsr["nsd"] = nsd
        # nsr_name = db_nsr["name"]   # TODO short-name??

        # read from db: vnf's of this ns
        stage[1] = "Getting vnfrs from db."
        self.logger.debug(logging_text + stage[1])
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})

        # read from db: vnfd's for every vnf
        db_vnfds = []  # every vnfd data
        # _id of the vnfds already stored at db_vnfds. db_vnfds holds descriptors (dicts), so
        # membership of an id must be checked against this set and not against the list itself
        loaded_vnfd_ids = set()

        # for each vnf in ns, read vnfd
        for vnfr in db_vnfrs_list:
            if vnfr.get("kdur"):
                kdur_list = []
                for kdur in vnfr["kdur"]:
                    if kdur.get("additionalParams"):
                        kdur["additionalParams"] = json.loads(
                            kdur["additionalParams"]
                        )
                    kdur_list.append(kdur)
                vnfr["kdur"] = kdur_list

            db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
            vnfd_id = vnfr["vnfd-id"]
            vnfd_ref = vnfr["vnfd-ref"]
            self.fs.sync(vnfd_id)

            # if we haven't this vnfd, read it from db
            if vnfd_id not in loaded_vnfd_ids:
                # read from db
                stage[1] = "Getting vnfd={} id='{}' from db.".format(
                    vnfd_id, vnfd_ref
                )
                self.logger.debug(logging_text + stage[1])
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})

                # store vnfd
                db_vnfds.append(vnfd)
                loaded_vnfd_ids.add(vnfd_id)

        # Get or generates the _admin.deployed.VCA list
        vca_deployed_list = None
        if db_nsr["_admin"].get("deployed"):
            vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
        if vca_deployed_list is None:
            vca_deployed_list = []
            configuration_status_list = []
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            db_nsr_update["configurationStatus"] = configuration_status_list
            # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
        elif isinstance(vca_deployed_list, dict):
            # maintain backward compatibility. Change a dict to list at database
            vca_deployed_list = list(vca_deployed_list.values())
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)

        if not isinstance(
            deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
        ):
            populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
            db_nsr_update["_admin.deployed.RO.vnfd"] = []

        # set state to INSTANTIATED. When instantiated NBI will not delete directly
        db_nsr_update["_admin.nsState"] = "INSTANTIATED"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self.db.set_list(
            "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
        )

        # n2vc_redesign STEP 2 Deploy Network Scenario
        stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
        self._write_op_status(op_id=nslcmop_id, stage=stage)

        stage[1] = "Deploying KDUs."
        # self.logger.debug(logging_text + "Before deploy_kdus")
        # Call to deploy_kdus in case exists the "vdu:kdu" param
        await self.deploy_kdus(
            logging_text=logging_text,
            nsr_id=nsr_id,
            nslcmop_id=nslcmop_id,
            db_vnfrs=db_vnfrs,
            db_vnfds=db_vnfds,
            task_instantiation_info=tasks_dict_info,
        )

        stage[1] = "Getting VCA public key."
        # n2vc_redesign STEP 1 Get VCA public ssh-key
        # feature 1429. Add n2vc public key to needed VMs
        n2vc_key = self.n2vc.get_public_key()
        n2vc_key_list = [n2vc_key]
        if self.vca_config.get("public_key"):
            n2vc_key_list.append(self.vca_config["public_key"])

        stage[1] = "Deploying NS at VIM."
        task_ro = asyncio.ensure_future(
            self.instantiate_RO(
                logging_text=logging_text,
                nsr_id=nsr_id,
                nsd=nsd,
                db_nsr=db_nsr,
                db_nslcmop=db_nslcmop,
                db_vnfrs=db_vnfrs,
                db_vnfds=db_vnfds,
                n2vc_key_list=n2vc_key_list,
                stage=stage,
            )
        )
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
        tasks_dict_info[task_ro] = "Deploying at VIM"

        # n2vc_redesign STEP 3 to 6 Deploy N2VC
        stage[1] = "Deploying Execution Environments."
        self.logger.debug(logging_text + stage[1])

        nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
        for vnf_profile in get_vnf_profiles(nsd):
            vnfd_id = vnf_profile["vnfd-id"]
            vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
            member_vnf_index = str(vnf_profile["id"])
            db_vnfr = db_vnfrs[member_vnf_index]
            base_folder = vnfd["_admin"]["storage"]
            vdu_id = None
            vdu_index = 0
            vdu_name = None
            kdu_name = None

            # Get additional parameters
            deploy_params = {"OSM": get_osm_params(db_vnfr)}
            if db_vnfr.get("additionalParamsForVnf"):
                deploy_params.update(
                    parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
                )

            descriptor_config = get_configuration(vnfd, vnfd["id"])
            if descriptor_config:
                self._deploy_n2vc(
                    logging_text=logging_text
                    + "member_vnf_index={} ".format(member_vnf_index),
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    nslcmop_id=nslcmop_id,
                    nsr_id=nsr_id,
                    nsi_id=nsi_id,
                    vnfd_id=vnfd_id,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    member_vnf_index=member_vnf_index,
                    vdu_index=vdu_index,
                    vdu_name=vdu_name,
                    deploy_params=deploy_params,
                    descriptor_config=descriptor_config,
                    base_folder=base_folder,
                    task_instantiation_info=tasks_dict_info,
                    stage=stage,
                )

            # Deploy charms for each VDU that supports one.
            for vdud in get_vdu_list(vnfd):
                vdu_id = vdud["id"]
                descriptor_config = get_configuration(vnfd, vdu_id)
                vdur = find_in_list(
                    db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
                )

                if vdur.get("additionalParams"):
                    deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
                else:
                    # NOTE(review): this aliases the VNF level dictionary, so the "OSM" entry
                    # written below also replaces the one of deploy_params
                    deploy_params_vdu = deploy_params
                deploy_params_vdu["OSM"] = get_osm_params(
                    db_vnfr, vdu_id, vdu_count_index=0
                )
                vdud_count = get_number_of_instances(vnfd, vdu_id)

                self.logger.debug("VDUD > {}".format(vdud))
                self.logger.debug(
                    "Descriptor config > {}".format(descriptor_config)
                )
                if descriptor_config:
                    vdu_name = None
                    kdu_name = None
                    for vdu_index in range(vdud_count):
                        # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
                        self._deploy_n2vc(
                            logging_text=logging_text
                            + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                member_vnf_index, vdu_id, vdu_index
                            ),
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_id,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_vdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage,
                        )
            for kdud in get_kdu_list(vnfd):
                kdu_name = kdud["name"]
                descriptor_config = get_configuration(vnfd, kdu_name)
                if descriptor_config:
                    vdu_id = None
                    vdu_index = 0
                    vdu_name = None
                    kdur = next(
                        x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
                    )
                    deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
                    if kdur.get("additionalParams"):
                        deploy_params_kdu.update(
                            parse_yaml_strings(kdur["additionalParams"].copy())
                        )

                    self._deploy_n2vc(
                        logging_text=logging_text,
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_id,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params_kdu,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_info=tasks_dict_info,
                        stage=stage,
                    )

        # Check if this NS has a charm configuration
        descriptor_config = nsd.get("ns-configuration")
        if descriptor_config and descriptor_config.get("juju"):
            vnfd_id = None
            db_vnfr = None
            member_vnf_index = None
            vdu_id = None
            kdu_name = None
            vdu_index = 0
            vdu_name = None

            # Get additional parameters
            deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
            if db_nsr.get("additionalParamsForNs"):
                deploy_params.update(
                    parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
                )
            base_folder = nsd["_admin"]["storage"]
            self._deploy_n2vc(
                logging_text=logging_text,
                db_nsr=db_nsr,
                db_vnfr=db_vnfr,
                nslcmop_id=nslcmop_id,
                nsr_id=nsr_id,
                nsi_id=nsi_id,
                vnfd_id=vnfd_id,
                vdu_id=vdu_id,
                kdu_name=kdu_name,
                member_vnf_index=member_vnf_index,
                vdu_index=vdu_index,
                vdu_name=vdu_name,
                deploy_params=deploy_params,
                descriptor_config=descriptor_config,
                base_folder=base_folder,
                task_instantiation_info=tasks_dict_info,
                stage=stage,
            )

        # rest of stuff will be done at finally

    except (
        ROclient.ROClientException,
        DbException,
        LcmException,
        N2VCException,
    ) as e:
        self.logger.error(
            logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
        )
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(stage[1])
        )
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
            exc_info=True,
        )
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for instantiate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(
                    logging_text,
                    tasks_dict_info,
                    timeout_ns_deploy,
                    stage,
                    nslcmop_id,
                    nsr_id=nsr_id,
                )
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancel all tasks
        except Exception as exc:
            error_list.append(str(exc))

        # update operation-status
        db_nsr_update["operational-status"] = "running"
        # let's begin with VCA 'configured' status (later we can change it)
        db_nsr_update["config-status"] = "configured"
        for task, task_name in tasks_dict_info.items():
            if not task.done() or task.cancelled() or task.exception():
                if task_name.startswith(self.task_name_deploy_vca):
                    # A N2VC task is pending
                    db_nsr_update["config-status"] = "failed"
                else:
                    # RO or KDU task is pending
                    db_nsr_update["operational-status"] = "failed"

        # update status at database
        if error_list:
            error_detail = ". ".join(error_list)
            self.logger.error(logging_text + error_detail)
            error_description_nslcmop = "{} Detail: {}".format(
                stage[0], error_detail
            )
            error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
                nslcmop_id, stage[0]
            )

            db_nsr_update["detailed-status"] = (
                error_description_nsr + " Detail: " + error_detail
            )
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "READY"
            db_nsr_update["detailed-status"] = "Done"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update,
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite(
                    "ns",
                    "instantiated",
                    {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                    },
                    loop=self.loop,
                )
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
    """
    Get a vnfd by its descriptor "id", reading it from database only the first time.
    :param vnfd_id: value of the "id" field of the vnfd
    :param cached_vnfds: cache of vnfds indexed by vnfd_id. It is filled in by this method
    :return: the vnfd content
    """
    if vnfd_id in cached_vnfds:
        return cached_vnfds[vnfd_id]
    # cache miss: read from database and remember it for the next call
    db_vnfd = self.db.get_one("vnfds", {"id": vnfd_id})
    cached_vnfds[vnfd_id] = db_vnfd
    return db_vnfd
def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
    """
    Get the vnfr of a vnf profile of a ns, reading it from database only the first time.
    :param nsr_id: ns instance the vnfr belongs to ("nsr-id-ref")
    :param vnf_profile_id: vnf profile identifier ("member-vnf-index-ref")
    :param cached_vnfrs: cache of vnfrs indexed by vnf_profile_id. It is filled in by this method
    :return: the vnfr content
    """
    if vnf_profile_id in cached_vnfrs:
        return cached_vnfrs[vnf_profile_id]
    # cache miss: read from database and remember it for the next call
    vnfr_filter = {
        "member-vnf-index-ref": vnf_profile_id,
        "nsr-id-ref": nsr_id,
    }
    db_vnfr = self.db.get_one("vnfrs", vnfr_filter)
    cached_vnfrs[vnf_profile_id] = db_vnfr
    return db_vnfr
def _is_deployed_vca_in_relation(
    self, vca: DeployedVCA, relation: Relation
) -> bool:
    """
    Check if a deployed VCA is one of the endpoints of a relation.
    Endpoints with a "kdu-resource-profile-id" are not compared against the VCA.
    :param vca: deployed VCA to look for
    :param relation: relation whose provider and requirer are inspected
    :return: True if the VCA matches the provider or the requirer, False otherwise
    """
    for endpoint in (relation.provider, relation.requirer):
        if endpoint["kdu-resource-profile-id"]:
            continue
        same_component = (
            vca.vnf_profile_id == endpoint.vnf_profile_id
            and vca.vdu_profile_id == endpoint.vdu_profile_id
            and vca.execution_environment_ref == endpoint.execution_environment_ref
        )
        if same_component:
            return same_component
    return False
def _update_ee_relation_data_with_implicit_data(
    self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
):
    """
    Complete the data of a relation endpoint with the information that the descriptor leaves
    implicit: for VNF and VDU level endpoints without "execution-environment-ref", the juju
    execution environment of the entity is looked up at the vnfd.
    :param nsr_id: ns instance identifier
    :param nsd: ns descriptor
    :param ee_relation_data: endpoint data as written at the descriptor
    :param cached_vnfds: cache of vnfds indexed by vnfd id
    :param vnf_profile_id: vnf profile the endpoint belongs to, if any
    :return: the normalized endpoint data
    :raises Exception: when the entity has no execution environment
    """
    ee_relation_data = safe_get_ee_relation(
        nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
    )
    level = EELevel.get_level(ee_relation_data)
    ee_ref_missing = (
        level in (EELevel.VNF, EELevel.VDU)
    ) and not ee_relation_data["execution-environment-ref"]
    if not ee_ref_missing:
        return ee_relation_data

    profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
    vnfd_id = profile["vnfd-id"]
    db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
    # a VNF level endpoint is configured under the vnfd id, a VDU level one under its vdu profile
    if level == EELevel.VNF:
        entity_id = vnfd_id
    else:
        entity_id = ee_relation_data["vdu-profile-id"]
    juju_ee = get_juju_ee_ref(db_vnfd, entity_id)
    if not juju_ee:
        raise Exception(
            f"not execution environments found for ee_relation {ee_relation_data}"
        )
    ee_relation_data["execution-environment-ref"] = juju_ee["id"]
    return ee_relation_data
def _get_ns_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """
    Get the relations declared at the ns configuration in which the given VCA takes part.
    Both descriptor formats are accepted: "provider"/"requirer" or the two items "entities" list.
    :param nsr_id: ns instance identifier
    :param nsd: ns descriptor
    :param vca: deployed VCA to look for at the relation endpoints
    :param cached_vnfds: cache of vnfds indexed by vnfd id
    :return: list of relations that involve the VCA
    :raises Exception: when a relation has neither provider/requirer nor entities
    """

    def endpoint_from_entity(entity):
        # an entity whose id is not the nsd id refers to a vnf profile
        entity_id = entity["id"]
        endpoint = {
            "nsr-id": nsr_id,
            "endpoint": entity["endpoint"],
        }
        if entity_id != nsd["id"]:
            endpoint["vnf-profile-id"] = entity_id
        return endpoint

    vca_relations = []
    for db_relation in get_ns_configuration_relation_list(nsd):
        if "provider" in db_relation and "requirer" in db_relation:
            provider_data = db_relation["provider"]
            requirer_data = db_relation["requirer"]
        elif "entities" in db_relation:
            provider_data = endpoint_from_entity(db_relation["entities"][0])
            requirer_data = endpoint_from_entity(db_relation["entities"][1])
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )
        provider = EERelation(
            self._update_ee_relation_data_with_implicit_data(
                nsr_id, nsd, provider_data, cached_vnfds
            )
        )
        requirer = EERelation(
            self._update_ee_relation_data_with_implicit_data(
                nsr_id, nsd, requirer_data, cached_vnfds
            )
        )
        candidate = Relation(db_relation["name"], provider, requirer)
        if self._is_deployed_vca_in_relation(vca, candidate):
            vca_relations.append(candidate)
    return vca_relations
def _get_vnf_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """
    Get the relations declared at the vnfd of the VCA's vnf profile in which the VCA takes part.
    Both descriptor formats are accepted: "provider"/"requirer" or the two items "entities" list.
    :param nsr_id: ns instance identifier
    :param nsd: ns descriptor
    :param vca: deployed VCA to look for at the relation endpoints
    :param cached_vnfds: cache of vnfds indexed by vnfd id
    :return: list of relations that involve the VCA
    :raises Exception: when a relation has neither provider/requirer nor entities
    """
    profile = get_vnf_profile(nsd, vca.vnf_profile_id)
    vnf_profile_id = profile["id"]
    vnfd_id = profile["vnfd-id"]
    db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)

    def endpoint_from_entity(entity):
        # an entity whose id is not the vnfd id refers to a vdu profile
        entity_id = entity["id"]
        endpoint = {
            "nsr-id": nsr_id,
            "vnf-profile-id": vnf_profile_id,
            "endpoint": entity["endpoint"],
        }
        if entity_id != vnfd_id:
            endpoint["vdu-profile-id"] = entity_id
        return endpoint

    vca_relations = []
    for db_relation in get_relation_list(db_vnfd, vnfd_id):
        if "provider" in db_relation and "requirer" in db_relation:
            provider_data = db_relation["provider"]
            requirer_data = db_relation["requirer"]
        elif "entities" in db_relation:
            provider_data = endpoint_from_entity(db_relation["entities"][0])
            requirer_data = endpoint_from_entity(db_relation["entities"][1])
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )
        provider = EERelation(
            self._update_ee_relation_data_with_implicit_data(
                nsr_id, nsd, provider_data, cached_vnfds, vnf_profile_id=vnf_profile_id
            )
        )
        requirer = EERelation(
            self._update_ee_relation_data_with_implicit_data(
                nsr_id, nsd, requirer_data, cached_vnfds, vnf_profile_id=vnf_profile_id
            )
        )
        candidate = Relation(db_relation["name"], provider, requirer)
        if self._is_deployed_vca_in_relation(vca, candidate):
            vca_relations.append(candidate)
    return vca_relations
def _get_kdu_resource_data(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedK8sResource:
    """
    Get the deployed KDU data of the kdu resource profile referenced by a relation endpoint.
    :param ee_relation: relation endpoint; its vnf_profile_id and kdu_resource_profile_id are used
    :param db_nsr: nsr database content
    :param cached_vnfds: cache of vnfds indexed by vnfd id
    :return: the deployed KDU dictionary completed with "resource-name", or the falsy value given
        by get_deployed_kdu when the KDU is not deployed. The caller is who wraps the result in a
        DeployedK8sResource.
    """
    nsd = get_nsd(db_nsr)
    vnf_profiles = get_vnf_profiles(nsd)
    vnfd_id = find_in_list(
        vnf_profiles,
        lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
    )["vnfd-id"]
    db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
    kdu_resource_profile = get_kdu_resource_profile(
        db_vnfd, ee_relation.kdu_resource_profile_id
    )
    kdu_name = kdu_resource_profile["kdu-name"]
    # defaults must be dictionaries: the previous tuple default has no .get(), so a nsr
    # without "_admin" failed here with an AttributeError
    deployed_kdu, _ = get_deployed_kdu(
        db_nsr.get("_admin", {}).get("deployed", {}),
        kdu_name,
        ee_relation.vnf_profile_id,
    )
    # presumably get_deployed_kdu gives None when the KDU is not deployed yet - TODO confirm.
    # _get_deployed_component already treats a falsy result as "not deployed"
    if deployed_kdu:
        deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
    return deployed_kdu
def _get_deployed_component(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedComponent:
    """
    Get the deployed component (VCA or K8s resource) that a relation endpoint refers to.
    :param ee_relation: relation endpoint
    :param db_nsr: nsr database content
    :param cached_vnfds: cache of vnfds indexed by vnfd id
    :return: a DeployedVCA for NS, VNF and VDU level endpoints, a DeployedK8sResource for KDU
        level ones, or None when the component is not found or the level is not one of those
    """
    nsr_id = db_nsr["_id"]
    level = EELevel.get_level(ee_relation)

    # select the filter used to look for the VCA at the nsr, depending on the endpoint level
    if level == EELevel.NS:
        vca_filter = {"vdu_id": None, "member-vnf-index": None}
    elif level == EELevel.VNF:
        vca_filter = {
            "vdu_id": None,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif level == EELevel.VDU:
        vca_filter = {
            "vdu_id": ee_relation.vdu_profile_id,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif level == EELevel.KDU:
        kdu_data = self._get_kdu_resource_data(ee_relation, db_nsr, cached_vnfds)
        return DeployedK8sResource(kdu_data) if kdu_data else None
    else:
        return None

    deployed_vca = get_deployed_vca(db_nsr, vca_filter)
    return DeployedVCA(nsr_id, deployed_vca) if deployed_vca else None
async def _add_relation(
    self,
    relation: Relation,
    vca_type: str,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
    cached_vnfrs: Dict[str, Any],
) -> bool:
    """
    Add a relation between its provider and its requirer if both are already deployed and have
    their configuration software installed.
    :param relation: relation to add
    :param vca_type: key of self.vca_map that selects the connector used to add the relation
    :param db_nsr: nsr database content
    :param cached_vnfds: cache of vnfds indexed by vnfd id
    :param cached_vnfrs: cache of vnfrs indexed by vnf profile id
    :return: True if the relation has been added, False if some peer is not ready yet
    """
    provider_component = self._get_deployed_component(
        relation.provider, db_nsr, cached_vnfds
    )
    requirer_component = self._get_deployed_component(
        relation.requirer, db_nsr, cached_vnfds
    )
    if not provider_component or not requirer_component:
        return False
    if not (
        provider_component.config_sw_installed
        and requirer_component.config_sw_installed
    ):
        return False

    # endpoints without vnf profile have no vnfr
    provider_db_vnfr = None
    if relation.provider.vnf_profile_id:
        provider_db_vnfr = self._get_vnfr(
            relation.provider.nsr_id,
            relation.provider.vnf_profile_id,
            cached_vnfrs,
        )
    requirer_db_vnfr = None
    if relation.requirer.vnf_profile_id:
        requirer_db_vnfr = self._get_vnfr(
            relation.requirer.nsr_id,
            relation.requirer.vnf_profile_id,
            cached_vnfrs,
        )
    provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
    requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
    await self.vca_map[vca_type].add_relation(
        provider=RelationEndpoint(
            provider_component.ee_id,
            provider_vca_id,
            relation.provider.endpoint,
        ),
        requirer=RelationEndpoint(
            requirer_component.ee_id,
            requirer_vca_id,
            relation.requirer.endpoint,
        ),
    )
    # the caller removes the entry from its relations list
    return True
async def _add_vca_relations(
    self,
    logging_text,
    nsr_id,
    vca_type: str,
    vca_index: int,
    timeout: int = 3600,
) -> bool:
    """
    Add all the relations (ns and vnf level) in which a deployed VCA takes part, waiting for the
    related peers to be ready.
    :param logging_text: prefix for logging
    :param nsr_id: ns instance identifier
    :param vca_type: key of self.vca_map that selects the connector used to add the relations
    :param vca_index: index of the VCA at the nsr deployed VCA list
    :param timeout: maximum time in seconds to wait for the peers
    :return: True if there are no relations or all of them are added; False on timeout or on any
        error (errors are logged, not raised)
    """

    # steps:
    # 1. find all relations for this VCA
    # 2. wait for other peers related
    # 3. add relations

    try:
        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = get_nsd(db_nsr)

        # this VCA data
        deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
        my_vca = DeployedVCA(nsr_id, deployed_vca_dict)

        cached_vnfds = {}
        cached_vnfrs = {}
        relations = []
        relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
        relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))

        # if no relations, terminate
        if not relations:
            self.logger.debug(logging_text + " No relations")
            return True

        self.logger.debug(logging_text + " adding relations {}".format(relations))

        # add all relations
        start = time()
        while True:
            # check timeout
            now = time()
            if now - start >= timeout:
                self.logger.error(logging_text + " : timeout adding relations")
                return False

            # reload nsr from database (we need to update record: _admin.deployed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            # for each relation, find the VCA's related
            # (iterate over a copy because added relations are removed from the list)
            for relation in relations.copy():
                added = await self._add_relation(
                    relation,
                    vca_type,
                    db_nsr,
                    cached_vnfds,
                    cached_vnfrs,
                )
                if added:
                    relations.remove(relation)

            if not relations:
                self.logger.debug("Relations added")
                break
            await asyncio.sleep(5.0)

        return True

    except Exception as e:
        # Logger.warn is a deprecated alias (removed in Python 3.13); with it this handler
        # would itself raise instead of reporting the failure
        self.logger.warning(logging_text + " ERROR adding relations: {}".format(e))
        return False
async def _install_kdu(
    self,
    nsr_id: str,
    nsr_db_path: str,
    vnfr_data: dict,
    kdu_index: int,
    kdud: dict,
    vnfd: dict,
    k8s_instance_info: dict,
    k8params: dict = None,
    timeout: int = 600,
    vca_id: str = None,
):
    """
    Install a KDU at its k8s cluster, store its services and management ip at the vnfr and run
    the initial config primitives of the KDU when it has no juju execution environment.
    :param nsr_id: ns instance identifier
    :param nsr_db_path: path inside the nsr record where the data of this KDU is stored
    :param vnfr_data: vnfr database content; only its "_id" is used
    :param kdu_index: index of the KDU at the "kdur" list of the vnfr
    :param kdud: KDU descriptor
    :param vnfd: vnf descriptor
    :param k8s_instance_info: deployment data of the KDU (cluster uuid and type, model, name,
        namespace...). Its "namespace" may be modified for juju based KDUs
    :param k8params: parameters passed to the installation
    :param timeout: timeout in seconds for the installation and for each initial primitive
    :param vca_id: VCA identifier passed to the k8s connector
    :return: the kdu instance name
    :raises Exception: any error is written at the nsr and vnfr records and raised again
    """

    try:
        k8sclustertype = k8s_instance_info["k8scluster-type"]
        # Instantiate kdu
        db_dict_install = {
            "collection": "nsrs",
            "filter": {"_id": nsr_id},
            "path": nsr_db_path,
        }

        if k8s_instance_info.get("kdu-deployment-name"):
            kdu_instance = k8s_instance_info.get("kdu-deployment-name")
        else:
            kdu_instance = self.k8scluster_map[
                k8sclustertype
            ].generate_kdu_instance_name(
                db_dict=db_dict_install,
                kdu_model=k8s_instance_info["kdu-model"],
                kdu_name=k8s_instance_info["kdu-name"],
            )

        # Update the nsrs table with the kdu-instance value
        self.update_db_2(
            item="nsrs",
            _id=nsr_id,
            _desc={nsr_db_path + ".kdu-instance": kdu_instance},
        )

        # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
        # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
        # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
        # namespace, this first verification could be removed, and the next step would be done for any kind
        # of KNF.
        # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
        # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
        if k8sclustertype in ("juju", "juju-bundle"):
            # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
            # that the user passed a namespace which he wants its KDU to be deployed in)
            if (
                self.db.count(
                    table="nsrs",
                    q_filter={
                        "_id": nsr_id,
                        "_admin.projects_write": k8s_instance_info["namespace"],
                        "_admin.projects_read": k8s_instance_info["namespace"],
                    },
                )
                > 0
            ):
                self.logger.debug(
                    f"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
                )
                self.update_db_2(
                    item="nsrs",
                    _id=nsr_id,
                    _desc={f"{nsr_db_path}.namespace": kdu_instance},
                )
                k8s_instance_info["namespace"] = kdu_instance

        await self.k8scluster_map[k8sclustertype].install(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_model=k8s_instance_info["kdu-model"],
            atomic=True,
            params=k8params,
            db_dict=db_dict_install,
            timeout=timeout,
            kdu_name=k8s_instance_info["kdu-name"],
            namespace=k8s_instance_info["namespace"],
            kdu_instance=kdu_instance,
            vca_id=vca_id,
        )

        # Obtain services to obtain management service ip
        services = await self.k8scluster_map[k8sclustertype].get_services(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_instance=kdu_instance,
            namespace=k8s_instance_info["namespace"],
        )

        # Obtain management service info (if exists)
        vnfr_update_dict = {}
        kdu_config = get_configuration(vnfd, kdud["name"])
        if kdu_config:
            target_ee_list = kdu_config.get("execution-environment-list", [])
        else:
            target_ee_list = []

        if services:
            vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
            mgmt_services = [
                service
                for service in kdud.get("service", [])
                if service.get("mgmt-service")
            ]
            for mgmt_service in mgmt_services:
                for service in services:
                    if service["name"].startswith(mgmt_service["name"]):
                        # Mgmt service found, Obtain service ip
                        ip = service.get("external_ip", service.get("cluster_ip"))
                        if isinstance(ip, list) and len(ip) == 1:
                            ip = ip[0]

                        vnfr_update_dict[
                            "kdur.{}.ip-address".format(kdu_index)
                        ] = ip

                        # Check if must update also mgmt ip at the vnf
                        service_external_cp = mgmt_service.get(
                            "external-connection-point-ref"
                        )
                        if service_external_cp:
                            if (
                                deep_get(vnfd, ("mgmt-interface", "cp"))
                                == service_external_cp
                            ):
                                vnfr_update_dict["ip-address"] = ip

                            if find_in_list(
                                target_ee_list,
                                lambda ee: ee.get(
                                    "external-connection-point-ref", ""
                                )
                                == service_external_cp,
                            ):
                                vnfr_update_dict[
                                    "kdur.{}.ip-address".format(kdu_index)
                                ] = ip
                        break
                else:
                    # for/else: no deployed service matched this mgmt service.
                    # Logger.warn is a deprecated alias (removed in Python 3.13)
                    self.logger.warning(
                        "Mgmt service name: {} not found".format(
                            mgmt_service["name"]
                        )
                    )

        vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
        self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)

        kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
        if (
            kdu_config
            and kdu_config.get("initial-config-primitive")
            and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
        ):
            initial_config_primitive_list = kdu_config.get(
                "initial-config-primitive"
            )
            initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))

            for initial_config_primitive in initial_config_primitive_list:
                primitive_params_ = self._map_primitive_params(
                    initial_config_primitive, {}, {}
                )

                await asyncio.wait_for(
                    self.k8scluster_map[k8sclustertype].exec_primitive(
                        cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                        kdu_instance=kdu_instance,
                        primitive_name=initial_config_primitive["name"],
                        params=primitive_params_,
                        db_dict=db_dict_install,
                        vca_id=vca_id,
                    ),
                    timeout=timeout,
                )

    except Exception as e:
        # Prepare update db with error and raise exception
        try:
            self.update_db_2(
                "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
            )
            self.update_db_2(
                "vnfrs",
                vnfr_data.get("_id"),
                {"kdur.{}.status".format(kdu_index): "ERROR"},
            )
        except Exception:
            # ignore to keep original exception
            pass
        # reraise original error
        raise

    return kdu_instance
3455 async def deploy_kdus(
3462 task_instantiation_info
,
3464 # Launch kdus if present in the descriptor
3466 k8scluster_id_2_uuic
= {
3467 "helm-chart-v3": {},
3472 async def _get_cluster_id(cluster_id
, cluster_type
):
3473 nonlocal k8scluster_id_2_uuic
3474 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3475 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3477 # check if K8scluster is creating and wait look if previous tasks in process
3478 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3479 "k8scluster", cluster_id
3482 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3483 task_name
, cluster_id
3485 self
.logger
.debug(logging_text
+ text
)
3486 await asyncio
.wait(task_dependency
, timeout
=3600)
3488 db_k8scluster
= self
.db
.get_one(
3489 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3491 if not db_k8scluster
:
3492 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3494 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3496 if cluster_type
== "helm-chart-v3":
3498 # backward compatibility for existing clusters that have not been initialized for helm v3
3499 k8s_credentials
= yaml
.safe_dump(
3500 db_k8scluster
.get("credentials")
3502 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3503 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3505 db_k8scluster_update
= {}
3506 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3507 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3508 db_k8scluster_update
[
3509 "_admin.helm-chart-v3.created"
3511 db_k8scluster_update
[
3512 "_admin.helm-chart-v3.operationalState"
3515 "k8sclusters", cluster_id
, db_k8scluster_update
3517 except Exception as e
:
3520 + "error initializing helm-v3 cluster: {}".format(str(e
))
3523 "K8s cluster '{}' has not been initialized for '{}'".format(
3524 cluster_id
, cluster_type
3529 "K8s cluster '{}' has not been initialized for '{}'".format(
3530 cluster_id
, cluster_type
3533 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3536 logging_text
+= "Deploy kdus: "
3539 db_nsr_update
= {"_admin.deployed.K8s": []}
3540 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3543 updated_cluster_list
= []
3544 updated_v3_cluster_list
= []
3546 for vnfr_data
in db_vnfrs
.values():
3547 vca_id
= self
.get_vca_id(vnfr_data
, {})
3548 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3549 # Step 0: Prepare and set parameters
3550 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3551 vnfd_id
= vnfr_data
.get("vnfd-id")
3552 vnfd_with_id
= find_in_list(
3553 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3557 for kdud
in vnfd_with_id
["kdu"]
3558 if kdud
["name"] == kdur
["kdu-name"]
3560 namespace
= kdur
.get("k8s-namespace")
3561 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3562 if kdur
.get("helm-chart"):
3563 kdumodel
= kdur
["helm-chart"]
3564 # Default version: helm3, if helm-version is v2 assign v2
3565 k8sclustertype
= "helm-chart-v3"
3566 self
.logger
.debug("kdur: {}".format(kdur
))
3568 kdur
.get("helm-version")
3569 and kdur
.get("helm-version") == "v2"
3571 k8sclustertype
= "helm-chart"
3572 elif kdur
.get("juju-bundle"):
3573 kdumodel
= kdur
["juju-bundle"]
3574 k8sclustertype
= "juju-bundle"
3577 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3578 "juju-bundle. Maybe an old NBI version is running".format(
3579 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3582 # check if kdumodel is a file and exists
3584 vnfd_with_id
= find_in_list(
3585 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3587 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3588 if storage
: # may be not present if vnfd has not artifacts
3589 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3590 if storage
["pkg-dir"]:
3591 filename
= "{}/{}/{}s/{}".format(
3598 filename
= "{}/Scripts/{}s/{}".format(
3603 if self
.fs
.file_exists(
3604 filename
, mode
="file"
3605 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3606 kdumodel
= self
.fs
.path
+ filename
3607 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3609 except Exception: # it is not a file
3612 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3613 step
= "Synchronize repos for k8s cluster '{}'".format(
3616 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3620 k8sclustertype
== "helm-chart"
3621 and cluster_uuid
not in updated_cluster_list
3623 k8sclustertype
== "helm-chart-v3"
3624 and cluster_uuid
not in updated_v3_cluster_list
3626 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3627 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3628 cluster_uuid
=cluster_uuid
3631 if del_repo_list
or added_repo_dict
:
3632 if k8sclustertype
== "helm-chart":
3634 "_admin.helm_charts_added." + item
: None
3635 for item
in del_repo_list
3638 "_admin.helm_charts_added." + item
: name
3639 for item
, name
in added_repo_dict
.items()
3641 updated_cluster_list
.append(cluster_uuid
)
3642 elif k8sclustertype
== "helm-chart-v3":
3644 "_admin.helm_charts_v3_added." + item
: None
3645 for item
in del_repo_list
3648 "_admin.helm_charts_v3_added." + item
: name
3649 for item
, name
in added_repo_dict
.items()
3651 updated_v3_cluster_list
.append(cluster_uuid
)
3653 logging_text
+ "repos synchronized on k8s cluster "
3654 "'{}' to_delete: {}, to_add: {}".format(
3655 k8s_cluster_id
, del_repo_list
, added_repo_dict
3660 {"_id": k8s_cluster_id
},
3666 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3667 vnfr_data
["member-vnf-index-ref"],
3671 k8s_instance_info
= {
3672 "kdu-instance": None,
3673 "k8scluster-uuid": cluster_uuid
,
3674 "k8scluster-type": k8sclustertype
,
3675 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3676 "kdu-name": kdur
["kdu-name"],
3677 "kdu-model": kdumodel
,
3678 "namespace": namespace
,
3679 "kdu-deployment-name": kdu_deployment_name
,
3681 db_path
= "_admin.deployed.K8s.{}".format(index
)
3682 db_nsr_update
[db_path
] = k8s_instance_info
3683 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3684 vnfd_with_id
= find_in_list(
3685 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3687 task
= asyncio
.ensure_future(
3696 k8params
=desc_params
,
3701 self
.lcm_tasks
.register(
3705 "instantiate_KDU-{}".format(index
),
3708 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3714 except (LcmException
, asyncio
.CancelledError
):
3716 except Exception as e
:
3717 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3718 if isinstance(e
, (N2VCException
, DbException
)):
3719 self
.logger
.error(logging_text
+ msg
)
3721 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3722 raise LcmException(msg
)
3725 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3744 task_instantiation_info
,
3747 # launch instantiate_N2VC in a asyncio task and register task object
3748 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3749 # if not found, create one entry and update database
3750 # fill db_nsr._admin.deployed.VCA.<index>
3753 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3757 get_charm_name
= False
3758 if "execution-environment-list" in descriptor_config
:
3759 ee_list
= descriptor_config
.get("execution-environment-list", [])
3760 elif "juju" in descriptor_config
:
3761 ee_list
= [descriptor_config
] # ns charms
3762 if "execution-environment-list" not in descriptor_config
:
3763 # charm name is only required for ns charms
3764 get_charm_name
= True
3765 else: # other types as script are not supported
3768 for ee_item
in ee_list
:
3771 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3772 ee_item
.get("juju"), ee_item
.get("helm-chart")
3775 ee_descriptor_id
= ee_item
.get("id")
3776 if ee_item
.get("juju"):
3777 vca_name
= ee_item
["juju"].get("charm")
3779 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
3782 if ee_item
["juju"].get("charm") is not None
3785 if ee_item
["juju"].get("cloud") == "k8s":
3786 vca_type
= "k8s_proxy_charm"
3787 elif ee_item
["juju"].get("proxy") is False:
3788 vca_type
= "native_charm"
3789 elif ee_item
.get("helm-chart"):
3790 vca_name
= ee_item
["helm-chart"]
3791 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3794 vca_type
= "helm-v3"
3797 logging_text
+ "skipping non juju neither charm configuration"
3802 for vca_index
, vca_deployed
in enumerate(
3803 db_nsr
["_admin"]["deployed"]["VCA"]
3805 if not vca_deployed
:
3808 vca_deployed
.get("member-vnf-index") == member_vnf_index
3809 and vca_deployed
.get("vdu_id") == vdu_id
3810 and vca_deployed
.get("kdu_name") == kdu_name
3811 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3812 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3816 # not found, create one.
3818 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3821 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3823 target
+= "/kdu/{}".format(kdu_name
)
3825 "target_element": target
,
3826 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3827 "member-vnf-index": member_vnf_index
,
3829 "kdu_name": kdu_name
,
3830 "vdu_count_index": vdu_index
,
3831 "operational-status": "init", # TODO revise
3832 "detailed-status": "", # TODO revise
3833 "step": "initial-deploy", # TODO revise
3835 "vdu_name": vdu_name
,
3837 "ee_descriptor_id": ee_descriptor_id
,
3838 "charm_name": charm_name
,
3842 # create VCA and configurationStatus in db
3844 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3845 "configurationStatus.{}".format(vca_index
): dict(),
3847 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3849 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3851 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3852 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3853 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3856 task_n2vc
= asyncio
.ensure_future(
3857 self
.instantiate_N2VC(
3858 logging_text
=logging_text
,
3859 vca_index
=vca_index
,
3865 vdu_index
=vdu_index
,
3866 deploy_params
=deploy_params
,
3867 config_descriptor
=descriptor_config
,
3868 base_folder
=base_folder
,
3869 nslcmop_id
=nslcmop_id
,
3873 ee_config_descriptor
=ee_item
,
3876 self
.lcm_tasks
.register(
3880 "instantiate_N2VC-{}".format(vca_index
),
3883 task_instantiation_info
[
3885 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3886 member_vnf_index
or "", vdu_id
or ""
def _create_nslcmop(nsr_id, operation, params):
    """
    Create the content of an ns-lcm-op occurrence to be stored at database.

    :param nsr_id: internal id of the instance
    :param operation: instantiate, terminate, scale, action, ...
    :param params: user parameters for the operation
    :return: dictionary following SOL005 format
    :raises LcmException: when any of the three arguments is empty
    """
    # Local import keeps this block self-contained.
    from uuid import uuid4

    # Raise exception if invalid arguments
    if not (nsr_id and operation and params):
        raise LcmException(
            "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
        )

    now = time()
    _id = str(uuid4())
    nslcmop = {
        "id": _id,
        "_id": _id,
        # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
        "operationState": "PROCESSING",
        "statusEnteredTime": now,
        "nsInstanceId": nsr_id,
        "lcmOperationType": operation,
        "startTime": now,
        "isAutomaticInvocation": False,
        "operationParams": params,
        "isCancelPending": False,
        "links": {
            "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
            "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
        },
    }
    return nslcmop
def _format_additional_params(self, params):
    """
    Decode the values of an additional-params dictionary that carry YAML.

    Every string value starting with the marker "!!yaml " is replaced, in
    place, by the object obtained from safe-loading the text after the marker.

    :param params: dictionary of additional parameters, or None
    :return: the same dictionary (or a new empty one when params is empty/None)
    """
    params = params or {}
    for key, value in params.items():
        # Only real strings can carry the marker; checking the type avoids
        # slicing an arbitrary object whose str() happens to start with it.
        if isinstance(value, str) and value.startswith("!!yaml "):
            # Re-assigning an existing key does not resize the dict, so it is
            # safe while iterating over items().
            params[key] = yaml.safe_load(value[7:])
    return params
def _get_terminate_primitive_params(self, seq, vnf_index):
    """
    Build the parameters of one terminate primitive.

    :param seq: terminate-config-primitive entry; its "name" is the primitive
    :param vnf_index: member vnf index the primitive applies to
    :return: whatever self._map_primitive_params() returns for this primitive
    """
    primitive = seq.get("name")
    primitive_params = {}
    params = {
        "member_vnf_index": vnf_index,
        "primitive": primitive,
        "primitive_params": primitive_params,
    }
    # No descriptor-level parameters are supplied for terminate primitives
    desc_params = {}
    return self._map_primitive_params(seq, params, desc_params)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide whether an already registered sub-operation is skipped or retried.

    :param db_nslcmop: nslcmop record holding "_admin.operations"
    :param op_index: index of the sub-operation inside that list
    :return: SUBOPERATION_STATUS_SKIP when the sub-operation is COMPLETED,
        otherwise op_index (after marking it as PROCESSING again)
    """
    op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
    if op.get("operationState") == "COMPLETED":
        # b. Skip sub-operation
        # _ns_execute_primitive() or RO.create_action() will NOT be executed
        return self.SUBOPERATION_STATUS_SKIP

    # c. retry executing sub-operation
    # The sub-operation exists, and operationState != 'COMPLETED'
    # Update operationState = 'PROCESSING' to indicate a retry.
    operationState = "PROCESSING"
    detailed_status = "In progress"
    self._update_suboperation_status(
        db_nslcmop, op_index, operationState, detailed_status
    )
    # Return the sub-operation index
    # _ns_execute_primitive() or RO.create_action() will be called from scale()
    # with arguments extracted from the sub-operation
    return op_index
# Find a sub-operation where all keys in a matching dictionary must match
# Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
def _find_suboperation(self, db_nslcmop, match):
    """
    Look up a sub-operation inside "_admin.operations" of an nslcmop.

    :param db_nslcmop: nslcmop record (may be empty/None)
    :param match: dictionary of key/value pairs that must all be equal in the
        sub-operation (may be empty/None)
    :return: index of the first matching sub-operation, or
        SUBOPERATION_STATUS_NOT_FOUND
    """
    if db_nslcmop and match:
        op_list = db_nslcmop.get("_admin", {}).get("operations", [])
        for i, op in enumerate(op_list):
            if all(op.get(k) == match[k] for k in match):
                return i
    return self.SUBOPERATION_STATUS_NOT_FOUND
# Update status for a sub-operation given its index
def _update_suboperation_status(
    self, db_nslcmop, op_index, operationState, detailed_status
):
    """
    Write the state of one sub-operation to the "nslcmops" collection.

    :param db_nslcmop: nslcmop record; only its "_id" is read
    :param op_index: index of the sub-operation inside "_admin.operations"
    :param operationState: new value for "operationState"
    :param detailed_status: new value for "detailed-status"
    :return: None
    """
    # Update DB for HA tasks
    q_filter = {"_id": db_nslcmop["_id"]}
    update_dict = {
        "_admin.operations.{}.operationState".format(op_index): operationState,
        "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
    }
    # fail_on_empty=False is passed through unchanged from the original call
    self.db.set_one(
        "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
    )
# Add sub-operation, return the index of the added sub-operation
# Optionally, set operationState, detailed-status, and operationType
# Status and type are currently set for 'scale' sub-operations:
# 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
# 'detailed-status' : status message
# 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
# Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
def _add_suboperation(
    self,
    db_nslcmop,
    vnf_index,
    vdu_id,
    vdu_count_index,
    vdu_name,
    primitive,
    mapped_primitive_params,
    operationState=None,
    detailed_status=None,
    operationType=None,
    RO_nsr_id=None,
    RO_scaling_info=None,
):
    """
    Append a sub-operation to "_admin.operations" and persist the list.

    :param db_nslcmop: nslcmop record; updated in memory and in "nslcmops"
    :param vnf_index: stored as "member_vnf_index"
    :param vdu_id: stored as "vdu_id"
    :param vdu_count_index: stored as "vdu_count_index"
    :param vdu_name: accepted for signature compatibility; not stored
    :param primitive: stored as "primitive"
    :param mapped_primitive_params: stored as "primitive_params"
    :param operationState: optional, stored only when truthy
    :param detailed_status: optional, stored as "detailed-status" when truthy
    :param operationType: optional, stored as "lcmOperationType" when truthy
    :param RO_nsr_id: optional, stored only when truthy
    :param RO_scaling_info: optional, stored only when truthy
    :return: index of the new sub-operation, or SUBOPERATION_STATUS_NOT_FOUND
        when db_nslcmop is empty
    """
    if not db_nslcmop:
        return self.SUBOPERATION_STATUS_NOT_FOUND

    # Get the "_admin.operations" list, if it exists. setdefault() attaches a
    # missing "_admin" to the record, so the in-memory copy reflects what is
    # written to the database and a later _find_suboperation() can see it.
    db_nslcmop_admin = db_nslcmop.setdefault("_admin", {})
    op_list = db_nslcmop_admin.get("operations")

    # Create or append to the "_admin.operations" list
    new_op = {
        "member_vnf_index": vnf_index,
        "vdu_id": vdu_id,
        "vdu_count_index": vdu_count_index,
        "primitive": primitive,
        "primitive_params": mapped_primitive_params,
    }
    optional_fields = (
        ("operationState", operationState),
        ("detailed-status", detailed_status),
        ("lcmOperationType", operationType),
        ("RO_nsr_id", RO_nsr_id),
        ("RO_scaling_info", RO_scaling_info),
    )
    new_op.update({key: value for key, value in optional_fields if value})

    if not op_list:
        # No existing operations, create key 'operations' with an empty list
        op_list = db_nslcmop_admin["operations"] = []
    op_list.append(new_op)

    db_nslcmop_update = {"_admin.operations": op_list}
    self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
    return len(op_list) - 1
# Helper methods for scale() sub-operations

# pre-scale/post-scale:
# Check for 3 different cases:
# a. New: First time execution, return SUBOPERATION_STATUS_NEW
# b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
# c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
def _check_or_add_scale_suboperation(
    self,
    db_nslcmop,
    vnf_index,
    vnf_config_primitive,
    primitive_params,
    operationType,
    RO_nsr_id=None,
    RO_scaling_info=None,
):
    """
    Register a scale sub-operation, or report how an existing one is handled.

    :param db_nslcmop: nslcmop record holding the sub-operations
    :param vnf_index: member vnf index of the sub-operation
    :param vnf_config_primitive: primitive name (ignored for RO scaling)
    :param primitive_params: primitive parameters (ignored for RO scaling)
    :param operationType: sub-operation type; forced to "SCALE-RO" when both
        RO_nsr_id and RO_scaling_info are given
    :param RO_nsr_id: optional RO ns id for RO scaling
    :param RO_scaling_info: optional RO scaling information
    :return: SUBOPERATION_STATUS_NEW, SUBOPERATION_STATUS_SKIP or the index of
        the sub-operation to retry
    """
    is_ro_scaling = bool(RO_nsr_id and RO_scaling_info)

    # Find this sub-operation
    if is_ro_scaling:
        operationType = "SCALE-RO"
        match = {
            "member_vnf_index": vnf_index,
            "RO_nsr_id": RO_nsr_id,
            "RO_scaling_info": RO_scaling_info,
        }
    else:
        match = {
            "member_vnf_index": vnf_index,
            "primitive": vnf_config_primitive,
            "primitive_params": primitive_params,
            "lcmOperationType": operationType,
        }
    op_index = self._find_suboperation(db_nslcmop, match)

    if op_index != self.SUBOPERATION_STATUS_NOT_FOUND:
        # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
        # or op_index (operationState != 'COMPLETED')
        return self._retry_or_skip_suboperation(db_nslcmop, op_index)

    # a. New sub-operation
    # The sub-operation does not exist, add it.
    # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
    # The following parameters are set to None for all kind of scaling:
    vdu_id = None
    vdu_count_index = None
    vdu_name = None
    if is_ro_scaling:
        vnf_config_primitive = None
        primitive_params = None
    else:
        RO_nsr_id = None
        RO_scaling_info = None
    # Initial status for sub-operation
    operationState = "PROCESSING"
    detailed_status = "In progress"
    # Add sub-operation for pre/post-scaling (zero or more operations)
    self._add_suboperation(
        db_nslcmop,
        vnf_index,
        vdu_id,
        vdu_count_index,
        vdu_name,
        vnf_config_primitive,
        primitive_params,
        operationState,
        detailed_status,
        operationType,
        RO_nsr_id,
        RO_scaling_info,
    )
    return self.SUBOPERATION_STATUS_NEW
# Function to return execution_environment id
def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
    """
    Return the execution environment id deployed for a vnf/vdu pair.

    :param vnf_index: member vnf index to look for
    :param vdu_id: vdu id to look for
    :param vca_deployed_list: list of deployed VCA entries; empty entries are
        skipped, as the other loops over this list do
    :return: the "ee_id" of the first matching entry, or None when none matches
    """
    # TODO vdu_index_count
    for vca in vca_deployed_list:
        if not vca:
            continue
        if vca.get("member-vnf-index") == vnf_index and vca.get("vdu_id") == vdu_id:
            return vca.get("ee_id")
    return None
4125 async def destroy_N2VC(
4133 exec_primitives
=True,
4138 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4139 :param logging_text:
4141 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4142 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4143 :param vca_index: index in the database _admin.deployed.VCA
4144 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4145 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4146 not executed properly
4147 :param scaling_in: True destroys the application, False destroys the model
4148 :return: None or exception
4153 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4154 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4158 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4160 # execute terminate_primitives
4162 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4163 config_descriptor
.get("terminate-config-primitive"),
4164 vca_deployed
.get("ee_descriptor_id"),
4166 vdu_id
= vca_deployed
.get("vdu_id")
4167 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4168 vdu_name
= vca_deployed
.get("vdu_name")
4169 vnf_index
= vca_deployed
.get("member-vnf-index")
4170 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4171 for seq
in terminate_primitives
:
4172 # For each sequence in list, get primitive and call _ns_execute_primitive()
4173 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4174 vnf_index
, seq
.get("name")
4176 self
.logger
.debug(logging_text
+ step
)
4177 # Create the primitive for each sequence, i.e. "primitive": "touch"
4178 primitive
= seq
.get("name")
4179 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4184 self
._add
_suboperation
(
4191 mapped_primitive_params
,
4193 # Sub-operations: Call _ns_execute_primitive() instead of action()
4195 result
, result_detail
= await self
._ns
_execute
_primitive
(
4196 vca_deployed
["ee_id"],
4198 mapped_primitive_params
,
4202 except LcmException
:
4203 # this happens when VCA is not deployed. In this case it is not needed to terminate
4205 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4206 if result
not in result_ok
:
4208 "terminate_primitive {} for vnf_member_index={} fails with "
4209 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4211 # set that this VCA do not need terminated
4212 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4216 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4219 # Delete Prometheus Jobs if any
4220 # This uses NSR_ID, so it will destroy any jobs under this index
4221 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4224 await self
.vca_map
[vca_type
].delete_execution_environment(
4225 vca_deployed
["ee_id"],
4226 scaling_in
=scaling_in
,
async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
    """
    Delete the whole N2VC namespace of a network service.

    The configuration status is written as "TERMINATING" before the deletion
    and as "DELETED" afterwards. A namespace that is not found is treated as
    already deleted.

    :param db_nsr: nsr record; its "_id" names the namespace ("." + _id)
    :param vca_id: optional VCA identifier forwarded to N2VC
    :return: None
    """
    self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
    namespace = "." + db_nsr["_id"]
    try:
        await self.n2vc.delete_namespace(
            namespace=namespace,
            total_timeout=self.timeout_charm_delete,
            vca_id=vca_id,
        )
    except N2VCNotFound:  # already deleted. Skip
        pass
    self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
4244 async def _terminate_RO(
4245 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4248 Terminates a deployment from RO
4249 :param logging_text:
4250 :param nsr_deployed: db_nsr._admin.deployed
4253 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4254 this method will update only the index 2, but it will write on database the concatenated content of the list
4259 ro_nsr_id
= ro_delete_action
= None
4260 if nsr_deployed
and nsr_deployed
.get("RO"):
4261 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4262 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4265 stage
[2] = "Deleting ns from VIM."
4266 db_nsr_update
["detailed-status"] = " ".join(stage
)
4267 self
._write
_op
_status
(nslcmop_id
, stage
)
4268 self
.logger
.debug(logging_text
+ stage
[2])
4269 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4270 self
._write
_op
_status
(nslcmop_id
, stage
)
4271 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4272 ro_delete_action
= desc
["action_id"]
4274 "_admin.deployed.RO.nsr_delete_action_id"
4275 ] = ro_delete_action
4276 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4277 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4278 if ro_delete_action
:
4279 # wait until NS is deleted from VIM
4280 stage
[2] = "Waiting ns deleted from VIM."
4281 detailed_status_old
= None
4285 + " RO_id={} ro_delete_action={}".format(
4286 ro_nsr_id
, ro_delete_action
4289 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4290 self
._write
_op
_status
(nslcmop_id
, stage
)
4292 delete_timeout
= 20 * 60 # 20 minutes
4293 while delete_timeout
> 0:
4294 desc
= await self
.RO
.show(
4296 item_id_name
=ro_nsr_id
,
4297 extra_item
="action",
4298 extra_item_id
=ro_delete_action
,
4302 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4304 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4305 if ns_status
== "ERROR":
4306 raise ROclient
.ROClientException(ns_status_info
)
4307 elif ns_status
== "BUILD":
4308 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4309 elif ns_status
== "ACTIVE":
4310 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4311 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4316 ), "ROclient.check_action_status returns unknown {}".format(
4319 if stage
[2] != detailed_status_old
:
4320 detailed_status_old
= stage
[2]
4321 db_nsr_update
["detailed-status"] = " ".join(stage
)
4322 self
._write
_op
_status
(nslcmop_id
, stage
)
4323 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4324 await asyncio
.sleep(5, loop
=self
.loop
)
4326 else: # delete_timeout <= 0:
4327 raise ROclient
.ROClientException(
4328 "Timeout waiting ns deleted from VIM"
4331 except Exception as e
:
4332 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4334 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4336 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4337 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4338 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4340 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4343 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4345 failed_detail
.append("delete conflict: {}".format(e
))
4348 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4351 failed_detail
.append("delete error: {}".format(e
))
4353 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4357 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4358 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4360 stage
[2] = "Deleting nsd from RO."
4361 db_nsr_update
["detailed-status"] = " ".join(stage
)
4362 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4363 self
._write
_op
_status
(nslcmop_id
, stage
)
4364 await self
.RO
.delete("nsd", ro_nsd_id
)
4366 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4368 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4369 except Exception as e
:
4371 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4373 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4375 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4378 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4380 failed_detail
.append(
4381 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4383 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4385 failed_detail
.append(
4386 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4388 self
.logger
.error(logging_text
+ failed_detail
[-1])
4390 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4391 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4392 if not vnf_deployed
or not vnf_deployed
["id"]:
4395 ro_vnfd_id
= vnf_deployed
["id"]
4398 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4399 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4401 db_nsr_update
["detailed-status"] = " ".join(stage
)
4402 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4403 self
._write
_op
_status
(nslcmop_id
, stage
)
4404 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4406 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4408 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4409 except Exception as e
:
4411 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4414 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4418 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4421 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4423 failed_detail
.append(
4424 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4426 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4428 failed_detail
.append(
4429 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4431 self
.logger
.error(logging_text
+ failed_detail
[-1])
4434 stage
[2] = "Error deleting from VIM"
4436 stage
[2] = "Deleted from VIM"
4437 db_nsr_update
["detailed-status"] = " ".join(stage
)
4438 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4439 self
._write
_op
_status
(nslcmop_id
, stage
)
4442 raise LcmException("; ".join(failed_detail
))
4444 async def terminate(self
, nsr_id
, nslcmop_id
):
4445 # Try to lock HA task here
4446 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4447 if not task_is_locked_by_me
:
4450 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4451 self
.logger
.debug(logging_text
+ "Enter")
4452 timeout_ns_terminate
= self
.timeout_ns_terminate
4455 operation_params
= None
4457 error_list
= [] # annotates all failed error messages
4458 db_nslcmop_update
= {}
4459 autoremove
= False # autoremove after terminated
4460 tasks_dict_info
= {}
4463 "Stage 1/3: Preparing task.",
4464 "Waiting for previous operations to terminate.",
4467 # ^ contains [stage, step, VIM-status]
4469 # wait for any previous tasks in process
4470 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4472 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4473 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4474 operation_params
= db_nslcmop
.get("operationParams") or {}
4475 if operation_params
.get("timeout_ns_terminate"):
4476 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4477 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4478 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4480 db_nsr_update
["operational-status"] = "terminating"
4481 db_nsr_update
["config-status"] = "terminating"
4482 self
._write
_ns
_status
(
4484 ns_state
="TERMINATING",
4485 current_operation
="TERMINATING",
4486 current_operation_id
=nslcmop_id
,
4487 other_update
=db_nsr_update
,
4489 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4490 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4491 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4494 stage
[1] = "Getting vnf descriptors from db."
4495 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4497 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4499 db_vnfds_from_id
= {}
4500 db_vnfds_from_member_index
= {}
4502 for vnfr
in db_vnfrs_list
:
4503 vnfd_id
= vnfr
["vnfd-id"]
4504 if vnfd_id
not in db_vnfds_from_id
:
4505 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4506 db_vnfds_from_id
[vnfd_id
] = vnfd
4507 db_vnfds_from_member_index
[
4508 vnfr
["member-vnf-index-ref"]
4509 ] = db_vnfds_from_id
[vnfd_id
]
4511 # Destroy individual execution environments when there are terminating primitives.
4512 # Rest of EE will be deleted at once
4513 # TODO - check before calling _destroy_N2VC
4514 # if not operation_params.get("skip_terminate_primitives"):#
4515 # or not vca.get("needed_terminate"):
4516 stage
[0] = "Stage 2/3 execute terminating primitives."
4517 self
.logger
.debug(logging_text
+ stage
[0])
4518 stage
[1] = "Looking execution environment that needs terminate."
4519 self
.logger
.debug(logging_text
+ stage
[1])
4521 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4522 config_descriptor
= None
4523 vca_member_vnf_index
= vca
.get("member-vnf-index")
4524 vca_id
= self
.get_vca_id(
4525 db_vnfrs_dict
.get(vca_member_vnf_index
)
4526 if vca_member_vnf_index
4530 if not vca
or not vca
.get("ee_id"):
4532 if not vca
.get("member-vnf-index"):
4534 config_descriptor
= db_nsr
.get("ns-configuration")
4535 elif vca
.get("vdu_id"):
4536 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4537 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4538 elif vca
.get("kdu_name"):
4539 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4540 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4542 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4543 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4544 vca_type
= vca
.get("type")
4545 exec_terminate_primitives
= not operation_params
.get(
4546 "skip_terminate_primitives"
4547 ) and vca
.get("needed_terminate")
4548 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4549 # pending native charms
4551 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4553 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4554 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4555 task
= asyncio
.ensure_future(
4563 exec_terminate_primitives
,
4567 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4569 # wait for pending tasks of terminate primitives
4573 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4575 error_list
= await self
._wait
_for
_tasks
(
4578 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4582 tasks_dict_info
.clear()
4584 return # raise LcmException("; ".join(error_list))
4586 # remove All execution environments at once
4587 stage
[0] = "Stage 3/3 delete all."
4589 if nsr_deployed
.get("VCA"):
4590 stage
[1] = "Deleting all execution environments."
4591 self
.logger
.debug(logging_text
+ stage
[1])
4592 vca_id
= self
.get_vca_id({}, db_nsr
)
4593 task_delete_ee
= asyncio
.ensure_future(
4595 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4596 timeout
=self
.timeout_charm_delete
,
4599 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4600 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4602 # Delete from k8scluster
4603 stage
[1] = "Deleting KDUs."
4604 self
.logger
.debug(logging_text
+ stage
[1])
4605 # print(nsr_deployed)
4606 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4607 if not kdu
or not kdu
.get("kdu-instance"):
4609 kdu_instance
= kdu
.get("kdu-instance")
4610 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4611 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4612 vca_id
= self
.get_vca_id({}, db_nsr
)
4613 task_delete_kdu_instance
= asyncio
.ensure_future(
4614 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4615 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4616 kdu_instance
=kdu_instance
,
4618 namespace
=kdu
.get("namespace"),
4624 + "Unknown k8s deployment type {}".format(
4625 kdu
.get("k8scluster-type")
4630 task_delete_kdu_instance
4631 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4634 stage
[1] = "Deleting ns from VIM."
4636 task_delete_ro
= asyncio
.ensure_future(
4637 self
._terminate
_ng
_ro
(
4638 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4642 task_delete_ro
= asyncio
.ensure_future(
4644 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4647 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4649 # rest of staff will be done at finally
4652 ROclient
.ROClientException
,
4657 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4659 except asyncio
.CancelledError
:
4661 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4663 exc
= "Operation was cancelled"
4664 except Exception as e
:
4665 exc
= traceback
.format_exc()
4666 self
.logger
.critical(
4667 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4672 error_list
.append(str(exc
))
4674 # wait for pending tasks
4676 stage
[1] = "Waiting for terminate pending tasks."
4677 self
.logger
.debug(logging_text
+ stage
[1])
4678 error_list
+= await self
._wait
_for
_tasks
(
4681 timeout_ns_terminate
,
4685 stage
[1] = stage
[2] = ""
4686 except asyncio
.CancelledError
:
4687 error_list
.append("Cancelled")
4688 # TODO cancell all tasks
4689 except Exception as exc
:
4690 error_list
.append(str(exc
))
4691 # update status at database
4693 error_detail
= "; ".join(error_list
)
4694 # self.logger.error(logging_text + error_detail)
4695 error_description_nslcmop
= "{} Detail: {}".format(
4696 stage
[0], error_detail
4698 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4699 nslcmop_id
, stage
[0]
4702 db_nsr_update
["operational-status"] = "failed"
4703 db_nsr_update
["detailed-status"] = (
4704 error_description_nsr
+ " Detail: " + error_detail
4706 db_nslcmop_update
["detailed-status"] = error_detail
4707 nslcmop_operation_state
= "FAILED"
4711 error_description_nsr
= error_description_nslcmop
= None
4712 ns_state
= "NOT_INSTANTIATED"
4713 db_nsr_update
["operational-status"] = "terminated"
4714 db_nsr_update
["detailed-status"] = "Done"
4715 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4716 db_nslcmop_update
["detailed-status"] = "Done"
4717 nslcmop_operation_state
= "COMPLETED"
4720 self
._write
_ns
_status
(
4723 current_operation
="IDLE",
4724 current_operation_id
=None,
4725 error_description
=error_description_nsr
,
4726 error_detail
=error_detail
,
4727 other_update
=db_nsr_update
,
4729 self
._write
_op
_status
(
4732 error_message
=error_description_nslcmop
,
4733 operation_state
=nslcmop_operation_state
,
4734 other_update
=db_nslcmop_update
,
4736 if ns_state
== "NOT_INSTANTIATED":
4740 {"nsr-id-ref": nsr_id
},
4741 {"_admin.nsState": "NOT_INSTANTIATED"},
4743 except DbException
as e
:
4746 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4750 if operation_params
:
4751 autoremove
= operation_params
.get("autoremove", False)
4752 if nslcmop_operation_state
:
4754 await self
.msg
.aiowrite(
4759 "nslcmop_id": nslcmop_id
,
4760 "operationState": nslcmop_operation_state
,
4761 "autoremove": autoremove
,
4765 except Exception as e
:
4767 logging_text
+ "kafka_write notification Exception {}".format(e
)
4770 self
.logger
.debug(logging_text
+ "Exit")
4771 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4773 async def _wait_for_tasks(
4774 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4777 error_detail_list
= []
4779 pending_tasks
= list(created_tasks_info
.keys())
4780 num_tasks
= len(pending_tasks
)
4782 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4783 self
._write
_op
_status
(nslcmop_id
, stage
)
4784 while pending_tasks
:
4786 _timeout
= timeout
+ time_start
- time()
4787 done
, pending_tasks
= await asyncio
.wait(
4788 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4790 num_done
+= len(done
)
4791 if not done
: # Timeout
4792 for task
in pending_tasks
:
4793 new_error
= created_tasks_info
[task
] + ": Timeout"
4794 error_detail_list
.append(new_error
)
4795 error_list
.append(new_error
)
4798 if task
.cancelled():
4801 exc
= task
.exception()
4803 if isinstance(exc
, asyncio
.TimeoutError
):
4805 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4806 error_list
.append(created_tasks_info
[task
])
4807 error_detail_list
.append(new_error
)
4814 ROclient
.ROClientException
,
4820 self
.logger
.error(logging_text
+ new_error
)
4822 exc_traceback
= "".join(
4823 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4827 + created_tasks_info
[task
]
4833 logging_text
+ created_tasks_info
[task
] + ": Done"
4835 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4837 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4838 if nsr_id
: # update also nsr
4843 "errorDescription": "Error at: " + ", ".join(error_list
),
4844 "errorDetail": ". ".join(error_detail_list
),
4847 self
._write
_op
_status
(nslcmop_id
, stage
)
4848 return error_detail_list
4851 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4853 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4854 The default-value is used. If it is between < > it look for a value at instantiation_params
4855 :param primitive_desc: portion of VNFD/NSD that describes primitive
4856 :param params: Params provided by user
4857 :param instantiation_params: Instantiation params provided by user
4858 :return: a dictionary with the calculated params
4860 calculated_params
= {}
4861 for parameter
in primitive_desc
.get("parameter", ()):
4862 param_name
= parameter
["name"]
4863 if param_name
in params
:
4864 calculated_params
[param_name
] = params
[param_name
]
4865 elif "default-value" in parameter
or "value" in parameter
:
4866 if "value" in parameter
:
4867 calculated_params
[param_name
] = parameter
["value"]
4869 calculated_params
[param_name
] = parameter
["default-value"]
4871 isinstance(calculated_params
[param_name
], str)
4872 and calculated_params
[param_name
].startswith("<")
4873 and calculated_params
[param_name
].endswith(">")
4875 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4876 calculated_params
[param_name
] = instantiation_params
[
4877 calculated_params
[param_name
][1:-1]
4881 "Parameter {} needed to execute primitive {} not provided".format(
4882 calculated_params
[param_name
], primitive_desc
["name"]
4887 "Parameter {} needed to execute primitive {} not provided".format(
4888 param_name
, primitive_desc
["name"]
4892 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4893 calculated_params
[param_name
] = yaml
.safe_dump(
4894 calculated_params
[param_name
], default_flow_style
=True, width
=256
4896 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4898 ].startswith("!!yaml "):
4899 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4900 if parameter
.get("data-type") == "INTEGER":
4902 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4903 except ValueError: # error converting string to int
4905 "Parameter {} of primitive {} must be integer".format(
4906 param_name
, primitive_desc
["name"]
4909 elif parameter
.get("data-type") == "BOOLEAN":
4910 calculated_params
[param_name
] = not (
4911 (str(calculated_params
[param_name
])).lower() == "false"
4914 # add always ns_config_info if primitive name is config
4915 if primitive_desc
["name"] == "config":
4916 if "ns_config_info" in instantiation_params
:
4917 calculated_params
["ns_config_info"] = instantiation_params
[
4920 return calculated_params
4922 def _look_for_deployed_vca(
4929 ee_descriptor_id
=None,
4931 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4932 for vca
in deployed_vca
:
4935 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4938 vdu_count_index
is not None
4939 and vdu_count_index
!= vca
["vdu_count_index"]
4942 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4944 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4948 # vca_deployed not found
4950 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4951 " is not deployed".format(
4960 ee_id
= vca
.get("ee_id")
4962 "type", "lxc_proxy_charm"
4963 ) # default value for backward compatibility - proxy charm
4966 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4967 "execution environment".format(
4968 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4971 return ee_id
, vca_type
4973 async def _ns_execute_primitive(
4979 retries_interval
=30,
4986 if primitive
== "config":
4987 primitive_params
= {"params": primitive_params
}
4989 vca_type
= vca_type
or "lxc_proxy_charm"
4993 output
= await asyncio
.wait_for(
4994 self
.vca_map
[vca_type
].exec_primitive(
4996 primitive_name
=primitive
,
4997 params_dict
=primitive_params
,
4998 progress_timeout
=self
.timeout_progress_primitive
,
4999 total_timeout
=self
.timeout_primitive
,
5004 timeout
=timeout
or self
.timeout_primitive
,
5008 except asyncio
.CancelledError
:
5010 except Exception as e
:
5014 "Error executing action {} on {} -> {}".format(
5019 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
5021 if isinstance(e
, asyncio
.TimeoutError
):
5023 message
="Timed out waiting for action to complete"
5025 return "FAILED", getattr(e
, "message", repr(e
))
5027 return "COMPLETED", output
5029 except (LcmException
, asyncio
.CancelledError
):
5031 except Exception as e
:
5032 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
5034 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
5036 Updating the vca_status with latest juju information in nsrs record
5037 :param: nsr_id: Id of the nsr
5038 :param: nslcmop_id: Id of the nslcmop
5042 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
5043 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5044 vca_id
= self
.get_vca_id({}, db_nsr
)
5045 if db_nsr
["_admin"]["deployed"]["K8s"]:
5046 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
5047 cluster_uuid
, kdu_instance
, cluster_type
= (
5048 k8s
["k8scluster-uuid"],
5049 k8s
["kdu-instance"],
5050 k8s
["k8scluster-type"],
5052 await self
._on
_update
_k
8s
_db
(
5053 cluster_uuid
=cluster_uuid
,
5054 kdu_instance
=kdu_instance
,
5055 filter={"_id": nsr_id
},
5057 cluster_type
=cluster_type
,
5060 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
5061 table
, filter = "nsrs", {"_id": nsr_id
}
5062 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
5063 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
5065 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
5066 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
5068 async def action(self
, nsr_id
, nslcmop_id
):
5069 # Try to lock HA task here
5070 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5071 if not task_is_locked_by_me
:
5074 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
5075 self
.logger
.debug(logging_text
+ "Enter")
5076 # get all needed from database
5080 db_nslcmop_update
= {}
5081 nslcmop_operation_state
= None
5082 error_description_nslcmop
= None
5085 # wait for any previous tasks in process
5086 step
= "Waiting for previous operations to terminate"
5087 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5089 self
._write
_ns
_status
(
5092 current_operation
="RUNNING ACTION",
5093 current_operation_id
=nslcmop_id
,
5096 step
= "Getting information from database"
5097 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5098 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5099 if db_nslcmop
["operationParams"].get("primitive_params"):
5100 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
5101 db_nslcmop
["operationParams"]["primitive_params"]
5104 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5105 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
5106 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
5107 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
5108 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
5109 primitive
= db_nslcmop
["operationParams"]["primitive"]
5110 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
5111 timeout_ns_action
= db_nslcmop
["operationParams"].get(
5112 "timeout_ns_action", self
.timeout_primitive
5116 step
= "Getting vnfr from database"
5117 db_vnfr
= self
.db
.get_one(
5118 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5120 if db_vnfr
.get("kdur"):
5122 for kdur
in db_vnfr
["kdur"]:
5123 if kdur
.get("additionalParams"):
5124 kdur
["additionalParams"] = json
.loads(
5125 kdur
["additionalParams"]
5127 kdur_list
.append(kdur
)
5128 db_vnfr
["kdur"] = kdur_list
5129 step
= "Getting vnfd from database"
5130 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5132 # Sync filesystem before running a primitive
5133 self
.fs
.sync(db_vnfr
["vnfd-id"])
5135 step
= "Getting nsd from database"
5136 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5138 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5139 # for backward compatibility
5140 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5141 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5142 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5143 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5145 # look for primitive
5146 config_primitive_desc
= descriptor_configuration
= None
5148 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5150 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5152 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5154 descriptor_configuration
= db_nsd
.get("ns-configuration")
5156 if descriptor_configuration
and descriptor_configuration
.get(
5159 for config_primitive
in descriptor_configuration
["config-primitive"]:
5160 if config_primitive
["name"] == primitive
:
5161 config_primitive_desc
= config_primitive
5164 if not config_primitive_desc
:
5165 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5167 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5171 primitive_name
= primitive
5172 ee_descriptor_id
= None
5174 primitive_name
= config_primitive_desc
.get(
5175 "execution-environment-primitive", primitive
5177 ee_descriptor_id
= config_primitive_desc
.get(
5178 "execution-environment-ref"
5184 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5186 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5189 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5191 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5193 desc_params
= parse_yaml_strings(
5194 db_vnfr
.get("additionalParamsForVnf")
5197 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5198 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5199 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5201 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5202 actions
.add(primitive
["name"])
5203 for primitive
in kdu_configuration
.get("config-primitive", []):
5204 actions
.add(primitive
["name"])
5206 nsr_deployed
["K8s"],
5207 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5208 and kdu
["member-vnf-index"] == vnf_index
,
5212 if primitive_name
in actions
5213 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5217 # TODO check if ns is in a proper status
5219 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5221 # kdur and desc_params already set from before
5222 if primitive_params
:
5223 desc_params
.update(primitive_params
)
5224 # TODO Check if we will need something at vnf level
5225 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5227 kdu_name
== kdu
["kdu-name"]
5228 and kdu
["member-vnf-index"] == vnf_index
5233 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5236 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5237 msg
= "unknown k8scluster-type '{}'".format(
5238 kdu
.get("k8scluster-type")
5240 raise LcmException(msg
)
5243 "collection": "nsrs",
5244 "filter": {"_id": nsr_id
},
5245 "path": "_admin.deployed.K8s.{}".format(index
),
5249 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5251 step
= "Executing kdu {}".format(primitive_name
)
5252 if primitive_name
== "upgrade":
5253 if desc_params
.get("kdu_model"):
5254 kdu_model
= desc_params
.get("kdu_model")
5255 del desc_params
["kdu_model"]
5257 kdu_model
= kdu
.get("kdu-model")
5258 parts
= kdu_model
.split(sep
=":")
5260 kdu_model
= parts
[0]
5261 if desc_params
.get("kdu_atomic_upgrade"):
5262 atomic_upgrade
= desc_params
.get("kdu_atomic_upgrade").lower() in ("yes", "true", "1")
5263 del desc_params
["kdu_atomic_upgrade"]
5265 atomic_upgrade
= True
5267 detailed_status
= await asyncio
.wait_for(
5268 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5269 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5270 kdu_instance
=kdu
.get("kdu-instance"),
5271 atomic
=atomic_upgrade
,
5272 kdu_model
=kdu_model
,
5275 timeout
=timeout_ns_action
,
5277 timeout
=timeout_ns_action
+ 10,
5280 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5282 elif primitive_name
== "rollback":
5283 detailed_status
= await asyncio
.wait_for(
5284 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5285 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5286 kdu_instance
=kdu
.get("kdu-instance"),
5289 timeout
=timeout_ns_action
,
5291 elif primitive_name
== "status":
5292 detailed_status
= await asyncio
.wait_for(
5293 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5294 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5295 kdu_instance
=kdu
.get("kdu-instance"),
5298 timeout
=timeout_ns_action
,
5301 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5302 kdu
["kdu-name"], nsr_id
5304 params
= self
._map
_primitive
_params
(
5305 config_primitive_desc
, primitive_params
, desc_params
5308 detailed_status
= await asyncio
.wait_for(
5309 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5310 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5311 kdu_instance
=kdu_instance
,
5312 primitive_name
=primitive_name
,
5315 timeout
=timeout_ns_action
,
5318 timeout
=timeout_ns_action
,
5322 nslcmop_operation_state
= "COMPLETED"
5324 detailed_status
= ""
5325 nslcmop_operation_state
= "FAILED"
5327 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5328 nsr_deployed
["VCA"],
5329 member_vnf_index
=vnf_index
,
5331 vdu_count_index
=vdu_count_index
,
5332 ee_descriptor_id
=ee_descriptor_id
,
5334 for vca_index
, vca_deployed
in enumerate(
5335 db_nsr
["_admin"]["deployed"]["VCA"]
5337 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5339 "collection": "nsrs",
5340 "filter": {"_id": nsr_id
},
5341 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5345 nslcmop_operation_state
,
5347 ) = await self
._ns
_execute
_primitive
(
5349 primitive
=primitive_name
,
5350 primitive_params
=self
._map
_primitive
_params
(
5351 config_primitive_desc
, primitive_params
, desc_params
5353 timeout
=timeout_ns_action
,
5359 db_nslcmop_update
["detailed-status"] = detailed_status
5360 error_description_nslcmop
= (
5361 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5365 + "Done with result {} {}".format(
5366 nslcmop_operation_state
, detailed_status
5369 return # database update is called inside finally
5371 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5372 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5374 except asyncio
.CancelledError
:
5376 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5378 exc
= "Operation was cancelled"
5379 except asyncio
.TimeoutError
:
5380 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5382 except Exception as e
:
5383 exc
= traceback
.format_exc()
5384 self
.logger
.critical(
5385 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5394 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5395 nslcmop_operation_state
= "FAILED"
5397 self
._write
_ns
_status
(
5401 ], # TODO check if degraded. For the moment use previous status
5402 current_operation
="IDLE",
5403 current_operation_id
=None,
5404 # error_description=error_description_nsr,
5405 # error_detail=error_detail,
5406 other_update
=db_nsr_update
,
5409 self
._write
_op
_status
(
5412 error_message
=error_description_nslcmop
,
5413 operation_state
=nslcmop_operation_state
,
5414 other_update
=db_nslcmop_update
,
5417 if nslcmop_operation_state
:
5419 await self
.msg
.aiowrite(
5424 "nslcmop_id": nslcmop_id
,
5425 "operationState": nslcmop_operation_state
,
5429 except Exception as e
:
5431 logging_text
+ "kafka_write notification Exception {}".format(e
)
5433 self
.logger
.debug(logging_text
+ "Exit")
5434 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5435 return nslcmop_operation_state
, detailed_status
5437 async def terminate_vdus(
5438 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5440 """This method terminates VDUs
5443 db_vnfr: VNF instance record
5444 member_vnf_index: VNF index to identify the VDUs to be removed
5445 db_nsr: NS instance record
5446 update_db_nslcmops: Nslcmop update record
5448 vca_scaling_info
= []
5449 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5450 scaling_info
["scaling_direction"] = "IN"
5451 scaling_info
["vdu-delete"] = {}
5452 scaling_info
["kdu-delete"] = {}
5453 db_vdur
= db_vnfr
.get("vdur")
5454 vdur_list
= copy(db_vdur
)
5456 for index
, vdu
in enumerate(vdur_list
):
5457 vca_scaling_info
.append(
5459 "osm_vdu_id": vdu
["vdu-id-ref"],
5460 "member-vnf-index": member_vnf_index
,
5462 "vdu_index": count_index
,
5465 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5466 scaling_info
["vdu"].append(
5468 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5469 "vdu_id": vdu
["vdu-id-ref"],
5473 for interface
in vdu
["interfaces"]:
5474 scaling_info
["vdu"][index
]["interface"].append(
5476 "name": interface
["name"],
5477 "ip_address": interface
["ip-address"],
5478 "mac_address": interface
.get("mac-address"),
5481 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5482 stage
[2] = "Terminating VDUs"
5483 if scaling_info
.get("vdu-delete"):
5484 # scale_process = "RO"
5485 if self
.ro_config
.get("ng"):
5486 await self
._scale
_ng
_ro
(
5495 async def remove_vnf(self
, nsr_id
, nslcmop_id
, vnf_instance_id
):
5496 """This method is to Remove VNF instances from NS.
5499 nsr_id: NS instance id
5500 nslcmop_id: nslcmop id of update
5501 vnf_instance_id: id of the VNF instance to be removed
5504 result: (str, str) COMPLETED/FAILED, details
5508 logging_text
= "Task ns={} update ".format(nsr_id
)
5509 check_vnfr_count
= len(self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}))
5510 self
.logger
.info("check_vnfr_count {}".format(check_vnfr_count
))
5511 if check_vnfr_count
> 1:
5512 stage
= ["", "", ""]
5513 step
= "Getting nslcmop from database"
5515 step
+ " after having waited for previous tasks to be completed"
5517 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5518 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5519 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5520 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5521 """ db_vnfr = self.db.get_one(
5522 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5524 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5525 await self
.terminate_vdus(
5534 constituent_vnfr
= db_nsr
.get("constituent-vnfr-ref")
5535 constituent_vnfr
.remove(db_vnfr
.get("_id"))
5536 db_nsr_update
["constituent-vnfr-ref"] = db_nsr
.get(
5537 "constituent-vnfr-ref"
5539 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5540 self
.db
.del_one("vnfrs", {"_id": db_vnfr
.get("_id")})
5541 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5542 return "COMPLETED", "Done"
5544 step
= "Terminate VNF Failed with"
5546 "{} Cannot terminate the last VNF in this NS.".format(
5550 except (LcmException
, asyncio
.CancelledError
):
5552 except Exception as e
:
5553 self
.logger
.debug("Error removing VNF {}".format(e
))
5554 return "FAILED", "Error removing VNF {}".format(e
)
5556 async def _ns_redeploy_vnf(
5564 """This method updates and redeploys VNF instances
5567 nsr_id: NS instance id
5568 nslcmop_id: nslcmop id
5569 db_vnfd: VNF descriptor
5570 db_vnfr: VNF instance record
5571 db_nsr: NS instance record
5574 result: (str, str) COMPLETED/FAILED, details
5578 stage
= ["", "", ""]
5579 logging_text
= "Task ns={} update ".format(nsr_id
)
5580 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5581 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5583 # Terminate old VNF resources
5584 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5585 await self
.terminate_vdus(
5594 # old_vnfd_id = db_vnfr["vnfd-id"]
5595 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5596 new_db_vnfd
= db_vnfd
5597 # new_vnfd_ref = new_db_vnfd["id"]
5598 # new_vnfd_id = vnfd_id
5602 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5604 "name": cp
.get("id"),
5605 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5606 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5609 new_vnfr_cp
.append(vnf_cp
)
5610 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5611 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5612 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5614 "revision": latest_vnfd_revision
,
5615 "connection-point": new_vnfr_cp
,
5619 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5620 updated_db_vnfr
= self
.db
.get_one(
5622 {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
},
5625 # Instantiate new VNF resources
5626 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5627 vca_scaling_info
= []
5628 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5629 scaling_info
["scaling_direction"] = "OUT"
5630 scaling_info
["vdu-create"] = {}
5631 scaling_info
["kdu-create"] = {}
5632 vdud_instantiate_list
= db_vnfd
["vdu"]
5633 for index
, vdud
in enumerate(vdud_instantiate_list
):
5634 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
5636 additional_params
= (
5637 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5640 cloud_init_list
= []
5642 # TODO Information of its own ip is not available because db_vnfr is not updated.
5643 additional_params
["OSM"] = get_osm_params(
5644 updated_db_vnfr
, vdud
["id"], 1
5646 cloud_init_list
.append(
5647 self
._parse
_cloud
_init
(
5654 vca_scaling_info
.append(
5656 "osm_vdu_id": vdud
["id"],
5657 "member-vnf-index": member_vnf_index
,
5659 "vdu_index": count_index
,
5662 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5663 if self
.ro_config
.get("ng"):
5665 "New Resources to be deployed: {}".format(scaling_info
)
5667 await self
._scale
_ng
_ro
(
5675 return "COMPLETED", "Done"
5676 except (LcmException
, asyncio
.CancelledError
):
5678 except Exception as e
:
5679 self
.logger
.debug("Error updating VNF {}".format(e
))
5680 return "FAILED", "Error updating VNF {}".format(e
)
5682 async def _ns_charm_upgrade(
5688 timeout
: float = None,
5690 """This method upgrade charms in VNF instances
5693 ee_id: Execution environment id
5694 path: Local path to the charm
5696 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5697 timeout: (Float) Timeout for the ns update operation
5700 result: (str, str) COMPLETED/FAILED, details
5703 charm_type
= charm_type
or "lxc_proxy_charm"
5704 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5708 charm_type
=charm_type
,
5709 timeout
=timeout
or self
.timeout_ns_update
,
5713 return "COMPLETED", output
5715 except (LcmException
, asyncio
.CancelledError
):
5718 except Exception as e
:
5720 self
.logger
.debug("Error upgrading charm {}".format(path
))
5722 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5724 async def update(self
, nsr_id
, nslcmop_id
):
5725 """Update NS according to different update types
5727 This method performs upgrade of VNF instances then updates the revision
5728 number in VNF record
5731 nsr_id: Network service will be updated
5732 nslcmop_id: ns lcm operation id
5735 It may raise DbException, LcmException, N2VCException, K8sException
5738 # Try to lock HA task here
5739 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5740 if not task_is_locked_by_me
:
5743 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5744 self
.logger
.debug(logging_text
+ "Enter")
5746 # Set the required variables to be filled up later
5748 db_nslcmop_update
= {}
5750 nslcmop_operation_state
= None
5752 error_description_nslcmop
= ""
5754 change_type
= "updated"
5755 detailed_status
= ""
5758 # wait for any previous tasks in process
5759 step
= "Waiting for previous operations to terminate"
5760 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5761 self
._write
_ns
_status
(
5764 current_operation
="UPDATING",
5765 current_operation_id
=nslcmop_id
,
5768 step
= "Getting nslcmop from database"
5769 db_nslcmop
= self
.db
.get_one(
5770 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5772 update_type
= db_nslcmop
["operationParams"]["updateType"]
5774 step
= "Getting nsr from database"
5775 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5776 old_operational_status
= db_nsr
["operational-status"]
5777 db_nsr_update
["operational-status"] = "updating"
5778 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5779 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5781 if update_type
== "CHANGE_VNFPKG":
5783 # Get the input parameters given through update request
5784 vnf_instance_id
= db_nslcmop
["operationParams"][
5785 "changeVnfPackageData"
5786 ].get("vnfInstanceId")
5788 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5791 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5793 step
= "Getting vnfr from database"
5794 db_vnfr
= self
.db
.get_one(
5795 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5798 step
= "Getting vnfds from database"
5800 latest_vnfd
= self
.db
.get_one(
5801 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5803 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5806 current_vnf_revision
= db_vnfr
.get("revision", 1)
5807 current_vnfd
= self
.db
.get_one(
5809 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5810 fail_on_empty
=False,
5812 # Charm artifact paths will be filled up later
5814 current_charm_artifact_path
,
5815 target_charm_artifact_path
,
5816 charm_artifact_paths
,
5819 step
= "Checking if revision has changed in VNFD"
5820 if current_vnf_revision
!= latest_vnfd_revision
:
5822 change_type
= "policy_updated"
5824 # There is new revision of VNFD, update operation is required
5825 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5826 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5828 step
= "Removing the VNFD packages if they exist in the local path"
5829 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5830 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5832 step
= "Get the VNFD packages from FSMongo"
5833 self
.fs
.sync(from_path
=latest_vnfd_path
)
5834 self
.fs
.sync(from_path
=current_vnfd_path
)
5837 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5839 base_folder
= latest_vnfd
["_admin"]["storage"]
5841 for charm_index
, charm_deployed
in enumerate(
5842 get_iterable(nsr_deployed
, "VCA")
5844 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5846 # Getting charm-id and charm-type
5847 if charm_deployed
.get("member-vnf-index") == vnf_index
:
5848 charm_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5849 charm_type
= charm_deployed
.get("type")
5852 ee_id
= charm_deployed
.get("ee_id")
5854 step
= "Getting descriptor config"
5855 descriptor_config
= get_configuration(
5856 current_vnfd
, current_vnfd
["id"]
5859 if "execution-environment-list" in descriptor_config
:
5860 ee_list
= descriptor_config
.get(
5861 "execution-environment-list", []
5866 # There could be several charm used in the same VNF
5867 for ee_item
in ee_list
:
5868 if ee_item
.get("juju"):
5870 step
= "Getting charm name"
5871 charm_name
= ee_item
["juju"].get("charm")
5873 step
= "Setting Charm artifact paths"
5874 current_charm_artifact_path
.append(
5875 get_charm_artifact_path(
5879 current_vnf_revision
,
5882 target_charm_artifact_path
.append(
5883 get_charm_artifact_path(
5887 latest_vnfd_revision
,
5891 charm_artifact_paths
= zip(
5892 current_charm_artifact_path
, target_charm_artifact_path
5895 step
= "Checking if software version has changed in VNFD"
5896 if find_software_version(current_vnfd
) != find_software_version(
5900 step
= "Checking if existing VNF has charm"
5901 for current_charm_path
, target_charm_path
in list(
5902 charm_artifact_paths
5904 if current_charm_path
:
5906 "Software version change is not supported as VNF instance {} has charm.".format(
5911 # There is no change in the charm package, then redeploy the VNF
5912 # based on new descriptor
5913 step
= "Redeploying VNF"
5914 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5915 (result
, detailed_status
) = await self
._ns
_redeploy
_vnf
(
5916 nsr_id
, nslcmop_id
, latest_vnfd
, db_vnfr
, db_nsr
5918 if result
== "FAILED":
5919 nslcmop_operation_state
= result
5920 error_description_nslcmop
= detailed_status
5921 db_nslcmop_update
["detailed-status"] = detailed_status
5924 + " step {} Done with result {} {}".format(
5925 step
, nslcmop_operation_state
, detailed_status
5930 step
= "Checking if any charm package has changed or not"
5931 for current_charm_path
, target_charm_path
in list(
5932 charm_artifact_paths
5936 and target_charm_path
5937 and self
.check_charm_hash_changed(
5938 current_charm_path
, target_charm_path
5942 step
= "Checking whether VNF uses juju bundle"
5943 if check_juju_bundle_existence(current_vnfd
):
5946 "Charm upgrade is not supported for the instance which"
5947 " uses juju-bundle: {}".format(
5948 check_juju_bundle_existence(current_vnfd
)
5952 step
= "Upgrading Charm"
5956 ) = await self
._ns
_charm
_upgrade
(
5959 charm_type
=charm_type
,
5960 path
=self
.fs
.path
+ target_charm_path
,
5961 timeout
=timeout_seconds
,
5964 if result
== "FAILED":
5965 nslcmop_operation_state
= result
5966 error_description_nslcmop
= detailed_status
5968 db_nslcmop_update
["detailed-status"] = detailed_status
5971 + " step {} Done with result {} {}".format(
5972 step
, nslcmop_operation_state
, detailed_status
5976 step
= "Updating policies"
5977 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5978 result
= "COMPLETED"
5979 detailed_status
= "Done"
5980 db_nslcmop_update
["detailed-status"] = "Done"
5982 # If nslcmop_operation_state is None, no operation has failed.
5983 if not nslcmop_operation_state
:
5984 nslcmop_operation_state
= "COMPLETED"
5986 # If the CHANGE_VNFPKG update operation is successful,
5987 # the vnf revision needs to be updated
5988 vnfr_update
["revision"] = latest_vnfd_revision
5989 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
5993 + " task Done with result {} {}".format(
5994 nslcmop_operation_state
, detailed_status
5997 elif update_type
== "REMOVE_VNF":
5998 # This part is included in https://osm.etsi.org/gerrit/11876
5999 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
6000 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
6001 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6002 step
= "Removing VNF"
6003 (result
, detailed_status
) = await self
.remove_vnf(
6004 nsr_id
, nslcmop_id
, vnf_instance_id
6006 if result
== "FAILED":
6007 nslcmop_operation_state
= result
6008 error_description_nslcmop
= detailed_status
6009 db_nslcmop_update
["detailed-status"] = detailed_status
6010 change_type
= "vnf_terminated"
6011 if not nslcmop_operation_state
:
6012 nslcmop_operation_state
= "COMPLETED"
6015 + " task Done with result {} {}".format(
6016 nslcmop_operation_state
, detailed_status
6020 elif update_type
== "OPERATE_VNF":
6021 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"][
6024 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"][
6027 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"][
6030 (result
, detailed_status
) = await self
.rebuild_start_stop(
6031 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
6033 if result
== "FAILED":
6034 nslcmop_operation_state
= result
6035 error_description_nslcmop
= detailed_status
6036 db_nslcmop_update
["detailed-status"] = detailed_status
6037 if not nslcmop_operation_state
:
6038 nslcmop_operation_state
= "COMPLETED"
6041 + " task Done with result {} {}".format(
6042 nslcmop_operation_state
, detailed_status
6046 # If nslcmop_operation_state is None, no operation has failed.
6047 # All operations have been executed.
6048 if not nslcmop_operation_state
:
6049 nslcmop_operation_state
= "COMPLETED"
6050 db_nsr_update
["operational-status"] = old_operational_status
6052 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
6053 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6055 except asyncio
.CancelledError
:
6057 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6059 exc
= "Operation was cancelled"
6060 except asyncio
.TimeoutError
:
6061 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
6063 except Exception as e
:
6064 exc
= traceback
.format_exc()
6065 self
.logger
.critical(
6066 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6075 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6076 nslcmop_operation_state
= "FAILED"
6077 db_nsr_update
["operational-status"] = old_operational_status
6079 self
._write
_ns
_status
(
6081 ns_state
=db_nsr
["nsState"],
6082 current_operation
="IDLE",
6083 current_operation_id
=None,
6084 other_update
=db_nsr_update
,
6087 self
._write
_op
_status
(
6090 error_message
=error_description_nslcmop
,
6091 operation_state
=nslcmop_operation_state
,
6092 other_update
=db_nslcmop_update
,
6095 if nslcmop_operation_state
:
6099 "nslcmop_id": nslcmop_id
,
6100 "operationState": nslcmop_operation_state
,
6102 if change_type
in ("vnf_terminated", "policy_updated"):
6103 msg
.update({"vnf_member_index": member_vnf_index
})
6104 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6105 except Exception as e
:
6107 logging_text
+ "kafka_write notification Exception {}".format(e
)
6109 self
.logger
.debug(logging_text
+ "Exit")
6110 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6111 return nslcmop_operation_state
, detailed_status
6113 async def scale(self
, nsr_id
, nslcmop_id
):
6114 # Try to lock HA task here
6115 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6116 if not task_is_locked_by_me
:
6119 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6120 stage
= ["", "", ""]
6121 tasks_dict_info
= {}
6122 # ^ stage, step, VIM progress
6123 self
.logger
.debug(logging_text
+ "Enter")
6124 # get all needed from database
6126 db_nslcmop_update
= {}
6129 # in case of error, indicates which part of the scale operation failed, so that the nsr can be put in error status
6130 scale_process
= None
6131 old_operational_status
= ""
6132 old_config_status
= ""
6135 # wait for any previous tasks in process
6136 step
= "Waiting for previous operations to terminate"
6137 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6138 self
._write
_ns
_status
(
6141 current_operation
="SCALING",
6142 current_operation_id
=nslcmop_id
,
6145 step
= "Getting nslcmop from database"
6147 step
+ " after having waited for previous tasks to be completed"
6149 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6151 step
= "Getting nsr from database"
6152 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6153 old_operational_status
= db_nsr
["operational-status"]
6154 old_config_status
= db_nsr
["config-status"]
6156 step
= "Parsing scaling parameters"
6157 db_nsr_update
["operational-status"] = "scaling"
6158 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6159 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6161 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6163 ]["member-vnf-index"]
6164 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6166 ]["scaling-group-descriptor"]
6167 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6168 # for backward compatibility
6169 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6170 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6171 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6172 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6174 step
= "Getting vnfr from database"
6175 db_vnfr
= self
.db
.get_one(
6176 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6179 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6181 step
= "Getting vnfd from database"
6182 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6184 base_folder
= db_vnfd
["_admin"]["storage"]
6186 step
= "Getting scaling-group-descriptor"
6187 scaling_descriptor
= find_in_list(
6188 get_scaling_aspect(db_vnfd
),
6189 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6191 if not scaling_descriptor
:
6193 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6194 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6197 step
= "Sending scale order to VIM"
6198 # TODO check if ns is in a proper status
6200 if not db_nsr
["_admin"].get("scaling-group"):
6205 "_admin.scaling-group": [
6206 {"name": scaling_group
, "nb-scale-op": 0}
6210 admin_scale_index
= 0
6212 for admin_scale_index
, admin_scale_info
in enumerate(
6213 db_nsr
["_admin"]["scaling-group"]
6215 if admin_scale_info
["name"] == scaling_group
:
6216 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6218 else: # not found, set index one plus last element and add new entry with the name
6219 admin_scale_index
+= 1
6221 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6224 vca_scaling_info
= []
6225 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6226 if scaling_type
== "SCALE_OUT":
6227 if "aspect-delta-details" not in scaling_descriptor
:
6229 "Aspect delta details not fount in scaling descriptor {}".format(
6230 scaling_descriptor
["name"]
6233 # count if max-instance-count is reached
6234 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6236 scaling_info
["scaling_direction"] = "OUT"
6237 scaling_info
["vdu-create"] = {}
6238 scaling_info
["kdu-create"] = {}
6239 for delta
in deltas
:
6240 for vdu_delta
in delta
.get("vdu-delta", {}):
6241 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6242 # vdu_index also provides the number of instances of the targeted vdu
6243 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6244 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6248 additional_params
= (
6249 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6252 cloud_init_list
= []
6254 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6255 max_instance_count
= 10
6256 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6257 max_instance_count
= vdu_profile
.get(
6258 "max-number-of-instances", 10
6261 default_instance_num
= get_number_of_instances(
6264 instances_number
= vdu_delta
.get("number-of-instances", 1)
6265 nb_scale_op
+= instances_number
6267 new_instance_count
= nb_scale_op
+ default_instance_num
6268 # Check if the new count is over max and the vdu count is less than max.
6269 # Then assign the new instance count
6270 if new_instance_count
> max_instance_count
> vdu_count
:
6271 instances_number
= new_instance_count
- max_instance_count
6273 instances_number
= instances_number
6275 if new_instance_count
> max_instance_count
:
6277 "reached the limit of {} (max-instance-count) "
6278 "scaling-out operations for the "
6279 "scaling-group-descriptor '{}'".format(
6280 nb_scale_op
, scaling_group
6283 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6285 # TODO Information of its own ip is not available because db_vnfr is not updated.
6286 additional_params
["OSM"] = get_osm_params(
6287 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6289 cloud_init_list
.append(
6290 self
._parse
_cloud
_init
(
6297 vca_scaling_info
.append(
6299 "osm_vdu_id": vdu_delta
["id"],
6300 "member-vnf-index": vnf_index
,
6302 "vdu_index": vdu_index
+ x
,
6305 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6306 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6307 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6308 kdu_name
= kdu_profile
["kdu-name"]
6309 resource_name
= kdu_profile
.get("resource-name", "")
6311 # Might have different kdus in the same delta
6312 # Should have list for each kdu
6313 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6314 scaling_info
["kdu-create"][kdu_name
] = []
6316 kdur
= get_kdur(db_vnfr
, kdu_name
)
6317 if kdur
.get("helm-chart"):
6318 k8s_cluster_type
= "helm-chart-v3"
6319 self
.logger
.debug("kdur: {}".format(kdur
))
6321 kdur
.get("helm-version")
6322 and kdur
.get("helm-version") == "v2"
6324 k8s_cluster_type
= "helm-chart"
6325 elif kdur
.get("juju-bundle"):
6326 k8s_cluster_type
= "juju-bundle"
6329 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6330 "juju-bundle. Maybe an old NBI version is running".format(
6331 db_vnfr
["member-vnf-index-ref"], kdu_name
6335 max_instance_count
= 10
6336 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6337 max_instance_count
= kdu_profile
.get(
6338 "max-number-of-instances", 10
6341 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6342 deployed_kdu
, _
= get_deployed_kdu(
6343 nsr_deployed
, kdu_name
, vnf_index
6345 if deployed_kdu
is None:
6347 "KDU '{}' for vnf '{}' not deployed".format(
6351 kdu_instance
= deployed_kdu
.get("kdu-instance")
6352 instance_num
= await self
.k8scluster_map
[
6358 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6359 kdu_model
=deployed_kdu
.get("kdu-model"),
6361 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6362 "number-of-instances", 1
6365 # Check if the new count is over max and instance_num is less than max.
6366 # Then assign the max instance number to the kdu replica count
6367 if kdu_replica_count
> max_instance_count
> instance_num
:
6368 kdu_replica_count
= max_instance_count
6369 if kdu_replica_count
> max_instance_count
:
6371 "reached the limit of {} (max-instance-count) "
6372 "scaling-out operations for the "
6373 "scaling-group-descriptor '{}'".format(
6374 instance_num
, scaling_group
6378 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6379 vca_scaling_info
.append(
6381 "osm_kdu_id": kdu_name
,
6382 "member-vnf-index": vnf_index
,
6384 "kdu_index": instance_num
+ x
- 1,
6387 scaling_info
["kdu-create"][kdu_name
].append(
6389 "member-vnf-index": vnf_index
,
6391 "k8s-cluster-type": k8s_cluster_type
,
6392 "resource-name": resource_name
,
6393 "scale": kdu_replica_count
,
6396 elif scaling_type
== "SCALE_IN":
6397 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6399 scaling_info
["scaling_direction"] = "IN"
6400 scaling_info
["vdu-delete"] = {}
6401 scaling_info
["kdu-delete"] = {}
6403 for delta
in deltas
:
6404 for vdu_delta
in delta
.get("vdu-delta", {}):
6405 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6406 min_instance_count
= 0
6407 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6408 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6409 min_instance_count
= vdu_profile
["min-number-of-instances"]
6411 default_instance_num
= get_number_of_instances(
6412 db_vnfd
, vdu_delta
["id"]
6414 instance_num
= vdu_delta
.get("number-of-instances", 1)
6415 nb_scale_op
-= instance_num
6417 new_instance_count
= nb_scale_op
+ default_instance_num
6419 if new_instance_count
< min_instance_count
< vdu_count
:
6420 instances_number
= min_instance_count
- new_instance_count
6422 instances_number
= instance_num
6424 if new_instance_count
< min_instance_count
:
6426 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6427 "scaling-group-descriptor '{}'".format(
6428 nb_scale_op
, scaling_group
6431 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6432 vca_scaling_info
.append(
6434 "osm_vdu_id": vdu_delta
["id"],
6435 "member-vnf-index": vnf_index
,
6437 "vdu_index": vdu_index
- 1 - x
,
6440 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6441 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6442 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6443 kdu_name
= kdu_profile
["kdu-name"]
6444 resource_name
= kdu_profile
.get("resource-name", "")
6446 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6447 scaling_info
["kdu-delete"][kdu_name
] = []
6449 kdur
= get_kdur(db_vnfr
, kdu_name
)
6450 if kdur
.get("helm-chart"):
6451 k8s_cluster_type
= "helm-chart-v3"
6452 self
.logger
.debug("kdur: {}".format(kdur
))
6454 kdur
.get("helm-version")
6455 and kdur
.get("helm-version") == "v2"
6457 k8s_cluster_type
= "helm-chart"
6458 elif kdur
.get("juju-bundle"):
6459 k8s_cluster_type
= "juju-bundle"
6462 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6463 "juju-bundle. Maybe an old NBI version is running".format(
6464 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6468 min_instance_count
= 0
6469 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6470 min_instance_count
= kdu_profile
["min-number-of-instances"]
6472 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6473 deployed_kdu
, _
= get_deployed_kdu(
6474 nsr_deployed
, kdu_name
, vnf_index
6476 if deployed_kdu
is None:
6478 "KDU '{}' for vnf '{}' not deployed".format(
6482 kdu_instance
= deployed_kdu
.get("kdu-instance")
6483 instance_num
= await self
.k8scluster_map
[
6489 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6490 kdu_model
=deployed_kdu
.get("kdu-model"),
6492 kdu_replica_count
= instance_num
- kdu_delta
.get(
6493 "number-of-instances", 1
6496 if kdu_replica_count
< min_instance_count
< instance_num
:
6497 kdu_replica_count
= min_instance_count
6498 if kdu_replica_count
< min_instance_count
:
6500 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6501 "scaling-group-descriptor '{}'".format(
6502 instance_num
, scaling_group
6506 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6507 vca_scaling_info
.append(
6509 "osm_kdu_id": kdu_name
,
6510 "member-vnf-index": vnf_index
,
6512 "kdu_index": instance_num
- x
- 1,
6515 scaling_info
["kdu-delete"][kdu_name
].append(
6517 "member-vnf-index": vnf_index
,
6519 "k8s-cluster-type": k8s_cluster_type
,
6520 "resource-name": resource_name
,
6521 "scale": kdu_replica_count
,
6525 # update VDU_SCALING_INFO with the ip_addresses of the VDUs to delete
6526 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6527 if scaling_info
["scaling_direction"] == "IN":
6528 for vdur
in reversed(db_vnfr
["vdur"]):
6529 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6530 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6531 scaling_info
["vdu"].append(
6533 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6534 "vdu_id": vdur
["vdu-id-ref"],
6538 for interface
in vdur
["interfaces"]:
6539 scaling_info
["vdu"][-1]["interface"].append(
6541 "name": interface
["name"],
6542 "ip_address": interface
["ip-address"],
6543 "mac_address": interface
.get("mac-address"),
6546 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6549 step
= "Executing pre-scale vnf-config-primitive"
6550 if scaling_descriptor
.get("scaling-config-action"):
6551 for scaling_config_action
in scaling_descriptor
[
6552 "scaling-config-action"
6555 scaling_config_action
.get("trigger") == "pre-scale-in"
6556 and scaling_type
== "SCALE_IN"
6558 scaling_config_action
.get("trigger") == "pre-scale-out"
6559 and scaling_type
== "SCALE_OUT"
6561 vnf_config_primitive
= scaling_config_action
[
6562 "vnf-config-primitive-name-ref"
6564 step
= db_nslcmop_update
[
6566 ] = "executing pre-scale scaling-config-action '{}'".format(
6567 vnf_config_primitive
6570 # look for primitive
6571 for config_primitive
in (
6572 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6573 ).get("config-primitive", ()):
6574 if config_primitive
["name"] == vnf_config_primitive
:
6578 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6579 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6580 "primitive".format(scaling_group
, vnf_config_primitive
)
6583 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6584 if db_vnfr
.get("additionalParamsForVnf"):
6585 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6587 scale_process
= "VCA"
6588 db_nsr_update
["config-status"] = "configuring pre-scaling"
6589 primitive_params
= self
._map
_primitive
_params
(
6590 config_primitive
, {}, vnfr_params
6593 # Pre-scale retry check: Check if this sub-operation has been executed before
6594 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6597 vnf_config_primitive
,
6601 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6602 # Skip sub-operation
6603 result
= "COMPLETED"
6604 result_detail
= "Done"
6607 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6608 vnf_config_primitive
, result
, result_detail
6612 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6613 # New sub-operation: Get index of this sub-operation
6615 len(db_nslcmop
.get("_admin", {}).get("operations"))
6620 + "vnf_config_primitive={} New sub-operation".format(
6621 vnf_config_primitive
6625 # retry: Get registered params for this existing sub-operation
6626 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6629 vnf_index
= op
.get("member_vnf_index")
6630 vnf_config_primitive
= op
.get("primitive")
6631 primitive_params
= op
.get("primitive_params")
6634 + "vnf_config_primitive={} Sub-operation retry".format(
6635 vnf_config_primitive
6638 # Execute the primitive, either with new (first-time) or registered (retry) args
6639 ee_descriptor_id
= config_primitive
.get(
6640 "execution-environment-ref"
6642 primitive_name
= config_primitive
.get(
6643 "execution-environment-primitive", vnf_config_primitive
6645 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6646 nsr_deployed
["VCA"],
6647 member_vnf_index
=vnf_index
,
6649 vdu_count_index
=None,
6650 ee_descriptor_id
=ee_descriptor_id
,
6652 result
, result_detail
= await self
._ns
_execute
_primitive
(
6661 + "vnf_config_primitive={} Done with result {} {}".format(
6662 vnf_config_primitive
, result
, result_detail
6665 # Update operationState = COMPLETED | FAILED
6666 self
._update
_suboperation
_status
(
6667 db_nslcmop
, op_index
, result
, result_detail
6670 if result
== "FAILED":
6671 raise LcmException(result_detail
)
6672 db_nsr_update
["config-status"] = old_config_status
6673 scale_process
= None
6677 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6680 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6683 # SCALE-IN VCA - BEGIN
6684 if vca_scaling_info
:
6685 step
= db_nslcmop_update
[
6687 ] = "Deleting the execution environments"
6688 scale_process
= "VCA"
6689 for vca_info
in vca_scaling_info
:
6690 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6691 member_vnf_index
= str(vca_info
["member-vnf-index"])
6693 logging_text
+ "vdu info: {}".format(vca_info
)
6695 if vca_info
.get("osm_vdu_id"):
6696 vdu_id
= vca_info
["osm_vdu_id"]
6697 vdu_index
= int(vca_info
["vdu_index"])
6700 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6701 member_vnf_index
, vdu_id
, vdu_index
6703 stage
[2] = step
= "Scaling in VCA"
6704 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6705 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6706 config_update
= db_nsr
["configurationStatus"]
6707 for vca_index
, vca
in enumerate(vca_update
):
6709 (vca
or vca
.get("ee_id"))
6710 and vca
["member-vnf-index"] == member_vnf_index
6711 and vca
["vdu_count_index"] == vdu_index
6713 if vca
.get("vdu_id"):
6714 config_descriptor
= get_configuration(
6715 db_vnfd
, vca
.get("vdu_id")
6717 elif vca
.get("kdu_name"):
6718 config_descriptor
= get_configuration(
6719 db_vnfd
, vca
.get("kdu_name")
6722 config_descriptor
= get_configuration(
6723 db_vnfd
, db_vnfd
["id"]
6725 operation_params
= (
6726 db_nslcmop
.get("operationParams") or {}
6728 exec_terminate_primitives
= not operation_params
.get(
6729 "skip_terminate_primitives"
6730 ) and vca
.get("needed_terminate")
6731 task
= asyncio
.ensure_future(
6740 exec_primitives
=exec_terminate_primitives
,
6744 timeout
=self
.timeout_charm_delete
,
6747 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6750 del vca_update
[vca_index
]
6751 del config_update
[vca_index
]
6752 # wait for pending tasks of terminate primitives
6756 + "Waiting for tasks {}".format(
6757 list(tasks_dict_info
.keys())
6760 error_list
= await self
._wait
_for
_tasks
(
6764 self
.timeout_charm_delete
, self
.timeout_ns_terminate
6769 tasks_dict_info
.clear()
6771 raise LcmException("; ".join(error_list
))
6773 db_vca_and_config_update
= {
6774 "_admin.deployed.VCA": vca_update
,
6775 "configurationStatus": config_update
,
6778 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6780 scale_process
= None
6781 # SCALE-IN VCA - END
6784 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6785 scale_process
= "RO"
6786 if self
.ro_config
.get("ng"):
6787 await self
._scale
_ng
_ro
(
6788 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6790 scaling_info
.pop("vdu-create", None)
6791 scaling_info
.pop("vdu-delete", None)
6793 scale_process
= None
6797 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6798 scale_process
= "KDU"
6799 await self
._scale
_kdu
(
6800 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6802 scaling_info
.pop("kdu-create", None)
6803 scaling_info
.pop("kdu-delete", None)
6805 scale_process
= None
6809 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6811 # SCALE-UP VCA - BEGIN
6812 if vca_scaling_info
:
6813 step
= db_nslcmop_update
[
6815 ] = "Creating new execution environments"
6816 scale_process
= "VCA"
6817 for vca_info
in vca_scaling_info
:
6818 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6819 member_vnf_index
= str(vca_info
["member-vnf-index"])
6821 logging_text
+ "vdu info: {}".format(vca_info
)
6823 vnfd_id
= db_vnfr
["vnfd-ref"]
6824 if vca_info
.get("osm_vdu_id"):
6825 vdu_index
= int(vca_info
["vdu_index"])
6826 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6827 if db_vnfr
.get("additionalParamsForVnf"):
6828 deploy_params
.update(
6830 db_vnfr
["additionalParamsForVnf"].copy()
6833 descriptor_config
= get_configuration(
6834 db_vnfd
, db_vnfd
["id"]
6836 if descriptor_config
:
6841 logging_text
=logging_text
6842 + "member_vnf_index={} ".format(member_vnf_index
),
6845 nslcmop_id
=nslcmop_id
,
6851 member_vnf_index
=member_vnf_index
,
6852 vdu_index
=vdu_index
,
6854 deploy_params
=deploy_params
,
6855 descriptor_config
=descriptor_config
,
6856 base_folder
=base_folder
,
6857 task_instantiation_info
=tasks_dict_info
,
6860 vdu_id
= vca_info
["osm_vdu_id"]
6861 vdur
= find_in_list(
6862 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6864 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6865 if vdur
.get("additionalParams"):
6866 deploy_params_vdu
= parse_yaml_strings(
6867 vdur
["additionalParams"]
6870 deploy_params_vdu
= deploy_params
6871 deploy_params_vdu
["OSM"] = get_osm_params(
6872 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6874 if descriptor_config
:
6879 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6880 member_vnf_index
, vdu_id
, vdu_index
6882 stage
[2] = step
= "Scaling out VCA"
6883 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6885 logging_text
=logging_text
6886 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6887 member_vnf_index
, vdu_id
, vdu_index
6891 nslcmop_id
=nslcmop_id
,
6897 member_vnf_index
=member_vnf_index
,
6898 vdu_index
=vdu_index
,
6900 deploy_params
=deploy_params_vdu
,
6901 descriptor_config
=descriptor_config
,
6902 base_folder
=base_folder
,
6903 task_instantiation_info
=tasks_dict_info
,
6906 # SCALE-UP VCA - END
6907 scale_process
= None
6910 # execute primitive service POST-SCALING
6911 step
= "Executing post-scale vnf-config-primitive"
6912 if scaling_descriptor
.get("scaling-config-action"):
6913 for scaling_config_action
in scaling_descriptor
[
6914 "scaling-config-action"
6917 scaling_config_action
.get("trigger") == "post-scale-in"
6918 and scaling_type
== "SCALE_IN"
6920 scaling_config_action
.get("trigger") == "post-scale-out"
6921 and scaling_type
== "SCALE_OUT"
6923 vnf_config_primitive
= scaling_config_action
[
6924 "vnf-config-primitive-name-ref"
6926 step
= db_nslcmop_update
[
6928 ] = "executing post-scale scaling-config-action '{}'".format(
6929 vnf_config_primitive
6932 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6933 if db_vnfr
.get("additionalParamsForVnf"):
6934 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6936 # look for primitive
6937 for config_primitive
in (
6938 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6939 ).get("config-primitive", ()):
6940 if config_primitive
["name"] == vnf_config_primitive
:
6944 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6945 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6946 "config-primitive".format(
6947 scaling_group
, vnf_config_primitive
6950 scale_process
= "VCA"
6951 db_nsr_update
["config-status"] = "configuring post-scaling"
6952 primitive_params
= self
._map
_primitive
_params
(
6953 config_primitive
, {}, vnfr_params
6956 # Post-scale retry check: Check if this sub-operation has been executed before
6957 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6960 vnf_config_primitive
,
6964 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6965 # Skip sub-operation
6966 result
= "COMPLETED"
6967 result_detail
= "Done"
6970 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6971 vnf_config_primitive
, result
, result_detail
6975 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6976 # New sub-operation: Get index of this sub-operation
6978 len(db_nslcmop
.get("_admin", {}).get("operations"))
6983 + "vnf_config_primitive={} New sub-operation".format(
6984 vnf_config_primitive
6988 # retry: Get registered params for this existing sub-operation
6989 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6992 vnf_index
= op
.get("member_vnf_index")
6993 vnf_config_primitive
= op
.get("primitive")
6994 primitive_params
= op
.get("primitive_params")
6997 + "vnf_config_primitive={} Sub-operation retry".format(
6998 vnf_config_primitive
7001 # Execute the primitive, either with new (first-time) or registered (retry) args
7002 ee_descriptor_id
= config_primitive
.get(
7003 "execution-environment-ref"
7005 primitive_name
= config_primitive
.get(
7006 "execution-environment-primitive", vnf_config_primitive
7008 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
7009 nsr_deployed
["VCA"],
7010 member_vnf_index
=vnf_index
,
7012 vdu_count_index
=None,
7013 ee_descriptor_id
=ee_descriptor_id
,
7015 result
, result_detail
= await self
._ns
_execute
_primitive
(
7024 + "vnf_config_primitive={} Done with result {} {}".format(
7025 vnf_config_primitive
, result
, result_detail
7028 # Update operationState = COMPLETED | FAILED
7029 self
._update
_suboperation
_status
(
7030 db_nslcmop
, op_index
, result
, result_detail
7033 if result
== "FAILED":
7034 raise LcmException(result_detail
)
7035 db_nsr_update
["config-status"] = old_config_status
7036 scale_process
= None
7041 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
7042 db_nsr_update
["operational-status"] = (
7044 if old_operational_status
== "failed"
7045 else old_operational_status
7047 db_nsr_update
["config-status"] = old_config_status
7050 ROclient
.ROClientException
,
7055 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7057 except asyncio
.CancelledError
:
7059 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7061 exc
= "Operation was cancelled"
7062 except Exception as e
:
7063 exc
= traceback
.format_exc()
7064 self
.logger
.critical(
7065 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7069 self
._write
_ns
_status
(
7072 current_operation
="IDLE",
7073 current_operation_id
=None,
7076 stage
[1] = "Waiting for instantiate pending tasks."
7077 self
.logger
.debug(logging_text
+ stage
[1])
7078 exc
= await self
._wait
_for
_tasks
(
7081 self
.timeout_ns_deploy
,
7089 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7090 nslcmop_operation_state
= "FAILED"
7092 db_nsr_update
["operational-status"] = old_operational_status
7093 db_nsr_update
["config-status"] = old_config_status
7094 db_nsr_update
["detailed-status"] = ""
7096 if "VCA" in scale_process
:
7097 db_nsr_update
["config-status"] = "failed"
7098 if "RO" in scale_process
:
7099 db_nsr_update
["operational-status"] = "failed"
7102 ] = "FAILED scaling nslcmop={} {}: {}".format(
7103 nslcmop_id
, step
, exc
7106 error_description_nslcmop
= None
7107 nslcmop_operation_state
= "COMPLETED"
7108 db_nslcmop_update
["detailed-status"] = "Done"
7110 self
._write
_op
_status
(
7113 error_message
=error_description_nslcmop
,
7114 operation_state
=nslcmop_operation_state
,
7115 other_update
=db_nslcmop_update
,
7118 self
._write
_ns
_status
(
7121 current_operation
="IDLE",
7122 current_operation_id
=None,
7123 other_update
=db_nsr_update
,
7126 if nslcmop_operation_state
:
7130 "nslcmop_id": nslcmop_id
,
7131 "operationState": nslcmop_operation_state
,
7133 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7134 except Exception as e
:
7136 logging_text
+ "kafka_write notification Exception {}".format(e
)
7138 self
.logger
.debug(logging_text
+ "Exit")
7139 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7141 async def _scale_kdu(
7142 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7144 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7145 for kdu_name
in _scaling_info
:
7146 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7147 deployed_kdu
, index
= get_deployed_kdu(
7148 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7150 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7151 kdu_instance
= deployed_kdu
["kdu-instance"]
7152 kdu_model
= deployed_kdu
.get("kdu-model")
7153 scale
= int(kdu_scaling_info
["scale"])
7154 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7157 "collection": "nsrs",
7158 "filter": {"_id": nsr_id
},
7159 "path": "_admin.deployed.K8s.{}".format(index
),
7162 step
= "scaling application {}".format(
7163 kdu_scaling_info
["resource-name"]
7165 self
.logger
.debug(logging_text
+ step
)
7167 if kdu_scaling_info
["type"] == "delete":
7168 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7171 and kdu_config
.get("terminate-config-primitive")
7172 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7174 terminate_config_primitive_list
= kdu_config
.get(
7175 "terminate-config-primitive"
7177 terminate_config_primitive_list
.sort(
7178 key
=lambda val
: int(val
["seq"])
7182 terminate_config_primitive
7183 ) in terminate_config_primitive_list
:
7184 primitive_params_
= self
._map
_primitive
_params
(
7185 terminate_config_primitive
, {}, {}
7187 step
= "execute terminate config primitive"
7188 self
.logger
.debug(logging_text
+ step
)
7189 await asyncio
.wait_for(
7190 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7191 cluster_uuid
=cluster_uuid
,
7192 kdu_instance
=kdu_instance
,
7193 primitive_name
=terminate_config_primitive
["name"],
7194 params
=primitive_params_
,
7201 await asyncio
.wait_for(
7202 self
.k8scluster_map
[k8s_cluster_type
].scale(
7205 kdu_scaling_info
["resource-name"],
7207 cluster_uuid
=cluster_uuid
,
7208 kdu_model
=kdu_model
,
7212 timeout
=self
.timeout_vca_on_error
,
7215 if kdu_scaling_info
["type"] == "create":
7216 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7219 and kdu_config
.get("initial-config-primitive")
7220 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7222 initial_config_primitive_list
= kdu_config
.get(
7223 "initial-config-primitive"
7225 initial_config_primitive_list
.sort(
7226 key
=lambda val
: int(val
["seq"])
7229 for initial_config_primitive
in initial_config_primitive_list
:
7230 primitive_params_
= self
._map
_primitive
_params
(
7231 initial_config_primitive
, {}, {}
7233 step
= "execute initial config primitive"
7234 self
.logger
.debug(logging_text
+ step
)
7235 await asyncio
.wait_for(
7236 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7237 cluster_uuid
=cluster_uuid
,
7238 kdu_instance
=kdu_instance
,
7239 primitive_name
=initial_config_primitive
["name"],
7240 params
=primitive_params_
,
7247 async def _scale_ng_ro(
7248 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7250 nsr_id
= db_nslcmop
["nsInstanceId"]
7251 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7254 # read from db: vnfd's for every vnf
7257 # for each vnf in ns, read vnfd
7258 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7259 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7260 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7261 # if we haven't this vnfd, read it from db
7262 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7264 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7265 db_vnfds
.append(vnfd
)
7266 n2vc_key
= self
.n2vc
.get_public_key()
7267 n2vc_key_list
= [n2vc_key
]
7270 vdu_scaling_info
.get("vdu-create"),
7271 vdu_scaling_info
.get("vdu-delete"),
7274 # db_vnfr has been updated, update db_vnfrs to use it
7275 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7276 await self
._instantiate
_ng
_ro
(
7286 start_deploy
=time(),
7287 timeout_ns_deploy
=self
.timeout_ns_deploy
,
7289 if vdu_scaling_info
.get("vdu-delete"):
7291 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
async def extract_prometheus_scrape_jobs(
    self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
):
    """
    Build the Prometheus scrape jobs declared by an execution environment.

    :param ee_id: execution environment id; the text after the first "." is
        used as service name
    :param artifact_path: folder listed through self.fs to find the template
    :param ee_config_descriptor: descriptor; its "metric-service" is appended
        to the service name to compose the exporter host name
    :param vnfr_id: VNF record id; its dashes are removed before use
    :param nsr_id: NS record id stored in every job
    :param target_ip: value rendered as TARGET_IP
    :return: None when the folder holds no 'prometheus*.j2' file, otherwise
        the job list returned by parse_job with "nsr_id" and "vnfr_id" added
    """
    # look if exist a file called 'prometheus*.j2' and use it as job template
    artifact_content = self.fs.dir_ls(artifact_path)
    job_file = next(
        (
            f
            for f in artifact_content
            if f.startswith("prometheus") and f.endswith(".j2")
        ),
        None,
    )
    if not job_file:
        return
    with self.fs.file_open((artifact_path, job_file), "r") as f:
        job_data = f.read()

    # TODO get_service
    _, _, service = ee_id.partition(".")  # remove prefix "namespace."
    host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
    host_port = "80"
    vnfr_id = vnfr_id.replace("-", "")
    variables = {
        "JOB_NAME": vnfr_id,
        "TARGET_IP": target_ip,
        "EXPORTER_POD_IP": host_name,
        "EXPORTER_POD_PORT": host_port,
    }
    job_list = parse_job(job_data, variables)
    # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
    for job in job_list:
        if (
            not isinstance(job.get("job_name"), str)
            or vnfr_id not in job["job_name"]
        ):
            # Replace a missing or unrelated name by one derived from the
            # vnfr id plus a random suffix.
            job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
        job["nsr_id"] = nsr_id
        job["vnfr_id"] = vnfr_id
    return job_list
async def rebuild_start_stop(
    self, nsr_id, nslcmop_id, vnf_id, additional_param, operation_type
):
    """
    Ask RO to run ``operation_type`` on one VDU instance and wait for it.

    :param nsr_id: NS record id
    :param nslcmop_id: id of the operation being executed
    :param vnf_id: "_id" of the VNF record owning the VDU
    :param additional_param: dict with "vdu_id" and "count-index" selecting the VDU
    :param operation_type: operation name; used as key of the RO payload, as
        NS "operational-status" and, upper-cased, as current operation
    :return: ("COMPLETED", "Done") on success, otherwise
        ("FAILED", "Error in operate VNF <error>")
    """
    logging_text = "Task ns={} {}={} ".format(nsr_id, operation_type, nslcmop_id)
    self.logger.info(logging_text + "Enter")
    stage = ["Preparing the environment", ""]
    # database nsrs record
    db_nsr_update = {}
    vdu_vim_name = None
    vim_vm_id = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    start_deploy = time()
    try:
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_id})
        vim_account_id = db_vnfr.get("vim-account-id")
        if not vim_account_id:
            # Fail with a readable message instead of the TypeError raised by
            # concatenating "vim:" with None.
            raise LcmException(
                "VNF record {} has no vim-account-id".format(vnf_id)
            )
        vim_info_key = "vim:" + vim_account_id
        vdu_id = additional_param["vdu_id"]
        vdurs = [item for item in db_vnfr["vdur"] if item["vdu-id-ref"] == vdu_id]
        vdur = find_in_list(
            vdurs, lambda vdu: vdu["count-index"] == additional_param["count-index"]
        )
        if not vdur:
            raise LcmException("Target vdu is not found")
        vdu_vim_name = vdur["name"]
        vim_vm_id = vdur["vim_info"][vim_info_key]["vim_id"]
        # First key of vim_info is used as target VIM.
        target_vim = next(iter(vdur["vim_info"]))
        self.logger.info("vdu_vim_name >> {} ".format(vdu_vim_name))
        # wait for any previous tasks in process
        stage[1] = "Waiting for previous operations to terminate"
        self.logger.info(stage[1])
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        stage[1] = "Reading from database."
        self.logger.info(stage[1])
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation=operation_type.upper(),
            current_operation_id=nslcmop_id,
        )
        self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)

        # read from db: ns
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr_update["operational-status"] = operation_type
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        # Payload for RO
        desc = {
            operation_type: {
                "vim_vm_id": vim_vm_id,
                "vnf_id": vnf_id,
                "vdu_index": additional_param["count-index"],
                "vdu_id": vdur["id"],
                "target_vim": target_vim,
                "vim_account_id": vim_account_id,
            }
        }
        stage[1] = "Sending rebuild request to RO... {}".format(desc)
        self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
        self.logger.info("ro nsr id: {}".format(nsr_id))
        result_dict = await self.RO.operate(nsr_id, desc, operation_type)
        self.logger.info("response from RO: {}".format(result_dict))
        action_id = result_dict["action_id"]
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            self.timeout_operate,
            None,
            "start_stop_rebuild",
        )
        return "COMPLETED", "Done"
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(stage))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    # Only reached through one of the except branches above.
    return "FAILED", "Error in operate VNF {}".format(exc)
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Get VCA Cloud and VCA Cloud Credentials for the VIM account

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential); an item is None when the VIM
        account config does not define it
    """
    # "or {}" also covers an account whose "config" is stored as None.
    config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config") or {}
    return config.get("vca_cloud"), config.get("vca_cloud_credential")
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential); an item is None when the VIM
        account config does not define it
    """
    # "or {}" also covers an account whose "config" is stored as None.
    config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config") or {}
    return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")
async def migrate(self, nsr_id, nslcmop_id):
    """
    Migrate VNFs and VDUs instances in a NS

    The operation parameters stored in the nslcmop record are sent to RO as
    migration target and the resulting RO action is awaited. The outcome is
    written to the operation record and notified as "ns"/"migrated".

    :param: nsr_id: NS Instance ID
    :param: nslcmop_id: nslcmop ID of migrate

    :return: None
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return
    logging_text = "Task ns={} migrate ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nslcmop = None
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    start_deploy = time()

    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="MIGRATING",
            current_operation_id=nslcmop_id,
        )
        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        migrate_params = db_nslcmop.get("operationParams")

        # Copy the parameters; a record without "operationParams" yields an
        # empty target instead of a TypeError in dict.update(None).
        target = dict(migrate_params or {})
        desc = await self.RO.migrate(nsr_id, target)
        self.logger.debug("RO return > {}".format(desc))
        action_id = desc["action_id"]
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            self.timeout_migrate,
            operation="migrate",
        )
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )
        if exc:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        else:
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message="",
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if nslcmop_operation_state:
            try:
                msg = {
                    "nsr_id": nsr_id,
                    "nslcmop_id": nslcmop_id,
                    "operationState": nslcmop_operation_state,
                }
                await self.msg.aiowrite("ns", "migrated", msg, loop=self.loop)
            except Exception as e:
                # Best effort: a failed notification must not fail the operation.
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_migrate")
async def heal(self, nsr_id, nslcmop_id):
    """
    Heal NS

    Launches the healing at RO as a background task and, for every target
    VNF/VDU of the operation, schedules the N2VC healing of the VNF-level and
    VDU-level configurations. All launched tasks are awaited before the final
    status is written and the "ns"/"healed" notification is sent.

    :param nsr_id: ns instance to heal
    :param nslcmop_id: operation to run
    :return: None
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} heal={} ".format(nsr_id, nslcmop_id)
    stage = ["", "", ""]
    tasks_dict_info = {}
    # ^ stage, step, VIM progress
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop_update = {}
    db_nsr_update = {}
    db_vnfrs = {}  # vnf's info indexed by _id
    exc = None
    old_operational_status = ""
    old_config_status = ""
    nsi_id = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="HEALING",
            current_operation_id=nslcmop_id,
        )

        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})

        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]

        db_nsr_update = {
            "_admin.deployed.RO.operational-status": "healing",
        }
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Sending heal order to VIM"
        task_ro = asyncio.ensure_future(
            self.heal_RO(
                logging_text=logging_text,
                nsr_id=nsr_id,
                db_nslcmop=db_nslcmop,
                stage=stage,
            )
        )
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "heal_RO", task_ro)
        tasks_dict_info[task_ro] = "Healing at VIM"

        # VCA tasks
        # read from db: nsd
        stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
        self.logger.debug(logging_text + stage[1])
        nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
        self.fs.sync(db_nsr["nsd-id"])
        db_nsr["nsd"] = nsd
        # read from db: vnfr's of this ns
        step = "Getting vnfrs from db"
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        for vnfr in db_vnfrs_list:
            db_vnfrs[vnfr["_id"]] = vnfr
        self.logger.debug("ns.heal db_vnfrs={}".format(db_vnfrs))

        # Check for each target VNF
        # healVnfData is iterated as a list, so an absent entry defaults to [].
        target_list = db_nslcmop.get("operationParams", {}).get("healVnfData", [])
        for target_vnf in target_list:
            # Find this VNF in the list from DB
            vnfr_id = target_vnf.get("vnfInstanceId", None)
            if not vnfr_id:
                continue
            db_vnfr = db_vnfrs[vnfr_id]
            vnfd_id = db_vnfr.get("vnfd-id")
            vnfd_ref = db_vnfr.get("vnfd-ref")
            vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
            base_folder = vnfd["_admin"]["storage"]
            vdu_id = None
            vdu_index = 0
            vdu_name = None
            kdu_name = None
            nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
            member_vnf_index = db_vnfr.get("member-vnf-index-ref")
            # additionalParams is optional; read it once, defaulting to {}.
            vnf_additional_params = target_vnf.get("additionalParams", {})

            # Check each target VDU and deploy N2VC
            target_vdu_list = vnf_additional_params.get("vdu", [])
            if not target_vdu_list:
                # No VDU was selected: build the target list with every
                # existing VDU instance of the VNF record.
                target_vdu_list = []
                for existing_vdu in db_vnfr.get("vdur") or []:
                    vdu_name = existing_vdu.get("vdu-name", None)
                    vdu_index = existing_vdu.get("count-index", 0)
                    vdu_run_day1 = vnf_additional_params.get("run-day1", False)
                    vdu_to_be_healed = {
                        "vdu-id": vdu_name,
                        "count-index": vdu_index,
                        "run-day1": vdu_run_day1,
                    }
                    target_vdu_list.append(vdu_to_be_healed)
            for target_vdu in target_vdu_list:
                deploy_params_vdu = target_vdu
                # Set run-day1 vnf level value if not vdu level value exists
                if not deploy_params_vdu.get(
                    "run-day1"
                ) and vnf_additional_params.get("run-day1"):
                    deploy_params_vdu["run-day1"] = vnf_additional_params.get(
                        "run-day1"
                    )
                vdu_name = target_vdu.get("vdu-id", None)
                # TODO: Get vdu_id from vdud.
                vdu_id = vdu_name
                # For multi instance VDU count-index is mandatory
                # For single session VDU count-indes is 0
                vdu_index = target_vdu.get("count-index", 0)

                # n2vc_redesign STEP 3 to 6 Deploy N2VC
                stage[1] = "Deploying Execution Environments."
                self.logger.debug(logging_text + stage[1])

                # VNF Level charm. Normal case when proxy charms.
                # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
                descriptor_config = get_configuration(vnfd, vnfd_ref)
                if descriptor_config:
                    # Continue if healed machine is management machine
                    vnf_ip_address = db_vnfr.get("ip-address")
                    target_instance = None
                    for instance in db_vnfr.get("vdur") or []:
                        if (
                            instance["vdu-name"] == vdu_name
                            and instance["count-index"] == vdu_index
                        ):
                            target_instance = instance
                            break
                    # target_instance stays None when no vdur matches; skip
                    # the VNF-level healing instead of failing on None.get().
                    if (
                        target_instance is not None
                        and vnf_ip_address == target_instance.get("ip-address")
                    ):
                        self._heal_n2vc(
                            logging_text=logging_text
                            + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
                                member_vnf_index, vdu_name, vdu_index
                            ),
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_ref,
                            vdu_id=None,
                            kdu_name=None,
                            member_vnf_index=member_vnf_index,
                            vdu_index=0,
                            vdu_name=None,
                            deploy_params=deploy_params_vdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage,
                        )

                # VDU Level charm. Normal case with native charms.
                descriptor_config = get_configuration(vnfd, vdu_name)
                if descriptor_config:
                    self._heal_n2vc(
                        logging_text=logging_text
                        + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
                            member_vnf_index, vdu_name, vdu_index
                        ),
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_ref,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params_vdu,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_info=tasks_dict_info,
                        stage=stage,
                    )

    except (
        ROclient.ROClientException,
        DbException,
        LcmException,
        NgRoException,
    ) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(step)
        )
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
            exc_info=True,
        )
    finally:
        if tasks_dict_info:
            stage[1] = "Waiting for healing pending tasks."
            self.logger.debug(logging_text + stage[1])
            # NOTE(review): this assignment replaces any error captured by the
            # except branches above with the result of the pending tasks;
            # confirm that discarding the earlier error is intended.
            exc = await self._wait_for_tasks(
                logging_text,
                tasks_dict_info,
                self.timeout_ns_deploy,
                stage,
                nslcmop_id,
                nsr_id=nsr_id,
            )
        if exc:
            db_nslcmop_update[
                "detailed-status"
            ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
            if db_nsr:
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update[
                    "detailed-status"
                ] = "FAILED healing nslcmop={} {}: {}".format(nslcmop_id, step, exc)
                for task, task_name in tasks_dict_info.items():
                    if not task.done() or task.cancelled() or task.exception():
                        if task_name.startswith(self.task_name_deploy_vca):
                            # A N2VC task is pending
                            db_nsr_update["config-status"] = "failed"
                        else:
                            # RO task is pending
                            db_nsr_update["operational-status"] = "failed"
        else:
            error_description_nslcmop = None
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["operational-status"] = "running"
            db_nsr_update["config-status"] = "configured"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="IDLE",
                current_operation_id=None,
                other_update=db_nsr_update,
            )

        if nslcmop_operation_state:
            try:
                msg = {
                    "nsr_id": nsr_id,
                    "nslcmop_id": nslcmop_id,
                    "operationState": nslcmop_operation_state,
                }
                await self.msg.aiowrite("ns", "healed", msg, loop=self.loop)
            except Exception as e:
                # Best effort: a failed notification must not fail the operation.
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_heal")
7847 :param logging_text: preffix text to use at logging
7848 :param nsr_id: nsr identity
7849 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
7850 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
7851 :return: None or exception
7854 def get_vim_account(vim_account_id
):
7856 if vim_account_id
in db_vims
:
7857 return db_vims
[vim_account_id
]
7858 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
7859 db_vims
[vim_account_id
] = db_vim
7864 ns_params
= db_nslcmop
.get("operationParams")
7865 if ns_params
and ns_params
.get("timeout_ns_heal"):
7866 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
7868 timeout_ns_heal
= self
.timeout
.get("ns_heal", self
.timeout_ns_heal
)
7872 nslcmop_id
= db_nslcmop
["_id"]
7874 "action_id": nslcmop_id
,
7876 self
.logger
.warning(
7877 "db_nslcmop={} and timeout_ns_heal={}".format(
7878 db_nslcmop
, timeout_ns_heal
7881 target
.update(db_nslcmop
.get("operationParams", {}))
7883 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
7884 desc
= await self
.RO
.recreate(nsr_id
, target
)
7885 self
.logger
.debug("RO return > {}".format(desc
))
7886 action_id
= desc
["action_id"]
7887 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
7888 await self
._wait
_ng
_ro
(
7895 operation
="healing",
7900 "_admin.deployed.RO.operational-status": "running",
7901 "detailed-status": " ".join(stage
),
7903 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7904 self
._write
_op
_status
(nslcmop_id
, stage
)
7906 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
7909 except Exception as e
:
7910 stage
[2] = "ERROR healing at VIM"
7911 # self.set_vnfr_at_error(db_vnfrs, str(e))
7913 "Error healing at VIM {}".format(e
),
7914 exc_info
=not isinstance(
7917 ROclient
.ROClientException
,
7943 task_instantiation_info
,
7946 # launch instantiate_N2VC in a asyncio task and register task object
7947 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
7948 # if not found, create one entry and update database
7949 # fill db_nsr._admin.deployed.VCA.<index>
7952 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
7956 get_charm_name
= False
7957 if "execution-environment-list" in descriptor_config
:
7958 ee_list
= descriptor_config
.get("execution-environment-list", [])
7959 elif "juju" in descriptor_config
:
7960 ee_list
= [descriptor_config
] # ns charms
7961 if "execution-environment-list" not in descriptor_config
:
7962 # charm name is only required for ns charms
7963 get_charm_name
= True
7964 else: # other types as script are not supported
7967 for ee_item
in ee_list
:
7970 + "_deploy_n2vc ee_item juju={}, helm={}".format(
7971 ee_item
.get("juju"), ee_item
.get("helm-chart")
7974 ee_descriptor_id
= ee_item
.get("id")
7975 if ee_item
.get("juju"):
7976 vca_name
= ee_item
["juju"].get("charm")
7978 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
7981 if ee_item
["juju"].get("charm") is not None
7984 if ee_item
["juju"].get("cloud") == "k8s":
7985 vca_type
= "k8s_proxy_charm"
7986 elif ee_item
["juju"].get("proxy") is False:
7987 vca_type
= "native_charm"
7988 elif ee_item
.get("helm-chart"):
7989 vca_name
= ee_item
["helm-chart"]
7990 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
7993 vca_type
= "helm-v3"
7996 logging_text
+ "skipping non juju neither charm configuration"
8001 for vca_index
, vca_deployed
in enumerate(
8002 db_nsr
["_admin"]["deployed"]["VCA"]
8004 if not vca_deployed
:
8007 vca_deployed
.get("member-vnf-index") == member_vnf_index
8008 and vca_deployed
.get("vdu_id") == vdu_id
8009 and vca_deployed
.get("kdu_name") == kdu_name
8010 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
8011 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
8015 # not found, create one.
8017 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
8020 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
8022 target
+= "/kdu/{}".format(kdu_name
)
8024 "target_element": target
,
8025 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
8026 "member-vnf-index": member_vnf_index
,
8028 "kdu_name": kdu_name
,
8029 "vdu_count_index": vdu_index
,
8030 "operational-status": "init", # TODO revise
8031 "detailed-status": "", # TODO revise
8032 "step": "initial-deploy", # TODO revise
8034 "vdu_name": vdu_name
,
8036 "ee_descriptor_id": ee_descriptor_id
,
8037 "charm_name": charm_name
,
8041 # create VCA and configurationStatus in db
8043 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
8044 "configurationStatus.{}".format(vca_index
): dict(),
8046 self
.update_db_2("nsrs", nsr_id
, db_dict
)
8048 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
8050 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
8051 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
8052 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
8055 task_n2vc
= asyncio
.ensure_future(
8057 logging_text
=logging_text
,
8058 vca_index
=vca_index
,
8064 vdu_index
=vdu_index
,
8065 deploy_params
=deploy_params
,
8066 config_descriptor
=descriptor_config
,
8067 base_folder
=base_folder
,
8068 nslcmop_id
=nslcmop_id
,
8072 ee_config_descriptor
=ee_item
,
8075 self
.lcm_tasks
.register(
8079 "instantiate_N2VC-{}".format(vca_index
),
8082 task_instantiation_info
[
8084 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
8085 member_vnf_index
or "", vdu_id
or ""
8088 async def heal_N2VC(
8105 ee_config_descriptor
,
8107 nsr_id
= db_nsr
["_id"]
8108 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
8109 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
8110 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
8111 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
8113 "collection": "nsrs",
8114 "filter": {"_id": nsr_id
},
8115 "path": db_update_entry
,
8121 element_under_configuration
= nsr_id
8125 vnfr_id
= db_vnfr
["_id"]
8126 osm_config
["osm"]["vnf_id"] = vnfr_id
8128 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8130 if vca_type
== "native_charm":
8133 index_number
= vdu_index
or 0
8136 element_type
= "VNF"
8137 element_under_configuration
= vnfr_id
8138 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8140 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8141 element_type
= "VDU"
8142 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8143 osm_config
["osm"]["vdu_id"] = vdu_id
8145 namespace
+= ".{}".format(kdu_name
)
8146 element_type
= "KDU"
8147 element_under_configuration
= kdu_name
8148 osm_config
["osm"]["kdu_name"] = kdu_name
8151 if base_folder
["pkg-dir"]:
8152 artifact_path
= "{}/{}/{}/{}".format(
8153 base_folder
["folder"],
8154 base_folder
["pkg-dir"],
8157 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8162 artifact_path
= "{}/Scripts/{}/{}/".format(
8163 base_folder
["folder"],
8166 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8171 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8173 # get initial_config_primitive_list that applies to this element
8174 initial_config_primitive_list
= config_descriptor
.get(
8175 "initial-config-primitive"
8179 "Initial config primitive list > {}".format(
8180 initial_config_primitive_list
8184 # add config if not present for NS charm
8185 ee_descriptor_id
= ee_config_descriptor
.get("id")
8186 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8187 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8188 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8192 "Initial config primitive list #2 > {}".format(
8193 initial_config_primitive_list
8196 # n2vc_redesign STEP 3.1
8197 # find old ee_id if exists
8198 ee_id
= vca_deployed
.get("ee_id")
8200 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8201 # create or register execution environment in VCA. Only for native charms when healing
8202 if vca_type
== "native_charm":
8203 step
= "Waiting to VM being up and getting IP address"
8204 self
.logger
.debug(logging_text
+ step
)
8205 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8214 credentials
= {"hostname": rw_mgmt_ip
}
8216 username
= deep_get(
8217 config_descriptor
, ("config-access", "ssh-access", "default-user")
8219 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8220 # merged. Meanwhile let's get username from initial-config-primitive
8221 if not username
and initial_config_primitive_list
:
8222 for config_primitive
in initial_config_primitive_list
:
8223 for param
in config_primitive
.get("parameter", ()):
8224 if param
["name"] == "ssh-username":
8225 username
= param
["value"]
8229 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8230 "'config-access.ssh-access.default-user'"
8232 credentials
["username"] = username
8234 # n2vc_redesign STEP 3.2
8235 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8236 self
._write
_configuration
_status
(
8238 vca_index
=vca_index
,
8239 status
="REGISTERING",
8240 element_under_configuration
=element_under_configuration
,
8241 element_type
=element_type
,
8244 step
= "register execution environment {}".format(credentials
)
8245 self
.logger
.debug(logging_text
+ step
)
8246 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8247 credentials
=credentials
,
8248 namespace
=namespace
,
8253 # update ee_id en db
8255 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8257 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8259 # for compatibility with MON/POL modules, the need model and application name at database
8260 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8261 # Not sure if this need to be done when healing
8263 ee_id_parts = ee_id.split(".")
8264 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8265 if len(ee_id_parts) >= 2:
8266 model_name = ee_id_parts[0]
8267 application_name = ee_id_parts[1]
8268 db_nsr_update[db_update_entry + "model"] = model_name
8269 db_nsr_update[db_update_entry + "application"] = application_name
8272 # n2vc_redesign STEP 3.3
8273 # Install configuration software. Only for native charms.
8274 step
= "Install configuration Software"
8276 self
._write
_configuration
_status
(
8278 vca_index
=vca_index
,
8279 status
="INSTALLING SW",
8280 element_under_configuration
=element_under_configuration
,
8281 element_type
=element_type
,
8282 # other_update=db_nsr_update,
8286 # TODO check if already done
8287 self
.logger
.debug(logging_text
+ step
)
8289 if vca_type
== "native_charm":
8290 config_primitive
= next(
8291 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8294 if config_primitive
:
8295 config
= self
._map
_primitive
_params
(
8296 config_primitive
, {}, deploy_params
8298 await self
.vca_map
[vca_type
].install_configuration_sw(
8300 artifact_path
=artifact_path
,
8308 # write in db flag of configuration_sw already installed
8310 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8313 # Not sure if this need to be done when healing
8315 # add relations for this VCA (wait for other peers related with this VCA)
8316 await self._add_vca_relations(
8317 logging_text=logging_text,
8320 vca_index=vca_index,
8324 # if SSH access is required, then get execution environment SSH public
8325 # if native charm we have already waited for the VM to be UP
8326 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8329 # self.logger.debug("get ssh key block")
8331 config_descriptor
, ("config-access", "ssh-access", "required")
8333 # self.logger.debug("ssh key needed")
8334 # Needed to inject a ssh key
8337 ("config-access", "ssh-access", "default-user"),
8339 step
= "Install configuration Software, getting public ssh key"
8340 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8341 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8344 step
= "Insert public key into VM user={} ssh_key={}".format(
8348 # self.logger.debug("no need to get ssh key")
8349 step
= "Waiting to VM being up and getting IP address"
8350 self
.logger
.debug(logging_text
+ step
)
8352 # n2vc_redesign STEP 5.1
8353 # wait for RO (ip-address) Insert pub_key into VM
8354 # IMPORTANT: We need to wait for RO to complete the healing operation.
8355 await self
._wait
_heal
_ro
(nsr_id
, self
.timeout_ns_heal
)
8358 rw_mgmt_ip
= await self
.wait_kdu_up(
8359 logging_text
, nsr_id
, vnfr_id
, kdu_name
8362 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8372 rw_mgmt_ip
= None # This is for a NS configuration
8374 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8376 # store rw_mgmt_ip in deploy params for later replacement
8377 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8380 # get run-day1 operation parameter
8381 runDay1
= deploy_params
.get("run-day1", False)
8383 "Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
, vdu_id
, runDay1
)
8386 # n2vc_redesign STEP 6 Execute initial config primitive
8387 step
= "execute initial config primitive"
8389 # wait for dependent primitives execution (NS -> VNF -> VDU)
8390 if initial_config_primitive_list
:
8391 await self
._wait
_dependent
_n
2vc
(
8392 nsr_id
, vca_deployed_list
, vca_index
8395 # stage, in function of element type: vdu, kdu, vnf or ns
8396 my_vca
= vca_deployed_list
[vca_index
]
8397 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8399 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8400 elif my_vca
.get("member-vnf-index"):
8402 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8405 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8407 self
._write
_configuration
_status
(
8408 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8411 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8413 check_if_terminated_needed
= True
8414 for initial_config_primitive
in initial_config_primitive_list
:
8415 # adding information on the vca_deployed if it is a NS execution environment
8416 if not vca_deployed
["member-vnf-index"]:
8417 deploy_params
["ns_config_info"] = json
.dumps(
8418 self
._get
_ns
_config
_info
(nsr_id
)
8420 # TODO check if already done
8421 primitive_params_
= self
._map
_primitive
_params
(
8422 initial_config_primitive
, {}, deploy_params
8425 step
= "execute primitive '{}' params '{}'".format(
8426 initial_config_primitive
["name"], primitive_params_
8428 self
.logger
.debug(logging_text
+ step
)
8429 await self
.vca_map
[vca_type
].exec_primitive(
8431 primitive_name
=initial_config_primitive
["name"],
8432 params_dict
=primitive_params_
,
8437 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8438 if check_if_terminated_needed
:
8439 if config_descriptor
.get("terminate-config-primitive"):
8443 {db_update_entry
+ "needed_terminate": True},
8445 check_if_terminated_needed
= False
8447 # TODO register in database that primitive is done
8449 # STEP 7 Configure metrics
8450 # Not sure if this need to be done when healing
8452 if vca_type == "helm" or vca_type == "helm-v3":
8453 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8455 artifact_path=artifact_path,
8456 ee_config_descriptor=ee_config_descriptor,
8459 target_ip=rw_mgmt_ip,
8465 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8468 for job in prometheus_jobs:
8471 {"job_name": job["job_name"]},
8474 fail_on_empty=False,
8478 step
= "instantiated at VCA"
8479 self
.logger
.debug(logging_text
+ step
)
8481 self
._write
_configuration
_status
(
8482 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8485 except Exception as e
: # TODO not use Exception but N2VC exception
8486 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8488 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8491 "Exception while {} : {}".format(step
, e
), exc_info
=True
8493 self
._write
_configuration
_status
(
8494 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8496 raise LcmException("{} {}".format(step
, e
)) from e
async def _wait_heal_ro(
    self,
    nsr_id,
    timeout=600,
    poll_interval=15,
):
    """
    Wait until RO stops reporting the NS as being healed.

    Polls the "nsrs" record and returns as soon as
    _admin.deployed.RO.operational-status is different from "healing".

    NOTE(review): the parameter list and the start_time initialisation were
    not visible in the reviewed excerpt. They are reconstructed from the call
    ``self._wait_heal_ro(nsr_id, self.timeout_ns_heal)`` and from the names the
    body uses; confirm the ``timeout`` default against the original signature.

    :param nsr_id: NS instance id, used as "_id" filter on the "nsrs" table
    :param timeout: maximum number of seconds to keep polling
    :param poll_interval: seconds to sleep between two consecutive polls
    :raises NgRoException: if the status is still "healing" once timeout expires
    """
    start_time = time()
    while time() <= start_time + timeout:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        operational_status_ro = db_nsr["_admin"]["deployed"]["RO"][
            "operational-status"
        ]
        self.logger.debug("Wait Heal RO > {}".format(operational_status_ro))
        if operational_status_ro != "healing":
            break
        # asyncio.sleep() lost its "loop" argument in Python 3.10; passing it
        # raises TypeError, so rely on the running loop instead.
        await asyncio.sleep(poll_interval)
    else:
        # while/else: only reached when the deadline passed without a break
        raise NgRoException("Timeout waiting ns to heal")
async def vertical_scale(self, nsr_id, nslcmop_id):
    """
    Vertical Scale the VDUs in a NS

    Takes the HA lock of the operation, forwards the nslcmop "operationParams"
    to RO, waits for the returned RO action and finally stores the outcome in
    the nslcmop and publishes it on the "ns" topic as "verticalscaled".
    Failures inside the main sequence are not re-raised: they are logged and
    recorded as a FAILED operation.

    NOTE(review): several lines of this method were missing from the reviewed
    excerpt (the leading keyword arguments of the status writers, the leading
    positional arguments of _wait_ng_ro and the first key of the kafka
    message). They are reconstructed here; confirm them against the sibling
    LCM operations before merging.

    :param: nsr_id: NS Instance ID
    :param: nslcmop_id: nslcmop ID of the vertical scale operation
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return
    logging_text = "Task ns={} vertical scale ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")

    db_nslcmop_update = {}
    # exc stays None on success; otherwise it holds what goes to detailed-status
    exc = None
    start_deploy = time()

    try:
        # "step" names the phase in progress so that a failure can report it
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="VerticalScale",
            current_operation_id=nslcmop_id,
        )
        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operationParams = db_nslcmop.get("operationParams")
        # work on a copy so that the database record is never handed to RO
        target = {}
        target.update(operationParams)
        desc = await self.RO.vertical_scale(nsr_id, target)
        self.logger.debug("RO return > {}".format(desc))
        action_id = desc["action_id"]
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            self.timeout_verticalscale,
            operation="verticalscale",
        )
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )
        if exc:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        else:
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message="",
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        # The notification is best effort: a kafka failure is only logged so
        # that the task is still removed from the registry below.
        try:
            msg = {
                "nsr_id": nsr_id,
                "nslcmop_id": nslcmop_id,
                "operationState": nslcmop_operation_state,
            }
            await self.msg.aiowrite("ns", "verticalscaled", msg, loop=self.loop)
        except Exception as e:
            self.logger.error(
                logging_text + "kafka_write notification Exception {}".format(e)
            )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_verticalscale")