Coverage for osmclient/common/utils.py: 32%

80 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2024-06-22 09:01 +0000

1# Copyright 2017 Sandvine 

2# 

3# All Rights Reserved. 

4# 

5# Licensed under the Apache License, Version 2.0 (the "License"); you may 

6# not use this file except in compliance with the License. You may obtain 

7# a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

14# License for the specific language governing permissions and limitations 

15# under the License. 

16 

17import time 

18from uuid import UUID 

19import hashlib 

20import tarfile 

21from zipfile import ZipFile 

22import re 

23import yaml 

24 

25 

26def wait_for_value(func, result=True, wait_time=10, catch_exception=None): 

27 maxtime = time.time() + wait_time 

28 while time.time() < maxtime: 

29 try: 

30 if func() == result: 

31 return True 

32 except catch_exception: 

33 pass 

34 time.sleep(5) 

35 try: 

36 return func() == result 

37 except catch_exception: 

38 return False 

39 

40 

41def validate_uuid4(uuid_text): 

42 try: 

43 UUID(uuid_text) 

44 return True 

45 except (ValueError, TypeError): 

46 return False 

47 

48 

49def md5(fname): 

50 """ 

51 Checksum generator 

52 :param fname: file path 

53 :return: checksum string 

54 """ 

55 hash_md5 = hashlib.md5() 

56 with open(fname, "rb") as f: 

57 for chunk in iter(lambda: f.read(4096), b""): 

58 hash_md5.update(chunk) 

59 return hash_md5.hexdigest() 

60 

61 

62def get_key_val_from_pkg(descriptor_file): 

63 if descriptor_file.split(".")[-1] == "zip": 

64 return get_key_val_from_pkg_sol004(descriptor_file) 

65 else: 

66 return get_key_val_from_pkg_old(descriptor_file) 

67 

68 

69def get_key_val_from_pkg_sol004(package_file): 

70 """Method opens up a package and finds the name of the resulting 

71 descriptor (vnfd or nsd name), using SOL004 spec 

72 """ 

73 with ZipFile(package_file) as zipfile: 

74 yamlfile = None 

75 for filedata in zipfile.infolist(): 

76 if ( 

77 re.match(".*.yaml", filedata.filename) 

78 and filedata.filename.find("Scripts") < 0 

79 ): 

80 yamlfile = filedata.filename 

81 break 

82 if yamlfile is None: 

83 return None 

84 

85 return get_key_val_from_descriptor(zipfile.open(yamlfile)) 

86 

87 

88def get_key_val_from_pkg_old(descriptor_file): 

89 """Method opens up a package and finds the name of the resulting 

90 descriptor (vnfd or nsd name) 

91 """ 

92 tar = tarfile.open(descriptor_file) 

93 yamlfile = None 

94 for member in tar.getmembers(): 

95 if re.match(".*.yaml", member.name) and len(member.name.split("/")) == 2: 

96 yamlfile = member.name 

97 break 

98 if yamlfile is None: 

99 return None 

100 

101 result = get_key_val_from_descriptor(tar.extractfile(yamlfile)) 

102 

103 tar.close() 

104 return result 

105 

106 

107def get_key_val_from_descriptor(descriptor): 

108 dict = yaml.safe_load(descriptor) 

109 result = {} 

110 for k in dict: 

111 if "nsd" in k: 

112 result["type"] = "nsd" 

113 else: 

114 result["type"] = "vnfd" 

115 

116 if "type" not in result: 

117 for k1, v1 in list(dict.items()): 

118 if not k1.endswith("-catalog"): 

119 continue 

120 for k2, v2 in v1.items(): 

121 if not k2.endswith("nsd") and not k2.endswith("vnfd"): 

122 continue 

123 if "nsd" in k2: 

124 result["type"] = "nsd" 

125 else: 

126 result["type"] = "vnfd" 

127 for entry in v2: 

128 for k3, v3 in list(entry.items()): 

129 # strip off preceeding chars before : 

130 key_name = k3.split(":").pop() 

131 result[key_name] = v3 

132 return result