| # -*- coding: utf-8 -*- |
| |
| ## |
| # Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U. |
| # This file is part of openmano |
| # All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| # |
| # For those usages not covered by the Apache License, Version 2.0 please |
| # contact with: nfvlabs@tid.es |
| ## |
| |
| ''' |
| utils is a module that implements functions that are used by all openmano modules, |
| dealing with aspects such as reading/writing files, formatting inputs/outputs for quick translation |
| from dictionaries to appropriate database dictionaries, etc. |
| ''' |
| __author__="Alfonso Tierno, Gerardo Garcia" |
| __date__ ="$08-sep-2014 12:21:22$" |
| |
| import datetime |
| import time |
| import warnings |
| from functools import reduce, partial, wraps |
| from itertools import tee |
| |
| import six |
| from six.moves import filter, filterfalse |
| |
| from jsonschema import exceptions as js_e |
| from jsonschema import validate as js_v |
| |
| if six.PY3: |
| from inspect import getfullargspec as getspec |
| else: |
| from inspect import getargspec as getspec |
| |
| #from bs4 import BeautifulSoup |
| |
| def read_file(file_to_read): |
| """Reads a file specified by 'file_to_read' and returns (True,<its content as a string>) in case of success or (False, <error message>) in case of failure""" |
| try: |
| f = open(file_to_read, 'r') |
| read_data = f.read() |
| f.close() |
| except Exception as e: |
| return (False, str(e)) |
| |
| return (True, read_data) |
| |
| def write_file(file_to_write, text): |
| """Write a file specified by 'file_to_write' and returns (True,NOne) in case of success or (False, <error message>) in case of failure""" |
| try: |
| f = open(file_to_write, 'w') |
| f.write(text) |
| f.close() |
| except Exception as e: |
| return (False, str(e)) |
| |
| return (True, None) |
| |
| def format_in(http_response, schema): |
| try: |
| client_data = http_response.json() |
| js_v(client_data, schema) |
| #print "Input data: ", str(client_data) |
| return True, client_data |
| except js_e.ValidationError as exc: |
| print "validate_in error, jsonschema exception ", exc.message, "at", exc.path |
| return False, ("validate_in error, jsonschema exception ", exc.message, "at", exc.path) |
| |
| def remove_extra_items(data, schema): |
| deleted = [] |
| if type(data) is tuple or type(data) is list: |
| for d in data: |
| a = remove_extra_items(d, schema['items']) |
| if a is not None: |
| deleted.append(a) |
| elif type(data) is dict: |
| # TODO deal with patternProperties |
| if 'properties' not in schema: |
| return None |
| for k in data.keys(): |
| if k in schema['properties'].keys(): |
| a = remove_extra_items(data[k], schema['properties'][k]) |
| if a is not None: |
| deleted.append({k: a}) |
| elif not schema.get('additionalProperties'): |
| del data[k] |
| deleted.append(k) |
| if len(deleted) == 0: |
| return None |
| elif len(deleted) == 1: |
| return deleted[0] |
| |
| return deleted |
| |
| #def format_html2text(http_content): |
| # soup=BeautifulSoup(http_content) |
| # text = soup.p.get_text() + " " + soup.pre.get_text() |
| # return text |
| |
| |
| def delete_nulls(var): |
| if type(var) is dict: |
| for k in var.keys(): |
| if var[k] is None: del var[k] |
| elif type(var[k]) is dict or type(var[k]) is list or type(var[k]) is tuple: |
| if delete_nulls(var[k]): del var[k] |
| if len(var) == 0: return True |
| elif type(var) is list or type(var) is tuple: |
| for k in var: |
| if type(k) is dict: delete_nulls(k) |
| if len(var) == 0: return True |
| return False |
| |
| |
| def convert_bandwidth(data, reverse=False): |
| '''Check the field bandwidth recursivelly and when found, it removes units and convert to number |
| It assumes that bandwidth is well formed |
| Attributes: |
| 'data': dictionary bottle.FormsDict variable to be checked. None or empty is consideted valid |
| 'reverse': by default convert form str to int (Mbps), if True it convert from number to units |
| Return: |
| None |
| ''' |
| if type(data) is dict: |
| for k in data.keys(): |
| if type(data[k]) is dict or type(data[k]) is tuple or type(data[k]) is list: |
| convert_bandwidth(data[k], reverse) |
| if "bandwidth" in data: |
| try: |
| value=str(data["bandwidth"]) |
| if not reverse: |
| pos = value.find("bps") |
| if pos>0: |
| if value[pos-1]=="G": data["bandwidth"] = int(data["bandwidth"][:pos-1]) * 1000 |
| elif value[pos-1]=="k": data["bandwidth"]= int(data["bandwidth"][:pos-1]) / 1000 |
| else: data["bandwidth"]= int(data["bandwidth"][:pos-1]) |
| else: |
| value = int(data["bandwidth"]) |
| if value % 1000 == 0: data["bandwidth"]=str(value/1000) + " Gbps" |
| else: data["bandwidth"]=str(value) + " Mbps" |
| except: |
| print "convert_bandwidth exception for type", type(data["bandwidth"]), " data", data["bandwidth"] |
| return |
| if type(data) is tuple or type(data) is list: |
| for k in data: |
| if type(k) is dict or type(k) is tuple or type(k) is list: |
| convert_bandwidth(k, reverse) |
| |
| def convert_float_timestamp2str(var): |
| '''Converts timestamps (created_at, modified_at fields) represented as float |
| to a string with the format '%Y-%m-%dT%H:%i:%s' |
| It enters recursively in the dict var finding this kind of variables |
| ''' |
| if type(var) is dict: |
| for k,v in var.items(): |
| if type(v) is float and k in ("created_at", "modified_at"): |
| var[k] = time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(v) ) |
| elif type(v) is dict or type(v) is list or type(v) is tuple: |
| convert_float_timestamp2str(v) |
| if len(var) == 0: return True |
| elif type(var) is list or type(var) is tuple: |
| for v in var: |
| convert_float_timestamp2str(v) |
| |
| def convert_datetime2str(var): |
| '''Converts a datetime variable to a string with the format '%Y-%m-%dT%H:%i:%s' |
| It enters recursively in the dict var finding this kind of variables |
| ''' |
| if type(var) is dict: |
| for k,v in var.items(): |
| if type(v) is datetime.datetime: |
| var[k]= v.strftime('%Y-%m-%dT%H:%M:%S') |
| elif type(v) is dict or type(v) is list or type(v) is tuple: |
| convert_datetime2str(v) |
| if len(var) == 0: return True |
| elif type(var) is list or type(var) is tuple: |
| for v in var: |
| convert_datetime2str(v) |
| |
| def convert_str2boolean(data, items): |
| '''Check recursively the content of data, and if there is an key contained in items, convert value from string to boolean |
| Done recursively |
| Attributes: |
| 'data': dictionary variable to be checked. None or empty is considered valid |
| 'items': tuple of keys to convert |
| Return: |
| None |
| ''' |
| if type(data) is dict: |
| for k in data.keys(): |
| if type(data[k]) is dict or type(data[k]) is tuple or type(data[k]) is list: |
| convert_str2boolean(data[k], items) |
| if k in items: |
| if type(data[k]) is str: |
| if data[k]=="false" or data[k]=="False": data[k]=False |
| elif data[k]=="true" or data[k]=="True": data[k]=True |
| if type(data) is tuple or type(data) is list: |
| for k in data: |
| if type(k) is dict or type(k) is tuple or type(k) is list: |
| convert_str2boolean(k, items) |
| |
| def check_valid_uuid(uuid): |
| id_schema = {"type" : "string", "pattern": "^[a-fA-F0-9]{8}(-[a-fA-F0-9]{4}){3}-[a-fA-F0-9]{12}$"} |
| id_schema2 = {"type" : "string", "pattern": "^[a-fA-F0-9]{32}$"} |
| try: |
| js_v(uuid, id_schema) |
| return True |
| except js_e.ValidationError: |
| try: |
| js_v(uuid, id_schema2) |
| return True |
| except js_e.ValidationError: |
| return False |
| return False |
| |
| |
| def expand_brackets(text): |
| """ |
| Change a text with TEXT[ABC..] into a list with [TEXTA, TEXTB, TEXC, ... |
| if no bracket is used it just return the a list with the single text |
| It uses recursivity to allow several [] in the text |
| :param text: |
| :return: |
| """ |
| if text is None: |
| return (None, ) |
| start = text.find("[") |
| end = text.find("]") |
| if start < 0 or end < 0: |
| return [text] |
| text_list = [] |
| for char in text[start+1:end]: |
| text_list += expand_brackets(text[:start] + char + text[end+1:]) |
| return text_list |
| |
| def deprecated(message): |
| def deprecated_decorator(func): |
| def deprecated_func(*args, **kwargs): |
| warnings.warn("{} is a deprecated function. {}".format(func.__name__, message), |
| category=DeprecationWarning, |
| stacklevel=2) |
| warnings.simplefilter('default', DeprecationWarning) |
| return func(*args, **kwargs) |
| return deprecated_func |
| return deprecated_decorator |
| |
| |
| def truncate(text, max_length=1024): |
| """Limit huge texts in number of characters""" |
| text = str(text) |
| if text and len(text) >= max_length: |
| return text[:max_length//2-3] + " ... " + text[-max_length//2+3:] |
| return text |
| |
| |
| def merge_dicts(*dicts, **kwargs): |
| """Creates a new dict merging N others and keyword arguments. |
| Right-most dicts take precedence. |
| Keyword args take precedence. |
| """ |
| return reduce( |
| lambda acc, x: acc.update(x) or acc, |
| list(dicts) + [kwargs], {}) |
| |
| |
| def remove_none_items(adict): |
| """Return a similar dict without keys associated to None values""" |
| return {k: v for k, v in adict.items() if v is not None} |
| |
| |
| def filter_dict_keys(adict, allow): |
| """Return a similar dict, but just containing the explicitly allowed keys |
| |
| Arguments: |
| adict (dict): Simple python dict data struct |
| allow (list): Explicits allowed keys |
| """ |
| return {k: v for k, v in adict.items() if k in allow} |
| |
| |
| def filter_out_dict_keys(adict, deny): |
| """Return a similar dict, but not containing the explicitly denied keys |
| |
| Arguments: |
| adict (dict): Simple python dict data struct |
| deny (list): Explicits denied keys |
| """ |
| return {k: v for k, v in adict.items() if k not in deny} |
| |
| |
| def expand_joined_fields(record): |
| """Given a db query result, explode the fields that contains `.` (join |
| operations). |
| |
| Example |
| >> expand_joined_fiels({'wim.id': 2}) |
| # {'wim': {'id': 2}} |
| """ |
| result = {} |
| for field, value in record.items(): |
| keys = field.split('.') |
| target = result |
| target = reduce(lambda target, key: target.setdefault(key, {}), |
| keys[:-1], result) |
| target[keys[-1]] = value |
| |
| return result |
| |
| |
| def ensure(condition, exception): |
| """Raise an exception if condition is not met""" |
| if not condition: |
| raise exception |
| |
| |
| def partition(predicate, iterable): |
| """Create two derived iterators from a single one |
| The first iterator created will loop thought the values where the function |
| predicate is True, the second one will iterate over the values where it is |
| false. |
| """ |
| iterable1, iterable2 = tee(iterable) |
| return filter(predicate, iterable2), filterfalse(predicate, iterable1) |
| |
| |
| def pipe(*functions): |
| """Compose functions of one argument in the opposite order, |
| So pipe(f, g)(x) = g(f(x)) |
| """ |
| return lambda x: reduce(lambda acc, f: f(acc), functions, x) |
| |
| |
| def compose(*functions): |
| """Compose functions of one argument, |
| So compose(f, g)(x) = f(g(x)) |
| """ |
| return lambda x: reduce(lambda acc, f: f(acc), functions[::-1], x) |
| |
| |
| def safe_get(target, key_path, default=None): |
| """Given a path of keys (eg.: "key1.key2.key3"), return a nested value in |
| a nested dict if present, or the default value |
| """ |
| keys = key_path.split('.') |
| target = reduce(lambda acc, key: acc.get(key) or {}, keys[:-1], target) |
| return target.get(keys[-1], default) |
| |
| |
| class Attempt(object): |
| """Auxiliary class to be used in an attempt to retry executing a failing |
| procedure |
| |
| Attributes: |
| count (int): 0-based "retries" counter |
| max_attempts (int): maximum number of "retries" allowed |
| info (dict): extra information about the specific attempt |
| (can be used to produce more meaningful error messages) |
| """ |
| __slots__ = ('count', 'max', 'info') |
| |
| MAX = 3 |
| |
| def __init__(self, count=0, max_attempts=MAX, info=None): |
| self.count = count |
| self.max = max_attempts |
| self.info = info or {} |
| |
| @property |
| def countdown(self): |
| """Like count, but in the opposite direction""" |
| return self.max - self.count |
| |
| @property |
| def number(self): |
| """1-based counter""" |
| return self.count + 1 |
| |
| |
| def inject_args(fn=None, **args): |
| """Partially apply keyword arguments in a function, but only if the function |
| define them in the first place |
| """ |
| if fn is None: # Allows calling the decorator directly or with parameters |
| return partial(inject_args, **args) |
| |
| spec = getspec(fn) |
| return wraps(fn)(partial(fn, **filter_dict_keys(args, spec.args))) |
| |
| |
| def get_arg(name, fn, args, kwargs): |
| """Find the value of an argument for a function, given its argument list. |
| |
| This function can be used to display more meaningful errors for debugging |
| """ |
| if name in kwargs: |
| return kwargs[name] |
| |
| spec = getspec(fn) |
| if name in spec.args: |
| i = spec.args.index(name) |
| return args[i] if i < len(args) else None |
| |
| return None |