1 # Copyright 2022 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 from typing
import Any
, Dict
, List
, Optional
16 from pydantic
import BaseModel
, validator
19 def _get_ip_from_service(service
: Dict
[str, Any
]) -> List
[str]:
21 [service
["cluster_ip"]]
22 if service
["type"] == "ClusterIP"
23 else service
["external_ip"]
27 class K8sConfigV0(BaseModel
):
30 @validator("services")
32 def parse_services(cls
, services
: Dict
[str, Any
]):
35 "type": service
["type"],
36 "ip": _get_ip_from_service(service
),
40 "protocol": port
["protocol"],
42 for port
in service
["ports"]
45 for service
in services
49 class OsmConfigV0(BaseModel
):
50 k8s
: Optional
[K8sConfigV0
]
53 class OsmConfig(BaseModel
):
57 class OsmConfigBuilder
:
58 def __init__(self
, k8s
: Dict
[str, Any
] = {}) -> None:
62 self
._configs
["k8s"] = k8s
64 def build(self
) -> Dict
[str, Any
]:
65 return OsmConfig(v0
=OsmConfigV0(**self
._configs
)).dict()