1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
22 import logging
.handlers
25 from jinja2
import Environment
, TemplateError
, TemplateNotFound
, StrictUndefined
, UndefinedError
27 from osm_lcm
import ROclient
28 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
29 from osm_lcm
.lcm_utils
import LcmException
, LcmExceptionNoMgmtIP
, LcmBase
, deep_get
, get_iterable
, populate_dict
30 from osm_lcm
.data_utils
.nsd
import get_vnf_profiles
31 from osm_lcm
.data_utils
.vnfd
import get_vdu_list
, get_vdu_profile
, \
32 get_ee_sorted_initial_config_primitive_list
, get_ee_sorted_terminate_config_primitive_list
, \
33 get_kdu_list
, get_virtual_link_profiles
, get_vdu
, get_configuration
, \
34 get_vdu_index
, get_scaling_aspect
, get_number_of_instances
, get_juju_ee_ref
35 from osm_lcm
.data_utils
.list_utils
import find_in_list
36 from osm_lcm
.data_utils
.vnfr
import get_osm_params
, get_vdur_index
37 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
38 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
39 from n2vc
.k8s_helm_conn
import K8sHelmConnector
40 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
41 from n2vc
.k8s_juju_conn
import K8sJujuConnector
43 from osm_common
.dbbase
import DbException
44 from osm_common
.fsbase
import FsException
46 from osm_lcm
.data_utils
.database
.database
import Database
47 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
49 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
50 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
52 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
54 from copy
import copy
, deepcopy
56 from uuid
import uuid4
58 from random
import randint
60 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
# Default timeouts for NS lifecycle operations (presumably expressed in seconds -- TODO confirm)
timeout_vca_on_error = 5 * 60  # Time for charm from first time at blocked,error status to mark as failed
timeout_ns_deploy = 2 * 60 * 60  # default global timeout for deploying a ns
timeout_ns_terminate = 30 * 60  # default global timeout for undeploying a ns
timeout_charm_delete = 10 * 60
timeout_primitive = 30 * 60  # timeout for primitive execution
timeout_progress_primitive = 10 * 60  # timeout for some progress in a primitive execution

