Coverage for osmclient/common/utils.py: 32%
80 statements
« prev ^ index » next coverage.py v7.3.1, created at 2024-06-29 09:50 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2024-06-29 09:50 +0000
1# Copyright 2017 Sandvine
2#
3# All Rights Reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
17import time
18from uuid import UUID
19import hashlib
20import tarfile
21from zipfile import ZipFile
22import re
23import yaml
26def wait_for_value(func, result=True, wait_time=10, catch_exception=None):
27 maxtime = time.time() + wait_time
28 while time.time() < maxtime:
29 try:
30 if func() == result:
31 return True
32 except catch_exception:
33 pass
34 time.sleep(5)
35 try:
36 return func() == result
37 except catch_exception:
38 return False
41def validate_uuid4(uuid_text):
42 try:
43 UUID(uuid_text)
44 return True
45 except (ValueError, TypeError):
46 return False
49def md5(fname):
50 """
51 Checksum generator
52 :param fname: file path
53 :return: checksum string
54 """
55 hash_md5 = hashlib.md5()
56 with open(fname, "rb") as f:
57 for chunk in iter(lambda: f.read(4096), b""):
58 hash_md5.update(chunk)
59 return hash_md5.hexdigest()
62def get_key_val_from_pkg(descriptor_file):
63 if descriptor_file.split(".")[-1] == "zip":
64 return get_key_val_from_pkg_sol004(descriptor_file)
65 else:
66 return get_key_val_from_pkg_old(descriptor_file)
69def get_key_val_from_pkg_sol004(package_file):
70 """Method opens up a package and finds the name of the resulting
71 descriptor (vnfd or nsd name), using SOL004 spec
72 """
73 with ZipFile(package_file) as zipfile:
74 yamlfile = None
75 for filedata in zipfile.infolist():
76 if (
77 re.match(".*.yaml", filedata.filename)
78 and filedata.filename.find("Scripts") < 0
79 ):
80 yamlfile = filedata.filename
81 break
82 if yamlfile is None:
83 return None
85 return get_key_val_from_descriptor(zipfile.open(yamlfile))
88def get_key_val_from_pkg_old(descriptor_file):
89 """Method opens up a package and finds the name of the resulting
90 descriptor (vnfd or nsd name)
91 """
92 tar = tarfile.open(descriptor_file)
93 yamlfile = None
94 for member in tar.getmembers():
95 if re.match(".*.yaml", member.name) and len(member.name.split("/")) == 2:
96 yamlfile = member.name
97 break
98 if yamlfile is None:
99 return None
101 result = get_key_val_from_descriptor(tar.extractfile(yamlfile))
103 tar.close()
104 return result
107def get_key_val_from_descriptor(descriptor):
108 dict = yaml.safe_load(descriptor)
109 result = {}
110 for k in dict:
111 if "nsd" in k:
112 result["type"] = "nsd"
113 else:
114 result["type"] = "vnfd"
116 if "type" not in result:
117 for k1, v1 in list(dict.items()):
118 if not k1.endswith("-catalog"):
119 continue
120 for k2, v2 in v1.items():
121 if not k2.endswith("nsd") and not k2.endswith("vnfd"):
122 continue
123 if "nsd" in k2:
124 result["type"] = "nsd"
125 else:
126 result["type"] = "vnfd"
127 for entry in v2:
128 for k3, v3 in list(entry.items()):
129 # strip off preceeding chars before :
130 key_name = k3.split(":").pop()
131 result[key_name] = v3
132 return result