return vnfd.get("kdu", ())
+def get_kdu(vnfd, kdu_name):
+ return list_utils.find_in_list(
+ get_kdu_list(vnfd), lambda kdu: kdu["name"] == kdu_name
+ )
+
+
+def get_kdu_services(kdu):
+ return kdu.get("service", [])
+
+
def get_ee_sorted_initial_config_primitive_list(
primitive_list, vca_deployed, ee_descriptor_id
):
get_vnf_profiles,
)
from osm_lcm.data_utils.vnfd import (
+ get_kdu,
+ get_kdu_services,
get_relation_list,
get_vdu_list,
get_vdu_profile,
from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
from osm_lcm.lcm_helm_conn import LCMHelmConn
+from osm_lcm.osm_config import OsmConfigBuilder
from osm_lcm.prometheus import parse_job
from copy import copy, deepcopy
:param nsr_id:
:param vnfr_id:
:param kdu_name:
- :return: IP address
+ :return: IP address, K8s services
"""
# self.logger.debug(logging_text + "Starting wait_kdu_up")
)
if kdur.get("status"):
if kdur["status"] in ("READY", "ENABLED"):
- return kdur.get("ip-address")
+ return kdur.get("ip-address"), kdur.get("services")
else:
raise LcmException(
"target KDU={} is in error state".format(kdu_name)
# wait for RO (ip-address) Insert pub_key into VM
if vnfr_id:
if kdu_name:
- rw_mgmt_ip = await self.wait_kdu_up(
+ rw_mgmt_ip, services = await self.wait_kdu_up(
logging_text, nsr_id, vnfr_id, kdu_name
)
+ vnfd = self.db.get_one(
+ "vnfds_revisions",
+ {"_id": f'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
+ )
+ kdu = get_kdu(vnfd, kdu_name)
+ kdu_services = [
+ service["name"] for service in get_kdu_services(kdu)
+ ]
+ exposed_services = []
+ for service in services:
+ if any(s in service["name"] for s in kdu_services):
+ exposed_services.append(service)
+ await self.vca_map[vca_type].exec_primitive(
+ ee_id=ee_id,
+ primitive_name="config",
+ params_dict={
+ "osm-config": json.dumps(
+ OsmConfigBuilder(
+ k8s={"services": exposed_services}
+ ).build()
+ )
+ },
+ vca_id=vca_id,
+ )
else:
rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
logging_text,
user=user,
pub_key=pub_key,
)
+
else:
rw_mgmt_ip = None # This is for a NS configuration
--- /dev/null
+# Copyright 2022 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Any, Dict, List, Optional
+from pydantic import BaseModel, validator
+
+
+def _get_ip_from_service(service: Dict[str, Any]) -> List[str]:
+ return (
+ [service["cluster_ip"]]
+ if service["type"] == "ClusterIP"
+ else service["external_ip"]
+ )
+
+
+class K8sConfigV0(BaseModel):
+ services: List[Dict]
+
+ @validator("services")
+ def parse_services(cls, services: Dict[str, Any]):
+ return {
+ service["name"]: {
+ "type": service["type"],
+ "ip": _get_ip_from_service(service),
+ "ports": {
+ port["name"]: {
+ "port": port["port"],
+ "protocol": port["protocol"],
+ }
+ for port in service["ports"]
+ },
+ }
+ for service in services
+ }
+
+
+class OsmConfigV0(BaseModel):
+ k8s: Optional[K8sConfigV0]
+
+
+class OsmConfig(BaseModel):
+ v0: OsmConfigV0
+
+
+class OsmConfigBuilder:
+ def __init__(self, k8s: Dict[str, Any] = {}) -> None:
+ self._k8s = k8s
+ self._configs = {}
+ if k8s:
+ self._configs["k8s"] = k8s
+
+ def build(self) -> Dict[str, Any]:
+ return OsmConfig(v0=OsmConfigV0(**self._configs)).dict()
idna
jinja2
pyyaml==5.4.1
+pydantic
# yarl
protobuf==3.19.3
# via grpcio-tools
+pydantic==1.9.0
+ # via -r requirements.in
pyyaml==5.4.1
# via -r requirements.in
six==1.16.0