Commit: Extracting Ns._process_vdu_params() and creating unit tests
Repository: osm/RO.git — file: NG-RO/osm_ng_ro/ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 ##
18
19 from http import HTTPStatus
20 import logging
21 from random import choice as random_choice
22 from threading import Lock
23 from time import time
24 from traceback import format_exc as traceback_format_exc
25 from typing import Any, Dict, Tuple, Type
26 from uuid import uuid4
27
28 from cryptography.hazmat.backends import default_backend as crypto_default_backend
29 from cryptography.hazmat.primitives import serialization as crypto_serialization
30 from cryptography.hazmat.primitives.asymmetric import rsa
31 from jinja2 import (
32 Environment,
33 StrictUndefined,
34 TemplateError,
35 TemplateNotFound,
36 UndefinedError,
37 )
38 from osm_common import (
39 dbmemory,
40 dbmongo,
41 fslocal,
42 fsmongo,
43 msgkafka,
44 msglocal,
45 version as common_version,
46 )
47 from osm_common.dbbase import DbBase, DbException
48 from osm_common.fsbase import FsBase, FsException
49 from osm_common.msgbase import MsgException
50 from osm_ng_ro.ns_thread import deep_get, NsWorker, NsWorkerException
51 from osm_ng_ro.validation import deploy_schema, validate_input
52
53 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
54 min_common_version = "0.1.16"
55
56
class NsException(Exception):
    """Base exception for NS engine errors, carrying an HTTP status code.

    Attributes:
        http_code: HTTP status the REST layer should report to the client.
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        self.http_code = http_code
        # Use cooperative super() instead of super(Exception, self), which
        # explicitly skipped Exception in the MRO for no benefit.
        super().__init__(message)
61
62
def get_process_id():
    """Return a unique identifier for this process.

    Tries to derive the id from the docker cgroup entry; when that is not
    possible (not running inside docker, file missing, empty id) a random
    12-character hexadecimal string is returned instead.

    :return: Obtained ID
    """
    try:
        with open("/proc/self/cgroup", "r") as cgroup_file:
            first_line = cgroup_file.readline()

        # Keep only the last path component and trim to 12 characters,
        # which is how docker presents the short container id.
        container_id = first_line.rpartition("/")[2].replace("\n", "")[:12]

        if container_id:
            return container_id
    except Exception:
        # Fall through to the random id below.
        pass

    return "".join(random_choice("0123456789abcdef") for _ in range(12))
83
84
def versiontuple(v):
    """Utility to compare dot-separated versions.

    Zero-fills every field to width 8 so that plain tuple comparison
    orders versions numerically.
    """
    return tuple(point.zfill(8) for point in v.split("."))
93
94
95 class Ns(object):
    def __init__(self):
        """Initialize empty engine state; real setup happens in start()."""
        self.db = None  # osm_common database driver, connected in start()
        self.fs = None  # osm_common filesystem driver, connected in start()
        self.msg = None  # osm_common message-bus driver, connected in start()
        self.config = None  # configuration dict passed to start()
        # self.operations = None
        self.logger = None
        # ^ Getting logger inside method self.start because parent logger (ro) is not available yet.
        # If done now it will not be linked to parent not getting its handler and level
        self.map_topic = {}
        self.write_lock = None  # threading.Lock guarding worker/VIM bookkeeping
        self.vims_assigned = {}  # target_id -> index into self.workers
        self.next_worker = 0  # round-robin cursor used once the thread limit is hit
        self.plugins = {}
        self.workers = []  # NsWorker threads, created on demand by _create_worker()
111
    def init_db(self, target_version):
        """Database schema initialization hook; currently a no-op placeholder."""
        pass
114
    def start(self, config):
        """
        Connect to database, filesystem storage, and messaging
        :param config: two-level dictionary with configuration. Top level should contain
            'database', 'storage' and 'message' sections, each with a 'driver' key
            selecting the backend.
        :return: None
        :raises NsException: on incompatible osm/common version, invalid driver
            names, or connection errors from the underlying drivers.
        """
        self.config = config
        self.config["process_id"] = get_process_id()  # used for HA identity
        self.logger = logging.getLogger("ro.ns")

        # check right version of common
        if versiontuple(common_version) < versiontuple(min_common_version):
            raise NsException(
                "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                    common_version, min_common_version
                )
            )

        try:
            # Each driver is only created if not already injected (e.g. by tests).
            if not self.db:
                if config["database"]["driver"] == "mongo":
                    self.db = dbmongo.DbMongo()
                    self.db.db_connect(config["database"])
                elif config["database"]["driver"] == "memory":
                    self.db = dbmemory.DbMemory()
                    self.db.db_connect(config["database"])
                else:
                    raise NsException(
                        "Invalid configuration param '{}' at '[database]':'driver'".format(
                            config["database"]["driver"]
                        )
                    )

            if not self.fs:
                if config["storage"]["driver"] == "local":
                    self.fs = fslocal.FsLocal()
                    self.fs.fs_connect(config["storage"])
                elif config["storage"]["driver"] == "mongo":
                    self.fs = fsmongo.FsMongo()
                    self.fs.fs_connect(config["storage"])
                elif config["storage"]["driver"] is None:
                    # storage is optional; no filesystem driver configured
                    pass
                else:
                    raise NsException(
                        "Invalid configuration param '{}' at '[storage]':'driver'".format(
                            config["storage"]["driver"]
                        )
                    )

            if not self.msg:
                if config["message"]["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                    self.msg.connect(config["message"])
                elif config["message"]["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                    self.msg.connect(config["message"])
                else:
                    raise NsException(
                        "Invalid configuration param '{}' at '[message]':'driver'".format(
                            config["message"]["driver"]
                        )
                    )

            # TODO load workers to deal with exising database tasks

            self.write_lock = Lock()
        except (DbException, FsException, MsgException) as e:
            raise NsException(str(e), http_code=e.http_code)
184
185 def get_assigned_vims(self):
186 return list(self.vims_assigned.keys())
187
    def stop(self):
        """Disconnect db/fs/msg drivers and ask every worker thread to terminate."""
        try:
            if self.db:
                self.db.db_disconnect()

            if self.fs:
                self.fs.fs_disconnect()

            if self.msg:
                self.msg.disconnect()

            self.write_lock = None
        except (DbException, FsException, MsgException) as e:
            raise NsException(str(e), http_code=e.http_code)

        # Termination is queued only on the success path (the except above
        # re-raises); workers drain their queues and exit on "terminate".
        for worker in self.workers:
            worker.insert_task(("terminate",))
205
    def _create_worker(self):
        """
        Look for a worker thread in idle status. If not found it creates one unless the number of threads reach the
        limit of 'server.ns_threads' configuration. If reached, it just assigns one existing thread
        return the index of the assigned worker thread. Worker threads are stored at self.workers
        """
        # Look for a thread in idle status
        worker_id = next(
            (
                i
                for i in range(len(self.workers))
                if self.workers[i] and self.workers[i].idle
            ),
            None,
        )

        if worker_id is not None:
            # unset idle status to avoid race conditions
            self.workers[worker_id].idle = False
        else:
            # no idle thread: candidate index is one past the current list
            worker_id = len(self.workers)

            if worker_id < self.config["global"]["server.ns_threads"]:
                # create a new worker
                self.workers.append(
                    NsWorker(worker_id, self.config, self.plugins, self.db)
                )
                self.workers[worker_id].start()
            else:
                # reached maximum number of threads, assign VIM to an existing one
                # (round-robin over the pool via self.next_worker)
                worker_id = self.next_worker
                self.next_worker = (self.next_worker + 1) % self.config["global"][
                    "server.ns_threads"
                ]

        return worker_id
242
243 def assign_vim(self, target_id):
244 with self.write_lock:
245 return self._assign_vim(target_id)
246
247 def _assign_vim(self, target_id):
248 if target_id not in self.vims_assigned:
249 worker_id = self.vims_assigned[target_id] = self._create_worker()
250 self.workers[worker_id].insert_task(("load_vim", target_id))
251
252 def reload_vim(self, target_id):
253 # send reload_vim to the thread working with this VIM and inform all that a VIM has been changed,
254 # this is because database VIM information is cached for threads working with SDN
255 with self.write_lock:
256 for worker in self.workers:
257 if worker and not worker.idle:
258 worker.insert_task(("reload_vim", target_id))
259
260 def unload_vim(self, target_id):
261 with self.write_lock:
262 return self._unload_vim(target_id)
263
264 def _unload_vim(self, target_id):
265 if target_id in self.vims_assigned:
266 worker_id = self.vims_assigned[target_id]
267 self.workers[worker_id].insert_task(("unload_vim", target_id))
268 del self.vims_assigned[target_id]
269
270 def check_vim(self, target_id):
271 with self.write_lock:
272 if target_id in self.vims_assigned:
273 worker_id = self.vims_assigned[target_id]
274 else:
275 worker_id = self._create_worker()
276
277 worker = self.workers[worker_id]
278 worker.insert_task(("check_vim", target_id))
279
    def unload_unused_vims(self):
        """Unload every assigned VIM that has no pending or recorded ro_tasks."""
        with self.write_lock:
            vims_to_unload = []

            # A VIM is unused when no ro_task for it is scheduled, building,
            # done or failed (i.e. nothing references it anymore).
            for target_id in self.vims_assigned:
                if not self.db.get_one(
                    "ro_tasks",
                    q_filter={
                        "target_id": target_id,
                        "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                    },
                    fail_on_empty=False,
                ):
                    vims_to_unload.append(target_id)

            # Unload in a second pass so vims_assigned is not mutated while
            # iterating over it above.
            for target_id in vims_to_unload:
                self._unload_vim(target_id)
297
298 @staticmethod
299 def _get_cloud_init(
300 db: Type[DbBase],
301 fs: Type[FsBase],
302 location: str,
303 ) -> str:
304 """This method reads cloud init from a file.
305
306 Note: Not used as cloud init content is provided in the http body.
307
308 Args:
309 db (Type[DbBase]): [description]
310 fs (Type[FsBase]): [description]
311 location (str): can be 'vnfr_id:file:file_name' or 'vnfr_id:vdu:vdu_idex'
312
313 Raises:
314 NsException: [description]
315 NsException: [description]
316
317 Returns:
318 str: [description]
319 """
320 vnfd_id, _, other = location.partition(":")
321 _type, _, name = other.partition(":")
322 vnfd = db.get_one("vnfds", {"_id": vnfd_id})
323
324 if _type == "file":
325 base_folder = vnfd["_admin"]["storage"]
326 cloud_init_file = "{}/{}/cloud_init/{}".format(
327 base_folder["folder"], base_folder["pkg-dir"], name
328 )
329
330 if not fs:
331 raise NsException(
332 "Cannot read file '{}'. Filesystem not loaded, change configuration at storage.driver".format(
333 cloud_init_file
334 )
335 )
336
337 with fs.file_open(cloud_init_file, "r") as ci_file:
338 cloud_init_content = ci_file.read()
339 elif _type == "vdu":
340 cloud_init_content = vnfd["vdu"][int(name)]["cloud-init"]
341 else:
342 raise NsException("Mismatch descriptor for cloud init: {}".format(location))
343
344 return cloud_init_content
345
346 @staticmethod
347 def _parse_jinja2(
348 cloud_init_content: str,
349 params: Dict[str, Any],
350 context: str,
351 ) -> str:
352 """Function that processes the cloud init to replace Jinja2 encoded parameters.
353
354 Args:
355 cloud_init_content (str): [description]
356 params (Dict[str, Any]): [description]
357 context (str): [description]
358
359 Raises:
360 NsException: [description]
361 NsException: [description]
362
363 Returns:
364 str: [description]
365 """
366 try:
367 env = Environment(undefined=StrictUndefined)
368 template = env.from_string(cloud_init_content)
369
370 return template.render(params or {})
371 except UndefinedError as e:
372 raise NsException(
373 "Variable '{}' defined at vnfd='{}' must be provided in the instantiation parameters"
374 "inside the 'additionalParamsForVnf' block".format(e, context)
375 )
376 except (TemplateError, TemplateNotFound) as e:
377 raise NsException(
378 "Error parsing Jinja2 to cloud-init content at vnfd='{}': {}".format(
379 context, e
380 )
381 )
382
    def _create_db_ro_nsrs(self, nsr_id, now):
        """Create and store the "ro_nsrs" record for a NS instance.

        Generates an RSA key pair; the private key is stored encrypted
        (salted with nsr_id) while the public key is stored in clear.

        :param nsr_id: NS record identifier, used as document _id and as salt.
        :param now: creation timestamp.
        :return: the created ro_nsrs document.
        :raises NsException: when the ssh key pair cannot be generated.
        """
        try:
            key = rsa.generate_private_key(
                backend=crypto_default_backend(), public_exponent=65537, key_size=2048
            )
            private_key = key.private_bytes(
                crypto_serialization.Encoding.PEM,
                crypto_serialization.PrivateFormat.PKCS8,
                crypto_serialization.NoEncryption(),
            )
            public_key = key.public_key().public_bytes(
                crypto_serialization.Encoding.OpenSSH,
                crypto_serialization.PublicFormat.OpenSSH,
            )
            private_key = private_key.decode("utf8")
            # Change first line because Paramiko needs a explicit start with 'BEGIN RSA PRIVATE KEY'
            i = private_key.find("\n")
            private_key = "-----BEGIN RSA PRIVATE KEY-----" + private_key[i:]
            public_key = public_key.decode("utf8")
        except Exception as e:
            raise NsException("Cannot create ssh-keys: {}".format(e))

        schema_version = "1.1"
        private_key_encrypted = self.db.encrypt(
            private_key, schema_version=schema_version, salt=nsr_id
        )
        db_content = {
            "_id": nsr_id,
            "_admin": {
                "created": now,
                "modified": now,
                "schema_version": schema_version,
            },
            "public_key": public_key,
            "private_key": private_key_encrypted,
            "actions": [],
        }
        self.db.create("ro_nsrs", db_content)

        return db_content
423
424 @staticmethod
425 def _create_task(
426 deployment_info: Dict[str, Any],
427 target_id: str,
428 item: str,
429 action: str,
430 target_record: str,
431 target_record_id: str,
432 extra_dict: Dict[str, Any] = None,
433 ) -> Dict[str, Any]:
434 """Function to create task dict from deployment information.
435
436 Args:
437 deployment_info (Dict[str, Any]): [description]
438 target_id (str): [description]
439 item (str): [description]
440 action (str): [description]
441 target_record (str): [description]
442 target_record_id (str): [description]
443 extra_dict (Dict[str, Any], optional): [description]. Defaults to None.
444
445 Returns:
446 Dict[str, Any]: [description]
447 """
448 task = {
449 "target_id": target_id, # it will be removed before pushing at database
450 "action_id": deployment_info.get("action_id"),
451 "nsr_id": deployment_info.get("nsr_id"),
452 "task_id": f"{deployment_info.get('action_id')}:{deployment_info.get('task_index')}",
453 "status": "SCHEDULED",
454 "action": action,
455 "item": item,
456 "target_record": target_record,
457 "target_record_id": target_record_id,
458 }
459
460 if extra_dict:
461 task.update(extra_dict) # params, find_params, depends_on
462
463 deployment_info["task_index"] = deployment_info.get("task_index", 0) + 1
464
465 return task
466
467 @staticmethod
468 def _create_ro_task(
469 target_id: str,
470 task: Dict[str, Any],
471 ) -> Dict[str, Any]:
472 """Function to create an RO task from task information.
473
474 Args:
475 target_id (str): [description]
476 task (Dict[str, Any]): [description]
477
478 Returns:
479 Dict[str, Any]: [description]
480 """
481 now = time()
482
483 _id = task.get("task_id")
484 db_ro_task = {
485 "_id": _id,
486 "locked_by": None,
487 "locked_at": 0.0,
488 "target_id": target_id,
489 "vim_info": {
490 "created": False,
491 "created_items": None,
492 "vim_id": None,
493 "vim_name": None,
494 "vim_status": None,
495 "vim_details": None,
496 "refresh_at": None,
497 },
498 "modified_at": now,
499 "created_at": now,
500 "to_check_at": now,
501 "tasks": [task],
502 }
503
504 return db_ro_task
505
506 @staticmethod
507 def _process_image_params(
508 target_image: Dict[str, Any],
509 indata: Dict[str, Any],
510 vim_info: Dict[str, Any],
511 target_record_id: str,
512 **kwargs: Dict[str, Any],
513 ) -> Dict[str, Any]:
514 """Function to process VDU image parameters.
515
516 Args:
517 target_image (Dict[str, Any]): [description]
518 indata (Dict[str, Any]): [description]
519 vim_info (Dict[str, Any]): [description]
520 target_record_id (str): [description]
521
522 Returns:
523 Dict[str, Any]: [description]
524 """
525 find_params = {}
526
527 if target_image.get("image"):
528 find_params["filter_dict"] = {"name": target_image.get("image")}
529
530 if target_image.get("vim_image_id"):
531 find_params["filter_dict"] = {"id": target_image.get("vim_image_id")}
532
533 if target_image.get("image_checksum"):
534 find_params["filter_dict"] = {
535 "checksum": target_image.get("image_checksum")
536 }
537
538 return {"find_params": find_params}
539
540 @staticmethod
541 def _get_resource_allocation_params(
542 quota_descriptor: Dict[str, Any],
543 ) -> Dict[str, Any]:
544 """Read the quota_descriptor from vnfd and fetch the resource allocation properties from the
545 descriptor object.
546
547 Args:
548 quota_descriptor (Dict[str, Any]): cpu/mem/vif/disk-io quota descriptor
549
550 Returns:
551 Dict[str, Any]: quota params for limit, reserve, shares from the descriptor object
552 """
553 quota = {}
554
555 if quota_descriptor.get("limit"):
556 quota["limit"] = int(quota_descriptor["limit"])
557
558 if quota_descriptor.get("reserve"):
559 quota["reserve"] = int(quota_descriptor["reserve"])
560
561 if quota_descriptor.get("shares"):
562 quota["shares"] = int(quota_descriptor["shares"])
563
564 return quota
565
566 @staticmethod
567 def _process_guest_epa_quota_params(
568 guest_epa_quota: Dict[str, Any],
569 epa_vcpu_set: bool,
570 ) -> Dict[str, Any]:
571 """Function to extract the guest epa quota parameters.
572
573 Args:
574 guest_epa_quota (Dict[str, Any]): [description]
575 epa_vcpu_set (bool): [description]
576
577 Returns:
578 Dict[str, Any]: [description]
579 """
580 result = {}
581
582 if guest_epa_quota.get("cpu-quota") and not epa_vcpu_set:
583 cpuquota = Ns._get_resource_allocation_params(
584 guest_epa_quota.get("cpu-quota")
585 )
586
587 if cpuquota:
588 result["cpu-quota"] = cpuquota
589
590 if guest_epa_quota.get("mem-quota"):
591 vduquota = Ns._get_resource_allocation_params(
592 guest_epa_quota.get("mem-quota")
593 )
594
595 if vduquota:
596 result["mem-quota"] = vduquota
597
598 if guest_epa_quota.get("disk-io-quota"):
599 diskioquota = Ns._get_resource_allocation_params(
600 guest_epa_quota.get("disk-io-quota")
601 )
602
603 if diskioquota:
604 result["disk-io-quota"] = diskioquota
605
606 if guest_epa_quota.get("vif-quota"):
607 vifquota = Ns._get_resource_allocation_params(
608 guest_epa_quota.get("vif-quota")
609 )
610
611 if vifquota:
612 result["vif-quota"] = vifquota
613
614 return result
615
616 @staticmethod
617 def _process_guest_epa_numa_params(
618 guest_epa_quota: Dict[str, Any],
619 ) -> Tuple[Dict[str, Any], bool]:
620 """[summary]
621
622 Args:
623 guest_epa_quota (Dict[str, Any]): [description]
624
625 Returns:
626 Tuple[Dict[str, Any], bool]: [description]
627 """
628 numa = {}
629 epa_vcpu_set = False
630
631 if guest_epa_quota.get("numa-node-policy"):
632 numa_node_policy = guest_epa_quota.get("numa-node-policy")
633
634 if numa_node_policy.get("node"):
635 numa_node = numa_node_policy["node"][0]
636
637 if numa_node.get("num-cores"):
638 numa["cores"] = numa_node["num-cores"]
639 epa_vcpu_set = True
640
641 paired_threads = numa_node.get("paired-threads", {})
642 if paired_threads.get("num-paired-threads"):
643 numa["paired-threads"] = int(
644 numa_node["paired-threads"]["num-paired-threads"]
645 )
646 epa_vcpu_set = True
647
648 if paired_threads.get("paired-thread-ids"):
649 numa["paired-threads-id"] = []
650
651 for pair in paired_threads["paired-thread-ids"]:
652 numa["paired-threads-id"].append(
653 (
654 str(pair["thread-a"]),
655 str(pair["thread-b"]),
656 )
657 )
658
659 if numa_node.get("num-threads"):
660 numa["threads"] = int(numa_node["num-threads"])
661 epa_vcpu_set = True
662
663 if numa_node.get("memory-mb"):
664 numa["memory"] = max(int(int(numa_node["memory-mb"]) / 1024), 1)
665
666 return numa, epa_vcpu_set
667
668 @staticmethod
669 def _process_guest_epa_cpu_pinning_params(
670 guest_epa_quota: Dict[str, Any],
671 vcpu_count: int,
672 epa_vcpu_set: bool,
673 ) -> Tuple[Dict[str, Any], bool]:
674 """[summary]
675
676 Args:
677 guest_epa_quota (Dict[str, Any]): [description]
678 vcpu_count (int): [description]
679 epa_vcpu_set (bool): [description]
680
681 Returns:
682 Tuple[Dict[str, Any], bool]: [description]
683 """
684 numa = {}
685 local_epa_vcpu_set = epa_vcpu_set
686
687 if (
688 guest_epa_quota.get("cpu-pinning-policy") == "DEDICATED"
689 and not epa_vcpu_set
690 ):
691 numa[
692 "cores"
693 if guest_epa_quota.get("cpu-thread-pinning-policy") != "PREFER"
694 else "threads"
695 ] = max(vcpu_count, 1)
696 local_epa_vcpu_set = True
697
698 return numa, local_epa_vcpu_set
699
700 @staticmethod
701 def _process_epa_params(
702 target_flavor: Dict[str, Any],
703 ) -> Dict[str, Any]:
704 """[summary]
705
706 Args:
707 target_flavor (Dict[str, Any]): [description]
708
709 Returns:
710 Dict[str, Any]: [description]
711 """
712 extended = {}
713 numa = {}
714
715 if target_flavor.get("guest-epa"):
716 guest_epa = target_flavor["guest-epa"]
717
718 numa, epa_vcpu_set = Ns._process_guest_epa_numa_params(
719 guest_epa_quota=guest_epa
720 )
721
722 if guest_epa.get("mempage-size"):
723 extended["mempage-size"] = guest_epa.get("mempage-size")
724
725 tmp_numa, epa_vcpu_set = Ns._process_guest_epa_cpu_pinning_params(
726 guest_epa_quota=guest_epa,
727 vcpu_count=int(target_flavor.get("vcpu-count", 1)),
728 epa_vcpu_set=epa_vcpu_set,
729 )
730 numa.update(tmp_numa)
731
732 extended.update(
733 Ns._process_guest_epa_quota_params(
734 guest_epa_quota=guest_epa,
735 epa_vcpu_set=epa_vcpu_set,
736 )
737 )
738
739 if numa:
740 extended["numas"] = [numa]
741
742 return extended
743
744 @staticmethod
745 def _process_flavor_params(
746 target_flavor: Dict[str, Any],
747 indata: Dict[str, Any],
748 vim_info: Dict[str, Any],
749 target_record_id: str,
750 **kwargs: Dict[str, Any],
751 ) -> Dict[str, Any]:
752 """[summary]
753
754 Args:
755 target_flavor (Dict[str, Any]): [description]
756 indata (Dict[str, Any]): [description]
757 vim_info (Dict[str, Any]): [description]
758 target_record_id (str): [description]
759
760 Returns:
761 Dict[str, Any]: [description]
762 """
763 flavor_data = {
764 "disk": int(target_flavor["storage-gb"]),
765 "ram": int(target_flavor["memory-mb"]),
766 "vcpus": int(target_flavor["vcpu-count"]),
767 }
768
769 target_vdur = {}
770 for vnf in indata.get("vnf", []):
771 for vdur in vnf.get("vdur", []):
772 if vdur.get("ns-flavor-id") == target_flavor["id"]:
773 target_vdur = vdur
774
775 for storage in target_vdur.get("virtual-storages", []):
776 if (
777 storage.get("type-of-storage")
778 == "etsi-nfv-descriptors:ephemeral-storage"
779 ):
780 flavor_data["ephemeral"] = int(storage.get("size-of-storage", 0))
781 elif storage.get("type-of-storage") == "etsi-nfv-descriptors:swap-storage":
782 flavor_data["swap"] = int(storage.get("size-of-storage", 0))
783
784 extended = Ns._process_epa_params(target_flavor)
785 if extended:
786 flavor_data["extended"] = extended
787
788 extra_dict = {"find_params": {"flavor_data": flavor_data}}
789 flavor_data_name = flavor_data.copy()
790 flavor_data_name["name"] = target_flavor["name"]
791 extra_dict["params"] = {"flavor_data": flavor_data_name}
792
793 return extra_dict
794
795 @staticmethod
796 def _ip_profile_to_ro(
797 ip_profile: Dict[str, Any],
798 ) -> Dict[str, Any]:
799 """[summary]
800
801 Args:
802 ip_profile (Dict[str, Any]): [description]
803
804 Returns:
805 Dict[str, Any]: [description]
806 """
807 if not ip_profile:
808 return None
809
810 ro_ip_profile = {
811 "ip_version": "IPv4"
812 if "v4" in ip_profile.get("ip-version", "ipv4")
813 else "IPv6",
814 "subnet_address": ip_profile.get("subnet-address"),
815 "gateway_address": ip_profile.get("gateway-address"),
816 "dhcp_enabled": ip_profile.get("dhcp-params", {}).get("enabled", False),
817 "dhcp_start_address": ip_profile.get("dhcp-params", {}).get(
818 "start-address", None
819 ),
820 "dhcp_count": ip_profile.get("dhcp-params", {}).get("count", None),
821 }
822
823 if ip_profile.get("dns-server"):
824 ro_ip_profile["dns_address"] = ";".join(
825 [v["address"] for v in ip_profile["dns-server"] if v.get("address")]
826 )
827
828 if ip_profile.get("security-group"):
829 ro_ip_profile["security_group"] = ip_profile["security-group"]
830
831 return ro_ip_profile
832
833 @staticmethod
834 def _process_net_params(
835 target_vld: Dict[str, Any],
836 indata: Dict[str, Any],
837 vim_info: Dict[str, Any],
838 target_record_id: str,
839 **kwargs: Dict[str, Any],
840 ) -> Dict[str, Any]:
841 """Function to process network parameters.
842
843 Args:
844 target_vld (Dict[str, Any]): [description]
845 indata (Dict[str, Any]): [description]
846 vim_info (Dict[str, Any]): [description]
847 target_record_id (str): [description]
848
849 Returns:
850 Dict[str, Any]: [description]
851 """
852 extra_dict = {}
853
854 if vim_info.get("sdn"):
855 # vnf_preffix = "vnfrs:{}".format(vnfr_id)
856 # ns_preffix = "nsrs:{}".format(nsr_id)
857 # remove the ending ".sdn
858 vld_target_record_id, _, _ = target_record_id.rpartition(".")
859 extra_dict["params"] = {
860 k: vim_info[k]
861 for k in ("sdn-ports", "target_vim", "vlds", "type")
862 if vim_info.get(k)
863 }
864
865 # TODO needed to add target_id in the dependency.
866 if vim_info.get("target_vim"):
867 extra_dict["depends_on"] = [
868 f"{vim_info.get('target_vim')} {vld_target_record_id}"
869 ]
870
871 return extra_dict
872
873 if vim_info.get("vim_network_name"):
874 extra_dict["find_params"] = {
875 "filter_dict": {
876 "name": vim_info.get("vim_network_name"),
877 },
878 }
879 elif vim_info.get("vim_network_id"):
880 extra_dict["find_params"] = {
881 "filter_dict": {
882 "id": vim_info.get("vim_network_id"),
883 },
884 }
885 elif target_vld.get("mgmt-network"):
886 extra_dict["find_params"] = {
887 "mgmt": True,
888 "name": target_vld["id"],
889 }
890 else:
891 # create
892 extra_dict["params"] = {
893 "net_name": (
894 f"{indata.get('name')[:16]}-{target_vld.get('name', target_vld.get('id'))[:16]}"
895 ),
896 "ip_profile": Ns._ip_profile_to_ro(vim_info.get("ip_profile")),
897 "provider_network_profile": vim_info.get("provider_network"),
898 }
899
900 if not target_vld.get("underlay"):
901 extra_dict["params"]["net_type"] = "bridge"
902 else:
903 extra_dict["params"]["net_type"] = (
904 "ptp" if target_vld.get("type") == "ELINE" else "data"
905 )
906
907 return extra_dict
908
    @staticmethod
    def _process_vdu_params(
        target_vdu: Dict[str, Any],
        indata: Dict[str, Any],
        vim_info: Dict[str, Any],
        target_record_id: str,
        **kwargs: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Function to process VDU parameters.

        Builds the extra_dict used to create the VM deployment task:
        dependencies on the image/flavor/network tasks, the interface/net
        list, the cloud-init based cloud config, the persistent disk list
        and the final VIM creation params.

        Args:
            target_vdu (Dict[str, Any]): vdur entry of the deployment target.
            indata (Dict[str, Any]): whole deployment request.
            vim_info (Dict[str, Any]): VIM information for this vdur (unused here).
            target_record_id (str): record id of this vdur (unused here).

        Keyword Args:
            vnfr_id, nsr_id, vnfr, vdu2cloud_init, tasks_by_target_record_id,
            logger, db, fs, ro_nsr_public_key: context supplied by the caller.

        Returns:
            Dict[str, Any]: extra_dict with "depends_on", optional
                "mgmt_vnf_interface"/"mgmt_vdu_interface" indexes and
                "params" for the VM creation task.
        """
        vnfr_id = kwargs.get("vnfr_id")
        nsr_id = kwargs.get("nsr_id")
        vnfr = kwargs.get("vnfr")
        vdu2cloud_init = kwargs.get("vdu2cloud_init")
        tasks_by_target_record_id = kwargs.get("tasks_by_target_record_id")
        logger = kwargs.get("logger")
        db = kwargs.get("db")
        fs = kwargs.get("fs")
        ro_nsr_public_key = kwargs.get("ro_nsr_public_key")

        vnf_preffix = "vnfrs:{}".format(vnfr_id)
        ns_preffix = "nsrs:{}".format(nsr_id)
        image_text = ns_preffix + ":image." + target_vdu["ns-image-id"]
        flavor_text = ns_preffix + ":flavor." + target_vdu["ns-flavor-id"]
        # The VM task always depends on its image and flavor tasks.
        extra_dict = {"depends_on": [image_text, flavor_text]}
        net_list = []

        for iface_index, interface in enumerate(target_vdu["interfaces"]):
            # Resolve the vld reference: ns-level takes precedence over vnf-level.
            if interface.get("ns-vld-id"):
                net_text = ns_preffix + ":vld." + interface["ns-vld-id"]
            elif interface.get("vnf-vld-id"):
                net_text = vnf_preffix + ":vld." + interface["vnf-vld-id"]
            else:
                logger.error(
                    "Interface {} from vdu {} not connected to any vld".format(
                        iface_index, target_vdu["vdu-name"]
                    )
                )

                continue  # interface not connected to any vld

            extra_dict["depends_on"].append(net_text)

            # Normalize IM key names to the VIM connector key names
            # (note: mutates the interface dict in place).
            if "port-security-enabled" in interface:
                interface["port_security"] = interface.pop("port-security-enabled")

            if "port-security-disable-strategy" in interface:
                interface["port_security_disable_strategy"] = interface.pop(
                    "port-security-disable-strategy"
                )

            net_item = {
                x: v
                for x, v in interface.items()
                if x
                in (
                    "name",
                    "vpci",
                    "port_security",
                    "port_security_disable_strategy",
                    "floating_ip",
                )
            }
            # The real net id is resolved later from the network task result.
            net_item["net_id"] = "TASK-" + net_text
            net_item["type"] = "virtual"

            # TODO mac_address: used for SR-IOV ifaces #TODO for other types
            # TODO floating_ip: True/False (or it can be None)
            if interface.get("type") in ("SR-IOV", "PCI-PASSTHROUGH"):
                # mark the net create task as type data
                if deep_get(
                    tasks_by_target_record_id,
                    net_text,
                    "params",
                    "net_type",
                ):
                    tasks_by_target_record_id[net_text]["params"]["net_type"] = "data"

                net_item["use"] = "data"
                net_item["model"] = interface["type"]
                net_item["type"] = interface["type"]
            elif (
                interface.get("type") == "OM-MGMT"
                or interface.get("mgmt-interface")
                or interface.get("mgmt-vnf")
            ):
                net_item["use"] = "mgmt"
            else:
                # if interface.get("type") in ("VIRTIO", "E1000", "PARAVIRT"):
                net_item["use"] = "bridge"
                net_item["model"] = interface.get("type")

            if interface.get("ip-address"):
                net_item["ip_address"] = interface["ip-address"]

            if interface.get("mac-address"):
                net_item["mac_address"] = interface["mac-address"]

            net_list.append(net_item)

            # Remember which interface gives management access; the
            # vnf-level mgmt flag is checked before the vdu-level one.
            if interface.get("mgmt-vnf"):
                extra_dict["mgmt_vnf_interface"] = iface_index
            elif interface.get("mgmt-interface"):
                extra_dict["mgmt_vdu_interface"] = iface_index

        # cloud config
        cloud_config = {}

        if target_vdu.get("cloud-init"):
            # Cache the raw cloud-init text per location; rendering with the
            # vdu's additionalParams happens per vdu.
            if target_vdu["cloud-init"] not in vdu2cloud_init:
                vdu2cloud_init[target_vdu["cloud-init"]] = Ns._get_cloud_init(
                    db=db,
                    fs=fs,
                    location=target_vdu["cloud-init"],
                )

            cloud_content_ = vdu2cloud_init[target_vdu["cloud-init"]]
            cloud_config["user-data"] = Ns._parse_jinja2(
                cloud_init_content=cloud_content_,
                params=target_vdu.get("additionalParams"),
                context=target_vdu["cloud-init"],
            )

        if target_vdu.get("boot-data-drive"):
            cloud_config["boot-data-drive"] = target_vdu.get("boot-data-drive")

        ssh_keys = []

        if target_vdu.get("ssh-keys"):
            ssh_keys += target_vdu.get("ssh-keys")

        if target_vdu.get("ssh-access-required"):
            # The RO public key is injected so the RO can later ssh in.
            ssh_keys.append(ro_nsr_public_key)

        if ssh_keys:
            cloud_config["key-pairs"] = ssh_keys

        disk_list = None
        if target_vdu.get("virtual-storages"):
            # Only persistent storages become explicit disks here.
            disk_list = [
                {"size": disk["size-of-storage"]}
                for disk in target_vdu["virtual-storages"]
                if disk.get("type-of-storage")
                == "persistent-storage:persistent-storage"
            ]

        extra_dict["params"] = {
            "name": "{}-{}-{}-{}".format(
                indata["name"][:16],
                vnfr["member-vnf-index-ref"][:16],
                target_vdu["vdu-name"][:32],
                target_vdu.get("count-index") or 0,
            ),
            "description": target_vdu["vdu-name"],
            "start": True,
            "image_id": "TASK-" + image_text,
            "flavor_id": "TASK-" + flavor_text,
            "net_list": net_list,
            "cloud_config": cloud_config or None,
            "disk_list": disk_list,
            "availability_zone_index": None,  # TODO
            "availability_zone_list": None,  # TODO
        }

        return extra_dict
