Fix improper restriction of XMl External Entity Reference, by using lxml
[osm/MON.git] / osm_mon / dashboarder / backends / grafana.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: glavado@whitestack.com or fbravo@whitestack.com
22 ##
23 import logging
24 import requests
25 import base64
26 import json
27 from osm_mon.core.config import Config
28
29 log = logging.getLogger(__name__)
30
31
32 class GrafanaBackend:
33 def __init__(self, config: Config):
34 self.conf = config
35 self.url = config.get("grafana", "url")
36 grafana_user = config.get("grafana", "user")
37 grafana_password = config.get("grafana", "password")
38 self.headers = {
39 "content-type": "application/json",
40 "authorization": "Basic %s"
41 % base64.b64encode(
42 (grafana_user + ":" + grafana_password).encode("utf-8")
43 ).decode(),
44 }
45
46 def get_all_dashboard_uids(self):
47 # Gets only dashboards that were automated by OSM (with tag 'osm_automated')
48 response = requests.request(
49 "GET", self.url + "/api/search?tag=osm_automated", headers=self.headers
50 )
51 dashboards = response.json()
52 dashboard_uids = []
53 for dashboard in dashboards:
54 dashboard_uids.append(dashboard["uid"])
55 log.debug("Searching for all dashboard uids: %s", dashboard_uids)
56 return dashboard_uids
57
58 def get_all_datasource_names(self, datasource_name_substr):
59 # Gets only dashboards that were created for prom-operator
60 response = requests.request(
61 "GET", self.url + "/api/datasources", headers=self.headers
62 )
63 datasources = response.json()
64 datasource_names = []
65 for datasource in datasources:
66 if datasource["name"].startswith(datasource_name_substr):
67 datasource_names.append(datasource["name"])
68 log.debug("Searching for all datasource names: %s", datasource_names)
69 return datasource_names
70
71 def get_dashboard_status(self, uid):
72 response = requests.request(
73 "GET", self.url + "/api/dashboards/uid/" + uid, headers=self.headers
74 )
75 log.debug("Searching for dashboard result: %s", response.text)
76 return response
77
78 def create_dashboard(
79 self, uid, name, json_file, project_name=None, datasource_name=None
80 ):
81 try:
82 with open(json_file) as f:
83 dashboard_data = f.read()
84
85 dashboard_data = dashboard_data.replace("OSM_ID", uid).replace(
86 "OSM_NAME", name
87 )
88 if datasource_name:
89 dashboard_data = dashboard_data.replace(
90 "OSM_DATASOURCE_NAME", datasource_name
91 )
92 dashboard_json_data = json.loads(dashboard_data)
93 # Get folder id
94 if project_name:
95 folder_name = project_name
96 else:
97 folder_name = name
98 response_folder_id = requests.request(
99 "GET",
100 self.url + "/api/folders/{}".format(folder_name),
101 headers=self.headers,
102 )
103 if response_folder_id.status_code == 200:
104 folder_id = json.loads(response_folder_id.text)["id"]
105 dashboard_json_data["folderId"] = folder_id
106 dashboard_json_data["overwrite"] = False
107
108 response = self.send_request_for_creating_dashboard(dashboard_json_data)
109
110 # Admin dashboard will be created if already exists. Rest will remain same.
111 if json.loads(response.text).get("status") == "name-exists":
112 # Delete any previous project-admin dashboard if it already exist.
113 if name == "admin":
114 self.delete_admin_dashboard()
115 response = self.send_request_for_creating_dashboard(
116 dashboard_json_data
117 )
118 else:
119 return
120
121 # Get team id
122 if project_name is not None:
123 name = project_name
124 response_team = requests.request(
125 "GET",
126 self.url + "/api/teams/search?name={}".format(name),
127 headers=self.headers,
128 )
129
130 # Remove default permissions of admin user's dashboard so that it is not visible to non-admin users
131 if len(json.loads(response_team.text)["teams"]) == 0:
132 # As team information is not available so it is admin user
133 dahboard_id = json.loads(response.text)["id"]
134 requests.request(
135 "POST",
136 self.url + "/api/dashboards/id/{}/permissions".format(dahboard_id),
137 headers=self.headers,
138 )
139
140 log.info("Dashboard %s is created in Grafana", name)
141 return response
142 except Exception:
143 log.exception("Exception processing message: ")
144
145 def create_datasource(self, datasource_name, datasource_type, datasource_url):
146 try:
147 datasource_data = {
148 "name": datasource_name,
149 "type": datasource_type,
150 "url": datasource_url,
151 "access": "proxy",
152 "readOnly": False,
153 "basicAuth": False,
154 }
155 response = requests.request(
156 "POST",
157 self.url + "/api/datasources",
158 data=json.dumps(datasource_data),
159 headers=self.headers,
160 )
161 log.info("Datasource %s is created in Grafana", datasource_name)
162 log.info("************* response: {}".format(response.__dict__))
163 return response
164 except Exception:
165 log.exception("Exception processing request for creating datasource: ")
166
167 def send_request_for_creating_dashboard(self, dashboard_data):
168 response = requests.request(
169 "POST",
170 self.url + "/api/dashboards/db/",
171 data=json.dumps(dashboard_data),
172 headers=self.headers,
173 )
174 return response
175
176 def delete_dashboard(self, uid):
177 response = requests.request(
178 "DELETE", self.url + "/api/dashboards/uid/" + uid, headers=self.headers
179 )
180 log.debug("Dashboard %s deleted from Grafana", uid)
181 return response
182
183 def delete_datasource(self, datasource_name):
184 response = requests.request(
185 "DELETE",
186 self.url + "/api/datasources/name/" + datasource_name,
187 headers=self.headers,
188 )
189 log.debug("Datasource %s deleted from Grafana", datasource_name)
190 return response
191
192 def delete_admin_dashboard(self):
193 requests.request(
194 "DELETE",
195 self.url + "/api/dashboards/db/osm-project-status-admin",
196 headers=self.headers,
197 )
198 log.debug("Dashboard osm-project-status-admin deleted from Grafana")
199
200 def create_grafana_users(self, user):
201 email = "{}@osm.etsi.org".format(user)
202 user_payload = {
203 "name": user,
204 "email": email,
205 "login": user,
206 "password": user,
207 }
208 response_users = requests.request(
209 "POST",
210 self.url + "/api/admin/users/",
211 json=user_payload,
212 headers=self.headers,
213 )
214 json_data = json.loads(response_users.text)
215 url = "/api/org/users/{}/".format(json_data["id"])
216 permission_payload = {
217 "role": "Editor",
218 }
219 requests.request(
220 "PATCH", self.url + url, json=permission_payload, headers=self.headers
221 )
222 log.info("New user %s created in Grafana", user)
223 return response_users
224
225 # Get Grafana users
226 def get_grafana_users(self):
227 response_users = requests.request(
228 "GET",
229 self.url + "/api/users",
230 headers=self.headers,
231 )
232 user_list = []
233 users = json.loads(response_users.text)
234 for user in users:
235 if user["name"] and user["name"] != "admin":
236 user_list.append(user["name"])
237 return user_list
238
239 # Create Grafana team with member
240 def create_grafana_teams_members(
241 self, project_name, user_name, is_admin, proj_list
242 ):
243 # Check if user exist in Grafana
244 user_response = requests.request(
245 "GET",
246 self.url + "/api/users/lookup?loginOrEmail={}".format(user_name),
247 headers=self.headers,
248 )
249 user_obj = json.loads(user_response.text)
250 if user_response.status_code != 200:
251 user_response = self.create_grafana_users(user_name)
252 user_obj = json.loads(user_response.text)
253
254 user_id = user_obj["id"]
255
256 # Get teams for user
257 team_objs = requests.request(
258 "GET",
259 self.url + "/api/users/{}/teams".format(user_id),
260 headers=self.headers,
261 )
262 team_obj = json.loads(team_objs.text)
263 team_list = []
264 if len(team_obj):
265 for team in team_obj:
266 team_list.append(team["name"])
267
268 proj_unlink = set(team_list) - set(proj_list)
269 for prj in proj_unlink:
270 response_team = requests.request(
271 "GET",
272 self.url + "/api/teams/search?name={}".format(prj),
273 headers=self.headers,
274 )
275 team_id = json.loads(response_team.text)["teams"][0]["id"]
276 requests.request(
277 "DELETE",
278 self.url + "/api/teams/{}/members/{}".format(team_id, user_id),
279 headers=self.headers,
280 )
281 if project_name != "admin":
282 # Add member to team
283 response_team = requests.request(
284 "GET",
285 self.url + "/api/teams/search?name={}".format(project_name),
286 headers=self.headers,
287 )
288
289 # Search if team in Grafana corresponding to the project already exists
290 if not json.loads(response_team.text)["teams"]:
291 self.create_grafana_teams(project_name)
292 response_team = requests.request(
293 "GET",
294 self.url + "/api/teams/search?name={}".format(project_name),
295 headers=self.headers,
296 )
297 team_id = json.loads(response_team.text)["teams"][0]["id"]
298 if project_name not in team_list:
299 # Create a team in Grafana corresponding to the project as it doesn't exist
300 member_payload = {"userId": user_id}
301 requests.request(
302 "POST",
303 self.url + "/api/teams/{}/members".format(team_id),
304 json=member_payload,
305 headers=self.headers,
306 )
307 # Check if user role or project name is admin
308 if is_admin or project_name == "admin":
309 # Give admin righsts to user
310 url = "/api/org/users/{}/".format(user_id)
311 permission_payload = {
312 "role": "Admin",
313 }
314 requests.request(
315 "PATCH", self.url + url, json=permission_payload, headers=self.headers
316 )
317 log.info("User %s is assigned Admin permission", user_name)
318 else:
319 # Give editor rights to user
320 url = "/api/org/users/{}/".format(user_id)
321 permission_payload = {
322 "role": "Editor",
323 }
324 requests.request(
325 "PATCH", self.url + url, json=permission_payload, headers=self.headers
326 )
327 log.info("User %s is assigned Editor permission", user_name)
328
329 # Create team in Grafana
330 def create_grafana_teams(self, team_name):
331 team_payload = {
332 "name": team_name,
333 }
334 requests.request(
335 "POST", self.url + "/api/teams", json=team_payload, headers=self.headers
336 )
337 log.info("New team %s created in Grafana", team_name)
338
339 # Create folder in Grafana
340 def create_grafana_folders(self, folder_name):
341 folder_payload = {"uid": folder_name, "title": folder_name}
342 requests.request(
343 "POST", self.url + "/api/folders", json=folder_payload, headers=self.headers
344 )
345 log.info("Dashboard folder %s created", folder_name)
346
347 response_team = requests.request(
348 "GET",
349 self.url + "/api/teams/search?name={}".format(folder_name),
350 headers=self.headers,
351 )
352 # Create team if it doesn't already exists
353 if len(json.loads(response_team.text)["teams"]) == 0:
354 self.create_grafana_teams(folder_name)
355 response_team = requests.request(
356 "GET",
357 self.url + "/api/teams/search?name={}".format(folder_name),
358 headers=self.headers,
359 )
360 # Assign required permission to the team's folder
361 team_id = json.loads(response_team.text)["teams"][0]["id"]
362 permission_data = {
363 "items": [
364 {"teamId": team_id, "permission": 2},
365 ]
366 }
367 requests.request(
368 "POST",
369 self.url + "/api/folders/{}/permissions".format(folder_name),
370 json=permission_data,
371 headers=self.headers,
372 )
373
374 # delete user from grafana
375 def delete_grafana_users(self, user_name):
376 # Get user id
377 response_id = requests.request(
378 "GET",
379 self.url + "/api/users/lookup?loginOrEmail={}".format(user_name),
380 headers=self.headers,
381 )
382 try:
383 user_id = json.loads(response_id.text)["id"]
384 except Exception:
385 log.exception("Exception processing message: ")
386 # Delete user
387 response = requests.request(
388 "DELETE",
389 self.url + "/api/admin/users/{}".format(user_id),
390 headers=self.headers,
391 )
392 log.info("User %s deleted in Grafana", user_name)
393 return response
394
395 # delete team from grafana
396 def delete_grafana_team(self, project_name):
397 # Delete Grafana folder
398 requests.request(
399 "DELETE",
400 self.url + "/api/folders/{}".format(project_name),
401 headers=self.headers,
402 )
403 # Delete Grafana team
404 team_obj = requests.request(
405 "GET",
406 self.url + "/api/teams/search?name={}".format(project_name),
407 headers=self.headers,
408 )
409 team_id = json.loads(team_obj.text)["teams"][0]["id"]
410 response = requests.request(
411 "DELETE", self.url + "/api/teams/{}".format(team_id), headers=self.headers
412 )
413 log.info("Team %s deleted in Grafana", project_name)
414 return response
415
416 # update grafana team
417 def update_grafana_teams(self, project_new_name, project_old_name):
418 team_obj = requests.request(
419 "GET",
420 self.url + "/api/teams/search?name={}".format(project_old_name),
421 headers=self.headers,
422 )
423 team_id = json.loads(team_obj.text)["teams"][0]["id"]
424 data = {
425 "name": project_new_name,
426 }
427 response = requests.request(
428 "PUT",
429 self.url + "/api/teams/{}".format(team_id),
430 json=data,
431 headers=self.headers,
432 )
433 log.info("Grafana team updated %s", response.text)
434 return response
435
436 # remove member from grafana team
437 def remove_grafana_team_member(self, user_name, project_data):
438 # Get user id
439 response_id = requests.request(
440 "GET",
441 self.url + "/api/users/lookup?loginOrEmail={}".format(user_name),
442 headers=self.headers,
443 )
444 user_id = json.loads(response_id.text)["id"]
445 for project in project_data:
446 # Get team id
447 team_obj = requests.request(
448 "GET",
449 self.url + "/api/teams/search?name={}".format(project["project"]),
450 headers=self.headers,
451 )
452 team_id = json.loads(team_obj.text)["teams"][0]["id"]
453 response = requests.request(
454 "DELETE",
455 self.url + "/api/teams/{}/members/{}".format(team_id, user_id),
456 headers=self.headers,
457 )
458 return response