1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
22 import logging
.handlers
25 from jinja2
import Environment
, TemplateError
, TemplateNotFound
, StrictUndefined
, UndefinedError
27 from osm_lcm
import ROclient
28 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
29 from osm_lcm
.lcm_utils
import LcmException
, LcmExceptionNoMgmtIP
, LcmBase
, deep_get
, get_iterable
, populate_dict
30 from osm_lcm
.data_utils
.nsd
import get_vnf_profiles
31 from osm_lcm
.data_utils
.vnfd
import get_vdu_list
, get_vdu_profile
, \
32 get_ee_sorted_initial_config_primitive_list
, get_ee_sorted_terminate_config_primitive_list
, \
33 get_kdu_list
, get_virtual_link_profiles
, get_vdu
, get_configuration
, \
34 get_vdu_index
, get_scaling_aspect
, get_number_of_instances
, get_juju_ee_ref
35 from osm_lcm
.data_utils
.list_utils
import find_in_list
36 from osm_lcm
.data_utils
.vnfr
import get_osm_params
, get_vdur_index
37 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
38 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
39 from n2vc
.k8s_helm_conn
import K8sHelmConnector
40 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
41 from n2vc
.k8s_juju_conn
import K8sJujuConnector
43 from osm_common
.dbbase
import DbException
44 from osm_common
.fsbase
import FsException
46 from osm_lcm
.data_utils
.database
.database
import Database
47 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
49 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
50 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
52 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
54 from copy
import copy
, deepcopy
56 from uuid
import uuid4
58 from random
import randint
60 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
64 timeout_vca_on_error
= 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
65 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
66 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
67 timeout_charm_delete
= 10 * 60
68 timeout_primitive
= 30 * 60 # timeout for primitive execution
69 timeout_progress_primitive
= 10 * 60 # timeout for some progress in a primitive execution
71 SUBOPERATION_STATUS_NOT_FOUND
= -1
72 SUBOPERATION_STATUS_NEW
= -2
73 SUBOPERATION_STATUS_SKIP
= -3
74 task_name_deploy_vca
= "Deploying VCA"
76 def __init__(self
, msg
, lcm_tasks
, config
, loop
, prometheus
=None):
78 Init, Connect to database, filesystem storage, and messaging
79 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
84 logger
=logging
.getLogger('lcm.ns')
87 self
.db
= Database().instance
.db
88 self
.fs
= Filesystem().instance
.fs
90 self
.lcm_tasks
= lcm_tasks
91 self
.timeout
= config
["timeout"]
92 self
.ro_config
= config
["ro_config"]
93 self
.ng_ro
= config
["ro_config"].get("ng")
94 self
.vca_config
= config
["VCA"].copy()
96 # create N2VC connector
97 self
.n2vc
= N2VCJujuConnector(
100 url
='{}:{}'.format(self
.vca_config
['host'], self
.vca_config
['port']),
101 username
=self
.vca_config
.get('user', None),
102 vca_config
=self
.vca_config
,
103 on_update_db
=self
._on
_update
_n
2vc
_db
,
108 self
.conn_helm_ee
= LCMHelmConn(
113 vca_config
=self
.vca_config
,
114 on_update_db
=self
._on
_update
_n
2vc
_db
117 self
.k8sclusterhelm2
= K8sHelmConnector(
118 kubectl_command
=self
.vca_config
.get("kubectlpath"),
119 helm_command
=self
.vca_config
.get("helmpath"),
126 self
.k8sclusterhelm3
= K8sHelm3Connector(
127 kubectl_command
=self
.vca_config
.get("kubectlpath"),
128 helm_command
=self
.vca_config
.get("helm3path"),
135 self
.k8sclusterjuju
= K8sJujuConnector(
136 kubectl_command
=self
.vca_config
.get("kubectlpath"),
137 juju_command
=self
.vca_config
.get("jujupath"),
140 on_update_db
=self
._on
_update
_k
8s
_db
,
141 vca_config
=self
.vca_config
,
146 self
.k8scluster_map
= {
147 "helm-chart": self
.k8sclusterhelm2
,
148 "helm-chart-v3": self
.k8sclusterhelm3
,
149 "chart": self
.k8sclusterhelm3
,
150 "juju-bundle": self
.k8sclusterjuju
,
151 "juju": self
.k8sclusterjuju
,
155 "lxc_proxy_charm": self
.n2vc
,
156 "native_charm": self
.n2vc
,
157 "k8s_proxy_charm": self
.n2vc
,
158 "helm": self
.conn_helm_ee
,
159 "helm-v3": self
.conn_helm_ee
162 self
.prometheus
= prometheus
165 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
168 def increment_ip_mac(ip_mac
, vm_index
=1):
169 if not isinstance(ip_mac
, str):
172 # try with ipv4 look for last dot
173 i
= ip_mac
.rfind(".")
176 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
177 # try with ipv6 or mac look for last colon. Operate in hex
178 i
= ip_mac
.rfind(":")
181 # format in hex, len can be 2 for mac or 4 for ipv6
182 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
)
187 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
189 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
192 # TODO filter RO descriptor fields...
196 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
197 db_dict
['deploymentStatus'] = ro_descriptor
198 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
200 except Exception as e
:
201 self
.logger
.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id
, e
))
203 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
):
205 # remove last dot from path (if exists)
206 if path
.endswith('.'):
209 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
210 # .format(table, filter, path, updated_data))
213 nsr_id
= filter.get('_id')
215 # read ns record from database
216 nsr
= self
.db
.get_one(table
='nsrs', q_filter
=filter)
217 current_ns_status
= nsr
.get('nsState')
219 # get vca status for NS
220 status_dict
= await self
.n2vc
.get_status(namespace
='.' + nsr_id
, yaml_format
=False)
224 db_dict
['vcaStatus'] = status_dict
225 await self
.n2vc
.update_vca_status(db_dict
['vcaStatus'])
227 # update configurationStatus for this VCA
229 vca_index
= int(path
[path
.rfind(".")+1:])
231 vca_list
= deep_get(target_dict
=nsr
, key_list
=('_admin', 'deployed', 'VCA'))
232 vca_status
= vca_list
[vca_index
].get('status')
234 configuration_status_list
= nsr
.get('configurationStatus')
235 config_status
= configuration_status_list
[vca_index
].get('status')
237 if config_status
== 'BROKEN' and vca_status
!= 'failed':
238 db_dict
['configurationStatus'][vca_index
] = 'READY'
239 elif config_status
!= 'BROKEN' and vca_status
== 'failed':
240 db_dict
['configurationStatus'][vca_index
] = 'BROKEN'
241 except Exception as e
:
242 # not update configurationStatus
243 self
.logger
.debug('Error updating vca_index (ignore): {}'.format(e
))
245 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
246 # if nsState = 'DEGRADED' check if all is OK
248 if current_ns_status
in ('READY', 'DEGRADED'):
249 error_description
= ''
251 if status_dict
.get('machines'):
252 for machine_id
in status_dict
.get('machines'):
253 machine
= status_dict
.get('machines').get(machine_id
)
254 # check machine agent-status
255 if machine
.get('agent-status'):
256 s
= machine
.get('agent-status').get('status')
259 error_description
+= 'machine {} agent-status={} ; '.format(machine_id
, s
)
260 # check machine instance status
261 if machine
.get('instance-status'):
262 s
= machine
.get('instance-status').get('status')
265 error_description
+= 'machine {} instance-status={} ; '.format(machine_id
, s
)
267 if status_dict
.get('applications'):
268 for app_id
in status_dict
.get('applications'):
269 app
= status_dict
.get('applications').get(app_id
)
270 # check application status
271 if app
.get('status'):
272 s
= app
.get('status').get('status')
275 error_description
+= 'application {} status={} ; '.format(app_id
, s
)
277 if error_description
:
278 db_dict
['errorDescription'] = error_description
279 if current_ns_status
== 'READY' and is_degraded
:
280 db_dict
['nsState'] = 'DEGRADED'
281 if current_ns_status
== 'DEGRADED' and not is_degraded
:
282 db_dict
['nsState'] = 'READY'
285 self
.update_db_2("nsrs", nsr_id
, db_dict
)
287 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
289 except Exception as e
:
290 self
.logger
.warn('Error updating NS state for ns={}: {}'.format(nsr_id
, e
))
292 async def _on_update_k8s_db(self
, cluster_uuid
, kdu_instance
, filter=None):
294 Updating vca status in NSR record
295 :param cluster_uuid: UUID of a k8s cluster
296 :param kdu_instance: The unique name of the KDU instance
297 :param filter: To get nsr_id
301 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
302 # .format(cluster_uuid, kdu_instance, filter))
305 nsr_id
= filter.get('_id')
307 # get vca status for NS
308 vca_status
= await self
.k8sclusterjuju
.status_kdu(cluster_uuid
,
310 complete_status
=True,
314 db_dict
['vcaStatus'] = {nsr_id
: vca_status
}
316 await self
.k8sclusterjuju
.update_vca_status(db_dict
['vcaStatus'], kdu_instance
)
319 self
.update_db_2("nsrs", nsr_id
, db_dict
)
321 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
323 except Exception as e
:
324 self
.logger
.warn('Error updating NS state for ns={}: {}'.format(nsr_id
, e
))
327 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
329 env
= Environment(undefined
=StrictUndefined
)
330 template
= env
.from_string(cloud_init_text
)
331 return template
.render(additional_params
or {})
332 except UndefinedError
as e
:
333 raise LcmException("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
334 "file, must be provided in the instantiation parameters inside the "
335 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
))
336 except (TemplateError
, TemplateNotFound
) as e
:
337 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
338 format(vnfd_id
, vdu_id
, e
))
340 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
341 cloud_init_content
= cloud_init_file
= None
343 if vdu
.get("cloud-init-file"):
344 base_folder
= vnfd
["_admin"]["storage"]
345 cloud_init_file
= "{}/{}/cloud_init/{}".format(base_folder
["folder"], base_folder
["pkg-dir"],
346 vdu
["cloud-init-file"])
347 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
348 cloud_init_content
= ci_file
.read()
349 elif vdu
.get("cloud-init"):
350 cloud_init_content
= vdu
["cloud-init"]
352 return cloud_init_content
353 except FsException
as e
:
354 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
355 format(vnfd
["id"], vdu
["id"], cloud_init_file
, e
))
357 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
358 vdur
= next(vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"])
359 additional_params
= vdur
.get("additionalParams")
360 return parse_yaml_strings(additional_params
)
362 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
364 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
365 :param vnfd: input vnfd
366 :param new_id: overrides vnf id if provided
367 :param additionalParams: Instantiation params for VNFs provided
368 :param nsrId: Id of the NSR
369 :return: copy of vnfd
371 vnfd_RO
= deepcopy(vnfd
)
372 # remove unused by RO configuration, monitoring, scaling and internal keys
373 vnfd_RO
.pop("_id", None)
374 vnfd_RO
.pop("_admin", None)
375 vnfd_RO
.pop("monitoring-param", None)
376 vnfd_RO
.pop("scaling-group-descriptor", None)
377 vnfd_RO
.pop("kdu", None)
378 vnfd_RO
.pop("k8s-cluster", None)
380 vnfd_RO
["id"] = new_id
382 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
383 for vdu
in get_iterable(vnfd_RO
, "vdu"):
384 vdu
.pop("cloud-init-file", None)
385 vdu
.pop("cloud-init", None)
389 def ip_profile_2_RO(ip_profile
):
390 RO_ip_profile
= deepcopy(ip_profile
)
391 if "dns-server" in RO_ip_profile
:
392 if isinstance(RO_ip_profile
["dns-server"], list):
393 RO_ip_profile
["dns-address"] = []
394 for ds
in RO_ip_profile
.pop("dns-server"):
395 RO_ip_profile
["dns-address"].append(ds
['address'])
397 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
398 if RO_ip_profile
.get("ip-version") == "ipv4":
399 RO_ip_profile
["ip-version"] = "IPv4"
400 if RO_ip_profile
.get("ip-version") == "ipv6":
401 RO_ip_profile
["ip-version"] = "IPv6"
402 if "dhcp-params" in RO_ip_profile
:
403 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
406 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
407 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
408 if db_vim
["_admin"]["operationalState"] != "ENABLED":
409 raise LcmException("VIM={} is not available. operationalState={}".format(
410 vim_account
, db_vim
["_admin"]["operationalState"]))
411 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
414 def get_ro_wim_id_for_wim_account(self
, wim_account
):
415 if isinstance(wim_account
, str):
416 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
417 if db_wim
["_admin"]["operationalState"] != "ENABLED":
418 raise LcmException("WIM={} is not available. operationalState={}".format(
419 wim_account
, db_wim
["_admin"]["operationalState"]))
420 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
425 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
427 db_vdu_push_list
= []
428 db_update
= {"_admin.modified": time()}
430 for vdu_id
, vdu_count
in vdu_create
.items():
431 vdur
= next((vdur
for vdur
in reversed(db_vnfr
["vdur"]) if vdur
["vdu-id-ref"] == vdu_id
), None)
433 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".
436 for count
in range(vdu_count
):
437 vdur_copy
= deepcopy(vdur
)
438 vdur_copy
["status"] = "BUILD"
439 vdur_copy
["status-detailed"] = None
440 vdur_copy
["ip-address"]: None
441 vdur_copy
["_id"] = str(uuid4())
442 vdur_copy
["count-index"] += count
+ 1
443 vdur_copy
["id"] = "{}-{}".format(vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"])
444 vdur_copy
.pop("vim_info", None)
445 for iface
in vdur_copy
["interfaces"]:
446 if iface
.get("fixed-ip"):
447 iface
["ip-address"] = self
.increment_ip_mac(iface
["ip-address"], count
+1)
449 iface
.pop("ip-address", None)
450 if iface
.get("fixed-mac"):
451 iface
["mac-address"] = self
.increment_ip_mac(iface
["mac-address"], count
+1)
453 iface
.pop("mac-address", None)
454 iface
.pop("mgmt_vnf", None) # only first vdu can be managment of vnf
455 db_vdu_push_list
.append(vdur_copy
)
456 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
458 for vdu_id
, vdu_count
in vdu_delete
.items():
460 indexes_to_delete
= [iv
[0] for iv
in enumerate(db_vnfr
["vdur"]) if iv
[1]["vdu-id-ref"] == vdu_id
]
461 db_update
.update({"vdur.{}.status".format(i
): "DELETING" for i
in indexes_to_delete
[-vdu_count
:]})
463 # it must be deleted one by one because common.db does not allow otherwise
464 vdus_to_delete
= [v
for v
in reversed(db_vnfr
["vdur"]) if v
["vdu-id-ref"] == vdu_id
]
465 for vdu
in vdus_to_delete
[:vdu_count
]:
466 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, None, pull
={"vdur": {"_id": vdu
["_id"]}})
467 db_push
= {"vdur": db_vdu_push_list
} if db_vdu_push_list
else None
468 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
469 # modify passed dictionary db_vnfr
470 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
471 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
473 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
475 Updates database nsr with the RO info for the created vld
476 :param ns_update_nsr: dictionary to be filled with the updated info
477 :param db_nsr: content of db_nsr. This is also modified
478 :param nsr_desc_RO: nsr descriptor from RO
479 :return: Nothing, LcmException is raised on errors
482 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
483 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
484 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
486 vld
["vim-id"] = net_RO
.get("vim_net_id")
487 vld
["name"] = net_RO
.get("vim_name")
488 vld
["status"] = net_RO
.get("status")
489 vld
["status-detailed"] = net_RO
.get("error_msg")
490 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
493 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld
["id"]))
495 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
497 for db_vnfr
in db_vnfrs
.values():
498 vnfr_update
= {"status": "ERROR"}
499 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
500 if "status" not in vdur
:
501 vdur
["status"] = "ERROR"
502 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
504 vdur
["status-detailed"] = str(error_text
)
505 vnfr_update
["vdur.{}.status-detailed".format(vdu_index
)] = "ERROR"
506 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
507 except DbException
as e
:
508 self
.logger
.error("Cannot update vnf. {}".format(e
))
510 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
512 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
513 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
514 :param nsr_desc_RO: nsr descriptor from RO
515 :return: Nothing, LcmException is raised on errors
517 for vnf_index
, db_vnfr
in db_vnfrs
.items():
518 for vnf_RO
in nsr_desc_RO
["vnfs"]:
519 if vnf_RO
["member_vnf_index"] != vnf_index
:
522 if vnf_RO
.get("ip_address"):
523 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
["ip_address"].split(";")[0]
524 elif not db_vnfr
.get("ip-address"):
525 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
526 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index
))
528 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
529 vdur_RO_count_index
= 0
530 if vdur
.get("pdu-type"):
532 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
533 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
535 if vdur
["count-index"] != vdur_RO_count_index
:
536 vdur_RO_count_index
+= 1
538 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
539 if vdur_RO
.get("ip_address"):
540 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
542 vdur
["ip-address"] = None
543 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
544 vdur
["name"] = vdur_RO
.get("vim_name")
545 vdur
["status"] = vdur_RO
.get("status")
546 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
547 for ifacer
in get_iterable(vdur
, "interfaces"):
548 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
549 if ifacer
["name"] == interface_RO
.get("internal_name"):
550 ifacer
["ip-address"] = interface_RO
.get("ip_address")
551 ifacer
["mac-address"] = interface_RO
.get("mac_address")
554 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
556 .format(vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]))
557 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
560 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
561 "VIM info".format(vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]))
563 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
564 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
565 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
567 vld
["vim-id"] = net_RO
.get("vim_net_id")
568 vld
["name"] = net_RO
.get("vim_name")
569 vld
["status"] = net_RO
.get("status")
570 vld
["status-detailed"] = net_RO
.get("error_msg")
571 vnfr_update
["vld.{}".format(vld_index
)] = vld
574 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
575 vnf_index
, vld
["id"]))
577 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
581 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index
))
583 def _get_ns_config_info(self
, nsr_id
):
585 Generates a mapping between vnf,vdu elements and the N2VC id
586 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
587 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
588 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
589 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
591 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
592 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
594 ns_config_info
= {"osm-config-mapping": mapping
}
595 for vca
in vca_deployed_list
:
596 if not vca
["member-vnf-index"]:
598 if not vca
["vdu_id"]:
599 mapping
[vca
["member-vnf-index"]] = vca
["application"]
601 mapping
["{}.{}.{}".format(vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"])] =\
603 return ns_config_info
605 async def _instantiate_ng_ro(self
, logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
, db_vnfds
,
606 n2vc_key_list
, stage
, start_deploy
, timeout_ns_deploy
):
610 def get_vim_account(vim_account_id
):
612 if vim_account_id
in db_vims
:
613 return db_vims
[vim_account_id
]
614 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
615 db_vims
[vim_account_id
] = db_vim
618 # modify target_vld info with instantiation parameters
619 def parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, target_sdn
):
620 if vld_params
.get("ip-profile"):
621 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
["ip-profile"]
622 if vld_params
.get("provider-network"):
623 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
["provider-network"]
624 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
625 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
["provider-network"]["sdn-ports"]
626 if vld_params
.get("wimAccountId"):
627 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
628 target_vld
["vim_info"][target_wim
] = {}
629 for param
in ("vim-network-name", "vim-network-id"):
630 if vld_params
.get(param
):
631 if isinstance(vld_params
[param
], dict):
632 for vim
, vim_net
in vld_params
[param
].items():
633 other_target_vim
= "vim:" + vim
634 populate_dict(target_vld
["vim_info"], (other_target_vim
, param
.replace("-", "_")), vim_net
)
635 else: # isinstance str
636 target_vld
["vim_info"][target_vim
][param
.replace("-", "_")] = vld_params
[param
]
637 if vld_params
.get("common_id"):
638 target_vld
["common_id"] = vld_params
.get("common_id")
640 nslcmop_id
= db_nslcmop
["_id"]
642 "name": db_nsr
["name"],
645 "image": deepcopy(db_nsr
["image"]),
646 "flavor": deepcopy(db_nsr
["flavor"]),
647 "action_id": nslcmop_id
,
648 "cloud_init_content": {},
650 for image
in target
["image"]:
651 image
["vim_info"] = {}
652 for flavor
in target
["flavor"]:
653 flavor
["vim_info"] = {}
655 if db_nslcmop
.get("lcmOperationType") != "instantiate":
656 # get parameters of instantiation:
657 db_nslcmop_instantiate
= self
.db
.get_list("nslcmops", {"nsInstanceId": db_nslcmop
["nsInstanceId"],
658 "lcmOperationType": "instantiate"})[-1]
659 ns_params
= db_nslcmop_instantiate
.get("operationParams")
661 ns_params
= db_nslcmop
.get("operationParams")
662 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
663 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
666 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
667 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
671 "mgmt-network": vld
.get("mgmt-network", False),
672 "type": vld
.get("type"),
675 "vim_network_name": vld
.get("vim-network-name"),
676 "vim_account_id": ns_params
["vimAccountId"]
680 # check if this network needs SDN assist
681 if vld
.get("pci-interfaces"):
682 db_vim
= get_vim_account(ns_params
["vimAccountId"])
683 sdnc_id
= db_vim
["config"].get("sdn-controller")
685 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
686 target_sdn
= "sdn:{}".format(sdnc_id
)
687 target_vld
["vim_info"][target_sdn
] = {
688 "sdn": True, "target_vim": target_vim
, "vlds": [sdn_vld
], "type": vld
.get("type")}
690 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
691 for nsd_vnf_profile
in nsd_vnf_profiles
:
692 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
693 if cp
["virtual-link-profile-id"] == vld
["id"]:
694 cp2target
["member_vnf:{}.{}".format(
695 cp
["constituent-cpd-id"][0]["constituent-base-element-id"],
696 cp
["constituent-cpd-id"][0]["constituent-cpd-id"]
697 )] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
699 # check at nsd descriptor, if there is an ip-profile
701 virtual_link_profiles
= get_virtual_link_profiles(nsd
)
703 for vlp
in virtual_link_profiles
:
704 ip_profile
= find_in_list(nsd
["ip-profiles"],
705 lambda profile
: profile
["name"] == vlp
["ip-profile-ref"])
706 vld_params
["ip-profile"] = ip_profile
["ip-profile-params"]
707 # update vld_params with instantiation params
708 vld_instantiation_params
= find_in_list(get_iterable(ns_params
, "vld"),
709 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]))
710 if vld_instantiation_params
:
711 vld_params
.update(vld_instantiation_params
)
712 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
713 target
["ns"]["vld"].append(target_vld
)
715 for vnfr
in db_vnfrs
.values():
716 vnfd
= find_in_list(db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"])
717 vnf_params
= find_in_list(get_iterable(ns_params
, "vnf"),
718 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"])
719 target_vnf
= deepcopy(vnfr
)
720 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
721 for vld
in target_vnf
.get("vld", ()):
722 # check if connected to a ns.vld, to fill target'
723 vnf_cp
= find_in_list(vnfd
.get("int-virtual-link-desc", ()),
724 lambda cpd
: cpd
.get("id") == vld
["id"])
726 ns_cp
= "member_vnf:{}.{}".format(vnfr
["member-vnf-index-ref"], vnf_cp
["id"])
727 if cp2target
.get(ns_cp
):
728 vld
["target"] = cp2target
[ns_cp
]
730 vld
["vim_info"] = {target_vim
: {"vim_network_name": vld
.get("vim-network-name")}}
731 # check if this network needs SDN assist
733 if vld
.get("pci-interfaces"):
734 db_vim
= get_vim_account(vnfr
["vim-account-id"])
735 sdnc_id
= db_vim
["config"].get("sdn-controller")
737 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
738 target_sdn
= "sdn:{}".format(sdnc_id
)
739 vld
["vim_info"][target_sdn
] = {
740 "sdn": True, "target_vim": target_vim
, "vlds": [sdn_vld
], "type": vld
.get("type")}
742 # check at vnfd descriptor, if there is an ip-profile
744 vnfd_vlp
= find_in_list(
745 get_virtual_link_profiles(vnfd
),
746 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"]
748 if vnfd_vlp
and vnfd_vlp
.get("virtual-link-protocol-data") and \
749 vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data"):
750 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"]["l3-protocol-data"]
751 ip_profile_dest_data
= {}
752 if "ip-version" in ip_profile_source_data
:
753 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
["ip-version"]
754 if "cidr" in ip_profile_source_data
:
755 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
["cidr"]
756 if "gateway-ip" in ip_profile_source_data
:
757 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
["gateway-ip"]
758 if "dhcp-enabled" in ip_profile_source_data
:
759 ip_profile_dest_data
["dhcp-params"] = {
760 "enabled": ip_profile_source_data
["dhcp-enabled"]
763 vld_params
["ip-profile"] = ip_profile_dest_data
764 # update vld_params with instantiation params
766 vld_instantiation_params
= find_in_list(get_iterable(vnf_params
, "internal-vld"),
767 lambda i_vld
: i_vld
["name"] == vld
["id"])
768 if vld_instantiation_params
:
769 vld_params
.update(vld_instantiation_params
)
770 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
773 for vdur
in target_vnf
.get("vdur", ()):
774 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
775 continue # This vdu must not be created
776 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
778 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
781 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
782 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
783 if vdu_configuration
and vdu_configuration
.get("config-access") and \
784 vdu_configuration
.get("config-access").get("ssh-access"):
785 vdur
["ssh-keys"] = ssh_keys_all
786 vdur
["ssh-access-required"] = vdu_configuration
["config-access"]["ssh-access"]["required"]
787 elif vnf_configuration
and vnf_configuration
.get("config-access") and \
788 vnf_configuration
.get("config-access").get("ssh-access") and \
789 any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"]):
790 vdur
["ssh-keys"] = ssh_keys_all
791 vdur
["ssh-access-required"] = vnf_configuration
["config-access"]["ssh-access"]["required"]
792 elif ssh_keys_instantiation
and \
793 find_in_list(vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")):
794 vdur
["ssh-keys"] = ssh_keys_instantiation
796 self
.logger
.debug("NS > vdur > {}".format(vdur
))
798 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
800 if vdud
.get("cloud-init-file"):
801 vdur
["cloud-init"] = "{}:file:{}".format(vnfd
["_id"], vdud
.get("cloud-init-file"))
802 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
803 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
804 base_folder
= vnfd
["_admin"]["storage"]
805 cloud_init_file
= "{}/{}/cloud_init/{}".format(base_folder
["folder"], base_folder
["pkg-dir"],
806 vdud
.get("cloud-init-file"))
807 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
808 target
["cloud_init_content"][vdur
["cloud-init"]] = ci_file
.read()
809 elif vdud
.get("cloud-init"):
810 vdur
["cloud-init"] = "{}:vdu:{}".format(vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"]))
811 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
812 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
["cloud-init"]
813 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
814 deploy_params_vdu
= self
._format
_additional
_params
(vdur
.get("additionalParams") or {})
815 deploy_params_vdu
["OSM"] = get_osm_params(vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"])
816 vdur
["additionalParams"] = deploy_params_vdu
819 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
820 if target_vim
not in ns_flavor
["vim_info"]:
821 ns_flavor
["vim_info"][target_vim
] = {}
824 # in case alternative images are provided we must check if they should be applied
825 # for the vim_type, modify the vim_type taking into account
826 ns_image_id
= int(vdur
["ns-image-id"])
827 if vdur
.get("alt-image-ids"):
828 db_vim
= get_vim_account(vnfr
["vim-account-id"])
829 vim_type
= db_vim
["vim_type"]
830 for alt_image_id
in vdur
.get("alt-image-ids"):
831 ns_alt_image
= target
["image"][int(alt_image_id
)]
832 if vim_type
== ns_alt_image
.get("vim-type"):
833 # must use alternative image
834 self
.logger
.debug("use alternative image id: {}".format(alt_image_id
))
835 ns_image_id
= alt_image_id
836 vdur
["ns-image-id"] = ns_image_id
838 ns_image
= target
["image"][int(ns_image_id
)]
839 if target_vim
not in ns_image
["vim_info"]:
840 ns_image
["vim_info"][target_vim
] = {}
842 vdur
["vim_info"] = {target_vim
: {}}
843 # instantiation parameters
845 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
847 vdur_list
.append(vdur
)
848 target_vnf
["vdur"] = vdur_list
849 target
["vnf"].append(target_vnf
)
851 desc
= await self
.RO
.deploy(nsr_id
, target
)
852 self
.logger
.debug("RO return > {}".format(desc
))
853 action_id
= desc
["action_id"]
854 await self
._wait
_ng
_ro
(nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
)
858 "_admin.deployed.RO.operational-status": "running",
859 "detailed-status": " ".join(stage
)
861 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
862 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
863 self
._write
_op
_status
(nslcmop_id
, stage
)
864 self
.logger
.debug(logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
))
867 async def _wait_ng_ro(self
, nsr_id
, action_id
, nslcmop_id
=None, start_time
=None, timeout
=600, stage
=None):
868 detailed_status_old
= None
870 start_time
= start_time
or time()
871 while time() <= start_time
+ timeout
:
872 desc_status
= await self
.RO
.status(nsr_id
, action_id
)
873 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
874 if desc_status
["status"] == "FAILED":
875 raise NgRoException(desc_status
["details"])
876 elif desc_status
["status"] == "BUILD":
878 stage
[2] = "VIM: ({})".format(desc_status
["details"])
879 elif desc_status
["status"] == "DONE":
881 stage
[2] = "Deployed at VIM"
884 assert False, "ROclient.check_ns_status returns unknown {}".format(desc_status
["status"])
885 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
886 detailed_status_old
= stage
[2]
887 db_nsr_update
["detailed-status"] = " ".join(stage
)
888 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
889 self
._write
_op
_status
(nslcmop_id
, stage
)
890 await asyncio
.sleep(15, loop
=self
.loop
)
891 else: # timeout_ns_deploy
892 raise NgRoException("Timeout waiting ns to deploy")
894 async def _terminate_ng_ro(self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
):
898 start_deploy
= time()
905 "action_id": nslcmop_id
907 desc
= await self
.RO
.deploy(nsr_id
, target
)
908 action_id
= desc
["action_id"]
909 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
910 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
911 self
.logger
.debug(logging_text
+ "ns terminate action at RO. action_id={}".format(action_id
))
914 delete_timeout
= 20 * 60 # 20 minutes
915 await self
._wait
_ng
_ro
(nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
)
917 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
918 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
920 await self
.RO
.delete(nsr_id
)
921 except Exception as e
:
922 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
923 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
924 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
925 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
926 self
.logger
.debug(logging_text
+ "RO_action_id={} already deleted".format(action_id
))
927 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
928 failed_detail
.append("delete conflict: {}".format(e
))
929 self
.logger
.debug(logging_text
+ "RO_action_id={} delete conflict: {}".format(action_id
, e
))
931 failed_detail
.append("delete error: {}".format(e
))
932 self
.logger
.error(logging_text
+ "RO_action_id={} delete error: {}".format(action_id
, e
))
935 stage
[2] = "Error deleting from VIM"
937 stage
[2] = "Deleted from VIM"
938 db_nsr_update
["detailed-status"] = " ".join(stage
)
939 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
940 self
._write
_op
_status
(nslcmop_id
, stage
)
943 raise LcmException("; ".join(failed_detail
))
946 async def instantiate_RO(self
, logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
, db_vnfds
,
947 n2vc_key_list
, stage
):
950 :param logging_text: preffix text to use at logging
951 :param nsr_id: nsr identity
952 :param nsd: database content of ns descriptor
953 :param db_nsr: database content of ns record
954 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
956 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
957 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
958 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
959 :return: None or exception
962 start_deploy
= time()
963 ns_params
= db_nslcmop
.get("operationParams")
964 if ns_params
and ns_params
.get("timeout_ns_deploy"):
965 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
967 timeout_ns_deploy
= self
.timeout
.get("ns_deploy", self
.timeout_ns_deploy
)
969 # Check for and optionally request placement optimization. Database will be updated if placement activated
970 stage
[2] = "Waiting for Placement."
971 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
972 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
973 for vnfr
in db_vnfrs
.values():
974 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
977 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
979 return await self
._instantiate
_ng
_ro
(logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
,
980 db_vnfds
, n2vc_key_list
, stage
, start_deploy
, timeout_ns_deploy
)
981 except Exception as e
:
982 stage
[2] = "ERROR deploying at VIM"
983 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
984 self
.logger
.error("Error deploying at VIM {}".format(e
),
985 exc_info
=not isinstance(e
, (ROclient
.ROClientException
, LcmException
, DbException
,
989 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
991 Wait for kdu to be up, get ip address
992 :param logging_text: prefix use for logging
999 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1002 while nb_tries
< 360:
1003 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1004 kdur
= next((x
for x
in get_iterable(db_vnfr
, "kdur") if x
.get("kdu-name") == kdu_name
), None)
1006 raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
))
1007 if kdur
.get("status"):
1008 if kdur
["status"] in ("READY", "ENABLED"):
1009 return kdur
.get("ip-address")
1011 raise LcmException("target KDU={} is in error state".format(kdu_name
))
1013 await asyncio
.sleep(10, loop
=self
.loop
)
1015 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1017 async def wait_vm_up_insert_key_ro(self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None):
1019 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1020 :param logging_text: prefix use for logging
1025 :param pub_key: public ssh key to inject, None to skip
1026 :param user: user to apply the public ssh key
1030 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1034 target_vdu_id
= None
1040 if ro_retries
>= 360: # 1 hour
1041 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
))
1043 await asyncio
.sleep(10, loop
=self
.loop
)
1046 if not target_vdu_id
:
1047 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1049 if not vdu_id
: # for the VNF case
1050 if db_vnfr
.get("status") == "ERROR":
1051 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1052 ip_address
= db_vnfr
.get("ip-address")
1055 vdur
= next((x
for x
in get_iterable(db_vnfr
, "vdur") if x
.get("ip-address") == ip_address
), None)
1057 vdur
= next((x
for x
in get_iterable(db_vnfr
, "vdur")
1058 if x
.get("vdu-id-ref") == vdu_id
and x
.get("count-index") == vdu_index
), None)
1060 if not vdur
and len(db_vnfr
.get("vdur", ())) == 1: # If only one, this should be the target vdu
1061 vdur
= db_vnfr
["vdur"][0]
1063 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id
, vdu_id
,
1065 # New generation RO stores information at "vim_info"
1068 if vdur
.get("vim_info"):
1069 target_vim
= next(t
for t
in vdur
["vim_info"]) # there should be only one key
1070 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1071 if vdur
.get("pdu-type") or vdur
.get("status") == "ACTIVE" or ng_ro_status
== "ACTIVE":
1072 ip_address
= vdur
.get("ip-address")
1075 target_vdu_id
= vdur
["vdu-id-ref"]
1076 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1077 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1079 if not target_vdu_id
:
1082 # inject public key into machine
1083 if pub_key
and user
:
1084 self
.logger
.debug(logging_text
+ "Inserting RO key")
1085 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1086 if vdur
.get("pdu-type"):
1087 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1090 ro_vm_id
= "{}-{}".format(db_vnfr
["member-vnf-index-ref"], target_vdu_id
) # TODO add vdu_index
1092 target
= {"action": {"action": "inject_ssh_key", "key": pub_key
, "user": user
},
1093 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1095 desc
= await self
.RO
.deploy(nsr_id
, target
)
1096 action_id
= desc
["action_id"]
1097 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600)
1100 # wait until NS is deployed at RO
1102 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1103 ro_nsr_id
= deep_get(db_nsrs
, ("_admin", "deployed", "RO", "nsr_id"))
1106 result_dict
= await self
.RO
.create_action(
1108 item_id_name
=ro_nsr_id
,
1109 descriptor
={"add_public_key": pub_key
, "vms": [ro_vm_id
], "user": user
}
1111 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1112 if not result_dict
or not isinstance(result_dict
, dict):
1113 raise LcmException("Unknown response from RO when injecting key")
1114 for result
in result_dict
.values():
1115 if result
.get("vim_result") == 200:
1118 raise ROclient
.ROClientException("error injecting key: {}".format(
1119 result
.get("description")))
1121 except NgRoException
as e
:
1122 raise LcmException("Reaching max tries injecting key. Error: {}".format(e
))
1123 except ROclient
.ROClientException
as e
:
1125 self
.logger
.debug(logging_text
+ "error injecting key: {}. Retrying until {} seconds".
1129 raise LcmException("Reaching max tries injecting key. Error: {}".format(e
))
1135 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1137 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1139 my_vca
= vca_deployed_list
[vca_index
]
1140 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1141 # vdu or kdu: no dependencies
1145 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1146 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1147 configuration_status_list
= db_nsr
["configurationStatus"]
1148 for index
, vca_deployed
in enumerate(configuration_status_list
):
1149 if index
== vca_index
:
1152 if not my_vca
.get("member-vnf-index") or \
1153 (vca_deployed
.get("member-vnf-index") == my_vca
.get("member-vnf-index")):
1154 internal_status
= configuration_status_list
[index
].get("status")
1155 if internal_status
== 'READY':
1157 elif internal_status
== 'BROKEN':
1158 raise LcmException("Configuration aborted because dependent charm/s has failed")
1162 # no dependencies, return
1164 await asyncio
.sleep(10)
1167 raise LcmException("Configuration aborted because dependent charm/s timeout")
1169 async def instantiate_N2VC(self
, logging_text
, vca_index
, nsi_id
, db_nsr
, db_vnfr
, vdu_id
, kdu_name
, vdu_index
,
1170 config_descriptor
, deploy_params
, base_folder
, nslcmop_id
, stage
, vca_type
, vca_name
,
1171 ee_config_descriptor
):
1172 nsr_id
= db_nsr
["_id"]
1173 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1174 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1175 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1176 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1178 'collection': 'nsrs',
1179 'filter': {'_id': nsr_id
},
1180 'path': db_update_entry
1186 element_under_configuration
= nsr_id
1190 vnfr_id
= db_vnfr
["_id"]
1191 osm_config
["osm"]["vnf_id"] = vnfr_id
1193 namespace
= "{nsi}.{ns}".format(
1194 nsi
=nsi_id
if nsi_id
else "",
1198 element_type
= 'VNF'
1199 element_under_configuration
= vnfr_id
1200 namespace
+= ".{}-{}".format(vnfr_id
, vdu_index
or 0)
1202 namespace
+= ".{}-{}".format(vdu_id
, vdu_index
or 0)
1203 element_type
= 'VDU'
1204 element_under_configuration
= "{}-{}".format(vdu_id
, vdu_index
or 0)
1205 osm_config
["osm"]["vdu_id"] = vdu_id
1207 namespace
+= ".{}.{}".format(kdu_name
, vdu_index
or 0)
1208 element_type
= 'KDU'
1209 element_under_configuration
= kdu_name
1210 osm_config
["osm"]["kdu_name"] = kdu_name
1213 artifact_path
= "{}/{}/{}/{}".format(
1214 base_folder
["folder"],
1215 base_folder
["pkg-dir"],
1216 "charms" if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1220 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1222 # get initial_config_primitive_list that applies to this element
1223 initial_config_primitive_list
= config_descriptor
.get('initial-config-primitive')
1225 self
.logger
.debug("Initial config primitive list > {}".format(initial_config_primitive_list
))
1227 # add config if not present for NS charm
1228 ee_descriptor_id
= ee_config_descriptor
.get("id")
1229 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1230 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(initial_config_primitive_list
,
1231 vca_deployed
, ee_descriptor_id
)
1233 self
.logger
.debug("Initial config primitive list #2 > {}".format(initial_config_primitive_list
))
1234 # n2vc_redesign STEP 3.1
1235 # find old ee_id if exists
1236 ee_id
= vca_deployed
.get("ee_id")
1239 deep_get(db_vnfr
, ("vim-account-id",)) or
1240 deep_get(deploy_params
, ("OSM", "vim_account_id"))
1242 vca_cloud
, vca_cloud_credential
= self
.get_vca_cloud_and_credentials(vim_account_id
)
1243 vca_k8s_cloud
, vca_k8s_cloud_credential
= self
.get_vca_k8s_cloud_and_credentials(vim_account_id
)
1244 # create or register execution environment in VCA
1245 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1247 self
._write
_configuration
_status
(
1249 vca_index
=vca_index
,
1251 element_under_configuration
=element_under_configuration
,
1252 element_type
=element_type
1255 step
= "create execution environment"
1256 self
.logger
.debug(logging_text
+ step
)
1260 if vca_type
== "k8s_proxy_charm":
1261 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1262 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1:],
1263 namespace
=namespace
,
1264 artifact_path
=artifact_path
,
1266 cloud_name
=vca_k8s_cloud
,
1267 credential_name
=vca_k8s_cloud_credential
,
1269 elif vca_type
== "helm" or vca_type
== "helm-v3":
1270 ee_id
, credentials
= await self
.vca_map
[vca_type
].create_execution_environment(
1271 namespace
=namespace
,
1275 artifact_path
=artifact_path
,
1279 ee_id
, credentials
= await self
.vca_map
[vca_type
].create_execution_environment(
1280 namespace
=namespace
,
1283 cloud_name
=vca_cloud
,
1284 credential_name
=vca_cloud_credential
,
1287 elif vca_type
== "native_charm":
1288 step
= "Waiting to VM being up and getting IP address"
1289 self
.logger
.debug(logging_text
+ step
)
1290 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
,
1291 user
=None, pub_key
=None)
1292 credentials
= {"hostname": rw_mgmt_ip
}
1294 username
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1295 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1296 # merged. Meanwhile let's get username from initial-config-primitive
1297 if not username
and initial_config_primitive_list
:
1298 for config_primitive
in initial_config_primitive_list
:
1299 for param
in config_primitive
.get("parameter", ()):
1300 if param
["name"] == "ssh-username":
1301 username
= param
["value"]
1304 raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
1305 "'config-access.ssh-access.default-user'")
1306 credentials
["username"] = username
1307 # n2vc_redesign STEP 3.2
1309 self
._write
_configuration
_status
(
1311 vca_index
=vca_index
,
1312 status
='REGISTERING',
1313 element_under_configuration
=element_under_configuration
,
1314 element_type
=element_type
1317 step
= "register execution environment {}".format(credentials
)
1318 self
.logger
.debug(logging_text
+ step
)
1319 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1320 credentials
=credentials
,
1321 namespace
=namespace
,
1323 cloud_name
=vca_cloud
,
1324 credential_name
=vca_cloud_credential
,
1327 # for compatibility with MON/POL modules, the need model and application name at database
1328 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1329 ee_id_parts
= ee_id
.split('.')
1330 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1331 if len(ee_id_parts
) >= 2:
1332 model_name
= ee_id_parts
[0]
1333 application_name
= ee_id_parts
[1]
1334 db_nsr_update
[db_update_entry
+ "model"] = model_name
1335 db_nsr_update
[db_update_entry
+ "application"] = application_name
1337 # n2vc_redesign STEP 3.3
1338 step
= "Install configuration Software"
1340 self
._write
_configuration
_status
(
1342 vca_index
=vca_index
,
1343 status
='INSTALLING SW',
1344 element_under_configuration
=element_under_configuration
,
1345 element_type
=element_type
,
1346 other_update
=db_nsr_update
1349 # TODO check if already done
1350 self
.logger
.debug(logging_text
+ step
)
1352 if vca_type
== "native_charm":
1353 config_primitive
= next((p
for p
in initial_config_primitive_list
if p
["name"] == "config"), None)
1354 if config_primitive
:
1355 config
= self
._map
_primitive
_params
(
1361 if vca_type
== "lxc_proxy_charm":
1362 if element_type
== "NS":
1363 num_units
= db_nsr
.get("config-units") or 1
1364 elif element_type
== "VNF":
1365 num_units
= db_vnfr
.get("config-units") or 1
1366 elif element_type
== "VDU":
1367 for v
in db_vnfr
["vdur"]:
1368 if vdu_id
== v
["vdu-id-ref"]:
1369 num_units
= v
.get("config-units") or 1
1371 if vca_type
!= "k8s_proxy_charm":
1372 await self
.vca_map
[vca_type
].install_configuration_sw(
1374 artifact_path
=artifact_path
,
1377 num_units
=num_units
,
1380 # write in db flag of configuration_sw already installed
1381 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True})
1383 # add relations for this VCA (wait for other peers related with this VCA)
1384 await self
._add
_vca
_relations
(logging_text
=logging_text
, nsr_id
=nsr_id
,
1385 vca_index
=vca_index
, vca_type
=vca_type
)
1387 # if SSH access is required, then get execution environment SSH public
1388 # if native charm we have waited already to VM be UP
1389 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1392 # self.logger.debug("get ssh key block")
1393 if deep_get(config_descriptor
, ("config-access", "ssh-access", "required")):
1394 # self.logger.debug("ssh key needed")
1395 # Needed to inject a ssh key
1396 user
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1397 step
= "Install configuration Software, getting public ssh key"
1398 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(ee_id
=ee_id
, db_dict
=db_dict
)
1400 step
= "Insert public key into VM user={} ssh_key={}".format(user
, pub_key
)
1402 # self.logger.debug("no need to get ssh key")
1403 step
= "Waiting to VM being up and getting IP address"
1404 self
.logger
.debug(logging_text
+ step
)
1406 # n2vc_redesign STEP 5.1
1407 # wait for RO (ip-address) Insert pub_key into VM
1410 rw_mgmt_ip
= await self
.wait_kdu_up(logging_text
, nsr_id
, vnfr_id
, kdu_name
)
1412 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
,
1413 vdu_index
, user
=user
, pub_key
=pub_key
)
1415 rw_mgmt_ip
= None # This is for a NS configuration
1417 self
.logger
.debug(logging_text
+ ' VM_ip_address={}'.format(rw_mgmt_ip
))
1419 # store rw_mgmt_ip in deploy params for later replacement
1420 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
1422 # n2vc_redesign STEP 6 Execute initial config primitive
1423 step
= 'execute initial config primitive'
1425 # wait for dependent primitives execution (NS -> VNF -> VDU)
1426 if initial_config_primitive_list
:
1427 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
1429 # stage, in function of element type: vdu, kdu, vnf or ns
1430 my_vca
= vca_deployed_list
[vca_index
]
1431 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1433 stage
[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1434 elif my_vca
.get("member-vnf-index"):
1436 stage
[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1439 stage
[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1441 self
._write
_configuration
_status
(
1443 vca_index
=vca_index
,
1444 status
='EXECUTING PRIMITIVE'
1447 self
._write
_op
_status
(
1452 check_if_terminated_needed
= True
1453 for initial_config_primitive
in initial_config_primitive_list
:
1454 # adding information on the vca_deployed if it is a NS execution environment
1455 if not vca_deployed
["member-vnf-index"]:
1456 deploy_params
["ns_config_info"] = json
.dumps(self
._get
_ns
_config
_info
(nsr_id
))
1457 # TODO check if already done
1458 primitive_params_
= self
._map
_primitive
_params
(initial_config_primitive
, {}, deploy_params
)
1460 step
= "execute primitive '{}' params '{}'".format(initial_config_primitive
["name"], primitive_params_
)
1461 self
.logger
.debug(logging_text
+ step
)
1462 await self
.vca_map
[vca_type
].exec_primitive(
1464 primitive_name
=initial_config_primitive
["name"],
1465 params_dict
=primitive_params_
,
1468 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
1469 if check_if_terminated_needed
:
1470 if config_descriptor
.get('terminate-config-primitive'):
1471 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True})
1472 check_if_terminated_needed
= False
1474 # TODO register in database that primitive is done
1476 # STEP 7 Configure metrics
1477 if vca_type
== "helm" or vca_type
== "helm-v3":
1478 prometheus_jobs
= await self
.add_prometheus_metrics(
1480 artifact_path
=artifact_path
,
1481 ee_config_descriptor
=ee_config_descriptor
,
1484 target_ip
=rw_mgmt_ip
,
1487 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "prometheus_jobs": prometheus_jobs
})
1489 step
= "instantiated at VCA"
1490 self
.logger
.debug(logging_text
+ step
)
1492 self
._write
_configuration
_status
(
1494 vca_index
=vca_index
,
1498 except Exception as e
: # TODO not use Exception but N2VC exception
1499 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1500 if not isinstance(e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)):
1501 self
.logger
.error("Exception while {} : {}".format(step
, e
), exc_info
=True)
1502 self
._write
_configuration
_status
(
1504 vca_index
=vca_index
,
1507 raise LcmException("{} {}".format(step
, e
)) from e
1509 def _write_ns_status(self
, nsr_id
: str, ns_state
: str, current_operation
: str, current_operation_id
: str,
1510 error_description
: str = None, error_detail
: str = None, other_update
: dict = None):
1512 Update db_nsr fields.
1515 :param current_operation:
1516 :param current_operation_id:
1517 :param error_description:
1518 :param error_detail:
1519 :param other_update: Other required changes at database if provided, will be cleared
1523 db_dict
= other_update
or {}
1524 db_dict
["_admin.nslcmop"] = current_operation_id
# for backward compatibility
1525 db_dict
["_admin.current-operation"] = current_operation_id
1526 db_dict
["_admin.operation-type"] = current_operation
if current_operation
!= "IDLE" else None
1527 db_dict
["currentOperation"] = current_operation
1528 db_dict
["currentOperationID"] = current_operation_id
1529 db_dict
["errorDescription"] = error_description
1530 db_dict
["errorDetail"] = error_detail
1533 db_dict
["nsState"] = ns_state
1534 self
.update_db_2("nsrs", nsr_id
, db_dict
)
1535 except DbException
as e
:
1536 self
.logger
.warn('Error writing NS status, ns={}: {}'.format(nsr_id
, e
))
1538 def _write_op_status(self
, op_id
: str, stage
: list = None, error_message
: str = None, queuePosition
: int = 0,
1539 operation_state
: str = None, other_update
: dict = None):
1541 db_dict
= other_update
or {}
1542 db_dict
['queuePosition'] = queuePosition
1543 if isinstance(stage
, list):
1544 db_dict
['stage'] = stage
[0]
1545 db_dict
['detailed-status'] = " ".join(stage
)
1546 elif stage
is not None:
1547 db_dict
['stage'] = str(stage
)
1549 if error_message
is not None:
1550 db_dict
['errorMessage'] = error_message
1551 if operation_state
is not None:
1552 db_dict
['operationState'] = operation_state
1553 db_dict
["statusEnteredTime"] = time()
1554 self
.update_db_2("nslcmops", op_id
, db_dict
)
1555 except DbException
as e
:
1556 self
.logger
.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id
, e
))
1558 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
1560 nsr_id
= db_nsr
["_id"]
1561 # configurationStatus
1562 config_status
= db_nsr
.get('configurationStatus')
1564 db_nsr_update
= {"configurationStatus.{}.status".format(index
): status
for index
, v
in
1565 enumerate(config_status
) if v
}
1567 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1569 except DbException
as e
:
1570 self
.logger
.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id
, e
))
1572 def _write_configuration_status(self
, nsr_id
: str, vca_index
: int, status
: str = None,
1573 element_under_configuration
: str = None, element_type
: str = None,
1574 other_update
: dict = None):
1576 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1577 # .format(vca_index, status))
1580 db_path
= 'configurationStatus.{}.'.format(vca_index
)
1581 db_dict
= other_update
or {}
1583 db_dict
[db_path
+ 'status'] = status
1584 if element_under_configuration
:
1585 db_dict
[db_path
+ 'elementUnderConfiguration'] = element_under_configuration
1587 db_dict
[db_path
+ 'elementType'] = element_type
1588 self
.update_db_2("nsrs", nsr_id
, db_dict
)
1589 except DbException
as e
:
1590 self
.logger
.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1591 .format(status
, nsr_id
, vca_index
, e
))
1593 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
1595 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
1596 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
1597 Database is used because the result can be obtained from a different LCM worker in case of HA.
1598 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
1599 :param db_nslcmop: database content of nslcmop
1600 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
1601 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
1602 computed 'vim-account-id'
1605 nslcmop_id
= db_nslcmop
['_id']
1606 placement_engine
= deep_get(db_nslcmop
, ('operationParams', 'placement-engine'))
1607 if placement_engine
== "PLA":
1608 self
.logger
.debug(logging_text
+ "Invoke and wait for placement optimization")
1609 await self
.msg
.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id
}, loop
=self
.loop
)
1610 db_poll_interval
= 5
1611 wait
= db_poll_interval
* 10
1613 while not pla_result
and wait
>= 0:
1614 await asyncio
.sleep(db_poll_interval
)
1615 wait
-= db_poll_interval
1616 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
1617 pla_result
= deep_get(db_nslcmop
, ('_admin', 'pla'))
1620 raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id
))
1622 for pla_vnf
in pla_result
['vnf']:
1623 vnfr
= db_vnfrs
.get(pla_vnf
['member-vnf-index'])
1624 if not pla_vnf
.get('vimAccountId') or not vnfr
:
1627 self
.db
.set_one("vnfrs", {"_id": vnfr
["_id"]}, {"vim-account-id": pla_vnf
['vimAccountId']})
1629 vnfr
["vim-account-id"] = pla_vnf
['vimAccountId']
1632 def update_nsrs_with_pla_result(self
, params
):
1634 nslcmop_id
= deep_get(params
, ('placement', 'nslcmopId'))
1635 self
.update_db_2("nslcmops", nslcmop_id
, {"_admin.pla": params
.get('placement')})
1636 except Exception as e
:
1637 self
.logger
.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id
, e
))
1639 async def instantiate(self
, nsr_id
, nslcmop_id
):
1642 :param nsr_id: ns instance to deploy
1643 :param nslcmop_id: operation to run
1647 # Try to lock HA task here
1648 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
1649 if not task_is_locked_by_me
:
1650 self
.logger
.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id
))
1653 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
1654 self
.logger
.debug(logging_text
+ "Enter")
1656 # get all needed from database
1658 # database nsrs record
1661 # database nslcmops record
1664 # update operation on nsrs
1666 # update operation on nslcmops
1667 db_nslcmop_update
= {}
1669 nslcmop_operation_state
= None
1670 db_vnfrs
= {} # vnf's info indexed by member-index
1672 tasks_dict_info
= {} # from task to info text
1675 stage
= ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1676 # ^ stage, step, VIM progress
1678 # wait for any previous tasks in process
1679 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
1681 stage
[1] = "Sync filesystem from database."
1682 self
.fs
.sync() # TODO, make use of partial sync, only for the needed packages
1684 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1685 stage
[1] = "Reading from database."
1686 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1687 db_nsr_update
["detailed-status"] = "creating"
1688 db_nsr_update
["operational-status"] = "init"
1689 self
._write
_ns
_status
(
1691 ns_state
="BUILDING",
1692 current_operation
="INSTANTIATING",
1693 current_operation_id
=nslcmop_id
,
1694 other_update
=db_nsr_update
1696 self
._write
_op
_status
(
1702 # read from db: operation
1703 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
1704 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
1705 ns_params
= db_nslcmop
.get("operationParams")
1706 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1707 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1709 timeout_ns_deploy
= self
.timeout
.get("ns_deploy", self
.timeout_ns_deploy
)
1712 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
1713 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1714 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
1715 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
1717 # nsr_name = db_nsr["name"] # TODO short-name??
1719 # read from db: vnf's of this ns
1720 stage
[1] = "Getting vnfrs from db."
1721 self
.logger
.debug(logging_text
+ stage
[1])
1722 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
1724 # read from db: vnfd's for every vnf
1725 db_vnfds
= [] # every vnfd data
1727 # for each vnf in ns, read vnfd
1728 for vnfr
in db_vnfrs_list
:
1729 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
1730 vnfd_id
= vnfr
["vnfd-id"]
1731 vnfd_ref
= vnfr
["vnfd-ref"]
1733 # if we haven't this vnfd, read it from db
1734 if vnfd_id
not in db_vnfds
:
1736 stage
[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id
, vnfd_ref
)
1737 self
.logger
.debug(logging_text
+ stage
[1])
1738 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
1741 db_vnfds
.append(vnfd
)
1743 # Get or generates the _admin.deployed.VCA list
1744 vca_deployed_list
= None
1745 if db_nsr
["_admin"].get("deployed"):
1746 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
1747 if vca_deployed_list
is None:
1748 vca_deployed_list
= []
1749 configuration_status_list
= []
1750 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
1751 db_nsr_update
["configurationStatus"] = configuration_status_list
1752 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1753 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
1754 elif isinstance(vca_deployed_list
, dict):
1755 # maintain backward compatibility. Change a dict to list at database
1756 vca_deployed_list
= list(vca_deployed_list
.values())
1757 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
1758 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
1760 if not isinstance(deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list):
1761 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
1762 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
1764 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1765 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
1766 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1767 self
.db
.set_list("vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"})
1769 # n2vc_redesign STEP 2 Deploy Network Scenario
1770 stage
[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
1771 self
._write
_op
_status
(
1776 stage
[1] = "Deploying KDUs."
1777 # self.logger.debug(logging_text + "Before deploy_kdus")
1778 # Call to deploy_kdus in case exists the "vdu:kdu" param
1779 await self
.deploy_kdus(
1780 logging_text
=logging_text
,
1782 nslcmop_id
=nslcmop_id
,
1785 task_instantiation_info
=tasks_dict_info
,
1788 stage
[1] = "Getting VCA public key."
1789 # n2vc_redesign STEP 1 Get VCA public ssh-key
1790 # feature 1429. Add n2vc public key to needed VMs
1791 n2vc_key
= self
.n2vc
.get_public_key()
1792 n2vc_key_list
= [n2vc_key
]
1793 if self
.vca_config
.get("public_key"):
1794 n2vc_key_list
.append(self
.vca_config
["public_key"])
1796 stage
[1] = "Deploying NS at VIM."
1797 task_ro
= asyncio
.ensure_future(
1798 self
.instantiate_RO(
1799 logging_text
=logging_text
,
1803 db_nslcmop
=db_nslcmop
,
1806 n2vc_key_list
=n2vc_key_list
,
1810 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
1811 tasks_dict_info
[task_ro
] = "Deploying at VIM"
1813 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1814 stage
[1] = "Deploying Execution Environments."
1815 self
.logger
.debug(logging_text
+ stage
[1])
1817 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
1818 for vnf_profile
in get_vnf_profiles(nsd
):
1819 vnfd_id
= vnf_profile
["vnfd-id"]
1820 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
1821 member_vnf_index
= str(vnf_profile
["id"])
1822 db_vnfr
= db_vnfrs
[member_vnf_index
]
1823 base_folder
= vnfd
["_admin"]["storage"]
1829 # Get additional parameters
1830 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
1831 if db_vnfr
.get("additionalParamsForVnf"):
1832 deploy_params
.update(parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy()))
1834 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
1835 if descriptor_config
:
1837 logging_text
=logging_text
+ "member_vnf_index={} ".format(member_vnf_index
),
1840 nslcmop_id
=nslcmop_id
,
1846 member_vnf_index
=member_vnf_index
,
1847 vdu_index
=vdu_index
,
1849 deploy_params
=deploy_params
,
1850 descriptor_config
=descriptor_config
,
1851 base_folder
=base_folder
,
1852 task_instantiation_info
=tasks_dict_info
,
1856 # Deploy charms for each VDU that supports one.
1857 for vdud
in get_vdu_list(vnfd
):
1859 descriptor_config
= get_configuration(vnfd
, vdu_id
)
1860 vdur
= find_in_list(db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
)
1862 if vdur
.get("additionalParams"):
1863 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
1865 deploy_params_vdu
= deploy_params
1866 deploy_params_vdu
["OSM"] = get_osm_params(db_vnfr
, vdu_id
, vdu_count_index
=0)
1867 vdud_count
= get_vdu_profile(vnfd
, vdu_id
).get("max-number-of-instances", 1)
1869 self
.logger
.debug("VDUD > {}".format(vdud
))
1870 self
.logger
.debug("Descriptor config > {}".format(descriptor_config
))
1871 if descriptor_config
:
1874 for vdu_index
in range(vdud_count
):
1875 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1877 logging_text
=logging_text
+ "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1878 member_vnf_index
, vdu_id
, vdu_index
),
1881 nslcmop_id
=nslcmop_id
,
1887 member_vnf_index
=member_vnf_index
,
1888 vdu_index
=vdu_index
,
1890 deploy_params
=deploy_params_vdu
,
1891 descriptor_config
=descriptor_config
,
1892 base_folder
=base_folder
,
1893 task_instantiation_info
=tasks_dict_info
,
1896 for kdud
in get_kdu_list(vnfd
):
1897 kdu_name
= kdud
["name"]
1898 descriptor_config
= get_configuration(vnfd
, kdu_name
)
1899 if descriptor_config
:
1903 kdur
= next(x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
)
1904 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
1905 if kdur
.get("additionalParams"):
1906 deploy_params_kdu
= parse_yaml_strings(kdur
["additionalParams"])
1909 logging_text
=logging_text
,
1912 nslcmop_id
=nslcmop_id
,
1918 member_vnf_index
=member_vnf_index
,
1919 vdu_index
=vdu_index
,
1921 deploy_params
=deploy_params_kdu
,
1922 descriptor_config
=descriptor_config
,
1923 base_folder
=base_folder
,
1924 task_instantiation_info
=tasks_dict_info
,
1928 # Check if this NS has a charm configuration
1929 descriptor_config
= nsd
.get("ns-configuration")
1930 if descriptor_config
and descriptor_config
.get("juju"):
1933 member_vnf_index
= None
1939 # Get additional parameters
1940 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
1941 if db_nsr
.get("additionalParamsForNs"):
1942 deploy_params
.update(parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy()))
1943 base_folder
= nsd
["_admin"]["storage"]
1945 logging_text
=logging_text
,
1948 nslcmop_id
=nslcmop_id
,
1954 member_vnf_index
=member_vnf_index
,
1955 vdu_index
=vdu_index
,
1957 deploy_params
=deploy_params
,
1958 descriptor_config
=descriptor_config
,
1959 base_folder
=base_folder
,
1960 task_instantiation_info
=tasks_dict_info
,
1964 # rest of staff will be done at finally
1966 except (ROclient
.ROClientException
, DbException
, LcmException
, N2VCException
) as e
:
1967 self
.logger
.error(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
))
1969 except asyncio
.CancelledError
:
1970 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(stage
[1]))
1971 exc
= "Operation was cancelled"
1972 except Exception as e
:
1973 exc
= traceback
.format_exc()
1974 self
.logger
.critical(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
), exc_info
=True)
1977 error_list
.append(str(exc
))
1979 # wait for pending tasks
1981 stage
[1] = "Waiting for instantiate pending tasks."
1982 self
.logger
.debug(logging_text
+ stage
[1])
1983 error_list
+= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
, timeout_ns_deploy
,
1984 stage
, nslcmop_id
, nsr_id
=nsr_id
)
1985 stage
[1] = stage
[2] = ""
1986 except asyncio
.CancelledError
:
1987 error_list
.append("Cancelled")
1988 # TODO cancel all tasks
1989 except Exception as exc
:
1990 error_list
.append(str(exc
))
1992 # update operation-status
1993 db_nsr_update
["operational-status"] = "running"
1994 # let's begin with VCA 'configured' status (later we can change it)
1995 db_nsr_update
["config-status"] = "configured"
1996 for task
, task_name
in tasks_dict_info
.items():
1997 if not task
.done() or task
.cancelled() or task
.exception():
1998 if task_name
.startswith(self
.task_name_deploy_vca
):
1999 # A N2VC task is pending
2000 db_nsr_update
["config-status"] = "failed"
2002 # RO or KDU task is pending
2003 db_nsr_update
["operational-status"] = "failed"
2005 # update status at database
2007 error_detail
= ". ".join(error_list
)
2008 self
.logger
.error(logging_text
+ error_detail
)
2009 error_description_nslcmop
= '{} Detail: {}'.format(stage
[0], error_detail
)
2010 error_description_nsr
= 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id
, stage
[0])
2012 db_nsr_update
["detailed-status"] = error_description_nsr
+ " Detail: " + error_detail
2013 db_nslcmop_update
["detailed-status"] = error_detail
2014 nslcmop_operation_state
= "FAILED"
2018 error_description_nsr
= error_description_nslcmop
= None
2020 db_nsr_update
["detailed-status"] = "Done"
2021 db_nslcmop_update
["detailed-status"] = "Done"
2022 nslcmop_operation_state
= "COMPLETED"
2025 self
._write
_ns
_status
(
2028 current_operation
="IDLE",
2029 current_operation_id
=None,
2030 error_description
=error_description_nsr
,
2031 error_detail
=error_detail
,
2032 other_update
=db_nsr_update
2034 self
._write
_op
_status
(
2037 error_message
=error_description_nslcmop
,
2038 operation_state
=nslcmop_operation_state
,
2039 other_update
=db_nslcmop_update
,
2042 if nslcmop_operation_state
:
2044 await self
.msg
.aiowrite("ns", "instantiated", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
2045 "operationState": nslcmop_operation_state
},
2047 except Exception as e
:
2048 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
2050 self
.logger
.debug(logging_text
+ "Exit")
2051 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2053 async def _add_vca_relations(self
, logging_text
, nsr_id
, vca_index
: int,
2054 timeout
: int = 3600, vca_type
: str = None) -> bool:
2057 # 1. find all relations for this VCA
2058 # 2. wait for other peers related
2062 vca_type
= vca_type
or "lxc_proxy_charm"
2064 # STEP 1: find all relations for this VCA
2067 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2068 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2071 my_vca
= deep_get(db_nsr
, ('_admin', 'deployed', 'VCA'))[vca_index
]
2073 # read all ns-configuration relations
2074 ns_relations
= list()
2075 db_ns_relations
= deep_get(nsd
, ('ns-configuration', 'relation'))
2077 for r
in db_ns_relations
:
2078 # check if this VCA is in the relation
2079 if my_vca
.get('member-vnf-index') in\
2080 (r
.get('entities')[0].get('id'), r
.get('entities')[1].get('id')):
2081 ns_relations
.append(r
)
2083 # read all vnf-configuration relations
2084 vnf_relations
= list()
2085 db_vnfd_list
= db_nsr
.get('vnfd-id')
2087 for vnfd
in db_vnfd_list
:
2088 db_vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd
})
2089 db_vnf_relations
= get_configuration(db_vnfd
, db_vnfd
["id"]).get("relation", [])
2090 if db_vnf_relations
:
2091 for r
in db_vnf_relations
:
2092 # check if this VCA is in the relation
2093 if my_vca
.get('vdu_id') in (r
.get('entities')[0].get('id'), r
.get('entities')[1].get('id')):
2094 vnf_relations
.append(r
)
2096 # if no relations, terminate
2097 if not ns_relations
and not vnf_relations
:
2098 self
.logger
.debug(logging_text
+ ' No relations')
2101 self
.logger
.debug(logging_text
+ ' adding relations\n {}\n {}'.format(ns_relations
, vnf_relations
))
2108 if now
- start
>= timeout
:
2109 self
.logger
.error(logging_text
+ ' : timeout adding relations')
2112 # reload nsr from database (we need to update record: _admin.deloyed.VCA)
2113 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2115 # for each defined NS relation, find the VCA's related
2116 for r
in ns_relations
.copy():
2117 from_vca_ee_id
= None
2119 from_vca_endpoint
= None
2120 to_vca_endpoint
= None
2121 vca_list
= deep_get(db_nsr
, ('_admin', 'deployed', 'VCA'))
2122 for vca
in vca_list
:
2123 if vca
.get('member-vnf-index') == r
.get('entities')[0].get('id') \
2124 and vca
.get('config_sw_installed'):
2125 from_vca_ee_id
= vca
.get('ee_id')
2126 from_vca_endpoint
= r
.get('entities')[0].get('endpoint')
2127 if vca
.get('member-vnf-index') == r
.get('entities')[1].get('id') \
2128 and vca
.get('config_sw_installed'):
2129 to_vca_ee_id
= vca
.get('ee_id')
2130 to_vca_endpoint
= r
.get('entities')[1].get('endpoint')
2131 if from_vca_ee_id
and to_vca_ee_id
:
2133 await self
.vca_map
[vca_type
].add_relation(
2134 ee_id_1
=from_vca_ee_id
,
2135 ee_id_2
=to_vca_ee_id
,
2136 endpoint_1
=from_vca_endpoint
,
2137 endpoint_2
=to_vca_endpoint
)
2138 # remove entry from relations list
2139 ns_relations
.remove(r
)
2141 # check failed peers
2143 vca_status_list
= db_nsr
.get('configurationStatus')
2145 for i
in range(len(vca_list
)):
2147 vca_status
= vca_status_list
[i
]
2148 if vca
.get('member-vnf-index') == r
.get('entities')[0].get('id'):
2149 if vca_status
.get('status') == 'BROKEN':
2150 # peer broken: remove relation from list
2151 ns_relations
.remove(r
)
2152 if vca
.get('member-vnf-index') == r
.get('entities')[1].get('id'):
2153 if vca_status
.get('status') == 'BROKEN':
2154 # peer broken: remove relation from list
2155 ns_relations
.remove(r
)
2160 # for each defined VNF relation, find the VCA's related
2161 for r
in vnf_relations
.copy():
2162 from_vca_ee_id
= None
2164 from_vca_endpoint
= None
2165 to_vca_endpoint
= None
2166 vca_list
= deep_get(db_nsr
, ('_admin', 'deployed', 'VCA'))
2167 for vca
in vca_list
:
2168 key_to_check
= "vdu_id"
2169 if vca
.get("vdu_id") is None:
2170 key_to_check
= "vnfd_id"
2171 if vca
.get(key_to_check
) == r
.get('entities')[0].get('id') and vca
.get('config_sw_installed'):
2172 from_vca_ee_id
= vca
.get('ee_id')
2173 from_vca_endpoint
= r
.get('entities')[0].get('endpoint')
2174 if vca
.get(key_to_check
) == r
.get('entities')[1].get('id') and vca
.get('config_sw_installed'):
2175 to_vca_ee_id
= vca
.get('ee_id')
2176 to_vca_endpoint
= r
.get('entities')[1].get('endpoint')
2177 if from_vca_ee_id
and to_vca_ee_id
:
2179 await self
.vca_map
[vca_type
].add_relation(
2180 ee_id_1
=from_vca_ee_id
,
2181 ee_id_2
=to_vca_ee_id
,
2182 endpoint_1
=from_vca_endpoint
,
2183 endpoint_2
=to_vca_endpoint
)
2184 # remove entry from relations list
2185 vnf_relations
.remove(r
)
2187 # check failed peers
2189 vca_status_list
= db_nsr
.get('configurationStatus')
2191 for i
in range(len(vca_list
)):
2193 vca_status
= vca_status_list
[i
]
2194 if vca
.get('vdu_id') == r
.get('entities')[0].get('id'):
2195 if vca_status
.get('status') == 'BROKEN':
2196 # peer broken: remove relation from list
2197 vnf_relations
.remove(r
)
2198 if vca
.get('vdu_id') == r
.get('entities')[1].get('id'):
2199 if vca_status
.get('status') == 'BROKEN':
2200 # peer broken: remove relation from list
2201 vnf_relations
.remove(r
)
2207 await asyncio
.sleep(5.0)
2209 if not ns_relations
and not vnf_relations
:
2210 self
.logger
.debug('Relations added')
2215 except Exception as e
:
2216 self
.logger
.warn(logging_text
+ ' ERROR adding relations: {}'.format(e
))
2219 async def _install_kdu(self
, nsr_id
: str, nsr_db_path
: str, vnfr_data
: dict, kdu_index
: int, kdud
: dict,
2220 vnfd
: dict, k8s_instance_info
: dict, k8params
: dict = None, timeout
: int = 600):
2223 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
2225 db_dict_install
= {"collection": "nsrs",
2226 "filter": {"_id": nsr_id
},
2227 "path": nsr_db_path
}
2229 kdu_instance
= self
.k8scluster_map
[k8sclustertype
].generate_kdu_instance_name(
2230 db_dict
=db_dict_install
,
2231 kdu_model
=k8s_instance_info
["kdu-model"],
2232 kdu_name
=k8s_instance_info
["kdu-name"],
2234 self
.update_db_2("nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
})
2235 await self
.k8scluster_map
[k8sclustertype
].install(
2236 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2237 kdu_model
=k8s_instance_info
["kdu-model"],
2240 db_dict
=db_dict_install
,
2242 kdu_name
=k8s_instance_info
["kdu-name"],
2243 namespace
=k8s_instance_info
["namespace"],
2244 kdu_instance
=kdu_instance
,
2246 self
.update_db_2("nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
})
2248 # Obtain services to obtain management service ip
2249 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
2250 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2251 kdu_instance
=kdu_instance
,
2252 namespace
=k8s_instance_info
["namespace"])
2254 # Obtain management service info (if exists)
2255 vnfr_update_dict
= {}
2257 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
2258 mgmt_services
= [service
for service
in kdud
.get("service", []) if service
.get("mgmt-service")]
2259 for mgmt_service
in mgmt_services
:
2260 for service
in services
:
2261 if service
["name"].startswith(mgmt_service
["name"]):
2262 # Mgmt service found, Obtain service ip
2263 ip
= service
.get("external_ip", service
.get("cluster_ip"))
2264 if isinstance(ip
, list) and len(ip
) == 1:
2267 vnfr_update_dict
["kdur.{}.ip-address".format(kdu_index
)] = ip
2269 # Check if must update also mgmt ip at the vnf
2270 service_external_cp
= mgmt_service
.get("external-connection-point-ref")
2271 if service_external_cp
:
2272 if deep_get(vnfd
, ("mgmt-interface", "cp")) == service_external_cp
:
2273 vnfr_update_dict
["ip-address"] = ip
2277 self
.logger
.warn("Mgmt service name: {} not found".format(mgmt_service
["name"]))
2279 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
2280 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
2282 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
2283 if kdu_config
and kdu_config
.get("initial-config-primitive") and \
2284 get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None:
2285 initial_config_primitive_list
= kdu_config
.get("initial-config-primitive")
2286 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
2288 for initial_config_primitive
in initial_config_primitive_list
:
2289 primitive_params_
= self
._map
_primitive
_params
(initial_config_primitive
, {}, {})
2291 await asyncio
.wait_for(
2292 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
2293 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2294 kdu_instance
=kdu_instance
,
2295 primitive_name
=initial_config_primitive
["name"],
2296 params
=primitive_params_
, db_dict
=db_dict_install
),
2299 except Exception as e
:
2300 # Prepare update db with error and raise exception
2302 self
.update_db_2("nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)})
2303 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), {"kdur.{}.status".format(kdu_index
): "ERROR"})
2305 # ignore to keep original exception
2307 # reraise original error
2312 async def deploy_kdus(self
, logging_text
, nsr_id
, nslcmop_id
, db_vnfrs
, db_vnfds
, task_instantiation_info
):
2313 # Launch kdus if present in the descriptor
2315 k8scluster_id_2_uuic
= {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}
2317 async def _get_cluster_id(cluster_id
, cluster_type
):
2318 nonlocal k8scluster_id_2_uuic
2319 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
2320 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
2322 # check if K8scluster is creating and wait look if previous tasks in process
2323 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related("k8scluster", cluster_id
)
2325 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name
, cluster_id
)
2326 self
.logger
.debug(logging_text
+ text
)
2327 await asyncio
.wait(task_dependency
, timeout
=3600)
2329 db_k8scluster
= self
.db
.get_one("k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False)
2330 if not db_k8scluster
:
2331 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
2333 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
2335 if cluster_type
== "helm-chart-v3":
2337 # backward compatibility for existing clusters that have not been initialized for helm v3
2338 k8s_credentials
= yaml
.safe_dump(db_k8scluster
.get("credentials"))
2339 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(k8s_credentials
,
2340 reuse_cluster_uuid
=cluster_id
)
2341 db_k8scluster_update
= {}
2342 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
2343 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
2344 db_k8scluster_update
["_admin.helm-chart-v3.created"] = uninstall_sw
2345 db_k8scluster_update
["_admin.helm-chart-v3.operationalState"] = "ENABLED"
2346 self
.update_db_2("k8sclusters", cluster_id
, db_k8scluster_update
)
2347 except Exception as e
:
2348 self
.logger
.error(logging_text
+ "error initializing helm-v3 cluster: {}".format(str(e
)))
2349 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id
,
2352 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
2353 format(cluster_id
, cluster_type
))
2354 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
2357 logging_text
+= "Deploy kdus: "
2360 db_nsr_update
= {"_admin.deployed.K8s": []}
2361 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2364 updated_cluster_list
= []
2365 updated_v3_cluster_list
= []
2367 for vnfr_data
in db_vnfrs
.values():
2368 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
2369 # Step 0: Prepare and set parameters
2370 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
2371 vnfd_id
= vnfr_data
.get('vnfd-id')
2372 vnfd_with_id
= find_in_list(db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
)
2373 kdud
= next(kdud
for kdud
in vnfd_with_id
["kdu"] if kdud
["name"] == kdur
["kdu-name"])
2374 namespace
= kdur
.get("k8s-namespace")
2375 if kdur
.get("helm-chart"):
2376 kdumodel
= kdur
["helm-chart"]
2377 # Default version: helm3, if helm-version is v2 assign v2
2378 k8sclustertype
= "helm-chart-v3"
2379 self
.logger
.debug("kdur: {}".format(kdur
))
2380 if kdur
.get("helm-version") and kdur
.get("helm-version") == "v2":
2381 k8sclustertype
= "helm-chart"
2382 elif kdur
.get("juju-bundle"):
2383 kdumodel
= kdur
["juju-bundle"]
2384 k8sclustertype
= "juju-bundle"
2386 raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
2387 "juju-bundle. Maybe an old NBI version is running".
2388 format(vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]))
2389 # check if kdumodel is a file and exists
2391 vnfd_with_id
= find_in_list(db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
)
2392 storage
= deep_get(vnfd_with_id
, ('_admin', 'storage'))
2393 if storage
and storage
.get('pkg-dir'): # may be not present if vnfd has not artifacts
2394 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
2395 filename
= '{}/{}/{}s/{}'.format(storage
["folder"], storage
["pkg-dir"], k8sclustertype
,
2397 if self
.fs
.file_exists(filename
, mode
='file') or self
.fs
.file_exists(filename
, mode
='dir'):
2398 kdumodel
= self
.fs
.path
+ filename
2399 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
2401 except Exception: # it is not a file
2404 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
2405 step
= "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id
)
2406 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
2409 if (k8sclustertype
== "helm-chart" and cluster_uuid
not in updated_cluster_list
)\
2410 or (k8sclustertype
== "helm-chart-v3" and cluster_uuid
not in updated_v3_cluster_list
):
2411 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
2412 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(cluster_uuid
=cluster_uuid
))
2413 if del_repo_list
or added_repo_dict
:
2414 if k8sclustertype
== "helm-chart":
2415 unset
= {'_admin.helm_charts_added.' + item
: None for item
in del_repo_list
}
2416 updated
= {'_admin.helm_charts_added.' +
2417 item
: name
for item
, name
in added_repo_dict
.items()}
2418 updated_cluster_list
.append(cluster_uuid
)
2419 elif k8sclustertype
== "helm-chart-v3":
2420 unset
= {'_admin.helm_charts_v3_added.' + item
: None for item
in del_repo_list
}
2421 updated
= {'_admin.helm_charts_v3_added.' +
2422 item
: name
for item
, name
in added_repo_dict
.items()}
2423 updated_v3_cluster_list
.append(cluster_uuid
)
2424 self
.logger
.debug(logging_text
+ "repos synchronized on k8s cluster "
2425 "'{}' to_delete: {}, to_add: {}".
2426 format(k8s_cluster_id
, del_repo_list
, added_repo_dict
))
2427 self
.db
.set_one("k8sclusters", {"_id": k8s_cluster_id
}, updated
, unset
=unset
)
2430 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data
["member-vnf-index-ref"],
2431 kdur
["kdu-name"], k8s_cluster_id
)
2432 k8s_instance_info
= {"kdu-instance": None,
2433 "k8scluster-uuid": cluster_uuid
,
2434 "k8scluster-type": k8sclustertype
,
2435 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
2436 "kdu-name": kdur
["kdu-name"],
2437 "kdu-model": kdumodel
,
2438 "namespace": namespace
}
2439 db_path
= "_admin.deployed.K8s.{}".format(index
)
2440 db_nsr_update
[db_path
] = k8s_instance_info
2441 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2442 vnfd_with_id
= find_in_list(db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
)
2443 task
= asyncio
.ensure_future(
2444 self
._install
_kdu
(nsr_id
, db_path
, vnfr_data
, kdu_index
, kdud
, vnfd_with_id
,
2445 k8s_instance_info
, k8params
=desc_params
, timeout
=600))
2446 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_KDU-{}".format(index
), task
)
2447 task_instantiation_info
[task
] = "Deploying KDU {}".format(kdur
["kdu-name"])
2451 except (LcmException
, asyncio
.CancelledError
):
2453 except Exception as e
:
2454 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
2455 if isinstance(e
, (N2VCException
, DbException
)):
2456 self
.logger
.error(logging_text
+ msg
)
2458 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
2459 raise LcmException(msg
)
2462 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2464 def _deploy_n2vc(self
, logging_text
, db_nsr
, db_vnfr
, nslcmop_id
, nsr_id
, nsi_id
, vnfd_id
, vdu_id
,
2465 kdu_name
, member_vnf_index
, vdu_index
, vdu_name
, deploy_params
, descriptor_config
,
2466 base_folder
, task_instantiation_info
, stage
):
2467 # launch instantiate_N2VC in a asyncio task and register task object
2468 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2469 # if not found, create one entry and update database
2470 # fill db_nsr._admin.deployed.VCA.<index>
2472 self
.logger
.debug(logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
))
2473 if "execution-environment-list" in descriptor_config
:
2474 ee_list
= descriptor_config
.get("execution-environment-list", [])
2475 else: # other types as script are not supported
2478 for ee_item
in ee_list
:
2479 self
.logger
.debug(logging_text
+ "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item
.get('juju'),
2480 ee_item
.get("helm-chart")))
2481 ee_descriptor_id
= ee_item
.get("id")
2482 if ee_item
.get("juju"):
2483 vca_name
= ee_item
['juju'].get('charm')
2484 vca_type
= "lxc_proxy_charm" if ee_item
['juju'].get('charm') is not None else "native_charm"
2485 if ee_item
['juju'].get('cloud') == "k8s":
2486 vca_type
= "k8s_proxy_charm"
2487 elif ee_item
['juju'].get('proxy') is False:
2488 vca_type
= "native_charm"
2489 elif ee_item
.get("helm-chart"):
2490 vca_name
= ee_item
['helm-chart']
2491 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
2494 vca_type
= "helm-v3"
2496 self
.logger
.debug(logging_text
+ "skipping non juju neither charm configuration")
2500 for vca_index
, vca_deployed
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
2501 if not vca_deployed
:
2503 if vca_deployed
.get("member-vnf-index") == member_vnf_index
and \
2504 vca_deployed
.get("vdu_id") == vdu_id
and \
2505 vca_deployed
.get("kdu_name") == kdu_name
and \
2506 vca_deployed
.get("vdu_count_index", 0) == vdu_index
and \
2507 vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
:
2510 # not found, create one.
2511 target
= "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
2513 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
2515 target
+= "/kdu/{}".format(kdu_name
)
2517 "target_element": target
,
2518 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
2519 "member-vnf-index": member_vnf_index
,
2521 "kdu_name": kdu_name
,
2522 "vdu_count_index": vdu_index
,
2523 "operational-status": "init", # TODO revise
2524 "detailed-status": "", # TODO revise
2525 "step": "initial-deploy", # TODO revise
2527 "vdu_name": vdu_name
,
2529 "ee_descriptor_id": ee_descriptor_id
2533 # create VCA and configurationStatus in db
2535 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
2536 "configurationStatus.{}".format(vca_index
): dict()
2538 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2540 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
2542 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
2543 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
2544 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
2547 task_n2vc
= asyncio
.ensure_future(
2548 self
.instantiate_N2VC(
2549 logging_text
=logging_text
,
2550 vca_index
=vca_index
,
2556 vdu_index
=vdu_index
,
2557 deploy_params
=deploy_params
,
2558 config_descriptor
=descriptor_config
,
2559 base_folder
=base_folder
,
2560 nslcmop_id
=nslcmop_id
,
2564 ee_config_descriptor
=ee_item
2567 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_N2VC-{}".format(vca_index
), task_n2vc
)
2568 task_instantiation_info
[task_n2vc
] = self
.task_name_deploy_vca
+ " {}.{}".format(
2569 member_vnf_index
or "", vdu_id
or "")
2572 def _create_nslcmop(nsr_id
, operation
, params
):
2574 Creates a ns-lcm-opp content to be stored at database.
2575 :param nsr_id: internal id of the instance
2576 :param operation: instantiate, terminate, scale, action, ...
2577 :param params: user parameters for the operation
2578 :return: dictionary following SOL005 format
2580 # Raise exception if invalid arguments
2581 if not (nsr_id
and operation
and params
):
2583 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2589 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2590 "operationState": "PROCESSING",
2591 "statusEnteredTime": now
,
2592 "nsInstanceId": nsr_id
,
2593 "lcmOperationType": operation
,
2595 "isAutomaticInvocation": False,
2596 "operationParams": params
,
2597 "isCancelPending": False,
2599 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
2600 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
2605 def _format_additional_params(self
, params
):
2606 params
= params
or {}
2607 for key
, value
in params
.items():
2608 if str(value
).startswith("!!yaml "):
2609 params
[key
] = yaml
.safe_load(value
[7:])
2612 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
2613 primitive
= seq
.get('name')
2614 primitive_params
= {}
2616 "member_vnf_index": vnf_index
,
2617 "primitive": primitive
,
2618 "primitive_params": primitive_params
,
2621 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
2625 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
2626 op
= deep_get(db_nslcmop
, ('_admin', 'operations'), [])[op_index
]
2627 if op
.get('operationState') == 'COMPLETED':
2628 # b. Skip sub-operation
2629 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2630 return self
.SUBOPERATION_STATUS_SKIP
2632 # c. retry executing sub-operation
2633 # The sub-operation exists, and operationState != 'COMPLETED'
2634 # Update operationState = 'PROCESSING' to indicate a retry.
2635 operationState
= 'PROCESSING'
2636 detailed_status
= 'In progress'
2637 self
._update
_suboperation
_status
(
2638 db_nslcmop
, op_index
, operationState
, detailed_status
)
2639 # Return the sub-operation index
2640 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2641 # with arguments extracted from the sub-operation
2644 # Find a sub-operation where all keys in a matching dictionary must match
2645 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2646 def _find_suboperation(self
, db_nslcmop
, match
):
2647 if db_nslcmop
and match
:
2648 op_list
= db_nslcmop
.get('_admin', {}).get('operations', [])
2649 for i
, op
in enumerate(op_list
):
2650 if all(op
.get(k
) == match
[k
] for k
in match
):
2652 return self
.SUBOPERATION_STATUS_NOT_FOUND
2654 # Update status for a sub-operation given its index
2655 def _update_suboperation_status(self
, db_nslcmop
, op_index
, operationState
, detailed_status
):
2656 # Update DB for HA tasks
2657 q_filter
= {'_id': db_nslcmop
['_id']}
2658 update_dict
= {'_admin.operations.{}.operationState'.format(op_index
): operationState
,
2659 '_admin.operations.{}.detailed-status'.format(op_index
): detailed_status
}
2660 self
.db
.set_one("nslcmops",
2662 update_dict
=update_dict
,
2663 fail_on_empty
=False)
2665 # Add sub-operation, return the index of the added sub-operation
2666 # Optionally, set operationState, detailed-status, and operationType
2667 # Status and type are currently set for 'scale' sub-operations:
2668 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2669 # 'detailed-status' : status message
2670 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2671 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2672 def _add_suboperation(self
, db_nslcmop
, vnf_index
, vdu_id
, vdu_count_index
, vdu_name
, primitive
,
2673 mapped_primitive_params
, operationState
=None, detailed_status
=None, operationType
=None,
2674 RO_nsr_id
=None, RO_scaling_info
=None):
2676 return self
.SUBOPERATION_STATUS_NOT_FOUND
2677 # Get the "_admin.operations" list, if it exists
2678 db_nslcmop_admin
= db_nslcmop
.get('_admin', {})
2679 op_list
= db_nslcmop_admin
.get('operations')
2680 # Create or append to the "_admin.operations" list
2681 new_op
= {'member_vnf_index': vnf_index
,
2683 'vdu_count_index': vdu_count_index
,
2684 'primitive': primitive
,
2685 'primitive_params': mapped_primitive_params
}
2687 new_op
['operationState'] = operationState
2689 new_op
['detailed-status'] = detailed_status
2691 new_op
['lcmOperationType'] = operationType
2693 new_op
['RO_nsr_id'] = RO_nsr_id
2695 new_op
['RO_scaling_info'] = RO_scaling_info
2697 # No existing operations, create key 'operations' with current operation as first list element
2698 db_nslcmop_admin
.update({'operations': [new_op
]})
2699 op_list
= db_nslcmop_admin
.get('operations')
2701 # Existing operations, append operation to list
2702 op_list
.append(new_op
)
2704 db_nslcmop_update
= {'_admin.operations': op_list
}
2705 self
.update_db_2("nslcmops", db_nslcmop
['_id'], db_nslcmop_update
)
2706 op_index
= len(op_list
) - 1
2709 # Helper methods for scale() sub-operations
2711 # pre-scale/post-scale:
2712 # Check for 3 different cases:
2713 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2714 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2715 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2716 def _check_or_add_scale_suboperation(self
, db_nslcmop
, vnf_index
, vnf_config_primitive
, primitive_params
,
2717 operationType
, RO_nsr_id
=None, RO_scaling_info
=None):
2718 # Find this sub-operation
2719 if RO_nsr_id
and RO_scaling_info
:
2720 operationType
= 'SCALE-RO'
2722 'member_vnf_index': vnf_index
,
2723 'RO_nsr_id': RO_nsr_id
,
2724 'RO_scaling_info': RO_scaling_info
,
2728 'member_vnf_index': vnf_index
,
2729 'primitive': vnf_config_primitive
,
2730 'primitive_params': primitive_params
,
2731 'lcmOperationType': operationType
2733 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
2734 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
2735 # a. New sub-operation
2736 # The sub-operation does not exist, add it.
2737 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2738 # The following parameters are set to None for all kind of scaling:
2740 vdu_count_index
= None
2742 if RO_nsr_id
and RO_scaling_info
:
2743 vnf_config_primitive
= None
2744 primitive_params
= None
2747 RO_scaling_info
= None
2748 # Initial status for sub-operation
2749 operationState
= 'PROCESSING'
2750 detailed_status
= 'In progress'
2751 # Add sub-operation for pre/post-scaling (zero or more operations)
2752 self
._add
_suboperation
(db_nslcmop
,
2757 vnf_config_primitive
,
2764 return self
.SUBOPERATION_STATUS_NEW
2766 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2767 # or op_index (operationState != 'COMPLETED')
2768 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
2770 # Function to return execution_environment id
2772 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
2773 # TODO vdu_index_count
2774 for vca
in vca_deployed_list
:
2775 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
2778 async def destroy_N2VC(self
, logging_text
, db_nslcmop
, vca_deployed
, config_descriptor
,
2779 vca_index
, destroy_ee
=True, exec_primitives
=True, scaling_in
=False):
2781 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
2782 :param logging_text:
2784 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
2785 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
2786 :param vca_index: index in the database _admin.deployed.VCA
2787 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
2788 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
2789 not executed properly
2790 :param scaling_in: True destroys the application, False destroys the model
2791 :return: None or exception
2795 logging_text
+ " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
2796 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
2800 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
2802 # execute terminate_primitives
2804 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
2805 config_descriptor
.get("terminate-config-primitive"), vca_deployed
.get("ee_descriptor_id"))
2806 vdu_id
= vca_deployed
.get("vdu_id")
2807 vdu_count_index
= vca_deployed
.get("vdu_count_index")
2808 vdu_name
= vca_deployed
.get("vdu_name")
2809 vnf_index
= vca_deployed
.get("member-vnf-index")
2810 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
2811 for seq
in terminate_primitives
:
2812 # For each sequence in list, get primitive and call _ns_execute_primitive()
2813 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
2814 vnf_index
, seq
.get("name"))
2815 self
.logger
.debug(logging_text
+ step
)
2816 # Create the primitive for each sequence, i.e. "primitive": "touch"
2817 primitive
= seq
.get('name')
2818 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(seq
, vnf_index
)
2821 self
._add
_suboperation
(db_nslcmop
,
2827 mapped_primitive_params
)
2828 # Sub-operations: Call _ns_execute_primitive() instead of action()
2830 result
, result_detail
= await self
._ns
_execute
_primitive
(vca_deployed
["ee_id"], primitive
,
2831 mapped_primitive_params
,
2833 except LcmException
:
2834 # this happens when VCA is not deployed. In this case it is not needed to terminate
2836 result_ok
= ['COMPLETED', 'PARTIALLY_COMPLETED']
2837 if result
not in result_ok
:
2838 raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
2839 "error {}".format(seq
.get("name"), vnf_index
, result_detail
))
2840 # set that this VCA do not need terminated
2841 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(vca_index
)
2842 self
.update_db_2("nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False})
2844 if vca_deployed
.get("prometheus_jobs") and self
.prometheus
:
2845 await self
.prometheus
.update(remove_jobs
=vca_deployed
["prometheus_jobs"])
2848 await self
.vca_map
[vca_type
].delete_execution_environment(vca_deployed
["ee_id"], scaling_in
=scaling_in
)
2850 async def _delete_all_N2VC(self
, db_nsr
: dict):
2851 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
='TERMINATING')
2852 namespace
= "." + db_nsr
["_id"]
2854 await self
.n2vc
.delete_namespace(namespace
=namespace
, total_timeout
=self
.timeout_charm_delete
)
2855 except N2VCNotFound
: # already deleted. Skip
2857 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
='DELETED')
2859 async def _terminate_RO(self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
):
2861 Terminates a deployment from RO
2862 :param logging_text:
2863 :param nsr_deployed: db_nsr._admin.deployed
2866 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
2867 this method will update only the index 2, but it will write on database the concatenated content of the list
2872 ro_nsr_id
= ro_delete_action
= None
2873 if nsr_deployed
and nsr_deployed
.get("RO"):
2874 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
2875 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
2878 stage
[2] = "Deleting ns from VIM."
2879 db_nsr_update
["detailed-status"] = " ".join(stage
)
2880 self
._write
_op
_status
(nslcmop_id
, stage
)
2881 self
.logger
.debug(logging_text
+ stage
[2])
2882 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2883 self
._write
_op
_status
(nslcmop_id
, stage
)
2884 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
2885 ro_delete_action
= desc
["action_id"]
2886 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
2887 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
2888 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
2889 if ro_delete_action
:
2890 # wait until NS is deleted from VIM
2891 stage
[2] = "Waiting ns deleted from VIM."
2892 detailed_status_old
= None
2893 self
.logger
.debug(logging_text
+ stage
[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id
,
2895 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2896 self
._write
_op
_status
(nslcmop_id
, stage
)
2898 delete_timeout
= 20 * 60 # 20 minutes
2899 while delete_timeout
> 0:
2900 desc
= await self
.RO
.show(
2902 item_id_name
=ro_nsr_id
,
2903 extra_item
="action",
2904 extra_item_id
=ro_delete_action
)
2907 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
2909 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
2910 if ns_status
== "ERROR":
2911 raise ROclient
.ROClientException(ns_status_info
)
2912 elif ns_status
== "BUILD":
2913 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
2914 elif ns_status
== "ACTIVE":
2915 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
2916 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
2919 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status
)
2920 if stage
[2] != detailed_status_old
:
2921 detailed_status_old
= stage
[2]
2922 db_nsr_update
["detailed-status"] = " ".join(stage
)
2923 self
._write
_op
_status
(nslcmop_id
, stage
)
2924 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2925 await asyncio
.sleep(5, loop
=self
.loop
)
2927 else: # delete_timeout <= 0:
2928 raise ROclient
.ROClientException("Timeout waiting ns deleted from VIM")
2930 except Exception as e
:
2931 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2932 if isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404: # not found
2933 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
2934 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
2935 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
2936 self
.logger
.debug(logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
))
2937 elif isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409: # conflict
2938 failed_detail
.append("delete conflict: {}".format(e
))
2939 self
.logger
.debug(logging_text
+ "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
))
2941 failed_detail
.append("delete error: {}".format(e
))
2942 self
.logger
.error(logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
))
2945 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
2946 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
2948 stage
[2] = "Deleting nsd from RO."
2949 db_nsr_update
["detailed-status"] = " ".join(stage
)
2950 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2951 self
._write
_op
_status
(nslcmop_id
, stage
)
2952 await self
.RO
.delete("nsd", ro_nsd_id
)
2953 self
.logger
.debug(logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
))
2954 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
2955 except Exception as e
:
2956 if isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404: # not found
2957 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
2958 self
.logger
.debug(logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
))
2959 elif isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409: # conflict
2960 failed_detail
.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
))
2961 self
.logger
.debug(logging_text
+ failed_detail
[-1])
2963 failed_detail
.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
))
2964 self
.logger
.error(logging_text
+ failed_detail
[-1])
2966 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
2967 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
2968 if not vnf_deployed
or not vnf_deployed
["id"]:
2971 ro_vnfd_id
= vnf_deployed
["id"]
2972 stage
[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
2973 vnf_deployed
["member-vnf-index"], ro_vnfd_id
)
2974 db_nsr_update
["detailed-status"] = " ".join(stage
)
2975 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2976 self
._write
_op
_status
(nslcmop_id
, stage
)
2977 await self
.RO
.delete("vnfd", ro_vnfd_id
)
2978 self
.logger
.debug(logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
))
2979 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
2980 except Exception as e
:
2981 if isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404: # not found
2982 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
2983 self
.logger
.debug(logging_text
+ "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
))
2984 elif isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409: # conflict
2985 failed_detail
.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
))
2986 self
.logger
.debug(logging_text
+ failed_detail
[-1])
2988 failed_detail
.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
))
2989 self
.logger
.error(logging_text
+ failed_detail
[-1])
2992 stage
[2] = "Error deleting from VIM"
2994 stage
[2] = "Deleted from VIM"
2995 db_nsr_update
["detailed-status"] = " ".join(stage
)
2996 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2997 self
._write
_op
_status
(nslcmop_id
, stage
)
3000 raise LcmException("; ".join(failed_detail
))
3002 async def terminate(self
, nsr_id
, nslcmop_id
):
3003 # Try to lock HA task here
3004 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3005 if not task_is_locked_by_me
:
3008 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
3009 self
.logger
.debug(logging_text
+ "Enter")
3010 timeout_ns_terminate
= self
.timeout_ns_terminate
3013 operation_params
= None
3015 error_list
= [] # annotates all failed error messages
3016 db_nslcmop_update
= {}
3017 autoremove
= False # autoremove after terminated
3018 tasks_dict_info
= {}
3020 stage
= ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
3021 # ^ contains [stage, step, VIM-status]
3023 # wait for any previous tasks in process
3024 await self
.lcm_tasks
.waitfor_related_HA("ns", 'nslcmops', nslcmop_id
)
3026 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
3027 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3028 operation_params
= db_nslcmop
.get("operationParams") or {}
3029 if operation_params
.get("timeout_ns_terminate"):
3030 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
3031 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
3032 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3034 db_nsr_update
["operational-status"] = "terminating"
3035 db_nsr_update
["config-status"] = "terminating"
3036 self
._write
_ns
_status
(
3038 ns_state
="TERMINATING",
3039 current_operation
="TERMINATING",
3040 current_operation_id
=nslcmop_id
,
3041 other_update
=db_nsr_update
3043 self
._write
_op
_status
(
3048 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
3049 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
3052 stage
[1] = "Getting vnf descriptors from db."
3053 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
3054 db_vnfds_from_id
= {}
3055 db_vnfds_from_member_index
= {}
3057 for vnfr
in db_vnfrs_list
:
3058 vnfd_id
= vnfr
["vnfd-id"]
3059 if vnfd_id
not in db_vnfds_from_id
:
3060 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
3061 db_vnfds_from_id
[vnfd_id
] = vnfd
3062 db_vnfds_from_member_index
[vnfr
["member-vnf-index-ref"]] = db_vnfds_from_id
[vnfd_id
]
3064 # Destroy individual execution environments when there are terminating primitives.
3065 # Rest of EE will be deleted at once
3066 # TODO - check before calling _destroy_N2VC
3067 # if not operation_params.get("skip_terminate_primitives"):#
3068 # or not vca.get("needed_terminate"):
3069 stage
[0] = "Stage 2/3 execute terminating primitives."
3070 self
.logger
.debug(logging_text
+ stage
[0])
3071 stage
[1] = "Looking execution environment that needs terminate."
3072 self
.logger
.debug(logging_text
+ stage
[1])
3074 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
3075 config_descriptor
= None
3076 if not vca
or not vca
.get("ee_id"):
3078 if not vca
.get("member-vnf-index"):
3080 config_descriptor
= db_nsr
.get("ns-configuration")
3081 elif vca
.get("vdu_id"):
3082 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
3083 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
3084 elif vca
.get("kdu_name"):
3085 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
3086 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
3088 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
3089 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
3090 vca_type
= vca
.get("type")
3091 exec_terminate_primitives
= (not operation_params
.get("skip_terminate_primitives") and
3092 vca
.get("needed_terminate"))
3093 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
3094 # pending native charms
3095 destroy_ee
= True if vca_type
in ("helm", "helm-v3", "native_charm") else False
3096 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
3097 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
3098 task
= asyncio
.ensure_future(
3099 self
.destroy_N2VC(logging_text
, db_nslcmop
, vca
, config_descriptor
, vca_index
,
3100 destroy_ee
, exec_terminate_primitives
))
3101 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
3103 # wait for pending tasks of terminate primitives
3105 self
.logger
.debug(logging_text
+ 'Waiting for tasks {}'.format(list(tasks_dict_info
.keys())))
3106 error_list
= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
,
3107 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
3109 tasks_dict_info
.clear()
3111 return # raise LcmException("; ".join(error_list))
3113 # remove All execution environments at once
3114 stage
[0] = "Stage 3/3 delete all."
3116 if nsr_deployed
.get("VCA"):
3117 stage
[1] = "Deleting all execution environments."
3118 self
.logger
.debug(logging_text
+ stage
[1])
3119 task_delete_ee
= asyncio
.ensure_future(asyncio
.wait_for(self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
),
3120 timeout
=self
.timeout_charm_delete
))
3121 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
3122 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
3124 # Delete from k8scluster
3125 stage
[1] = "Deleting KDUs."
3126 self
.logger
.debug(logging_text
+ stage
[1])
3127 # print(nsr_deployed)
3128 for kdu
in get_iterable(nsr_deployed
, "K8s"):
3129 if not kdu
or not kdu
.get("kdu-instance"):
3131 kdu_instance
= kdu
.get("kdu-instance")
3132 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
3133 task_delete_kdu_instance
= asyncio
.ensure_future(
3134 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
3135 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3136 kdu_instance
=kdu_instance
))
3138 self
.logger
.error(logging_text
+ "Unknown k8s deployment type {}".
3139 format(kdu
.get("k8scluster-type")))
3141 tasks_dict_info
[task_delete_kdu_instance
] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
3144 stage
[1] = "Deleting ns from VIM."
3146 task_delete_ro
= asyncio
.ensure_future(
3147 self
._terminate
_ng
_ro
(logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
))
3149 task_delete_ro
= asyncio
.ensure_future(
3150 self
._terminate
_RO
(logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
))
3151 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
3153 # rest of staff will be done at finally
3155 except (ROclient
.ROClientException
, DbException
, LcmException
, N2VCException
) as e
:
3156 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
3158 except asyncio
.CancelledError
:
3159 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(stage
[1]))
3160 exc
= "Operation was cancelled"
3161 except Exception as e
:
3162 exc
= traceback
.format_exc()
3163 self
.logger
.critical(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
), exc_info
=True)
3166 error_list
.append(str(exc
))
3168 # wait for pending tasks
3170 stage
[1] = "Waiting for terminate pending tasks."
3171 self
.logger
.debug(logging_text
+ stage
[1])
3172 error_list
+= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
, timeout_ns_terminate
,
3174 stage
[1] = stage
[2] = ""
3175 except asyncio
.CancelledError
:
3176 error_list
.append("Cancelled")
3177 # TODO cancell all tasks
3178 except Exception as exc
:
3179 error_list
.append(str(exc
))
3180 # update status at database
3182 error_detail
= "; ".join(error_list
)
3183 # self.logger.error(logging_text + error_detail)
3184 error_description_nslcmop
= '{} Detail: {}'.format(stage
[0], error_detail
)
3185 error_description_nsr
= 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id
, stage
[0])
3187 db_nsr_update
["operational-status"] = "failed"
3188 db_nsr_update
["detailed-status"] = error_description_nsr
+ " Detail: " + error_detail
3189 db_nslcmop_update
["detailed-status"] = error_detail
3190 nslcmop_operation_state
= "FAILED"
3194 error_description_nsr
= error_description_nslcmop
= None
3195 ns_state
= "NOT_INSTANTIATED"
3196 db_nsr_update
["operational-status"] = "terminated"
3197 db_nsr_update
["detailed-status"] = "Done"
3198 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
3199 db_nslcmop_update
["detailed-status"] = "Done"
3200 nslcmop_operation_state
= "COMPLETED"
3203 self
._write
_ns
_status
(
3206 current_operation
="IDLE",
3207 current_operation_id
=None,
3208 error_description
=error_description_nsr
,
3209 error_detail
=error_detail
,
3210 other_update
=db_nsr_update
3212 self
._write
_op
_status
(
3215 error_message
=error_description_nslcmop
,
3216 operation_state
=nslcmop_operation_state
,
3217 other_update
=db_nslcmop_update
,
3219 if ns_state
== "NOT_INSTANTIATED":
3221 self
.db
.set_list("vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "NOT_INSTANTIATED"})
3222 except DbException
as e
:
3223 self
.logger
.warn(logging_text
+ 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
3225 if operation_params
:
3226 autoremove
= operation_params
.get("autoremove", False)
3227 if nslcmop_operation_state
:
3229 await self
.msg
.aiowrite("ns", "terminated", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
3230 "operationState": nslcmop_operation_state
,
3231 "autoremove": autoremove
},
3233 except Exception as e
:
3234 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
3236 self
.logger
.debug(logging_text
+ "Exit")
3237 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
3239 async def _wait_for_tasks(self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None):
3241 error_detail_list
= []
3243 pending_tasks
= list(created_tasks_info
.keys())
3244 num_tasks
= len(pending_tasks
)
3246 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
3247 self
._write
_op
_status
(nslcmop_id
, stage
)
3248 while pending_tasks
:
3250 _timeout
= timeout
+ time_start
- time()
3251 done
, pending_tasks
= await asyncio
.wait(pending_tasks
, timeout
=_timeout
,
3252 return_when
=asyncio
.FIRST_COMPLETED
)
3253 num_done
+= len(done
)
3254 if not done
: # Timeout
3255 for task
in pending_tasks
:
3256 new_error
= created_tasks_info
[task
] + ": Timeout"
3257 error_detail_list
.append(new_error
)
3258 error_list
.append(new_error
)
3261 if task
.cancelled():
3264 exc
= task
.exception()
3266 if isinstance(exc
, asyncio
.TimeoutError
):
3268 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
3269 error_list
.append(created_tasks_info
[task
])
3270 error_detail_list
.append(new_error
)
3271 if isinstance(exc
, (str, DbException
, N2VCException
, ROclient
.ROClientException
, LcmException
,
3272 K8sException
, NgRoException
)):
3273 self
.logger
.error(logging_text
+ new_error
)
3275 exc_traceback
= "".join(traceback
.format_exception(None, exc
, exc
.__traceback
__))
3276 self
.logger
.error(logging_text
+ created_tasks_info
[task
] + " " + exc_traceback
)
3278 self
.logger
.debug(logging_text
+ created_tasks_info
[task
] + ": Done")
3279 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
3281 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
3282 if nsr_id
: # update also nsr
3283 self
.update_db_2("nsrs", nsr_id
, {"errorDescription": "Error at: " + ", ".join(error_list
),
3284 "errorDetail": ". ".join(error_detail_list
)})
3285 self
._write
_op
_status
(nslcmop_id
, stage
)
3286 return error_detail_list
3289 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
3291 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
3292 The default-value is used. If it is between < > it look for a value at instantiation_params
3293 :param primitive_desc: portion of VNFD/NSD that describes primitive
3294 :param params: Params provided by user
3295 :param instantiation_params: Instantiation params provided by user
3296 :return: a dictionary with the calculated params
3298 calculated_params
= {}
3299 for parameter
in primitive_desc
.get("parameter", ()):
3300 param_name
= parameter
["name"]
3301 if param_name
in params
:
3302 calculated_params
[param_name
] = params
[param_name
]
3303 elif "default-value" in parameter
or "value" in parameter
:
3304 if "value" in parameter
:
3305 calculated_params
[param_name
] = parameter
["value"]
3307 calculated_params
[param_name
] = parameter
["default-value"]
3308 if isinstance(calculated_params
[param_name
], str) and calculated_params
[param_name
].startswith("<") \
3309 and calculated_params
[param_name
].endswith(">"):
3310 if calculated_params
[param_name
][1:-1] in instantiation_params
:
3311 calculated_params
[param_name
] = instantiation_params
[calculated_params
[param_name
][1:-1]]
3313 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3314 format(calculated_params
[param_name
], primitive_desc
["name"]))
3316 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3317 format(param_name
, primitive_desc
["name"]))
3319 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
3320 calculated_params
[param_name
] = yaml
.safe_dump(calculated_params
[param_name
],
3321 default_flow_style
=True, width
=256)
3322 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[param_name
].startswith("!!yaml "):
3323 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
3324 if parameter
.get("data-type") == "INTEGER":
3326 calculated_params
[param_name
] = int(calculated_params
[param_name
])
3327 except ValueError: # error converting string to int
3329 "Parameter {} of primitive {} must be integer".format(param_name
, primitive_desc
["name"]))
3330 elif parameter
.get("data-type") == "BOOLEAN":
3331 calculated_params
[param_name
] = not ((str(calculated_params
[param_name
])).lower() == 'false')
3333 # add always ns_config_info if primitive name is config
3334 if primitive_desc
["name"] == "config":
3335 if "ns_config_info" in instantiation_params
:
3336 calculated_params
["ns_config_info"] = instantiation_params
["ns_config_info"]
3337 return calculated_params
3339 def _look_for_deployed_vca(self
, deployed_vca
, member_vnf_index
, vdu_id
, vdu_count_index
, kdu_name
=None,
3340 ee_descriptor_id
=None):
3341 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
3342 for vca
in deployed_vca
:
3345 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
3347 if vdu_count_index
is not None and vdu_count_index
!= vca
["vdu_count_index"]:
3349 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
3351 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
3355 # vca_deployed not found
3356 raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
3357 " is not deployed".format(member_vnf_index
, vdu_id
, vdu_count_index
, kdu_name
,
3360 ee_id
= vca
.get("ee_id")
3361 vca_type
= vca
.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3363 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3364 "execution environment"
3365 .format(member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
))
3366 return ee_id
, vca_type
3368 async def _ns_execute_primitive(self
, ee_id
, primitive
, primitive_params
, retries
=0, retries_interval
=30,
3369 timeout
=None, vca_type
=None, db_dict
=None) -> (str, str):
3371 if primitive
== "config":
3372 primitive_params
= {"params": primitive_params
}
3374 vca_type
= vca_type
or "lxc_proxy_charm"
3378 output
= await asyncio
.wait_for(
3379 self
.vca_map
[vca_type
].exec_primitive(
3381 primitive_name
=primitive
,
3382 params_dict
=primitive_params
,
3383 progress_timeout
=self
.timeout_progress_primitive
,
3384 total_timeout
=self
.timeout_primitive
,
3386 timeout
=timeout
or self
.timeout_primitive
)
3389 except asyncio
.CancelledError
:
3391 except Exception as e
: # asyncio.TimeoutError
3392 if isinstance(e
, asyncio
.TimeoutError
):
3396 self
.logger
.debug('Error executing action {} on {} -> {}'.format(primitive
, ee_id
, e
))
3398 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
3400 return 'FAILED', str(e
)
3402 return 'COMPLETED', output
3404 except (LcmException
, asyncio
.CancelledError
):
3406 except Exception as e
:
3407 return 'FAIL', 'Error executing action {}: {}'.format(primitive
, e
)
3409 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
3411 Updating the vca_status with latest juju information in nsrs record
3412 :param: nsr_id: Id of the nsr
3413 :param: nslcmop_id: Id of the nslcmop
3417 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
3418 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3419 if db_nsr
['_admin']['deployed']['K8s']:
3420 for k8s_index
, k8s
in enumerate(db_nsr
['_admin']['deployed']['K8s']):
3421 cluster_uuid
, kdu_instance
= k8s
["k8scluster-uuid"], k8s
["kdu-instance"]
3422 await self
._on
_update
_k
8s
_db
(cluster_uuid
, kdu_instance
, filter={'_id': nsr_id
})
3424 for vca_index
, _
in enumerate(db_nsr
['_admin']['deployed']['VCA']):
3425 table
, filter = "nsrs", {"_id": nsr_id
}
3426 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
3427 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
3429 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
3430 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
3432 async def action(self
, nsr_id
, nslcmop_id
):
3433 # Try to lock HA task here
3434 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3435 if not task_is_locked_by_me
:
3438 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
3439 self
.logger
.debug(logging_text
+ "Enter")
3440 # get all needed from database
3444 db_nslcmop_update
= {}
3445 nslcmop_operation_state
= None
3446 error_description_nslcmop
= None
3449 # wait for any previous tasks in process
3450 step
= "Waiting for previous operations to terminate"
3451 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
3453 self
._write
_ns
_status
(
3456 current_operation
="RUNNING ACTION",
3457 current_operation_id
=nslcmop_id
3460 step
= "Getting information from database"
3461 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3462 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3464 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3465 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
3466 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
3467 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
3468 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
3469 primitive
= db_nslcmop
["operationParams"]["primitive"]
3470 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
3471 timeout_ns_action
= db_nslcmop
["operationParams"].get("timeout_ns_action", self
.timeout_primitive
)
3474 step
= "Getting vnfr from database"
3475 db_vnfr
= self
.db
.get_one("vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
})
3476 step
= "Getting vnfd from database"
3477 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
3479 step
= "Getting nsd from database"
3480 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
3482 # for backward compatibility
3483 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
3484 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
3485 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
3486 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3488 # look for primitive
3489 config_primitive_desc
= descriptor_configuration
= None
3491 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
3493 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
3495 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
3497 descriptor_configuration
= db_nsd
.get("ns-configuration")
3499 if descriptor_configuration
and descriptor_configuration
.get("config-primitive"):
3500 for config_primitive
in descriptor_configuration
["config-primitive"]:
3501 if config_primitive
["name"] == primitive
:
3502 config_primitive_desc
= config_primitive
3505 if not config_primitive_desc
:
3506 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
3507 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3509 primitive_name
= primitive
3510 ee_descriptor_id
= None
3512 primitive_name
= config_primitive_desc
.get("execution-environment-primitive", primitive
)
3513 ee_descriptor_id
= config_primitive_desc
.get("execution-environment-ref")
3517 vdur
= next((x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None)
3518 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
3520 kdur
= next((x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None)
3521 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3523 desc_params
= parse_yaml_strings(db_vnfr
.get("additionalParamsForVnf"))
3525 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
3526 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
3527 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
3529 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
3530 actions
.add(primitive
["name"])
3531 for primitive
in kdu_configuration
.get("config-primitive", []):
3532 actions
.add(primitive
["name"])
3533 kdu_action
= True if primitive_name
in actions
else False
3535 # TODO check if ns is in a proper status
3536 if kdu_name
and (primitive_name
in ("upgrade", "rollback", "status") or kdu_action
):
3537 # kdur and desc_params already set from before
3538 if primitive_params
:
3539 desc_params
.update(primitive_params
)
3540 # TODO Check if we will need something at vnf level
3541 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
3542 if kdu_name
== kdu
["kdu-name"] and kdu
["member-vnf-index"] == vnf_index
:
3545 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
))
3547 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
3548 msg
= "unknown k8scluster-type '{}'".format(kdu
.get("k8scluster-type"))
3549 raise LcmException(msg
)
3551 db_dict
= {"collection": "nsrs",
3552 "filter": {"_id": nsr_id
},
3553 "path": "_admin.deployed.K8s.{}".format(index
)}
3554 self
.logger
.debug(logging_text
+ "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
))
3555 step
= "Executing kdu {}".format(primitive_name
)
3556 if primitive_name
== "upgrade":
3557 if desc_params
.get("kdu_model"):
3558 kdu_model
= desc_params
.get("kdu_model")
3559 del desc_params
["kdu_model"]
3561 kdu_model
= kdu
.get("kdu-model")
3562 parts
= kdu_model
.split(sep
=":")
3564 kdu_model
= parts
[0]
3566 detailed_status
= await asyncio
.wait_for(
3567 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
3568 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3569 kdu_instance
=kdu
.get("kdu-instance"),
3570 atomic
=True, kdu_model
=kdu_model
,
3571 params
=desc_params
, db_dict
=db_dict
,
3572 timeout
=timeout_ns_action
),
3573 timeout
=timeout_ns_action
+ 10)
3574 self
.logger
.debug(logging_text
+ " Upgrade of kdu {} done".format(detailed_status
))
3575 elif primitive_name
== "rollback":
3576 detailed_status
= await asyncio
.wait_for(
3577 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
3578 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3579 kdu_instance
=kdu
.get("kdu-instance"),
3581 timeout
=timeout_ns_action
)
3582 elif primitive_name
== "status":
3583 detailed_status
= await asyncio
.wait_for(
3584 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
3585 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3586 kdu_instance
=kdu
.get("kdu-instance")),
3587 timeout
=timeout_ns_action
)
3589 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(kdu
["kdu-name"], nsr_id
)
3590 params
= self
._map
_primitive
_params
(config_primitive_desc
, primitive_params
, desc_params
)
3592 detailed_status
= await asyncio
.wait_for(
3593 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
3594 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3595 kdu_instance
=kdu_instance
,
3596 primitive_name
=primitive_name
,
3597 params
=params
, db_dict
=db_dict
,
3598 timeout
=timeout_ns_action
),
3599 timeout
=timeout_ns_action
)
3602 nslcmop_operation_state
= 'COMPLETED'
3604 detailed_status
= ''
3605 nslcmop_operation_state
= 'FAILED'
3607 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"], member_vnf_index
=vnf_index
,
3608 vdu_id
=vdu_id
, vdu_count_index
=vdu_count_index
,
3609 ee_descriptor_id
=ee_descriptor_id
)
3610 for vca_index
, vca_deployed
in enumerate(db_nsr
['_admin']['deployed']['VCA']):
3611 if vca_deployed
.get("member-vnf-index") == vnf_index
:
3612 db_dict
= {"collection": "nsrs",
3613 "filter": {"_id": nsr_id
},
3614 "path": "_admin.deployed.VCA.{}.".format(vca_index
)}
3616 nslcmop_operation_state
, detailed_status
= await self
._ns
_execute
_primitive
(
3618 primitive
=primitive_name
,
3619 primitive_params
=self
._map
_primitive
_params
(config_primitive_desc
, primitive_params
, desc_params
),
3620 timeout
=timeout_ns_action
,
3624 db_nslcmop_update
["detailed-status"] = detailed_status
3625 error_description_nslcmop
= detailed_status
if nslcmop_operation_state
== "FAILED" else ""
3626 self
.logger
.debug(logging_text
+ " task Done with result {} {}".format(nslcmop_operation_state
,
3628 return # database update is called inside finally
3630 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
3631 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
3633 except asyncio
.CancelledError
:
3634 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(step
))
3635 exc
= "Operation was cancelled"
3636 except asyncio
.TimeoutError
:
3637 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
3639 except Exception as e
:
3640 exc
= traceback
.format_exc()
3641 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
3644 db_nslcmop_update
["detailed-status"] = detailed_status
= error_description_nslcmop
= \
3645 "FAILED {}: {}".format(step
, exc
)
3646 nslcmop_operation_state
= "FAILED"
3648 self
._write
_ns
_status
(
3650 ns_state
=db_nsr
["nsState"], # TODO check if degraded. For the moment use previous status
3651 current_operation
="IDLE",
3652 current_operation_id
=None,
3653 # error_description=error_description_nsr,
3654 # error_detail=error_detail,
3655 other_update
=db_nsr_update
3658 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
="", error_message
=error_description_nslcmop
,
3659 operation_state
=nslcmop_operation_state
, other_update
=db_nslcmop_update
)
3661 if nslcmop_operation_state
:
3663 await self
.msg
.aiowrite("ns", "actioned", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
3664 "operationState": nslcmop_operation_state
},
3666 except Exception as e
:
3667 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
3668 self
.logger
.debug(logging_text
+ "Exit")
3669 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
3670 return nslcmop_operation_state
, detailed_status
3672 async def scale(self
, nsr_id
, nslcmop_id
):
3673 # Try to lock HA task here
3674 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3675 if not task_is_locked_by_me
:
3678 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
3679 stage
= ['', '', '']
3680 tasks_dict_info
= {}
3681 # ^ stage, step, VIM progress
3682 self
.logger
.debug(logging_text
+ "Enter")
3683 # get all needed from database
3685 db_nslcmop_update
= {}
3688 # in case of error, indicates what part of scale was failed to put nsr at error status
3689 scale_process
= None
3690 old_operational_status
= ""
3691 old_config_status
= ""
3694 # wait for any previous tasks in process
3695 step
= "Waiting for previous operations to terminate"
3696 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
3697 self
._write
_ns
_status
(nsr_id
=nsr_id
, ns_state
=None,
3698 current_operation
="SCALING", current_operation_id
=nslcmop_id
)
3700 step
= "Getting nslcmop from database"
3701 self
.logger
.debug(step
+ " after having waited for previous tasks to be completed")
3702 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3704 step
= "Getting nsr from database"
3705 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3706 old_operational_status
= db_nsr
["operational-status"]
3707 old_config_status
= db_nsr
["config-status"]
3709 step
= "Parsing scaling parameters"
3710 db_nsr_update
["operational-status"] = "scaling"
3711 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3712 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3715 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3716 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
3717 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3718 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3719 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3722 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3723 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3724 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
3725 # for backward compatibility
3726 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
3727 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
3728 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
3729 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3731 step
= "Getting vnfr from database"
3732 db_vnfr
= self
.db
.get_one("vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
})
3734 step
= "Getting vnfd from database"
3735 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
3737 base_folder
= db_vnfd
["_admin"]["storage"]
3739 step
= "Getting scaling-group-descriptor"
3740 scaling_descriptor
= find_in_list(
3744 lambda scale_desc
: scale_desc
["name"] == scaling_group
3746 if not scaling_descriptor
:
3747 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3748 "at vnfd:scaling-group-descriptor".format(scaling_group
))
3750 step
= "Sending scale order to VIM"
3751 # TODO check if ns is in a proper status
3753 if not db_nsr
["_admin"].get("scaling-group"):
3754 self
.update_db_2("nsrs", nsr_id
, {"_admin.scaling-group": [{"name": scaling_group
, "nb-scale-op": 0}]})
3755 admin_scale_index
= 0
3757 for admin_scale_index
, admin_scale_info
in enumerate(db_nsr
["_admin"]["scaling-group"]):
3758 if admin_scale_info
["name"] == scaling_group
:
3759 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
3761 else: # not found, set index one plus last element and add new entry with the name
3762 admin_scale_index
+= 1
3763 db_nsr_update
["_admin.scaling-group.{}.name".format(admin_scale_index
)] = scaling_group
3764 RO_scaling_info
= []
3765 VCA_scaling_info
= []
3766 vdu_scaling_info
= {"scaling_group_name": scaling_group
, "vdu": []}
3767 if scaling_type
== "SCALE_OUT":
3768 if "aspect-delta-details" not in scaling_descriptor
:
3770 "Aspect delta details not fount in scaling descriptor {}".format(
3771 scaling_descriptor
["name"]
3774 # count if max-instance-count is reached
3775 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
3777 vdu_scaling_info
["scaling_direction"] = "OUT"
3778 vdu_scaling_info
["vdu-create"] = {}
3779 for delta
in deltas
:
3780 for vdu_delta
in delta
["vdu-delta"]:
3781 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
3782 vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
3783 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
3785 additional_params
= self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"]) or {}
3786 cloud_init_list
= []
3788 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
3789 max_instance_count
= 10
3790 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
3791 max_instance_count
= vdu_profile
.get("max-number-of-instances", 10)
3793 default_instance_num
= get_number_of_instances(db_vnfd
, vdud
["id"])
3795 nb_scale_op
+= vdu_delta
.get("number-of-instances", 1)
3797 if nb_scale_op
+ default_instance_num
> max_instance_count
:
3799 "reached the limit of {} (max-instance-count) "
3800 "scaling-out operations for the "
3801 "scaling-group-descriptor '{}'".format(nb_scale_op
, scaling_group
)
3803 for x
in range(vdu_delta
.get("number-of-instances", 1)):
3805 # TODO Information of its own ip is not available because db_vnfr is not updated.
3806 additional_params
["OSM"] = get_osm_params(
3811 cloud_init_list
.append(
3812 self
._parse
_cloud
_init
(
3819 VCA_scaling_info
.append(
3821 "osm_vdu_id": vdu_delta
["id"],
3822 "member-vnf-index": vnf_index
,
3824 "vdu_index": vdu_index
+ x
3827 RO_scaling_info
.append(
3829 "osm_vdu_id": vdu_delta
["id"],
3830 "member-vnf-index": vnf_index
,
3832 "count": vdu_delta
.get("number-of-instances", 1)
3836 RO_scaling_info
[-1]["cloud_init"] = cloud_init_list
3837 vdu_scaling_info
["vdu-create"][vdu_delta
["id"]] = vdu_delta
.get("number-of-instances", 1)
3839 elif scaling_type
== "SCALE_IN":
3840 if "min-instance-count" in scaling_descriptor
and scaling_descriptor
["min-instance-count"] is not None:
3841 min_instance_count
= int(scaling_descriptor
["min-instance-count"])
3843 vdu_scaling_info
["scaling_direction"] = "IN"
3844 vdu_scaling_info
["vdu-delete"] = {}
3845 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
3846 for delta
in deltas
:
3847 for vdu_delta
in delta
["vdu-delta"]:
3848 vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
3849 min_instance_count
= 0
3850 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
3851 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
3852 min_instance_count
= vdu_profile
["min-number-of-instances"]
3854 default_instance_num
= get_number_of_instances(db_vnfd
, vdu_delta
["id"])
3856 nb_scale_op
-= vdu_delta
.get("number-of-instances", 1)
3857 if nb_scale_op
+ default_instance_num
< min_instance_count
:
3859 "reached the limit of {} (min-instance-count) scaling-in operations for the "
3860 "scaling-group-descriptor '{}'".format(nb_scale_op
, scaling_group
)
3862 RO_scaling_info
.append({"osm_vdu_id": vdu_delta
["id"], "member-vnf-index": vnf_index
,
3863 "type": "delete", "count": vdu_delta
.get("number-of-instances", 1),
3864 "vdu_index": vdu_index
- 1})
3865 for x
in range(vdu_delta
.get("number-of-instances", 1)):
3866 VCA_scaling_info
.append(
3868 "osm_vdu_id": vdu_delta
["id"],
3869 "member-vnf-index": vnf_index
,
3871 "vdu_index": vdu_index
- 1 - x
3874 vdu_scaling_info
["vdu-delete"][vdu_delta
["id"]] = vdu_delta
.get("number-of-instances", 1)
3876 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
3877 vdu_delete
= copy(vdu_scaling_info
.get("vdu-delete"))
3878 if vdu_scaling_info
["scaling_direction"] == "IN":
3879 for vdur
in reversed(db_vnfr
["vdur"]):
3880 if vdu_delete
.get(vdur
["vdu-id-ref"]):
3881 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
3882 vdu_scaling_info
["vdu"].append({
3883 "name": vdur
.get("name") or vdur
.get("vdu-name"),
3884 "vdu_id": vdur
["vdu-id-ref"],
3887 for interface
in vdur
["interfaces"]:
3888 vdu_scaling_info
["vdu"][-1]["interface"].append({
3889 "name": interface
["name"],
3890 "ip_address": interface
["ip-address"],
3891 "mac_address": interface
.get("mac-address"),
3893 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
3896 step
= "Executing pre-scale vnf-config-primitive"
3897 if scaling_descriptor
.get("scaling-config-action"):
3898 for scaling_config_action
in scaling_descriptor
["scaling-config-action"]:
3899 if (scaling_config_action
.get("trigger") == "pre-scale-in" and scaling_type
== "SCALE_IN") \
3900 or (scaling_config_action
.get("trigger") == "pre-scale-out" and scaling_type
== "SCALE_OUT"):
3901 vnf_config_primitive
= scaling_config_action
["vnf-config-primitive-name-ref"]
3902 step
= db_nslcmop_update
["detailed-status"] = \
3903 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive
)
3905 # look for primitive
3906 for config_primitive
in (get_configuration(
3907 db_vnfd
, db_vnfd
["id"]
3908 ) or {}).get("config-primitive", ()):
3909 if config_primitive
["name"] == vnf_config_primitive
:
3913 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3914 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3915 "primitive".format(scaling_group
, vnf_config_primitive
))
3917 vnfr_params
= {"VDU_SCALE_INFO": vdu_scaling_info
}
3918 if db_vnfr
.get("additionalParamsForVnf"):
3919 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
3921 scale_process
= "VCA"
3922 db_nsr_update
["config-status"] = "configuring pre-scaling"
3923 primitive_params
= self
._map
_primitive
_params
(config_primitive
, {}, vnfr_params
)
3925 # Pre-scale retry check: Check if this sub-operation has been executed before
3926 op_index
= self
._check
_or
_add
_scale
_suboperation
(
3927 db_nslcmop
, nslcmop_id
, vnf_index
, vnf_config_primitive
, primitive_params
, 'PRE-SCALE')
3928 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
3929 # Skip sub-operation
3930 result
= 'COMPLETED'
3931 result_detail
= 'Done'
3932 self
.logger
.debug(logging_text
+
3933 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3934 vnf_config_primitive
, result
, result_detail
))
3936 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
3937 # New sub-operation: Get index of this sub-operation
3938 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
3939 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} New sub-operation".
3940 format(vnf_config_primitive
))
3942 # retry: Get registered params for this existing sub-operation
3943 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
3944 vnf_index
= op
.get('member_vnf_index')
3945 vnf_config_primitive
= op
.get('primitive')
3946 primitive_params
= op
.get('primitive_params')
3947 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Sub-operation retry".
3948 format(vnf_config_primitive
))
3949 # Execute the primitive, either with new (first-time) or registered (reintent) args
3950 ee_descriptor_id
= config_primitive
.get("execution-environment-ref")
3951 primitive_name
= config_primitive
.get("execution-environment-primitive",
3952 vnf_config_primitive
)
3953 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"],
3954 member_vnf_index
=vnf_index
,
3956 vdu_count_index
=None,
3957 ee_descriptor_id
=ee_descriptor_id
)
3958 result
, result_detail
= await self
._ns
_execute
_primitive
(
3959 ee_id
, primitive_name
, primitive_params
, vca_type
=vca_type
)
3960 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
3961 vnf_config_primitive
, result
, result_detail
))
3962 # Update operationState = COMPLETED | FAILED
3963 self
._update
_suboperation
_status
(
3964 db_nslcmop
, op_index
, result
, result_detail
)
3966 if result
== "FAILED":
3967 raise LcmException(result_detail
)
3968 db_nsr_update
["config-status"] = old_config_status
3969 scale_process
= None
3972 db_nsr_update
["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)] = nb_scale_op
3973 db_nsr_update
["_admin.scaling-group.{}.time".format(admin_scale_index
)] = time()
3975 # SCALE-IN VCA - BEGIN
3976 if VCA_scaling_info
:
3977 step
= db_nslcmop_update
["detailed-status"] = \
3978 "Deleting the execution environments"
3979 scale_process
= "VCA"
3980 for vdu_info
in VCA_scaling_info
:
3981 if vdu_info
["type"] == "delete":
3982 member_vnf_index
= str(vdu_info
["member-vnf-index"])
3983 self
.logger
.debug(logging_text
+ "vdu info: {}".format(vdu_info
))
3984 vdu_id
= vdu_info
["osm_vdu_id"]
3985 vdu_index
= int(vdu_info
["vdu_index"])
3986 stage
[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
3987 member_vnf_index
, vdu_id
, vdu_index
)
3988 stage
[2] = step
= "Scaling in VCA"
3989 self
._write
_op
_status
(
3993 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
3994 config_update
= db_nsr
["configurationStatus"]
3995 for vca_index
, vca
in enumerate(vca_update
):
3996 if (vca
or vca
.get("ee_id")) and vca
["member-vnf-index"] == member_vnf_index
and \
3997 vca
["vdu_count_index"] == vdu_index
:
3998 if vca
.get("vdu_id"):
3999 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4000 elif vca
.get("kdu_name"):
4001 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4003 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4004 operation_params
= db_nslcmop
.get("operationParams") or {}
4005 exec_terminate_primitives
= (not operation_params
.get("skip_terminate_primitives") and
4006 vca
.get("needed_terminate"))
4007 task
= asyncio
.ensure_future(asyncio
.wait_for(
4008 self
.destroy_N2VC(logging_text
, db_nslcmop
, vca
, config_descriptor
,
4009 vca_index
, destroy_ee
=True,
4010 exec_primitives
=exec_terminate_primitives
,
4011 scaling_in
=True), timeout
=self
.timeout_charm_delete
))
4012 # wait before next removal
4013 await asyncio
.sleep(30)
4014 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4015 del vca_update
[vca_index
]
4016 del config_update
[vca_index
]
4017 # wait for pending tasks of terminate primitives
4019 self
.logger
.debug(logging_text
+
4020 'Waiting for tasks {}'.format(list(tasks_dict_info
.keys())))
4021 error_list
= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
,
4022 min(self
.timeout_charm_delete
,
4023 self
.timeout_ns_terminate
),
4025 tasks_dict_info
.clear()
4027 raise LcmException("; ".join(error_list
))
4029 db_vca_and_config_update
= {
4030 "_admin.deployed.VCA": vca_update
,
4031 "configurationStatus": config_update
4033 self
.update_db_2("nsrs", db_nsr
["_id"], db_vca_and_config_update
)
4034 scale_process
= None
4035 # SCALE-IN VCA - END
4039 scale_process
= "RO"
4040 if self
.ro_config
.get("ng"):
4041 await self
._scale
_ng
_ro
(logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
)
4042 vdu_scaling_info
.pop("vdu-create", None)
4043 vdu_scaling_info
.pop("vdu-delete", None)
4045 scale_process
= None
4047 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4050 # SCALE-UP VCA - BEGIN
4051 if VCA_scaling_info
:
4052 step
= db_nslcmop_update
["detailed-status"] = \
4053 "Creating new execution environments"
4054 scale_process
= "VCA"
4055 for vdu_info
in VCA_scaling_info
:
4056 if vdu_info
["type"] == "create":
4057 member_vnf_index
= str(vdu_info
["member-vnf-index"])
4058 self
.logger
.debug(logging_text
+ "vdu info: {}".format(vdu_info
))
4059 vnfd_id
= db_vnfr
["vnfd-ref"]
4060 vdu_index
= int(vdu_info
["vdu_index"])
4061 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
4062 if db_vnfr
.get("additionalParamsForVnf"):
4063 deploy_params
.update(parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy()))
4064 descriptor_config
= get_configuration(db_vnfd
, db_vnfd
["id"])
4065 if descriptor_config
:
4070 logging_text
=logging_text
+ "member_vnf_index={} ".format(member_vnf_index
),
4073 nslcmop_id
=nslcmop_id
,
4079 member_vnf_index
=member_vnf_index
,
4080 vdu_index
=vdu_index
,
4082 deploy_params
=deploy_params
,
4083 descriptor_config
=descriptor_config
,
4084 base_folder
=base_folder
,
4085 task_instantiation_info
=tasks_dict_info
,
4088 vdu_id
= vdu_info
["osm_vdu_id"]
4089 vdur
= find_in_list(db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
)
4090 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
4091 if vdur
.get("additionalParams"):
4092 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
4094 deploy_params_vdu
= deploy_params
4095 deploy_params_vdu
["OSM"] = get_osm_params(db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
)
4096 if descriptor_config
:
4099 stage
[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
4100 member_vnf_index
, vdu_id
, vdu_index
)
4101 stage
[2] = step
= "Scaling out VCA"
4102 self
._write
_op
_status
(
4107 logging_text
=logging_text
+ "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
4108 member_vnf_index
, vdu_id
, vdu_index
),
4111 nslcmop_id
=nslcmop_id
,
4117 member_vnf_index
=member_vnf_index
,
4118 vdu_index
=vdu_index
,
4120 deploy_params
=deploy_params_vdu
,
4121 descriptor_config
=descriptor_config
,
4122 base_folder
=base_folder
,
4123 task_instantiation_info
=tasks_dict_info
,
4126 # TODO: scaling for kdu is not implemented yet.
4127 kdu_name
= vdu_info
["osm_vdu_id"]
4128 descriptor_config
= get_configuration(db_vnfd
, kdu_name
)
4129 if descriptor_config
:
4131 vdu_index
= vdu_index
4133 kdur
= next(x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
)
4134 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
4135 if kdur
.get("additionalParams"):
4136 deploy_params_kdu
= parse_yaml_strings(kdur
["additionalParams"])
4139 logging_text
=logging_text
,
4142 nslcmop_id
=nslcmop_id
,
4148 member_vnf_index
=member_vnf_index
,
4149 vdu_index
=vdu_index
,
4151 deploy_params
=deploy_params_kdu
,
4152 descriptor_config
=descriptor_config
,
4153 base_folder
=base_folder
,
4154 task_instantiation_info
=tasks_dict_info
,
4157 # SCALE-UP VCA - END
4158 scale_process
= None
4161 # execute primitive service POST-SCALING
4162 step
= "Executing post-scale vnf-config-primitive"
4163 if scaling_descriptor
.get("scaling-config-action"):
4164 for scaling_config_action
in scaling_descriptor
["scaling-config-action"]:
4165 if (scaling_config_action
.get("trigger") == "post-scale-in" and scaling_type
== "SCALE_IN") \
4166 or (scaling_config_action
.get("trigger") == "post-scale-out" and scaling_type
== "SCALE_OUT"):
4167 vnf_config_primitive
= scaling_config_action
["vnf-config-primitive-name-ref"]
4168 step
= db_nslcmop_update
["detailed-status"] = \
4169 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive
)
4171 vnfr_params
= {"VDU_SCALE_INFO": vdu_scaling_info
}
4172 if db_vnfr
.get("additionalParamsForVnf"):
4173 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
4175 # look for primitive
4176 for config_primitive
in (
4177 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
4178 ).get("config-primitive", ()):
4179 if config_primitive
["name"] == vnf_config_primitive
:
4183 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
4184 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
4185 "config-primitive".format(scaling_group
, vnf_config_primitive
))
4186 scale_process
= "VCA"
4187 db_nsr_update
["config-status"] = "configuring post-scaling"
4188 primitive_params
= self
._map
_primitive
_params
(config_primitive
, {}, vnfr_params
)
4190 # Post-scale retry check: Check if this sub-operation has been executed before
4191 op_index
= self
._check
_or
_add
_scale
_suboperation
(
4192 db_nslcmop
, nslcmop_id
, vnf_index
, vnf_config_primitive
, primitive_params
, 'POST-SCALE')
4193 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
4194 # Skip sub-operation
4195 result
= 'COMPLETED'
4196 result_detail
= 'Done'
4197 self
.logger
.debug(logging_text
+
4198 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
4199 format(vnf_config_primitive
, result
, result_detail
))
4201 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
4202 # New sub-operation: Get index of this sub-operation
4203 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
4204 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} New sub-operation".
4205 format(vnf_config_primitive
))
4207 # retry: Get registered params for this existing sub-operation
4208 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
4209 vnf_index
= op
.get('member_vnf_index')
4210 vnf_config_primitive
= op
.get('primitive')
4211 primitive_params
= op
.get('primitive_params')
4212 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Sub-operation retry".
4213 format(vnf_config_primitive
))
4214 # Execute the primitive, either with new (first-time) or registered (reintent) args
4215 ee_descriptor_id
= config_primitive
.get("execution-environment-ref")
4216 primitive_name
= config_primitive
.get("execution-environment-primitive",
4217 vnf_config_primitive
)
4218 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"],
4219 member_vnf_index
=vnf_index
,
4221 vdu_count_index
=None,
4222 ee_descriptor_id
=ee_descriptor_id
)
4223 result
, result_detail
= await self
._ns
_execute
_primitive
(
4224 ee_id
, primitive_name
, primitive_params
, vca_type
=vca_type
)
4225 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
4226 vnf_config_primitive
, result
, result_detail
))
4227 # Update operationState = COMPLETED | FAILED
4228 self
._update
_suboperation
_status
(
4229 db_nslcmop
, op_index
, result
, result_detail
)
4231 if result
== "FAILED":
4232 raise LcmException(result_detail
)
4233 db_nsr_update
["config-status"] = old_config_status
4234 scale_process
= None
4237 db_nsr_update
["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
4238 db_nsr_update
["operational-status"] = "running" if old_operational_status
== "failed" \
4239 else old_operational_status
4240 db_nsr_update
["config-status"] = old_config_status
4242 except (ROclient
.ROClientException
, DbException
, LcmException
, NgRoException
) as e
:
4243 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4245 except asyncio
.CancelledError
:
4246 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(step
))
4247 exc
= "Operation was cancelled"
4248 except Exception as e
:
4249 exc
= traceback
.format_exc()
4250 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
4252 self
._write
_ns
_status
(nsr_id
=nsr_id
, ns_state
=None, current_operation
="IDLE", current_operation_id
=None)
4254 stage
[1] = "Waiting for instantiate pending tasks."
4255 self
.logger
.debug(logging_text
+ stage
[1])
4256 exc
= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
, self
.timeout_ns_deploy
,
4257 stage
, nslcmop_id
, nsr_id
=nsr_id
)
4259 db_nslcmop_update
["detailed-status"] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
4260 nslcmop_operation_state
= "FAILED"
4262 db_nsr_update
["operational-status"] = old_operational_status
4263 db_nsr_update
["config-status"] = old_config_status
4264 db_nsr_update
["detailed-status"] = ""
4266 if "VCA" in scale_process
:
4267 db_nsr_update
["config-status"] = "failed"
4268 if "RO" in scale_process
:
4269 db_nsr_update
["operational-status"] = "failed"
4270 db_nsr_update
["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id
, step
,
4273 error_description_nslcmop
= None
4274 nslcmop_operation_state
= "COMPLETED"
4275 db_nslcmop_update
["detailed-status"] = "Done"
4277 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
="", error_message
=error_description_nslcmop
,
4278 operation_state
=nslcmop_operation_state
, other_update
=db_nslcmop_update
)
4280 self
._write
_ns
_status
(nsr_id
=nsr_id
, ns_state
=None, current_operation
="IDLE",
4281 current_operation_id
=None, other_update
=db_nsr_update
)
4283 if nslcmop_operation_state
:
4285 msg
= {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
, "operationState": nslcmop_operation_state
}
4286 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
4287 except Exception as e
:
4288 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
4289 self
.logger
.debug(logging_text
+ "Exit")
4290 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
4292 async def _scale_ng_ro(self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
):
4293 nsr_id
= db_nslcmop
["nsInstanceId"]
4294 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
4297 # read from db: vnfd's for every vnf
4300 # for each vnf in ns, read vnfd
4301 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
4302 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
4303 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
4304 # if we haven't this vnfd, read it from db
4305 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
4307 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4308 db_vnfds
.append(vnfd
)
4309 n2vc_key
= self
.n2vc
.get_public_key()
4310 n2vc_key_list
= [n2vc_key
]
4311 self
.scale_vnfr(db_vnfr
, vdu_scaling_info
.get("vdu-create"), vdu_scaling_info
.get("vdu-delete"),
4313 # db_vnfr has been updated, update db_vnfrs to use it
4314 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
4315 await self
._instantiate
_ng
_ro
(logging_text
, nsr_id
, db_nsd
, db_nsr
, db_nslcmop
, db_vnfrs
,
4316 db_vnfds
, n2vc_key_list
, stage
=stage
, start_deploy
=time(),
4317 timeout_ns_deploy
=self
.timeout_ns_deploy
)
4318 if vdu_scaling_info
.get("vdu-delete"):
4319 self
.scale_vnfr(db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False)
4321 async def add_prometheus_metrics(self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
):
4322 if not self
.prometheus
:
4324 # look if exist a file called 'prometheus*.j2' and
4325 artifact_content
= self
.fs
.dir_ls(artifact_path
)
4326 job_file
= next((f
for f
in artifact_content
if f
.startswith("prometheus") and f
.endswith(".j2")), None)
4329 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
4333 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
4334 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
4336 vnfr_id
= vnfr_id
.replace("-", "")
4338 "JOB_NAME": vnfr_id
,
4339 "TARGET_IP": target_ip
,
4340 "EXPORTER_POD_IP": host_name
,
4341 "EXPORTER_POD_PORT": host_port
,
4343 job_list
= self
.prometheus
.parse_job(job_data
, variables
)
4344 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
4345 for job
in job_list
:
4346 if not isinstance(job
.get("job_name"), str) or vnfr_id
not in job
["job_name"]:
4347 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
4348 job
["nsr_id"] = nsr_id
4349 job_dict
= {jl
["job_name"]: jl
for jl
in job_list
}
4350 if await self
.prometheus
.update(job_dict
):
4351 return list(job_dict
.keys())
4353 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
4355 Get VCA Cloud and VCA Cloud Credentials for the VIM account
4357 :param: vim_account_id: VIM Account ID
4359 :return: (cloud_name, cloud_credential)
4361 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
4362 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
4364 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
4366 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
4368 :param: vim_account_id: VIM Account ID
4370 :return: (cloud_name, cloud_credential)
4372 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
4373 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")