Coverage for osm_nbi/utils.py: 91%

44 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2024-06-27 02:46 +0000

1# -*- coding: utf-8 -*- 

2 

3# Copyright 2020 Whitestack, LLC 

4# ************************************************************* 

5# 

6# This file is part of OSM NBI module 

7# All Rights Reserved to Whitestack, LLC 

8# 

9# Licensed under the Apache License, Version 2.0 (the "License"); you may 

10# not use this file except in compliance with the License. You may obtain 

11# a copy of the License at 

12# 

13# http://www.apache.org/licenses/LICENSE-2.0 

14# 

15# Unless required by applicable law or agreed to in writing, software 

16# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

17# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

18# License for the specific language governing permissions and limitations 

19# under the License. 

20# 

21# For those usages not covered by the Apache License, Version 2.0 please 

22# contact: fbravo@whitestack.com or agarcia@whitestack.com 

23## 

24from cefevent import CEFEvent 

25from osm_nbi import version 

26 

27 

def find_in_list(the_list, condition_lambda):
    """Return the first item of ``the_list`` accepted by ``condition_lambda``.

    Args:
        the_list: Iterable to scan, in iteration order.
        condition_lambda: Callable invoked with each item; a truthy result
            selects that item.

    Returns:
        The first item for which ``condition_lambda(item)`` is truthy, or
        ``None`` when no item matches (including when the iterable is empty).
    """
    for item in the_list:
        if condition_lambda(item):
            return item
    # The loop has no ``break``, so a plain fall-through return is equivalent
    # to (and clearer than) a ``for ... else`` clause.
    return None

34 

35 

def filter_in_list(the_list, condition_lambda):
    """Return a new list with every item accepted by ``condition_lambda``.

    Args:
        the_list: Iterable to scan, in iteration order.
        condition_lambda: Callable invoked with each item; a truthy result
            keeps that item.

    Returns:
        A new list holding the matching items in their original order. The
        list is empty when nothing matches.
    """
    return [item for item in the_list if condition_lambda(item)]

42 

43 

def find_index_in_list(the_list, condition_lambda):
    """Return the position of the first item accepted by ``condition_lambda``.

    Args:
        the_list: Iterable to scan, in iteration order.
        condition_lambda: Callable invoked with each item; a truthy result
            selects that item.

    Returns:
        The zero-based index of the first matching item, or ``-1`` when no
        item matches (including when the iterable is empty).
    """
    for index, item in enumerate(the_list):
        if condition_lambda(item):
            return index
    # The loop has no ``break``, so a plain fall-through return is equivalent
    # to (and clearer than) a ``for ... else`` clause.
    return -1

50 

51 

def deep_update_dict(data, updated_data):
    """Recursively overwrite values in ``data`` with those in ``updated_data``.

    Only keys that already exist in ``data`` are updated; keys present only in
    ``updated_data`` are ignored.

    Args:
        data: The structure to update. Dicts are modified in place; lists are
            rebuilt into a new list (their dict items are still modified in
            place).
        updated_data: Structure holding the replacement values. It is indexed
            in parallel with ``data``: by key for dicts, by position for
            lists.

    Returns:
        ``data`` itself when it is a dict, a new list when it is a list, and
        ``data`` unchanged for any other type. As a consequence, scalar items
        inside a list are never replaced.

    Raises:
        IndexError: If ``data`` is a list and ``updated_data`` is a shorter
            sequence.
    """
    if isinstance(data, list):
        # Items are matched purely by position against updated_data.
        return [
            deep_update_dict(item, updated_data[index])
            for index, item in enumerate(data)
        ]

    if isinstance(data, dict):
        # Only values are reassigned, so iterating the dict directly is safe.
        for key in data:
            if key not in updated_data:
                continue
            if isinstance(data[key], (dict, list)):
                # Containers are merged recursively rather than replaced.
                data[key] = deep_update_dict(data[key], updated_data[key])
            else:
                data[key] = updated_data[key]

    return data

69 

70 

def cef_event(cef_logger, cef_fields):
    """Copy every entry of ``cef_fields`` onto ``cef_logger``.

    Args:
        cef_logger: Object exposing ``set_field(key, value)``; within this
            module it is a ``CEFEvent`` instance.
        cef_fields: Mapping of field name to value. ``set_field`` is called
            once per entry, in the mapping's iteration order.
    """
    for key, value in cef_fields.items():
        cef_logger.set_field(key, value)

74 

75 

def cef_event_builder(config):
    """Create a ``CEFEvent`` pre-populated with this module's default fields.

    Args:
        config: Subscriptable configuration providing the ``"version"``,
            ``"deviceVendor"`` and ``"deviceProduct"`` entries.

    Returns:
        The new ``CEFEvent`` after its fields were set and ``build_cef()``
        was called on it.

    Raises:
        KeyError: Presumably, if ``config`` is a dict missing one of the
            required entries (depends on the type of ``config``).
    """
    cef_logger = CEFEvent()
    cef_fields = {
        "version": config["version"],
        "deviceVendor": config["deviceVendor"],
        "deviceProduct": config["deviceProduct"],
        # Reported device version is this package's own version string.
        "deviceVersion": get_version(),
        "message": "CEF Logger",
        "sourceUserName": "admin",
        "severity": 1,
    }
    cef_event(cef_logger, cef_fields)
    cef_logger.build_cef()
    return cef_logger

90 

91 

def get_version():
    """Return the ``osm_nbi`` version string up to the first ``"+"``.

    Returns:
        The text of ``version`` preceding its first ``"+"`` character, or the
        whole string when it contains no ``"+"``.
    """
    return version.split("+")[0]