Coverage for osmclient/sol005/vnf.py: 18%
56 statements
« prev ^ index » next coverage.py v7.3.1, created at 2024-06-22 09:01 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2024-06-22 09:01 +0000
1# Copyright 2018 Telefonica
2#
3# All Rights Reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
17"""
18OSM vnf API handling
19"""
21from osmclient.common import utils
22from osmclient.common.exceptions import NotFound
23import logging
24import json
class Vnf(object):
    """Client-side helper for the VNF record resource of the OSM NBI.

    All requests are issued through the ``http`` object handed to the
    constructor, against the base path ``/nslcm/v1/vnfrs``.
    """

    def __init__(self, http=None, client=None):
        """Store the collaborators and build the resource base path.

        :param http: object exposing ``get2_cmd(url)``; it is expected to
            return a 2-tuple whose second item is the response body text.
        :param client: object exposing ``get_token()`` and an ``ns``
            attribute with a ``get(name)`` method.
        """
        self._http = http
        self._client = client
        self._logger = logging.getLogger("osmclient")
        self._apiName = "/nslcm"
        self._apiVersion = "/v1"
        self._apiResource = "/vnfrs"
        self._apiBase = "{}{}{}".format(
            self._apiName, self._apiVersion, self._apiResource
        )

    def list(self, ns=None, filter=None):
        """Returns a list of VNF instances

        :param ns: optional NS name or id; when given, it is resolved through
            ``client.ns.get`` and the query is restricted with
            ``nsr-id-ref=<_id of that NS>``.
        :param filter: optional, already formatted query string (without the
            leading ``?``) that is sent as-is.
        :return: the decoded JSON response, or an empty list when the
            response body is empty.
        """
        self._logger.debug("")
        self._client.get_token()
        filter_string = ""
        if filter:
            filter_string = "?{}".format(filter)
        if ns:
            ns_instance = self._client.ns.get(ns)
            # Use "&" when a caller-supplied filter already opened the query.
            separator = "&" if filter_string else "?"
            filter_string += "{}nsr-id-ref={}".format(separator, ns_instance["_id"])
        _, resp = self._http.get2_cmd("{}{}".format(self._apiBase, filter_string))
        if resp:
            return json.loads(resp)
        # A literal is used on purpose: this class also defines a method
        # named "list", and "[]" leaves no doubt about what is returned.
        return []

    def get(self, name):
        """Returns a VNF instance based on name or id

        :param name: VNF id (when it validates as a UUID4) or VNF name.
        :return: the first record from ``list()`` whose ``_id`` (for a UUID4)
            or ``name`` (otherwise) equals ``name``.
        :raises NotFound: when no listed record matches.
        """
        self._logger.debug("")
        self._client.get_token()
        if utils.validate_uuid4(name):
            for vnf in self.list():
                if name == vnf["_id"]:
                    return vnf
        else:
            for vnf in self.list():
                # Records without a "name" key are skipped, not an error.
                if name == vnf.get("name"):
                    return vnf
        raise NotFound("vnf {} not found".format(name))

    def get_individual(self, name):
        """Fetch a single VNF record through its own resource URL.

        :param name: VNF id (when it validates as a UUID4) or VNF name. A
            name is first translated into an id by scanning ``list()``; if no
            listed record carries that name, the value is used as the id
            unchanged.
        :return: the decoded JSON response for ``<base path>/<id>``.
        :raises NotFound: when the request raises ``NotFound`` or the
            response body is empty.
        """
        self._logger.debug("")
        self._client.get_token()
        vnf_id = name
        if not utils.validate_uuid4(name):
            for vnf in self.list():
                # .get() keeps this lookup consistent with get(): a record
                # lacking a "name" key must not abort the search.
                if name == vnf.get("name"):
                    vnf_id = vnf["_id"]
                    break
        try:
            _, resp = self._http.get2_cmd("{}/{}".format(self._apiBase, vnf_id))
        except NotFound as err:
            # Re-raise with a message that quotes what the caller asked for.
            raise NotFound("vnf '{}' not found".format(name)) from err
        if resp:
            return json.loads(resp)
        raise NotFound("vnf '{}' not found".format(name))