Coverage for osmclient/common/http.py: 18%

71 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2024-06-30 09:54 +0000

1# Copyright 2017 Sandvine 

2# 

3# All Rights Reserved. 

4# 

5# Licensed under the Apache License, Version 2.0 (the "License"); you may 

6# not use this file except in compliance with the License. You may obtain 

7# a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

14# License for the specific language governing permissions and limitations 

15# under the License. 

16 

17import requests 

18import urllib3 

19from requests.auth import HTTPBasicAuth 

20import json 

21import logging 

22 

23urllib3.disable_warnings() 

24 

25 

class Http(object):
    """Small JSON-over-HTTP helper that uses HTTP basic authentication.

    Every request is sent to ``url + endpoint`` with the configured
    credentials and, when set, the headers given to ``set_http_header``.
    Response bodies are decoded as text and parsed as JSON.

    NOTE(review): all requests are sent with ``verify=False``, i.e. TLS
    certificate verification is disabled. This is kept as-is for
    compatibility with existing callers; confirm that it is intentional.
    """

    def __init__(self, url, user="admin", password="admin"):
        """Store the base URL and the basic-auth credentials.

        Args:
            url: base URL that each endpoint is appended to.
            user: user name for HTTP basic authentication.
            password: password for HTTP basic authentication.
        """
        self._url = url
        self._user = user
        self._password = password
        self._http_header = None
        self._logger = logging.getLogger("osmclient")

    def set_http_header(self, header):
        """Set the headers attached to every subsequent request.

        Args:
            header: value assigned to the request's ``headers`` attribute;
                a falsy value means no custom headers are attached.
        """
        self._http_header = header

    def _get_requests_cmd(self, endpoint):
        """Build an unprepared request for ``endpoint`` with auth and headers."""
        requests_cmd = requests.Request()
        requests_cmd.url = self._url + endpoint
        requests_cmd.auth = HTTPBasicAuth(self._user, self._password)
        if self._http_header:
            requests_cmd.headers = self._http_header
        return requests_cmd

    def _send_cmd(self, method, endpoint, requests_cmd):
        """Send ``requests_cmd`` using ``method`` and return the parsed body.

        Shared implementation behind ``get_cmd``, ``delete_cmd`` and
        ``post_cmd``.

        Args:
            method: HTTP method name, e.g. ``"GET"``.
            endpoint: path appended to the base URL (used for logging here).
            requests_cmd: unprepared request from ``_get_requests_cmd``.

        Returns:
            The response body parsed as JSON, or ``None`` when the body is
            empty.

        Raises:
            ValueError: if a non-empty response body is not valid JSON.
        """
        requests_cmd.method = method
        self._logger.info(
            "Request METHOD: {} URL: {}".format(method, self._url + endpoint)
        )
        # The context manager closes the session even when send() raises,
        # so a failed request no longer leaks the underlying connections.
        with requests.Session() as session_cmd:
            resp = session_cmd.send(requests_cmd.prepare(), verify=False)
            self._logger.info("Response HTTPCODE: {}".format(resp.status_code))
            data = resp.content

        if not data:
            return None

        # Decode and parse once; reuse the result for logging and returning.
        result = json.loads(data.decode())
        self._logger.debug("Response DATA: {}".format(result))
        return result

    def get_cmd(self, endpoint):
        """Issue a GET to ``endpoint``.

        Returns:
            The JSON-decoded response body, or ``None`` if the body is empty.
        """
        return self._send_cmd("GET", endpoint, self._get_requests_cmd(endpoint))

    def delete_cmd(self, endpoint):
        """Issue a DELETE to ``endpoint``.

        Returns:
            The JSON-decoded response body, or ``None`` if the body is empty.
        """
        return self._send_cmd("DELETE", endpoint, self._get_requests_cmd(endpoint))

    def post_cmd(
        self,
        endpoint="",
        postfields_dict=None,
        formfile=None,
    ):
        """Issue a POST to ``endpoint`` with an optional JSON body or file.

        Args:
            endpoint: path appended to the base URL.
            postfields_dict: object to send as the JSON request body;
                skipped when ``None``.
            formfile: indexable pair; ``formfile[0]`` is used as the form
                field name and ``formfile[1]`` as the file value. Skipped
                when ``None``.

        Returns:
            The JSON-decoded response body, or ``None`` if the body is empty.
        """
        requests_cmd = self._get_requests_cmd(endpoint)

        if postfields_dict is not None:
            # Pass the object itself: requests serializes the ``json``
            # attribute during prepare(). Pre-encoding it with json.dumps()
            # would encode the payload twice and send a JSON string literal.
            requests_cmd.json = postfields_dict

        if formfile is not None:
            requests_cmd.files = {formfile[0]: formfile[1]}

        return self._send_cmd("POST", endpoint, requests_cmd)