Code Coverage

Cobertura Coverage Report > osmclient.common >

utils.py

Trend

File Coverage summary

NameClassesLinesConditionals
utils.py
100%
1/1
33%
26/80
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
utils.py
33%
26/80
N/A

Source

osmclient/common/utils.py
1 # Copyright 2017 Sandvine
2 #
3 # All Rights Reserved.
4 #
5 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
6 #    not use this file except in compliance with the License. You may obtain
7 #    a copy of the License at
8 #
9 #         http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #    Unless required by applicable law or agreed to in writing, software
12 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 #    License for the specific language governing permissions and limitations
15 #    under the License.
16
17 1 import time
18 1 from uuid import UUID
19 1 import hashlib
20 1 import tarfile
21 1 from zipfile import ZipFile
22 1 import re
23 1 import yaml
24
25
26 1 def wait_for_value(func, result=True, wait_time=10, catch_exception=None):
27 1     maxtime = time.time() + wait_time
28 1     while time.time() < maxtime:
29 1         try:
30 1             if func() == result:
31 1                 return True
32 1         except catch_exception:
33 1             pass
34 1         time.sleep(5)
35 1     try:
36 1         return func() == result
37 1     except catch_exception:
38 1         return False
39
40
41 1 def validate_uuid4(uuid_text):
42 0     try:
43 0         UUID(uuid_text)
44 0         return True
45 0     except (ValueError, TypeError):
46 0         return False
47
48
49 1 def md5(fname):
50     """
51     Checksum generator
52     :param fname: file path
53     :return: checksum string
54     """
55 0     hash_md5 = hashlib.md5()
56 0     with open(fname, "rb") as f:
57 0         for chunk in iter(lambda: f.read(4096), b""):
58 0             hash_md5.update(chunk)
59 0     return hash_md5.hexdigest()
60
61
62 1 def get_key_val_from_pkg(descriptor_file):
63 0     if descriptor_file.split(".")[-1] == "zip":
64 0         return get_key_val_from_pkg_sol004(descriptor_file)
65     else:
66 0         return get_key_val_from_pkg_old(descriptor_file)
67
68
69 1 def get_key_val_from_pkg_sol004(package_file):
70     """Method opens up a package and finds the name of the resulting
71     descriptor (vnfd or nsd name), using SOL004 spec
72     """
73 0     with ZipFile(package_file) as zipfile:
74 0         yamlfile = None
75 0         for filedata in zipfile.infolist():
76 0             if (
77                 re.match(".*.yaml", filedata.filename)
78                 and filedata.filename.find("Scripts") < 0
79             ):
80 0                 yamlfile = filedata.filename
81 0                 break
82 0         if yamlfile is None:
83 0             return None
84
85 0         return get_key_val_from_descriptor(zipfile.open(yamlfile))
86
87
88 1 def get_key_val_from_pkg_old(descriptor_file):
89     """Method opens up a package and finds the name of the resulting
90     descriptor (vnfd or nsd name)
91     """
92 0     tar = tarfile.open(descriptor_file)
93 0     yamlfile = None
94 0     for member in tar.getmembers():
95 0         if re.match(".*.yaml", member.name) and len(member.name.split("/")) == 2:
96 0             yamlfile = member.name
97 0             break
98 0     if yamlfile is None:
99 0         return None
100
101 0     result = get_key_val_from_descriptor(tar.extractfile(yamlfile))
102
103 0     tar.close()
104 0     return result
105
106
107 1 def get_key_val_from_descriptor(descriptor):
108 0     dict = yaml.safe_load(descriptor)
109 0     result = {}
110 0     for k in dict:
111 0         if "nsd" in k:
112 0             result["type"] = "nsd"
113         else:
114 0             result["type"] = "vnfd"
115
116 0     if "type" not in result:
117 0         for k1, v1 in list(dict.items()):
118 0             if not k1.endswith("-catalog"):
119 0                 continue
120 0             for k2, v2 in v1.items():
121 0                 if not k2.endswith("nsd") and not k2.endswith("vnfd"):
122 0                     continue
123 0                 if "nsd" in k2:
124 0                     result["type"] = "nsd"
125                 else:
126 0                     result["type"] = "vnfd"
127 0                 for entry in v2:
128 0                     for k3, v3 in list(entry.items()):
129                         # strip off preceeding chars before :
130 0                         key_name = k3.split(":").pop()
131 0                         result[key_name] = v3
132 0     return result