Feature-9904: Enhancing NG-UI to enable Juju operational view dashboard
[osm/N2VC.git] / n2vc / utils.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 from enum import Enum
16 from juju.machine import Machine
17 from juju.application import Application
18 from juju.action import Action
19 from juju.unit import Unit
20 import yaml
21
22
class N2VCDeploymentStatus(Enum):
    """
    Deployment status values

    Every member's value is the lowercase form of its name.
    """

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    UNKNOWN = "unknown"
29
30
class Dict(dict):
    """
    Dict class that allows to access the keys like attributes

    Lookup order for ``obj.name``:

    1. if ``name`` is a key, the stored value is returned (a key therefore
       shadows a dict method of the same name);
    2. otherwise the regular attribute lookup is used, so inherited dict
       methods (``keys``, ``items``, ``get``...) and special attributes
       (``__class__``, ``__reduce_ex__``...) keep working;
    3. an unknown plain name evaluates to None;
    4. an unknown dunder name raises AttributeError, so that protocol probes
       made by copy/pickle and similar machinery see the attribute as absent.
    """

    def __getattribute__(self, name):
        # The "in" operator resolves __contains__ on the type, so this test
        # does not re-enter __getattribute__.
        if name in self:
            return self[name]
        try:
            return super().__getattribute__(name)
        except AttributeError:
            # Dunder probes must observe a real "missing attribute"; handing
            # back None would make callers try to call None.
            if name.startswith("__") and name.endswith("__"):
                raise
            # Lenient behaviour kept for ordinary names: missing reads as None.
            return None
39
40
class EntityType(Enum):
    """
    Entity kinds; each member's value is the class imported from the juju
    package for that kind (Machine, Application, Action, Unit)
    """

    MACHINE = Machine
    APPLICATION = Application
    ACTION = Action
    UNIT = Unit

    @classmethod
    def has_value(cls, value):
        """
        Tell whether value is the value of one of the members

        :param: value: candidate member value
        :return: True if a member holds that value, False otherwise
        """
        known_values = cls._value2member_map_  # pylint: disable=E1101
        return value in known_values

    @classmethod
    def get_entity(cls, value):
        """
        Get the member holding the given value

        :param: value: candidate member value
        :return: the matching member, None if there is none
        """
        return cls._value2member_map_.get(value)  # pylint: disable=E1101

    @classmethod
    def get_entity_from_delta(cls, delta_entity: str):
        """
        Get the member matching a delta entity name

        :param: delta_entity: Possible values are "machine", "application", "unit", "action"
        :return: the member whose value has that lowercased class name, None if no match
        """
        candidates = cls._value2member_map_.items()  # pylint: disable=E1101
        for entity_class, member in candidates:
            if entity_class.__name__.lower() == delta_entity:
                return member
        return None
69
70
# Translation table from status strings to N2VCDeploymentStatus members.
# First level key: entity kind; second level key: status string of that entity.
# A status string that is not listed for its entity kind has no entry here.
JujuStatusToOSM = {
    # machines
    "machine": {
        "pending": N2VCDeploymentStatus.PENDING,
        "started": N2VCDeploymentStatus.COMPLETED,
    },
    # applications: everything short of "active" or "error" counts as running
    "application": {
        "waiting": N2VCDeploymentStatus.RUNNING,
        "maintenance": N2VCDeploymentStatus.RUNNING,
        "blocked": N2VCDeploymentStatus.RUNNING,
        "error": N2VCDeploymentStatus.FAILED,
        "active": N2VCDeploymentStatus.COMPLETED,
    },
    # actions
    "action": {
        "pending": N2VCDeploymentStatus.PENDING,
        "running": N2VCDeploymentStatus.RUNNING,
        "completed": N2VCDeploymentStatus.COMPLETED,
    },
    # units: same translation as applications
    "unit": {
        "waiting": N2VCDeploymentStatus.RUNNING,
        "maintenance": N2VCDeploymentStatus.RUNNING,
        "blocked": N2VCDeploymentStatus.RUNNING,
        "error": N2VCDeploymentStatus.FAILED,
        "active": N2VCDeploymentStatus.COMPLETED,
    },
}
96
# Attribute-accessible description of stored data. Each entry carries a
# "table", a "filter" and a "key".
# NOTE(review): presumably table/filter select a database record and key is the
# field read from it - confirm against the callers.
DB_DATA = Dict(
    api_endpoints=Dict(
        table="admin",
        filter={"_id": "juju"},
        key="api_endpoints",
    ),
)
104
105
def obj_to_yaml(obj: object) -> str:
    """
    Converts object to yaml format

    The object is dumped in block style with an indent of 2. On every line of
    the dump, the text from a "!!python/object" tag onwards is dropped, so the
    python-specific object tags do not appear in the result.

    :param obj: object to serialize
    :return: yaml data, every line terminated by a newline
    """
    python_tag = "!!python/object"
    # dump to yaml
    dump_text = yaml.dump(obj, default_flow_style=False, indent=2)
    # partition() gives the whole line when the tag is absent and only the text
    # before the tag otherwise; join assembles the result in a single pass
    return "".join(
        line.partition(python_tag)[0] + "\n" for line in dump_text.splitlines()
    )
123
124
def obj_to_dict(obj: object) -> dict:
    """
    Converts object to dictionary format

    The object is first rendered with obj_to_yaml and the resulting text is
    parsed back.

    :param obj: object to convert
    :return: dict data
    """
    # NOTE(review): yaml.Loader is able to build arbitrary python objects from
    # tags. The text parsed here is produced by obj_to_yaml from an in-process
    # object, not read from outside; do not reuse this call for external data.
    return yaml.load(obj_to_yaml(obj), Loader=yaml.Loader)