1083
    def deploy(self, session, indata, version, nsr_id, *args, **kwargs):
        """Process a deployment request over an NS.

        Compares the desired state (indata) against the current "nsrs" and
        "vnfrs" DB records, creates RO tasks (CREATE/DELETE/EXEC) for the
        differences, persists them into "ro_tasks" and updates the records.

        :param session: contains "username", "admin", "force", "public",
            "project_id", "set_project" (unused here; kept for API symmetry)
        :param indata: deployment content, validated against deploy_schema
        :param version: API version (unused here)
        :param nsr_id: id of the NS record to deploy
        :return: tuple (response_dict, action_id, True) for the HTTP layer
        :raises NsException: on any error (DbException is wrapped as well)
        """
        self.logger.debug("ns.deploy nsr_id={} indata={}".format(nsr_id, indata))
        validate_input(indata, deploy_schema)
        action_id = indata.get("action_id", str(uuid4()))
        task_index = 0
        # get current deployment
        db_nsr_update = {}  # update operation on nsrs
        db_vnfrs_update = {}
        db_vnfrs = {}  # vnf's info indexed by _id
        nb_ro_tasks = 0  # for logging
        vdu2cloud_init = indata.get("cloud_init_content") or {}
        step = ""  # updated before each stage so the except block can report where it failed
        logging_text = "Task deploy nsr_id={} action_id={} ".format(nsr_id, action_id)
        self.logger.debug(logging_text + "Enter")

        try:
            step = "Getting ns and vnfr record from db"
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            db_new_tasks = []
            tasks_by_target_record_id = {}
            # read from db: vnf's of this ns
            step = "Getting vnfrs from db"
            db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})

            if not db_vnfrs_list:
                raise NsException("Cannot obtain associated VNF for ns")

            for vnfr in db_vnfrs_list:
                db_vnfrs[vnfr["_id"]] = vnfr
                db_vnfrs_update[vnfr["_id"]] = {}

            now = time()
            db_ro_nsr = self.db.get_one("ro_nsrs", {"_id": nsr_id}, fail_on_empty=False)

            # Lazily create the ro_nsrs companion record (holds keys/actions).
            if not db_ro_nsr:
                db_ro_nsr = self._create_db_ro_nsrs(nsr_id, now)

            ro_nsr_public_key = db_ro_nsr["public_key"]

            # check that action_id is not in the list of actions. Suffixed with :index
            if action_id in db_ro_nsr["actions"]:
                index = 1

                while True:
                    new_action_id = "{}:{}".format(action_id, index)

                    if new_action_id not in db_ro_nsr["actions"]:
                        action_id = new_action_id
                        self.logger.debug(
                            logging_text
                            + "Changing action_id in use to {}".format(action_id)
                        )
                        break

                    index += 1

            def _process_items(
                target_list,
                existing_list,
                db_record,
                db_update,
                db_path,
                item,
                process_params,
            ):
                """Diff target items (nets, vdus, flavors, ...) against the
                existing DB items and emit DELETE/CREATE RO tasks.

                Mutates db_update and existing_list in place, and — through
                the nonlocal declarations below — the deploy-level task
                bookkeeping (db_new_tasks, task_index, ...).
                """
                nonlocal db_new_tasks
                nonlocal tasks_by_target_record_id
                nonlocal action_id
                nonlocal nsr_id
                nonlocal task_index
                nonlocal indata

                # ensure all the target_list elements has an "id". If not assign the index as id
                for target_index, tl in enumerate(target_list):
                    if tl and not tl.get("id"):
                        tl["id"] = str(target_index)

                # step 1 items (networks,vdus,...) to be deleted/updated
                for item_index, existing_item in enumerate(existing_list):
                    target_item = next(
                        (t for t in target_list if t["id"] == existing_item["id"]), None
                    )

                    for target_vim, existing_viminfo in existing_item.get(
                        "vim_info", {}
                    ).items():
                        if existing_viminfo is None:
                            continue

                        if target_item:
                            target_viminfo = target_item.get("vim_info", {}).get(
                                target_vim
                            )
                        else:
                            target_viminfo = None

                        # existing at this VIM but absent from the target:
                        if target_viminfo is None:
                            # must be deleted
                            self._assign_vim(target_vim)
                            target_record_id = "{}.{}".format(
                                db_record, existing_item["id"]
                            )
                            item_ = item

                            if target_vim.startswith("sdn"):
                                # item must be sdn-net instead of net if target_vim is a sdn
                                item_ = "sdn_net"
                                target_record_id += ".sdn"

                            deployment_info = {
                                "action_id": action_id,
                                "nsr_id": nsr_id,
                                "task_index": task_index,
                            }

                            task = Ns._create_task(
                                deployment_info=deployment_info,
                                target_id=target_vim,
                                item=item_,
                                action="DELETE",
                                target_record=f"{db_record}.{item_index}.vim_info.{target_vim}",
                                target_record_id=target_record_id,
                            )

                            # _create_task advances the index inside deployment_info
                            task_index = deployment_info.get("task_index")

                            tasks_by_target_record_id[target_record_id] = task
                            db_new_tasks.append(task)
                            # TODO delete
                    # TODO check one by one the vims to be created/deleted

                # step 2 items (networks,vdus,...) to be created
                for target_item in target_list:
                    item_index = -1

                    # look for the target item in the existing list; the
                    # for/else appends it when not found
                    for item_index, existing_item in enumerate(existing_list):
                        if existing_item["id"] == target_item["id"]:
                            break
                    else:
                        item_index += 1
                        db_update[db_path + ".{}".format(item_index)] = target_item
                        existing_list.append(target_item)
                        existing_item = None

                    for target_vim, target_viminfo in target_item.get(
                        "vim_info", {}
                    ).items():
                        existing_viminfo = None

                        if existing_item:
                            existing_viminfo = existing_item.get("vim_info", {}).get(
                                target_vim
                            )

                        # TODO check if different. Delete and create???
                        # TODO delete if not exist
                        if existing_viminfo is not None:
                            continue

                        target_record_id = "{}.{}".format(db_record, target_item["id"])
                        item_ = item

                        if target_vim.startswith("sdn"):
                            # item must be sdn-net instead of net if target_vim is a sdn
                            item_ = "sdn_net"
                            target_record_id += ".sdn"

                        # NOTE: this local kwargs shadows deploy()'s **kwargs
                        # on purpose; it only carries extra args for
                        # _process_vdu_params.
                        kwargs = {}
                        if process_params == Ns._process_vdu_params:
                            # vnfr_id / vnfr are taken via closure from the
                            # enclosing "for vnfr_id, vnfr in db_vnfrs.items()"
                            # loop in deploy() below.
                            kwargs.update(
                                {
                                    "vnfr_id": vnfr_id,
                                    "nsr_id": nsr_id,
                                    "vnfr": vnfr,
                                    "vdu2cloud_init": vdu2cloud_init,
                                    "tasks_by_target_record_id": tasks_by_target_record_id,
                                    "logger": self.logger,
                                    "db": self.db,
                                    "fs": self.fs,
                                    "ro_nsr_public_key": ro_nsr_public_key,
                                }
                            )

                        extra_dict = process_params(
                            target_item,
                            indata,
                            target_viminfo,
                            target_record_id,
                            **kwargs,
                        )
                        self._assign_vim(target_vim)

                        deployment_info = {
                            "action_id": action_id,
                            "nsr_id": nsr_id,
                            "task_index": task_index,
                        }

                        task = Ns._create_task(
                            deployment_info=deployment_info,
                            target_id=target_vim,
                            item=item_,
                            action="CREATE",
                            target_record=f"{db_record}.{item_index}.vim_info.{target_vim}",
                            target_record_id=target_record_id,
                            extra_dict=extra_dict,
                        )

                        task_index = deployment_info.get("task_index")

                        tasks_by_target_record_id[target_record_id] = task
                        db_new_tasks.append(task)

                        if target_item.get("common_id"):
                            task["common_id"] = target_item["common_id"]

                        db_update[db_path + ".{}".format(item_index)] = target_item

            def _process_action(indata):
                """Handle an explicit action request (currently only
                "inject_ssh_key"), creating one EXEC task per target VDU."""
                nonlocal db_new_tasks
                nonlocal action_id
                nonlocal nsr_id
                nonlocal task_index
                nonlocal db_vnfrs
                nonlocal db_ro_nsr

                if indata["action"]["action"] == "inject_ssh_key":
                    key = indata["action"].get("key")
                    user = indata["action"].get("user")
                    password = indata["action"].get("password")

                    for vnf in indata.get("vnf", ()):
                        if vnf["_id"] not in db_vnfrs:
                            raise NsException("Invalid vnf={}".format(vnf["_id"]))

                        db_vnfr = db_vnfrs[vnf["_id"]]

                        for target_vdu in vnf.get("vdur", ()):
                            # locate (index, vdur) of the target vdu in the record
                            vdu_index, vdur = next(
                                (
                                    i_v
                                    for i_v in enumerate(db_vnfr["vdur"])
                                    if i_v[1]["id"] == target_vdu["id"]
                                ),
                                (None, None),
                            )

                            if not vdur:
                                raise NsException(
                                    "Invalid vdu vnf={}.{}".format(
                                        vnf["_id"], target_vdu["id"]
                                    )
                                )

                            # pick the first VIM this vdu is deployed at
                            target_vim, vim_info = next(
                                k_v for k_v in vdur["vim_info"].items()
                            )
                            self._assign_vim(target_vim)
                            target_record = "vnfrs:{}:vdur.{}.ssh_keys".format(
                                vnf["_id"], vdu_index
                            )
                            extra_dict = {
                                "depends_on": [
                                    "vnfrs:{}:vdur.{}".format(vnf["_id"], vdur["id"])
                                ],
                                "params": {
                                    "ip_address": vdur.get("ip-address"),
                                    "user": user,
                                    "key": key,
                                    "password": password,
                                    "private_key": db_ro_nsr["private_key"],
                                    "salt": db_ro_nsr["_id"],
                                    "schema_version": db_ro_nsr["_admin"][
                                        "schema_version"
                                    ],
                                },
                            }

                            deployment_info = {
                                "action_id": action_id,
                                "nsr_id": nsr_id,
                                "task_index": task_index,
                            }

                            task = Ns._create_task(
                                deployment_info=deployment_info,
                                target_id=target_vim,
                                item="vdu",
                                action="EXEC",
                                target_record=target_record,
                                target_record_id=None,
                                extra_dict=extra_dict,
                            )

                            task_index = deployment_info.get("task_index")

                            db_new_tasks.append(task)

            # serialize against concurrent deploy/delete operations
            with self.write_lock:
                if indata.get("action"):
                    _process_action(indata)
                else:
                    # compute network differences
                    # NS.vld
                    step = "process NS VLDs"
                    _process_items(
                        target_list=indata["ns"]["vld"] or [],
                        existing_list=db_nsr.get("vld") or [],
                        db_record="nsrs:{}:vld".format(nsr_id),
                        db_update=db_nsr_update,
                        db_path="vld",
                        item="net",
                        process_params=Ns._process_net_params,
                    )

                    step = "process NS images"
                    _process_items(
                        target_list=indata.get("image") or [],
                        existing_list=db_nsr.get("image") or [],
                        db_record="nsrs:{}:image".format(nsr_id),
                        db_update=db_nsr_update,
                        db_path="image",
                        item="image",
                        process_params=Ns._process_image_params,
                    )

                    step = "process NS flavors"
                    _process_items(
                        target_list=indata.get("flavor") or [],
                        existing_list=db_nsr.get("flavor") or [],
                        db_record="nsrs:{}:flavor".format(nsr_id),
                        db_update=db_nsr_update,
                        db_path="flavor",
                        item="flavor",
                        process_params=Ns._process_flavor_params,
                    )

                    # VNF.vld
                    for vnfr_id, vnfr in db_vnfrs.items():
                        # vnfr_id need to be set as global variable for among others nested method _process_vdu_params
                        step = "process VNF={} VLDs".format(vnfr_id)
                        target_vnf = next(
                            (
                                vnf
                                for vnf in indata.get("vnf", ())
                                if vnf["_id"] == vnfr_id
                            ),
                            None,
                        )
                        target_list = target_vnf.get("vld") if target_vnf else None
                        _process_items(
                            target_list=target_list or [],
                            existing_list=vnfr.get("vld") or [],
                            db_record="vnfrs:{}:vld".format(vnfr_id),
                            db_update=db_vnfrs_update[vnfr["_id"]],
                            db_path="vld",
                            item="net",
                            process_params=Ns._process_net_params,
                        )

                        target_list = target_vnf.get("vdur") if target_vnf else None
                        step = "process VNF={} VDUs".format(vnfr_id)
                        _process_items(
                            target_list=target_list or [],
                            existing_list=vnfr.get("vdur") or [],
                            db_record="vnfrs:{}:vdur".format(vnfr_id),
                            db_update=db_vnfrs_update[vnfr["_id"]],
                            db_path="vdur",
                            item="vdu",
                            process_params=Ns._process_vdu_params,
                        )

                # persist the new tasks: append to an existing ro_task when
                # one matches (by common_id, then by target_record), otherwise
                # create a fresh ro_task
                for db_task in db_new_tasks:
                    step = "Updating database, Appending tasks to ro_tasks"
                    target_id = db_task.pop("target_id")
                    common_id = db_task.get("common_id")

                    if common_id:
                        if self.db.set_one(
                            "ro_tasks",
                            q_filter={
                                "target_id": target_id,
                                "tasks.common_id": common_id,
                            },
                            update_dict={"to_check_at": now, "modified_at": now},
                            push={"tasks": db_task},
                            fail_on_empty=False,
                        ):
                            continue

                    if not self.db.set_one(
                        "ro_tasks",
                        q_filter={
                            "target_id": target_id,
                            "tasks.target_record": db_task["target_record"],
                        },
                        update_dict={"to_check_at": now, "modified_at": now},
                        push={"tasks": db_task},
                        fail_on_empty=False,
                    ):
                        # Create a ro_task
                        step = "Updating database, Creating ro_tasks"
                        db_ro_task = Ns._create_ro_task(target_id, db_task)
                        nb_ro_tasks += 1
                        self.db.create("ro_tasks", db_ro_task)

                step = "Updating database, nsrs"
                if db_nsr_update:
                    self.db.set_one("nsrs", {"_id": nsr_id}, db_nsr_update)

                for vnfr_id, db_vnfr_update in db_vnfrs_update.items():
                    if db_vnfr_update:
                        step = "Updating database, vnfrs={}".format(vnfr_id)
                        self.db.set_one("vnfrs", {"_id": vnfr_id}, db_vnfr_update)

            self.logger.debug(
                logging_text
                + "Exit. Created {} ro_tasks; {} tasks".format(
                    nb_ro_tasks, len(db_new_tasks)
                )
            )

            return (
                {"status": "ok", "nsr_id": nsr_id, "action_id": action_id},
                action_id,
                True,
            )
        except Exception as e:
            # Known exception types are logged as errors; anything else is
            # critical and logged with its traceback.
            if isinstance(e, (DbException, NsException)):
                self.logger.error(
                    logging_text + "Exit Exception while '{}': {}".format(step, e)
                )
            else:
                e = traceback_format_exc()
                self.logger.critical(
                    logging_text + "Exit Exception while '{}': {}".format(step, e),
                    exc_info=True,
                )

            raise NsException(e)