# Sub-operation status codes
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
SUBOPERATION_STATUS_SKIP = -3
task_name_deploy_vca = "Deploying VCA"
76 def __init__(self
, msg
, lcm_tasks
, config
, loop
, prometheus
=None):
78 Init, Connect to database, filesystem storage, and messaging
79 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
84 logger
=logging
.getLogger('lcm.ns')
87 self
.db
= Database().instance
.db
88 self
.fs
= Filesystem().instance
.fs
90 self
.lcm_tasks
= lcm_tasks
91 self
.timeout
= config
["timeout"]
92 self
.ro_config
= config
["ro_config"]
93 self
.ng_ro
= config
["ro_config"].get("ng")
94 self
.vca_config
= config
["VCA"].copy()
96 # create N2VC connector
97 self
.n2vc
= N2VCJujuConnector(
100 on_update_db
=self
._on
_update
_n
2vc
_db
,
105 self
.conn_helm_ee
= LCMHelmConn(
108 vca_config
=self
.vca_config
,
109 on_update_db
=self
._on
_update
_n
2vc
_db
112 self
.k8sclusterhelm2
= K8sHelmConnector(
113 kubectl_command
=self
.vca_config
.get("kubectlpath"),
114 helm_command
=self
.vca_config
.get("helmpath"),
121 self
.k8sclusterhelm3
= K8sHelm3Connector(
122 kubectl_command
=self
.vca_config
.get("kubectlpath"),
123 helm_command
=self
.vca_config
.get("helm3path"),
130 self
.k8sclusterjuju
= K8sJujuConnector(
131 kubectl_command
=self
.vca_config
.get("kubectlpath"),
132 juju_command
=self
.vca_config
.get("jujupath"),
135 on_update_db
=self
._on
_update
_k
8s
_db
,
140 self
.k8scluster_map
= {
141 "helm-chart": self
.k8sclusterhelm2
,
142 "helm-chart-v3": self
.k8sclusterhelm3
,
143 "chart": self
.k8sclusterhelm3
,
144 "juju-bundle": self
.k8sclusterjuju
,
145 "juju": self
.k8sclusterjuju
,
149 "lxc_proxy_charm": self
.n2vc
,
150 "native_charm": self
.n2vc
,
151 "k8s_proxy_charm": self
.n2vc
,
152 "helm": self
.conn_helm_ee
,
153 "helm-v3": self
.conn_helm_ee
156 self
.prometheus
= prometheus
159 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
def increment_ip_mac(ip_mac, vm_index=1):
    """
    Adds vm_index to the last group of an IPv4, IPv6 or MAC address given as text
    :param ip_mac: address as a string. IPv4 is detected by a dot and operated in decimal; IPv6 and MAC are
        detected by a colon and operated in hexadecimal, keeping the width of the last group
    :param vm_index: amount to add to the last group
    :return: the new address as a string; ip_mac itself when it is not a string; None when no dot or colon
        separator is found or the last group is not a number
    """
    # NOTE(review): there is no 'self' parameter although this file calls it as self.increment_ip_mac(...);
    # presumably it is declared as a static method on a line outside this block -- confirm.
    if not isinstance(ip_mac, str):
        return ip_mac
    try:
        # try with ipv4 look for last dot
        i = ip_mac.rfind(".")
        if i > 0:
            i += 1  # first character after the separator
            return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
        # try with ipv6 or mac look for last colon. Operate in hex
        i = ip_mac.rfind(":")
        if i > 0:
            i += 1
            # format in hex, len can be 2 for mac or 4 for ipv6
            width = len(ip_mac) - i
            return "{}{:0{}x}".format(ip_mac[:i], int(ip_mac[i:], 16) + vm_index, width)
    except ValueError:
        # the last group is not a valid decimal/hexadecimal number
        pass
    return None
181 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
183 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
186 # TODO filter RO descriptor fields...
190 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
191 db_dict
['deploymentStatus'] = ro_descriptor
192 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
194 except Exception as e
:
195 self
.logger
.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id
, e
))
197 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
199 # remove last dot from path (if exists)
200 if path
.endswith('.'):
203 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
204 # .format(table, filter, path, updated_data))
207 nsr_id
= filter.get('_id')
209 # read ns record from database
210 nsr
= self
.db
.get_one(table
='nsrs', q_filter
=filter)
211 current_ns_status
= nsr
.get('nsState')
213 # get vca status for NS
214 status_dict
= await self
.n2vc
.get_status(namespace
='.' + nsr_id
, yaml_format
=False, vca_id
=vca_id
)
218 db_dict
['vcaStatus'] = status_dict
219 await self
.n2vc
.update_vca_status(db_dict
['vcaStatus'], vca_id
=vca_id
)
221 # update configurationStatus for this VCA
223 vca_index
= int(path
[path
.rfind(".")+1:])
225 vca_list
= deep_get(target_dict
=nsr
, key_list
=('_admin', 'deployed', 'VCA'))
226 vca_status
= vca_list
[vca_index
].get('status')
228 configuration_status_list
= nsr
.get('configurationStatus')
229 config_status
= configuration_status_list
[vca_index
].get('status')
231 if config_status
== 'BROKEN' and vca_status
!= 'failed':
232 db_dict
['configurationStatus'][vca_index
] = 'READY'
233 elif config_status
!= 'BROKEN' and vca_status
== 'failed':
234 db_dict
['configurationStatus'][vca_index
] = 'BROKEN'
235 except Exception as e
:
236 # not update configurationStatus
237 self
.logger
.debug('Error updating vca_index (ignore): {}'.format(e
))
239 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
240 # if nsState = 'DEGRADED' check if all is OK
242 if current_ns_status
in ('READY', 'DEGRADED'):
243 error_description
= ''
245 if status_dict
.get('machines'):
246 for machine_id
in status_dict
.get('machines'):
247 machine
= status_dict
.get('machines').get(machine_id
)
248 # check machine agent-status
249 if machine
.get('agent-status'):
250 s
= machine
.get('agent-status').get('status')
253 error_description
+= 'machine {} agent-status={} ; '.format(machine_id
, s
)
254 # check machine instance status
255 if machine
.get('instance-status'):
256 s
= machine
.get('instance-status').get('status')
259 error_description
+= 'machine {} instance-status={} ; '.format(machine_id
, s
)
261 if status_dict
.get('applications'):
262 for app_id
in status_dict
.get('applications'):
263 app
= status_dict
.get('applications').get(app_id
)
264 # check application status
265 if app
.get('status'):
266 s
= app
.get('status').get('status')
269 error_description
+= 'application {} status={} ; '.format(app_id
, s
)
271 if error_description
:
272 db_dict
['errorDescription'] = error_description
273 if current_ns_status
== 'READY' and is_degraded
:
274 db_dict
['nsState'] = 'DEGRADED'
275 if current_ns_status
== 'DEGRADED' and not is_degraded
:
276 db_dict
['nsState'] = 'READY'
279 self
.update_db_2("nsrs", nsr_id
, db_dict
)
281 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
283 except Exception as e
:
284 self
.logger
.warn('Error updating NS state for ns={}: {}'.format(nsr_id
, e
))
286 async def _on_update_k8s_db(self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None):
288 Updating vca status in NSR record
289 :param cluster_uuid: UUID of a k8s cluster
290 :param kdu_instance: The unique name of the KDU instance
291 :param filter: To get nsr_id
295 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
296 # .format(cluster_uuid, kdu_instance, filter))
299 nsr_id
= filter.get('_id')
301 # get vca status for NS
302 vca_status
= await self
.k8sclusterjuju
.status_kdu(
305 complete_status
=True,
311 db_dict
['vcaStatus'] = {nsr_id
: vca_status
}
313 await self
.k8sclusterjuju
.update_vca_status(
314 db_dict
['vcaStatus'],
320 self
.update_db_2("nsrs", nsr_id
, db_dict
)
322 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
324 except Exception as e
:
325 self
.logger
.warn('Error updating NS state for ns={}: {}'.format(nsr_id
, e
))
def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
    """
    Renders a cloud-init text as a Jinja2 template
    :param cloud_init_text: template content
    :param additional_params: dictionary with the variables for the template. None is treated as empty
    :param vnfd_id: vnfd id, used only for the error messages
    :param vdu_id: vdu id, used only for the error messages
    :return: the rendered text. LcmException is raised when a variable used by the template is not provided
        or the template cannot be parsed
    """
    # NOTE(review): the template is rendered with a regular (not sandboxed) Environment; confirm that the
    # cloud-init content always comes from a trusted package.
    try:
        # StrictUndefined makes a missing variable an error instead of an empty string
        env = Environment(undefined=StrictUndefined)
        template = env.from_string(cloud_init_text)
        return template.render(additional_params or {})
    except UndefinedError as e:
        # must be handled before TemplateError, that is listed below as the generic case
        raise LcmException("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                           "file, must be provided in the instantiation parameters inside the "
                           "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)) from e
    except (TemplateError, TemplateNotFound) as e:
        raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                           format(vnfd_id, vdu_id, e)) from e
341 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
342 cloud_init_content
= cloud_init_file
= None
344 if vdu
.get("cloud-init-file"):
345 base_folder
= vnfd
["_admin"]["storage"]
346 cloud_init_file
= "{}/{}/cloud_init/{}".format(base_folder
["folder"], base_folder
["pkg-dir"],
347 vdu
["cloud-init-file"])
348 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
349 cloud_init_content
= ci_file
.read()
350 elif vdu
.get("cloud-init"):
351 cloud_init_content
= vdu
["cloud-init"]
353 return cloud_init_content
354 except FsException
as e
:
355 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
356 format(vnfd
["id"], vdu
["id"], cloud_init_file
, e
))
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
    """
    Gets the "additionalParams" of the first vdur of the vnfr that refers to vdu_id
    :param db_vnfr: vnfr content, with a "vdur" list
    :param vdu_id: value looked for at "vdu-id-ref"
    :return: what parse_yaml_strings returns for those params. StopIteration is raised if no vdur matches
    """
    matching = (entry for entry in db_vnfr.get("vdur") if entry["vdu-id-ref"] == vdu_id)
    selected = next(matching)
    return parse_yaml_strings(selected.get("additionalParams"))
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
    :param vnfd: input vnfd. It is not modified, a deep copy is done
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided. Kept for interface compatibility,
        not used here
    :param nsrId: Id of the NSR. Kept for interface compatibility, not used here
    :return: copy of vnfd
    """
    vnfd_RO = deepcopy(vnfd)
    # remove unused by RO configuration, monitoring, scaling and internal keys
    for unused_key in ("_id", "_admin", "monitoring-param", "scaling-group-descriptor", "kdu", "k8s-cluster"):
        vnfd_RO.pop(unused_key, None)
    if new_id:
        vnfd_RO["id"] = new_id

    # cloud-init and cloud-init-file are stripped from every vdu of the copy
    for vdu in get_iterable(vnfd_RO, "vdu"):
        vdu.pop("cloud-init-file", None)
        vdu.pop("cloud-init", None)
    return vnfd_RO
def ip_profile_2_RO(ip_profile):
    """
    Converts an ip-profile to the key names and values used by RO
    :param ip_profile: dictionary with the ip profile. It is not modified
    :return: a deep copy where:
        "dns-server" is renamed to "dns-address" (a list of items is reduced to the list of their 'address')
        "ip-version" "ipv4"/"ipv6" is changed to "IPv4"/"IPv6"
        "dhcp-params" is renamed to "dhcp"
    """
    RO_ip_profile = deepcopy(ip_profile)
    if "dns-server" in RO_ip_profile:
        dns_server = RO_ip_profile.pop("dns-server")
        if isinstance(dns_server, list):
            RO_ip_profile["dns-address"] = [ds['address'] for ds in dns_server]
        else:
            RO_ip_profile["dns-address"] = dns_server
    if RO_ip_profile.get("ip-version") == "ipv4":
        RO_ip_profile["ip-version"] = "IPv4"
    if RO_ip_profile.get("ip-version") == "ipv6":
        RO_ip_profile["ip-version"] = "IPv6"
    if "dhcp-params" in RO_ip_profile:
        RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
    return RO_ip_profile
407 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
408 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
409 if db_vim
["_admin"]["operationalState"] != "ENABLED":
410 raise LcmException("VIM={} is not available. operationalState={}".format(
411 vim_account
, db_vim
["_admin"]["operationalState"]))
412 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
def get_ro_wim_id_for_wim_account(self, wim_account):
    """
    Gets the RO identifier stored for a wim account
    :param wim_account: _id of the "wim_accounts" record. Any value that is not a string (e.g. None or a
        boolean) is not looked up
    :return: content of "_admin.deployed.RO-account" of that record; or wim_account itself if it is not a
        string. LcmException is raised if the record "_admin.operationalState" is not "ENABLED"
    """
    if not isinstance(wim_account, str):
        return wim_account
    db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
    operational_state = db_wim["_admin"]["operationalState"]
    if operational_state != "ENABLED":
        raise LcmException("WIM={} is not available. operationalState={}".format(
            wim_account, operational_state))
    return db_wim["_admin"]["deployed"]["RO-account"]
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
    """
    Adds and/or removes vdur entries of a vnfr at database because of a scaling operation
    :param db_vnfr: vnfr content. Its "vdur" list is replaced at the end by the one read back from database
    :param vdu_create: dictionary with vdu-id-ref: number of new vdur to add, or None
    :param vdu_delete: dictionary with vdu-id-ref: number of vdur to remove, or None
    :param mark_delete: if True the last vdur of each vdu are only marked with status "DELETING";
        if False they are pulled from database
    :return: None, LcmException is raised if there is not any existing vdur to be used as template
    """
    db_vdu_push_list = []
    db_update = {"_admin.modified": time()}

    for vdu_id, vdu_count in (vdu_create or {}).items():
        # the last existing vdur of this vdu is the template for the new ones
        vdur = next((vdur for vdur in reversed(db_vnfr["vdur"]) if vdur["vdu-id-ref"] == vdu_id), None)
        if not vdur:
            raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".
                               format(vdu_id))

        for count in range(vdu_count):
            offset = count + 1
            vdur_copy = deepcopy(vdur)
            vdur_copy["status"] = "BUILD"
            vdur_copy["status-detailed"] = None
            # this was written as an annotation ("...: None"), so the address of the template was kept
            vdur_copy["ip-address"] = None
            vdur_copy["_id"] = str(uuid4())
            vdur_copy["count-index"] += offset
            vdur_copy["id"] = "{}-{}".format(vdur_copy["vdu-id-ref"], vdur_copy["count-index"])
            vdur_copy.pop("vim_info", None)
            for iface in vdur_copy["interfaces"]:
                # fixed addresses are derived from the template ones; the rest are assigned later
                if iface.get("fixed-ip"):
                    iface["ip-address"] = self.increment_ip_mac(iface["ip-address"], offset)
                else:
                    iface.pop("ip-address", None)
                if iface.get("fixed-mac"):
                    iface["mac-address"] = self.increment_ip_mac(iface["mac-address"], offset)
                else:
                    iface.pop("mac-address", None)
                iface.pop("mgmt_vnf", None)  # only first vdu can be management of vnf
            db_vdu_push_list.append(vdur_copy)

    for vdu_id, vdu_count in (vdu_delete or {}).items():
        if mark_delete:
            if vdu_count <= 0:
                # nothing to mark; a "[-0:]" slice would select every entry instead of none
                continue
            indexes_to_delete = [index for index, vdur in enumerate(db_vnfr["vdur"])
                                 if vdur["vdu-id-ref"] == vdu_id]
            db_update.update({"vdur.{}.status".format(i): "DELETING" for i in indexes_to_delete[-vdu_count:]})
        else:
            # it must be deleted one by one because common.db does not allow otherwise
            vdus_to_delete = [v for v in reversed(db_vnfr["vdur"]) if v["vdu-id-ref"] == vdu_id]
            for vdu in vdus_to_delete[:vdu_count]:
                self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, None, pull={"vdur": {"_id": vdu["_id"]}})

    db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
    self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
    # modify passed dictionary db_vnfr
    db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
    db_vnfr["vdur"] = db_vnfr_["vdur"]
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Updates database nsr with the RO info for the created vld
    :param ns_update_nsr: dictionary to be filled with the updated info, one "vld.<index>" entry per vld
    :param db_nsr: content of db_nsr. This is also modified
    :param nsr_desc_RO: nsr descriptor from RO; its "nets" are matched by "ns_net_osm_id"
    :return: Nothing, LcmException is raised on errors
    """
    for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
        for net_RO in get_iterable(nsr_desc_RO, "nets"):
            if vld["id"] != net_RO.get("ns_net_osm_id"):
                continue
            vld["vim-id"] = net_RO.get("vim_net_id")
            vld["name"] = net_RO.get("vim_name")
            vld["status"] = net_RO.get("status")
            vld["status-detailed"] = net_RO.get("error_msg")
            ns_update_nsr["vld.{}".format(vld_index)] = vld
            break
        else:
            # no RO net refers to this vld
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
def set_vnfr_at_error(self, db_vnfrs, error_text):
    """
    Sets the status of the vnfrs, and of their vdur without status, to "ERROR", both at the provided
    content and at database. Database errors are logged and not propagated
    :param db_vnfrs: dictionary with the vnfr contents as values; each one must contain "_id"
    :param error_text: if not empty, it is stored as text at "status-detailed" of the vdur set to "ERROR"
    :return: None
    """
    try:
        for db_vnfr in db_vnfrs.values():
            vnfr_update = {"status": "ERROR"}
            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                if "status" not in vdur:
                    vdur["status"] = "ERROR"
                    vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
                    if error_text:
                        # same text at memory and at database (database was getting a literal "ERROR")
                        detail = str(error_text)
                        vdur["status-detailed"] = detail
                        vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = detail
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
    except DbException as e:
        self.logger.error("Cannot update vnf. {}".format(e))
511 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
513 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
514 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
515 :param nsr_desc_RO: nsr descriptor from RO
516 :return: Nothing, LcmException is raised on errors
518 for vnf_index
, db_vnfr
in db_vnfrs
.items():
519 for vnf_RO
in nsr_desc_RO
["vnfs"]:
520 if vnf_RO
["member_vnf_index"] != vnf_index
:
523 if vnf_RO
.get("ip_address"):
524 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
["ip_address"].split(";")[0]
525 elif not db_vnfr
.get("ip-address"):
526 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
527 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index
))
529 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
530 vdur_RO_count_index
= 0
531 if vdur
.get("pdu-type"):
533 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
534 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
536 if vdur
["count-index"] != vdur_RO_count_index
:
537 vdur_RO_count_index
+= 1
539 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
540 if vdur_RO
.get("ip_address"):
541 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
543 vdur
["ip-address"] = None
544 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
545 vdur
["name"] = vdur_RO
.get("vim_name")
546 vdur
["status"] = vdur_RO
.get("status")
547 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
548 for ifacer
in get_iterable(vdur
, "interfaces"):
549 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
550 if ifacer
["name"] == interface_RO
.get("internal_name"):
551 ifacer
["ip-address"] = interface_RO
.get("ip_address")
552 ifacer
["mac-address"] = interface_RO
.get("mac_address")
555 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
557 .format(vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]))
558 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
561 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
562 "VIM info".format(vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]))
564 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
565 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
566 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
568 vld
["vim-id"] = net_RO
.get("vim_net_id")
569 vld
["name"] = net_RO
.get("vim_name")
570 vld
["status"] = net_RO
.get("status")
571 vld
["status-detailed"] = net_RO
.get("error_msg")
572 vnfr_update
["vld.{}".format(vld_index
)] = vld
575 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
576 vnf_index
, vld
["id"]))
578 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
582 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index
))
584 def _get_ns_config_info(self
, nsr_id
):
586 Generates a mapping between vnf,vdu elements and the N2VC id
587 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
588 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
589 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
590 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
592 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
593 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
595 ns_config_info
= {"osm-config-mapping": mapping
}
596 for vca
in vca_deployed_list
:
597 if not vca
["member-vnf-index"]:
599 if not vca
["vdu_id"]:
600 mapping
[vca
["member-vnf-index"]] = vca
["application"]
602 mapping
["{}.{}.{}".format(vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"])] =\
604 return ns_config_info
606 async def _instantiate_ng_ro(self
, logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
, db_vnfds
,
607 n2vc_key_list
, stage
, start_deploy
, timeout_ns_deploy
):
611 def get_vim_account(vim_account_id
):
613 if vim_account_id
in db_vims
:
614 return db_vims
[vim_account_id
]
615 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
616 db_vims
[vim_account_id
] = db_vim
619 # modify target_vld info with instantiation parameters
620 def parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, target_sdn
):
621 if vld_params
.get("ip-profile"):
622 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
["ip-profile"]
623 if vld_params
.get("provider-network"):
624 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
["provider-network"]
625 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
626 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
["provider-network"]["sdn-ports"]
627 if vld_params
.get("wimAccountId"):
628 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
629 target_vld
["vim_info"][target_wim
] = {}
630 for param
in ("vim-network-name", "vim-network-id"):
631 if vld_params
.get(param
):
632 if isinstance(vld_params
[param
], dict):
633 for vim
, vim_net
in vld_params
[param
].items():
634 other_target_vim
= "vim:" + vim
635 populate_dict(target_vld
["vim_info"], (other_target_vim
, param
.replace("-", "_")), vim_net
)
636 else: # isinstance str
637 target_vld
["vim_info"][target_vim
][param
.replace("-", "_")] = vld_params
[param
]
638 if vld_params
.get("common_id"):
639 target_vld
["common_id"] = vld_params
.get("common_id")
641 nslcmop_id
= db_nslcmop
["_id"]
643 "name": db_nsr
["name"],
646 "image": deepcopy(db_nsr
["image"]),
647 "flavor": deepcopy(db_nsr
["flavor"]),
648 "action_id": nslcmop_id
,
649 "cloud_init_content": {},
651 for image
in target
["image"]:
652 image
["vim_info"] = {}
653 for flavor
in target
["flavor"]:
654 flavor
["vim_info"] = {}
656 if db_nslcmop
.get("lcmOperationType") != "instantiate":
657 # get parameters of instantiation:
658 db_nslcmop_instantiate
= self
.db
.get_list("nslcmops", {"nsInstanceId": db_nslcmop
["nsInstanceId"],
659 "lcmOperationType": "instantiate"})[-1]
660 ns_params
= db_nslcmop_instantiate
.get("operationParams")
662 ns_params
= db_nslcmop
.get("operationParams")
663 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
664 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
667 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
668 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
672 "mgmt-network": vld
.get("mgmt-network", False),
673 "type": vld
.get("type"),
676 "vim_network_name": vld
.get("vim-network-name"),
677 "vim_account_id": ns_params
["vimAccountId"]
681 # check if this network needs SDN assist
682 if vld
.get("pci-interfaces"):
683 db_vim
= get_vim_account(ns_params
["vimAccountId"])
684 sdnc_id
= db_vim
["config"].get("sdn-controller")
686 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
687 target_sdn
= "sdn:{}".format(sdnc_id
)
688 target_vld
["vim_info"][target_sdn
] = {
689 "sdn": True, "target_vim": target_vim
, "vlds": [sdn_vld
], "type": vld
.get("type")}
691 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
692 for nsd_vnf_profile
in nsd_vnf_profiles
:
693 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
694 if cp
["virtual-link-profile-id"] == vld
["id"]:
695 cp2target
["member_vnf:{}.{}".format(
696 cp
["constituent-cpd-id"][0]["constituent-base-element-id"],
697 cp
["constituent-cpd-id"][0]["constituent-cpd-id"]
698 )] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
700 # check at nsd descriptor, if there is an ip-profile
702 nsd_vlp
= find_in_list(
703 get_virtual_link_profiles(nsd
),
704 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"] == vld
["id"])
705 if nsd_vlp
and nsd_vlp
.get("virtual-link-protocol-data") and \
706 nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data"):
707 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"]["l3-protocol-data"]
708 ip_profile_dest_data
= {}
709 if "ip-version" in ip_profile_source_data
:
710 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
["ip-version"]
711 if "cidr" in ip_profile_source_data
:
712 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
["cidr"]
713 if "gateway-ip" in ip_profile_source_data
:
714 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
["gateway-ip"]
715 if "dhcp-enabled" in ip_profile_source_data
:
716 ip_profile_dest_data
["dhcp-params"] = {
717 "enabled": ip_profile_source_data
["dhcp-enabled"]
719 vld_params
["ip-profile"] = ip_profile_dest_data
721 # update vld_params with instantiation params
722 vld_instantiation_params
= find_in_list(get_iterable(ns_params
, "vld"),
723 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]))
724 if vld_instantiation_params
:
725 vld_params
.update(vld_instantiation_params
)
726 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
727 target
["ns"]["vld"].append(target_vld
)
729 for vnfr
in db_vnfrs
.values():
730 vnfd
= find_in_list(db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"])
731 vnf_params
= find_in_list(get_iterable(ns_params
, "vnf"),
732 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"])
733 target_vnf
= deepcopy(vnfr
)
734 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
735 for vld
in target_vnf
.get("vld", ()):
736 # check if connected to a ns.vld, to fill target'
737 vnf_cp
= find_in_list(vnfd
.get("int-virtual-link-desc", ()),
738 lambda cpd
: cpd
.get("id") == vld
["id"])
740 ns_cp
= "member_vnf:{}.{}".format(vnfr
["member-vnf-index-ref"], vnf_cp
["id"])
741 if cp2target
.get(ns_cp
):
742 vld
["target"] = cp2target
[ns_cp
]
744 vld
["vim_info"] = {target_vim
: {"vim_network_name": vld
.get("vim-network-name")}}
745 # check if this network needs SDN assist
747 if vld
.get("pci-interfaces"):
748 db_vim
= get_vim_account(vnfr
["vim-account-id"])
749 sdnc_id
= db_vim
["config"].get("sdn-controller")
751 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
752 target_sdn
= "sdn:{}".format(sdnc_id
)
753 vld
["vim_info"][target_sdn
] = {
754 "sdn": True, "target_vim": target_vim
, "vlds": [sdn_vld
], "type": vld
.get("type")}
756 # check at vnfd descriptor, if there is an ip-profile
758 vnfd_vlp
= find_in_list(
759 get_virtual_link_profiles(vnfd
),
760 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"]
762 if vnfd_vlp
and vnfd_vlp
.get("virtual-link-protocol-data") and \
763 vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data"):
764 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"]["l3-protocol-data"]
765 ip_profile_dest_data
= {}
766 if "ip-version" in ip_profile_source_data
:
767 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
["ip-version"]
768 if "cidr" in ip_profile_source_data
:
769 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
["cidr"]
770 if "gateway-ip" in ip_profile_source_data
:
771 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
["gateway-ip"]
772 if "dhcp-enabled" in ip_profile_source_data
:
773 ip_profile_dest_data
["dhcp-params"] = {
774 "enabled": ip_profile_source_data
["dhcp-enabled"]
777 vld_params
["ip-profile"] = ip_profile_dest_data
778 # update vld_params with instantiation params
780 vld_instantiation_params
= find_in_list(get_iterable(vnf_params
, "internal-vld"),
781 lambda i_vld
: i_vld
["name"] == vld
["id"])
782 if vld_instantiation_params
:
783 vld_params
.update(vld_instantiation_params
)
784 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
787 for vdur
in target_vnf
.get("vdur", ()):
788 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
789 continue # This vdu must not be created
790 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
792 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
795 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
796 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
797 if vdu_configuration
and vdu_configuration
.get("config-access") and \
798 vdu_configuration
.get("config-access").get("ssh-access"):
799 vdur
["ssh-keys"] = ssh_keys_all
800 vdur
["ssh-access-required"] = vdu_configuration
["config-access"]["ssh-access"]["required"]
801 elif vnf_configuration
and vnf_configuration
.get("config-access") and \
802 vnf_configuration
.get("config-access").get("ssh-access") and \
803 any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"]):
804 vdur
["ssh-keys"] = ssh_keys_all
805 vdur
["ssh-access-required"] = vnf_configuration
["config-access"]["ssh-access"]["required"]
806 elif ssh_keys_instantiation
and \
807 find_in_list(vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")):
808 vdur
["ssh-keys"] = ssh_keys_instantiation
810 self
.logger
.debug("NS > vdur > {}".format(vdur
))
812 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
814 if vdud
.get("cloud-init-file"):
815 vdur
["cloud-init"] = "{}:file:{}".format(vnfd
["_id"], vdud
.get("cloud-init-file"))
816 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
817 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
818 base_folder
= vnfd
["_admin"]["storage"]
819 cloud_init_file
= "{}/{}/cloud_init/{}".format(base_folder
["folder"], base_folder
["pkg-dir"],
820 vdud
.get("cloud-init-file"))
821 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
822 target
["cloud_init_content"][vdur
["cloud-init"]] = ci_file
.read()
823 elif vdud
.get("cloud-init"):
824 vdur
["cloud-init"] = "{}:vdu:{}".format(vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"]))
825 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
826 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
["cloud-init"]
827 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
828 deploy_params_vdu
= self
._format
_additional
_params
(vdur
.get("additionalParams") or {})
829 deploy_params_vdu
["OSM"] = get_osm_params(vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"])
830 vdur
["additionalParams"] = deploy_params_vdu
833 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
834 if target_vim
not in ns_flavor
["vim_info"]:
835 ns_flavor
["vim_info"][target_vim
] = {}
838 # in case alternative images are provided we must check if they should be applied
839 # for the vim_type, modify the vim_type taking into account
840 ns_image_id
= int(vdur
["ns-image-id"])
841 if vdur
.get("alt-image-ids"):
842 db_vim
= get_vim_account(vnfr
["vim-account-id"])
843 vim_type
= db_vim
["vim_type"]
844 for alt_image_id
in vdur
.get("alt-image-ids"):
845 ns_alt_image
= target
["image"][int(alt_image_id
)]
846 if vim_type
== ns_alt_image
.get("vim-type"):
847 # must use alternative image
848 self
.logger
.debug("use alternative image id: {}".format(alt_image_id
))
849 ns_image_id
= alt_image_id
850 vdur
["ns-image-id"] = ns_image_id
852 ns_image
= target
["image"][int(ns_image_id
)]
853 if target_vim
not in ns_image
["vim_info"]:
854 ns_image
["vim_info"][target_vim
] = {}
856 vdur
["vim_info"] = {target_vim
: {}}
857 # instantiation parameters
859 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
861 vdur_list
.append(vdur
)
862 target_vnf
["vdur"] = vdur_list
863 target
["vnf"].append(target_vnf
)
865 desc
= await self
.RO
.deploy(nsr_id
, target
)
866 self
.logger
.debug("RO return > {}".format(desc
))
867 action_id
= desc
["action_id"]
868 await self
._wait
_ng
_ro
(nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
)
872 "_admin.deployed.RO.operational-status": "running",
873 "detailed-status": " ".join(stage
)
875 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
876 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
877 self
._write
_op
_status
(nslcmop_id
, stage
)
878 self
.logger
.debug(logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
))
881 async def _wait_ng_ro(self
, nsr_id
, action_id
, nslcmop_id
=None, start_time
=None, timeout
=600, stage
=None):
882 detailed_status_old
= None
884 start_time
= start_time
or time()
885 while time() <= start_time
+ timeout
:
886 desc_status
= await self
.RO
.status(nsr_id
, action_id
)
887 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
888 if desc_status
["status"] == "FAILED":
889 raise NgRoException(desc_status
["details"])
890 elif desc_status
["status"] == "BUILD":
892 stage
[2] = "VIM: ({})".format(desc_status
["details"])
893 elif desc_status
["status"] == "DONE":
895 stage
[2] = "Deployed at VIM"
898 assert False, "ROclient.check_ns_status returns unknown {}".format(desc_status
["status"])
899 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
900 detailed_status_old
= stage
[2]
901 db_nsr_update
["detailed-status"] = " ".join(stage
)
902 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
903 self
._write
_op
_status
(nslcmop_id
, stage
)
904 await asyncio
.sleep(15, loop
=self
.loop
)
905 else: # timeout_ns_deploy
906 raise NgRoException("Timeout waiting ns to deploy")
908 async def _terminate_ng_ro(self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
):
912 start_deploy
= time()
919 "action_id": nslcmop_id
921 desc
= await self
.RO
.deploy(nsr_id
, target
)
922 action_id
= desc
["action_id"]
923 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
924 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
925 self
.logger
.debug(logging_text
+ "ns terminate action at RO. action_id={}".format(action_id
))
928 delete_timeout
= 20 * 60 # 20 minutes
929 await self
._wait
_ng
_ro
(nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
)
931 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
932 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
934 await self
.RO
.delete(nsr_id
)
935 except Exception as e
:
936 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
937 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
938 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
939 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
940 self
.logger
.debug(logging_text
+ "RO_action_id={} already deleted".format(action_id
))
941 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
942 failed_detail
.append("delete conflict: {}".format(e
))
943 self
.logger
.debug(logging_text
+ "RO_action_id={} delete conflict: {}".format(action_id
, e
))
945 failed_detail
.append("delete error: {}".format(e
))
946 self
.logger
.error(logging_text
+ "RO_action_id={} delete error: {}".format(action_id
, e
))
949 stage
[2] = "Error deleting from VIM"
951 stage
[2] = "Deleted from VIM"
952 db_nsr_update
["detailed-status"] = " ".join(stage
)
953 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
954 self
._write
_op
_status
(nslcmop_id
, stage
)
957 raise LcmException("; ".join(failed_detail
))
async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds,
                         n2vc_key_list, stage):
    """
    Instantiate at RO, checking before if a placement optimization has to be applied

    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: database content of ns descriptor
    :param db_nsr: database content of ns record
    :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
    :param db_vnfrs: database content of vnfrs, a dictionary whose values are the vnf records
    :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
    :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: the result of _instantiate_ng_ro. Any exception is logged, the vnfrs are set at error and the
        exception is raised again
    """
    try:
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
            # in case of placement change ns_params[vimAccountId] if not present at any vnfrs
            for vnfr in db_vnfrs.values():
                if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
                    break
            else:
                # assignment, not comparison: a '==' here is a statement without any effect
                ns_params["vimAccountId"] = vnfr["vim-account-id"]

        return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs,
                                             db_vnfds, n2vc_key_list, stage, start_deploy, timeout_ns_deploy)
    except Exception as e:
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        # the traceback is only logged for exceptions that are not among the known ones
        self.logger.error("Error deploying at VIM {}".format(e),
                          exc_info=not isinstance(e, (ROclient.ROClientException, LcmException, DbException,
                                                      NgRoException)))
        raise
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address

    The vnfr is read from database up to 360 times, sleeping 10 seconds between reads.

    :param logging_text: prefix use for logging
    :param nsr_id: not used by this method
    :param vnfr_id: identity of the vnf record that contains the kdu
    :param kdu_name: name of the kdu to look for at the "kdur" list of the vnfr
    :return: the "ip-address" stored at the kdur, once its status is "READY" or "ENABLED"
    :raises LcmException: kdu not found at the vnfr, kdu with a status different from the ready ones, or timeout
    """

    # self.logger.debug(logging_text + "Starting wait_kdu_up")
    for _ in range(360):
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next((x for x in get_iterable(db_vnfr, "kdur") if x.get("kdu-name") == kdu_name), None)
        if not kdur:
            raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name))
        if kdur.get("status"):
            # any status that is set and is not a ready one is considered an error
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address")
            raise LcmException("target KDU={} is in error state".format(kdu_name))

        # no 'loop' argument: it is deprecated since python 3.8 and rejected from python 3.10
        await asyncio.sleep(10)
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1031 async def wait_vm_up_insert_key_ro(self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None):
1033 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1034 :param logging_text: prefix use for logging
1039 :param pub_key: public ssh key to inject, None to skip
1040 :param user: user to apply the public ssh key
1044 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1048 target_vdu_id
= None
1054 if ro_retries
>= 360: # 1 hour
1055 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
))
1057 await asyncio
.sleep(10, loop
=self
.loop
)
1060 if not target_vdu_id
:
1061 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1063 if not vdu_id
: # for the VNF case
1064 if db_vnfr
.get("status") == "ERROR":
1065 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1066 ip_address
= db_vnfr
.get("ip-address")
1069 vdur
= next((x
for x
in get_iterable(db_vnfr
, "vdur") if x
.get("ip-address") == ip_address
), None)
1071 vdur
= next((x
for x
in get_iterable(db_vnfr
, "vdur")
1072 if x
.get("vdu-id-ref") == vdu_id
and x
.get("count-index") == vdu_index
), None)
1074 if not vdur
and len(db_vnfr
.get("vdur", ())) == 1: # If only one, this should be the target vdu
1075 vdur
= db_vnfr
["vdur"][0]
1077 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id
, vdu_id
,
1079 # New generation RO stores information at "vim_info"
1082 if vdur
.get("vim_info"):
1083 target_vim
= next(t
for t
in vdur
["vim_info"]) # there should be only one key
1084 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1085 if vdur
.get("pdu-type") or vdur
.get("status") == "ACTIVE" or ng_ro_status
== "ACTIVE":
1086 ip_address
= vdur
.get("ip-address")
1089 target_vdu_id
= vdur
["vdu-id-ref"]
1090 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1091 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1093 if not target_vdu_id
:
1096 # inject public key into machine
1097 if pub_key
and user
:
1098 self
.logger
.debug(logging_text
+ "Inserting RO key")
1099 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1100 if vdur
.get("pdu-type"):
1101 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1104 ro_vm_id
= "{}-{}".format(db_vnfr
["member-vnf-index-ref"], target_vdu_id
) # TODO add vdu_index
1106 target
= {"action": {"action": "inject_ssh_key", "key": pub_key
, "user": user
},
1107 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1109 desc
= await self
.RO
.deploy(nsr_id
, target
)
1110 action_id
= desc
["action_id"]
1111 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600)
1114 # wait until NS is deployed at RO
1116 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1117 ro_nsr_id
= deep_get(db_nsrs
, ("_admin", "deployed", "RO", "nsr_id"))
1120 result_dict
= await self
.RO
.create_action(
1122 item_id_name
=ro_nsr_id
,
1123 descriptor
={"add_public_key": pub_key
, "vms": [ro_vm_id
], "user": user
}
1125 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1126 if not result_dict
or not isinstance(result_dict
, dict):
1127 raise LcmException("Unknown response from RO when injecting key")
1128 for result
in result_dict
.values():
1129 if result
.get("vim_result") == 200:
1132 raise ROclient
.ROClientException("error injecting key: {}".format(
1133 result
.get("description")))
1135 except NgRoException
as e
:
1136 raise LcmException("Reaching max tries injecting key. Error: {}".format(e
))
1137 except ROclient
.ROClientException
as e
:
1139 self
.logger
.debug(logging_text
+ "error injecting key: {}. Retrying until {} seconds".
1143 raise LcmException("Reaching max tries injecting key. Error: {}".format(e
))
1149 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1151 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1153 my_vca
= vca_deployed_list
[vca_index
]
1154 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1155 # vdu or kdu: no dependencies
1159 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1160 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1161 configuration_status_list
= db_nsr
["configurationStatus"]
1162 for index
, vca_deployed
in enumerate(configuration_status_list
):
1163 if index
== vca_index
:
1166 if not my_vca
.get("member-vnf-index") or \
1167 (vca_deployed
.get("member-vnf-index") == my_vca
.get("member-vnf-index")):
1168 internal_status
= configuration_status_list
[index
].get("status")
1169 if internal_status
== 'READY':
1171 elif internal_status
== 'BROKEN':
1172 raise LcmException("Configuration aborted because dependent charm/s has failed")
1176 # no dependencies, return
1178 await asyncio
.sleep(10)
1181 raise LcmException("Configuration aborted because dependent charm/s timeout")
def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
    """
    Get the VCA identity to use: the one of the vnf record if any, otherwise the one of the ns record

    :param db_vnfr: database content of the vnf record, read at "vca-id"
    :param db_nsr: database content of the ns record, read at "instantiate_params" > "vcaId"
    :return: the first of both values that is not empty; otherwise whatever the ns record lookup returns
    """
    vnfr_vca_id = deep_get(db_vnfr, ("vca-id",))
    if vnfr_vca_id:
        return vnfr_vca_id
    return deep_get(db_nsr, ("instantiate_params", "vcaId"))
1189 async def instantiate_N2VC(self
, logging_text
, vca_index
, nsi_id
, db_nsr
, db_vnfr
, vdu_id
, kdu_name
, vdu_index
,
1190 config_descriptor
, deploy_params
, base_folder
, nslcmop_id
, stage
, vca_type
, vca_name
,
1191 ee_config_descriptor
):
1192 nsr_id
= db_nsr
["_id"]
1193 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1194 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1195 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1196 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1198 'collection': 'nsrs',
1199 'filter': {'_id': nsr_id
},
1200 'path': db_update_entry
1206 element_under_configuration
= nsr_id
1210 vnfr_id
= db_vnfr
["_id"]
1211 osm_config
["osm"]["vnf_id"] = vnfr_id
1213 namespace
= "{nsi}.{ns}".format(
1214 nsi
=nsi_id
if nsi_id
else "",
1218 element_type
= 'VNF'
1219 element_under_configuration
= vnfr_id
1220 namespace
+= ".{}-{}".format(vnfr_id
, vdu_index
or 0)
1222 namespace
+= ".{}-{}".format(vdu_id
, vdu_index
or 0)
1223 element_type
= 'VDU'
1224 element_under_configuration
= "{}-{}".format(vdu_id
, vdu_index
or 0)
1225 osm_config
["osm"]["vdu_id"] = vdu_id
1227 namespace
+= ".{}.{}".format(kdu_name
, vdu_index
or 0)
1228 element_type
= 'KDU'
1229 element_under_configuration
= kdu_name
1230 osm_config
["osm"]["kdu_name"] = kdu_name
1233 artifact_path
= "{}/{}/{}/{}".format(
1234 base_folder
["folder"],
1235 base_folder
["pkg-dir"],
1236 "charms" if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1240 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1242 # get initial_config_primitive_list that applies to this element
1243 initial_config_primitive_list
= config_descriptor
.get('initial-config-primitive')
1245 self
.logger
.debug("Initial config primitive list > {}".format(initial_config_primitive_list
))
1247 # add config if not present for NS charm
1248 ee_descriptor_id
= ee_config_descriptor
.get("id")
1249 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1250 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(initial_config_primitive_list
,
1251 vca_deployed
, ee_descriptor_id
)
1253 self
.logger
.debug("Initial config primitive list #2 > {}".format(initial_config_primitive_list
))
1254 # n2vc_redesign STEP 3.1
1255 # find old ee_id if exists
1256 ee_id
= vca_deployed
.get("ee_id")
1258 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1259 # create or register execution environment in VCA
1260 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1262 self
._write
_configuration
_status
(
1264 vca_index
=vca_index
,
1266 element_under_configuration
=element_under_configuration
,
1267 element_type
=element_type
1270 step
= "create execution environment"
1271 self
.logger
.debug(logging_text
+ step
)
1275 if vca_type
== "k8s_proxy_charm":
1276 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1277 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1:],
1278 namespace
=namespace
,
1279 artifact_path
=artifact_path
,
1283 elif vca_type
== "helm" or vca_type
== "helm-v3":
1284 ee_id
, credentials
= await self
.vca_map
[vca_type
].create_execution_environment(
1285 namespace
=namespace
,
1289 artifact_path
=artifact_path
,
1293 ee_id
, credentials
= await self
.vca_map
[vca_type
].create_execution_environment(
1294 namespace
=namespace
,
1300 elif vca_type
== "native_charm":
1301 step
= "Waiting to VM being up and getting IP address"
1302 self
.logger
.debug(logging_text
+ step
)
1303 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
,
1304 user
=None, pub_key
=None)
1305 credentials
= {"hostname": rw_mgmt_ip
}
1307 username
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1308 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1309 # merged. Meanwhile let's get username from initial-config-primitive
1310 if not username
and initial_config_primitive_list
:
1311 for config_primitive
in initial_config_primitive_list
:
1312 for param
in config_primitive
.get("parameter", ()):
1313 if param
["name"] == "ssh-username":
1314 username
= param
["value"]
1317 raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
1318 "'config-access.ssh-access.default-user'")
1319 credentials
["username"] = username
1320 # n2vc_redesign STEP 3.2
1322 self
._write
_configuration
_status
(
1324 vca_index
=vca_index
,
1325 status
='REGISTERING',
1326 element_under_configuration
=element_under_configuration
,
1327 element_type
=element_type
1330 step
= "register execution environment {}".format(credentials
)
1331 self
.logger
.debug(logging_text
+ step
)
1332 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1333 credentials
=credentials
,
1334 namespace
=namespace
,
1339 # for compatibility with MON/POL modules, the need model and application name at database
1340 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1341 ee_id_parts
= ee_id
.split('.')
1342 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1343 if len(ee_id_parts
) >= 2:
1344 model_name
= ee_id_parts
[0]
1345 application_name
= ee_id_parts
[1]
1346 db_nsr_update
[db_update_entry
+ "model"] = model_name
1347 db_nsr_update
[db_update_entry
+ "application"] = application_name
1349 # n2vc_redesign STEP 3.3
1350 step
= "Install configuration Software"
1352 self
._write
_configuration
_status
(
1354 vca_index
=vca_index
,
1355 status
='INSTALLING SW',
1356 element_under_configuration
=element_under_configuration
,
1357 element_type
=element_type
,
1358 other_update
=db_nsr_update
1361 # TODO check if already done
1362 self
.logger
.debug(logging_text
+ step
)
1364 if vca_type
== "native_charm":
1365 config_primitive
= next((p
for p
in initial_config_primitive_list
if p
["name"] == "config"), None)
1366 if config_primitive
:
1367 config
= self
._map
_primitive
_params
(
1373 if vca_type
== "lxc_proxy_charm":
1374 if element_type
== "NS":
1375 num_units
= db_nsr
.get("config-units") or 1
1376 elif element_type
== "VNF":
1377 num_units
= db_vnfr
.get("config-units") or 1
1378 elif element_type
== "VDU":
1379 for v
in db_vnfr
["vdur"]:
1380 if vdu_id
== v
["vdu-id-ref"]:
1381 num_units
= v
.get("config-units") or 1
1383 if vca_type
!= "k8s_proxy_charm":
1384 await self
.vca_map
[vca_type
].install_configuration_sw(
1386 artifact_path
=artifact_path
,
1389 num_units
=num_units
,
1393 # write in db flag of configuration_sw already installed
1394 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True})
1396 # add relations for this VCA (wait for other peers related with this VCA)
1397 await self
._add
_vca
_relations
(logging_text
=logging_text
, nsr_id
=nsr_id
,
1398 vca_index
=vca_index
, vca_id
=vca_id
, vca_type
=vca_type
)
1400 # if SSH access is required, then get execution environment SSH public
1401 # if native charm we have waited already to VM be UP
1402 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1405 # self.logger.debug("get ssh key block")
1406 if deep_get(config_descriptor
, ("config-access", "ssh-access", "required")):
1407 # self.logger.debug("ssh key needed")
1408 # Needed to inject a ssh key
1409 user
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1410 step
= "Install configuration Software, getting public ssh key"
1411 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
1417 step
= "Insert public key into VM user={} ssh_key={}".format(user
, pub_key
)
1419 # self.logger.debug("no need to get ssh key")
1420 step
= "Waiting to VM being up and getting IP address"
1421 self
.logger
.debug(logging_text
+ step
)
1423 # n2vc_redesign STEP 5.1
1424 # wait for RO (ip-address) Insert pub_key into VM
1427 rw_mgmt_ip
= await self
.wait_kdu_up(logging_text
, nsr_id
, vnfr_id
, kdu_name
)
1429 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
,
1430 vdu_index
, user
=user
, pub_key
=pub_key
)
1432 rw_mgmt_ip
= None # This is for a NS configuration
1434 self
.logger
.debug(logging_text
+ ' VM_ip_address={}'.format(rw_mgmt_ip
))
1436 # store rw_mgmt_ip in deploy params for later replacement
1437 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
1439 # n2vc_redesign STEP 6 Execute initial config primitive
1440 step
= 'execute initial config primitive'
1442 # wait for dependent primitives execution (NS -> VNF -> VDU)
1443 if initial_config_primitive_list
:
1444 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
1446 # stage, in function of element type: vdu, kdu, vnf or ns
1447 my_vca
= vca_deployed_list
[vca_index
]
1448 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1450 stage
[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1451 elif my_vca
.get("member-vnf-index"):
1453 stage
[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1456 stage
[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1458 self
._write
_configuration
_status
(
1460 vca_index
=vca_index
,
1461 status
='EXECUTING PRIMITIVE'
1464 self
._write
_op
_status
(
1469 check_if_terminated_needed
= True
1470 for initial_config_primitive
in initial_config_primitive_list
:
1471 # adding information on the vca_deployed if it is a NS execution environment
1472 if not vca_deployed
["member-vnf-index"]:
1473 deploy_params
["ns_config_info"] = json
.dumps(self
._get
_ns
_config
_info
(nsr_id
))
1474 # TODO check if already done
1475 primitive_params_
= self
._map
_primitive
_params
(initial_config_primitive
, {}, deploy_params
)
1477 step
= "execute primitive '{}' params '{}'".format(initial_config_primitive
["name"], primitive_params_
)
1478 self
.logger
.debug(logging_text
+ step
)
1479 await self
.vca_map
[vca_type
].exec_primitive(
1481 primitive_name
=initial_config_primitive
["name"],
1482 params_dict
=primitive_params_
,
1486 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
1487 if check_if_terminated_needed
:
1488 if config_descriptor
.get('terminate-config-primitive'):
1489 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True})
1490 check_if_terminated_needed
= False
1492 # TODO register in database that primitive is done
1494 # STEP 7 Configure metrics
1495 if vca_type
== "helm" or vca_type
== "helm-v3":
1496 prometheus_jobs
= await self
.add_prometheus_metrics(
1498 artifact_path
=artifact_path
,
1499 ee_config_descriptor
=ee_config_descriptor
,
1502 target_ip
=rw_mgmt_ip
,
1505 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "prometheus_jobs": prometheus_jobs
})
1507 step
= "instantiated at VCA"
1508 self
.logger
.debug(logging_text
+ step
)
1510 self
._write
_configuration
_status
(
1512 vca_index
=vca_index
,
1516 except Exception as e
: # TODO not use Exception but N2VC exception
1517 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1518 if not isinstance(e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)):
1519 self
.logger
.error("Exception while {} : {}".format(step
, e
), exc_info
=True)
1520 self
._write
_configuration
_status
(
1522 vca_index
=vca_index
,
1525 raise LcmException("{} {}".format(step
, e
)) from e
1527 def _write_ns_status(self
, nsr_id
: str, ns_state
: str, current_operation
: str, current_operation_id
: str,
1528 error_description
: str = None, error_detail
: str = None, other_update
: dict = None):
1530 Update db_nsr fields.
1533 :param current_operation:
1534 :param current_operation_id:
1535 :param error_description:
1536 :param error_detail:
1537 :param other_update: Other required changes at database if provided, will be cleared
1541 db_dict
= other_update
or {}
1542 db_dict
["_admin.nslcmop"] = current_operation_id
# for backward compatibility
1543 db_dict
["_admin.current-operation"] = current_operation_id
1544 db_dict
["_admin.operation-type"] = current_operation
if current_operation
!= "IDLE" else None
1545 db_dict
["currentOperation"] = current_operation
1546 db_dict
["currentOperationID"] = current_operation_id
1547 db_dict
["errorDescription"] = error_description
1548 db_dict
["errorDetail"] = error_detail
1551 db_dict
["nsState"] = ns_state
1552 self
.update_db_2("nsrs", nsr_id
, db_dict
)
1553 except DbException
as e
:
1554 self
.logger
.warn('Error writing NS status, ns={}: {}'.format(nsr_id
, e
))
1556 def _write_op_status(self
, op_id
: str, stage
: list = None, error_message
: str = None, queuePosition
: int = 0,
1557 operation_state
: str = None, other_update
: dict = None):
1559 db_dict
= other_update
or {}
1560 db_dict
['queuePosition'] = queuePosition
1561 if isinstance(stage
, list):
1562 db_dict
['stage'] = stage
[0]
1563 db_dict
['detailed-status'] = " ".join(stage
)
1564 elif stage
is not None:
1565 db_dict
['stage'] = str(stage
)
1567 if error_message
is not None:
1568 db_dict
['errorMessage'] = error_message
1569 if operation_state
is not None:
1570 db_dict
['operationState'] = operation_state
1571 db_dict
["statusEnteredTime"] = time()
1572 self
.update_db_2("nslcmops", op_id
, db_dict
)
1573 except DbException
as e
:
1574 self
.logger
.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id
, e
))
1576 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
1578 nsr_id
= db_nsr
["_id"]
1579 # configurationStatus
1580 config_status
= db_nsr
.get('configurationStatus')
1582 db_nsr_update
= {"configurationStatus.{}.status".format(index
): status
for index
, v
in
1583 enumerate(config_status
) if v
}
1585 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1587 except DbException
as e
:
1588 self
.logger
.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id
, e
))
1590 def _write_configuration_status(self
, nsr_id
: str, vca_index
: int, status
: str = None,
1591 element_under_configuration
: str = None, element_type
: str = None,
1592 other_update
: dict = None):
1594 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1595 # .format(vca_index, status))
1598 db_path
= 'configurationStatus.{}.'.format(vca_index
)
1599 db_dict
= other_update
or {}
1601 db_dict
[db_path
+ 'status'] = status
1602 if element_under_configuration
:
1603 db_dict
[db_path
+ 'elementUnderConfiguration'] = element_under_configuration
1605 db_dict
[db_path
+ 'elementType'] = element_type
1606 self
.update_db_2("nsrs", nsr_id
, db_dict
)
1607 except DbException
as e
:
1608 self
.logger
.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1609 .format(status
, nsr_id
, vca_index
, e
))
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
    sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.pla).
    Database is used because the result can be obtained from a different LCM worker in case of HA.

    :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
    :param db_nslcmop: database content of nslcmop
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
    :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
        computed 'vim-account-id'
    :raises LcmException: when the placement result is not at database after the waiting time
    """
    nslcmop_id = db_nslcmop['_id']
    if deep_get(db_nslcmop, ('operationParams', 'placement-engine')) != "PLA":
        # no external placement engine requested: nothing to do
        return False

    self.logger.debug(logging_text + "Invoke and wait for placement optimization")
    await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)

    # poll the database until the result is written or the waiting time is consumed
    poll_seconds = 5
    remaining_seconds = poll_seconds * 10
    placement = None
    while remaining_seconds >= 0 and not placement:
        await asyncio.sleep(poll_seconds)
        remaining_seconds -= poll_seconds
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        placement = deep_get(db_nslcmop, ('_admin', 'pla'))

    if not placement:
        raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))

    changed = False
    for placed_vnf in placement['vnf']:
        vnfr = db_vnfrs.get(placed_vnf['member-vnf-index'])
        if vnfr and placed_vnf.get('vimAccountId'):
            changed = True
            self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": placed_vnf['vimAccountId']})
            # Modifies db_vnfrs
            vnfr["vim-account-id"] = placed_vnf['vimAccountId']
    return changed
def update_nsrs_with_pla_result(self, params):
    """
    Store the received placement result at the "_admin.pla" field of its "nslcmops" record.

    Any error is not propagated: it is only logged as a warning.

    :param params: content with the result at "placement", which includes the operation identity at "nslcmopId"
    :return: None
    """
    # bound before the 'try' so that the handler can always report it, even if reading it is what fails
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
        self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
    except Exception as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1657 async def instantiate(self
, nsr_id
, nslcmop_id
):
1660 :param nsr_id: ns instance to deploy
1661 :param nslcmop_id: operation to run
1665 # Try to lock HA task here
1666 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
1667 if not task_is_locked_by_me
:
1668 self
.logger
.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id
))
1671 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
1672 self
.logger
.debug(logging_text
+ "Enter")
1674 # get all needed from database
1676 # database nsrs record
1679 # database nslcmops record
1682 # update operation on nsrs
1684 # update operation on nslcmops
1685 db_nslcmop_update
= {}
1687 nslcmop_operation_state
= None
1688 db_vnfrs
= {} # vnf's info indexed by member-index
1690 tasks_dict_info
= {} # from task to info text
1693 stage
= ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1694 # ^ stage, step, VIM progress
1696 # wait for any previous tasks in process
1697 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
1699 stage
[1] = "Sync filesystem from database."
1700 self
.fs
.sync() # TODO, make use of partial sync, only for the needed packages
1702 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1703 stage
[1] = "Reading from database."
1704 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1705 db_nsr_update
["detailed-status"] = "creating"
1706 db_nsr_update
["operational-status"] = "init"
1707 self
._write
_ns
_status
(
1709 ns_state
="BUILDING",
1710 current_operation
="INSTANTIATING",
1711 current_operation_id
=nslcmop_id
,
1712 other_update
=db_nsr_update
1714 self
._write
_op
_status
(
1720 # read from db: operation
1721 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
1722 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
1723 ns_params
= db_nslcmop
.get("operationParams")
1724 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1725 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1727 timeout_ns_deploy
= self
.timeout
.get("ns_deploy", self
.timeout_ns_deploy
)
1730 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
1731 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1732 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
1733 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
1735 # nsr_name = db_nsr["name"] # TODO short-name??
1737 # read from db: vnf's of this ns
1738 stage
[1] = "Getting vnfrs from db."
1739 self
.logger
.debug(logging_text
+ stage
[1])
1740 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
1742 # read from db: vnfd's for every vnf
1743 db_vnfds
= [] # every vnfd data
1745 # for each vnf in ns, read vnfd
1746 for vnfr
in db_vnfrs_list
:
1747 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
1748 vnfd_id
= vnfr
["vnfd-id"]
1749 vnfd_ref
= vnfr
["vnfd-ref"]
1751 # if we haven't this vnfd, read it from db
1752 if vnfd_id
not in db_vnfds
:
1754 stage
[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id
, vnfd_ref
)
1755 self
.logger
.debug(logging_text
+ stage
[1])
1756 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
1759 db_vnfds
.append(vnfd
)
1761 # Get or generates the _admin.deployed.VCA list
1762 vca_deployed_list
= None
1763 if db_nsr
["_admin"].get("deployed"):
1764 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
1765 if vca_deployed_list
is None:
1766 vca_deployed_list
= []
1767 configuration_status_list
= []
1768 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
1769 db_nsr_update
["configurationStatus"] = configuration_status_list
1770 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1771 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
1772 elif isinstance(vca_deployed_list
, dict):
1773 # maintain backward compatibility. Change a dict to list at database
1774 vca_deployed_list
= list(vca_deployed_list
.values())
1775 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
1776 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
1778 if not isinstance(deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list):
1779 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
1780 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
1782 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1783 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
1784 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1785 self
.db
.set_list("vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"})
1787 # n2vc_redesign STEP 2 Deploy Network Scenario
1788 stage
[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
1789 self
._write
_op
_status
(
1794 stage
[1] = "Deploying KDUs."
1795 # self.logger.debug(logging_text + "Before deploy_kdus")
1796 # Call to deploy_kdus in case exists the "vdu:kdu" param
1797 await self
.deploy_kdus(
1798 logging_text
=logging_text
,
1800 nslcmop_id
=nslcmop_id
,
1803 task_instantiation_info
=tasks_dict_info
,
1806 stage
[1] = "Getting VCA public key."
1807 # n2vc_redesign STEP 1 Get VCA public ssh-key
1808 # feature 1429. Add n2vc public key to needed VMs
1809 n2vc_key
= self
.n2vc
.get_public_key()
1810 n2vc_key_list
= [n2vc_key
]
1811 if self
.vca_config
.get("public_key"):
1812 n2vc_key_list
.append(self
.vca_config
["public_key"])
1814 stage
[1] = "Deploying NS at VIM."
1815 task_ro
= asyncio
.ensure_future(
1816 self
.instantiate_RO(
1817 logging_text
=logging_text
,
1821 db_nslcmop
=db_nslcmop
,
1824 n2vc_key_list
=n2vc_key_list
,
1828 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
1829 tasks_dict_info
[task_ro
] = "Deploying at VIM"
1831 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1832 stage
[1] = "Deploying Execution Environments."
1833 self
.logger
.debug(logging_text
+ stage
[1])
1835 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
1836 for vnf_profile
in get_vnf_profiles(nsd
):
1837 vnfd_id
= vnf_profile
["vnfd-id"]
1838 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
1839 member_vnf_index
= str(vnf_profile
["id"])
1840 db_vnfr
= db_vnfrs
[member_vnf_index
]
1841 base_folder
= vnfd
["_admin"]["storage"]
1847 # Get additional parameters
1848 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
1849 if db_vnfr
.get("additionalParamsForVnf"):
1850 deploy_params
.update(parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy()))
1852 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
1853 if descriptor_config
:
1855 logging_text
=logging_text
+ "member_vnf_index={} ".format(member_vnf_index
),
1858 nslcmop_id
=nslcmop_id
,
1864 member_vnf_index
=member_vnf_index
,
1865 vdu_index
=vdu_index
,
1867 deploy_params
=deploy_params
,
1868 descriptor_config
=descriptor_config
,
1869 base_folder
=base_folder
,
1870 task_instantiation_info
=tasks_dict_info
,
1874 # Deploy charms for each VDU that supports one.
1875 for vdud
in get_vdu_list(vnfd
):
1877 descriptor_config
= get_configuration(vnfd
, vdu_id
)
1878 vdur
= find_in_list(db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
)
1880 if vdur
.get("additionalParams"):
1881 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
1883 deploy_params_vdu
= deploy_params
1884 deploy_params_vdu
["OSM"] = get_osm_params(db_vnfr
, vdu_id
, vdu_count_index
=0)
1885 vdud_count
= get_vdu_profile(vnfd
, vdu_id
).get("max-number-of-instances", 1)
1887 self
.logger
.debug("VDUD > {}".format(vdud
))
1888 self
.logger
.debug("Descriptor config > {}".format(descriptor_config
))
1889 if descriptor_config
:
1892 for vdu_index
in range(vdud_count
):
1893 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1895 logging_text
=logging_text
+ "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1896 member_vnf_index
, vdu_id
, vdu_index
),
1899 nslcmop_id
=nslcmop_id
,
1905 member_vnf_index
=member_vnf_index
,
1906 vdu_index
=vdu_index
,
1908 deploy_params
=deploy_params_vdu
,
1909 descriptor_config
=descriptor_config
,
1910 base_folder
=base_folder
,
1911 task_instantiation_info
=tasks_dict_info
,
1914 for kdud
in get_kdu_list(vnfd
):
1915 kdu_name
= kdud
["name"]
1916 descriptor_config
= get_configuration(vnfd
, kdu_name
)
1917 if descriptor_config
:
1921 kdur
= next(x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
)
1922 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
1923 if kdur
.get("additionalParams"):
1924 deploy_params_kdu
= parse_yaml_strings(kdur
["additionalParams"])
1927 logging_text
=logging_text
,
1930 nslcmop_id
=nslcmop_id
,
1936 member_vnf_index
=member_vnf_index
,
1937 vdu_index
=vdu_index
,
1939 deploy_params
=deploy_params_kdu
,
1940 descriptor_config
=descriptor_config
,
1941 base_folder
=base_folder
,
1942 task_instantiation_info
=tasks_dict_info
,
1946 # Check if this NS has a charm configuration
1947 descriptor_config
= nsd
.get("ns-configuration")
1948 if descriptor_config
and descriptor_config
.get("juju"):
1951 member_vnf_index
= None
1957 # Get additional parameters
1958 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
1959 if db_nsr
.get("additionalParamsForNs"):
1960 deploy_params
.update(parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy()))
1961 base_folder
= nsd
["_admin"]["storage"]
1963 logging_text
=logging_text
,
1966 nslcmop_id
=nslcmop_id
,
1972 member_vnf_index
=member_vnf_index
,
1973 vdu_index
=vdu_index
,
1975 deploy_params
=deploy_params
,
1976 descriptor_config
=descriptor_config
,
1977 base_folder
=base_folder
,
1978 task_instantiation_info
=tasks_dict_info
,
1982 # rest of staff will be done at finally
1984 except (ROclient
.ROClientException
, DbException
, LcmException
, N2VCException
) as e
:
1985 self
.logger
.error(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
))
1987 except asyncio
.CancelledError
:
1988 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(stage
[1]))
1989 exc
= "Operation was cancelled"
1990 except Exception as e
:
1991 exc
= traceback
.format_exc()
1992 self
.logger
.critical(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
), exc_info
=True)
1995 error_list
.append(str(exc
))
1997 # wait for pending tasks
1999 stage
[1] = "Waiting for instantiate pending tasks."
2000 self
.logger
.debug(logging_text
+ stage
[1])
2001 error_list
+= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
, timeout_ns_deploy
,
2002 stage
, nslcmop_id
, nsr_id
=nsr_id
)
2003 stage
[1] = stage
[2] = ""
2004 except asyncio
.CancelledError
:
2005 error_list
.append("Cancelled")
2006 # TODO cancel all tasks
2007 except Exception as exc
:
2008 error_list
.append(str(exc
))
2010 # update operation-status
2011 db_nsr_update
["operational-status"] = "running"
2012 # let's begin with VCA 'configured' status (later we can change it)
2013 db_nsr_update
["config-status"] = "configured"
2014 for task
, task_name
in tasks_dict_info
.items():
2015 if not task
.done() or task
.cancelled() or task
.exception():
2016 if task_name
.startswith(self
.task_name_deploy_vca
):
2017 # A N2VC task is pending
2018 db_nsr_update
["config-status"] = "failed"
2020 # RO or KDU task is pending
2021 db_nsr_update
["operational-status"] = "failed"
2023 # update status at database
2025 error_detail
= ". ".join(error_list
)
2026 self
.logger
.error(logging_text
+ error_detail
)
2027 error_description_nslcmop
= '{} Detail: {}'.format(stage
[0], error_detail
)
2028 error_description_nsr
= 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id
, stage
[0])
2030 db_nsr_update
["detailed-status"] = error_description_nsr
+ " Detail: " + error_detail
2031 db_nslcmop_update
["detailed-status"] = error_detail
2032 nslcmop_operation_state
= "FAILED"
2036 error_description_nsr
= error_description_nslcmop
= None
2038 db_nsr_update
["detailed-status"] = "Done"
2039 db_nslcmop_update
["detailed-status"] = "Done"
2040 nslcmop_operation_state
= "COMPLETED"
2043 self
._write
_ns
_status
(
2046 current_operation
="IDLE",
2047 current_operation_id
=None,
2048 error_description
=error_description_nsr
,
2049 error_detail
=error_detail
,
2050 other_update
=db_nsr_update
2052 self
._write
_op
_status
(
2055 error_message
=error_description_nslcmop
,
2056 operation_state
=nslcmop_operation_state
,
2057 other_update
=db_nslcmop_update
,
2060 if nslcmop_operation_state
:
2062 await self
.msg
.aiowrite("ns", "instantiated", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
2063 "operationState": nslcmop_operation_state
},
2065 except Exception as e
:
2066 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
2068 self
.logger
.debug(logging_text
+ "Exit")
2069 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
async def _add_vca_relations(
    self,
    logging_text,
    nsr_id,
    vca_index: int,
    timeout: int = 3600,
    vca_type: str = None,
    vca_id: str = None,
) -> bool:
    """
    Add the relations in which the VCA stored at ``vca_index`` takes part.

    steps:
    1. find all relations for this VCA
    2. wait for other peers related
    3. add relations

    :param logging_text: prefix prepended to the log messages
    :param nsr_id: _id of the "nsrs" record to read
    :param vca_index: position of this VCA inside <nsrs>._admin.deployed.VCA
    :param timeout: seconds after which the polling loop gives up
    :param vca_type: key of self.vca_map; "lxc_proxy_charm" is used when empty
    :param vca_id: passed through to add_relation()
    :return: True when no relation is left pending (added, or dropped because
        a peer is BROKEN, or there were none); False on timeout or on any error
    """

    def _peer_ids(relation):
        # ids of the two entities that a relation links together
        entities = relation.get('entities')
        return entities[0].get('id'), entities[1].get('id')

    def _ns_match_key(vca):
        # NS-level relations identify their peers by member-vnf-index
        return 'member-vnf-index'

    def _vnf_match_key(vca):
        # VNF-level relations identify a peer by vdu_id, or by vnfd_id when
        # the VCA record carries no vdu_id
        return "vdu_id" if vca.get("vdu_id") is not None else "vnfd_id"

    async def _relate_ready_peers(pending, db_nsr, match_key_of, broken_key):
        # Adds every pending relation whose two peers have their software
        # installed, and drops the ones having a BROKEN peer.
        vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
        for r in pending.copy():
            from_id, to_id = _peer_ids(r)
            from_vca_ee_id = None
            to_vca_ee_id = None
            for vca in vca_list:
                if not vca.get('config_sw_installed'):
                    continue
                peer = vca.get(match_key_of(vca))
                # no break on purpose: the last matching VCA wins
                if peer == from_id:
                    from_vca_ee_id = vca.get('ee_id')
                if peer == to_id:
                    to_vca_ee_id = vca.get('ee_id')

            if from_vca_ee_id and to_vca_ee_id:
                # add relation
                await self.vca_map[vca_type].add_relation(
                    ee_id_1=from_vca_ee_id,
                    ee_id_2=to_vca_ee_id,
                    endpoint_1=r.get('entities')[0].get('endpoint'),
                    endpoint_2=r.get('entities')[1].get('endpoint'),
                    vca_id=vca_id,
                )
                # remove entry from relations list
                pending.remove(r)
                continue

            # check failed peers (best effort: a malformed record must not
            # abort the whole operation)
            try:
                vca_status_list = db_nsr.get('configurationStatus')
                if vca_status_list:
                    for vca, vca_status in zip(vca_list, vca_status_list):
                        if vca.get(broken_key) in (from_id, to_id) \
                                and vca_status.get('status') == 'BROKEN':
                            # peer broken: remove relation from list, once
                            pending.remove(r)
                            break
            except Exception as e:
                self.logger.debug(logging_text + ' ignored error checking failed peers: {}'.format(e))

    try:
        vca_type = vca_type or "lxc_proxy_charm"

        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        # this VCA data
        my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]

        # read all ns-configuration relations
        ns_relations = [
            r for r in deep_get(nsd, ('ns-configuration', 'relation')) or ()
            # check if this VCA is in the relation
            if my_vca.get('member-vnf-index') in _peer_ids(r)
        ]

        # read all vnf-configuration relations
        vnf_relations = []
        for vnfd in db_nsr.get('vnfd-id') or ():
            db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
            # a vnfd may have no configuration at all: treat it as "no relations"
            db_vnf_configuration = get_configuration(db_vnfd, db_vnfd["id"]) or {}
            vnf_relations.extend(
                r for r in db_vnf_configuration.get("relation", []) or ()
                # check if this VCA is in the relation
                if my_vca.get('vdu_id') in _peer_ids(r)
            )

        # if no relations, terminate
        if not ns_relations and not vnf_relations:
            self.logger.debug(logging_text + ' No relations')
            return True

        self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))

        # STEP 2 and 3: poll until every relation is added or discarded
        start = time()
        while True:
            # check timeout
            now = time()
            if now - start >= timeout:
                self.logger.error(logging_text + ' : timeout adding relations')
                return False

            # reload nsr from database (we need to update record: _admin.deployed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            # for each defined NS relation, find the VCA's related
            await _relate_ready_peers(ns_relations, db_nsr, _ns_match_key, 'member-vnf-index')
            # for each defined VNF relation, find the VCA's related
            await _relate_ready_peers(vnf_relations, db_nsr, _vnf_match_key, 'vdu_id')

            if not ns_relations and not vnf_relations:
                self.logger.debug('Relations added')
                break

            # wait for next try
            await asyncio.sleep(5.0)

        return True

    except Exception as e:
        self.logger.warning(logging_text + ' ERROR adding relations: {}'.format(e))
        return False
