Code Coverage

Cobertura Coverage Report > osmclient.common >

http.py

Trend

Classes: 100%
 
Lines: 45%
   
Conditionals: 100%
 

File Coverage summary

Name | Classes | Lines | Conditionals
http.py
100%
1/1
45%
31/69
100%
0/0

Coverage Breakdown by Class

Name | Lines | Conditionals
http.py
45%
31/69
N/A

Source

osmclient/common/http.py
1 # Copyright 2017 Sandvine
2 #
3 # All Rights Reserved.
4 #
5 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
6 #    not use this file except in compliance with the License. You may obtain
7 #    a copy of the License at
8 #
9 #         http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #    Unless required by applicable law or agreed to in writing, software
12 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 #    License for the specific language governing permissions and limitations
15 #    under the License.
16
17 1 from io import BytesIO
18 1 import pycurl
19 1 import json
20 1 import logging
21
22
class Http(object):
    """Small pycurl-based HTTP client that returns JSON-decoded response bodies.

    Each request method builds a fresh curl handle for ``url + endpoint``,
    performs the request, logs the method/URL and the HTTP status code on the
    "osmclient" logger, and returns the response body parsed as JSON (or
    ``None`` when the body is empty).
    """

    def __init__(self, url, user="admin", password="admin"):
        """Store the base URL and the credentials used for every request.

        Args:
            url: Base URL; the endpoint of each request is appended to it.
            user: User name handed to curl via ``USERPWD``.
            password: Password handed to curl via ``USERPWD``.
        """
        self._url = url
        self._user = user
        self._password = password
        self._http_header = None
        self._logger = logging.getLogger("osmclient")

    def set_http_header(self, header):
        """Set the value passed to curl's ``HTTPHEADER`` option on later requests.

        Args:
            header: Header value forwarded unchanged to ``pycurl.HTTPHEADER``;
                a falsy value means no custom headers are set.
        """
        self._http_header = header

    def _get_curl_cmd(self, endpoint):
        """Create a curl handle pre-configured for ``self._url + endpoint``.

        The caller owns the returned handle and is responsible for closing it.
        """
        curl_cmd = pycurl.Curl()
        curl_cmd.setopt(pycurl.URL, self._url + endpoint)
        # SECURITY NOTE(review): TLS peer and host verification are disabled
        # here. This is kept as-is because existing deployments may rely on it
        # (e.g. self-signed certificates), but it allows man-in-the-middle
        # attacks and should be made configurable.
        curl_cmd.setopt(pycurl.SSL_VERIFYPEER, 0)
        curl_cmd.setopt(pycurl.SSL_VERIFYHOST, 0)
        curl_cmd.setopt(pycurl.USERPWD, "{}:{}".format(self._user, self._password))
        if self._http_header:
            curl_cmd.setopt(pycurl.HTTPHEADER, self._http_header)
        return curl_cmd

    def _parse_response(self, raw):
        """Decode a raw response body into a Python object.

        Args:
            raw: Response body as bytes.

        Returns:
            The JSON-decoded body, or ``None`` when ``raw`` is empty.

        Raises:
            ValueError: If the body is not valid JSON (raised by ``json.loads``).
        """
        if not raw:
            return None
        # Parse once and reuse the result for both the log line and the return.
        parsed = json.loads(raw.decode())
        self._logger.debug("Response DATA: {}".format(parsed))
        return parsed

    def _perform(self, method, endpoint, curl_cmd):
        """Run an already-configured curl handle and return the parsed body.

        Does not close ``curl_cmd``; the public request methods do that in a
        ``finally`` clause so the handle is released even if the request fails.

        Args:
            method: HTTP method name, used only for logging.
            endpoint: Endpoint appended to the base URL, used only for logging.
            curl_cmd: Configured ``pycurl.Curl`` handle.

        Returns:
            The JSON-decoded response body, or ``None`` for an empty body.
        """
        data = BytesIO()
        curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
        self._logger.info(
            "Request METHOD: {} URL: {}".format(method, self._url + endpoint)
        )
        curl_cmd.perform()
        http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
        self._logger.info("Response HTTPCODE: {}".format(http_code))
        return self._parse_response(data.getvalue())

    def get_cmd(self, endpoint):
        """Issue a GET request to ``endpoint``.

        Returns:
            The JSON-decoded response body, or ``None`` for an empty body.
        """
        curl_cmd = self._get_curl_cmd(endpoint)
        try:
            curl_cmd.setopt(pycurl.HTTPGET, 1)
            return self._perform("GET", endpoint, curl_cmd)
        finally:
            # Always release the handle, including when the request raises.
            curl_cmd.close()

    def delete_cmd(self, endpoint):
        """Issue a DELETE request to ``endpoint``.

        Returns:
            The JSON-decoded response body, or ``None`` for an empty body.
        """
        curl_cmd = self._get_curl_cmd(endpoint)
        try:
            curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE")
            return self._perform("DELETE", endpoint, curl_cmd)
        finally:
            # Always release the handle, including when the request raises.
            curl_cmd.close()

    def post_cmd(
        self,
        endpoint="",
        postfields_dict=None,
        formfile=None,
    ):
        """Issue a POST request to ``endpoint``.

        Args:
            endpoint: Endpoint appended to the base URL.
            postfields_dict: Optional object serialized with ``json.dumps`` and
                sent as the POST fields.
            formfile: Optional pair ``(field_name, file_path)`` sent as a
                form file upload via curl's ``HTTPPOST`` option.

        Returns:
            The JSON-decoded response body, or ``None`` for an empty body.
        """
        curl_cmd = self._get_curl_cmd(endpoint)
        try:
            curl_cmd.setopt(pycurl.POST, 1)

            if postfields_dict is not None:
                jsondata = json.dumps(postfields_dict)
                curl_cmd.setopt(pycurl.POSTFIELDS, jsondata)

            if formfile is not None:
                curl_cmd.setopt(
                    pycurl.HTTPPOST,
                    [(formfile[0], (pycurl.FORM_FILE, formfile[1]))],
                )

            return self._perform("POST", endpoint, curl_cmd)
        finally:
            # Always release the handle, including when the request raises.
            curl_cmd.close()