1524
1525 def delete(self, session, indata, version, nsr_id, *args, **kwargs):
1526 self.logger.debug("ns.delete version={} nsr_id={}".format(version, nsr_id))
1527 # self.db.del_list({"_id": ro_task["_id"], "tasks.nsr_id.ne": nsr_id})
1528
1529 with self.write_lock:
1530 try:
1531 NsWorker.delete_db_tasks(self.db, nsr_id, None)
1532 except NsWorkerException as e:
1533 raise NsException(e)
1534
1535 return None, None, True
1536
1537 def status(self, session, indata, version, nsr_id, action_id, *args, **kwargs):
1538 # self.logger.debug("ns.status version={} nsr_id={}, action_id={} indata={}"
1539 # .format(version, nsr_id, action_id, indata))
1540 task_list = []
1541 done = 0
1542 total = 0
1543 ro_tasks = self.db.get_list("ro_tasks", {"tasks.action_id": action_id})
1544 global_status = "DONE"
1545 details = []
1546
1547 for ro_task in ro_tasks:
1548 for task in ro_task["tasks"]:
1549 if task and task["action_id"] == action_id:
1550 task_list.append(task)
1551 total += 1
1552
1553 if task["status"] == "FAILED":
1554 global_status = "FAILED"
1555 error_text = "Error at {} {}: {}".format(
1556 task["action"].lower(),
1557 task["item"],
1558 ro_task["vim_info"].get("vim_details") or "unknown",
1559 )
1560 details.append(error_text)
1561 elif task["status"] in ("SCHEDULED", "BUILD"):
1562 if global_status != "FAILED":
1563 global_status = "BUILD"
1564 else:
1565 done += 1
1566
1567 return_data = {
1568 "status": global_status,
1569 "details": ". ".join(details)
1570 if details
1571 else "progress {}/{}".format(done, total),
1572 "nsr_id": nsr_id,
1573 "action_id": action_id,
1574 "tasks": task_list,
1575 }
1576
1577 return return_data, None, True
1578
1579 def cancel(self, session, indata, version, nsr_id, action_id, *args, **kwargs):
1580 print(
1581 "ns.cancel session={} indata={} version={} nsr_id={}, action_id={}".format(
1582 session, indata, version, nsr_id, action_id
1583 )
1584 )
1585
1586 return None, None, True
1587
1588 def get_deploy(self, session, indata, version, nsr_id, action_id, *args, **kwargs):
1589 nsrs = self.db.get_list("nsrs", {})
1590 return_data = []
1591
1592 for ns in nsrs:
1593 return_data.append({"_id": ns["_id"], "name": ns["name"]})
1594
1595 return return_data, None, True
1596
1597 def get_actions(self, session, indata, version, nsr_id, action_id, *args, **kwargs):
1598 ro_tasks = self.db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
1599 return_data = []
1600
1601 for ro_task in ro_tasks:
1602 for task in ro_task["tasks"]:
1603 if task["action_id"] not in return_data:
1604 return_data.append(task["action_id"])
1605
1606 return return_data, None, True