async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict,
                       vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600,
                       vca_id: str = None):
    """
    Install one KDU and record the outcome in the "nsrs" and "vnfrs" collections.

    :param nsr_id: _id of the "nsrs" record
    :param nsr_db_path: path inside that record where this KDU is stored
    :param vnfr_data: vnfr content; only its "_id" is read here
    :param kdu_index: position of the KDU inside the vnfr "kdur" list
    :param kdud: KDU descriptor; its "service" entries flagged "mgmt-service" are looked up
    :param vnfd: VNF descriptor that owns the KDU
    :param k8s_instance_info: dictionary with the keys "k8scluster-type", "k8scluster-uuid",
        "kdu-model", "kdu-name" and "namespace"
    :param k8params: parameters handed to the connector install()
    :param timeout: seconds, applied to install() and to each initial primitive
    :param vca_id: handed to the connector calls
    :return: the kdu instance name generated by the connector
    :raises Exception: whatever the connector or the database raises; before re-raising,
        the error text is written to the nsr and the kdur status is set to "ERROR"
    """
    try:
        k8sclustertype = k8s_instance_info["k8scluster-type"]
        connector = self.k8scluster_map[k8sclustertype]
        # Instantiate kdu
        db_dict_install = {"collection": "nsrs",
                           "filter": {"_id": nsr_id},
                           "path": nsr_db_path}

        kdu_instance = connector.generate_kdu_instance_name(
            db_dict=db_dict_install,
            kdu_model=k8s_instance_info["kdu-model"],
            kdu_name=k8s_instance_info["kdu-name"],
        )
        # stored before installing, so the name is already at database if install fails
        self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
        await connector.install(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_model=k8s_instance_info["kdu-model"],
            atomic=True,
            params=k8params,
            db_dict=db_dict_install,
            timeout=timeout,
            kdu_name=k8s_instance_info["kdu-name"],
            namespace=k8s_instance_info["namespace"],
            kdu_instance=kdu_instance,
            vca_id=vca_id,
        )
        self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})

        # Obtain services to obtain management service ip
        services = await connector.get_services(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_instance=kdu_instance,
            namespace=k8s_instance_info["namespace"])

        # Obtain management service info (if exists)
        vnfr_update_dict = {}
        if services:
            vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
            mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
            for mgmt_service in mgmt_services:
                for service in services:
                    if service["name"].startswith(mgmt_service["name"]):
                        # Mgmt service found, Obtain service ip
                        ip = service.get("external_ip", service.get("cluster_ip"))
                        if isinstance(ip, list) and len(ip) == 1:
                            ip = ip[0]

                        vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip

                        # Check if must update also mgmt ip at the vnf
                        service_external_cp = mgmt_service.get("external-connection-point-ref")
                        if service_external_cp:
                            if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
                                vnfr_update_dict["ip-address"] = ip

                        break
                else:
                    self.logger.warning("Mgmt service name: {} not found".format(mgmt_service["name"]))

        vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
        self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)

        kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
        if kdu_config and kdu_config.get("initial-config-primitive") and \
                get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None:
            # sorted() works on a copy, so the descriptor received is left untouched
            initial_config_primitive_list = sorted(
                kdu_config.get("initial-config-primitive"), key=lambda val: int(val["seq"]))

            for initial_config_primitive in initial_config_primitive_list:
                primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, {})

                await asyncio.wait_for(
                    connector.exec_primitive(
                        cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                        kdu_instance=kdu_instance,
                        primitive_name=initial_config_primitive["name"],
                        params=primitive_params_, db_dict=db_dict_install,
                        vca_id=vca_id,
                    ),
                    timeout=timeout
                )

    except Exception as e:
        # Prepare update db with error and raise exception
        try:
            self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
            self.update_db_2("vnfrs", vnfr_data.get("_id"), {"kdur.{}.status".format(kdu_index): "ERROR"})
        except Exception:
            # ignore to keep original exception
            pass
        # reraise original error
        raise

    return kdu_instance
