Fixes VCA deletion in NS termination
[osm/LCM.git] / osm_lcm / osm_config.py
1 # Copyright 2022 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 from typing import Any, Dict, List, Optional
16 from pydantic import BaseModel, validator
17
18
19 def _get_ip_from_service(service: Dict[str, Any]) -> List[str]:
20 return (
21 [service["cluster_ip"]]
22 if service["type"] == "ClusterIP"
23 else service["external_ip"]
24 )
25
26
27 class K8sConfigV0(BaseModel):
28 services: List[Dict]
29
30 @validator("services")
31 def parse_services(cls, services: Dict[str, Any]):
32 return {
33 service["name"]: {
34 "type": service["type"],
35 "ip": _get_ip_from_service(service),
36 "ports": {
37 port["name"]: {
38 "port": port["port"],
39 "protocol": port["protocol"],
40 }
41 for port in service["ports"]
42 },
43 }
44 for service in services
45 }
46
47
48 class OsmConfigV0(BaseModel):
49 k8s: Optional[K8sConfigV0]
50
51
52 class OsmConfig(BaseModel):
53 v0: OsmConfigV0
54
55
56 class OsmConfigBuilder:
57 def __init__(self, k8s: Dict[str, Any] = {}) -> None:
58 self._k8s = k8s
59 self._configs = {}
60 if k8s:
61 self._configs["k8s"] = k8s
62
63 def build(self) -> Dict[str, Any]:
64 return OsmConfig(v0=OsmConfigV0(**self._configs)).dict()