1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
36 from osm_lcm
import ROclient
37 from osm_lcm
.data_utils
.nsr
import (
40 get_deployed_vca_list
,
43 from osm_lcm
.data_utils
.vca
import (
52 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
53 from osm_lcm
.lcm_utils
import (
60 check_juju_bundle_existence
,
61 get_charm_artifact_path
,
63 from osm_lcm
.data_utils
.nsd
import (
64 get_ns_configuration_relation_list
,
68 from osm_lcm
.data_utils
.vnfd
import (
74 get_ee_sorted_initial_config_primitive_list
,
75 get_ee_sorted_terminate_config_primitive_list
,
77 get_virtual_link_profiles
,
82 get_number_of_instances
,
84 get_kdu_resource_profile
,
85 find_software_version
,
87 from osm_lcm
.data_utils
.list_utils
import find_in_list
88 from osm_lcm
.data_utils
.vnfr
import (
92 get_volumes_from_instantiation_params
,
94 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
95 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
96 from n2vc
.definitions
import RelationEndpoint
97 from n2vc
.k8s_helm_conn
import K8sHelmConnector
98 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
99 from n2vc
.k8s_juju_conn
import K8sJujuConnector
101 from osm_common
.dbbase
import DbException
102 from osm_common
.fsbase
import FsException
104 from osm_lcm
.data_utils
.database
.database
import Database
105 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
107 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
108 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
110 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
111 from osm_lcm
.osm_config
import OsmConfigBuilder
112 from osm_lcm
.prometheus
import parse_job
114 from copy
import copy
, deepcopy
115 from time
import time
116 from uuid
import uuid4
118 from random
import randint
120 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
123 class NsLcm(LcmBase
):
# Class-level defaults for NS lifecycle operations.
# NOTE(review): the timeout values look like seconds (e.g. 2 * 3600) — TODO confirm against the code that consumes them.
124 timeout_vca_on_error
= (
126 ) # time allowed from a charm first reaching blocked/error status until it is marked as failed
127 timeout_ns_deploy
= 2 * 3600 # default global timeout for deploying a ns
128 timeout_ns_terminate
= 1800 # default global timeout for undeploying (terminating) a ns
129 timeout_ns_heal
= 1800 # default global timeout for healing a ns
130 timeout_charm_delete
= 10 * 60 # presumably the timeout for deleting a charm — confirm against usage
131 timeout_primitive
= 30 * 60 # timeout for primitive execution
132 timeout_ns_update
= 30 * 60 # timeout for ns update
133 timeout_progress_primitive
= (
135 ) # timeout for some progress in a primitive execution
136 timeout_migrate
= 1800 # default global timeout for migrating vnfs
137 timeout_operate
= 1800 # default global timeout for operate actions on vnfs
138 timeout_verticalscale
= 1800 # default global timeout for vertical scaling
# Sub-operation status sentinels (all negative); the code that interprets them is not visible in this section.
139 SUBOPERATION_STATUS_NOT_FOUND
= -1
140 SUBOPERATION_STATUS_NEW
= -2
141 SUBOPERATION_STATUS_SKIP
= -3
# presumably the label used for the VCA deployment task — confirm against usage
142 task_name_deploy_vca
= "Deploying VCA"
144 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
146 Init, Connect to database, filesystem storage, and messaging
147 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
150 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
152 self
.db
= Database().instance
.db
153 self
.fs
= Filesystem().instance
.fs
155 self
.lcm_tasks
= lcm_tasks
156 self
.timeout
= config
["timeout"]
157 self
.ro_config
= config
["ro_config"]
158 self
.ng_ro
= config
["ro_config"].get("ng")
159 self
.vca_config
= config
["VCA"].copy()
161 # create N2VC connector
162 self
.n2vc
= N2VCJujuConnector(
165 on_update_db
=self
._on
_update
_n
2vc
_db
,
170 self
.conn_helm_ee
= LCMHelmConn(
173 vca_config
=self
.vca_config
,
174 on_update_db
=self
._on
_update
_n
2vc
_db
,
177 self
.k8sclusterhelm2
= K8sHelmConnector(
178 kubectl_command
=self
.vca_config
.get("kubectlpath"),
179 helm_command
=self
.vca_config
.get("helmpath"),
186 self
.k8sclusterhelm3
= K8sHelm3Connector(
187 kubectl_command
=self
.vca_config
.get("kubectlpath"),
188 helm_command
=self
.vca_config
.get("helm3path"),
195 self
.k8sclusterjuju
= K8sJujuConnector(
196 kubectl_command
=self
.vca_config
.get("kubectlpath"),
197 juju_command
=self
.vca_config
.get("jujupath"),
200 on_update_db
=self
._on
_update
_k
8s
_db
,
205 self
.k8scluster_map
= {
206 "helm-chart": self
.k8sclusterhelm2
,
207 "helm-chart-v3": self
.k8sclusterhelm3
,
208 "chart": self
.k8sclusterhelm3
,
209 "juju-bundle": self
.k8sclusterjuju
,
210 "juju": self
.k8sclusterjuju
,
214 "lxc_proxy_charm": self
.n2vc
,
215 "native_charm": self
.n2vc
,
216 "k8s_proxy_charm": self
.n2vc
,
217 "helm": self
.conn_helm_ee
,
218 "helm-v3": self
.conn_helm_ee
,
222 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
224 self
.op_status_map
= {
225 "instantiation": self
.RO
.status
,
226 "termination": self
.RO
.status
,
227 "migrate": self
.RO
.status
,
228 "healing": self
.RO
.recreate_status
,
229 "verticalscale": self
.RO
.status
,
230 "start_stop_rebuild": self
.RO
.status
,
234 def increment_ip_mac(ip_mac
, vm_index
=1):
235 if not isinstance(ip_mac
, str):
238 # try with ipv4 look for last dot
239 i
= ip_mac
.rfind(".")
242 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
243 # try with ipv6 or mac look for last colon. Operate in hex
244 i
= ip_mac
.rfind(":")
247 # format in hex, len can be 2 for mac or 4 for ipv6
248 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
249 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
255 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
257 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
260 # TODO filter RO descriptor fields...
264 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
265 db_dict
["deploymentStatus"] = ro_descriptor
266 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
268 except Exception as e
:
270 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
273 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
275 # remove last dot from path (if exists)
276 if path
.endswith("."):
279 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
280 # .format(table, filter, path, updated_data))
283 nsr_id
= filter.get("_id")
285 # read ns record from database
286 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
287 current_ns_status
= nsr
.get("nsState")
289 # get vca status for NS
290 status_dict
= await self
.n2vc
.get_status(
291 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
296 db_dict
["vcaStatus"] = status_dict
298 # update configurationStatus for this VCA
300 vca_index
= int(path
[path
.rfind(".") + 1 :])
303 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
305 vca_status
= vca_list
[vca_index
].get("status")
307 configuration_status_list
= nsr
.get("configurationStatus")
308 config_status
= configuration_status_list
[vca_index
].get("status")
310 if config_status
== "BROKEN" and vca_status
!= "failed":
311 db_dict
["configurationStatus"][vca_index
] = "READY"
312 elif config_status
!= "BROKEN" and vca_status
== "failed":
313 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
314 except Exception as e
:
315 # not update configurationStatus
316 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
318 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
319 # if nsState = 'DEGRADED' check if all is OK
321 if current_ns_status
in ("READY", "DEGRADED"):
322 error_description
= ""
324 if status_dict
.get("machines"):
325 for machine_id
in status_dict
.get("machines"):
326 machine
= status_dict
.get("machines").get(machine_id
)
327 # check machine agent-status
328 if machine
.get("agent-status"):
329 s
= machine
.get("agent-status").get("status")
332 error_description
+= (
333 "machine {} agent-status={} ; ".format(
337 # check machine instance status
338 if machine
.get("instance-status"):
339 s
= machine
.get("instance-status").get("status")
342 error_description
+= (
343 "machine {} instance-status={} ; ".format(
348 if status_dict
.get("applications"):
349 for app_id
in status_dict
.get("applications"):
350 app
= status_dict
.get("applications").get(app_id
)
351 # check application status
352 if app
.get("status"):
353 s
= app
.get("status").get("status")
356 error_description
+= (
357 "application {} status={} ; ".format(app_id
, s
)
360 if error_description
:
361 db_dict
["errorDescription"] = error_description
362 if current_ns_status
== "READY" and is_degraded
:
363 db_dict
["nsState"] = "DEGRADED"
364 if current_ns_status
== "DEGRADED" and not is_degraded
:
365 db_dict
["nsState"] = "READY"
368 self
.update_db_2("nsrs", nsr_id
, db_dict
)
370 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
372 except Exception as e
:
373 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
375 async def _on_update_k8s_db(
376 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
379 Updating vca status in NSR record
380 :param cluster_uuid: UUID of a k8s cluster
381 :param kdu_instance: The unique name of the KDU instance
382 :param filter: To get nsr_id
383 :cluster_type: The cluster type (juju, k8s)
387 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
388 # .format(cluster_uuid, kdu_instance, filter))
390 nsr_id
= filter.get("_id")
392 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
393 cluster_uuid
=cluster_uuid
,
394 kdu_instance
=kdu_instance
,
396 complete_status
=True,
402 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
405 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
409 self
.update_db_2("nsrs", nsr_id
, db_dict
)
410 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
412 except Exception as e
:
413 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
416 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
419 undefined
=StrictUndefined
,
420 autoescape
=select_autoescape(default_for_string
=True, default
=True),
422 template
= env
.from_string(cloud_init_text
)
423 return template
.render(additional_params
or {})
424 except UndefinedError
as e
:
426 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
427 "file, must be provided in the instantiation parameters inside the "
428 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
430 except (TemplateError
, TemplateNotFound
) as e
:
432 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
437 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
438 cloud_init_content
= cloud_init_file
= None
440 if vdu
.get("cloud-init-file"):
441 base_folder
= vnfd
["_admin"]["storage"]
442 if base_folder
["pkg-dir"]:
443 cloud_init_file
= "{}/{}/cloud_init/{}".format(
444 base_folder
["folder"],
445 base_folder
["pkg-dir"],
446 vdu
["cloud-init-file"],
449 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
450 base_folder
["folder"],
451 vdu
["cloud-init-file"],
453 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
454 cloud_init_content
= ci_file
.read()
455 elif vdu
.get("cloud-init"):
456 cloud_init_content
= vdu
["cloud-init"]
458 return cloud_init_content
459 except FsException
as e
:
461 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
462 vnfd
["id"], vdu
["id"], cloud_init_file
, e
466 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
468 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
470 additional_params
= vdur
.get("additionalParams")
471 return parse_yaml_strings(additional_params
)
473 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
475 Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
476 :param vnfd: input vnfd
477 :param new_id: overrides vnf id if provided
478 :param additionalParams: Instantiation params for VNFs provided
479 :param nsrId: Id of the NSR
480 :return: copy of vnfd
482 vnfd_RO
= deepcopy(vnfd
)
483 # remove unused by RO configuration, monitoring, scaling and internal keys
484 vnfd_RO
.pop("_id", None)
485 vnfd_RO
.pop("_admin", None)
486 vnfd_RO
.pop("monitoring-param", None)
487 vnfd_RO
.pop("scaling-group-descriptor", None)
488 vnfd_RO
.pop("kdu", None)
489 vnfd_RO
.pop("k8s-cluster", None)
491 vnfd_RO
["id"] = new_id
493 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
494 for vdu
in get_iterable(vnfd_RO
, "vdu"):
495 vdu
.pop("cloud-init-file", None)
496 vdu
.pop("cloud-init", None)
500 def ip_profile_2_RO(ip_profile
):
501 RO_ip_profile
= deepcopy(ip_profile
)
502 if "dns-server" in RO_ip_profile
:
503 if isinstance(RO_ip_profile
["dns-server"], list):
504 RO_ip_profile
["dns-address"] = []
505 for ds
in RO_ip_profile
.pop("dns-server"):
506 RO_ip_profile
["dns-address"].append(ds
["address"])
508 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
509 if RO_ip_profile
.get("ip-version") == "ipv4":
510 RO_ip_profile
["ip-version"] = "IPv4"
511 if RO_ip_profile
.get("ip-version") == "ipv6":
512 RO_ip_profile
["ip-version"] = "IPv6"
513 if "dhcp-params" in RO_ip_profile
:
514 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
517 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
518 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
519 if db_vim
["_admin"]["operationalState"] != "ENABLED":
521 "VIM={} is not available. operationalState={}".format(
522 vim_account
, db_vim
["_admin"]["operationalState"]
525 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
528 def get_ro_wim_id_for_wim_account(self
, wim_account
):
529 if isinstance(wim_account
, str):
530 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
531 if db_wim
["_admin"]["operationalState"] != "ENABLED":
533 "WIM={} is not available. operationalState={}".format(
534 wim_account
, db_wim
["_admin"]["operationalState"]
537 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
542 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
544 db_vdu_push_list
= []
546 db_update
= {"_admin.modified": time()}
548 for vdu_id
, vdu_count
in vdu_create
.items():
552 for vdur
in reversed(db_vnfr
["vdur"])
553 if vdur
["vdu-id-ref"] == vdu_id
558 # Read the template saved in the db:
560 "No vdur in the database. Using the vdur-template to scale"
562 vdur_template
= db_vnfr
.get("vdur-template")
563 if not vdur_template
:
565 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
569 vdur
= vdur_template
[0]
570 # Delete a template from the database after using it
573 {"_id": db_vnfr
["_id"]},
575 pull
={"vdur-template": {"_id": vdur
["_id"]}},
577 for count
in range(vdu_count
):
578 vdur_copy
= deepcopy(vdur
)
579 vdur_copy
["status"] = "BUILD"
580 vdur_copy
["status-detailed"] = None
581 vdur_copy
["ip-address"] = None
582 vdur_copy
["_id"] = str(uuid4())
583 vdur_copy
["count-index"] += count
+ 1
584 vdur_copy
["id"] = "{}-{}".format(
585 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
587 vdur_copy
.pop("vim_info", None)
588 for iface
in vdur_copy
["interfaces"]:
589 if iface
.get("fixed-ip"):
590 iface
["ip-address"] = self
.increment_ip_mac(
591 iface
["ip-address"], count
+ 1
594 iface
.pop("ip-address", None)
595 if iface
.get("fixed-mac"):
596 iface
["mac-address"] = self
.increment_ip_mac(
597 iface
["mac-address"], count
+ 1
600 iface
.pop("mac-address", None)
604 ) # only first vdu can be managment of vnf
605 db_vdu_push_list
.append(vdur_copy
)
606 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
608 if len(db_vnfr
["vdur"]) == 1:
609 # The scale will move to 0 instances
611 "Scaling to 0 !, creating the template with the last vdur"
613 template_vdur
= [db_vnfr
["vdur"][0]]
614 for vdu_id
, vdu_count
in vdu_delete
.items():
616 indexes_to_delete
= [
618 for iv
in enumerate(db_vnfr
["vdur"])
619 if iv
[1]["vdu-id-ref"] == vdu_id
623 "vdur.{}.status".format(i
): "DELETING"
624 for i
in indexes_to_delete
[-vdu_count
:]
628 # it must be deleted one by one because common.db does not allow otherwise
631 for v
in reversed(db_vnfr
["vdur"])
632 if v
["vdu-id-ref"] == vdu_id
634 for vdu
in vdus_to_delete
[:vdu_count
]:
637 {"_id": db_vnfr
["_id"]},
639 pull
={"vdur": {"_id": vdu
["_id"]}},
643 db_push
["vdur"] = db_vdu_push_list
645 db_push
["vdur-template"] = template_vdur
648 db_vnfr
["vdur-template"] = template_vdur
649 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
650 # modify passed dictionary db_vnfr
651 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
652 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
654 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
656 Updates database nsr with the RO info for the created vld
657 :param ns_update_nsr: dictionary to be filled with the updated info
658 :param db_nsr: content of db_nsr. This is also modified
659 :param nsr_desc_RO: nsr descriptor from RO
660 :return: Nothing, LcmException is raised on errors
663 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
664 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
665 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
667 vld
["vim-id"] = net_RO
.get("vim_net_id")
668 vld
["name"] = net_RO
.get("vim_name")
669 vld
["status"] = net_RO
.get("status")
670 vld
["status-detailed"] = net_RO
.get("error_msg")
671 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
675 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
678 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
680 for db_vnfr
in db_vnfrs
.values():
681 vnfr_update
= {"status": "ERROR"}
682 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
683 if "status" not in vdur
:
684 vdur
["status"] = "ERROR"
685 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
687 vdur
["status-detailed"] = str(error_text
)
689 "vdur.{}.status-detailed".format(vdu_index
)
691 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
692 except DbException
as e
:
693 self
.logger
.error("Cannot update vnf. {}".format(e
))
695 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
697 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
698 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
699 :param nsr_desc_RO: nsr descriptor from RO
700 :return: Nothing, LcmException is raised on errors
702 for vnf_index
, db_vnfr
in db_vnfrs
.items():
703 for vnf_RO
in nsr_desc_RO
["vnfs"]:
704 if vnf_RO
["member_vnf_index"] != vnf_index
:
707 if vnf_RO
.get("ip_address"):
708 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
711 elif not db_vnfr
.get("ip-address"):
712 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
713 raise LcmExceptionNoMgmtIP(
714 "ns member_vnf_index '{}' has no IP address".format(
719 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
720 vdur_RO_count_index
= 0
721 if vdur
.get("pdu-type"):
723 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
724 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
726 if vdur
["count-index"] != vdur_RO_count_index
:
727 vdur_RO_count_index
+= 1
729 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
730 if vdur_RO
.get("ip_address"):
731 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
733 vdur
["ip-address"] = None
734 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
735 vdur
["name"] = vdur_RO
.get("vim_name")
736 vdur
["status"] = vdur_RO
.get("status")
737 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
738 for ifacer
in get_iterable(vdur
, "interfaces"):
739 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
740 if ifacer
["name"] == interface_RO
.get("internal_name"):
741 ifacer
["ip-address"] = interface_RO
.get(
744 ifacer
["mac-address"] = interface_RO
.get(
750 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
751 "from VIM info".format(
752 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
755 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
759 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
761 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
765 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
766 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
767 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
769 vld
["vim-id"] = net_RO
.get("vim_net_id")
770 vld
["name"] = net_RO
.get("vim_name")
771 vld
["status"] = net_RO
.get("status")
772 vld
["status-detailed"] = net_RO
.get("error_msg")
773 vnfr_update
["vld.{}".format(vld_index
)] = vld
777 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
782 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
787 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
792 def _get_ns_config_info(self
, nsr_id
):
794 Generates a mapping between vnf,vdu elements and the N2VC id
795 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
796 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
797 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
798 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
800 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
801 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
803 ns_config_info
= {"osm-config-mapping": mapping
}
804 for vca
in vca_deployed_list
:
805 if not vca
["member-vnf-index"]:
807 if not vca
["vdu_id"]:
808 mapping
[vca
["member-vnf-index"]] = vca
["application"]
812 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
814 ] = vca
["application"]
815 return ns_config_info
817 async def _instantiate_ng_ro(
834 def get_vim_account(vim_account_id
):
836 if vim_account_id
in db_vims
:
837 return db_vims
[vim_account_id
]
838 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
839 db_vims
[vim_account_id
] = db_vim
842 # modify target_vld info with instantiation parameters
843 def parse_vld_instantiation_params(
844 target_vim
, target_vld
, vld_params
, target_sdn
846 if vld_params
.get("ip-profile"):
847 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
850 if vld_params
.get("provider-network"):
851 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
854 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
855 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
858 if vld_params
.get("wimAccountId"):
859 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
860 target_vld
["vim_info"][target_wim
] = {}
861 for param
in ("vim-network-name", "vim-network-id"):
862 if vld_params
.get(param
):
863 if isinstance(vld_params
[param
], dict):
864 for vim
, vim_net
in vld_params
[param
].items():
865 other_target_vim
= "vim:" + vim
867 target_vld
["vim_info"],
868 (other_target_vim
, param
.replace("-", "_")),
871 else: # isinstance str
872 target_vld
["vim_info"][target_vim
][
873 param
.replace("-", "_")
874 ] = vld_params
[param
]
875 if vld_params
.get("common_id"):
876 target_vld
["common_id"] = vld_params
.get("common_id")
878 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
879 def update_ns_vld_target(target
, ns_params
):
880 for vnf_params
in ns_params
.get("vnf", ()):
881 if vnf_params
.get("vimAccountId"):
885 for vnfr
in db_vnfrs
.values()
886 if vnf_params
["member-vnf-index"]
887 == vnfr
["member-vnf-index-ref"]
891 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
892 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
893 target_vld
= find_in_list(
894 get_iterable(vdur
, "interfaces"),
895 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
898 vld_params
= find_in_list(
899 get_iterable(ns_params
, "vld"),
900 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
904 if vnf_params
.get("vimAccountId") not in a_vld
.get(
907 target_vim_network_list
= [
908 v
for _
, v
in a_vld
.get("vim_info").items()
910 target_vim_network_name
= next(
912 item
.get("vim_network_name", "")
913 for item
in target_vim_network_list
918 target
["ns"]["vld"][a_index
].get("vim_info").update(
920 "vim:{}".format(vnf_params
["vimAccountId"]): {
921 "vim_network_name": target_vim_network_name
,
927 for param
in ("vim-network-name", "vim-network-id"):
928 if vld_params
.get(param
) and isinstance(
929 vld_params
[param
], dict
931 for vim
, vim_net
in vld_params
[
934 other_target_vim
= "vim:" + vim
936 target
["ns"]["vld"][a_index
].get(
941 param
.replace("-", "_"),
946 nslcmop_id
= db_nslcmop
["_id"]
948 "name": db_nsr
["name"],
951 "image": deepcopy(db_nsr
["image"]),
952 "flavor": deepcopy(db_nsr
["flavor"]),
953 "action_id": nslcmop_id
,
954 "cloud_init_content": {},
956 for image
in target
["image"]:
957 image
["vim_info"] = {}
958 for flavor
in target
["flavor"]:
959 flavor
["vim_info"] = {}
960 if db_nsr
.get("affinity-or-anti-affinity-group"):
961 target
["affinity-or-anti-affinity-group"] = deepcopy(
962 db_nsr
["affinity-or-anti-affinity-group"]
964 for affinity_or_anti_affinity_group
in target
[
965 "affinity-or-anti-affinity-group"
967 affinity_or_anti_affinity_group
["vim_info"] = {}
969 if db_nslcmop
.get("lcmOperationType") != "instantiate":
970 # get parameters of instantiation:
971 db_nslcmop_instantiate
= self
.db
.get_list(
974 "nsInstanceId": db_nslcmop
["nsInstanceId"],
975 "lcmOperationType": "instantiate",
978 ns_params
= db_nslcmop_instantiate
.get("operationParams")
980 ns_params
= db_nslcmop
.get("operationParams")
981 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
982 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
985 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
986 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
990 "mgmt-network": vld
.get("mgmt-network", False),
991 "type": vld
.get("type"),
994 "vim_network_name": vld
.get("vim-network-name"),
995 "vim_account_id": ns_params
["vimAccountId"],
999 # check if this network needs SDN assist
1000 if vld
.get("pci-interfaces"):
1001 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1002 sdnc_id
= db_vim
["config"].get("sdn-controller")
1004 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1005 target_sdn
= "sdn:{}".format(sdnc_id
)
1006 target_vld
["vim_info"][target_sdn
] = {
1008 "target_vim": target_vim
,
1010 "type": vld
.get("type"),
1013 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1014 for nsd_vnf_profile
in nsd_vnf_profiles
:
1015 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1016 if cp
["virtual-link-profile-id"] == vld
["id"]:
1018 "member_vnf:{}.{}".format(
1019 cp
["constituent-cpd-id"][0][
1020 "constituent-base-element-id"
1022 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1024 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1026 # check at nsd descriptor, if there is an ip-profile
1028 nsd_vlp
= find_in_list(
1029 get_virtual_link_profiles(nsd
),
1030 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1035 and nsd_vlp
.get("virtual-link-protocol-data")
1036 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1038 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1041 ip_profile_dest_data
= {}
1042 if "ip-version" in ip_profile_source_data
:
1043 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1046 if "cidr" in ip_profile_source_data
:
1047 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1050 if "gateway-ip" in ip_profile_source_data
:
1051 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1054 if "dhcp-enabled" in ip_profile_source_data
:
1055 ip_profile_dest_data
["dhcp-params"] = {
1056 "enabled": ip_profile_source_data
["dhcp-enabled"]
1058 vld_params
["ip-profile"] = ip_profile_dest_data
1060 # update vld_params with instantiation params
1061 vld_instantiation_params
= find_in_list(
1062 get_iterable(ns_params
, "vld"),
1063 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1065 if vld_instantiation_params
:
1066 vld_params
.update(vld_instantiation_params
)
1067 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1068 target
["ns"]["vld"].append(target_vld
)
1069 # Update the target ns_vld if the vnf vim_account is overridden by instantiation params
1070 update_ns_vld_target(target
, ns_params
)
1072 for vnfr
in db_vnfrs
.values():
1073 vnfd
= find_in_list(
1074 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1076 vnf_params
= find_in_list(
1077 get_iterable(ns_params
, "vnf"),
1078 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1080 target_vnf
= deepcopy(vnfr
)
1081 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1082 for vld
in target_vnf
.get("vld", ()):
1083 # check if connected to a ns.vld, to fill target
1084 vnf_cp
= find_in_list(
1085 vnfd
.get("int-virtual-link-desc", ()),
1086 lambda cpd
: cpd
.get("id") == vld
["id"],
1089 ns_cp
= "member_vnf:{}.{}".format(
1090 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1092 if cp2target
.get(ns_cp
):
1093 vld
["target"] = cp2target
[ns_cp
]
1096 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1098 # check if this network needs SDN assist
1100 if vld
.get("pci-interfaces"):
1101 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1102 sdnc_id
= db_vim
["config"].get("sdn-controller")
1104 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1105 target_sdn
= "sdn:{}".format(sdnc_id
)
1106 vld
["vim_info"][target_sdn
] = {
1108 "target_vim": target_vim
,
1110 "type": vld
.get("type"),
1113 # check at vnfd descriptor, if there is an ip-profile
1115 vnfd_vlp
= find_in_list(
1116 get_virtual_link_profiles(vnfd
),
1117 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1121 and vnfd_vlp
.get("virtual-link-protocol-data")
1122 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1124 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1127 ip_profile_dest_data
= {}
1128 if "ip-version" in ip_profile_source_data
:
1129 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1132 if "cidr" in ip_profile_source_data
:
1133 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1136 if "gateway-ip" in ip_profile_source_data
:
1137 ip_profile_dest_data
[
1139 ] = ip_profile_source_data
["gateway-ip"]
1140 if "dhcp-enabled" in ip_profile_source_data
:
1141 ip_profile_dest_data
["dhcp-params"] = {
1142 "enabled": ip_profile_source_data
["dhcp-enabled"]
1145 vld_params
["ip-profile"] = ip_profile_dest_data
1146 # update vld_params with instantiation params
1148 vld_instantiation_params
= find_in_list(
1149 get_iterable(vnf_params
, "internal-vld"),
1150 lambda i_vld
: i_vld
["name"] == vld
["id"],
1152 if vld_instantiation_params
:
1153 vld_params
.update(vld_instantiation_params
)
1154 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1157 for vdur
in target_vnf
.get("vdur", ()):
1158 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1159 continue # This vdu must not be created
1160 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1162 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1165 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1166 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1169 and vdu_configuration
.get("config-access")
1170 and vdu_configuration
.get("config-access").get("ssh-access")
1172 vdur
["ssh-keys"] = ssh_keys_all
1173 vdur
["ssh-access-required"] = vdu_configuration
[
1175 ]["ssh-access"]["required"]
1178 and vnf_configuration
.get("config-access")
1179 and vnf_configuration
.get("config-access").get("ssh-access")
1180 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1182 vdur
["ssh-keys"] = ssh_keys_all
1183 vdur
["ssh-access-required"] = vnf_configuration
[
1185 ]["ssh-access"]["required"]
1186 elif ssh_keys_instantiation
and find_in_list(
1187 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1189 vdur
["ssh-keys"] = ssh_keys_instantiation
1191 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1193 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1195 if vdud
.get("cloud-init-file"):
1196 vdur
["cloud-init"] = "{}:file:{}".format(
1197 vnfd
["_id"], vdud
.get("cloud-init-file")
1199 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1200 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1201 base_folder
= vnfd
["_admin"]["storage"]
1202 if base_folder
["pkg-dir"]:
1203 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1204 base_folder
["folder"],
1205 base_folder
["pkg-dir"],
1206 vdud
.get("cloud-init-file"),
1209 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1210 base_folder
["folder"],
1211 vdud
.get("cloud-init-file"),
1213 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1214 target
["cloud_init_content"][
1217 elif vdud
.get("cloud-init"):
1218 vdur
["cloud-init"] = "{}:vdu:{}".format(
1219 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1221 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1222 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1225 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1226 deploy_params_vdu
= self
._format
_additional
_params
(
1227 vdur
.get("additionalParams") or {}
1229 deploy_params_vdu
["OSM"] = get_osm_params(
1230 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1232 vdur
["additionalParams"] = deploy_params_vdu
1235 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1236 if target_vim
not in ns_flavor
["vim_info"]:
1237 ns_flavor
["vim_info"][target_vim
] = {}
1240 # in case alternative images are provided we must check if they should be applied
1241 # for the vim_type, modify the vim_type taking into account
1242 ns_image_id
= int(vdur
["ns-image-id"])
1243 if vdur
.get("alt-image-ids"):
1244 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1245 vim_type
= db_vim
["vim_type"]
1246 for alt_image_id
in vdur
.get("alt-image-ids"):
1247 ns_alt_image
= target
["image"][int(alt_image_id
)]
1248 if vim_type
== ns_alt_image
.get("vim-type"):
1249 # must use alternative image
1251 "use alternative image id: {}".format(alt_image_id
)
1253 ns_image_id
= alt_image_id
1254 vdur
["ns-image-id"] = ns_image_id
1256 ns_image
= target
["image"][int(ns_image_id
)]
1257 if target_vim
not in ns_image
["vim_info"]:
1258 ns_image
["vim_info"][target_vim
] = {}
1261 if vdur
.get("affinity-or-anti-affinity-group-id"):
1262 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1263 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1264 if target_vim
not in ns_ags
["vim_info"]:
1265 ns_ags
["vim_info"][target_vim
] = {}
1267 vdur
["vim_info"] = {target_vim
: {}}
1268 # instantiation parameters
1270 vdu_instantiation_params
= find_in_list(
1271 get_iterable(vnf_params
, "vdu"),
1272 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1274 if vdu_instantiation_params
:
1275 # Parse the vdu_volumes from the instantiation params
1276 vdu_volumes
= get_volumes_from_instantiation_params(
1277 vdu_instantiation_params
, vdud
1279 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1280 vdur_list
.append(vdur
)
1281 target_vnf
["vdur"] = vdur_list
1282 target
["vnf"].append(target_vnf
)
1284 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1285 desc
= await self
.RO
.deploy(nsr_id
, target
)
1286 self
.logger
.debug("RO return > {}".format(desc
))
1287 action_id
= desc
["action_id"]
1288 await self
._wait
_ng
_ro
(
1295 operation
="instantiation",
1300 "_admin.deployed.RO.operational-status": "running",
1301 "detailed-status": " ".join(stage
),
1303 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1304 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1305 self
._write
_op
_status
(nslcmop_id
, stage
)
1307 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1311 async def _wait_ng_ro(
1321 detailed_status_old
= None
1323 start_time
= start_time
or time()
1324 while time() <= start_time
+ timeout
:
1325 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1326 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1327 if desc_status
["status"] == "FAILED":
1328 raise NgRoException(desc_status
["details"])
1329 elif desc_status
["status"] == "BUILD":
1331 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1332 elif desc_status
["status"] == "DONE":
1334 stage
[2] = "Deployed at VIM"
1337 assert False, "ROclient.check_ns_status returns unknown {}".format(
1338 desc_status
["status"]
1340 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1341 detailed_status_old
= stage
[2]
1342 db_nsr_update
["detailed-status"] = " ".join(stage
)
1343 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1344 self
._write
_op
_status
(nslcmop_id
, stage
)
1345 await asyncio
.sleep(15, loop
=self
.loop
)
1346 else: # timeout_ns_deploy
1347 raise NgRoException("Timeout waiting ns to deploy")
1349 async def _terminate_ng_ro(
1350 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1355 start_deploy
= time()
1362 "action_id": nslcmop_id
,
1364 desc
= await self
.RO
.deploy(nsr_id
, target
)
1365 action_id
= desc
["action_id"]
1366 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1367 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1370 + "ns terminate action at RO. action_id={}".format(action_id
)
1374 delete_timeout
= 20 * 60 # 20 minutes
1375 await self
._wait
_ng
_ro
(
1382 operation
="termination",
1385 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1386 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1388 await self
.RO
.delete(nsr_id
)
1389 except Exception as e
:
1390 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1391 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1392 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1393 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1395 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1397 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1398 failed_detail
.append("delete conflict: {}".format(e
))
1401 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1404 failed_detail
.append("delete error: {}".format(e
))
1407 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1411 stage
[2] = "Error deleting from VIM"
1413 stage
[2] = "Deleted from VIM"
1414 db_nsr_update
["detailed-status"] = " ".join(stage
)
1415 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1416 self
._write
_op
_status
(nslcmop_id
, stage
)
1419 raise LcmException("; ".join(failed_detail
))
1422 async def instantiate_RO(
1436 :param logging_text: preffix text to use at logging
1437 :param nsr_id: nsr identity
1438 :param nsd: database content of ns descriptor
1439 :param db_nsr: database content of ns record
1440 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1442 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1443 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1444 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1445 :return: None or exception
1448 start_deploy
= time()
1449 ns_params
= db_nslcmop
.get("operationParams")
1450 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1451 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1453 timeout_ns_deploy
= self
.timeout
.get(
1454 "ns_deploy", self
.timeout_ns_deploy
1457 # Check for and optionally request placement optimization. Database will be updated if placement activated
1458 stage
[2] = "Waiting for Placement."
1459 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1460 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1461 for vnfr
in db_vnfrs
.values():
1462 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1465 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1467 return await self
._instantiate
_ng
_ro
(
1480 except Exception as e
:
1481 stage
[2] = "ERROR deploying at VIM"
1482 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1484 "Error deploying at VIM {}".format(e
),
1485 exc_info
=not isinstance(
1488 ROclient
.ROClientException
,
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address
    :param logging_text: prefix use for logging
    :param nsr_id: ns record identity (not used by the polling itself)
    :param vnfr_id: identity of the vnfr that is read from database at every try
    :param kdu_name: value of 'kdu-name' that selects the entry inside vnfr 'kdur'
    :return: IP address, K8s services
    Raises LcmException if the kdur is not found, it has a status different from
    READY/ENABLED, or it gets no status after 360 tries (10 seconds apart)
    """

    # self.logger.debug(logging_text + "Starting wait_kdu_up")
    nb_tries = 0

    while nb_tries < 360:
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next(
            (
                x
                for x in get_iterable(db_vnfr, "kdur")
                if x.get("kdu-name") == kdu_name
            ),
            None,
        )
        if not kdur:
            raise LcmException(
                "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
            )
        if kdur.get("status"):
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address"), kdur.get("services")
            else:
                raise LcmException(
                    "target KDU={} is in error state".format(kdu_name)
                )

        # no status yet: try again later. 'loop' argument is not passed because it was
        # removed from asyncio.sleep in python 3.10
        await asyncio.sleep(10)
        nb_tries += 1
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1536 async def wait_vm_up_insert_key_ro(
1537 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1540 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1541 :param logging_text: prefix use for logging
1546 :param pub_key: public ssh key to inject, None to skip
1547 :param user: user to apply the public ssh key
1551 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1555 target_vdu_id
= None
1561 if ro_retries
>= 360: # 1 hour
1563 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1566 await asyncio
.sleep(10, loop
=self
.loop
)
1569 if not target_vdu_id
:
1570 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1572 if not vdu_id
: # for the VNF case
1573 if db_vnfr
.get("status") == "ERROR":
1575 "Cannot inject ssh-key because target VNF is in error state"
1577 ip_address
= db_vnfr
.get("ip-address")
1583 for x
in get_iterable(db_vnfr
, "vdur")
1584 if x
.get("ip-address") == ip_address
1592 for x
in get_iterable(db_vnfr
, "vdur")
1593 if x
.get("vdu-id-ref") == vdu_id
1594 and x
.get("count-index") == vdu_index
1600 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1601 ): # If only one, this should be the target vdu
1602 vdur
= db_vnfr
["vdur"][0]
1605 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1606 vnfr_id
, vdu_id
, vdu_index
1609 # New generation RO stores information at "vim_info"
1612 if vdur
.get("vim_info"):
1614 t
for t
in vdur
["vim_info"]
1615 ) # there should be only one key
1616 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1618 vdur
.get("pdu-type")
1619 or vdur
.get("status") == "ACTIVE"
1620 or ng_ro_status
== "ACTIVE"
1622 ip_address
= vdur
.get("ip-address")
1625 target_vdu_id
= vdur
["vdu-id-ref"]
1626 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1628 "Cannot inject ssh-key because target VM is in error state"
1631 if not target_vdu_id
:
1634 # inject public key into machine
1635 if pub_key
and user
:
1636 self
.logger
.debug(logging_text
+ "Inserting RO key")
1637 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1638 if vdur
.get("pdu-type"):
1639 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1642 ro_vm_id
= "{}-{}".format(
1643 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1644 ) # TODO add vdu_index
1648 "action": "inject_ssh_key",
1652 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1654 desc
= await self
.RO
.deploy(nsr_id
, target
)
1655 action_id
= desc
["action_id"]
1656 await self
._wait
_ng
_ro
(
1657 nsr_id
, action_id
, timeout
=600, operation
="instantiation"
1661 # wait until NS is deployed at RO
1663 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1664 ro_nsr_id
= deep_get(
1665 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1669 result_dict
= await self
.RO
.create_action(
1671 item_id_name
=ro_nsr_id
,
1673 "add_public_key": pub_key
,
1678 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1679 if not result_dict
or not isinstance(result_dict
, dict):
1681 "Unknown response from RO when injecting key"
1683 for result
in result_dict
.values():
1684 if result
.get("vim_result") == 200:
1687 raise ROclient
.ROClientException(
1688 "error injecting key: {}".format(
1689 result
.get("description")
1693 except NgRoException
as e
:
1695 "Reaching max tries injecting key. Error: {}".format(e
)
1697 except ROclient
.ROClientException
as e
:
1701 + "error injecting key: {}. Retrying until {} seconds".format(
1708 "Reaching max tries injecting key. Error: {}".format(e
)
1715 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1717 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1719 my_vca
= vca_deployed_list
[vca_index
]
1720 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1721 # vdu or kdu: no dependencies
1725 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1726 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1727 configuration_status_list
= db_nsr
["configurationStatus"]
1728 for index
, vca_deployed
in enumerate(configuration_status_list
):
1729 if index
== vca_index
:
1732 if not my_vca
.get("member-vnf-index") or (
1733 vca_deployed
.get("member-vnf-index")
1734 == my_vca
.get("member-vnf-index")
1736 internal_status
= configuration_status_list
[index
].get("status")
1737 if internal_status
== "READY":
1739 elif internal_status
== "BROKEN":
1741 "Configuration aborted because dependent charm/s has failed"
1746 # no dependencies, return
1748 await asyncio
.sleep(10)
1751 raise LcmException("Configuration aborted because dependent charm/s timeout")
def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
    """
    Get the VCA identity that applies to a vnf or to a ns
    :param db_vnfr: database content of the vnf record. If not empty, its 'vca-id' is returned
    :param db_nsr: database content of the ns record, only used if db_vnfr is empty. The 'vca' of
        the vim account indicated at instantiate_params.vimAccountId is returned
    :return: the VCA identity, or None if both parameters are empty
    """
    if db_vnfr:
        return deep_get(db_vnfr, ("vca-id",))
    if not db_nsr:
        return None
    vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    return vim_account.get("vca")
1762 async def instantiate_N2VC(
1779 ee_config_descriptor
,
1781 nsr_id
= db_nsr
["_id"]
1782 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1783 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1784 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1785 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1787 "collection": "nsrs",
1788 "filter": {"_id": nsr_id
},
1789 "path": db_update_entry
,
1795 element_under_configuration
= nsr_id
1799 vnfr_id
= db_vnfr
["_id"]
1800 osm_config
["osm"]["vnf_id"] = vnfr_id
1802 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1804 if vca_type
== "native_charm":
1807 index_number
= vdu_index
or 0
1810 element_type
= "VNF"
1811 element_under_configuration
= vnfr_id
1812 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1814 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1815 element_type
= "VDU"
1816 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1817 osm_config
["osm"]["vdu_id"] = vdu_id
1819 namespace
+= ".{}".format(kdu_name
)
1820 element_type
= "KDU"
1821 element_under_configuration
= kdu_name
1822 osm_config
["osm"]["kdu_name"] = kdu_name
1825 if base_folder
["pkg-dir"]:
1826 artifact_path
= "{}/{}/{}/{}".format(
1827 base_folder
["folder"],
1828 base_folder
["pkg-dir"],
1831 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1836 artifact_path
= "{}/Scripts/{}/{}/".format(
1837 base_folder
["folder"],
1840 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1845 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1847 # get initial_config_primitive_list that applies to this element
1848 initial_config_primitive_list
= config_descriptor
.get(
1849 "initial-config-primitive"
1853 "Initial config primitive list > {}".format(
1854 initial_config_primitive_list
1858 # add config if not present for NS charm
1859 ee_descriptor_id
= ee_config_descriptor
.get("id")
1860 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1861 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1862 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1866 "Initial config primitive list #2 > {}".format(
1867 initial_config_primitive_list
1870 # n2vc_redesign STEP 3.1
1871 # find old ee_id if exists
1872 ee_id
= vca_deployed
.get("ee_id")
1874 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1875 # create or register execution environment in VCA
1876 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1878 self
._write
_configuration
_status
(
1880 vca_index
=vca_index
,
1882 element_under_configuration
=element_under_configuration
,
1883 element_type
=element_type
,
1886 step
= "create execution environment"
1887 self
.logger
.debug(logging_text
+ step
)
1891 if vca_type
== "k8s_proxy_charm":
1892 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1893 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1894 namespace
=namespace
,
1895 artifact_path
=artifact_path
,
1899 elif vca_type
== "helm" or vca_type
== "helm-v3":
1900 ee_id
, credentials
= await self
.vca_map
[
1902 ].create_execution_environment(
1903 namespace
=namespace
,
1907 artifact_path
=artifact_path
,
1908 chart_model
=vca_name
,
1912 ee_id
, credentials
= await self
.vca_map
[
1914 ].create_execution_environment(
1915 namespace
=namespace
,
1921 elif vca_type
== "native_charm":
1922 step
= "Waiting to VM being up and getting IP address"
1923 self
.logger
.debug(logging_text
+ step
)
1924 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1933 credentials
= {"hostname": rw_mgmt_ip
}
1935 username
= deep_get(
1936 config_descriptor
, ("config-access", "ssh-access", "default-user")
1938 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1939 # merged. Meanwhile let's get username from initial-config-primitive
1940 if not username
and initial_config_primitive_list
:
1941 for config_primitive
in initial_config_primitive_list
:
1942 for param
in config_primitive
.get("parameter", ()):
1943 if param
["name"] == "ssh-username":
1944 username
= param
["value"]
1948 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1949 "'config-access.ssh-access.default-user'"
1951 credentials
["username"] = username
1952 # n2vc_redesign STEP 3.2
1954 self
._write
_configuration
_status
(
1956 vca_index
=vca_index
,
1957 status
="REGISTERING",
1958 element_under_configuration
=element_under_configuration
,
1959 element_type
=element_type
,
1962 step
= "register execution environment {}".format(credentials
)
1963 self
.logger
.debug(logging_text
+ step
)
1964 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1965 credentials
=credentials
,
1966 namespace
=namespace
,
1971 # for compatibility with MON/POL modules, the need model and application name at database
1972 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1973 ee_id_parts
= ee_id
.split(".")
1974 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1975 if len(ee_id_parts
) >= 2:
1976 model_name
= ee_id_parts
[0]
1977 application_name
= ee_id_parts
[1]
1978 db_nsr_update
[db_update_entry
+ "model"] = model_name
1979 db_nsr_update
[db_update_entry
+ "application"] = application_name
1981 # n2vc_redesign STEP 3.3
1982 step
= "Install configuration Software"
1984 self
._write
_configuration
_status
(
1986 vca_index
=vca_index
,
1987 status
="INSTALLING SW",
1988 element_under_configuration
=element_under_configuration
,
1989 element_type
=element_type
,
1990 other_update
=db_nsr_update
,
1993 # TODO check if already done
1994 self
.logger
.debug(logging_text
+ step
)
1996 if vca_type
== "native_charm":
1997 config_primitive
= next(
1998 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
2001 if config_primitive
:
2002 config
= self
._map
_primitive
_params
(
2003 config_primitive
, {}, deploy_params
2006 if vca_type
== "lxc_proxy_charm":
2007 if element_type
== "NS":
2008 num_units
= db_nsr
.get("config-units") or 1
2009 elif element_type
== "VNF":
2010 num_units
= db_vnfr
.get("config-units") or 1
2011 elif element_type
== "VDU":
2012 for v
in db_vnfr
["vdur"]:
2013 if vdu_id
== v
["vdu-id-ref"]:
2014 num_units
= v
.get("config-units") or 1
2016 if vca_type
!= "k8s_proxy_charm":
2017 await self
.vca_map
[vca_type
].install_configuration_sw(
2019 artifact_path
=artifact_path
,
2022 num_units
=num_units
,
2027 # write in db flag of configuration_sw already installed
2029 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
2032 # add relations for this VCA (wait for other peers related with this VCA)
2033 await self
._add
_vca
_relations
(
2034 logging_text
=logging_text
,
2037 vca_index
=vca_index
,
2040 # if SSH access is required, then get execution environment SSH public
2041 # if native charm we have waited already to VM be UP
2042 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
2045 # self.logger.debug("get ssh key block")
2047 config_descriptor
, ("config-access", "ssh-access", "required")
2049 # self.logger.debug("ssh key needed")
2050 # Needed to inject a ssh key
2053 ("config-access", "ssh-access", "default-user"),
2055 step
= "Install configuration Software, getting public ssh key"
2056 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
2057 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
2060 step
= "Insert public key into VM user={} ssh_key={}".format(
2064 # self.logger.debug("no need to get ssh key")
2065 step
= "Waiting to VM being up and getting IP address"
2066 self
.logger
.debug(logging_text
+ step
)
2068 # default rw_mgmt_ip to None, avoiding the non definition of the variable
2071 # n2vc_redesign STEP 5.1
2072 # wait for RO (ip-address) Insert pub_key into VM
2075 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
2076 logging_text
, nsr_id
, vnfr_id
, kdu_name
2078 vnfd
= self
.db
.get_one(
2080 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2082 kdu
= get_kdu(vnfd
, kdu_name
)
2084 service
["name"] for service
in get_kdu_services(kdu
)
2086 exposed_services
= []
2087 for service
in services
:
2088 if any(s
in service
["name"] for s
in kdu_services
):
2089 exposed_services
.append(service
)
2090 await self
.vca_map
[vca_type
].exec_primitive(
2092 primitive_name
="config",
2094 "osm-config": json
.dumps(
2096 k8s
={"services": exposed_services
}
2103 # This verification is needed in order to avoid trying to add a public key
2104 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2105 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2106 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2108 elif db_vnfr
.get("vdur"):
2109 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2119 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2121 # store rw_mgmt_ip in deploy params for later replacement
2122 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2124 # n2vc_redesign STEP 6 Execute initial config primitive
2125 step
= "execute initial config primitive"
2127 # wait for dependent primitives execution (NS -> VNF -> VDU)
2128 if initial_config_primitive_list
:
2129 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2131 # stage, in function of element type: vdu, kdu, vnf or ns
2132 my_vca
= vca_deployed_list
[vca_index
]
2133 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2135 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2136 elif my_vca
.get("member-vnf-index"):
2138 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2141 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2143 self
._write
_configuration
_status
(
2144 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2147 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2149 check_if_terminated_needed
= True
2150 for initial_config_primitive
in initial_config_primitive_list
:
2151 # adding information on the vca_deployed if it is a NS execution environment
2152 if not vca_deployed
["member-vnf-index"]:
2153 deploy_params
["ns_config_info"] = json
.dumps(
2154 self
._get
_ns
_config
_info
(nsr_id
)
2156 # TODO check if already done
2157 primitive_params_
= self
._map
_primitive
_params
(
2158 initial_config_primitive
, {}, deploy_params
2161 step
= "execute primitive '{}' params '{}'".format(
2162 initial_config_primitive
["name"], primitive_params_
2164 self
.logger
.debug(logging_text
+ step
)
2165 await self
.vca_map
[vca_type
].exec_primitive(
2167 primitive_name
=initial_config_primitive
["name"],
2168 params_dict
=primitive_params_
,
2173 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2174 if check_if_terminated_needed
:
2175 if config_descriptor
.get("terminate-config-primitive"):
2177 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2179 check_if_terminated_needed
= False
2181 # TODO register in database that primitive is done
2183 # STEP 7 Configure metrics
2184 if vca_type
== "helm" or vca_type
== "helm-v3":
2185 # TODO: review for those cases where the helm chart is a reference and
2186 # is not part of the NF package
2187 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2189 artifact_path
=artifact_path
,
2190 ee_config_descriptor
=ee_config_descriptor
,
2193 target_ip
=rw_mgmt_ip
,
2199 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2202 for job
in prometheus_jobs
:
2205 {"job_name": job
["job_name"]},
2208 fail_on_empty
=False,
2211 step
= "instantiated at VCA"
2212 self
.logger
.debug(logging_text
+ step
)
2214 self
._write
_configuration
_status
(
2215 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2218 except Exception as e
: # TODO not use Exception but N2VC exception
2219 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2221 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2224 "Exception while {} : {}".format(step
, e
), exc_info
=True
2226 self
._write
_configuration
_status
(
2227 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2229 raise LcmException("{} {}".format(step
, e
)) from e
2231 def _write_ns_status(
2235 current_operation
: str,
2236 current_operation_id
: str,
2237 error_description
: str = None,
2238 error_detail
: str = None,
2239 other_update
: dict = None,
2242 Update db_nsr fields.
2245 :param current_operation:
2246 :param current_operation_id:
2247 :param error_description:
2248 :param error_detail:
2249 :param other_update: Other required changes at database if provided, will be cleared
2253 db_dict
= other_update
or {}
2256 ] = current_operation_id
# for backward compatibility
2257 db_dict
["_admin.current-operation"] = current_operation_id
2258 db_dict
["_admin.operation-type"] = (
2259 current_operation
if current_operation
!= "IDLE" else None
2261 db_dict
["currentOperation"] = current_operation
2262 db_dict
["currentOperationID"] = current_operation_id
2263 db_dict
["errorDescription"] = error_description
2264 db_dict
["errorDetail"] = error_detail
2267 db_dict
["nsState"] = ns_state
2268 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2269 except DbException
as e
:
2270 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2272 def _write_op_status(
2276 error_message
: str = None,
2277 queuePosition
: int = 0,
2278 operation_state
: str = None,
2279 other_update
: dict = None,
2282 db_dict
= other_update
or {}
2283 db_dict
["queuePosition"] = queuePosition
2284 if isinstance(stage
, list):
2285 db_dict
["stage"] = stage
[0]
2286 db_dict
["detailed-status"] = " ".join(stage
)
2287 elif stage
is not None:
2288 db_dict
["stage"] = str(stage
)
2290 if error_message
is not None:
2291 db_dict
["errorMessage"] = error_message
2292 if operation_state
is not None:
2293 db_dict
["operationState"] = operation_state
2294 db_dict
["statusEnteredTime"] = time()
2295 self
.update_db_2("nslcmops", op_id
, db_dict
)
2296 except DbException
as e
:
2298 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2301 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2303 nsr_id
= db_nsr
["_id"]
2304 # configurationStatus
2305 config_status
= db_nsr
.get("configurationStatus")
2308 "configurationStatus.{}.status".format(index
): status
2309 for index
, v
in enumerate(config_status
)
2313 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2315 except DbException
as e
:
2317 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2320 def _write_configuration_status(
2325 element_under_configuration
: str = None,
2326 element_type
: str = None,
2327 other_update
: dict = None,
2330 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2331 # .format(vca_index, status))
2334 db_path
= "configurationStatus.{}.".format(vca_index
)
2335 db_dict
= other_update
or {}
2337 db_dict
[db_path
+ "status"] = status
2338 if element_under_configuration
:
2340 db_path
+ "elementUnderConfiguration"
2341 ] = element_under_configuration
2343 db_dict
[db_path
+ "elementType"] = element_type
2344 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2345 except DbException
as e
:
2347 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2348 status
, nsr_id
, vca_index
, e
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Check and compute the placement (vim account where to deploy) when the operation asks for the
    "PLA" placement engine. The request is sent with self.msg and the result is polled from database
    (nslcmops _admin.pla).
    :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
    :param db_nslcmop: database content of nslcmop
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
    :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
        computed 'vim-account-id'. Raises LcmException if no result is obtained on time
    """
    changed = False
    nslcmop_id = db_nslcmop["_id"]
    engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
    if engine != "PLA":
        # no external placement requested: nothing to do
        return changed

    self.logger.debug(logging_text + "Invoke and wait for placement optimization")
    await self.msg.aiowrite(
        "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
    )

    # poll the operation record until the result appears or the waiting budget is consumed
    poll_interval = 5
    remaining = poll_interval * 10
    pla_result = None
    while not pla_result and remaining >= 0:
        await asyncio.sleep(poll_interval)
        remaining -= poll_interval
        current_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        pla_result = deep_get(current_nslcmop, ("_admin", "pla"))

    if not pla_result:
        raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))

    for placed_vnf in pla_result["vnf"]:
        target_vnfr = db_vnfrs.get(placed_vnf["member-vnf-index"])
        if placed_vnf.get("vimAccountId") and target_vnfr:
            changed = True
            new_vim_account = {"vim-account-id": placed_vnf["vimAccountId"]}
            self.db.set_one("vnfrs", {"_id": target_vnfr["_id"]}, new_vim_account)
            # Modifies db_vnfrs
            target_vnfr["vim-account-id"] = placed_vnf["vimAccountId"]
    return changed
def update_nsrs_with_pla_result(self, params):
    """
    Store a received placement result at the nslcmops record, under "_admin.pla".

    Best-effort operation: any failure is logged as a warning and never propagated.
    :param params: received content; its "placement" entry is stored as is, and
        "placement.nslcmopId" identifies the nslcmops record to update
    :return: None
    """
    # bound before the try so that the except clause can always format its message,
    # even when the identifier itself could not be extracted
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
        self.update_db_2(
            "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
        )
    except Exception as e:
        # Logger.warn is a deprecated alias; Logger.warning is the supported name
        self.logger.warning(
            "Update failed for nslcmop_id={}:{}".format(nslcmop_id, e)
        )
2410 async def instantiate(self
, nsr_id
, nslcmop_id
):
2413 :param nsr_id: ns instance to deploy
2414 :param nslcmop_id: operation to run
2418 # Try to lock HA task here
2419 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2420 if not task_is_locked_by_me
:
2422 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2426 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2427 self
.logger
.debug(logging_text
+ "Enter")
2429 # get all needed from database
2431 # database nsrs record
2434 # database nslcmops record
2437 # update operation on nsrs
2439 # update operation on nslcmops
2440 db_nslcmop_update
= {}
2442 nslcmop_operation_state
= None
2443 db_vnfrs
= {} # vnf's info indexed by member-index
2445 tasks_dict_info
= {} # from task to info text
2449 "Stage 1/5: preparation of the environment.",
2450 "Waiting for previous operations to terminate.",
2453 # ^ stage, step, VIM progress
2455 # wait for any previous tasks in process
2456 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2458 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2459 stage
[1] = "Reading from database."
2460 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2461 db_nsr_update
["detailed-status"] = "creating"
2462 db_nsr_update
["operational-status"] = "init"
2463 self
._write
_ns
_status
(
2465 ns_state
="BUILDING",
2466 current_operation
="INSTANTIATING",
2467 current_operation_id
=nslcmop_id
,
2468 other_update
=db_nsr_update
,
2470 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2472 # read from db: operation
2473 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2474 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2475 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2476 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2477 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2479 ns_params
= db_nslcmop
.get("operationParams")
2480 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2481 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2483 timeout_ns_deploy
= self
.timeout
.get(
2484 "ns_deploy", self
.timeout_ns_deploy
2488 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2489 self
.logger
.debug(logging_text
+ stage
[1])
2490 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2491 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2492 self
.logger
.debug(logging_text
+ stage
[1])
2493 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2494 self
.fs
.sync(db_nsr
["nsd-id"])
2496 # nsr_name = db_nsr["name"] # TODO short-name??
2498 # read from db: vnf's of this ns
2499 stage
[1] = "Getting vnfrs from db."
2500 self
.logger
.debug(logging_text
+ stage
[1])
2501 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2503 # read from db: vnfd's for every vnf
2504 db_vnfds
= [] # every vnfd data
2506 # for each vnf in ns, read vnfd
2507 for vnfr
in db_vnfrs_list
:
2508 if vnfr
.get("kdur"):
2510 for kdur
in vnfr
["kdur"]:
2511 if kdur
.get("additionalParams"):
2512 kdur
["additionalParams"] = json
.loads(
2513 kdur
["additionalParams"]
2515 kdur_list
.append(kdur
)
2516 vnfr
["kdur"] = kdur_list
2518 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2519 vnfd_id
= vnfr
["vnfd-id"]
2520 vnfd_ref
= vnfr
["vnfd-ref"]
2521 self
.fs
.sync(vnfd_id
)
2523 # if we haven't this vnfd, read it from db
2524 if vnfd_id
not in db_vnfds
:
2526 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2529 self
.logger
.debug(logging_text
+ stage
[1])
2530 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2533 db_vnfds
.append(vnfd
)
2535 # Get or generates the _admin.deployed.VCA list
2536 vca_deployed_list
= None
2537 if db_nsr
["_admin"].get("deployed"):
2538 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2539 if vca_deployed_list
is None:
2540 vca_deployed_list
= []
2541 configuration_status_list
= []
2542 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2543 db_nsr_update
["configurationStatus"] = configuration_status_list
2544 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2545 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2546 elif isinstance(vca_deployed_list
, dict):
2547 # maintain backward compatibility. Change a dict to list at database
2548 vca_deployed_list
= list(vca_deployed_list
.values())
2549 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2550 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2553 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2555 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2556 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2558 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2559 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2560 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2562 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2565 # n2vc_redesign STEP 2 Deploy Network Scenario
2566 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2567 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2569 stage
[1] = "Deploying KDUs."
2570 # self.logger.debug(logging_text + "Before deploy_kdus")
2571 # Call to deploy_kdus in case exists the "vdu:kdu" param
2572 await self
.deploy_kdus(
2573 logging_text
=logging_text
,
2575 nslcmop_id
=nslcmop_id
,
2578 task_instantiation_info
=tasks_dict_info
,
2581 stage
[1] = "Getting VCA public key."
2582 # n2vc_redesign STEP 1 Get VCA public ssh-key
2583 # feature 1429. Add n2vc public key to needed VMs
2584 n2vc_key
= self
.n2vc
.get_public_key()
2585 n2vc_key_list
= [n2vc_key
]
2586 if self
.vca_config
.get("public_key"):
2587 n2vc_key_list
.append(self
.vca_config
["public_key"])
2589 stage
[1] = "Deploying NS at VIM."
2590 task_ro
= asyncio
.ensure_future(
2591 self
.instantiate_RO(
2592 logging_text
=logging_text
,
2596 db_nslcmop
=db_nslcmop
,
2599 n2vc_key_list
=n2vc_key_list
,
2603 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2604 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2606 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2607 stage
[1] = "Deploying Execution Environments."
2608 self
.logger
.debug(logging_text
+ stage
[1])
2610 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2611 for vnf_profile
in get_vnf_profiles(nsd
):
2612 vnfd_id
= vnf_profile
["vnfd-id"]
2613 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2614 member_vnf_index
= str(vnf_profile
["id"])
2615 db_vnfr
= db_vnfrs
[member_vnf_index
]
2616 base_folder
= vnfd
["_admin"]["storage"]
2622 # Get additional parameters
2623 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2624 if db_vnfr
.get("additionalParamsForVnf"):
2625 deploy_params
.update(
2626 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2629 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2630 if descriptor_config
:
2632 logging_text
=logging_text
2633 + "member_vnf_index={} ".format(member_vnf_index
),
2636 nslcmop_id
=nslcmop_id
,
2642 member_vnf_index
=member_vnf_index
,
2643 vdu_index
=vdu_index
,
2645 deploy_params
=deploy_params
,
2646 descriptor_config
=descriptor_config
,
2647 base_folder
=base_folder
,
2648 task_instantiation_info
=tasks_dict_info
,
2652 # Deploy charms for each VDU that supports one.
2653 for vdud
in get_vdu_list(vnfd
):
2655 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2656 vdur
= find_in_list(
2657 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2660 if vdur
.get("additionalParams"):
2661 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2663 deploy_params_vdu
= deploy_params
2664 deploy_params_vdu
["OSM"] = get_osm_params(
2665 db_vnfr
, vdu_id
, vdu_count_index
=0
2667 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2669 self
.logger
.debug("VDUD > {}".format(vdud
))
2671 "Descriptor config > {}".format(descriptor_config
)
2673 if descriptor_config
:
2676 for vdu_index
in range(vdud_count
):
2677 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2679 logging_text
=logging_text
2680 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2681 member_vnf_index
, vdu_id
, vdu_index
2685 nslcmop_id
=nslcmop_id
,
2691 member_vnf_index
=member_vnf_index
,
2692 vdu_index
=vdu_index
,
2694 deploy_params
=deploy_params_vdu
,
2695 descriptor_config
=descriptor_config
,
2696 base_folder
=base_folder
,
2697 task_instantiation_info
=tasks_dict_info
,
2700 for kdud
in get_kdu_list(vnfd
):
2701 kdu_name
= kdud
["name"]
2702 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2703 if descriptor_config
:
2708 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2710 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2711 if kdur
.get("additionalParams"):
2712 deploy_params_kdu
.update(
2713 parse_yaml_strings(kdur
["additionalParams"].copy())
2717 logging_text
=logging_text
,
2720 nslcmop_id
=nslcmop_id
,
2726 member_vnf_index
=member_vnf_index
,
2727 vdu_index
=vdu_index
,
2729 deploy_params
=deploy_params_kdu
,
2730 descriptor_config
=descriptor_config
,
2731 base_folder
=base_folder
,
2732 task_instantiation_info
=tasks_dict_info
,
2736 # Check if this NS has a charm configuration
2737 descriptor_config
= nsd
.get("ns-configuration")
2738 if descriptor_config
and descriptor_config
.get("juju"):
2741 member_vnf_index
= None
2747 # Get additional parameters
2748 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2749 if db_nsr
.get("additionalParamsForNs"):
2750 deploy_params
.update(
2751 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2753 base_folder
= nsd
["_admin"]["storage"]
2755 logging_text
=logging_text
,
2758 nslcmop_id
=nslcmop_id
,
2764 member_vnf_index
=member_vnf_index
,
2765 vdu_index
=vdu_index
,
2767 deploy_params
=deploy_params
,
2768 descriptor_config
=descriptor_config
,
2769 base_folder
=base_folder
,
2770 task_instantiation_info
=tasks_dict_info
,
2774 # rest of staff will be done at finally
2777 ROclient
.ROClientException
,
2783 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2786 except asyncio
.CancelledError
:
2788 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2790 exc
= "Operation was cancelled"
2791 except Exception as e
:
2792 exc
= traceback
.format_exc()
2793 self
.logger
.critical(
2794 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2799 error_list
.append(str(exc
))
2801 # wait for pending tasks
2803 stage
[1] = "Waiting for instantiate pending tasks."
2804 self
.logger
.debug(logging_text
+ stage
[1])
2805 error_list
+= await self
._wait
_for
_tasks
(
2813 stage
[1] = stage
[2] = ""
2814 except asyncio
.CancelledError
:
2815 error_list
.append("Cancelled")
2816 # TODO cancel all tasks
2817 except Exception as exc
:
2818 error_list
.append(str(exc
))
2820 # update operation-status
2821 db_nsr_update
["operational-status"] = "running"
2822 # let's begin with VCA 'configured' status (later we can change it)
2823 db_nsr_update
["config-status"] = "configured"
2824 for task
, task_name
in tasks_dict_info
.items():
2825 if not task
.done() or task
.cancelled() or task
.exception():
2826 if task_name
.startswith(self
.task_name_deploy_vca
):
2827 # A N2VC task is pending
2828 db_nsr_update
["config-status"] = "failed"
2830 # RO or KDU task is pending
2831 db_nsr_update
["operational-status"] = "failed"
2833 # update status at database
2835 error_detail
= ". ".join(error_list
)
2836 self
.logger
.error(logging_text
+ error_detail
)
2837 error_description_nslcmop
= "{} Detail: {}".format(
2838 stage
[0], error_detail
2840 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2841 nslcmop_id
, stage
[0]
2844 db_nsr_update
["detailed-status"] = (
2845 error_description_nsr
+ " Detail: " + error_detail
2847 db_nslcmop_update
["detailed-status"] = error_detail
2848 nslcmop_operation_state
= "FAILED"
2852 error_description_nsr
= error_description_nslcmop
= None
2854 db_nsr_update
["detailed-status"] = "Done"
2855 db_nslcmop_update
["detailed-status"] = "Done"
2856 nslcmop_operation_state
= "COMPLETED"
2859 self
._write
_ns
_status
(
2862 current_operation
="IDLE",
2863 current_operation_id
=None,
2864 error_description
=error_description_nsr
,
2865 error_detail
=error_detail
,
2866 other_update
=db_nsr_update
,
2868 self
._write
_op
_status
(
2871 error_message
=error_description_nslcmop
,
2872 operation_state
=nslcmop_operation_state
,
2873 other_update
=db_nslcmop_update
,
2876 if nslcmop_operation_state
:
2878 await self
.msg
.aiowrite(
2883 "nslcmop_id": nslcmop_id
,
2884 "operationState": nslcmop_operation_state
,
2888 except Exception as e
:
2890 logging_text
+ "kafka_write notification Exception {}".format(e
)
2893 self
.logger
.debug(logging_text
+ "Exit")
2894 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
    """
    Obtain a vnfd, reading it from the database only the first time it is requested.

    :param vnfd_id: value of the "id" field of the vnfd
    :param cached_vnfds: cache of already read vnfds indexed by vnfd id. It is updated on a miss
    :return: the vnfd content
    """
    if vnfd_id in cached_vnfds:
        return cached_vnfds[vnfd_id]
    db_vnfd = self.db.get_one("vnfds", {"id": vnfd_id})
    cached_vnfds[vnfd_id] = db_vnfd
    return db_vnfd
def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
    """
    Obtain the vnfr of a vnf profile, reading it from the database only the first time.

    :param nsr_id: ns instance identifier, matched against "nsr-id-ref"
    :param vnf_profile_id: vnf profile identifier, matched against "member-vnf-index-ref"
    :param cached_vnfrs: cache of already read vnfrs indexed by vnf profile id. It is updated on a miss
    :return: the vnfr content
    """
    if vnf_profile_id in cached_vnfrs:
        return cached_vnfrs[vnf_profile_id]
    vnfr_filter = {
        "member-vnf-index-ref": vnf_profile_id,
        "nsr-id-ref": nsr_id,
    }
    db_vnfr = self.db.get_one("vnfrs", vnfr_filter)
    cached_vnfrs[vnf_profile_id] = db_vnfr
    return db_vnfr
def _is_deployed_vca_in_relation(
    self, vca: DeployedVCA, relation: Relation
) -> bool:
    """
    Tell whether a deployed VCA is one of the two endpoints of a relation.

    Endpoints with a "kdu-resource-profile-id" are ignored. For the rest, the vnf profile, the vdu
    profile and the execution environment reference must all be equal to those of the VCA.
    :param vca: deployed VCA to look for
    :param relation: relation whose provider and requirer are inspected, in that order
    :return: True if any of the inspected endpoints matches the VCA, False otherwise
    """
    for endpoint in (relation.provider, relation.requirer):
        if endpoint["kdu-resource-profile-id"]:
            continue
        if (
            vca.vnf_profile_id == endpoint.vnf_profile_id
            and vca.vdu_profile_id == endpoint.vdu_profile_id
            and vca.execution_environment_ref == endpoint.execution_environment_ref
        ):
            return True
    return False
def _update_ee_relation_data_with_implicit_data(
    self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
):
    """
    Normalize the data of a relation endpoint and fill in a missing execution environment reference.

    Only VNF and VDU level endpoints without "execution-environment-ref" are completed; the reference
    is taken from the juju execution environment of the vnfd (VNF level) or of the vdu (VDU level).
    :param nsr_id: ns instance identifier
    :param nsd: ns descriptor, used to find the vnfd of the endpoint vnf profile
    :param ee_relation_data: endpoint data as found in the descriptor
    :param cached_vnfds: cache of vnfds indexed by vnfd id
    :param vnf_profile_id: vnf profile the endpoint belongs to, if any
    :return: the normalized (and possibly completed) endpoint data
    :raises Exception: if no juju execution environment is found for the endpoint entity
    """
    ee_relation_data = safe_get_ee_relation(
        nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
    )
    level = EELevel.get_level(ee_relation_data)
    if level not in (EELevel.VNF, EELevel.VDU):
        return ee_relation_data
    if ee_relation_data["execution-environment-ref"]:
        # explicitly provided at the descriptor, nothing to infer
        return ee_relation_data

    vnfd_id = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])["vnfd-id"]
    db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
    if level == EELevel.VNF:
        entity_id = vnfd_id
    else:
        entity_id = ee_relation_data["vdu-profile-id"]
    juju_ee = get_juju_ee_ref(db_vnfd, entity_id)
    if not juju_ee:
        raise Exception(
            f"not execution environments found for ee_relation {ee_relation_data}"
        )
    ee_relation_data["execution-environment-ref"] = juju_ee["id"]
    return ee_relation_data
def _get_ns_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """
    Obtain the relations declared at the ns configuration in which a deployed VCA takes part.

    A relation is described either with "provider" and "requirer", or with a two-item "entities"
    list (first item is taken as provider, second as requirer).
    :param nsr_id: ns instance identifier
    :param nsd: ns descriptor
    :param vca: deployed VCA whose relations are wanted
    :param cached_vnfds: cache of vnfds indexed by vnfd id
    :return: list of the relations having the VCA as one of their endpoints
    :raises Exception: if a relation contains neither provider/requirer nor entities
    """

    def _endpoint_from_entity(entity):
        # an entity whose id is not the nsd id is taken as a vnf profile
        entity_id = entity["id"]
        endpoint_data = {
            "nsr-id": nsr_id,
            "endpoint": entity["endpoint"],
        }
        if entity_id != nsd["id"]:
            endpoint_data["vnf-profile-id"] = entity_id
        return endpoint_data

    vca_relations = []
    for r in get_ns_configuration_relation_list(nsd):
        if "provider" in r and "requirer" in r:
            provider_data = r["provider"]
            requirer_data = r["requirer"]
        elif "entities" in r:
            provider_data = _endpoint_from_entity(r["entities"][0])
            requirer_data = _endpoint_from_entity(r["entities"][1])
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )
        provider_data = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, provider_data, cached_vnfds
        )
        requirer_data = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, requirer_data, cached_vnfds
        )
        relation = Relation(
            r["name"], EERelation(provider_data), EERelation(requirer_data)
        )
        if self._is_deployed_vca_in_relation(vca, relation):
            vca_relations.append(relation)
    return vca_relations
def _get_vnf_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """
    Obtain the relations declared at the vnfd of a deployed VCA in which that VCA takes part.

    A relation is described either with "provider" and "requirer", or with a two-item "entities"
    list (first item is taken as provider, second as requirer).
    :param nsr_id: ns instance identifier
    :param nsd: ns descriptor, used to find the vnf profile of the VCA
    :param vca: deployed VCA whose relations are wanted
    :param cached_vnfds: cache of vnfds indexed by vnfd id
    :return: list of the relations having the VCA as one of their endpoints
    :raises Exception: if a relation contains neither provider/requirer nor entities
    """
    vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
    vnf_profile_id = vnf_profile["id"]
    vnfd_id = vnf_profile["vnfd-id"]
    db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)

    def _endpoint_from_entity(entity):
        # an entity whose id is not the vnfd id is taken as a vdu profile
        entity_id = entity["id"]
        endpoint_data = {
            "nsr-id": nsr_id,
            "vnf-profile-id": vnf_profile_id,
            "endpoint": entity["endpoint"],
        }
        if entity_id != vnfd_id:
            endpoint_data["vdu-profile-id"] = entity_id
        return endpoint_data

    vca_relations = []
    for r in get_relation_list(db_vnfd, vnfd_id):
        if "provider" in r and "requirer" in r:
            provider_data = r["provider"]
            requirer_data = r["requirer"]
        elif "entities" in r:
            provider_data = _endpoint_from_entity(r["entities"][0])
            requirer_data = _endpoint_from_entity(r["entities"][1])
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )
        provider_data = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, provider_data, cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        requirer_data = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, requirer_data, cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        relation = Relation(
            r["name"], EERelation(provider_data), EERelation(requirer_data)
        )
        if self._is_deployed_vca_in_relation(vca, relation):
            vca_relations.append(relation)
    return vca_relations
def _get_kdu_resource_data(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedK8sResource:
    """
    Obtain the deployed KDU data for a KDU level relation endpoint.

    The kdu resource profile referenced by the endpoint gives the kdu name; the deployed kdu with
    that name and vnf profile is looked up at the nsr and completed with the "resource-name" of
    the profile.
    :param ee_relation: relation endpoint; its vnf_profile_id and kdu_resource_profile_id are used
    :param db_nsr: database content of the nsr
    :param cached_vnfds: cache of vnfds indexed by vnfd id
    :return: the deployed kdu data, with "resource-name" added
    """
    nsd = get_nsd(db_nsr)
    vnf_profiles = get_vnf_profiles(nsd)
    vnfd_id = find_in_list(
        vnf_profiles,
        lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
    )["vnfd-id"]
    db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
    kdu_resource_profile = get_kdu_resource_profile(
        db_vnfd, ee_relation.kdu_resource_profile_id
    )
    kdu_name = kdu_resource_profile["kdu-name"]
    # A dict default (not a tuple) is needed: tuples have no .get(), so a missing
    # "_admin" used to raise AttributeError instead of yielding an empty lookup.
    nsr_deployed = (db_nsr.get("_admin") or {}).get("deployed") or {}
    deployed_kdu, _ = get_deployed_kdu(
        nsr_deployed,
        kdu_name,
        ee_relation.vnf_profile_id,
    )
    deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
    return deployed_kdu
def _get_deployed_component(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedComponent:
    """
    Obtain the deployed component (VCA or K8s resource) a relation endpoint refers to.

    :param ee_relation: relation endpoint
    :param db_nsr: database content of the nsr
    :param cached_vnfds: cache of vnfds indexed by vnfd id (used for KDU level endpoints)
    :return: a DeployedVCA for NS, VNF and VDU level endpoints, a DeployedK8sResource for KDU level
        ones, or None when nothing deployed is found for the endpoint
    """
    nsr_id = db_nsr["_id"]
    level = EELevel.get_level(ee_relation)

    if level == EELevel.NS:
        vca_filter = {"vdu_id": None, "member-vnf-index": None}
    elif level == EELevel.VNF:
        vca_filter = {
            "vdu_id": None,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif level == EELevel.VDU:
        vca_filter = {
            "vdu_id": ee_relation.vdu_profile_id,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif level == EELevel.KDU:
        kdu_resource_data = self._get_kdu_resource_data(
            ee_relation, db_nsr, cached_vnfds
        )
        if kdu_resource_data:
            return DeployedK8sResource(kdu_resource_data)
        return None
    else:
        return None

    # NS, VNF and VDU levels only differ in the filter used to find the VCA
    vca = get_deployed_vca(db_nsr, vca_filter)
    if vca:
        return DeployedVCA(nsr_id, vca)
    return None
async def _add_relation(
    self,
    relation: Relation,
    vca_type: str,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
    cached_vnfrs: Dict[str, Any],
) -> bool:
    """
    Add a relation once both of its endpoints are deployed and have their software installed.

    :param relation: relation to add
    :param vca_type: key of self.vca_map selecting the connector used to add the relation
    :param db_nsr: database content of the nsr
    :param cached_vnfds: cache of vnfds indexed by vnfd id
    :param cached_vnfrs: cache of vnfrs indexed by vnf profile id
    :return: True if the relation has been added, False if some endpoint is not ready yet
    """
    provider = relation.provider
    requirer = relation.requirer
    deployed_provider = self._get_deployed_component(provider, db_nsr, cached_vnfds)
    deployed_requirer = self._get_deployed_component(requirer, db_nsr, cached_vnfds)

    both_ready = (
        deployed_provider
        and deployed_requirer
        and deployed_provider.config_sw_installed
        and deployed_requirer.config_sw_installed
    )
    if not both_ready:
        return False

    # endpoints without vnf profile have no vnfr
    provider_db_vnfr = None
    if provider.vnf_profile_id:
        provider_db_vnfr = self._get_vnfr(
            provider.nsr_id, provider.vnf_profile_id, cached_vnfrs
        )
    requirer_db_vnfr = None
    if requirer.vnf_profile_id:
        requirer_db_vnfr = self._get_vnfr(
            requirer.nsr_id, requirer.vnf_profile_id, cached_vnfrs
        )

    provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
    requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
    await self.vca_map[vca_type].add_relation(
        provider=RelationEndpoint(
            deployed_provider.ee_id,
            provider_vca_id,
            provider.endpoint,
        ),
        requirer=RelationEndpoint(
            deployed_requirer.ee_id,
            requirer_vca_id,
            requirer.endpoint,
        ),
    )
    # the caller removes the entry from its pending relations list
    return True
async def _add_vca_relations(
    self,
    logging_text,
    nsr_id,
    vca_type: str,
    vca_index: int,
    timeout: int = 3600,
) -> bool:
    """
    Add all the relations in which a deployed VCA takes part, waiting for its peers to be ready.

    Steps:
      1. find all relations for this VCA (ns level and vnf level)
      2. wait for other peers related
      3. add relations
    :param logging_text: prefix used for the log messages
    :param nsr_id: ns instance identifier
    :param vca_type: key of self.vca_map selecting the connector used to add the relations
    :param vca_index: position of this VCA at the deployed VCA list of the nsr
    :param timeout: maximum time, in seconds, to keep retrying the pending relations
    :return: True if there are no relations or all of them have been added; False on timeout or on
        any error (which is logged, not raised)
    """
    try:
        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = get_nsd(db_nsr)

        # this VCA data
        deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
        my_vca = DeployedVCA(nsr_id, deployed_vca_dict)

        cached_vnfds = {}
        cached_vnfrs = {}
        relations = []
        relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
        relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))

        # if no relations, terminate
        if not relations:
            self.logger.debug(logging_text + " No relations")
            return True

        self.logger.debug(logging_text + " adding relations {}".format(relations))

        # add all relations
        start = time()
        while True:
            # check timeout
            now = time()
            if now - start >= timeout:
                self.logger.error(logging_text + " : timeout adding relations")
                return False

            # reload nsr from database (we need to update record: _admin.deployed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            # for each relation, find the VCA's related
            # (iterate over a copy because added relations are removed from the list)
            for relation in relations.copy():
                added = await self._add_relation(
                    relation,
                    vca_type,
                    db_nsr,
                    cached_vnfds,
                    cached_vnfrs,
                )
                if added:
                    relations.remove(relation)

            if not relations:
                self.logger.debug("Relations added")
                break
            await asyncio.sleep(5.0)

        return True

    except Exception as e:
        # Logger.warn is a deprecated alias; Logger.warning is the supported name
        self.logger.warning(
            logging_text + " ERROR adding relations: {}".format(e)
        )
        return False
3255 async def _install_kdu(
3263 k8s_instance_info
: dict,
3264 k8params
: dict = None,
3270 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3273 "collection": "nsrs",
3274 "filter": {"_id": nsr_id
},
3275 "path": nsr_db_path
,
3278 if k8s_instance_info
.get("kdu-deployment-name"):
3279 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3281 kdu_instance
= self
.k8scluster_map
[
3283 ].generate_kdu_instance_name(
3284 db_dict
=db_dict_install
,
3285 kdu_model
=k8s_instance_info
["kdu-model"],
3286 kdu_name
=k8s_instance_info
["kdu-name"],
3289 # Update the nsrs table with the kdu-instance value
3293 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3296 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3297 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3298 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3299 # namespace, this first verification could be removed, and the next step would be done for any kind
3301 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3302 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3303 if k8sclustertype
in ("juju", "juju-bundle"):
3304 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3305 # that the user passed a namespace which he wants its KDU to be deployed in)
3311 "_admin.projects_write": k8s_instance_info
["namespace"],
3312 "_admin.projects_read": k8s_instance_info
["namespace"],
3318 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3323 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3325 k8s_instance_info
["namespace"] = kdu_instance
3327 await self
.k8scluster_map
[k8sclustertype
].install(
3328 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3329 kdu_model
=k8s_instance_info
["kdu-model"],
3332 db_dict
=db_dict_install
,
3334 kdu_name
=k8s_instance_info
["kdu-name"],
3335 namespace
=k8s_instance_info
["namespace"],
3336 kdu_instance
=kdu_instance
,
3340 # Obtain services to obtain management service ip
3341 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3342 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3343 kdu_instance
=kdu_instance
,
3344 namespace
=k8s_instance_info
["namespace"],
3347 # Obtain management service info (if exists)
3348 vnfr_update_dict
= {}
3349 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3351 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3356 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3359 for service
in kdud
.get("service", [])
3360 if service
.get("mgmt-service")
3362 for mgmt_service
in mgmt_services
:
3363 for service
in services
:
3364 if service
["name"].startswith(mgmt_service
["name"]):
3365 # Mgmt service found, Obtain service ip
3366 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3367 if isinstance(ip
, list) and len(ip
) == 1:
3371 "kdur.{}.ip-address".format(kdu_index
)
3374 # Check if must update also mgmt ip at the vnf
3375 service_external_cp
= mgmt_service
.get(
3376 "external-connection-point-ref"
3378 if service_external_cp
:
3380 deep_get(vnfd
, ("mgmt-interface", "cp"))
3381 == service_external_cp
3383 vnfr_update_dict
["ip-address"] = ip
3388 "external-connection-point-ref", ""
3390 == service_external_cp
,
3393 "kdur.{}.ip-address".format(kdu_index
)
3398 "Mgmt service name: {} not found".format(
3399 mgmt_service
["name"]
3403 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3404 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3406 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3409 and kdu_config
.get("initial-config-primitive")
3410 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3412 initial_config_primitive_list
= kdu_config
.get(
3413 "initial-config-primitive"
3415 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3417 for initial_config_primitive
in initial_config_primitive_list
:
3418 primitive_params_
= self
._map
_primitive
_params
(
3419 initial_config_primitive
, {}, {}
3422 await asyncio
.wait_for(
3423 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3424 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3425 kdu_instance
=kdu_instance
,
3426 primitive_name
=initial_config_primitive
["name"],
3427 params
=primitive_params_
,
3428 db_dict
=db_dict_install
,
3434 except Exception as e
:
3435 # Prepare update db with error and raise exception
3438 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3442 vnfr_data
.get("_id"),
3443 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3446 # ignore to keep original exception
3448 # reraise original error
3453 async def deploy_kdus(
3460 task_instantiation_info
,
3462 # Launch kdus if present in the descriptor
3464 k8scluster_id_2_uuic
= {
3465 "helm-chart-v3": {},
3470 async def _get_cluster_id(cluster_id
, cluster_type
):
3471 nonlocal k8scluster_id_2_uuic
3472 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3473 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3475 # check if K8scluster is creating and wait look if previous tasks in process
3476 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3477 "k8scluster", cluster_id
3480 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3481 task_name
, cluster_id
3483 self
.logger
.debug(logging_text
+ text
)
3484 await asyncio
.wait(task_dependency
, timeout
=3600)
3486 db_k8scluster
= self
.db
.get_one(
3487 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3489 if not db_k8scluster
:
3490 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3492 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3494 if cluster_type
== "helm-chart-v3":
3496 # backward compatibility for existing clusters that have not been initialized for helm v3
3497 k8s_credentials
= yaml
.safe_dump(
3498 db_k8scluster
.get("credentials")
3500 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3501 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3503 db_k8scluster_update
= {}
3504 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3505 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3506 db_k8scluster_update
[
3507 "_admin.helm-chart-v3.created"
3509 db_k8scluster_update
[
3510 "_admin.helm-chart-v3.operationalState"
3513 "k8sclusters", cluster_id
, db_k8scluster_update
3515 except Exception as e
:
3518 + "error initializing helm-v3 cluster: {}".format(str(e
))
3521 "K8s cluster '{}' has not been initialized for '{}'".format(
3522 cluster_id
, cluster_type
3527 "K8s cluster '{}' has not been initialized for '{}'".format(
3528 cluster_id
, cluster_type
3531 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3534 logging_text
+= "Deploy kdus: "
3537 db_nsr_update
= {"_admin.deployed.K8s": []}
3538 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3541 updated_cluster_list
= []
3542 updated_v3_cluster_list
= []
3544 for vnfr_data
in db_vnfrs
.values():
3545 vca_id
= self
.get_vca_id(vnfr_data
, {})
3546 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3547 # Step 0: Prepare and set parameters
3548 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3549 vnfd_id
= vnfr_data
.get("vnfd-id")
3550 vnfd_with_id
= find_in_list(
3551 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3555 for kdud
in vnfd_with_id
["kdu"]
3556 if kdud
["name"] == kdur
["kdu-name"]
3558 namespace
= kdur
.get("k8s-namespace")
3559 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3560 if kdur
.get("helm-chart"):
3561 kdumodel
= kdur
["helm-chart"]
3562 # Default version: helm3, if helm-version is v2 assign v2
3563 k8sclustertype
= "helm-chart-v3"
3564 self
.logger
.debug("kdur: {}".format(kdur
))
3566 kdur
.get("helm-version")
3567 and kdur
.get("helm-version") == "v2"
3569 k8sclustertype
= "helm-chart"
3570 elif kdur
.get("juju-bundle"):
3571 kdumodel
= kdur
["juju-bundle"]
3572 k8sclustertype
= "juju-bundle"
3575 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3576 "juju-bundle. Maybe an old NBI version is running".format(
3577 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3580 # check if kdumodel is a file and exists
3582 vnfd_with_id
= find_in_list(
3583 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3585 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3586 if storage
: # may be not present if vnfd has not artifacts
3587 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3588 if storage
["pkg-dir"]:
3589 filename
= "{}/{}/{}s/{}".format(
3596 filename
= "{}/Scripts/{}s/{}".format(
3601 if self
.fs
.file_exists(
3602 filename
, mode
="file"
3603 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3604 kdumodel
= self
.fs
.path
+ filename
3605 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3607 except Exception: # it is not a file
3610 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3611 step
= "Synchronize repos for k8s cluster '{}'".format(
3614 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3618 k8sclustertype
== "helm-chart"
3619 and cluster_uuid
not in updated_cluster_list
3621 k8sclustertype
== "helm-chart-v3"
3622 and cluster_uuid
not in updated_v3_cluster_list
3624 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3625 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3626 cluster_uuid
=cluster_uuid
3629 if del_repo_list
or added_repo_dict
:
3630 if k8sclustertype
== "helm-chart":
3632 "_admin.helm_charts_added." + item
: None
3633 for item
in del_repo_list
3636 "_admin.helm_charts_added." + item
: name
3637 for item
, name
in added_repo_dict
.items()
3639 updated_cluster_list
.append(cluster_uuid
)
3640 elif k8sclustertype
== "helm-chart-v3":
3642 "_admin.helm_charts_v3_added." + item
: None
3643 for item
in del_repo_list
3646 "_admin.helm_charts_v3_added." + item
: name
3647 for item
, name
in added_repo_dict
.items()
3649 updated_v3_cluster_list
.append(cluster_uuid
)
3651 logging_text
+ "repos synchronized on k8s cluster "
3652 "'{}' to_delete: {}, to_add: {}".format(
3653 k8s_cluster_id
, del_repo_list
, added_repo_dict
3658 {"_id": k8s_cluster_id
},
3664 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3665 vnfr_data
["member-vnf-index-ref"],
3669 k8s_instance_info
= {
3670 "kdu-instance": None,
3671 "k8scluster-uuid": cluster_uuid
,
3672 "k8scluster-type": k8sclustertype
,
3673 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3674 "kdu-name": kdur
["kdu-name"],
3675 "kdu-model": kdumodel
,
3676 "namespace": namespace
,
3677 "kdu-deployment-name": kdu_deployment_name
,
3679 db_path
= "_admin.deployed.K8s.{}".format(index
)
3680 db_nsr_update
[db_path
] = k8s_instance_info
3681 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3682 vnfd_with_id
= find_in_list(
3683 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3685 task
= asyncio
.ensure_future(
3694 k8params
=desc_params
,
3699 self
.lcm_tasks
.register(
3703 "instantiate_KDU-{}".format(index
),
3706 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3712 except (LcmException
, asyncio
.CancelledError
):
3714 except Exception as e
:
3715 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3716 if isinstance(e
, (N2VCException
, DbException
)):
3717 self
.logger
.error(logging_text
+ msg
)
3719 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3720 raise LcmException(msg
)
3723 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3742 task_instantiation_info
,
3745 # launch instantiate_N2VC in a asyncio task and register task object
3746 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3747 # if not found, create one entry and update database
3748 # fill db_nsr._admin.deployed.VCA.<index>
3751 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3755 get_charm_name
= False
3756 if "execution-environment-list" in descriptor_config
:
3757 ee_list
= descriptor_config
.get("execution-environment-list", [])
3758 elif "juju" in descriptor_config
:
3759 ee_list
= [descriptor_config
] # ns charms
3760 if "execution-environment-list" not in descriptor_config
:
3761 # charm name is only required for ns charms
3762 get_charm_name
= True
3763 else: # other types as script are not supported
3766 for ee_item
in ee_list
:
3769 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3770 ee_item
.get("juju"), ee_item
.get("helm-chart")
3773 ee_descriptor_id
= ee_item
.get("id")
3774 if ee_item
.get("juju"):
3775 vca_name
= ee_item
["juju"].get("charm")
3777 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
3780 if ee_item
["juju"].get("charm") is not None
3783 if ee_item
["juju"].get("cloud") == "k8s":
3784 vca_type
= "k8s_proxy_charm"
3785 elif ee_item
["juju"].get("proxy") is False:
3786 vca_type
= "native_charm"
3787 elif ee_item
.get("helm-chart"):
3788 vca_name
= ee_item
["helm-chart"]
3789 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3792 vca_type
= "helm-v3"
3795 logging_text
+ "skipping non juju neither charm configuration"
3800 for vca_index
, vca_deployed
in enumerate(
3801 db_nsr
["_admin"]["deployed"]["VCA"]
3803 if not vca_deployed
:
3806 vca_deployed
.get("member-vnf-index") == member_vnf_index
3807 and vca_deployed
.get("vdu_id") == vdu_id
3808 and vca_deployed
.get("kdu_name") == kdu_name
3809 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3810 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3814 # not found, create one.
3816 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3819 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3821 target
+= "/kdu/{}".format(kdu_name
)
3823 "target_element": target
,
3824 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3825 "member-vnf-index": member_vnf_index
,
3827 "kdu_name": kdu_name
,
3828 "vdu_count_index": vdu_index
,
3829 "operational-status": "init", # TODO revise
3830 "detailed-status": "", # TODO revise
3831 "step": "initial-deploy", # TODO revise
3833 "vdu_name": vdu_name
,
3835 "ee_descriptor_id": ee_descriptor_id
,
3836 "charm_name": charm_name
,
3840 # create VCA and configurationStatus in db
3842 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3843 "configurationStatus.{}".format(vca_index
): dict(),
3845 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3847 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3849 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3850 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3851 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3854 task_n2vc
= asyncio
.ensure_future(
3855 self
.instantiate_N2VC(
3856 logging_text
=logging_text
,
3857 vca_index
=vca_index
,
3863 vdu_index
=vdu_index
,
3864 deploy_params
=deploy_params
,
3865 config_descriptor
=descriptor_config
,
3866 base_folder
=base_folder
,
3867 nslcmop_id
=nslcmop_id
,
3871 ee_config_descriptor
=ee_item
,
3874 self
.lcm_tasks
.register(
3878 "instantiate_N2VC-{}".format(vca_index
),
3881 task_instantiation_info
[
3883 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3884 member_vnf_index
or "", vdu_id
or ""
3888 def _create_nslcmop(nsr_id
, operation
, params
):
3890 Creates a ns-lcm-opp content to be stored at database.
3891 :param nsr_id: internal id of the instance
3892 :param operation: instantiate, terminate, scale, action, ...
3893 :param params: user parameters for the operation
3894 :return: dictionary following SOL005 format
3896 # Raise exception if invalid arguments
3897 if not (nsr_id
and operation
and params
):
3899 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3906 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3907 "operationState": "PROCESSING",
3908 "statusEnteredTime": now
,
3909 "nsInstanceId": nsr_id
,
3910 "lcmOperationType": operation
,
3912 "isAutomaticInvocation": False,
3913 "operationParams": params
,
3914 "isCancelPending": False,
3916 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3917 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3922 def _format_additional_params(self
, params
):
3923 params
= params
or {}
3924 for key
, value
in params
.items():
3925 if str(value
).startswith("!!yaml "):
3926 params
[key
] = yaml
.safe_load(value
[7:])
3929 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3930 primitive
= seq
.get("name")
3931 primitive_params
= {}
3933 "member_vnf_index": vnf_index
,
3934 "primitive": primitive
,
3935 "primitive_params": primitive_params
,
3938 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide whether an already registered sub-operation is skipped or retried.

    :param db_nslcmop: operation record holding the "_admin.operations" list
    :param op_index: position of the sub-operation inside that list
    :return: SUBOPERATION_STATUS_SKIP when the sub-operation is "COMPLETED"; otherwise
        op_index, after marking the sub-operation as "PROCESSING" again
    """
    op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
    if op.get("operationState") == "COMPLETED":
        # b. Skip sub-operation
        # _ns_execute_primitive() or RO.create_action() will NOT be executed
        return self.SUBOPERATION_STATUS_SKIP

    # c. retry executing sub-operation
    # The sub-operation exists, and operationState != 'COMPLETED'
    # Update operationState = 'PROCESSING' to indicate a retry.
    operationState = "PROCESSING"
    detailed_status = "In progress"
    self._update_suboperation_status(
        db_nslcmop, op_index, operationState, detailed_status
    )
    # Return the sub-operation index
    # _ns_execute_primitive() or RO.create_action() will be called from scale()
    # with arguments extracted from the sub-operation
    return op_index
3962 # Find a sub-operation where all keys in a matching dictionary must match
3963 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3964 def _find_suboperation(self
, db_nslcmop
, match
):
3965 if db_nslcmop
and match
:
3966 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3967 for i
, op
in enumerate(op_list
):
3968 if all(op
.get(k
) == match
[k
] for k
in match
):
3970 return self
.SUBOPERATION_STATUS_NOT_FOUND
3972 # Update status for a sub-operation given its index
3973 def _update_suboperation_status(
3974 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3976 # Update DB for HA tasks
3977 q_filter
= {"_id": db_nslcmop
["_id"]}
3979 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3980 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3983 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3986 # Add sub-operation, return the index of the added sub-operation
3987 # Optionally, set operationState, detailed-status, and operationType
3988 # Status and type are currently set for 'scale' sub-operations:
3989 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3990 # 'detailed-status' : status message
3991 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3992 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3993 def _add_suboperation(
4001 mapped_primitive_params
,
4002 operationState
=None,
4003 detailed_status
=None,
4006 RO_scaling_info
=None,
4009 return self
.SUBOPERATION_STATUS_NOT_FOUND
4010 # Get the "_admin.operations" list, if it exists
4011 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
4012 op_list
= db_nslcmop_admin
.get("operations")
4013 # Create or append to the "_admin.operations" list
4015 "member_vnf_index": vnf_index
,
4017 "vdu_count_index": vdu_count_index
,
4018 "primitive": primitive
,
4019 "primitive_params": mapped_primitive_params
,
4022 new_op
["operationState"] = operationState
4024 new_op
["detailed-status"] = detailed_status
4026 new_op
["lcmOperationType"] = operationType
4028 new_op
["RO_nsr_id"] = RO_nsr_id
4030 new_op
["RO_scaling_info"] = RO_scaling_info
4032 # No existing operations, create key 'operations' with current operation as first list element
4033 db_nslcmop_admin
.update({"operations": [new_op
]})
4034 op_list
= db_nslcmop_admin
.get("operations")
4036 # Existing operations, append operation to list
4037 op_list
.append(new_op
)
4039 db_nslcmop_update
= {"_admin.operations": op_list
}
4040 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4041 op_index
= len(op_list
) - 1
4044 # Helper methods for scale() sub-operations
4046 # pre-scale/post-scale:
4047 # Check for 3 different cases:
4048 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4049 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4050 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4051 def _check_or_add_scale_suboperation(
4055 vnf_config_primitive
,
4059 RO_scaling_info
=None,
4061 # Find this sub-operation
4062 if RO_nsr_id
and RO_scaling_info
:
4063 operationType
= "SCALE-RO"
4065 "member_vnf_index": vnf_index
,
4066 "RO_nsr_id": RO_nsr_id
,
4067 "RO_scaling_info": RO_scaling_info
,
4071 "member_vnf_index": vnf_index
,
4072 "primitive": vnf_config_primitive
,
4073 "primitive_params": primitive_params
,
4074 "lcmOperationType": operationType
,
4076 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4077 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4078 # a. New sub-operation
4079 # The sub-operation does not exist, add it.
4080 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4081 # The following parameters are set to None for all kind of scaling:
4083 vdu_count_index
= None
4085 if RO_nsr_id
and RO_scaling_info
:
4086 vnf_config_primitive
= None
4087 primitive_params
= None
4090 RO_scaling_info
= None
4091 # Initial status for sub-operation
4092 operationState
= "PROCESSING"
4093 detailed_status
= "In progress"
4094 # Add sub-operation for pre/post-scaling (zero or more operations)
4095 self
._add
_suboperation
(
4101 vnf_config_primitive
,
4109 return self
.SUBOPERATION_STATUS_NEW
4111 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4112 # or op_index (operationState != 'COMPLETED')
4113 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4115 # Function to return execution_environment id
4117 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4118 # TODO vdu_index_count
4119 for vca
in vca_deployed_list
:
4120 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
async def destroy_N2VC(
    self,
    logging_text,
    db_nslcmop,
    vca_deployed,
    config_descriptor,
    vca_index,
    destroy_ee=True,
    exec_primitives=True,
    scaling_in=False,
    vca_id: str = None,
):
    """
    Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
    :param logging_text:
    :param db_nslcmop:
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu.
        None is treated as a descriptor without terminate primitives
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to not destroy it here, because all of them will be destroyed at once
    :param exec_primitives: False to not execute terminate primitives, because the config is not completed or has
                        not executed properly
    :param scaling_in: True destroys the application, False destroys the model
    :param vca_id: passed through to the primitive execution and to the environment deletion
    :return: None or exception
    """
    # NOTE(review): the positional parameter list, the try/continue/raise lines and the
    # vca_type/vca_id keyword arguments were absent from the extracted text and have
    # been restored from the surrounding calls - confirm against the repository.

    self.logger.debug(
        logging_text
        + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
            vca_index, vca_deployed, config_descriptor, destroy_ee
        )
    )

    vca_type = vca_deployed.get("type", "lxc_proxy_charm")

    # execute terminate_primitives
    if exec_primitives:
        # The caller may hand over no descriptor at all (None); read it as empty
        terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
            (config_descriptor or {}).get("terminate-config-primitive"),
            vca_deployed.get("ee_descriptor_id"),
        )
        vdu_id = vca_deployed.get("vdu_id")
        vdu_count_index = vca_deployed.get("vdu_count_index")
        vdu_name = vca_deployed.get("vdu_name")
        vnf_index = vca_deployed.get("member-vnf-index")
        if terminate_primitives and vca_deployed.get("needed_terminate"):
            for seq in terminate_primitives:
                # For each sequence in list, get primitive and call _ns_execute_primitive()
                step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                    vnf_index, seq.get("name")
                )
                self.logger.debug(logging_text + step)
                # Create the primitive for each sequence, i.e. "primitive": "touch"
                primitive = seq.get("name")
                mapped_primitive_params = self._get_terminate_primitive_params(
                    seq, vnf_index
                )

                # Add sub-operation
                self._add_suboperation(
                    db_nslcmop,
                    vnf_index,
                    vdu_id,
                    vdu_count_index,
                    vdu_name,
                    primitive,
                    mapped_primitive_params,
                )
                # Sub-operations: Call _ns_execute_primitive() instead of action()
                try:
                    result, result_detail = await self._ns_execute_primitive(
                        vca_deployed["ee_id"],
                        primitive,
                        mapped_primitive_params,
                        vca_type=vca_type,
                        vca_id=vca_id,
                    )
                except LcmException:
                    # this happens when VCA is not deployed. In this case it is not needed to terminate
                    continue
                result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
                if result not in result_ok:
                    raise LcmException(
                        "terminate_primitive {} for vnf_member_index={} fails with "
                        "error {}".format(seq.get("name"), vnf_index, result_detail)
                    )
            # set that this VCA do not need terminated
            db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
                vca_index
            )
            self.update_db_2(
                "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
            )

    # Delete Prometheus Jobs if any
    # This uses NSR_ID, so it will destroy any jobs under this index
    self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})

    if destroy_ee:
        await self.vca_map[vca_type].delete_execution_environment(
            vca_deployed["ee_id"],
            scaling_in=scaling_in,
            vca_type=vca_type,
            vca_id=vca_id,
        )
4229 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4230 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4231 namespace
= "." + db_nsr
["_id"]
4233 await self
.n2vc
.delete_namespace(
4234 namespace
=namespace
,
4235 total_timeout
=self
.timeout_charm_delete
,
4238 except N2VCNotFound
: # already deleted. Skip
4240 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4242 async def _terminate_RO(
4243 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4246 Terminates a deployment from RO
4247 :param logging_text:
4248 :param nsr_deployed: db_nsr._admin.deployed
4251 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4252 this method will update only the index 2, but it will write on database the concatenated content of the list
4257 ro_nsr_id
= ro_delete_action
= None
4258 if nsr_deployed
and nsr_deployed
.get("RO"):
4259 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4260 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4263 stage
[2] = "Deleting ns from VIM."
4264 db_nsr_update
["detailed-status"] = " ".join(stage
)
4265 self
._write
_op
_status
(nslcmop_id
, stage
)
4266 self
.logger
.debug(logging_text
+ stage
[2])
4267 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4268 self
._write
_op
_status
(nslcmop_id
, stage
)
4269 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4270 ro_delete_action
= desc
["action_id"]
4272 "_admin.deployed.RO.nsr_delete_action_id"
4273 ] = ro_delete_action
4274 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4275 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4276 if ro_delete_action
:
4277 # wait until NS is deleted from VIM
4278 stage
[2] = "Waiting ns deleted from VIM."
4279 detailed_status_old
= None
4283 + " RO_id={} ro_delete_action={}".format(
4284 ro_nsr_id
, ro_delete_action
4287 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4288 self
._write
_op
_status
(nslcmop_id
, stage
)
4290 delete_timeout
= 20 * 60 # 20 minutes
4291 while delete_timeout
> 0:
4292 desc
= await self
.RO
.show(
4294 item_id_name
=ro_nsr_id
,
4295 extra_item
="action",
4296 extra_item_id
=ro_delete_action
,
4300 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4302 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4303 if ns_status
== "ERROR":
4304 raise ROclient
.ROClientException(ns_status_info
)
4305 elif ns_status
== "BUILD":
4306 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4307 elif ns_status
== "ACTIVE":
4308 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4309 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4314 ), "ROclient.check_action_status returns unknown {}".format(
4317 if stage
[2] != detailed_status_old
:
4318 detailed_status_old
= stage
[2]
4319 db_nsr_update
["detailed-status"] = " ".join(stage
)
4320 self
._write
_op
_status
(nslcmop_id
, stage
)
4321 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4322 await asyncio
.sleep(5, loop
=self
.loop
)
4324 else: # delete_timeout <= 0:
4325 raise ROclient
.ROClientException(
4326 "Timeout waiting ns deleted from VIM"
4329 except Exception as e
:
4330 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4332 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4334 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4335 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4336 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4338 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4341 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4343 failed_detail
.append("delete conflict: {}".format(e
))
4346 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4349 failed_detail
.append("delete error: {}".format(e
))
4351 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4355 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4356 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4358 stage
[2] = "Deleting nsd from RO."
4359 db_nsr_update
["detailed-status"] = " ".join(stage
)
4360 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4361 self
._write
_op
_status
(nslcmop_id
, stage
)
4362 await self
.RO
.delete("nsd", ro_nsd_id
)
4364 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4366 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4367 except Exception as e
:
4369 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4371 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4373 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4376 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4378 failed_detail
.append(
4379 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4381 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4383 failed_detail
.append(
4384 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4386 self
.logger
.error(logging_text
+ failed_detail
[-1])
4388 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4389 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4390 if not vnf_deployed
or not vnf_deployed
["id"]:
4393 ro_vnfd_id
= vnf_deployed
["id"]
4396 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4397 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4399 db_nsr_update
["detailed-status"] = " ".join(stage
)
4400 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4401 self
._write
_op
_status
(nslcmop_id
, stage
)
4402 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4404 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4406 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4407 except Exception as e
:
4409 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4412 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4416 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4419 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4421 failed_detail
.append(
4422 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4424 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4426 failed_detail
.append(
4427 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4429 self
.logger
.error(logging_text
+ failed_detail
[-1])
4432 stage
[2] = "Error deleting from VIM"
4434 stage
[2] = "Deleted from VIM"
4435 db_nsr_update
["detailed-status"] = " ".join(stage
)
4436 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4437 self
._write
_op
_status
(nslcmop_id
, stage
)
4440 raise LcmException("; ".join(failed_detail
))
4442 async def terminate(self
, nsr_id
, nslcmop_id
):
4443 # Try to lock HA task here
4444 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4445 if not task_is_locked_by_me
:
4448 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4449 self
.logger
.debug(logging_text
+ "Enter")
4450 timeout_ns_terminate
= self
.timeout_ns_terminate
4453 operation_params
= None
4455 error_list
= [] # annotates all failed error messages
4456 db_nslcmop_update
= {}
4457 autoremove
= False # autoremove after terminated
4458 tasks_dict_info
= {}
4461 "Stage 1/3: Preparing task.",
4462 "Waiting for previous operations to terminate.",
4465 # ^ contains [stage, step, VIM-status]
4467 # wait for any previous tasks in process
4468 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4470 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4471 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4472 operation_params
= db_nslcmop
.get("operationParams") or {}
4473 if operation_params
.get("timeout_ns_terminate"):
4474 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4475 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4476 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4478 db_nsr_update
["operational-status"] = "terminating"
4479 db_nsr_update
["config-status"] = "terminating"
4480 self
._write
_ns
_status
(
4482 ns_state
="TERMINATING",
4483 current_operation
="TERMINATING",
4484 current_operation_id
=nslcmop_id
,
4485 other_update
=db_nsr_update
,
4487 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4488 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4489 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4492 stage
[1] = "Getting vnf descriptors from db."
4493 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4495 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4497 db_vnfds_from_id
= {}
4498 db_vnfds_from_member_index
= {}
4500 for vnfr
in db_vnfrs_list
:
4501 vnfd_id
= vnfr
["vnfd-id"]
4502 if vnfd_id
not in db_vnfds_from_id
:
4503 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4504 db_vnfds_from_id
[vnfd_id
] = vnfd
4505 db_vnfds_from_member_index
[
4506 vnfr
["member-vnf-index-ref"]
4507 ] = db_vnfds_from_id
[vnfd_id
]
4509 # Destroy individual execution environments when there are terminating primitives.
4510 # Rest of EE will be deleted at once
4511 # TODO - check before calling _destroy_N2VC
4512 # if not operation_params.get("skip_terminate_primitives"):#
4513 # or not vca.get("needed_terminate"):
4514 stage
[0] = "Stage 2/3 execute terminating primitives."
4515 self
.logger
.debug(logging_text
+ stage
[0])
4516 stage
[1] = "Looking execution environment that needs terminate."
4517 self
.logger
.debug(logging_text
+ stage
[1])
4519 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4520 config_descriptor
= None
4521 vca_member_vnf_index
= vca
.get("member-vnf-index")
4522 vca_id
= self
.get_vca_id(
4523 db_vnfrs_dict
.get(vca_member_vnf_index
)
4524 if vca_member_vnf_index
4528 if not vca
or not vca
.get("ee_id"):
4530 if not vca
.get("member-vnf-index"):
4532 config_descriptor
= db_nsr
.get("ns-configuration")
4533 elif vca
.get("vdu_id"):
4534 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4535 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4536 elif vca
.get("kdu_name"):
4537 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4538 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4540 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4541 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4542 vca_type
= vca
.get("type")
4543 exec_terminate_primitives
= not operation_params
.get(
4544 "skip_terminate_primitives"
4545 ) and vca
.get("needed_terminate")
4546 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4547 # pending native charms
4549 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4551 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4552 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4553 task
= asyncio
.ensure_future(
4561 exec_terminate_primitives
,
4565 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4567 # wait for pending tasks of terminate primitives
4571 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4573 error_list
= await self
._wait
_for
_tasks
(
4576 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4580 tasks_dict_info
.clear()
4582 return # raise LcmException("; ".join(error_list))
4584 # remove All execution environments at once
4585 stage
[0] = "Stage 3/3 delete all."
4587 if nsr_deployed
.get("VCA"):
4588 stage
[1] = "Deleting all execution environments."
4589 self
.logger
.debug(logging_text
+ stage
[1])
4590 vca_id
= self
.get_vca_id({}, db_nsr
)
4591 task_delete_ee
= asyncio
.ensure_future(
4593 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4594 timeout
=self
.timeout_charm_delete
,
4597 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4598 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4600 # Delete from k8scluster
4601 stage
[1] = "Deleting KDUs."
4602 self
.logger
.debug(logging_text
+ stage
[1])
4603 # print(nsr_deployed)
4604 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4605 if not kdu
or not kdu
.get("kdu-instance"):
4607 kdu_instance
= kdu
.get("kdu-instance")
4608 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4609 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4610 vca_id
= self
.get_vca_id({}, db_nsr
)
4611 task_delete_kdu_instance
= asyncio
.ensure_future(
4612 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4613 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4614 kdu_instance
=kdu_instance
,
4616 namespace
=kdu
.get("namespace"),
4622 + "Unknown k8s deployment type {}".format(
4623 kdu
.get("k8scluster-type")
4628 task_delete_kdu_instance
4629 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4632 stage
[1] = "Deleting ns from VIM."
4634 task_delete_ro
= asyncio
.ensure_future(
4635 self
._terminate
_ng
_ro
(
4636 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4640 task_delete_ro
= asyncio
.ensure_future(
4642 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4645 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4647 # rest of staff will be done at finally
4650 ROclient
.ROClientException
,
4655 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4657 except asyncio
.CancelledError
:
4659 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4661 exc
= "Operation was cancelled"
4662 except Exception as e
:
4663 exc
= traceback
.format_exc()
4664 self
.logger
.critical(
4665 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4670 error_list
.append(str(exc
))
4672 # wait for pending tasks
4674 stage
[1] = "Waiting for terminate pending tasks."
4675 self
.logger
.debug(logging_text
+ stage
[1])
4676 error_list
+= await self
._wait
_for
_tasks
(
4679 timeout_ns_terminate
,
4683 stage
[1] = stage
[2] = ""
4684 except asyncio
.CancelledError
:
4685 error_list
.append("Cancelled")
4686 # TODO cancell all tasks
4687 except Exception as exc
:
4688 error_list
.append(str(exc
))
4689 # update status at database
4691 error_detail
= "; ".join(error_list
)
4692 # self.logger.error(logging_text + error_detail)
4693 error_description_nslcmop
= "{} Detail: {}".format(
4694 stage
[0], error_detail
4696 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4697 nslcmop_id
, stage
[0]
4700 db_nsr_update
["operational-status"] = "failed"
4701 db_nsr_update
["detailed-status"] = (
4702 error_description_nsr
+ " Detail: " + error_detail
4704 db_nslcmop_update
["detailed-status"] = error_detail
4705 nslcmop_operation_state
= "FAILED"
4709 error_description_nsr
= error_description_nslcmop
= None
4710 ns_state
= "NOT_INSTANTIATED"
4711 db_nsr_update
["operational-status"] = "terminated"
4712 db_nsr_update
["detailed-status"] = "Done"
4713 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4714 db_nslcmop_update
["detailed-status"] = "Done"
4715 nslcmop_operation_state
= "COMPLETED"
4718 self
._write
_ns
_status
(
4721 current_operation
="IDLE",
4722 current_operation_id
=None,
4723 error_description
=error_description_nsr
,
4724 error_detail
=error_detail
,
4725 other_update
=db_nsr_update
,
4727 self
._write
_op
_status
(
4730 error_message
=error_description_nslcmop
,
4731 operation_state
=nslcmop_operation_state
,
4732 other_update
=db_nslcmop_update
,
4734 if ns_state
== "NOT_INSTANTIATED":
4738 {"nsr-id-ref": nsr_id
},
4739 {"_admin.nsState": "NOT_INSTANTIATED"},
4741 except DbException
as e
:
4744 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4748 if operation_params
:
4749 autoremove
= operation_params
.get("autoremove", False)
4750 if nslcmop_operation_state
:
4752 await self
.msg
.aiowrite(
4757 "nslcmop_id": nslcmop_id
,
4758 "operationState": nslcmop_operation_state
,
4759 "autoremove": autoremove
,
4763 except Exception as e
:
4765 logging_text
+ "kafka_write notification Exception {}".format(e
)
4768 self
.logger
.debug(logging_text
+ "Exit")
4769 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4771 async def _wait_for_tasks(
4772 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4775 error_detail_list
= []
4777 pending_tasks
= list(created_tasks_info
.keys())
4778 num_tasks
= len(pending_tasks
)
4780 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4781 self
._write
_op
_status
(nslcmop_id
, stage
)
4782 while pending_tasks
:
4784 _timeout
= timeout
+ time_start
- time()
4785 done
, pending_tasks
= await asyncio
.wait(
4786 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4788 num_done
+= len(done
)
4789 if not done
: # Timeout
4790 for task
in pending_tasks
:
4791 new_error
= created_tasks_info
[task
] + ": Timeout"
4792 error_detail_list
.append(new_error
)
4793 error_list
.append(new_error
)
4796 if task
.cancelled():
4799 exc
= task
.exception()
4801 if isinstance(exc
, asyncio
.TimeoutError
):
4803 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4804 error_list
.append(created_tasks_info
[task
])
4805 error_detail_list
.append(new_error
)
4812 ROclient
.ROClientException
,
4818 self
.logger
.error(logging_text
+ new_error
)
4820 exc_traceback
= "".join(
4821 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4825 + created_tasks_info
[task
]
4831 logging_text
+ created_tasks_info
[task
] + ": Done"
4833 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4835 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4836 if nsr_id
: # update also nsr
4841 "errorDescription": "Error at: " + ", ".join(error_list
),
4842 "errorDetail": ". ".join(error_detail_list
),
4845 self
._write
_op
_status
(nslcmop_id
, stage
)
4846 return error_detail_list
4849 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4851 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4852 The default-value is used. If it is between < > it look for a value at instantiation_params
4853 :param primitive_desc: portion of VNFD/NSD that describes primitive
4854 :param params: Params provided by user
4855 :param instantiation_params: Instantiation params provided by user
4856 :return: a dictionary with the calculated params
4858 calculated_params
= {}
4859 for parameter
in primitive_desc
.get("parameter", ()):
4860 param_name
= parameter
["name"]
4861 if param_name
in params
:
4862 calculated_params
[param_name
] = params
[param_name
]
4863 elif "default-value" in parameter
or "value" in parameter
:
4864 if "value" in parameter
:
4865 calculated_params
[param_name
] = parameter
["value"]
4867 calculated_params
[param_name
] = parameter
["default-value"]
4869 isinstance(calculated_params
[param_name
], str)
4870 and calculated_params
[param_name
].startswith("<")
4871 and calculated_params
[param_name
].endswith(">")
4873 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4874 calculated_params
[param_name
] = instantiation_params
[
4875 calculated_params
[param_name
][1:-1]
4879 "Parameter {} needed to execute primitive {} not provided".format(
4880 calculated_params
[param_name
], primitive_desc
["name"]
4885 "Parameter {} needed to execute primitive {} not provided".format(
4886 param_name
, primitive_desc
["name"]
4890 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4891 calculated_params
[param_name
] = yaml
.safe_dump(
4892 calculated_params
[param_name
], default_flow_style
=True, width
=256
4894 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4896 ].startswith("!!yaml "):
4897 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4898 if parameter
.get("data-type") == "INTEGER":
4900 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4901 except ValueError: # error converting string to int
4903 "Parameter {} of primitive {} must be integer".format(
4904 param_name
, primitive_desc
["name"]
4907 elif parameter
.get("data-type") == "BOOLEAN":
4908 calculated_params
[param_name
] = not (
4909 (str(calculated_params
[param_name
])).lower() == "false"
4912 # add always ns_config_info if primitive name is config
4913 if primitive_desc
["name"] == "config":
4914 if "ns_config_info" in instantiation_params
:
4915 calculated_params
["ns_config_info"] = instantiation_params
[
4918 return calculated_params
4920 def _look_for_deployed_vca(
4927 ee_descriptor_id
=None,
4929 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4930 for vca
in deployed_vca
:
4933 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4936 vdu_count_index
is not None
4937 and vdu_count_index
!= vca
["vdu_count_index"]
4940 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4942 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4946 # vca_deployed not found
4948 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4949 " is not deployed".format(
4958 ee_id
= vca
.get("ee_id")
4960 "type", "lxc_proxy_charm"
4961 ) # default value for backward compatibility - proxy charm
4964 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4965 "execution environment".format(
4966 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4969 return ee_id
, vca_type
4971 async def _ns_execute_primitive(
4977 retries_interval
=30,
4984 if primitive
== "config":
4985 primitive_params
= {"params": primitive_params
}
4987 vca_type
= vca_type
or "lxc_proxy_charm"
4991 output
= await asyncio
.wait_for(
4992 self
.vca_map
[vca_type
].exec_primitive(
4994 primitive_name
=primitive
,
4995 params_dict
=primitive_params
,
4996 progress_timeout
=self
.timeout_progress_primitive
,
4997 total_timeout
=self
.timeout_primitive
,
5002 timeout
=timeout
or self
.timeout_primitive
,
5006 except asyncio
.CancelledError
:
5008 except Exception as e
:
5012 "Error executing action {} on {} -> {}".format(
5017 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
5019 if isinstance(e
, asyncio
.TimeoutError
):
5021 message
="Timed out waiting for action to complete"
5023 return "FAILED", getattr(e
, "message", repr(e
))
5025 return "COMPLETED", output
5027 except (LcmException
, asyncio
.CancelledError
):
5029 except Exception as e
:
5030 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
async def vca_status_refresh(self, nsr_id, nslcmop_id):
    """
    Refresh the status stored at the nsrs record for what is deployed.
    If the record has K8s items, every one is refreshed with _on_update_k8s_db;
    otherwise every VCA item is refreshed with _on_update_n2vc_db
    :param: nsr_id: Id of the nsr
    :param: nslcmop_id: Id of the nslcmop
    :return: None
    """

    self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_id = self.get_vca_id({}, db_nsr)
    if not db_nsr["_admin"]["deployed"]["K8s"]:
        # nothing at K8s: refresh each deployed VCA by its position
        for vca_index, _unused in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
            nsr_query = {"_id": nsr_id}
            vca_path = "_admin.deployed.VCA.{}.".format(vca_index)
            await self._on_update_n2vc_db("nsrs", nsr_query, vca_path, {})
    else:
        for k8s_item in db_nsr["_admin"]["deployed"]["K8s"]:
            uuid_of_cluster = k8s_item["k8scluster-uuid"]
            instance_of_kdu = k8s_item["kdu-instance"]
            type_of_cluster = k8s_item["k8scluster-type"]
            await self._on_update_k8s_db(
                cluster_uuid=uuid_of_cluster,
                kdu_instance=instance_of_kdu,
                filter={"_id": nsr_id},
                vca_id=vca_id,
                cluster_type=type_of_cluster,
            )

    self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
    self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
async def action(self, nsr_id, nslcmop_id):
    """
    Execute the primitive requested by an "action" operation over an NS, VNF, VDU or KDU.

    The primitive is looked for at the proper configuration (vdu, kdu, vnf or ns). For a KDU,
    "upgrade", "rollback" and "status" (and the kdu actions of non helm clusters) are served by
    the k8s connector; anything else is executed at the deployed execution environment.
    The nsrs/nslcmops records are updated and a kafka message is written at the finally clause.

    :param nsr_id: Id of the nsr
    :param nslcmop_id: Id of the nslcmop
    :return: (operation state, detailed status). None if the HA lock is not obtained
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    error_description_nslcmop = None
    exc = None
    # Only one of the vnf/ns branches below loads these records. They are set in advance
    # because get_vca_id(db_vnfr, db_nsr) is called in both cases
    db_vnfr = None
    db_vnfd = None
    db_nsd = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id,
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        if db_nslcmop["operationParams"].get("primitive_params"):
            # primitive_params is stored as a json text
            db_nslcmop["operationParams"]["primitive_params"] = json.loads(
                db_nslcmop["operationParams"]["primitive_params"]
            )

        nsr_deployed = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        kdu_name = db_nslcmop["operationParams"].get("kdu_name")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]
        timeout_ns_action = db_nslcmop["operationParams"].get(
            "timeout_ns_action", self.timeout_primitive
        )

        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one(
                "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
            )
            if db_vnfr.get("kdur"):
                kdur_list = []
                for kdur in db_vnfr["kdur"]:
                    if kdur.get("additionalParams"):
                        kdur["additionalParams"] = json.loads(
                            kdur["additionalParams"]
                        )
                    kdur_list.append(kdur)
                db_vnfr["kdur"] = kdur_list
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

            # Sync filesystem before running a primitive
            self.fs.sync(db_vnfr["vnfd-id"])
        else:
            step = "Getting nsd from database"
            db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        vca_id = self.get_vca_id(db_vnfr, db_nsr)
        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # look for primitive
        config_primitive_desc = descriptor_configuration = None
        if vdu_id:
            descriptor_configuration = get_configuration(db_vnfd, vdu_id)
        elif kdu_name:
            descriptor_configuration = get_configuration(db_vnfd, kdu_name)
        elif vnf_index:
            descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
        else:
            descriptor_configuration = db_nsd.get("ns-configuration")

        if descriptor_configuration and descriptor_configuration.get(
            "config-primitive"
        ):
            for config_primitive in descriptor_configuration["config-primitive"]:
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            # only the built-in kdu operations can be run without a descriptor entry
            if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
                raise LcmException(
                    "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
                        primitive
                    )
                )
            primitive_name = primitive
            ee_descriptor_id = None
        else:
            primitive_name = config_primitive_desc.get(
                "execution-environment-primitive", primitive
            )
            ee_descriptor_id = config_primitive_desc.get(
                "execution-environment-ref"
            )

        if vnf_index:
            if vdu_id:
                vdur = next(
                    (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
                )
                desc_params = parse_yaml_strings(vdur.get("additionalParams"))
            elif kdu_name:
                kdur = next(
                    (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
                )
                desc_params = parse_yaml_strings(kdur.get("additionalParams"))
            else:
                desc_params = parse_yaml_strings(
                    db_vnfr.get("additionalParamsForVnf")
                )
        else:
            desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))

        kdu_action = False
        if kdu_name and get_configuration(db_vnfd, kdu_name):
            kdu_configuration = get_configuration(db_vnfd, kdu_name)
            actions = set()
            # own loop variable, in order not to overwrite the requested "primitive"
            for kdu_primitive in kdu_configuration.get(
                "initial-config-primitive", []
            ):
                actions.add(kdu_primitive["name"])
            for kdu_primitive in kdu_configuration.get("config-primitive", []):
                actions.add(kdu_primitive["name"])
            kdu = find_in_list(
                nsr_deployed["K8s"],
                lambda item: kdu_name == item["kdu-name"]
                and item["member-vnf-index"] == vnf_index,
            )
            kdu_action = primitive_name in actions and kdu[
                "k8scluster-type"
            ] not in ("helm-chart", "helm-chart-v3")

        # TODO check if ns is in a proper status
        if kdu_name and (
            primitive_name in ("upgrade", "rollback", "status") or kdu_action
        ):
            # kdur and desc_params already set from before
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                if (
                    kdu_name == kdu["kdu-name"]
                    and kdu["member-vnf-index"] == vnf_index
                ):
                    break
            else:
                raise LcmException(
                    "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
                )

            if kdu.get("k8scluster-type") not in self.k8scluster_map:
                msg = "unknown k8scluster-type '{}'".format(
                    kdu.get("k8scluster-type")
                )
                raise LcmException(msg)

            db_dict = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": "_admin.deployed.K8s.{}".format(index),
            }
            self.logger.debug(
                logging_text
                + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
            )
            step = "Executing kdu {}".format(primitive_name)
            if primitive_name == "upgrade":
                if desc_params.get("kdu_model"):
                    kdu_model = desc_params.get("kdu_model")
                    del desc_params["kdu_model"]
                else:
                    kdu_model = kdu.get("kdu-model")
                    parts = kdu_model.split(sep=":")
                    if len(parts) == 2:
                        # "<model>:<something>": only the first part is used
                        kdu_model = parts[0]

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        atomic=True,
                        kdu_model=kdu_model,
                        params=desc_params,
                        db_dict=db_dict,
                        timeout=timeout_ns_action,
                    ),
                    timeout=timeout_ns_action + 10,
                )
                self.logger.debug(
                    logging_text + " Upgrade of kdu {} done".format(detailed_status)
                )
            elif primitive_name == "rollback":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].rollback(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        db_dict=db_dict,
                    ),
                    timeout=timeout_ns_action,
                )
            elif primitive_name == "status":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        vca_id=vca_id,
                    ),
                    timeout=timeout_ns_action,
                )
            else:
                kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
                    kdu["kdu-name"], nsr_id
                )
                params = self._map_primitive_params(
                    config_primitive_desc, primitive_params, desc_params
                )

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        primitive_name=primitive_name,
                        params=params,
                        db_dict=db_dict,
                        timeout=timeout_ns_action,
                        vca_id=vca_id,
                    ),
                    timeout=timeout_ns_action,
                )

            # an empty result of the connector is considered a failure
            if detailed_status:
                nslcmop_operation_state = "COMPLETED"
            else:
                detailed_status = ""
                nslcmop_operation_state = "FAILED"
        else:
            ee_id, vca_type = self._look_for_deployed_vca(
                nsr_deployed["VCA"],
                member_vnf_index=vnf_index,
                vdu_id=vdu_id,
                vdu_count_index=vdu_count_index,
                ee_descriptor_id=ee_descriptor_id,
            )
            for vca_index, vca_deployed in enumerate(
                db_nsr["_admin"]["deployed"]["VCA"]
            ):
                if vca_deployed.get("member-vnf-index") == vnf_index:
                    db_dict = {
                        "collection": "nsrs",
                        "filter": {"_id": nsr_id},
                        "path": "_admin.deployed.VCA.{}.".format(vca_index),
                    }
                    break
            (
                nslcmop_operation_state,
                detailed_status,
            ) = await self._ns_execute_primitive(
                ee_id,
                primitive=primitive_name,
                primitive_params=self._map_primitive_params(
                    config_primitive_desc, primitive_params, desc_params
                ),
                timeout=timeout_ns_action,
                vca_type=vca_type,
                db_dict=db_dict,
                vca_id=vca_id,
            )

        db_nslcmop_update["detailed-status"] = detailed_status
        error_description_nslcmop = (
            detailed_status if nslcmop_operation_state == "FAILED" else ""
        )
        self.logger.debug(
            logging_text
            + "Done with result {} {}".format(
                nslcmop_operation_state, detailed_status
            )
        )
        return  # database update is called inside finally

    except (DbException, LcmException, N2VCException, K8sException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(step)
        )
        exc = "Operation was cancelled"
    except asyncio.TimeoutError:
        self.logger.error(logging_text + "Timeout while '{}'".format(step))
        exc = "Timeout"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
            exc_info=True,
        )
    finally:
        if exc:
            db_nslcmop_update[
                "detailed-status"
            ] = (
                detailed_status
            ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=db_nsr[
                    "nsState"
                ],  # TODO check if degraded. For the moment use previous status
                current_operation="IDLE",
                current_operation_id=None,
                # error_description=error_description_nsr,
                # error_detail=error_detail,
                other_update=db_nsr_update,
            )

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite(
                    "ns",
                    "actioned",
                    {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                    },
                    loop=self.loop,
                )
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        # this return is the one that provides the result also for the "return" of the try
        return nslcmop_operation_state, detailed_status
async def terminate_vdus(
    self, db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text
):
    """This method terminates VDUs

    It builds a scaling-IN description ("vdu-delete") out of the "vdur" items of the
    VNF record and provides it to _scale_ng_ro when self.ro_config has "ng" set.

    Args:
        db_vnfr: VNF instance record
        member_vnf_index: VNF index to identify the VDUs to be removed
        db_nsr: NS instance record
        update_db_nslcmops: Nslcmop update record
        stage: list; item 2 is set to "Terminating VDUs"
        logging_text: text provided as it is to _scale_ng_ro
    """
    # NOTE(review): filled in below but not read again inside this method
    vca_scaling_info = []
    scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
    scaling_info["scaling_direction"] = "IN"
    scaling_info["vdu-delete"] = {}
    scaling_info["kdu-delete"] = {}
    db_vdur = db_vnfr.get("vdur")
    vdur_list = copy(db_vdur)
    # NOTE(review): count_index is not incremented here, so every vdu is registered with 0
    count_index = 0
    for index, vdu in enumerate(vdur_list):
        vca_scaling_info.append(
            {
                "osm_vdu_id": vdu["vdu-id-ref"],
                "member-vnf-index": member_vnf_index,
                "type": "delete",
                "vdu_index": count_index,
            }
        )
        scaling_info["vdu-delete"][vdu["vdu-id-ref"]] = count_index
        scaling_info["vdu"].append(
            {
                "name": vdu.get("name") or vdu.get("vdu-name"),
                "vdu_id": vdu["vdu-id-ref"],
                "interface": [],
            }
        )
        # "index" is also the position of the item just appended to scaling_info["vdu"]
        for interface in vdu["interfaces"]:
            scaling_info["vdu"][index]["interface"].append(
                {
                    "name": interface["name"],
                    "ip_address": interface["ip-address"],
                    "mac_address": interface.get("mac-address"),
                }
            )
        # NOTE(review): the statements below are assumed to belong to the loop body (one RO call
        # per vdur item, with the accumulated scaling_info) - confirm against the indented file
        self.logger.info("NS update scaling info{}".format(scaling_info))
        stage[2] = "Terminating VDUs"
        if scaling_info.get("vdu-delete"):
            # scale_process = "RO"
            if self.ro_config.get("ng"):
                await self._scale_ng_ro(
                    logging_text,
                    db_nsr,
                    update_db_nslcmops,
                    db_vnfr,
                    scaling_info,
                    stage,
                )
async def remove_vnf(self, nsr_id, nslcmop_id, vnf_instance_id):
    """This method is to Remove VNF instances from NS.

    The VDUs of the VNF are terminated, the VNF is taken out of the
    "constituent-vnfr-ref" of the NS record and its vnfrs record is deleted.
    The last VNF of an NS cannot be removed.

    Args:
        nsr_id: NS instance id
        nslcmop_id: nslcmop id of update
        vnf_instance_id: id of the VNF instance to be removed

    Returns:
        result: (str, str) COMPLETED/FAILED, details

    Raises:
        LcmException: if it is the only VNF of the NS
        asyncio.CancelledError: it is propagated
    """
    try:
        db_nsr_update = {}
        logging_text = "Task ns={} update ".format(nsr_id)
        check_vnfr_count = len(self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}))
        self.logger.info("check_vnfr_count {}".format(check_vnfr_count))
        if check_vnfr_count <= 1:
            raise LcmException(
                "{} Cannot terminate the last VNF in this NS.".format(
                    vnf_instance_id
                )
            )

        stage = ["", "", ""]
        self.logger.debug(
            "Getting nslcmop from database"
            + " after having waited for previous tasks to be completed"
        )
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
        member_vnf_index = db_vnfr["member-vnf-index-ref"]

        update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        await self.terminate_vdus(
            db_vnfr,
            member_vnf_index,
            db_nsr,
            update_db_nslcmops,
            stage,
            logging_text,
        )

        # the list is modified in place, so db_nsr holds the updated content
        constituent_vnfr = db_nsr.get("constituent-vnfr-ref")
        constituent_vnfr.remove(db_vnfr.get("_id"))
        db_nsr_update["constituent-vnfr-ref"] = db_nsr.get("constituent-vnfr-ref")
        # written once; the same update used to be repeated after deleting the vnfr
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self.db.del_one("vnfrs", {"_id": db_vnfr.get("_id")})
        return "COMPLETED", "Done"
    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        self.logger.debug("Error removing VNF {}".format(e))
        return "FAILED", "Error removing VNF {}".format(e)
async def _ns_redeploy_vnf(
    self,
    nsr_id,
    nslcmop_id,
    db_vnfd,
    db_vnfr,
    db_nsr,
):
    """This method updates and redeploys VNF instances

    The current VDUs are terminated, the vnfrs record is rewritten (revision,
    connection points and the "newVdur" of the operation params) and a scaling-OUT
    description ("vdu-create") is provided to _scale_ng_ro.

    Args:
        nsr_id: NS instance id
        nslcmop_id:   nslcmop id
        db_vnfd: VNF descriptor
        db_vnfr: VNF instance record
        db_nsr: NS instance record

    Returns:
        result: (str, str) COMPLETED/FAILED, details

    Raises:
        LcmException, asyncio.CancelledError: they are propagated
    """
    try:
        # NOTE(review): count_index is not incremented, so every vdu is registered with 0
        count_index = 0
        stage = ["", "", ""]
        logging_text = "Task ns={} update ".format(nsr_id)
        latest_vnfd_revision = db_vnfd["_admin"].get("revision")
        member_vnf_index = db_vnfr["member-vnf-index-ref"]

        # Terminate old VNF resources
        update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        await self.terminate_vdus(
            db_vnfr,
            member_vnf_index,
            db_nsr,
            update_db_nslcmops,
            stage,
            logging_text,
        )

        # old_vnfd_id = db_vnfr["vnfd-id"]
        # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
        new_db_vnfd = db_vnfd
        # new_vnfd_ref = new_db_vnfd["id"]
        # new_vnfd_id = vnfd_id

        # Create VDUR
        # one connection point per "ext-cpd" of the descriptor
        new_vnfr_cp = []
        for cp in new_db_vnfd.get("ext-cpd", ()):
            vnf_cp = {
                "name": cp.get("id"),
                "connection-point-id": cp.get("int-cpd", {}).get("cpd"),
                "connection-point-vdu-id": cp.get("int-cpd", {}).get("vdu-id"),
                "id": cp.get("id"),
            }
            new_vnfr_cp.append(vnf_cp)
        # the new vdur content is taken from the operation params
        new_vdur = update_db_nslcmops["operationParams"]["newVdur"]
        # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
        # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
        new_vnfr_update = {
            "revision": latest_vnfd_revision,
            "connection-point": new_vnfr_cp,
            "vdur": new_vdur,
            "ip-address": "",
        }
        self.update_db_2("vnfrs", db_vnfr["_id"], new_vnfr_update)
        # read again the record just written
        updated_db_vnfr = self.db.get_one(
            "vnfrs",
            {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id},
        )

        # Instantiate new VNF resources
        # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        # NOTE(review): vca_scaling_info and cloud_init_list are filled in but not read
        # again inside this method
        vca_scaling_info = []
        scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
        scaling_info["scaling_direction"] = "OUT"
        scaling_info["vdu-create"] = {}
        scaling_info["kdu-create"] = {}
        vdud_instantiate_list = db_vnfd["vdu"]
        for index, vdud in enumerate(vdud_instantiate_list):
            cloud_init_text = self._get_vdu_cloud_init_content(vdud, db_vnfd)
            if cloud_init_text:
                additional_params = (
                    self._get_vdu_additional_params(updated_db_vnfr, vdud["id"])
                    or {}
                )
            cloud_init_list = []
            if cloud_init_text:
                # TODO Information of its own ip is not available because db_vnfr is not updated.
                additional_params["OSM"] = get_osm_params(
                    updated_db_vnfr, vdud["id"], 1
                )
                cloud_init_list.append(
                    self._parse_cloud_init(
                        cloud_init_text,
                        additional_params,
                        db_vnfd["id"],
                        vdud["id"],
                    )
                )
                vca_scaling_info.append(
                    {
                        "osm_vdu_id": vdud["id"],
                        "member-vnf-index": member_vnf_index,
                        "type": "create",
                        "vdu_index": count_index,
                    }
                )
            scaling_info["vdu-create"][vdud["id"]] = count_index
        # NOTE(review): nesting assumed from the token order; with this layout nothing is
        # returned when self.ro_config has not "ng" - confirm against the indented file
        if self.ro_config.get("ng"):
            self.logger.debug(
                "New Resources to be deployed: {}".format(scaling_info)
            )
            await self._scale_ng_ro(
                logging_text,
                db_nsr,
                update_db_nslcmops,
                updated_db_vnfr,
                scaling_info,
                stage,
            )
            return "COMPLETED", "Done"
    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        self.logger.debug("Error updating VNF {}".format(e))
        return "FAILED", "Error updating VNF {}".format(e)
5675 async def _ns_charm_upgrade(
5681 timeout
: float = None,
5683 """This method upgrade charms in VNF instances
5686 ee_id: Execution environment id
5687 path: Local path to the charm
5689 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5690 timeout: (Float) Timeout for the ns update operation
5693 result: (str, str) COMPLETED/FAILED, details
5696 charm_type
= charm_type
or "lxc_proxy_charm"
5697 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5701 charm_type
=charm_type
,
5702 timeout
=timeout
or self
.timeout_ns_update
,
5706 return "COMPLETED", output
5708 except (LcmException
, asyncio
.CancelledError
):
5711 except Exception as e
:
5713 self
.logger
.debug("Error upgrading charm {}".format(path
))
5715 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5717 async def update(self
, nsr_id
, nslcmop_id
):
5718 """Update NS according to different update types
5720 This method performs upgrade of VNF instances then updates the revision
5721 number in VNF record
5724 nsr_id: Network service will be updated
5725 nslcmop_id: ns lcm operation id
5728 It may raise DbException, LcmException, N2VCException, K8sException
5731 # Try to lock HA task here
5732 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5733 if not task_is_locked_by_me
:
5736 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5737 self
.logger
.debug(logging_text
+ "Enter")
5739 # Set the required variables to be filled up later
5741 db_nslcmop_update
= {}
5743 nslcmop_operation_state
= None
5745 error_description_nslcmop
= ""
5747 change_type
= "updated"
5748 detailed_status
= ""
5751 # wait for any previous tasks in process
5752 step
= "Waiting for previous operations to terminate"
5753 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5754 self
._write
_ns
_status
(
5757 current_operation
="UPDATING",
5758 current_operation_id
=nslcmop_id
,
5761 step
= "Getting nslcmop from database"
5762 db_nslcmop
= self
.db
.get_one(
5763 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5765 update_type
= db_nslcmop
["operationParams"]["updateType"]
5767 step
= "Getting nsr from database"
5768 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5769 old_operational_status
= db_nsr
["operational-status"]
5770 db_nsr_update
["operational-status"] = "updating"
5771 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5772 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5774 if update_type
== "CHANGE_VNFPKG":
5776 # Get the input parameters given through update request
5777 vnf_instance_id
= db_nslcmop
["operationParams"][
5778 "changeVnfPackageData"
5779 ].get("vnfInstanceId")
5781 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5784 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5786 step
= "Getting vnfr from database"
5787 db_vnfr
= self
.db
.get_one(
5788 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5791 step
= "Getting vnfds from database"
5793 latest_vnfd
= self
.db
.get_one(
5794 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5796 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5799 current_vnf_revision
= db_vnfr
.get("revision", 1)
5800 current_vnfd
= self
.db
.get_one(
5802 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5803 fail_on_empty
=False,
5805 # Charm artifact paths will be filled up later
5807 current_charm_artifact_path
,
5808 target_charm_artifact_path
,
5809 charm_artifact_paths
,
5812 step
= "Checking if revision has changed in VNFD"
5813 if current_vnf_revision
!= latest_vnfd_revision
:
5815 change_type
= "policy_updated"
5817 # There is new revision of VNFD, update operation is required
5818 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5819 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5821 step
= "Removing the VNFD packages if they exist in the local path"
5822 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5823 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5825 step
= "Get the VNFD packages from FSMongo"
5826 self
.fs
.sync(from_path
=latest_vnfd_path
)
5827 self
.fs
.sync(from_path
=current_vnfd_path
)
5830 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5832 base_folder
= latest_vnfd
["_admin"]["storage"]
5834 for charm_index
, charm_deployed
in enumerate(
5835 get_iterable(nsr_deployed
, "VCA")
5837 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5839 # Getting charm-id and charm-type
5840 if charm_deployed
.get("member-vnf-index") == vnf_index
:
5841 charm_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5842 charm_type
= charm_deployed
.get("type")
5845 ee_id
= charm_deployed
.get("ee_id")
5847 step
= "Getting descriptor config"
5848 descriptor_config
= get_configuration(
5849 current_vnfd
, current_vnfd
["id"]
5852 if "execution-environment-list" in descriptor_config
:
5853 ee_list
= descriptor_config
.get(
5854 "execution-environment-list", []
5859 # There could be several charm used in the same VNF
5860 for ee_item
in ee_list
:
5861 if ee_item
.get("juju"):
5863 step
= "Getting charm name"
5864 charm_name
= ee_item
["juju"].get("charm")
5866 step
= "Setting Charm artifact paths"
5867 current_charm_artifact_path
.append(
5868 get_charm_artifact_path(
5872 current_vnf_revision
,
5875 target_charm_artifact_path
.append(
5876 get_charm_artifact_path(
5880 latest_vnfd_revision
,
5884 charm_artifact_paths
= zip(
5885 current_charm_artifact_path
, target_charm_artifact_path
5888 step
= "Checking if software version has changed in VNFD"
5889 if find_software_version(current_vnfd
) != find_software_version(
5893 step
= "Checking if existing VNF has charm"
5894 for current_charm_path
, target_charm_path
in list(
5895 charm_artifact_paths
5897 if current_charm_path
:
5899 "Software version change is not supported as VNF instance {} has charm.".format(
5904 # There is no change in the charm package, then redeploy the VNF
5905 # based on new descriptor
5906 step
= "Redeploying VNF"
5907 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5908 (result
, detailed_status
) = await self
._ns
_redeploy
_vnf
(
5909 nsr_id
, nslcmop_id
, latest_vnfd
, db_vnfr
, db_nsr
5911 if result
== "FAILED":
5912 nslcmop_operation_state
= result
5913 error_description_nslcmop
= detailed_status
5914 db_nslcmop_update
["detailed-status"] = detailed_status
5917 + " step {} Done with result {} {}".format(
5918 step
, nslcmop_operation_state
, detailed_status
5923 step
= "Checking if any charm package has changed or not"
5924 for current_charm_path
, target_charm_path
in list(
5925 charm_artifact_paths
5929 and target_charm_path
5930 and self
.check_charm_hash_changed(
5931 current_charm_path
, target_charm_path
5935 step
= "Checking whether VNF uses juju bundle"
5936 if check_juju_bundle_existence(current_vnfd
):
5939 "Charm upgrade is not supported for the instance which"
5940 " uses juju-bundle: {}".format(
5941 check_juju_bundle_existence(current_vnfd
)
5945 step
= "Upgrading Charm"
5949 ) = await self
._ns
_charm
_upgrade
(
5952 charm_type
=charm_type
,
5953 path
=self
.fs
.path
+ target_charm_path
,
5954 timeout
=timeout_seconds
,
5957 if result
== "FAILED":
5958 nslcmop_operation_state
= result
5959 error_description_nslcmop
= detailed_status
5961 db_nslcmop_update
["detailed-status"] = detailed_status
5964 + " step {} Done with result {} {}".format(
5965 step
, nslcmop_operation_state
, detailed_status
5969 step
= "Updating policies"
5970 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5971 result
= "COMPLETED"
5972 detailed_status
= "Done"
5973 db_nslcmop_update
["detailed-status"] = "Done"
5975 # If nslcmop_operation_state is None, no operation has failed.
5976 if not nslcmop_operation_state
:
5977 nslcmop_operation_state
= "COMPLETED"
5979 # If the CHANGE_VNFPKG update nslcmop operation is successful,
5980 # the vnf revision needs to be updated
5981 vnfr_update
["revision"] = latest_vnfd_revision
5982 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
5986 + " task Done with result {} {}".format(
5987 nslcmop_operation_state
, detailed_status
5990 elif update_type
== "REMOVE_VNF":
5991 # This part is included in https://osm.etsi.org/gerrit/11876
5992 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
5993 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5994 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5995 step
= "Removing VNF"
5996 (result
, detailed_status
) = await self
.remove_vnf(
5997 nsr_id
, nslcmop_id
, vnf_instance_id
5999 if result
== "FAILED":
6000 nslcmop_operation_state
= result
6001 error_description_nslcmop
= detailed_status
6002 db_nslcmop_update
["detailed-status"] = detailed_status
6003 change_type
= "vnf_terminated"
6004 if not nslcmop_operation_state
:
6005 nslcmop_operation_state
= "COMPLETED"
6008 + " task Done with result {} {}".format(
6009 nslcmop_operation_state
, detailed_status
6013 elif update_type
== "OPERATE_VNF":
6014 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"][
6017 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"][
6020 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"][
6023 (result
, detailed_status
) = await self
.rebuild_start_stop(
6024 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
6026 if result
== "FAILED":
6027 nslcmop_operation_state
= result
6028 error_description_nslcmop
= detailed_status
6029 db_nslcmop_update
["detailed-status"] = detailed_status
6030 if not nslcmop_operation_state
:
6031 nslcmop_operation_state
= "COMPLETED"
6034 + " task Done with result {} {}".format(
6035 nslcmop_operation_state
, detailed_status
6039 # If nslcmop_operation_state is None, no operation has failed.
6040 # All operations have been executed.
6041 if not nslcmop_operation_state
:
6042 nslcmop_operation_state
= "COMPLETED"
6043 db_nsr_update
["operational-status"] = old_operational_status
6045 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
6046 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6048 except asyncio
.CancelledError
:
6050 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6052 exc
= "Operation was cancelled"
6053 except asyncio
.TimeoutError
:
6054 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
6056 except Exception as e
:
6057 exc
= traceback
.format_exc()
6058 self
.logger
.critical(
6059 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6068 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6069 nslcmop_operation_state
= "FAILED"
6070 db_nsr_update
["operational-status"] = old_operational_status
6072 self
._write
_ns
_status
(
6074 ns_state
=db_nsr
["nsState"],
6075 current_operation
="IDLE",
6076 current_operation_id
=None,
6077 other_update
=db_nsr_update
,
6080 self
._write
_op
_status
(
6083 error_message
=error_description_nslcmop
,
6084 operation_state
=nslcmop_operation_state
,
6085 other_update
=db_nslcmop_update
,
6088 if nslcmop_operation_state
:
6092 "nslcmop_id": nslcmop_id
,
6093 "operationState": nslcmop_operation_state
,
6095 if change_type
in ("vnf_terminated", "policy_updated"):
6096 msg
.update({"vnf_member_index": member_vnf_index
})
6097 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6098 except Exception as e
:
6100 logging_text
+ "kafka_write notification Exception {}".format(e
)
6102 self
.logger
.debug(logging_text
+ "Exit")
6103 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6104 return nslcmop_operation_state
, detailed_status
6106 async def scale(self
, nsr_id
, nslcmop_id
):
6107 # Try to lock HA task here
6108 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6109 if not task_is_locked_by_me
:
6112 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6113 stage
= ["", "", ""]
6114 tasks_dict_info
= {}
6115 # ^ stage, step, VIM progress
6116 self
.logger
.debug(logging_text
+ "Enter")
6117 # get everything needed from the database
6119 db_nslcmop_update
= {}
6122 # in case of error, indicates which part of the scale operation failed, so that the nsr can be put in error status
6123 scale_process
= None
6124 old_operational_status
= ""
6125 old_config_status
= ""
6128 # wait for any previous tasks in process
6129 step
= "Waiting for previous operations to terminate"
6130 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6131 self
._write
_ns
_status
(
6134 current_operation
="SCALING",
6135 current_operation_id
=nslcmop_id
,
6138 step
= "Getting nslcmop from database"
6140 step
+ " after having waited for previous tasks to be completed"
6142 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6144 step
= "Getting nsr from database"
6145 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6146 old_operational_status
= db_nsr
["operational-status"]
6147 old_config_status
= db_nsr
["config-status"]
6149 step
= "Parsing scaling parameters"
6150 db_nsr_update
["operational-status"] = "scaling"
6151 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6152 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6154 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6156 ]["member-vnf-index"]
6157 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6159 ]["scaling-group-descriptor"]
6160 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6161 # for backward compatibility
6162 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6163 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6164 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6165 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6167 step
= "Getting vnfr from database"
6168 db_vnfr
= self
.db
.get_one(
6169 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6172 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6174 step
= "Getting vnfd from database"
6175 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6177 base_folder
= db_vnfd
["_admin"]["storage"]
6179 step
= "Getting scaling-group-descriptor"
6180 scaling_descriptor
= find_in_list(
6181 get_scaling_aspect(db_vnfd
),
6182 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6184 if not scaling_descriptor
:
6186 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6187 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6190 step
= "Sending scale order to VIM"
6191 # TODO check if ns is in a proper status
6193 if not db_nsr
["_admin"].get("scaling-group"):
6198 "_admin.scaling-group": [
6199 {"name": scaling_group
, "nb-scale-op": 0}
6203 admin_scale_index
= 0
6205 for admin_scale_index
, admin_scale_info
in enumerate(
6206 db_nsr
["_admin"]["scaling-group"]
6208 if admin_scale_info
["name"] == scaling_group
:
6209 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6211 else: # not found, set index one plus last element and add new entry with the name
6212 admin_scale_index
+= 1
6214 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6217 vca_scaling_info
= []
6218 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6219 if scaling_type
== "SCALE_OUT":
6220 if "aspect-delta-details" not in scaling_descriptor
:
6222 "Aspect delta details not fount in scaling descriptor {}".format(
6223 scaling_descriptor
["name"]
6226 # check whether max-instance-count is reached
6227 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6229 scaling_info
["scaling_direction"] = "OUT"
6230 scaling_info
["vdu-create"] = {}
6231 scaling_info
["kdu-create"] = {}
6232 for delta
in deltas
:
6233 for vdu_delta
in delta
.get("vdu-delta", {}):
6234 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6235 # vdu_index also provides the number of instances of the targeted vdu
6236 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6237 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6241 additional_params
= (
6242 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6245 cloud_init_list
= []
6247 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6248 max_instance_count
= 10
6249 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6250 max_instance_count
= vdu_profile
.get(
6251 "max-number-of-instances", 10
6254 default_instance_num
= get_number_of_instances(
6257 instances_number
= vdu_delta
.get("number-of-instances", 1)
6258 nb_scale_op
+= instances_number
6260 new_instance_count
= nb_scale_op
+ default_instance_num
6261 # Check whether the new count is over max while the vdu count is still below max.
6262 # If so, assign the new instance count
6263 if new_instance_count
> max_instance_count
> vdu_count
:
6264 instances_number
= new_instance_count
- max_instance_count
6266 instances_number
= instances_number
6268 if new_instance_count
> max_instance_count
:
6270 "reached the limit of {} (max-instance-count) "
6271 "scaling-out operations for the "
6272 "scaling-group-descriptor '{}'".format(
6273 nb_scale_op
, scaling_group
6276 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6278 # TODO Information about its own IP is not available because db_vnfr is not updated.
6279 additional_params
["OSM"] = get_osm_params(
6280 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6282 cloud_init_list
.append(
6283 self
._parse
_cloud
_init
(
6290 vca_scaling_info
.append(
6292 "osm_vdu_id": vdu_delta
["id"],
6293 "member-vnf-index": vnf_index
,
6295 "vdu_index": vdu_index
+ x
,
6298 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6299 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6300 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6301 kdu_name
= kdu_profile
["kdu-name"]
6302 resource_name
= kdu_profile
.get("resource-name", "")
6304 # The same delta might contain different kdus,
6305 # so there should be a list for each kdu
6306 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6307 scaling_info
["kdu-create"][kdu_name
] = []
6309 kdur
= get_kdur(db_vnfr
, kdu_name
)
6310 if kdur
.get("helm-chart"):
6311 k8s_cluster_type
= "helm-chart-v3"
6312 self
.logger
.debug("kdur: {}".format(kdur
))
6314 kdur
.get("helm-version")
6315 and kdur
.get("helm-version") == "v2"
6317 k8s_cluster_type
= "helm-chart"
6318 elif kdur
.get("juju-bundle"):
6319 k8s_cluster_type
= "juju-bundle"
6322 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6323 "juju-bundle. Maybe an old NBI version is running".format(
6324 db_vnfr
["member-vnf-index-ref"], kdu_name
6328 max_instance_count
= 10
6329 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6330 max_instance_count
= kdu_profile
.get(
6331 "max-number-of-instances", 10
6334 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6335 deployed_kdu
, _
= get_deployed_kdu(
6336 nsr_deployed
, kdu_name
, vnf_index
6338 if deployed_kdu
is None:
6340 "KDU '{}' for vnf '{}' not deployed".format(
6344 kdu_instance
= deployed_kdu
.get("kdu-instance")
6345 instance_num
= await self
.k8scluster_map
[
6351 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6352 kdu_model
=deployed_kdu
.get("kdu-model"),
6354 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6355 "number-of-instances", 1
6358 # Check whether the new count is over max while instance_num is still below max.
6359 # If so, assign the max instance number to the kdu replica count
6360 if kdu_replica_count
> max_instance_count
> instance_num
:
6361 kdu_replica_count
= max_instance_count
6362 if kdu_replica_count
> max_instance_count
:
6364 "reached the limit of {} (max-instance-count) "
6365 "scaling-out operations for the "
6366 "scaling-group-descriptor '{}'".format(
6367 instance_num
, scaling_group
6371 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6372 vca_scaling_info
.append(
6374 "osm_kdu_id": kdu_name
,
6375 "member-vnf-index": vnf_index
,
6377 "kdu_index": instance_num
+ x
- 1,
6380 scaling_info
["kdu-create"][kdu_name
].append(
6382 "member-vnf-index": vnf_index
,
6384 "k8s-cluster-type": k8s_cluster_type
,
6385 "resource-name": resource_name
,
6386 "scale": kdu_replica_count
,
6389 elif scaling_type
== "SCALE_IN":
6390 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6392 scaling_info
["scaling_direction"] = "IN"
6393 scaling_info
["vdu-delete"] = {}
6394 scaling_info
["kdu-delete"] = {}
6396 for delta
in deltas
:
6397 for vdu_delta
in delta
.get("vdu-delta", {}):
6398 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6399 min_instance_count
= 0
6400 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6401 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6402 min_instance_count
= vdu_profile
["min-number-of-instances"]
6404 default_instance_num
= get_number_of_instances(
6405 db_vnfd
, vdu_delta
["id"]
6407 instance_num
= vdu_delta
.get("number-of-instances", 1)
6408 nb_scale_op
-= instance_num
6410 new_instance_count
= nb_scale_op
+ default_instance_num
6412 if new_instance_count
< min_instance_count
< vdu_count
:
6413 instances_number
= min_instance_count
- new_instance_count
6415 instances_number
= instance_num
6417 if new_instance_count
< min_instance_count
:
6419 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6420 "scaling-group-descriptor '{}'".format(
6421 nb_scale_op
, scaling_group
6424 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6425 vca_scaling_info
.append(
6427 "osm_vdu_id": vdu_delta
["id"],
6428 "member-vnf-index": vnf_index
,
6430 "vdu_index": vdu_index
- 1 - x
,
6433 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6434 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6435 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6436 kdu_name
= kdu_profile
["kdu-name"]
6437 resource_name
= kdu_profile
.get("resource-name", "")
6439 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6440 scaling_info
["kdu-delete"][kdu_name
] = []
6442 kdur
= get_kdur(db_vnfr
, kdu_name
)
6443 if kdur
.get("helm-chart"):
6444 k8s_cluster_type
= "helm-chart-v3"
6445 self
.logger
.debug("kdur: {}".format(kdur
))
6447 kdur
.get("helm-version")
6448 and kdur
.get("helm-version") == "v2"
6450 k8s_cluster_type
= "helm-chart"
6451 elif kdur
.get("juju-bundle"):
6452 k8s_cluster_type
= "juju-bundle"
6455 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6456 "juju-bundle. Maybe an old NBI version is running".format(
6457 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6461 min_instance_count
= 0
6462 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6463 min_instance_count
= kdu_profile
["min-number-of-instances"]
6465 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6466 deployed_kdu
, _
= get_deployed_kdu(
6467 nsr_deployed
, kdu_name
, vnf_index
6469 if deployed_kdu
is None:
6471 "KDU '{}' for vnf '{}' not deployed".format(
6475 kdu_instance
= deployed_kdu
.get("kdu-instance")
6476 instance_num
= await self
.k8scluster_map
[
6482 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6483 kdu_model
=deployed_kdu
.get("kdu-model"),
6485 kdu_replica_count
= instance_num
- kdu_delta
.get(
6486 "number-of-instances", 1
6489 if kdu_replica_count
< min_instance_count
< instance_num
:
6490 kdu_replica_count
= min_instance_count
6491 if kdu_replica_count
< min_instance_count
:
6493 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6494 "scaling-group-descriptor '{}'".format(
6495 instance_num
, scaling_group
6499 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6500 vca_scaling_info
.append(
6502 "osm_kdu_id": kdu_name
,
6503 "member-vnf-index": vnf_index
,
6505 "kdu_index": instance_num
- x
- 1,
6508 scaling_info
["kdu-delete"][kdu_name
].append(
6510 "member-vnf-index": vnf_index
,
6512 "k8s-cluster-type": k8s_cluster_type
,
6513 "resource-name": resource_name
,
6514 "scale": kdu_replica_count
,
6518 # update VDU_SCALING_INFO with the ip_addresses of the VDUs to delete
6519 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6520 if scaling_info
["scaling_direction"] == "IN":
6521 for vdur
in reversed(db_vnfr
["vdur"]):
6522 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6523 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6524 scaling_info
["vdu"].append(
6526 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6527 "vdu_id": vdur
["vdu-id-ref"],
6531 for interface
in vdur
["interfaces"]:
6532 scaling_info
["vdu"][-1]["interface"].append(
6534 "name": interface
["name"],
6535 "ip_address": interface
["ip-address"],
6536 "mac_address": interface
.get("mac-address"),
6539 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6542 step
= "Executing pre-scale vnf-config-primitive"
6543 if scaling_descriptor
.get("scaling-config-action"):
6544 for scaling_config_action
in scaling_descriptor
[
6545 "scaling-config-action"
6548 scaling_config_action
.get("trigger") == "pre-scale-in"
6549 and scaling_type
== "SCALE_IN"
6551 scaling_config_action
.get("trigger") == "pre-scale-out"
6552 and scaling_type
== "SCALE_OUT"
6554 vnf_config_primitive
= scaling_config_action
[
6555 "vnf-config-primitive-name-ref"
6557 step
= db_nslcmop_update
[
6559 ] = "executing pre-scale scaling-config-action '{}'".format(
6560 vnf_config_primitive
6563 # look for primitive
6564 for config_primitive
in (
6565 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6566 ).get("config-primitive", ()):
6567 if config_primitive
["name"] == vnf_config_primitive
:
6571 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6572 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6573 "primitive".format(scaling_group
, vnf_config_primitive
)
6576 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6577 if db_vnfr
.get("additionalParamsForVnf"):
6578 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6580 scale_process
= "VCA"
6581 db_nsr_update
["config-status"] = "configuring pre-scaling"
6582 primitive_params
= self
._map
_primitive
_params
(
6583 config_primitive
, {}, vnfr_params
6586 # Pre-scale retry check: Check if this sub-operation has been executed before
6587 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6590 vnf_config_primitive
,
6594 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6595 # Skip sub-operation
6596 result
= "COMPLETED"
6597 result_detail
= "Done"
6600 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6601 vnf_config_primitive
, result
, result_detail
6605 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6606 # New sub-operation: Get index of this sub-operation
6608 len(db_nslcmop
.get("_admin", {}).get("operations"))
6613 + "vnf_config_primitive={} New sub-operation".format(
6614 vnf_config_primitive
6618 # retry: Get registered params for this existing sub-operation
6619 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6622 vnf_index
= op
.get("member_vnf_index")
6623 vnf_config_primitive
= op
.get("primitive")
6624 primitive_params
= op
.get("primitive_params")
6627 + "vnf_config_primitive={} Sub-operation retry".format(
6628 vnf_config_primitive
6631 # Execute the primitive, either with new (first-time) or registered (retry) args
6632 ee_descriptor_id
= config_primitive
.get(
6633 "execution-environment-ref"
6635 primitive_name
= config_primitive
.get(
6636 "execution-environment-primitive", vnf_config_primitive
6638 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6639 nsr_deployed
["VCA"],
6640 member_vnf_index
=vnf_index
,
6642 vdu_count_index
=None,
6643 ee_descriptor_id
=ee_descriptor_id
,
6645 result
, result_detail
= await self
._ns
_execute
_primitive
(
6654 + "vnf_config_primitive={} Done with result {} {}".format(
6655 vnf_config_primitive
, result
, result_detail
6658 # Update operationState = COMPLETED | FAILED
6659 self
._update
_suboperation
_status
(
6660 db_nslcmop
, op_index
, result
, result_detail
6663 if result
== "FAILED":
6664 raise LcmException(result_detail
)
6665 db_nsr_update
["config-status"] = old_config_status
6666 scale_process
= None
6670 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6673 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6676 # SCALE-IN VCA - BEGIN
6677 if vca_scaling_info
:
6678 step
= db_nslcmop_update
[
6680 ] = "Deleting the execution environments"
6681 scale_process
= "VCA"
6682 for vca_info
in vca_scaling_info
:
6683 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6684 member_vnf_index
= str(vca_info
["member-vnf-index"])
6686 logging_text
+ "vdu info: {}".format(vca_info
)
6688 if vca_info
.get("osm_vdu_id"):
6689 vdu_id
= vca_info
["osm_vdu_id"]
6690 vdu_index
= int(vca_info
["vdu_index"])
6693 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6694 member_vnf_index
, vdu_id
, vdu_index
6696 stage
[2] = step
= "Scaling in VCA"
6697 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6698 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6699 config_update
= db_nsr
["configurationStatus"]
6700 for vca_index
, vca
in enumerate(vca_update
):
6702 (vca
or vca
.get("ee_id"))
6703 and vca
["member-vnf-index"] == member_vnf_index
6704 and vca
["vdu_count_index"] == vdu_index
6706 if vca
.get("vdu_id"):
6707 config_descriptor
= get_configuration(
6708 db_vnfd
, vca
.get("vdu_id")
6710 elif vca
.get("kdu_name"):
6711 config_descriptor
= get_configuration(
6712 db_vnfd
, vca
.get("kdu_name")
6715 config_descriptor
= get_configuration(
6716 db_vnfd
, db_vnfd
["id"]
6718 operation_params
= (
6719 db_nslcmop
.get("operationParams") or {}
6721 exec_terminate_primitives
= not operation_params
.get(
6722 "skip_terminate_primitives"
6723 ) and vca
.get("needed_terminate")
6724 task
= asyncio
.ensure_future(
6733 exec_primitives
=exec_terminate_primitives
,
6737 timeout
=self
.timeout_charm_delete
,
6740 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6743 del vca_update
[vca_index
]
6744 del config_update
[vca_index
]
6745 # wait for pending tasks of terminate primitives
6749 + "Waiting for tasks {}".format(
6750 list(tasks_dict_info
.keys())
6753 error_list
= await self
._wait
_for
_tasks
(
6757 self
.timeout_charm_delete
, self
.timeout_ns_terminate
6762 tasks_dict_info
.clear()
6764 raise LcmException("; ".join(error_list
))
6766 db_vca_and_config_update
= {
6767 "_admin.deployed.VCA": vca_update
,
6768 "configurationStatus": config_update
,
6771 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6773 scale_process
= None
6774 # SCALE-IN VCA - END
6777 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6778 scale_process
= "RO"
6779 if self
.ro_config
.get("ng"):
6780 await self
._scale
_ng
_ro
(
6781 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6783 scaling_info
.pop("vdu-create", None)
6784 scaling_info
.pop("vdu-delete", None)
6786 scale_process
= None
6790 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6791 scale_process
= "KDU"
6792 await self
._scale
_kdu
(
6793 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6795 scaling_info
.pop("kdu-create", None)
6796 scaling_info
.pop("kdu-delete", None)
6798 scale_process
= None
6802 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6804 # SCALE-UP VCA - BEGIN
6805 if vca_scaling_info
:
6806 step
= db_nslcmop_update
[
6808 ] = "Creating new execution environments"
6809 scale_process
= "VCA"
6810 for vca_info
in vca_scaling_info
:
6811 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6812 member_vnf_index
= str(vca_info
["member-vnf-index"])
6814 logging_text
+ "vdu info: {}".format(vca_info
)
6816 vnfd_id
= db_vnfr
["vnfd-ref"]
6817 if vca_info
.get("osm_vdu_id"):
6818 vdu_index
= int(vca_info
["vdu_index"])
6819 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6820 if db_vnfr
.get("additionalParamsForVnf"):
6821 deploy_params
.update(
6823 db_vnfr
["additionalParamsForVnf"].copy()
6826 descriptor_config
= get_configuration(
6827 db_vnfd
, db_vnfd
["id"]
6829 if descriptor_config
:
6834 logging_text
=logging_text
6835 + "member_vnf_index={} ".format(member_vnf_index
),
6838 nslcmop_id
=nslcmop_id
,
6844 member_vnf_index
=member_vnf_index
,
6845 vdu_index
=vdu_index
,
6847 deploy_params
=deploy_params
,
6848 descriptor_config
=descriptor_config
,
6849 base_folder
=base_folder
,
6850 task_instantiation_info
=tasks_dict_info
,
6853 vdu_id
= vca_info
["osm_vdu_id"]
6854 vdur
= find_in_list(
6855 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6857 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6858 if vdur
.get("additionalParams"):
6859 deploy_params_vdu
= parse_yaml_strings(
6860 vdur
["additionalParams"]
6863 deploy_params_vdu
= deploy_params
6864 deploy_params_vdu
["OSM"] = get_osm_params(
6865 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6867 if descriptor_config
:
6872 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6873 member_vnf_index
, vdu_id
, vdu_index
6875 stage
[2] = step
= "Scaling out VCA"
6876 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6878 logging_text
=logging_text
6879 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6880 member_vnf_index
, vdu_id
, vdu_index
6884 nslcmop_id
=nslcmop_id
,
6890 member_vnf_index
=member_vnf_index
,
6891 vdu_index
=vdu_index
,
6893 deploy_params
=deploy_params_vdu
,
6894 descriptor_config
=descriptor_config
,
6895 base_folder
=base_folder
,
6896 task_instantiation_info
=tasks_dict_info
,
6899 # SCALE-UP VCA - END
6900 scale_process
= None
6903 # execute primitive service POST-SCALING
6904 step
= "Executing post-scale vnf-config-primitive"
6905 if scaling_descriptor
.get("scaling-config-action"):
6906 for scaling_config_action
in scaling_descriptor
[
6907 "scaling-config-action"
6910 scaling_config_action
.get("trigger") == "post-scale-in"
6911 and scaling_type
== "SCALE_IN"
6913 scaling_config_action
.get("trigger") == "post-scale-out"
6914 and scaling_type
== "SCALE_OUT"
6916 vnf_config_primitive
= scaling_config_action
[
6917 "vnf-config-primitive-name-ref"
6919 step
= db_nslcmop_update
[
6921 ] = "executing post-scale scaling-config-action '{}'".format(
6922 vnf_config_primitive
6925 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6926 if db_vnfr
.get("additionalParamsForVnf"):
6927 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6929 # look for primitive
6930 for config_primitive
in (
6931 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6932 ).get("config-primitive", ()):
6933 if config_primitive
["name"] == vnf_config_primitive
:
6937 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6938 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6939 "config-primitive".format(
6940 scaling_group
, vnf_config_primitive
6943 scale_process
= "VCA"
6944 db_nsr_update
["config-status"] = "configuring post-scaling"
6945 primitive_params
= self
._map
_primitive
_params
(
6946 config_primitive
, {}, vnfr_params
6949 # Post-scale retry check: Check if this sub-operation has been executed before
6950 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6953 vnf_config_primitive
,
6957 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6958 # Skip sub-operation
6959 result
= "COMPLETED"
6960 result_detail
= "Done"
6963 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6964 vnf_config_primitive
, result
, result_detail
6968 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6969 # New sub-operation: Get index of this sub-operation
6971 len(db_nslcmop
.get("_admin", {}).get("operations"))
6976 + "vnf_config_primitive={} New sub-operation".format(
6977 vnf_config_primitive
6981 # retry: Get registered params for this existing sub-operation
6982 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6985 vnf_index
= op
.get("member_vnf_index")
6986 vnf_config_primitive
= op
.get("primitive")
6987 primitive_params
= op
.get("primitive_params")
6990 + "vnf_config_primitive={} Sub-operation retry".format(
6991 vnf_config_primitive
6994 # Execute the primitive, either with new (first-time) or registered (retry) args
6995 ee_descriptor_id
= config_primitive
.get(
6996 "execution-environment-ref"
6998 primitive_name
= config_primitive
.get(
6999 "execution-environment-primitive", vnf_config_primitive
7001 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
7002 nsr_deployed
["VCA"],
7003 member_vnf_index
=vnf_index
,
7005 vdu_count_index
=None,
7006 ee_descriptor_id
=ee_descriptor_id
,
7008 result
, result_detail
= await self
._ns
_execute
_primitive
(
7017 + "vnf_config_primitive={} Done with result {} {}".format(
7018 vnf_config_primitive
, result
, result_detail
7021 # Update operationState = COMPLETED | FAILED
7022 self
._update
_suboperation
_status
(
7023 db_nslcmop
, op_index
, result
, result_detail
7026 if result
== "FAILED":
7027 raise LcmException(result_detail
)
7028 db_nsr_update
["config-status"] = old_config_status
7029 scale_process
= None
7034 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
7035 db_nsr_update
["operational-status"] = (
7037 if old_operational_status
== "failed"
7038 else old_operational_status
7040 db_nsr_update
["config-status"] = old_config_status
7043 ROclient
.ROClientException
,
7048 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7050 except asyncio
.CancelledError
:
7052 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7054 exc
= "Operation was cancelled"
7055 except Exception as e
:
7056 exc
= traceback
.format_exc()
7057 self
.logger
.critical(
7058 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7062 self
._write
_ns
_status
(
7065 current_operation
="IDLE",
7066 current_operation_id
=None,
7069 stage
[1] = "Waiting for instantiate pending tasks."
7070 self
.logger
.debug(logging_text
+ stage
[1])
7071 exc
= await self
._wait
_for
_tasks
(
7074 self
.timeout_ns_deploy
,
7082 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7083 nslcmop_operation_state
= "FAILED"
7085 db_nsr_update
["operational-status"] = old_operational_status
7086 db_nsr_update
["config-status"] = old_config_status
7087 db_nsr_update
["detailed-status"] = ""
7089 if "VCA" in scale_process
:
7090 db_nsr_update
["config-status"] = "failed"
7091 if "RO" in scale_process
:
7092 db_nsr_update
["operational-status"] = "failed"
7095 ] = "FAILED scaling nslcmop={} {}: {}".format(
7096 nslcmop_id
, step
, exc
7099 error_description_nslcmop
= None
7100 nslcmop_operation_state
= "COMPLETED"
7101 db_nslcmop_update
["detailed-status"] = "Done"
7103 self
._write
_op
_status
(
7106 error_message
=error_description_nslcmop
,
7107 operation_state
=nslcmop_operation_state
,
7108 other_update
=db_nslcmop_update
,
7111 self
._write
_ns
_status
(
7114 current_operation
="IDLE",
7115 current_operation_id
=None,
7116 other_update
=db_nsr_update
,
7119 if nslcmop_operation_state
:
7123 "nslcmop_id": nslcmop_id
,
7124 "operationState": nslcmop_operation_state
,
7126 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7127 except Exception as e
:
7129 logging_text
+ "kafka_write notification Exception {}".format(e
)
7131 self
.logger
.debug(logging_text
+ "Exit")
7132 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7134 async def _scale_kdu(
7135 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7137 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7138 for kdu_name
in _scaling_info
:
7139 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7140 deployed_kdu
, index
= get_deployed_kdu(
7141 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7143 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7144 kdu_instance
= deployed_kdu
["kdu-instance"]
7145 kdu_model
= deployed_kdu
.get("kdu-model")
7146 scale
= int(kdu_scaling_info
["scale"])
7147 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7150 "collection": "nsrs",
7151 "filter": {"_id": nsr_id
},
7152 "path": "_admin.deployed.K8s.{}".format(index
),
7155 step
= "scaling application {}".format(
7156 kdu_scaling_info
["resource-name"]
7158 self
.logger
.debug(logging_text
+ step
)
7160 if kdu_scaling_info
["type"] == "delete":
7161 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7164 and kdu_config
.get("terminate-config-primitive")
7165 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7167 terminate_config_primitive_list
= kdu_config
.get(
7168 "terminate-config-primitive"
7170 terminate_config_primitive_list
.sort(
7171 key
=lambda val
: int(val
["seq"])
7175 terminate_config_primitive
7176 ) in terminate_config_primitive_list
:
7177 primitive_params_
= self
._map
_primitive
_params
(
7178 terminate_config_primitive
, {}, {}
7180 step
= "execute terminate config primitive"
7181 self
.logger
.debug(logging_text
+ step
)
7182 await asyncio
.wait_for(
7183 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7184 cluster_uuid
=cluster_uuid
,
7185 kdu_instance
=kdu_instance
,
7186 primitive_name
=terminate_config_primitive
["name"],
7187 params
=primitive_params_
,
7194 await asyncio
.wait_for(
7195 self
.k8scluster_map
[k8s_cluster_type
].scale(
7198 kdu_scaling_info
["resource-name"],
7200 cluster_uuid
=cluster_uuid
,
7201 kdu_model
=kdu_model
,
7205 timeout
=self
.timeout_vca_on_error
,
7208 if kdu_scaling_info
["type"] == "create":
7209 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7212 and kdu_config
.get("initial-config-primitive")
7213 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7215 initial_config_primitive_list
= kdu_config
.get(
7216 "initial-config-primitive"
7218 initial_config_primitive_list
.sort(
7219 key
=lambda val
: int(val
["seq"])
7222 for initial_config_primitive
in initial_config_primitive_list
:
7223 primitive_params_
= self
._map
_primitive
_params
(
7224 initial_config_primitive
, {}, {}
7226 step
= "execute initial config primitive"
7227 self
.logger
.debug(logging_text
+ step
)
7228 await asyncio
.wait_for(
7229 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7230 cluster_uuid
=cluster_uuid
,
7231 kdu_instance
=kdu_instance
,
7232 primitive_name
=initial_config_primitive
["name"],
7233 params
=primitive_params_
,
7240 async def _scale_ng_ro(
7241 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7243 nsr_id
= db_nslcmop
["nsInstanceId"]
7244 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7247 # read from db: vnfd's for every vnf
7250 # for each vnf in ns, read vnfd
7251 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7252 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7253 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7254 # if we haven't this vnfd, read it from db
7255 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7257 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7258 db_vnfds
.append(vnfd
)
7259 n2vc_key
= self
.n2vc
.get_public_key()
7260 n2vc_key_list
= [n2vc_key
]
7263 vdu_scaling_info
.get("vdu-create"),
7264 vdu_scaling_info
.get("vdu-delete"),
7267 # db_vnfr has been updated, update db_vnfrs to use it
7268 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7269 await self
._instantiate
_ng
_ro
(
7279 start_deploy
=time(),
7280 timeout_ns_deploy
=self
.timeout_ns_deploy
,
7282 if vdu_scaling_info
.get("vdu-delete"):
7284 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7287 async def extract_prometheus_scrape_jobs(
7288 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
7290 # look if exist a file called 'prometheus*.j2' and
7291 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7295 for f
in artifact_content
7296 if f
.startswith("prometheus") and f
.endswith(".j2")
7302 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7306 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7307 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7309 vnfr_id
= vnfr_id
.replace("-", "")
7311 "JOB_NAME": vnfr_id
,
7312 "TARGET_IP": target_ip
,
7313 "EXPORTER_POD_IP": host_name
,
7314 "EXPORTER_POD_PORT": host_port
,
7316 job_list
= parse_job(job_data
, variables
)
7317 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7318 for job
in job_list
:
7320 not isinstance(job
.get("job_name"), str)
7321 or vnfr_id
not in job
["job_name"]
7323 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
7324 job
["nsr_id"] = nsr_id
7325 job
["vnfr_id"] = vnfr_id
7328 async def rebuild_start_stop(
7329 self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
7331 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7332 self
.logger
.info(logging_text
+ "Enter")
7333 stage
= ["Preparing the environment", ""]
7334 # database nsrs record
7338 # in case of error, indicates what part of scale was failed to put nsr at error status
7339 start_deploy
= time()
7341 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7342 vim_account_id
= db_vnfr
.get("vim-account-id")
7343 vim_info_key
= "vim:" + vim_account_id
7344 vdu_id
= additional_param
["vdu_id"]
7345 vdurs
= [item
for item
in db_vnfr
["vdur"] if item
["vdu-id-ref"] == vdu_id
]
7346 vdur
= find_in_list(
7347 vdurs
, lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7350 vdu_vim_name
= vdur
["name"]
7351 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7352 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7354 raise LcmException("Target vdu is not found")
7355 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7356 # wait for any previous tasks in process
7357 stage
[1] = "Waiting for previous operations to terminate"
7358 self
.logger
.info(stage
[1])
7359 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7361 stage
[1] = "Reading from database."
7362 self
.logger
.info(stage
[1])
7363 self
._write
_ns
_status
(
7366 current_operation
=operation_type
.upper(),
7367 current_operation_id
=nslcmop_id
,
7369 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7372 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7373 db_nsr_update
["operational-status"] = operation_type
7374 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7378 "vim_vm_id": vim_vm_id
,
7380 "vdu_index": additional_param
["count-index"],
7381 "vdu_id": vdur
["id"],
7382 "target_vim": target_vim
,
7383 "vim_account_id": vim_account_id
,
7386 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7387 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7388 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7389 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7390 self
.logger
.info("response from RO: {}".format(result_dict
))
7391 action_id
= result_dict
["action_id"]
7392 await self
._wait
_ng
_ro
(
7397 self
.timeout_operate
,
7399 "start_stop_rebuild",
7401 return "COMPLETED", "Done"
7402 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7403 self
.logger
.error("Exit Exception {}".format(e
))
7405 except asyncio
.CancelledError
:
7406 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7407 exc
= "Operation was cancelled"
7408 except Exception as e
:
7409 exc
= traceback
.format_exc()
7410 self
.logger
.critical(
7411 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7413 return "FAILED", "Error in operate VNF {}".format(exc
)
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA cloud settings stored in the "config" of a VIM account.

    The VIM account is fetched through VimAccountDB.get_vim_account_with_id
    and its "config" entry (an empty dict when absent) is queried for the
    "vca_cloud" and "vca_cloud_credential" keys.

    :param: vim_account_id:     VIM Account ID

    :return: (cloud_name, cloud_credential); an element is None when its key
             is not present in the account configuration
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_cloud")
    cloud_credential = vim_config.get("vca_cloud_credential")
    return cloud_name, cloud_credential
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA K8s cloud settings stored in the "config" of a VIM account.

    The VIM account is fetched through VimAccountDB.get_vim_account_with_id
    and its "config" entry (an empty dict when absent) is queried for the
    "vca_k8s_cloud" and "vca_k8s_cloud_credential" keys.

    :param: vim_account_id:     VIM Account ID

    :return: (cloud_name, cloud_credential); an element is None when its key
             is not present in the account configuration
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    k8s_cloud_name = vim_config.get("vca_k8s_cloud")
    k8s_cloud_credential = vim_config.get("vca_k8s_cloud_credential")
    return k8s_cloud_name, k8s_cloud_credential
7437 async def migrate(self
, nsr_id
, nslcmop_id
):
7439 Migrate VNFs and VDUs instances in a NS
7441 :param: nsr_id: NS Instance ID
7442 :param: nslcmop_id: nslcmop ID of migrate
7445 # Try to lock HA task here
7446 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7447 if not task_is_locked_by_me
:
7449 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7450 self
.logger
.debug(logging_text
+ "Enter")
7451 # get all needed from database
7453 db_nslcmop_update
= {}
7454 nslcmop_operation_state
= None
7458 # in case of error, indicates what part of scale was failed to put nsr at error status
7459 start_deploy
= time()
7462 # wait for any previous tasks in process
7463 step
= "Waiting for previous operations to terminate"
7464 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7466 self
._write
_ns
_status
(
7469 current_operation
="MIGRATING",
7470 current_operation_id
=nslcmop_id
,
7472 step
= "Getting nslcmop from database"
7474 step
+ " after having waited for previous tasks to be completed"
7476 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7477 migrate_params
= db_nslcmop
.get("operationParams")
7480 target
.update(migrate_params
)
7481 desc
= await self
.RO
.migrate(nsr_id
, target
)
7482 self
.logger
.debug("RO return > {}".format(desc
))
7483 action_id
= desc
["action_id"]
7484 await self
._wait
_ng
_ro
(
7489 self
.timeout_migrate
,
7490 operation
="migrate",
7492 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7493 self
.logger
.error("Exit Exception {}".format(e
))
7495 except asyncio
.CancelledError
:
7496 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7497 exc
= "Operation was cancelled"
7498 except Exception as e
:
7499 exc
= traceback
.format_exc()
7500 self
.logger
.critical(
7501 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7504 self
._write
_ns
_status
(
7507 current_operation
="IDLE",
7508 current_operation_id
=None,
7511 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7512 nslcmop_operation_state
= "FAILED"
7514 nslcmop_operation_state
= "COMPLETED"
7515 db_nslcmop_update
["detailed-status"] = "Done"
7516 db_nsr_update
["detailed-status"] = "Done"
7518 self
._write
_op
_status
(
7522 operation_state
=nslcmop_operation_state
,
7523 other_update
=db_nslcmop_update
,
7525 if nslcmop_operation_state
:
7529 "nslcmop_id": nslcmop_id
,
7530 "operationState": nslcmop_operation_state
,
7532 await self
.msg
.aiowrite("ns", "migrated", msg
, loop
=self
.loop
)
7533 except Exception as e
:
7535 logging_text
+ "kafka_write notification Exception {}".format(e
)
7537 self
.logger
.debug(logging_text
+ "Exit")
7538 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
7540 async def heal(self
, nsr_id
, nslcmop_id
):
7544 :param nsr_id: ns instance to heal
7545 :param nslcmop_id: operation to run
7549 # Try to lock HA task here
7550 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7551 if not task_is_locked_by_me
:
7554 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7555 stage
= ["", "", ""]
7556 tasks_dict_info
= {}
7557 # ^ stage, step, VIM progress
7558 self
.logger
.debug(logging_text
+ "Enter")
7559 # get all needed from database
7561 db_nslcmop_update
= {}
7563 db_vnfrs
= {} # vnf's info indexed by _id
7565 old_operational_status
= ""
7566 old_config_status
= ""
7569 # wait for any previous tasks in process
7570 step
= "Waiting for previous operations to terminate"
7571 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7572 self
._write
_ns
_status
(
7575 current_operation
="HEALING",
7576 current_operation_id
=nslcmop_id
,
7579 step
= "Getting nslcmop from database"
7581 step
+ " after having waited for previous tasks to be completed"
7583 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7585 step
= "Getting nsr from database"
7586 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7587 old_operational_status
= db_nsr
["operational-status"]
7588 old_config_status
= db_nsr
["config-status"]
7591 "_admin.deployed.RO.operational-status": "healing",
7593 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7595 step
= "Sending heal order to VIM"
7596 task_ro
= asyncio
.ensure_future(
7598 logging_text
=logging_text
,
7600 db_nslcmop
=db_nslcmop
,
7604 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "heal_RO", task_ro
)
7605 tasks_dict_info
[task_ro
] = "Healing at VIM"
7609 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7610 self
.logger
.debug(logging_text
+ stage
[1])
7611 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7612 self
.fs
.sync(db_nsr
["nsd-id"])
7614 # read from db: vnfr's of this ns
7615 step
= "Getting vnfrs from db"
7616 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7617 for vnfr
in db_vnfrs_list
:
7618 db_vnfrs
[vnfr
["_id"]] = vnfr
7619 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7621 # Check for each target VNF
7622 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7623 for target_vnf
in target_list
:
7624 # Find this VNF in the list from DB
7625 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7627 db_vnfr
= db_vnfrs
[vnfr_id
]
7628 vnfd_id
= db_vnfr
.get("vnfd-id")
7629 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7630 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7631 base_folder
= vnfd
["_admin"]["storage"]
7636 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7637 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7639 # Check each target VDU and deploy N2VC
7640 target_vdu_list
= target_vnf
.get("additionalParams", {}).get(
7643 if not target_vdu_list
:
# New code to create the dictionary
7645 target_vdu_list
= []
7646 for existing_vdu
in db_vnfr
.get("vdur"):
7647 vdu_name
= existing_vdu
.get("vdu-name", None)
7648 vdu_index
= existing_vdu
.get("count-index", 0)
7649 vdu_run_day1
= target_vnf
.get("additionalParams", {}).get(
7652 vdu_to_be_healed
= {
7654 "count-index": vdu_index
,
7655 "run-day1": vdu_run_day1
,
7657 target_vdu_list
.append(vdu_to_be_healed
)
7658 for target_vdu
in target_vdu_list
:
7659 deploy_params_vdu
= target_vdu
7660 # Set run-day1 vnf level value if not vdu level value exists
7661 if not deploy_params_vdu
.get("run-day1") and target_vnf
[
7664 deploy_params_vdu
["run-day1"] = target_vnf
[
7667 vdu_name
= target_vdu
.get("vdu-id", None)
7668 # TODO: Get vdu_id from vdud.
7670 # For multi instance VDU count-index is mandatory
# For single session VDU count-index is 0
7672 vdu_index
= target_vdu
.get("count-index", 0)
7674 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7675 stage
[1] = "Deploying Execution Environments."
7676 self
.logger
.debug(logging_text
+ stage
[1])
7678 # VNF Level charm. Normal case when proxy charms.
7679 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7680 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7681 if descriptor_config
:
7682 # Continue if healed machine is management machine
7683 vnf_ip_address
= db_vnfr
.get("ip-address")
7684 target_instance
= None
7685 for instance
in db_vnfr
.get("vdur", None):
7687 instance
["vdu-name"] == vdu_name
7688 and instance
["count-index"] == vdu_index
7690 target_instance
= instance
7692 if vnf_ip_address
== target_instance
.get("ip-address"):
7694 logging_text
=logging_text
7695 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7696 member_vnf_index
, vdu_name
, vdu_index
7700 nslcmop_id
=nslcmop_id
,
7706 member_vnf_index
=member_vnf_index
,
7709 deploy_params
=deploy_params_vdu
,
7710 descriptor_config
=descriptor_config
,
7711 base_folder
=base_folder
,
7712 task_instantiation_info
=tasks_dict_info
,
7716 # VDU Level charm. Normal case with native charms.
7717 descriptor_config
= get_configuration(vnfd
, vdu_name
)
7718 if descriptor_config
:
7720 logging_text
=logging_text
7721 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7722 member_vnf_index
, vdu_name
, vdu_index
7726 nslcmop_id
=nslcmop_id
,
7732 member_vnf_index
=member_vnf_index
,
7733 vdu_index
=vdu_index
,
7735 deploy_params
=deploy_params_vdu
,
7736 descriptor_config
=descriptor_config
,
7737 base_folder
=base_folder
,
7738 task_instantiation_info
=tasks_dict_info
,
7743 ROclient
.ROClientException
,
7748 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7750 except asyncio
.CancelledError
:
7752 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7754 exc
= "Operation was cancelled"
7755 except Exception as e
:
7756 exc
= traceback
.format_exc()
7757 self
.logger
.critical(
7758 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7763 stage
[1] = "Waiting for healing pending tasks."
7764 self
.logger
.debug(logging_text
+ stage
[1])
7765 exc
= await self
._wait
_for
_tasks
(
7768 self
.timeout_ns_deploy
,
7776 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7777 nslcmop_operation_state
= "FAILED"
7779 db_nsr_update
["operational-status"] = old_operational_status
7780 db_nsr_update
["config-status"] = old_config_status
7783 ] = "FAILED healing nslcmop={} {}: {}".format(nslcmop_id
, step
, exc
)
7784 for task
, task_name
in tasks_dict_info
.items():
7785 if not task
.done() or task
.cancelled() or task
.exception():
7786 if task_name
.startswith(self
.task_name_deploy_vca
):
7787 # A N2VC task is pending
7788 db_nsr_update
["config-status"] = "failed"
7790 # RO task is pending
7791 db_nsr_update
["operational-status"] = "failed"
7793 error_description_nslcmop
= None
7794 nslcmop_operation_state
= "COMPLETED"
7795 db_nslcmop_update
["detailed-status"] = "Done"
7796 db_nsr_update
["detailed-status"] = "Done"
7797 db_nsr_update
["operational-status"] = "running"
7798 db_nsr_update
["config-status"] = "configured"
7800 self
._write
_op
_status
(
7803 error_message
=error_description_nslcmop
,
7804 operation_state
=nslcmop_operation_state
,
7805 other_update
=db_nslcmop_update
,
7808 self
._write
_ns
_status
(
7811 current_operation
="IDLE",
7812 current_operation_id
=None,
7813 other_update
=db_nsr_update
,
7816 if nslcmop_operation_state
:
7820 "nslcmop_id": nslcmop_id
,
7821 "operationState": nslcmop_operation_state
,
7823 await self
.msg
.aiowrite("ns", "healed", msg
, loop
=self
.loop
)
7824 except Exception as e
:
7826 logging_text
+ "kafka_write notification Exception {}".format(e
)
7828 self
.logger
.debug(logging_text
+ "Exit")
7829 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
:param logging_text: prefix text to use at logging
7841 :param nsr_id: nsr identity
7842 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
7843 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
7844 :return: None or exception
7847 def get_vim_account(vim_account_id
):
7849 if vim_account_id
in db_vims
:
7850 return db_vims
[vim_account_id
]
7851 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
7852 db_vims
[vim_account_id
] = db_vim
7857 ns_params
= db_nslcmop
.get("operationParams")
7858 if ns_params
and ns_params
.get("timeout_ns_heal"):
7859 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
7861 timeout_ns_heal
= self
.timeout
.get("ns_heal", self
.timeout_ns_heal
)
7865 nslcmop_id
= db_nslcmop
["_id"]
7867 "action_id": nslcmop_id
,
7869 self
.logger
.warning(
7870 "db_nslcmop={} and timeout_ns_heal={}".format(
7871 db_nslcmop
, timeout_ns_heal
7874 target
.update(db_nslcmop
.get("operationParams", {}))
7876 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
7877 desc
= await self
.RO
.recreate(nsr_id
, target
)
7878 self
.logger
.debug("RO return > {}".format(desc
))
7879 action_id
= desc
["action_id"]
7880 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
7881 await self
._wait
_ng
_ro
(
7888 operation
="healing",
7893 "_admin.deployed.RO.operational-status": "running",
7894 "detailed-status": " ".join(stage
),
7896 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7897 self
._write
_op
_status
(nslcmop_id
, stage
)
7899 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
7902 except Exception as e
:
7903 stage
[2] = "ERROR healing at VIM"
7904 # self.set_vnfr_at_error(db_vnfrs, str(e))
7906 "Error healing at VIM {}".format(e
),
7907 exc_info
=not isinstance(
7910 ROclient
.ROClientException
,
7936 task_instantiation_info
,
7939 # launch instantiate_N2VC in a asyncio task and register task object
7940 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
7941 # if not found, create one entry and update database
7942 # fill db_nsr._admin.deployed.VCA.<index>
7945 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
7949 get_charm_name
= False
7950 if "execution-environment-list" in descriptor_config
:
7951 ee_list
= descriptor_config
.get("execution-environment-list", [])
7952 elif "juju" in descriptor_config
:
7953 ee_list
= [descriptor_config
] # ns charms
7954 if "execution-environment-list" not in descriptor_config
:
7955 # charm name is only required for ns charms
7956 get_charm_name
= True
7957 else: # other types as script are not supported
7960 for ee_item
in ee_list
:
7963 + "_deploy_n2vc ee_item juju={}, helm={}".format(
7964 ee_item
.get("juju"), ee_item
.get("helm-chart")
7967 ee_descriptor_id
= ee_item
.get("id")
7968 if ee_item
.get("juju"):
7969 vca_name
= ee_item
["juju"].get("charm")
7971 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
7974 if ee_item
["juju"].get("charm") is not None
7977 if ee_item
["juju"].get("cloud") == "k8s":
7978 vca_type
= "k8s_proxy_charm"
7979 elif ee_item
["juju"].get("proxy") is False:
7980 vca_type
= "native_charm"
7981 elif ee_item
.get("helm-chart"):
7982 vca_name
= ee_item
["helm-chart"]
7983 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
7986 vca_type
= "helm-v3"
7989 logging_text
+ "skipping non juju neither charm configuration"
7994 for vca_index
, vca_deployed
in enumerate(
7995 db_nsr
["_admin"]["deployed"]["VCA"]
7997 if not vca_deployed
:
8000 vca_deployed
.get("member-vnf-index") == member_vnf_index
8001 and vca_deployed
.get("vdu_id") == vdu_id
8002 and vca_deployed
.get("kdu_name") == kdu_name
8003 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
8004 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
8008 # not found, create one.
8010 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
8013 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
8015 target
+= "/kdu/{}".format(kdu_name
)
8017 "target_element": target
,
8018 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
8019 "member-vnf-index": member_vnf_index
,
8021 "kdu_name": kdu_name
,
8022 "vdu_count_index": vdu_index
,
8023 "operational-status": "init", # TODO revise
8024 "detailed-status": "", # TODO revise
8025 "step": "initial-deploy", # TODO revise
8027 "vdu_name": vdu_name
,
8029 "ee_descriptor_id": ee_descriptor_id
,
8030 "charm_name": charm_name
,
8034 # create VCA and configurationStatus in db
8036 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
8037 "configurationStatus.{}".format(vca_index
): dict(),
8039 self
.update_db_2("nsrs", nsr_id
, db_dict
)
8041 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
8043 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
8044 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
8045 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
8048 task_n2vc
= asyncio
.ensure_future(
8050 logging_text
=logging_text
,
8051 vca_index
=vca_index
,
8057 vdu_index
=vdu_index
,
8058 deploy_params
=deploy_params
,
8059 config_descriptor
=descriptor_config
,
8060 base_folder
=base_folder
,
8061 nslcmop_id
=nslcmop_id
,
8065 ee_config_descriptor
=ee_item
,
8068 self
.lcm_tasks
.register(
8072 "instantiate_N2VC-{}".format(vca_index
),
8075 task_instantiation_info
[
8077 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
8078 member_vnf_index
or "", vdu_id
or ""
8081 async def heal_N2VC(
8098 ee_config_descriptor
,
8100 nsr_id
= db_nsr
["_id"]
8101 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
8102 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
8103 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
8104 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
8106 "collection": "nsrs",
8107 "filter": {"_id": nsr_id
},
8108 "path": db_update_entry
,
8114 element_under_configuration
= nsr_id
8118 vnfr_id
= db_vnfr
["_id"]
8119 osm_config
["osm"]["vnf_id"] = vnfr_id
8121 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8123 if vca_type
== "native_charm":
8126 index_number
= vdu_index
or 0
8129 element_type
= "VNF"
8130 element_under_configuration
= vnfr_id
8131 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8133 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8134 element_type
= "VDU"
8135 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8136 osm_config
["osm"]["vdu_id"] = vdu_id
8138 namespace
+= ".{}".format(kdu_name
)
8139 element_type
= "KDU"
8140 element_under_configuration
= kdu_name
8141 osm_config
["osm"]["kdu_name"] = kdu_name
8144 if base_folder
["pkg-dir"]:
8145 artifact_path
= "{}/{}/{}/{}".format(
8146 base_folder
["folder"],
8147 base_folder
["pkg-dir"],
8150 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8155 artifact_path
= "{}/Scripts/{}/{}/".format(
8156 base_folder
["folder"],
8159 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8164 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8166 # get initial_config_primitive_list that applies to this element
8167 initial_config_primitive_list
= config_descriptor
.get(
8168 "initial-config-primitive"
8172 "Initial config primitive list > {}".format(
8173 initial_config_primitive_list
8177 # add config if not present for NS charm
8178 ee_descriptor_id
= ee_config_descriptor
.get("id")
8179 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8180 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8181 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8185 "Initial config primitive list #2 > {}".format(
8186 initial_config_primitive_list
8189 # n2vc_redesign STEP 3.1
8190 # find old ee_id if exists
8191 ee_id
= vca_deployed
.get("ee_id")
8193 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8194 # create or register execution environment in VCA. Only for native charms when healing
8195 if vca_type
== "native_charm":
8196 step
= "Waiting to VM being up and getting IP address"
8197 self
.logger
.debug(logging_text
+ step
)
8198 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8207 credentials
= {"hostname": rw_mgmt_ip
}
8209 username
= deep_get(
8210 config_descriptor
, ("config-access", "ssh-access", "default-user")
8212 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8213 # merged. Meanwhile let's get username from initial-config-primitive
8214 if not username
and initial_config_primitive_list
:
8215 for config_primitive
in initial_config_primitive_list
:
8216 for param
in config_primitive
.get("parameter", ()):
8217 if param
["name"] == "ssh-username":
8218 username
= param
["value"]
8222 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8223 "'config-access.ssh-access.default-user'"
8225 credentials
["username"] = username
8227 # n2vc_redesign STEP 3.2
8228 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8229 self
._write
_configuration
_status
(
8231 vca_index
=vca_index
,
8232 status
="REGISTERING",
8233 element_under_configuration
=element_under_configuration
,
8234 element_type
=element_type
,
8237 step
= "register execution environment {}".format(credentials
)
8238 self
.logger
.debug(logging_text
+ step
)
8239 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8240 credentials
=credentials
,
8241 namespace
=namespace
,
# update ee_id in db
8248 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8250 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
# for compatibility with MON/POL modules, they need model and application name at database
8253 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8254 # Not sure if this need to be done when healing
8256 ee_id_parts = ee_id.split(".")
8257 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8258 if len(ee_id_parts) >= 2:
8259 model_name = ee_id_parts[0]
8260 application_name = ee_id_parts[1]
8261 db_nsr_update[db_update_entry + "model"] = model_name
8262 db_nsr_update[db_update_entry + "application"] = application_name
8265 # n2vc_redesign STEP 3.3
8266 # Install configuration software. Only for native charms.
8267 step
= "Install configuration Software"
8269 self
._write
_configuration
_status
(
8271 vca_index
=vca_index
,
8272 status
="INSTALLING SW",
8273 element_under_configuration
=element_under_configuration
,
8274 element_type
=element_type
,
8275 # other_update=db_nsr_update,
8279 # TODO check if already done
8280 self
.logger
.debug(logging_text
+ step
)
8282 if vca_type
== "native_charm":
8283 config_primitive
= next(
8284 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8287 if config_primitive
:
8288 config
= self
._map
_primitive
_params
(
8289 config_primitive
, {}, deploy_params
8291 await self
.vca_map
[vca_type
].install_configuration_sw(
8293 artifact_path
=artifact_path
,
8301 # write in db flag of configuration_sw already installed
8303 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8306 # Not sure if this need to be done when healing
8308 # add relations for this VCA (wait for other peers related with this VCA)
8309 await self._add_vca_relations(
8310 logging_text=logging_text,
8313 vca_index=vca_index,
8317 # if SSH access is required, then get execution environment SSH public
8318 # if native charm we have waited already to VM be UP
8319 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8322 # self.logger.debug("get ssh key block")
8324 config_descriptor
, ("config-access", "ssh-access", "required")
8326 # self.logger.debug("ssh key needed")
8327 # Needed to inject a ssh key
8330 ("config-access", "ssh-access", "default-user"),
8332 step
= "Install configuration Software, getting public ssh key"
8333 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8334 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8337 step
= "Insert public key into VM user={} ssh_key={}".format(
8341 # self.logger.debug("no need to get ssh key")
8342 step
= "Waiting to VM being up and getting IP address"
8343 self
.logger
.debug(logging_text
+ step
)
8345 # n2vc_redesign STEP 5.1
8346 # wait for RO (ip-address) Insert pub_key into VM
8347 # IMPORTANT: We need do wait for RO to complete healing operation.
8348 await self
._wait
_heal
_ro
(nsr_id
, self
.timeout_ns_heal
)
8351 rw_mgmt_ip
= await self
.wait_kdu_up(
8352 logging_text
, nsr_id
, vnfr_id
, kdu_name
8355 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8365 rw_mgmt_ip
= None # This is for a NS configuration
8367 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8369 # store rw_mgmt_ip in deploy params for later replacement
8370 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8373 # get run-day1 operation parameter
8374 runDay1
= deploy_params
.get("run-day1", False)
8376 "Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
, vdu_id
, runDay1
)
8379 # n2vc_redesign STEP 6 Execute initial config primitive
8380 step
= "execute initial config primitive"
8382 # wait for dependent primitives execution (NS -> VNF -> VDU)
8383 if initial_config_primitive_list
:
8384 await self
._wait
_dependent
_n
2vc
(
8385 nsr_id
, vca_deployed_list
, vca_index
8388 # stage, in function of element type: vdu, kdu, vnf or ns
8389 my_vca
= vca_deployed_list
[vca_index
]
8390 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8392 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8393 elif my_vca
.get("member-vnf-index"):
8395 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8398 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8400 self
._write
_configuration
_status
(
8401 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8404 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8406 check_if_terminated_needed
= True
8407 for initial_config_primitive
in initial_config_primitive_list
:
8408 # adding information on the vca_deployed if it is a NS execution environment
8409 if not vca_deployed
["member-vnf-index"]:
8410 deploy_params
["ns_config_info"] = json
.dumps(
8411 self
._get
_ns
_config
_info
(nsr_id
)
8413 # TODO check if already done
8414 primitive_params_
= self
._map
_primitive
_params
(
8415 initial_config_primitive
, {}, deploy_params
8418 step
= "execute primitive '{}' params '{}'".format(
8419 initial_config_primitive
["name"], primitive_params_
8421 self
.logger
.debug(logging_text
+ step
)
8422 await self
.vca_map
[vca_type
].exec_primitive(
8424 primitive_name
=initial_config_primitive
["name"],
8425 params_dict
=primitive_params_
,
8430 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8431 if check_if_terminated_needed
:
8432 if config_descriptor
.get("terminate-config-primitive"):
8436 {db_update_entry
+ "needed_terminate": True},
8438 check_if_terminated_needed
= False
8440 # TODO register in database that primitive is done
8442 # STEP 7 Configure metrics
8443 # Not sure if this need to be done when healing
8445 if vca_type == "helm" or vca_type == "helm-v3":
8446 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8448 artifact_path=artifact_path,
8449 ee_config_descriptor=ee_config_descriptor,
8452 target_ip=rw_mgmt_ip,
8458 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8461 for job in prometheus_jobs:
8464 {"job_name": job["job_name"]},
8467 fail_on_empty=False,
8471 step
= "instantiated at VCA"
8472 self
.logger
.debug(logging_text
+ step
)
8474 self
._write
_configuration
_status
(
8475 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8478 except Exception as e
: # TODO not use Exception but N2VC exception
8479 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8481 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8484 "Exception while {} : {}".format(step
, e
), exc_info
=True
8486 self
._write
_configuration
_status
(
8487 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8489 raise LcmException("{} {}".format(step
, e
)) from e
async def _wait_heal_ro(self, nsr_id, timeout=600, poll_interval=15):
    """Wait until RO no longer reports the NS as "healing".

    Reads the NS record from the "nsrs" collection and looks at
    ``_admin.deployed.RO.operational-status``. Returns as soon as that value
    is anything other than "healing"; otherwise sleeps and reads again until
    the timeout expires.

    :param nsr_id: NS instance id, used as ``_id`` when reading "nsrs"
    :param timeout: maximum number of seconds to keep polling
        NOTE(review): default value assumed; the visible caller always
        passes ``self.timeout_ns_heal`` explicitly - confirm.
    :param poll_interval: seconds to sleep between two consecutive reads
    :raises NgRoException: when the status is still "healing" once
        ``timeout`` seconds have elapsed
    """
    deadline = time() + timeout
    while time() <= deadline:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        operational_status_ro = db_nsr["_admin"]["deployed"]["RO"][
            "operational-status"
        ]
        self.logger.debug("Wait Heal RO > {}".format(operational_status_ro))
        if operational_status_ro != "healing":
            break
        # The "loop" keyword of asyncio.sleep was removed in Python 3.10; the
        # coroutine always sleeps on the loop that is running it anyway.
        await asyncio.sleep(poll_interval)
    else:
        # Only reached when the loop condition fails, i.e. on timeout.
        # The message text mentions "deploy" although this waits for a heal;
        # it is kept unchanged in case something matches on it.
        raise NgRoException("Timeout waiting ns to deploy")
async def vertical_scale(self, nsr_id, nslcmop_id):
    """
    Vertical Scale the VDUs in a NS

    Takes the HA lock for the operation, forwards the "operationParams" of
    the nslcmop record to RO, waits for the returned RO action to finish and
    stores the result in the nslcmop. A "verticalscaled" message carrying the
    final operation state is then written to the "ns" topic.

    Errors are not propagated: they are logged and reflected in the nslcmop
    as operation state "FAILED" with the failing step in "detailed-status".

    :param: nsr_id: NS Instance ID
    :param: nslcmop_id: nslcmop ID of the vertical scale operation
    :return: None
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return
    logging_text = "Task ns={} vertical scale ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")
    db_nslcmop_update = {}
    exc = None
    # reference time handed to _wait_ng_ro together with the timeout
    start_deploy = time()
    # "step" names the phase in progress; it is used in the failure report
    step = "Waiting for previous operations to terminate"

    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="VerticalScale",
            current_operation_id=nslcmop_id,
        )
        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operationParams = db_nslcmop.get("operationParams")
        # copy so the record read from the database is not handed to RO directly
        target = {}
        target.update(operationParams)
        desc = await self.RO.vertical_scale(nsr_id, target)
        self.logger.debug("RO return > {}".format(desc))
        action_id = desc["action_id"]
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            self.timeout_verticalscale,
            operation="verticalscale",
        )
    except (
        NgRoException,
        ROclient.ROClientException,
        DbException,
        LcmException,
    ) as e:
        # Expected failures: report the message only, without a traceback
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        # Cancellation is recorded as a failed operation, not re-raised
        self.logger.error("Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )
        if exc:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        else:
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message="",
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        # Best-effort notification: a kafka failure must not mask the result
        try:
            msg = {
                "nsr_id": nsr_id,
                "nslcmop_id": nslcmop_id,
                "operationState": nslcmop_operation_state,
            }
            await self.msg.aiowrite("ns", "verticalscaled", msg, loop=self.loop)
        except Exception as e:
            self.logger.error(
                logging_text + "kafka_write notification Exception {}".format(e)
            )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_verticalscale")