async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
    """
    Launch kdus if present in the descriptor.

    One asyncio task running _install_kdu() is created per entry found in the "kdur"
    list of every vnfr. Each task is registered at self.lcm_tasks and added to
    task_instantiation_info; this method does not wait for them.

    :param logging_text: prefix for log messages
    :param nsr_id: _id of the "nsrs" record, updated at "_admin.deployed.K8s"
    :param nslcmop_id: operation id used to register the tasks
    :param db_vnfrs: dictionary whose values are the vnfr records
    :param db_vnfds: list of vnfd records, looked up by "_id"
    :param task_instantiation_info: dictionary task -> description, filled here
    :raises LcmException: for an unknown kdu type, a missing or not initialized
        k8s cluster, or wrapping any other unexpected exception
    """
    # cluster type -> {k8scluster _id -> id obtained for that cluster type}
    k8scluster_id_2_uuic = {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}

    async def _get_cluster_id(cluster_id, cluster_type):
        already_known = k8scluster_id_2_uuic[cluster_type]
        if cluster_id in already_known:
            return already_known[cluster_id]

        # check if K8scluster is creating and wait look if previous tasks in process
        task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id)
        if task_dependency:
            text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id)
            self.logger.debug(logging_text + text)
            await asyncio.wait(task_dependency, timeout=3600)

        db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
        if not db_k8scluster:
            raise LcmException("K8s cluster {} cannot be found".format(cluster_id))

        k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
        if not k8s_id:
            if cluster_type != "helm-chart-v3":
                raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
                                   format(cluster_id, cluster_type))
            try:
                # backward compatibility for existing clusters that have not been initialized for helm v3
                k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
                k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(k8s_credentials,
                                                                           reuse_cluster_uuid=cluster_id)
                db_k8scluster_update = {
                    "_admin.helm-chart-v3.error_msg": None,
                    "_admin.helm-chart-v3.id": k8s_id,
                    "_admin.helm-chart-v3.created": uninstall_sw,
                    "_admin.helm-chart-v3.operationalState": "ENABLED",
                }
                self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
            except Exception as e:
                self.logger.error(logging_text + "error initializing helm-v3 cluster: {}".format(str(e)))
                raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id,
                                                                                               cluster_type))
        k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
        return k8s_id

    logging_text += "Deploy kdus: "
    step = ""
    try:
        db_nsr_update = {"_admin.deployed.K8s": []}
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        index = 0
        # helm clusters whose repos were found out of date and updated in this call
        synchronized = {"helm-chart": [], "helm-chart-v3": []}
        # database prefix where the repos of each helm flavour are recorded
        repo_db_prefix = {"helm-chart": '_admin.helm_charts_added.',
                          "helm-chart-v3": '_admin.helm_charts_v3_added.'}

        for vnfr_data in db_vnfrs.values():
            vca_id = self.get_vca_id(vnfr_data, {})
            for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
                # Step 0: Prepare and set parameters
                desc_params = parse_yaml_strings(kdur.get("additionalParams"))
                vnfd_id = vnfr_data.get('vnfd-id')
                vnfd_with_id = find_in_list(db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id)
                kdud = next(kdud for kdud in vnfd_with_id["kdu"] if kdud["name"] == kdur["kdu-name"])
                namespace = kdur.get("k8s-namespace")
                if kdur.get("helm-chart"):
                    kdumodel = kdur["helm-chart"]
                    self.logger.debug("kdur: {}".format(kdur))
                    # Default version: helm3, if helm-version is v2 assign v2
                    k8sclustertype = "helm-chart" if kdur.get("helm-version") == "v2" else "helm-chart-v3"
                elif kdur.get("juju-bundle"):
                    kdumodel = kdur["juju-bundle"]
                    k8sclustertype = "juju-bundle"
                else:
                    raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
                                       "juju-bundle. Maybe an old NBI version is running".
                                       format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
                # check if kdumodel is a file and exists
                try:
                    storage = deep_get(vnfd_with_id, ('_admin', 'storage'))
                    if storage and storage.get('pkg-dir'):  # may be not present if vnfd has not artifacts
                        # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
                        filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
                                                         kdumodel)
                        if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
                            kdumodel = self.fs.path + filename
                except (asyncio.TimeoutError, asyncio.CancelledError):
                    raise
                except Exception:  # it is not a file
                    pass

                k8s_cluster_id = kdur["k8s-cluster"]["id"]
                step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
                cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)

                # Synchronize repos (helm flavours only)
                if k8sclustertype in synchronized and cluster_uuid not in synchronized[k8sclustertype]:
                    del_repo_list, added_repo_dict = await asyncio.ensure_future(
                        self.k8scluster_map[k8sclustertype].synchronize_repos(cluster_uuid=cluster_uuid))
                    if del_repo_list or added_repo_dict:
                        prefix = repo_db_prefix[k8sclustertype]
                        unset = {prefix + item: None for item in del_repo_list}
                        updated = {prefix + item: name for item, name in added_repo_dict.items()}
                        synchronized[k8sclustertype].append(cluster_uuid)
                        self.logger.debug(logging_text + "repos synchronized on k8s cluster "
                                                         "'{}' to_delete: {}, to_add: {}".
                                          format(k8s_cluster_id, del_repo_list, added_repo_dict))
                        self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)

                # Instantiate kdu
                step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
                                                                          kdur["kdu-name"], k8s_cluster_id)
                k8s_instance_info = {"kdu-instance": None,
                                     "k8scluster-uuid": cluster_uuid,
                                     "k8scluster-type": k8sclustertype,
                                     "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                                     "kdu-name": kdur["kdu-name"],
                                     "kdu-model": kdumodel,
                                     "namespace": namespace}
                db_path = "_admin.deployed.K8s.{}".format(index)
                db_nsr_update[db_path] = k8s_instance_info
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                task = asyncio.ensure_future(
                    self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, vnfd_with_id,
                                      k8s_instance_info, k8params=desc_params, timeout=600, vca_id=vca_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
                task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])

                index += 1

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
        if isinstance(e, (N2VCException, DbException)):
            self.logger.error(logging_text + msg)
        else:
            self.logger.critical(logging_text + msg, exc_info=True)
        raise LcmException(msg)
    finally:
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
                 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
                 base_folder, task_instantiation_info, stage):
    """
    Launch instantiate_N2VC in an asyncio task, one per supported execution
    environment of descriptor_config, and register every task object.

    The information of each charm is looked up at <nsrs>._admin.deployed.VCA.
    If it is not found, a new entry is created, written to the database and
    appended to db_nsr["_admin"]["deployed"]["VCA"].

    :param descriptor_config: configuration holding "execution-environment-list";
        any other kind of configuration (e.g. script) is not supported and ignored
    :param task_instantiation_info: dictionary task -> description, filled here
    :return: None
    """
    self.logger.debug(logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id))
    # other types as script are not supported
    ee_list = []
    if "execution-environment-list" in descriptor_config:
        ee_list = descriptor_config.get("execution-environment-list", [])

    for ee_item in ee_list:
        self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item.get('juju'),
                                                                                        ee_item.get("helm-chart")))
        ee_descriptor_id = ee_item.get("id")
        juju = ee_item.get("juju")
        if juju:
            vca_name = juju.get('charm')
            if juju.get('cloud') == "k8s":
                vca_type = "k8s_proxy_charm"
            elif juju.get('proxy') is False or vca_name is None:
                vca_type = "native_charm"
            else:
                vca_type = "lxc_proxy_charm"
        elif ee_item.get("helm-chart"):
            vca_name = ee_item['helm-chart']
            vca_type = "helm" if ee_item.get("helm-version") == "v2" else "helm-v3"
        else:
            self.logger.debug(logging_text + "skipping non juju neither charm configuration")
            continue

        # look for an entry already describing this same element
        deployed_vcas = db_nsr["_admin"]["deployed"]["VCA"]
        wanted = (member_vnf_index, vdu_id, kdu_name, vdu_index, ee_descriptor_id)
        vca_index = next(
            (position for position, entry in enumerate(deployed_vcas)
             if entry and (entry.get("member-vnf-index"), entry.get("vdu_id"), entry.get("kdu_name"),
                           entry.get("vdu_count_index", 0), entry.get("ee_descriptor_id")) == wanted),
            None)

        if vca_index is not None:
            vca_deployed = deployed_vcas[vca_index]
        else:
            # not found, create one.
            vca_index = len(deployed_vcas)
            if member_vnf_index:
                target = "vnf/{}".format(member_vnf_index)
            else:
                target = "ns"
            if vdu_id:
                target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
            elif kdu_name:
                target += "/kdu/{}".format(kdu_name)
            vca_deployed = {
                "target_element": target,
                # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
                "member-vnf-index": member_vnf_index,
                "vdu_id": vdu_id,
                "kdu_name": kdu_name,
                "vdu_count_index": vdu_index,
                "operational-status": "init",  # TODO revise
                "detailed-status": "",  # TODO revise
                "step": "initial-deploy",  # TODO revise
                "vnfd_id": vnfd_id,
                "vdu_name": vdu_name,
                "type": vca_type,
                "ee_descriptor_id": ee_descriptor_id
            }

            # create VCA and configurationStatus in db
            self.update_db_2("nsrs", nsr_id, {
                "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
                "configurationStatus.{}".format(vca_index): dict()
            })

            deployed_vcas.append(vca_deployed)

        self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
        self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
        self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))

        # Launch task
        n2vc_coroutine = self.instantiate_N2VC(
            logging_text=logging_text,
            vca_index=vca_index,
            nsi_id=nsi_id,
            db_nsr=db_nsr,
            db_vnfr=db_vnfr,
            vdu_id=vdu_id,
            kdu_name=kdu_name,
            vdu_index=vdu_index,
            deploy_params=deploy_params,
            config_descriptor=descriptor_config,
            base_folder=base_folder,
            nslcmop_id=nslcmop_id,
            stage=stage,
            vca_type=vca_type,
            vca_name=vca_name,
            ee_config_descriptor=ee_item
        )
        task_n2vc = asyncio.ensure_future(n2vc_coroutine)
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
        task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
            member_vnf_index or "", vdu_id or "")
def _create_nslcmop(nsr_id, operation, params):
    """
    Creates a ns-lcm-opp content to be stored at database.
    :param nsr_id: internal id of the instance
    :param operation: instantiate, terminate, scale, action, ...
    :param params: user parameters for the operation
    :return: dictionary following SOL005 format
    """
    # Raise exception if invalid arguments
    if not nsr_id or not operation or not params:
        raise LcmException(
            "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")

    # the same timestamp is stored as "statusEnteredTime" and as "startTime"
    timestamp = time()
    # the same generated identifier is stored as "id" and as "_id"
    op_id = str(uuid4())
    op_links = {
        "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + op_id,
        "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
    }
    return {
        "id": op_id,
        "_id": op_id,
        # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
        "operationState": "PROCESSING",
        "statusEnteredTime": timestamp,
        "nsInstanceId": nsr_id,
        "lcmOperationType": operation,
        "startTime": timestamp,
        "isAutomaticInvocation": False,
        "operationParams": params,
        "isCancelPending": False,
        "links": op_links,
    }
def _format_additional_params(self, params):
    """
    Parse, in place, the values of params that are yaml encoded.

    A value is considered yaml encoded when its text starts with "!!yaml ";
    what follows that mark is loaded with yaml.safe_load and replaces the value.

    :param params: dictionary of parameters; an empty one is used when it is None or empty
    :return: the dictionary received (or a new empty one), with the values replaced
    """
    yaml_mark = "!!yaml "
    params = params or {}
    # only values of keys already present are replaced, so iterating is safe
    for name in params:
        content = params[name]
        if str(content).startswith(yaml_mark):
            params[name] = yaml.safe_load(content[len(yaml_mark):])
    return params
def _get_terminate_primitive_params(self, seq, vnf_index):
    """
    Build the parameters of a terminate primitive through _map_primitive_params().

    :param seq: primitive entry; its 'name' is used as the primitive name
    :param vnf_index: value stored as "member_vnf_index"
    :return: whatever _map_primitive_params() returns
    """
    call_params = {
        "member_vnf_index": vnf_index,
        "primitive": seq.get('name'),
        # no parameter is provided for the primitive
        "primitive_params": {},
    }
    # no instantiation parameters are provided either
    return self._map_primitive_params(seq, call_params, {})
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide whether an already existing sub-operation is skipped or retried.

    :param db_nslcmop: nslcmop record holding the list "_admin.operations"
    :param op_index: position of the sub-operation inside that list
    :return: SUBOPERATION_STATUS_SKIP when the sub-operation is 'COMPLETED';
        otherwise op_index, after marking the sub-operation as 'PROCESSING'
    """
    suboperation = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
    if suboperation.get('operationState') != 'COMPLETED':
        # c. retry executing sub-operation
        # The sub-operation exists, and operationState != 'COMPLETED'
        # Update operationState = 'PROCESSING' to indicate a retry.
        self._update_suboperation_status(db_nslcmop, op_index, 'PROCESSING', 'In progress')
        # Return the sub-operation index
        # _ns_execute_primitive() or RO.create_action() will be called from scale()
        # with arguments extracted from the sub-operation
        return op_index

    # b. Skip sub-operation
    # _ns_execute_primitive() or RO.create_action() will NOT be executed
    return self.SUBOPERATION_STATUS_SKIP
2679 # Find a sub-operation where all keys in a matching dictionary must match
2680 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2681 def _find_suboperation(self
, db_nslcmop
, match
):
2682 if db_nslcmop
and match
:
2683 op_list
= db_nslcmop
.get('_admin', {}).get('operations', [])
2684 for i
, op
in enumerate(op_list
):
2685 if all(op
.get(k
) == match
[k
] for k
in match
):
2687 return self
.SUBOPERATION_STATUS_NOT_FOUND
2689 # Update status for a sub-operation given its index
2690 def _update_suboperation_status(self
, db_nslcmop
, op_index
, operationState
, detailed_status
):
2691 # Update DB for HA tasks
2692 q_filter
= {'_id': db_nslcmop
['_id']}
2693 update_dict
= {'_admin.operations.{}.operationState'.format(op_index
): operationState
,
2694 '_admin.operations.{}.detailed-status'.format(op_index
): detailed_status
}
2695 self
.db
.set_one("nslcmops",
2697 update_dict
=update_dict
,
2698 fail_on_empty
=False)
2700 # Add sub-operation, return the index of the added sub-operation
2701 # Optionally, set operationState, detailed-status, and operationType
2702 # Status and type are currently set for 'scale' sub-operations:
2703 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2704 # 'detailed-status' : status message
2705 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2706 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2707 def _add_suboperation(self
, db_nslcmop
, vnf_index
, vdu_id
, vdu_count_index
, vdu_name
, primitive
,
2708 mapped_primitive_params
, operationState
=None, detailed_status
=None, operationType
=None,
2709 RO_nsr_id
=None, RO_scaling_info
=None):
2711 return self
.SUBOPERATION_STATUS_NOT_FOUND
2712 # Get the "_admin.operations" list, if it exists
2713 db_nslcmop_admin
= db_nslcmop
.get('_admin', {})
2714 op_list
= db_nslcmop_admin
.get('operations')
2715 # Create or append to the "_admin.operations" list
2716 new_op
= {'member_vnf_index': vnf_index
,
2718 'vdu_count_index': vdu_count_index
,
2719 'primitive': primitive
,
2720 'primitive_params': mapped_primitive_params
}
2722 new_op
['operationState'] = operationState
2724 new_op
['detailed-status'] = detailed_status
2726 new_op
['lcmOperationType'] = operationType
2728 new_op
['RO_nsr_id'] = RO_nsr_id
2730 new_op
['RO_scaling_info'] = RO_scaling_info
2732 # No existing operations, create key 'operations' with current operation as first list element
2733 db_nslcmop_admin
.update({'operations': [new_op
]})
2734 op_list
= db_nslcmop_admin
.get('operations')
2736 # Existing operations, append operation to list
2737 op_list
.append(new_op
)
2739 db_nslcmop_update
= {'_admin.operations': op_list
}
2740 self
.update_db_2("nslcmops", db_nslcmop
['_id'], db_nslcmop_update
)
2741 op_index
= len(op_list
) - 1
2744 # Helper methods for scale() sub-operations
2746 # pre-scale/post-scale:
2747 # Check for 3 different cases:
2748 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2749 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2750 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2751 def _check_or_add_scale_suboperation(self
, db_nslcmop
, vnf_index
, vnf_config_primitive
, primitive_params
,
2752 operationType
, RO_nsr_id
=None, RO_scaling_info
=None):
2753 # Find this sub-operation
2754 if RO_nsr_id
and RO_scaling_info
:
2755 operationType
= 'SCALE-RO'
2757 'member_vnf_index': vnf_index
,
2758 'RO_nsr_id': RO_nsr_id
,
2759 'RO_scaling_info': RO_scaling_info
,
2763 'member_vnf_index': vnf_index
,
2764 'primitive': vnf_config_primitive
,
2765 'primitive_params': primitive_params
,
2766 'lcmOperationType': operationType
2768 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
2769 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
2770 # a. New sub-operation
2771 # The sub-operation does not exist, add it.
2772 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2773 # The following parameters are set to None for all kind of scaling:
2775 vdu_count_index
= None
2777 if RO_nsr_id
and RO_scaling_info
:
2778 vnf_config_primitive
= None
2779 primitive_params
= None
2782 RO_scaling_info
= None
2783 # Initial status for sub-operation
2784 operationState
= 'PROCESSING'
2785 detailed_status
= 'In progress'
2786 # Add sub-operation for pre/post-scaling (zero or more operations)
2787 self
._add
_suboperation
(db_nslcmop
,
2792 vnf_config_primitive
,
2799 return self
.SUBOPERATION_STATUS_NEW
2801 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2802 # or op_index (operationState != 'COMPLETED')
2803 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
2805 # Function to return execution_environment id
2807 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
2808 # TODO vdu_index_count
2809 for vca
in vca_deployed_list
:
2810 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
async def destroy_N2VC(
    self,
    logging_text,
    db_nslcmop,
    vca_deployed,
    config_descriptor,
    vca_index,
    destroy_ee=True,
    exec_primitives=True,
    scaling_in=False,
    vca_id: str = None,
):
    """
    Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
    :param logging_text: prefix added to every log message
    :param db_nslcmop: nslcmop database record. Sub-operations are added to it and its "nsInstanceId" is read
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu. None is treated
        as a descriptor without terminate primitives
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
    :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
                        not executed properly
    :param scaling_in: True destroys the application, False destroys the model
    :param vca_id: passed as it is to _ns_execute_primitive and delete_execution_environment
    :return: None or exception
    """

    self.logger.debug(
        logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
            vca_index, vca_deployed, config_descriptor, destroy_ee
        )
    )

    vca_type = vca_deployed.get("type", "lxc_proxy_charm")

    # execute terminate_primitives
    if exec_primitives:
        # a missing descriptor must not abort the termination with an AttributeError
        terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
            (config_descriptor or {}).get("terminate-config-primitive"), vca_deployed.get("ee_descriptor_id"))
        vdu_id = vca_deployed.get("vdu_id")
        vdu_count_index = vca_deployed.get("vdu_count_index")
        vdu_name = vca_deployed.get("vdu_name")
        vnf_index = vca_deployed.get("member-vnf-index")
        if terminate_primitives and vca_deployed.get("needed_terminate"):
            for seq in terminate_primitives:
                # For each sequence in list, get primitive and call _ns_execute_primitive()
                step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                    vnf_index, seq.get("name"))
                self.logger.debug(logging_text + step)
                # Create the primitive for each sequence, i.e. "primitive": "touch"
                primitive = seq.get('name')
                mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)

                # Add sub-operation
                self._add_suboperation(db_nslcmop,
                                       vnf_index,
                                       vdu_id,
                                       vdu_count_index,
                                       vdu_name,
                                       primitive,
                                       mapped_primitive_params)
                # Sub-operations: Call _ns_execute_primitive() instead of action()
                try:
                    result, result_detail = await self._ns_execute_primitive(
                        vca_deployed["ee_id"], primitive,
                        mapped_primitive_params,
                        vca_type=vca_type,
                        vca_id=vca_id,
                    )
                except LcmException:
                    # this happens when VCA is not deployed. In this case it is not needed to terminate
                    continue
                result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
                if result not in result_ok:
                    raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
                                       "error {}".format(seq.get("name"), vnf_index, result_detail))
            # set that this VCA do not need terminated
            db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
            self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})

    if vca_deployed.get("prometheus_jobs") and self.prometheus:
        await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])

    if destroy_ee:
        await self.vca_map[vca_type].delete_execution_environment(
            vca_deployed["ee_id"],
            scaling_in=scaling_in,
            vca_id=vca_id,
        )
async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
    """
    Delete the whole N2VC namespace of a NS, writing the config status before and after.

    :param db_nsr: nsr database record. The namespace deleted is "." followed by its "_id"
    :param vca_id: passed as it is to n2vc.delete_namespace
    :return: None
    """
    self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
    ns_namespace = "." + db_nsr["_id"]
    try:
        await self.n2vc.delete_namespace(
            namespace=ns_namespace,
            total_timeout=self.timeout_charm_delete,
            vca_id=vca_id,
        )
    except N2VCNotFound:
        # already deleted. Skip
        pass
    self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
    """
    Terminates a deployment from RO
    :param logging_text: prefix added to every log message
    :param nsr_deployed: db_nsr._admin.deployed
    :param nsr_id: id of the nsr record to update
    :param nslcmop_id: id of the nslcmop record to update
    :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
    :return: None. Raises LcmException with the concatenated failures if something could not be deleted
    """
    db_nsr_update = {}
    failed_detail = []
    ro_nsr_id = ro_delete_action = None
    if nsr_deployed and nsr_deployed.get("RO"):
        ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
        ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
    try:
        if ro_nsr_id:
            stage[2] = "Deleting ns from VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self._write_op_status(nslcmop_id, stage)
            self.logger.debug(logging_text + stage[2])
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            desc = await self.RO.delete("ns", ro_nsr_id)
            ro_delete_action = desc["action_id"]
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        if ro_delete_action:
            # wait until NS is deleted from VIM
            stage[2] = "Waiting ns deleted from VIM."
            detailed_status_old = None
            self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
                                                                                                ro_delete_action))
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            delete_timeout = 20 * 60  # 20 minutes
            while delete_timeout > 0:
                desc = await self.RO.show(
                    "ns",
                    item_id_name=ro_nsr_id,
                    extra_item="action",
                    extra_item_id=ro_delete_action)

                # deploymentStatus
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                ns_status, ns_status_info = self.RO.check_action_status(desc)
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "Deleting from VIM {}".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                    db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                    break
                else:
                    # An explicit exception is used instead of 'assert', which is removed when running with -O.
                    # It is annotated as a "delete error" by the handler below, as the assertion was
                    raise LcmException("ROclient.check_action_status returns unknown {}".format(ns_status))
                if stage[2] != detailed_status_old:
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self._write_op_status(nslcmop_id, stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # the 'loop' argument of asyncio.sleep does not exist since python 3.10
                await asyncio.sleep(5)
                delete_timeout -= 5
            else:  # delete_timeout <= 0:
                raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

    except Exception as e:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
        elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))

    # Delete nsd. Skipped if the ns could not be deleted
    if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
        ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
        try:
            stage[2] = "Deleting nsd from RO."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            await self.RO.delete("nsd", ro_nsd_id)
            self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
            db_nsr_update["_admin.deployed.RO.nsd_id"] = None
        except Exception as e:
            if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
            elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
            else:
                failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
                self.logger.error(logging_text + failed_detail[-1])

    # Delete vnfds. Skipped if any of the previous deletions failed
    if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
        for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
            if not vnf_deployed or not vnf_deployed["id"]:
                continue
            try:
                ro_vnfd_id = vnf_deployed["id"]
                stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                    vnf_deployed["member-vnf-index"], ro_vnfd_id)
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("vnfd", ro_vnfd_id)
                self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
                db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
            except Exception as e:
                if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                    self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
                elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                    failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a NS instance.

    Stage 1 reads the nslcmop and nsr records and marks the NS as terminating. Stage 2 launches destroy_N2VC
    for every deployed VCA with an execution environment and waits for them. Stage 3 launches the deletion of
    all the execution environments, of the KDUs and of the deployment at VIM. The pending tasks are awaited
    and the final status is written at database, and notified with a "terminated" message, at the 'finally'.
    :param nsr_id: id of the nsr record
    :param nslcmop_id: id of the nslcmop record of this operation
    :return: None
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    timeout_ns_terminate = self.timeout_ns_terminate
    db_nsr = None
    db_nslcmop = None
    operation_params = None
    exc = None
    error_list = []   # annotates all failed error messages
    db_nslcmop_update = {}
    autoremove = False  # autoremove after terminated
    tasks_dict_info = {}
    db_nsr_update = {}
    stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
    # ^ contains [stage, step, VIM-status]
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)

        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operation_params = db_nslcmop.get("operationParams") or {}
        if operation_params.get("timeout_ns_terminate"):
            timeout_ns_terminate = operation_params["timeout_ns_terminate"]
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update
        )
        self._write_op_status(
            op_id=nslcmop_id,
            queuePosition=0,
            stage=stage
        )
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            return

        stage[1] = "Getting vnf descriptors from db."
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        db_vnfrs_dict = {db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list}
        db_vnfds_from_id = {}
        db_vnfds_from_member_index = {}
        # Loop over VNFRs
        for vnfr in db_vnfrs_list:
            vnfd_id = vnfr["vnfd-id"]
            if vnfd_id not in db_vnfds_from_id:
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                db_vnfds_from_id[vnfd_id] = vnfd
            db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]

        # Destroy individual execution environments when there are terminating primitives.
        # Rest of EE will be deleted at once
        # TODO - check before calling _destroy_N2VC
        # if not operation_params.get("skip_terminate_primitives"):#
        # or not vca.get("needed_terminate"):
        stage[0] = "Stage 2/3 execute terminating primitives."
        self.logger.debug(logging_text + stage[0])
        stage[1] = "Looking execution environment that needs terminate."
        self.logger.debug(logging_text + stage[1])

        for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
            # This check goes first: an empty entry cannot be indexed
            if not vca or not vca.get("ee_id"):
                continue
            config_descriptor = None
            # NS level charms have no member-vnf-index, hence no vnfr. An empty dictionary is passed then,
            # as in the other calls to get_vca_id of this method
            vca_id = self.get_vca_id(db_vnfrs_dict.get(vca.get("member-vnf-index")) or {}, db_nsr)
            if not vca.get("member-vnf-index"):
                # ns
                config_descriptor = db_nsr.get("ns-configuration")
            elif vca.get("vdu_id"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
            elif vca.get("kdu_name"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
            else:
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
            vca_type = vca.get("type")
            exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
                                         vca.get("needed_terminate"))
            # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
            # pending native charms
            destroy_ee = True if vca_type in ("helm", "helm-v3", "native_charm") else False
            # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
            #     vca_index, vca.get("ee_id"), vca_type, destroy_ee))
            task = asyncio.ensure_future(
                self.destroy_N2VC(
                    logging_text,
                    db_nslcmop,
                    vca,
                    config_descriptor,
                    vca_index,
                    destroy_ee,
                    exec_terminate_primitives,
                    vca_id=vca_id,
                )
            )
            tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

        # wait for pending tasks of terminate primitives
        if tasks_dict_info:
            self.logger.debug(logging_text + 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
            error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
                                                    min(self.timeout_charm_delete, timeout_ns_terminate),
                                                    stage, nslcmop_id)
            tasks_dict_info.clear()
            if error_list:
                return   # raise LcmException("; ".join(error_list))

        # remove All execution environments at once
        stage[0] = "Stage 3/3 delete all."

        if nsr_deployed.get("VCA"):
            stage[1] = "Deleting all execution environments."
            self.logger.debug(logging_text + stage[1])
            vca_id = self.get_vca_id({}, db_nsr)
            task_delete_ee = asyncio.ensure_future(
                asyncio.wait_for(
                    self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
                    timeout=self.timeout_charm_delete
                )
            )
            # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
            tasks_dict_info[task_delete_ee] = "Terminating all VCA"

        # Delete from k8scluster
        stage[1] = "Deleting KDUs."
        self.logger.debug(logging_text + stage[1])
        # print(nsr_deployed)
        for kdu in get_iterable(nsr_deployed, "K8s"):
            if not kdu or not kdu.get("kdu-instance"):
                continue
            kdu_instance = kdu.get("kdu-instance")
            if kdu.get("k8scluster-type") in self.k8scluster_map:
                # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
                vca_id = self.get_vca_id({}, db_nsr)
                task_delete_kdu_instance = asyncio.ensure_future(
                    self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        vca_id=vca_id,
                    )
                )
            else:
                self.logger.error(logging_text + "Unknown k8s deployment type {}".
                                  format(kdu.get("k8scluster-type")))
                continue
            tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

        # remove from RO
        stage[1] = "Deleting ns from VIM."
        if self.ng_ro:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
        else:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
        tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

        # rest of staff will be done at finally

    except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for terminate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
                                                         stage, nslcmop_id)
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancell all tasks
        except Exception as wait_exc:
            # a different name is used to not rebind (and unbind at exit) the outer 'exc'
            error_list.append(str(wait_exc))
        # update status at database
        if error_list:
            error_detail = "; ".join(error_list)
            # self.logger.error(logging_text + error_detail)
            error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
            error_description_nsr = 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id, stage[0])

            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "NOT_INSTANTIATED"
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if ns_state == "NOT_INSTANTIATED":
            try:
                self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "NOT_INSTANTIATED"})
            except DbException as e:
                # Logger.warn is a deprecated alias of Logger.warning
                self.logger.warning(logging_text + 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
                                    format(nsr_id, e))
        if operation_params:
            autoremove = operation_params.get("autoremove", False)
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state,
                                                             "autoremove": autoremove},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
    """
    Wait until all the tasks finish or the timeout expires, reporting the progress at database.

    :param logging_text: prefix added to every log message
    :param created_tasks_info: dictionary with the tasks as keys and a text describing each one as value
    :param timeout: maximum seconds to wait, counted from the moment this method is called
    :param stage: list of strings. The index 1 is overwritten with "<done>/<total>." plus the errors found
    :param nslcmop_id: id of the nslcmop record where the stage is written
    :param nsr_id: if provided, 'errorDescription' and 'errorDetail' are also written at this nsr record
    :return: list with a text for every task that has failed, has been cancelled or has not finished on time
    """
    started_at = time()
    error_detail_list = []
    error_list = []
    pending = list(created_tasks_info.keys())
    total = len(pending)
    finished_count = 0
    stage[1] = "{}/{}.".format(finished_count, total)
    self._write_op_status(nslcmop_id, stage)
    # exceptions of these types (or a text) are logged in one line; any other one with its traceback
    known_errors = (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
                    K8sException, NgRoException) if pending else ()
    while pending:
        last_error = None
        remaining = timeout + started_at - time()
        finished, pending = await asyncio.wait(pending, timeout=remaining,
                                               return_when=asyncio.FIRST_COMPLETED)
        finished_count += len(finished)
        if not finished:   # Timeout
            for task in pending:
                last_error = created_tasks_info[task] + ": Timeout"
                error_detail_list.append(last_error)
                error_list.append(last_error)
            break
        for task in finished:
            task_text = created_tasks_info[task]
            failure = "Cancelled" if task.cancelled() else task.exception()
            if not failure:
                self.logger.debug(logging_text + task_text + ": Done")
                continue
            if isinstance(failure, asyncio.TimeoutError):
                failure = "Timeout"
            last_error = task_text + ": {}".format(failure)
            error_list.append(task_text)
            error_detail_list.append(last_error)
            if isinstance(failure, known_errors):
                self.logger.error(logging_text + last_error)
            else:
                exc_traceback = "".join(traceback.format_exception(None, failure, failure.__traceback__))
                self.logger.error(logging_text + task_text + " " + exc_traceback)
        stage[1] = "{}/{}.".format(finished_count, total)
        if last_error:
            stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
            if nsr_id:  # update also nsr
                self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
                                                  "errorDetail": ". ".join(error_detail_list)})
        self._write_op_status(nslcmop_id, stage)
    return error_detail_list
3367 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
3369 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
3370 The default-value is used. If it is between < > it look for a value at instantiation_params
3371 :param primitive_desc: portion of VNFD/NSD that describes primitive
3372 :param params: Params provided by user
3373 :param instantiation_params: Instantiation params provided by user
3374 :return: a dictionary with the calculated params
3376 calculated_params
= {}
3377 for parameter
in primitive_desc
.get("parameter", ()):
3378 param_name
= parameter
["name"]
3379 if param_name
in params
:
3380 calculated_params
[param_name
] = params
[param_name
]
3381 elif "default-value" in parameter
or "value" in parameter
:
3382 if "value" in parameter
:
3383 calculated_params
[param_name
] = parameter
["value"]
3385 calculated_params
[param_name
] = parameter
["default-value"]
3386 if isinstance(calculated_params
[param_name
], str) and calculated_params
[param_name
].startswith("<") \
3387 and calculated_params
[param_name
].endswith(">"):
3388 if calculated_params
[param_name
][1:-1] in instantiation_params
:
3389 calculated_params
[param_name
] = instantiation_params
[calculated_params
[param_name
][1:-1]]
3391 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3392 format(calculated_params
[param_name
], primitive_desc
["name"]))
3394 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3395 format(param_name
, primitive_desc
["name"]))
3397 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
3398 calculated_params
[param_name
] = yaml
.safe_dump(calculated_params
[param_name
],
3399 default_flow_style
=True, width
=256)
3400 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[param_name
].startswith("!!yaml "):
3401 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
3402 if parameter
.get("data-type") == "INTEGER":
3404 calculated_params
[param_name
] = int(calculated_params
[param_name
])
3405 except ValueError: # error converting string to int
3407 "Parameter {} of primitive {} must be integer".format(param_name
, primitive_desc
["name"]))
3408 elif parameter
.get("data-type") == "BOOLEAN":
3409 calculated_params
[param_name
] = not ((str(calculated_params
[param_name
])).lower() == 'false')
3411 # add always ns_config_info if primitive name is config
3412 if primitive_desc
["name"] == "config":
3413 if "ns_config_info" in instantiation_params
:
3414 calculated_params
["ns_config_info"] = instantiation_params
["ns_config_info"]
3415 return calculated_params
3417 def _look_for_deployed_vca(self
, deployed_vca
, member_vnf_index
, vdu_id
, vdu_count_index
, kdu_name
=None,
3418 ee_descriptor_id
=None):
3419 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
3420 for vca
in deployed_vca
:
3423 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
3425 if vdu_count_index
is not None and vdu_count_index
!= vca
["vdu_count_index"]:
3427 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
3429 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
3433 # vca_deployed not found
3434 raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
3435 " is not deployed".format(member_vnf_index
, vdu_id
, vdu_count_index
, kdu_name
,
3438 ee_id
= vca
.get("ee_id")
3439 vca_type
= vca
.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3441 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3442 "execution environment"
3443 .format(member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
))
3444 return ee_id
, vca_type
3446 async def _ns_execute_primitive(
3452 retries_interval
=30,
3459 if primitive
== "config":
3460 primitive_params
= {"params": primitive_params
}
3462 vca_type
= vca_type
or "lxc_proxy_charm"
3466 output
= await asyncio
.wait_for(
3467 self
.vca_map
[vca_type
].exec_primitive(
3469 primitive_name
=primitive
,
3470 params_dict
=primitive_params
,
3471 progress_timeout
=self
.timeout_progress_primitive
,
3472 total_timeout
=self
.timeout_primitive
,
3476 timeout
=timeout
or self
.timeout_primitive
)
3479 except asyncio
.CancelledError
:
3481 except Exception as e
: # asyncio.TimeoutError
3482 if isinstance(e
, asyncio
.TimeoutError
):
3486 self
.logger
.debug('Error executing action {} on {} -> {}'.format(primitive
, ee_id
, e
))
3488 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
3490 return 'FAILED', str(e
)
3492 return 'COMPLETED', output
3494 except (LcmException
, asyncio
.CancelledError
):
3496 except Exception as e
:
3497 return 'FAIL', 'Error executing action {}: {}'.format(primitive
, e
)
async def vca_status_refresh(self, nsr_id, nslcmop_id):
    """
    Updating the vca_status with latest juju information in nsrs record
    :param: nsr_id: Id of the nsr
    :param: nslcmop_id: Id of the nslcmop
    :return: None
    """

    self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_id = self.get_vca_id({}, db_nsr)
    deployed = db_nsr['_admin']['deployed']
    if deployed['K8s']:
        # there are KDUs: refresh the status of every one of them
        for kdu in deployed['K8s']:
            await self._on_update_k8s_db(
                kdu["k8scluster-uuid"], kdu["kdu-instance"], filter={'_id': nsr_id}, vca_id=vca_id
            )
    else:
        # no KDUs: refresh the status of every deployed VCA
        for vca_index in range(len(deployed['VCA'])):
            vca_path = "_admin.deployed.VCA.{}.".format(vca_index)
            await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, vca_path, {})

    self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
    self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
async def action(self, nsr_id, nslcmop_id):
    """
    Run the primitive requested by an "action" operation (nslcmop) over a network service.

    The operation parameters are read from the nslcmops record. The primitive is executed either on a
    deployed KDU, through self.k8scluster_map, or on a deployed execution environment, through
    self._ns_execute_primitive. The outcome is written with self._write_ns_status and
    self._write_op_status and notified through self.msg on topic "ns", key "actioned".

    :param nsr_id: _id of the nsrs record
    :param nslcmop_id: _id of the nslcmops record that describes the action
    :return: None when the HA lock is not obtained; otherwise the tuple
        (nslcmop_operation_state, detailed_status)
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    error_description_nslcmop = None
    # Bound up-front so the final return can never hit an unbound local
    detailed_status = ""
    step = ""
    exc = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        nsr_deployed = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        kdu_name = db_nslcmop["operationParams"].get("kdu_name")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]
        timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)

        # Only one side of the branch below is read from the database; bind every name so that the
        # code after it never reads an unbound local
        db_vnfr = db_vnfd = db_nsd = None
        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
        else:
            step = "Getting nsd from database"
            db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        vca_id = self.get_vca_id(db_vnfr, db_nsr)
        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # look for primitive
        config_primitive_desc = descriptor_configuration = None
        if vdu_id:
            descriptor_configuration = get_configuration(db_vnfd, vdu_id)
        elif kdu_name:
            descriptor_configuration = get_configuration(db_vnfd, kdu_name)
        elif vnf_index:
            descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
        else:
            descriptor_configuration = db_nsd.get("ns-configuration")

        if descriptor_configuration and descriptor_configuration.get("config-primitive"):
            for config_primitive in descriptor_configuration["config-primitive"]:
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            # "upgrade", "rollback" and "status" are accepted for a KDU without being declared
            if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
                raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
                                   format(primitive))
            primitive_name = primitive
            ee_descriptor_id = None
        else:
            primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive)
            ee_descriptor_id = config_primitive_desc.get("execution-environment-ref")

        if vnf_index:
            if vdu_id:
                vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                if vdur is None:
                    # Report a readable error instead of an AttributeError on None
                    raise LcmException("vdu_id '{}' not found at vnfr of vnf '{}'".format(vdu_id, vnf_index))
                desc_params = parse_yaml_strings(vdur.get("additionalParams"))
            elif kdu_name:
                kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
                if kdur is None:
                    raise LcmException("kdu_name '{}' not found at vnfr of vnf '{}'".format(kdu_name, vnf_index))
                desc_params = parse_yaml_strings(kdur.get("additionalParams"))
            else:
                desc_params = parse_yaml_strings(db_vnfr.get("additionalParamsForVnf"))
        else:
            desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))

        # A primitive declared in the KDU configuration is run through the k8s connector
        kdu_action = False
        kdu_configuration = get_configuration(db_vnfd, kdu_name) if kdu_name else None
        if kdu_configuration:
            # Distinct loop names: the requested 'primitive' must not be overwritten here
            actions = {
                declared["name"] for declared in kdu_configuration.get("initial-config-primitive", [])
            }
            actions.update(declared["name"] for declared in kdu_configuration.get("config-primitive", []))
            kdu_action = primitive_name in actions

        # TODO check if ns is in a proper status
        if kdu_name and (primitive_name in ("upgrade", "rollback", "status") or kdu_action):
            # kdur and desc_params already set from before
            if desc_params is None:
                # NOTE(review): assumes parse_yaml_strings may hand back None when there are no
                # additional params - confirm against the helper
                desc_params = {}
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
                    break
            else:
                raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))

            if kdu.get("k8scluster-type") not in self.k8scluster_map:
                msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
                raise LcmException(msg)

            db_dict = {"collection": "nsrs",
                       "filter": {"_id": nsr_id},
                       "path": "_admin.deployed.K8s.{}".format(index)}
            self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name))
            step = "Executing kdu {}".format(primitive_name)
            k8s_connector = self.k8scluster_map[kdu["k8scluster-type"]]
            if primitive_name == "upgrade":
                if desc_params.get("kdu_model"):
                    kdu_model = desc_params.get("kdu_model")
                    del desc_params["kdu_model"]
                else:
                    kdu_model = kdu.get("kdu-model")
                    parts = kdu_model.split(sep=":")
                    if len(parts) == 2:
                        # keep only the text before the ':'
                        kdu_model = parts[0]

                detailed_status = await asyncio.wait_for(
                    k8s_connector.upgrade(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        atomic=True, kdu_model=kdu_model,
                        params=desc_params, db_dict=db_dict,
                        timeout=timeout_ns_action),
                    timeout=timeout_ns_action + 10)
                self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
            elif primitive_name == "rollback":
                detailed_status = await asyncio.wait_for(
                    k8s_connector.rollback(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        db_dict=db_dict),
                    timeout=timeout_ns_action)
            elif primitive_name == "status":
                detailed_status = await asyncio.wait_for(
                    k8s_connector.status_kdu(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        vca_id=vca_id,
                    ),
                    timeout=timeout_ns_action
                )
            else:
                kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
                params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)

                detailed_status = await asyncio.wait_for(
                    k8s_connector.exec_primitive(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        primitive_name=primitive_name,
                        params=params, db_dict=db_dict,
                        timeout=timeout_ns_action,
                        vca_id=vca_id,
                    ),
                    timeout=timeout_ns_action
                )

            # An empty answer from the connector is reported as a failure
            if detailed_status:
                nslcmop_operation_state = 'COMPLETED'
            else:
                detailed_status = ''
                nslcmop_operation_state = 'FAILED'
        else:
            ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"], member_vnf_index=vnf_index,
                                                          vdu_id=vdu_id, vdu_count_index=vdu_count_index,
                                                          ee_descriptor_id=ee_descriptor_id)
            # Stays None when no deployed VCA entry belongs to this vnf, instead of being unbound
            db_dict = None
            for vca_index, vca_deployed in enumerate(db_nsr['_admin']['deployed']['VCA']):
                if vca_deployed.get("member-vnf-index") == vnf_index:
                    db_dict = {"collection": "nsrs",
                               "filter": {"_id": nsr_id},
                               "path": "_admin.deployed.VCA.{}.".format(vca_index)}
                    break
            nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
                ee_id,
                primitive=primitive_name,
                primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
                timeout=timeout_ns_action,
                vca_type=vca_type,
                db_dict=db_dict,
                vca_id=vca_id,
            )

        db_nslcmop_update["detailed-status"] = detailed_status
        error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
        self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
                                                                               detailed_status))
        # database update is called inside finally

    except (DbException, LcmException, N2VCException, K8sException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except asyncio.TimeoutError:
        self.logger.error(logging_text + "Timeout while '{}'".format(step))
        exc = "Timeout"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc:
            db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
                "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=db_nsr["nsState"],  # TODO check if degraded. For the moment use previous status
                current_operation="IDLE",
                current_operation_id=None,
                # error_description=error_description_nsr,
                # error_detail=error_detail,
                other_update=db_nsr_update
            )

        self._write_op_status(op_id=nslcmop_id, stage="", error_message=error_description_nslcmop,
                              operation_state=nslcmop_operation_state, other_update=db_nslcmop_update)

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
    # Returned after, not inside, the finally clause: a return within finally would silently discard
    # any exception that is not handled above
    return nslcmop_operation_state, detailed_status
3772 async def scale(self
, nsr_id
, nslcmop_id
):
3773 # Try to lock HA task here
3774 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3775 if not task_is_locked_by_me
:
3778 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
3779 stage
= ['', '', '']
3780 tasks_dict_info
= {}
3781 # ^ stage, step, VIM progress
3782 self
.logger
.debug(logging_text
+ "Enter")
3783 # get all needed from database
3785 db_nslcmop_update
= {}
3788 # in case of error, indicates what part of scale was failed to put nsr at error status
3789 scale_process
= None
3790 old_operational_status
= ""
3791 old_config_status
= ""
3794 # wait for any previous tasks in process
3795 step
= "Waiting for previous operations to terminate"
3796 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
3797 self
._write
_ns
_status
(nsr_id
=nsr_id
, ns_state
=None,
3798 current_operation
="SCALING", current_operation_id
=nslcmop_id
)
3800 step
= "Getting nslcmop from database"
3801 self
.logger
.debug(step
+ " after having waited for previous tasks to be completed")
3802 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3804 step
= "Getting nsr from database"
3805 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3806 old_operational_status
= db_nsr
["operational-status"]
3807 old_config_status
= db_nsr
["config-status"]
3809 step
= "Parsing scaling parameters"
3810 db_nsr_update
["operational-status"] = "scaling"
3811 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3812 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3815 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3816 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
3817 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3818 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3819 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3822 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3823 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3824 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
3825 # for backward compatibility
3826 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
3827 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
3828 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
3829 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3831 step
= "Getting vnfr from database"
3832 db_vnfr
= self
.db
.get_one("vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
})
3834 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
3836 step
= "Getting vnfd from database"
3837 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
3839 base_folder
= db_vnfd
["_admin"]["storage"]
3841 step
= "Getting scaling-group-descriptor"
3842 scaling_descriptor
= find_in_list(
3846 lambda scale_desc
: scale_desc
["name"] == scaling_group
3848 if not scaling_descriptor
:
3849 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3850 "at vnfd:scaling-group-descriptor".format(scaling_group
))
3852 step
= "Sending scale order to VIM"
3853 # TODO check if ns is in a proper status
3855 if not db_nsr
["_admin"].get("scaling-group"):
3856 self
.update_db_2("nsrs", nsr_id
, {"_admin.scaling-group": [{"name": scaling_group
, "nb-scale-op": 0}]})
3857 admin_scale_index
= 0
3859 for admin_scale_index
, admin_scale_info
in enumerate(db_nsr
["_admin"]["scaling-group"]):
3860 if admin_scale_info
["name"] == scaling_group
:
3861 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
3863 else: # not found, set index one plus last element and add new entry with the name
3864 admin_scale_index
+= 1
3865 db_nsr_update
["_admin.scaling-group.{}.name".format(admin_scale_index
)] = scaling_group
3866 RO_scaling_info
= []
3867 VCA_scaling_info
= []
3868 vdu_scaling_info
= {"scaling_group_name": scaling_group
, "vdu": []}
3869 if scaling_type
== "SCALE_OUT":
3870 if "aspect-delta-details" not in scaling_descriptor
:
3872 "Aspect delta details not fount in scaling descriptor {}".format(
3873 scaling_descriptor
["name"]
3876 # count if max-instance-count is reached
3877 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
3879 vdu_scaling_info
["scaling_direction"] = "OUT"
3880 vdu_scaling_info
["vdu-create"] = {}
3881 for delta
in deltas
:
3882 for vdu_delta
in delta
["vdu-delta"]:
3883 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
3884 vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
3885 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
3887 additional_params
= self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"]) or {}
3888 cloud_init_list
= []
3890 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
3891 max_instance_count
= 10
3892 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
3893 max_instance_count
= vdu_profile
.get("max-number-of-instances", 10)
3895 default_instance_num
= get_number_of_instances(db_vnfd
, vdud
["id"])
3897 nb_scale_op
+= vdu_delta
.get("number-of-instances", 1)
3899 if nb_scale_op
+ default_instance_num
> max_instance_count
:
3901 "reached the limit of {} (max-instance-count) "
3902 "scaling-out operations for the "
3903 "scaling-group-descriptor '{}'".format(nb_scale_op
, scaling_group
)
3905 for x
in range(vdu_delta
.get("number-of-instances", 1)):
3907 # TODO Information of its own ip is not available because db_vnfr is not updated.
3908 additional_params
["OSM"] = get_osm_params(
3913 cloud_init_list
.append(
3914 self
._parse
_cloud
_init
(
3921 VCA_scaling_info
.append(
3923 "osm_vdu_id": vdu_delta
["id"],
3924 "member-vnf-index": vnf_index
,
3926 "vdu_index": vdu_index
+ x
3929 RO_scaling_info
.append(
3931 "osm_vdu_id": vdu_delta
["id"],
3932 "member-vnf-index": vnf_index
,
3934 "count": vdu_delta
.get("number-of-instances", 1)
3938 RO_scaling_info
[-1]["cloud_init"] = cloud_init_list
3939 vdu_scaling_info
["vdu-create"][vdu_delta
["id"]] = vdu_delta
.get("number-of-instances", 1)
3941 elif scaling_type
== "SCALE_IN":
3942 if "min-instance-count" in scaling_descriptor
and scaling_descriptor
["min-instance-count"] is not None:
3943 min_instance_count
= int(scaling_descriptor
["min-instance-count"])
3945 vdu_scaling_info
["scaling_direction"] = "IN"
3946 vdu_scaling_info
["vdu-delete"] = {}
3947 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
3948 for delta
in deltas
:
3949 for vdu_delta
in delta
["vdu-delta"]:
3950 vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
3951 min_instance_count
= 0
3952 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
3953 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
3954 min_instance_count
= vdu_profile
["min-number-of-instances"]
3956 default_instance_num
= get_number_of_instances(db_vnfd
, vdu_delta
["id"])
3958 nb_scale_op
-= vdu_delta
.get("number-of-instances", 1)
3959 if nb_scale_op
+ default_instance_num
< min_instance_count
:
3961 "reached the limit of {} (min-instance-count) scaling-in operations for the "
3962 "scaling-group-descriptor '{}'".format(nb_scale_op
, scaling_group
)
3964 RO_scaling_info
.append({"osm_vdu_id": vdu_delta
["id"], "member-vnf-index": vnf_index
,
3965 "type": "delete", "count": vdu_delta
.get("number-of-instances", 1),
3966 "vdu_index": vdu_index
- 1})
3967 for x
in range(vdu_delta
.get("number-of-instances", 1)):
3968 VCA_scaling_info
.append(
3970 "osm_vdu_id": vdu_delta
["id"],
3971 "member-vnf-index": vnf_index
,
3973 "vdu_index": vdu_index
- 1 - x
3976 vdu_scaling_info
["vdu-delete"][vdu_delta
["id"]] = vdu_delta
.get("number-of-instances", 1)
3978 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
3979 vdu_delete
= copy(vdu_scaling_info
.get("vdu-delete"))
3980 if vdu_scaling_info
["scaling_direction"] == "IN":
3981 for vdur
in reversed(db_vnfr
["vdur"]):
3982 if vdu_delete
.get(vdur
["vdu-id-ref"]):
3983 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
3984 vdu_scaling_info
["vdu"].append({
3985 "name": vdur
.get("name") or vdur
.get("vdu-name"),
3986 "vdu_id": vdur
["vdu-id-ref"],
3989 for interface
in vdur
["interfaces"]:
3990 vdu_scaling_info
["vdu"][-1]["interface"].append({
3991 "name": interface
["name"],
3992 "ip_address": interface
["ip-address"],
3993 "mac_address": interface
.get("mac-address"),
3995 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
3998 step
= "Executing pre-scale vnf-config-primitive"
3999 if scaling_descriptor
.get("scaling-config-action"):
4000 for scaling_config_action
in scaling_descriptor
["scaling-config-action"]:
4001 if (scaling_config_action
.get("trigger") == "pre-scale-in" and scaling_type
== "SCALE_IN") \
4002 or (scaling_config_action
.get("trigger") == "pre-scale-out" and scaling_type
== "SCALE_OUT"):
4003 vnf_config_primitive
= scaling_config_action
["vnf-config-primitive-name-ref"]
4004 step
= db_nslcmop_update
["detailed-status"] = \
4005 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive
)
4007 # look for primitive
4008 for config_primitive
in (get_configuration(
4009 db_vnfd
, db_vnfd
["id"]
4010 ) or {}).get("config-primitive", ()):
4011 if config_primitive
["name"] == vnf_config_primitive
:
4015 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
4016 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
4017 "primitive".format(scaling_group
, vnf_config_primitive
))
4019 vnfr_params
= {"VDU_SCALE_INFO": vdu_scaling_info
}
4020 if db_vnfr
.get("additionalParamsForVnf"):
4021 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
4023 scale_process
= "VCA"
4024 db_nsr_update
["config-status"] = "configuring pre-scaling"
4025 primitive_params
= self
._map
_primitive
_params
(config_primitive
, {}, vnfr_params
)
4027 # Pre-scale retry check: Check if this sub-operation has been executed before
4028 op_index
= self
._check
_or
_add
_scale
_suboperation
(
4029 db_nslcmop
, nslcmop_id
, vnf_index
, vnf_config_primitive
, primitive_params
, 'PRE-SCALE')
4030 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
4031 # Skip sub-operation
4032 result
= 'COMPLETED'
4033 result_detail
= 'Done'
4034 self
.logger
.debug(logging_text
+
4035 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
4036 vnf_config_primitive
, result
, result_detail
))
4038 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
4039 # New sub-operation: Get index of this sub-operation
4040 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
4041 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} New sub-operation".
4042 format(vnf_config_primitive
))
4044 # retry: Get registered params for this existing sub-operation
4045 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
4046 vnf_index
= op
.get('member_vnf_index')
4047 vnf_config_primitive
= op
.get('primitive')
4048 primitive_params
= op
.get('primitive_params')
4049 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Sub-operation retry".
4050 format(vnf_config_primitive
))
4051 # Execute the primitive, either with new (first-time) or registered (reintent) args
4052 ee_descriptor_id
= config_primitive
.get("execution-environment-ref")
4053 primitive_name
= config_primitive
.get("execution-environment-primitive",
4054 vnf_config_primitive
)
4055 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"],
4056 member_vnf_index
=vnf_index
,
4058 vdu_count_index
=None,
4059 ee_descriptor_id
=ee_descriptor_id
)
4060 result
, result_detail
= await self
._ns
_execute
_primitive
(
4061 ee_id
, primitive_name
,
4066 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
4067 vnf_config_primitive
, result
, result_detail
))
4068 # Update operationState = COMPLETED | FAILED
4069 self
._update
_suboperation
_status
(
4070 db_nslcmop
, op_index
, result
, result_detail
)
4072 if result
== "FAILED":
4073 raise LcmException(result_detail
)
4074 db_nsr_update
["config-status"] = old_config_status
4075 scale_process
= None
4078 db_nsr_update
["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)] = nb_scale_op
4079 db_nsr_update
["_admin.scaling-group.{}.time".format(admin_scale_index
)] = time()
4081 # SCALE-IN VCA - BEGIN
4082 if VCA_scaling_info
:
4083 step
= db_nslcmop_update
["detailed-status"] = \
4084 "Deleting the execution environments"
4085 scale_process
= "VCA"
4086 for vdu_info
in VCA_scaling_info
:
4087 if vdu_info
["type"] == "delete":
4088 member_vnf_index
= str(vdu_info
["member-vnf-index"])
4089 self
.logger
.debug(logging_text
+ "vdu info: {}".format(vdu_info
))
4090 vdu_id
= vdu_info
["osm_vdu_id"]
4091 vdu_index
= int(vdu_info
["vdu_index"])
4092 stage
[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
4093 member_vnf_index
, vdu_id
, vdu_index
)
4094 stage
[2] = step
= "Scaling in VCA"
4095 self
._write
_op
_status
(
4099 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
4100 config_update
= db_nsr
["configurationStatus"]
4101 for vca_index
, vca
in enumerate(vca_update
):
4102 if (vca
or vca
.get("ee_id")) and vca
["member-vnf-index"] == member_vnf_index
and \
4103 vca
["vdu_count_index"] == vdu_index
:
4104 if vca
.get("vdu_id"):
4105 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4106 elif vca
.get("kdu_name"):
4107 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4109 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4110 operation_params
= db_nslcmop
.get("operationParams") or {}
4111 exec_terminate_primitives
= (not operation_params
.get("skip_terminate_primitives") and
4112 vca
.get("needed_terminate"))
4113 task
= asyncio
.ensure_future(
4122 exec_primitives
=exec_terminate_primitives
,
4126 timeout
=self
.timeout_charm_delete
4129 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4130 del vca_update
[vca_index
]
4131 del config_update
[vca_index
]
4132 # wait for pending tasks of terminate primitives
4134 self
.logger
.debug(logging_text
+
4135 'Waiting for tasks {}'.format(list(tasks_dict_info
.keys())))
4136 error_list
= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
,
4137 min(self
.timeout_charm_delete
,
4138 self
.timeout_ns_terminate
),
4140 tasks_dict_info
.clear()
4142 raise LcmException("; ".join(error_list
))
4144 db_vca_and_config_update
= {
4145 "_admin.deployed.VCA": vca_update
,
4146 "configurationStatus": config_update
4148 self
.update_db_2("nsrs", db_nsr
["_id"], db_vca_and_config_update
)
4149 scale_process
= None
4150 # SCALE-IN VCA - END
4154 scale_process
= "RO"
4155 if self
.ro_config
.get("ng"):
4156 await self
._scale
_ng
_ro
(logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
)
4157 vdu_scaling_info
.pop("vdu-create", None)
4158 vdu_scaling_info
.pop("vdu-delete", None)
4160 scale_process
= None
4162 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4165 # SCALE-UP VCA - BEGIN
4166 if VCA_scaling_info
:
4167 step
= db_nslcmop_update
["detailed-status"] = \
4168 "Creating new execution environments"
4169 scale_process
= "VCA"
4170 for vdu_info
in VCA_scaling_info
:
4171 if vdu_info
["type"] == "create":
4172 member_vnf_index
= str(vdu_info
["member-vnf-index"])
4173 self
.logger
.debug(logging_text
+ "vdu info: {}".format(vdu_info
))
4174 vnfd_id
= db_vnfr
["vnfd-ref"]
4175 vdu_index
= int(vdu_info
["vdu_index"])
4176 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
4177 if db_vnfr
.get("additionalParamsForVnf"):
4178 deploy_params
.update(parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy()))
4179 descriptor_config
= get_configuration(db_vnfd
, db_vnfd
["id"])
4180 if descriptor_config
:
4185 logging_text
=logging_text
+ "member_vnf_index={} ".format(member_vnf_index
),
4188 nslcmop_id
=nslcmop_id
,
4194 member_vnf_index
=member_vnf_index
,
4195 vdu_index
=vdu_index
,
4197 deploy_params
=deploy_params
,
4198 descriptor_config
=descriptor_config
,
4199 base_folder
=base_folder
,
4200 task_instantiation_info
=tasks_dict_info
,
4203 vdu_id
= vdu_info
["osm_vdu_id"]
4204 vdur
= find_in_list(db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
)
4205 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
4206 if vdur
.get("additionalParams"):
4207 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
4209 deploy_params_vdu
= deploy_params
4210 deploy_params_vdu
["OSM"] = get_osm_params(db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
)
4211 if descriptor_config
:
4214 stage
[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
4215 member_vnf_index
, vdu_id
, vdu_index
)
4216 stage
[2] = step
= "Scaling out VCA"
4217 self
._write
_op
_status
(
4222 logging_text
=logging_text
+ "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
4223 member_vnf_index
, vdu_id
, vdu_index
),
4226 nslcmop_id
=nslcmop_id
,
4232 member_vnf_index
=member_vnf_index
,
4233 vdu_index
=vdu_index
,
4235 deploy_params
=deploy_params_vdu
,
4236 descriptor_config
=descriptor_config
,
4237 base_folder
=base_folder
,
4238 task_instantiation_info
=tasks_dict_info
,
4241 # SCALE-UP VCA - END
4242 scale_process
= None
4245 # execute primitive service POST-SCALING
4246 step
= "Executing post-scale vnf-config-primitive"
4247 if scaling_descriptor
.get("scaling-config-action"):
4248 for scaling_config_action
in scaling_descriptor
["scaling-config-action"]:
4249 if (scaling_config_action
.get("trigger") == "post-scale-in" and scaling_type
== "SCALE_IN") \
4250 or (scaling_config_action
.get("trigger") == "post-scale-out" and scaling_type
== "SCALE_OUT"):
4251 vnf_config_primitive
= scaling_config_action
["vnf-config-primitive-name-ref"]
4252 step
= db_nslcmop_update
["detailed-status"] = \
4253 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive
)
4255 vnfr_params
= {"VDU_SCALE_INFO": vdu_scaling_info
}
4256 if db_vnfr
.get("additionalParamsForVnf"):
4257 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
4259 # look for primitive
4260 for config_primitive
in (
4261 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
4262 ).get("config-primitive", ()):
4263 if config_primitive
["name"] == vnf_config_primitive
:
4267 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
4268 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
4269 "config-primitive".format(scaling_group
, vnf_config_primitive
))
4270 scale_process
= "VCA"
4271 db_nsr_update
["config-status"] = "configuring post-scaling"
4272 primitive_params
= self
._map
_primitive
_params
(config_primitive
, {}, vnfr_params
)
4274 # Post-scale retry check: Check if this sub-operation has been executed before
4275 op_index
= self
._check
_or
_add
_scale
_suboperation
(
4276 db_nslcmop
, nslcmop_id
, vnf_index
, vnf_config_primitive
, primitive_params
, 'POST-SCALE')
4277 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
4278 # Skip sub-operation
4279 result
= 'COMPLETED'
4280 result_detail
= 'Done'
4281 self
.logger
.debug(logging_text
+
4282 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
4283 format(vnf_config_primitive
, result
, result_detail
))
4285 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
4286 # New sub-operation: Get index of this sub-operation
4287 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
4288 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} New sub-operation".
4289 format(vnf_config_primitive
))
4291 # retry: Get registered params for this existing sub-operation
4292 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
4293 vnf_index
= op
.get('member_vnf_index')
4294 vnf_config_primitive
= op
.get('primitive')
4295 primitive_params
= op
.get('primitive_params')
4296 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Sub-operation retry".
4297 format(vnf_config_primitive
))
4298 # Execute the primitive, either with new (first-time) or registered (reintent) args
4299 ee_descriptor_id
= config_primitive
.get("execution-environment-ref")
4300 primitive_name
= config_primitive
.get("execution-environment-primitive",
4301 vnf_config_primitive
)
4302 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"],
4303 member_vnf_index
=vnf_index
,
4305 vdu_count_index
=None,
4306 ee_descriptor_id
=ee_descriptor_id
)
4307 result
, result_detail
= await self
._ns
_execute
_primitive
(
4314 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
4315 vnf_config_primitive
, result
, result_detail
))
4316 # Update operationState = COMPLETED | FAILED
4317 self
._update
_suboperation
_status
(
4318 db_nslcmop
, op_index
, result
, result_detail
)
4320 if result
== "FAILED":
4321 raise LcmException(result_detail
)
4322 db_nsr_update
["config-status"] = old_config_status
4323 scale_process
= None
4326 db_nsr_update
["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
4327 db_nsr_update
["operational-status"] = "running" if old_operational_status
== "failed" \
4328 else old_operational_status
4329 db_nsr_update
["config-status"] = old_config_status
4331 except (ROclient
.ROClientException
, DbException
, LcmException
, NgRoException
) as e
:
4332 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4334 except asyncio
.CancelledError
:
4335 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(step
))
4336 exc
= "Operation was cancelled"
4337 except Exception as e
:
4338 exc
= traceback
.format_exc()
4339 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
4341 self
._write
_ns
_status
(nsr_id
=nsr_id
, ns_state
=None, current_operation
="IDLE", current_operation_id
=None)
4343 stage
[1] = "Waiting for instantiate pending tasks."
4344 self
.logger
.debug(logging_text
+ stage
[1])
4345 exc
= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
, self
.timeout_ns_deploy
,
4346 stage
, nslcmop_id
, nsr_id
=nsr_id
)
4348 db_nslcmop_update
["detailed-status"] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
4349 nslcmop_operation_state
= "FAILED"
4351 db_nsr_update
["operational-status"] = old_operational_status
4352 db_nsr_update
["config-status"] = old_config_status
4353 db_nsr_update
["detailed-status"] = ""
4355 if "VCA" in scale_process
:
4356 db_nsr_update
["config-status"] = "failed"
4357 if "RO" in scale_process
:
4358 db_nsr_update
["operational-status"] = "failed"
4359 db_nsr_update
["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id
, step
,
4362 error_description_nslcmop
= None
4363 nslcmop_operation_state
= "COMPLETED"
4364 db_nslcmop_update
["detailed-status"] = "Done"
4366 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
="", error_message
=error_description_nslcmop
,
4367 operation_state
=nslcmop_operation_state
, other_update
=db_nslcmop_update
)
4369 self
._write
_ns
_status
(nsr_id
=nsr_id
, ns_state
=None, current_operation
="IDLE",
4370 current_operation_id
=None, other_update
=db_nsr_update
)
4372 if nslcmop_operation_state
:
4374 msg
= {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
, "operationState": nslcmop_operation_state
}
4375 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
4376 except Exception as e
:
4377 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
4378 self
.logger
.debug(logging_text
+ "Exit")
4379 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
4381 async def _scale_ng_ro(self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
):
4382 nsr_id
= db_nslcmop
["nsInstanceId"]
4383 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
4386 # read from db: vnfd's for every vnf
4389 # for each vnf in ns, read vnfd
4390 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
4391 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
4392 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
4393 # if we haven't this vnfd, read it from db
4394 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
4396 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4397 db_vnfds
.append(vnfd
)
4398 n2vc_key
= self
.n2vc
.get_public_key()
4399 n2vc_key_list
= [n2vc_key
]
4400 self
.scale_vnfr(db_vnfr
, vdu_scaling_info
.get("vdu-create"), vdu_scaling_info
.get("vdu-delete"),
4402 # db_vnfr has been updated, update db_vnfrs to use it
4403 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
4404 await self
._instantiate
_ng
_ro
(logging_text
, nsr_id
, db_nsd
, db_nsr
, db_nslcmop
, db_vnfrs
,
4405 db_vnfds
, n2vc_key_list
, stage
=stage
, start_deploy
=time(),
4406 timeout_ns_deploy
=self
.timeout_ns_deploy
)
4407 if vdu_scaling_info
.get("vdu-delete"):
4408 self
.scale_vnfr(db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False)
async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
    """
    Register the Prometheus jobs described by a 'prometheus*.j2' template found in an artifact.

    :param ee_id: execution environment id; the text after its first "." is used as service name
    :param artifact_path: folder listed through self.fs.dir_ls() to look for the template
    :param ee_config_descriptor: dict providing the "metric-service" entry used for the host name
    :param vnfr_id: VNF record id; with its dashes removed it is used as job name / job name prefix
    :param nsr_id: stored in every job under the "nsr_id" key
    :param target_ip: exposed to the template as TARGET_IP
    :return: list with the job names when self.prometheus.update() returns a truthy value;
        None otherwise, including when there is no prometheus client or no template file
    """
    if not self.prometheus:
        return None

    # look for a file called 'prometheus*.j2' among the artifact content; the first match wins
    job_file = None
    for candidate in self.fs.dir_ls(artifact_path):
        if candidate.startswith("prometheus") and candidate.endswith(".j2"):
            job_file = candidate
            break
    if not job_file:
        return None

    with self.fs.file_open((artifact_path, job_file), "r") as template:
        job_data = template.read()

    service = ee_id.partition(".")[2]  # remove prefix "namespace."
    host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
    # NOTE(review): the assignment of host_port is not visible in the reviewed excerpt;
    # "80" is assumed here - confirm against the full file.
    host_port = "80"
    job_prefix = vnfr_id.replace("-", "")
    variables = {
        "JOB_NAME": job_prefix,
        "TARGET_IP": target_ip,
        "EXPORTER_POD_IP": host_name,
        "EXPORTER_POD_PORT": host_port,
    }
    job_list = self.prometheus.parse_job(job_data, variables)

    # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
    for job in job_list:
        current_name = job.get("job_name")
        if not (isinstance(current_name, str) and job_prefix in current_name):
            job["job_name"] = job_prefix + "_" + str(randint(1, 10000))
        job["nsr_id"] = nsr_id

    # index the jobs by name; a later job replaces an earlier one with the same name
    job_dict = {}
    for job in job_list:
        job_dict[job["job_name"]] = job

    if await self.prometheus.update(job_dict):
        return list(job_dict)
    return None
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Get VCA Cloud and VCA Cloud Credentials for the VIM account

    Both values are read from the "config" section of the VIM account; an entry that is
    not present there is returned as None.

    :param: vim_account_id:     VIM Account ID

    :return: (cloud_name, cloud_credential)
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_cloud")
    cloud_credential = vim_config.get("vca_cloud_credential")
    return cloud_name, cloud_credential
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account

    Both values are read from the "config" section of the VIM account; an entry that is
    not present there is returned as None.

    :param: vim_account_id:     VIM Account ID

    :return: (cloud_name, cloud_credential)
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    k8s_cloud_name = vim_config.get("vca_k8s_cloud")
    k8s_cloud_credential = vim_config.get("vca_k8s_cloud_credential")
    return k8s_cloud_name, k8s_cloud_credential