From: tierno Date: Thu, 19 Apr 2018 14:01:59 +0000 (+0200) Subject: initial commit X-Git-Tag: v4.0.0~19 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=5c012612f1292b012dc12b337c611c6f3c083b18;p=osm%2Fcommon.git initial commit Change-Id: Ia40148fdc2cabbbacb0b67aaed8442ed0ecf0bc2 Signed-off-by: tierno --- diff --git a/.gitignore-common b/.gitignore-common new file mode 100644 index 0000000..a1e47df --- /dev/null +++ b/.gitignore-common @@ -0,0 +1,35 @@ +# This is a template with common files to be igonored, after clone make a copy to .gitignore +# cp .gitignore-common .gitignore + +*.pyc +*.pyo + +#auto-ignore +.gitignore + +#logs +logs + +#pycharm +.idea + +#eclipse +.project +.pydevproject +.settings + +#local stuff files that end in ".local" or folders called "local" +*.local +osm_common/local +osm_common/test/local + +#local stuff files that end in ".temp" or folders called "temp" +*.temp +osm_common/temp +osm_common/test/temp + +#distribution and package generation +build +dist +*.egg-info + diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..8dada3e --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.rst b/README.rst new file mode 100644 index 0000000..3273af4 --- /dev/null +++ b/README.rst @@ -0,0 +1,11 @@ +=========== +osm-common +=========== + +Contains general modules for lightweight build database, storage and message access. +The target is to use same library for OSM modules, in order to easy migration to other technologies, that is +different database or storage object system. +For database: mongo and memory (volatile) are implemented. +For message: Kafka and local file system are implemented. +For storage: only local file system is implemented. + diff --git a/devops-stages/stage-archive.sh b/devops-stages/stage-archive.sh new file mode 100755 index 0000000..727c020 --- /dev/null +++ b/devops-stages/stage-archive.sh @@ -0,0 +1,10 @@ +#!/bin/sh +MDG=common +rm -rf pool +rm -rf dists +mkdir -p pool/$MDG +mv deb_dist/*.deb pool/$MDG/ +mkdir -p dists/unstable/$MDG/binary-amd64/ +apt-ftparchive packages pool/$MDG > dists/unstable/$MDG/binary-amd64/Packages +gzip -9fk dists/unstable/$MDG/binary-amd64/Packages +echo "dists/**,pool/$MDG/*.deb" diff --git a/devops-stages/stage-build.sh b/devops-stages/stage-build.sh new file mode 100755 index 0000000..bf7602b --- /dev/null +++ b/devops-stages/stage-build.sh @@ -0,0 +1,3 @@ +#!/bin/sh +rm -rf deb_dist +tox -e build diff --git a/devops-stages/stage-test.sh b/devops-stages/stage-test.sh new file mode 100755 index 0000000..0333d84 --- /dev/null +++ b/devops-stages/stage-test.sh @@ -0,0 +1,2 @@ +#!/bin/sh +#tox diff --git a/osm_common/__init__.py b/osm_common/__init__.py new file mode 100644 index 0000000..df7b893 --- /dev/null +++ b/osm_common/__init__.py @@ -0,0 +1,2 @@ +version = '0.1.3' +date_version = '2018-04-19' \ No newline at end of file diff --git a/osm_common/dbbase.py b/osm_common/dbbase.py new file mode 100644 index 0000000..aa9c24e --- /dev/null +++ b/osm_common/dbbase.py @@ -0,0 +1,37 @@ +from http import HTTPStatus + +__author__ = "Alfonso Tierno " + + +class DbException(Exception): + + def __init__(self, message, http_code=HTTPStatus.NOT_FOUND): + self.http_code = http_code + Exception.__init__(self, "database exception " + message) + + +class DbBase(object): + + def __init__(self): + pass + + def db_connect(self, config): + pass + + def db_disconnect(self): + pass + + def get_list(self, table, filter={}): + pass + + def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True): + pass + + def create(self, table, indata): + pass + + def del_list(self, table, filter={}): + pass + + def del_one(self, table, filter={}, fail_on_empty=True): + pass diff --git a/osm_common/dbmemory.py b/osm_common/dbmemory.py new file mode 100644 index 0000000..cdb0482 --- /dev/null +++ b/osm_common/dbmemory.py @@ -0,0 +1,124 @@ +import logging +from dbbase import DbException, DbBase +from http import HTTPStatus +from uuid import uuid4 +from copy import deepcopy + +__author__ = "Alfonso Tierno " + + +class DbMemory(DbBase): + + def __init__(self, logger_name='db'): + self.logger = logging.getLogger(logger_name) + self.db = {} + + def db_connect(self, config): + if "logger_name" in config: + self.logger = logging.getLogger(config["logger_name"]) + + @staticmethod + def _format_filter(filter): + return filter # TODO + + def _find(self, table, filter): + for i, row in enumerate(self.db.get(table, ())): + match = True + if filter: + for k, v in filter.items(): + if k not in row or v != row[k]: + match = False + if match: + yield i, row + + def get_list(self, table, filter={}): + try: + l = [] + for _, row in self._find(table, self._format_filter(filter)): + l.append(deepcopy(row)) + return l + except DbException: + raise + except Exception as e: # TODO refine + raise DbException(str(e)) + + def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True): + try: + l = None + for _, row in self._find(table, self._format_filter(filter)): + if not fail_on_more: + return deepcopy(row) + if l: + raise DbException("Found more than one entry with filter='{}'".format(filter), + HTTPStatus.CONFLICT.value) + l = row + if not l and fail_on_empty: + raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND) + return deepcopy(l) + except Exception as e: # TODO refine + raise DbException(str(e)) + + def del_list(self, table, filter={}): + try: + id_list = [] + for i, _ in self._find(table, self._format_filter(filter)): + id_list.append(i) + deleted = len(id_list) + for i in id_list: + del self.db[table][i] + return {"deleted": deleted} + except DbException: + raise + except Exception as e: # TODO refine + raise DbException(str(e)) + + def del_one(self, table, filter={}, fail_on_empty=True): + try: + for i, _ in self._find(table, self._format_filter(filter)): + break + else: + if fail_on_empty: + raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND) + return None + del self.db[table][i] + return {"deleted": 1} + except Exception as e: # TODO refine + raise DbException(str(e)) + + def replace(self, table, filter, indata, fail_on_empty=True): + try: + for i, _ in self._find(table, self._format_filter(filter)): + break + else: + if fail_on_empty: + raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND) + return None + self.db[table][i] = deepcopy(indata) + return {"upadted": 1} + except Exception as e: # TODO refine + raise DbException(str(e)) + + def create(self, table, indata): + try: + id = indata.get("_id") + if not id: + id = str(uuid4()) + indata["_id"] = id + if table not in self.db: + self.db[table] = [] + self.db[table].append(deepcopy(indata)) + return id + except Exception as e: # TODO refine + raise DbException(str(e)) + + +if __name__ == '__main__': + # some test code + db = dbmemory() + db.create("test", {"_id": 1, "data": 1}) + db.create("test", {"_id": 2, "data": 2}) + db.create("test", {"_id": 3, "data": 3}) + print("must be 3 items:", db.get_list("test")) + print("must return item 2:", db.get_list("test", {"_id": 2})) + db.del_one("test", {"_id": 2}) + print("must be emtpy:", db.get_list("test", {"_id": 2})) diff --git a/osm_common/dbmongo.py b/osm_common/dbmongo.py new file mode 100644 index 0000000..582773a --- /dev/null +++ b/osm_common/dbmongo.py @@ -0,0 +1,191 @@ + +import logging +from pymongo import MongoClient, errors +from dbbase import DbException, DbBase +from http import HTTPStatus +from time import time, sleep + +__author__ = "Alfonso Tierno " + +# TODO consider use this decorator for database access retries +# @retry_mongocall +# def retry_mongocall(call): +# def _retry_mongocall(*args, **kwargs): +# retry = 1 +# while True: +# try: +# return call(*args, **kwargs) +# except pymongo.AutoReconnect as e: +# if retry == 4: +# raise DbException(str(e)) +# sleep(retry) +# return _retry_mongocall + + +class DbMongo(DbBase): + conn_initial_timout = 120 + conn_timout = 10 + + def __init__(self, logger_name='db'): + self.logger = logging.getLogger(logger_name) + + def db_connect(self, config): + try: + if "logger_name" in config: + self.logger = logging.getLogger(config["logger_name"]) + self.client = MongoClient(config["host"], config["port"]) + self.db = self.client[config["name"]] + if "loglevel" in config: + self.logger.setLevel(getattr(logging, config['loglevel'])) + # get data to try a connection + now = time() + while True: + try: + self.db.users.find_one({"username": "admin"}) + return + except errors.ConnectionFailure as e: + if time() - now >= self.conn_initial_timout: + raise + self.logger.info("Waiting to database up {}".format(e)) + sleep(2) + except errors.PyMongoError as e: + raise DbException(str(e)) + + def db_disconnect(self): + pass # TODO + + @staticmethod + def _format_filter(filter): + try: + db_filter = {} + for query_k, query_v in filter.items(): + dot_index = query_k.rfind(".") + if dot_index > 1 and query_k[dot_index+1:] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont", + "ncont", "neq"): + operator = "$" + query_k[dot_index+1:] + if operator == "$neq": + operator = "$ne" + k = query_k[:dot_index] + else: + operator = "$eq" + k = query_k + + v = query_v + if isinstance(v, list): + if operator in ("$eq", "$cont"): + operator = "$in" + v = query_v + elif operator in ("$ne", "$ncont"): + operator = "$nin" + v = query_v + else: + v = query_v.join(",") + + if operator in ("$eq", "$cont"): + # v cannot be a comma separated list, because operator would have been changed to $in + db_filter[k] = v + elif operator == "$ncount": + # v cannot be a comma separated list, because operator would have been changed to $nin + db_filter[k] = {"$ne": v} + else: + # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8" + if k not in db_filter: + db_filter[k] = {} + db_filter[k][operator] = v + + return db_filter + except Exception as e: + raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e), + http_code=HTTPStatus.BAD_REQUEST) + + def get_list(self, table, filter={}): + try: + l = [] + collection = self.db[table] + rows = collection.find(self._format_filter(filter)) + for row in rows: + l.append(row) + return l + except DbException: + raise + except Exception as e: # TODO refine + raise DbException(str(e)) + + def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True): + try: + if filter: + filter = self._format_filter(filter) + collection = self.db[table] + if not (fail_on_empty and fail_on_more): + return collection.find_one(filter) + rows = collection.find(filter) + if rows.count() == 0: + if fail_on_empty: + raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter), + HTTPStatus.NOT_FOUND) + return None + elif rows.count() > 1: + if fail_on_more: + raise DbException("Found more than one {} with filter='{}'".format(table[:-1], filter), + HTTPStatus.CONFLICT) + return rows[0] + except Exception as e: # TODO refine + raise DbException(str(e)) + + def del_list(self, table, filter={}): + try: + collection = self.db[table] + rows = collection.delete_many(self._format_filter(filter)) + return {"deleted": rows.deleted_count} + except DbException: + raise + except Exception as e: # TODO refine + raise DbException(str(e)) + + def del_one(self, table, filter={}, fail_on_empty=True): + try: + collection = self.db[table] + rows = collection.delete_one(self._format_filter(filter)) + if rows.deleted_count == 0: + if fail_on_empty: + raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter), + HTTPStatus.NOT_FOUND) + return None + return {"deleted": rows.deleted_count} + except Exception as e: # TODO refine + raise DbException(str(e)) + + def create(self, table, indata): + try: + collection = self.db[table] + data = collection.insert_one(indata) + return data.inserted_id + except Exception as e: # TODO refine + raise DbException(str(e)) + + def set_one(self, table, filter, update_dict, fail_on_empty=True): + try: + collection = self.db[table] + rows = collection.update_one(self._format_filter(filter), {"$set": update_dict}) + if rows.updated_count == 0: + if fail_on_empty: + raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter), + HTTPStatus.NOT_FOUND) + return None + return {"deleted": rows.deleted_count} + except Exception as e: # TODO refine + raise DbException(str(e)) + + def replace(self, table, id, indata, fail_on_empty=True): + try: + _filter = {"_id": id} + collection = self.db[table] + rows = collection.replace_one(_filter, indata) + if rows.matched_count == 0: + if fail_on_empty: + raise DbException("Not found any {} with filter='{}'".format(table[:-1], _filter), + HTTPStatus.NOT_FOUND) + return None + return {"replace": rows.modified_count} + except Exception as e: # TODO refine + raise DbException(str(e)) diff --git a/osm_common/fsbase.py b/osm_common/fsbase.py new file mode 100644 index 0000000..7b6cd0c --- /dev/null +++ b/osm_common/fsbase.py @@ -0,0 +1,43 @@ + +from http import HTTPStatus + +__author__ = "Alfonso Tierno " + + +class FsException(Exception): + def __init__(self, message, http_code=HTTPStatus.INTERNAL_SERVER_ERROR): + self.http_code = http_code + Exception.__init__(self, "storage exception " + message) + + +class FsBase(object): + def __init__(self): + pass + + def get_params(self): + return {} + + def fs_connect(self, config): + pass + + def fs_disconnect(self): + pass + + def mkdir(self, folder): + pass + + def file_exists(self, storage): + pass + + def file_size(self, storage): + pass + + def file_extract(self, tar_object, path): + pass + + def file_open(self, storage, mode): + pass + + def file_delete(self, storage, ignore_non_exist=False): + pass + diff --git a/osm_common/fslocal.py b/osm_common/fslocal.py new file mode 100644 index 0000000..b7dd839 --- /dev/null +++ b/osm_common/fslocal.py @@ -0,0 +1,142 @@ +import os +import logging +import tarfile +from http import HTTPStatus +from shutil import rmtree +from fsbase import FsBase, FsException + +__author__ = "Alfonso Tierno " + + +class FsLocal(FsBase): + + def __init__(self, logger_name='fs'): + self.logger = logging.getLogger(logger_name) + self.path = None + + def get_params(self): + return {"fs": "local", "path": self.path} + + def fs_connect(self, config): + try: + if "logger_name" in config: + self.logger = logging.getLogger(config["logger_name"]) + self.path = config["path"] + if not self.path.endswith("/"): + self.path += "/" + if not os.path.exists(self.path): + raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format( + config["path"])) + except FsException: + raise + except Exception as e: # TODO refine + raise FsException(str(e)) + + def fs_disconnect(self): + pass # TODO + + def mkdir(self, folder): + """ + Creates a folder or parent object location + :param folder: + :return: None or raises and exception + """ + try: + os.mkdir(self.path + folder) + except Exception as e: + raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR) + + def file_exists(self, storage, mode=None): + """ + Indicates if "storage" file exist + :param storage: can be a str or a str list + :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists + :return: True, False + """ + if isinstance(storage, str): + f = storage + else: + f = "/".join(storage) + if os.path.exists(self.path + f): + if mode == "file" and os.path.isfile(self.path + f): + return True + if mode == "dir" and os.path.isdir(self.path + f): + return True + return False + + def file_size(self, storage): + """ + return file size + :param storage: can be a str or a str list + :return: file size + """ + if isinstance(storage, str): + f = storage + else: + f = "/".join(storage) + return os.path.getsize(self.path + f) + + def file_extract(self, tar_object, path): + """ + extract a tar file + :param tar_object: object of type tar + :param path: can be a str or a str list, or a tar object where to extract the tar_object + :return: None + """ + if isinstance(path, str): + f = self.path + path + else: + f = self.path + "/".join(path) + tar_object.extractall(path=f) + + def file_open(self, storage, mode): + """ + Open a file + :param storage: can be a str or list of str + :param mode: file mode + :return: file object + """ + try: + if isinstance(storage, str): + f = storage + else: + f = "/".join(storage) + return open(self.path + f, mode) + except FileNotFoundError: + raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND) + except IOError: + raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST) + + def dir_ls(self, storage): + """ + return folder content + :param storage: can be a str or list of str + :return: folder content + """ + try: + if isinstance(storage, str): + f = storage + else: + f = "/".join(storage) + return os.listdir(self.path + f) + except NotADirectoryError: + raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND) + except IOError: + raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST) + + def file_delete(self, storage, ignore_non_exist=False): + """ + Delete storage content recursivelly + :param storage: can be a str or list of str + :param ignore_non_exist: not raise exception if storage does not exist + :return: None + """ + + if isinstance(storage, str): + f = self.path + storage + else: + f = self.path + "/".join(storage) + if os.path.exists(f): + rmtree(f) + elif not ignore_non_exist: + raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.NOT_FOUND) diff --git a/osm_common/msgbase.py b/osm_common/msgbase.py new file mode 100644 index 0000000..25e8c80 --- /dev/null +++ b/osm_common/msgbase.py @@ -0,0 +1,47 @@ + +import asyncio +from http import HTTPStatus + +__author__ = "Alfonso Tierno " + + +class MsgException(Exception): + """ + Base Exception class for all msgXXXX exceptions + """ + + def __init__(self, message, http_code=HTTPStatus.INTERNAL_SERVER_ERROR): + """ + General exception + :param message: descriptive text + :param http_code: type. It contains ".value" (http error code) and ".name" (http error name + """ + self.http_code = http_code + Exception.__init__(self, "messaging exception " + message) + + +class MsgBase(object): + """ + Base class for all msgXXXX classes + """ + + def __init__(self): + pass + + def connect(self, config): + pass + + def disconnect(self): + pass + + def write(self, topic, key, msg): + pass + + def read(self, topic): + pass + + async def aiowrite(self, topic, key, msg, loop): + pass + + async def aioread(self, topic, loop): + pass diff --git a/osm_common/msgkafka.py b/osm_common/msgkafka.py new file mode 100644 index 0000000..c819c81 --- /dev/null +++ b/osm_common/msgkafka.py @@ -0,0 +1,107 @@ +import logging +import asyncio +import yaml +from aiokafka import AIOKafkaConsumer +from aiokafka import AIOKafkaProducer +from aiokafka.errors import KafkaError +from msgbase import MsgBase, MsgException +#import json + +__author__ = "Alfonso Tierno , " \ + "Guillermo Calvino " +class MsgKafka(MsgBase): + def __init__(self, logger_name='msg'): + self.logger = logging.getLogger(logger_name) + self.host = None + self.port = None + self.consumer = None + self.producer = None + self.loop = None + self.broker = None + + def connect(self, config): + try: + if "logger_name" in config: + self.logger = logging.getLogger(config["logger_name"]) + self.host = config["host"] + self.port = config["port"] + self.loop = asyncio.get_event_loop() + self.broker = str(self.host) + ":" + str(self.port) + + except Exception as e: # TODO refine + raise MsgException(str(e)) + + def disconnect(self): + try: + self.loop.close() + except Exception as e: # TODO refine + raise MsgException(str(e)) + + def write(self, topic, key, msg): + try: + self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, + msg=yaml.safe_dump(msg, default_flow_style=True), + loop=self.loop)) + + except Exception as e: + raise MsgException("Error writing {} topic: {}".format(topic, str(e))) + + def read(self, topic): + """ + Read from one or several topics. it is non blocking returning None if nothing is available + :param topic: can be str: single topic; or str list: several topics + :return: topic, key, message; or None + """ + try: + return self.loop.run_until_complete(self.aioread(topic, self.loop)) + except MsgException: + raise + except Exception as e: + raise MsgException("Error reading {} topic: {}".format(topic, str(e))) + + async def aiowrite(self, topic, key, msg, loop=None): + + if not loop: + loop = self.loop + try: + self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode, + bootstrap_servers=self.broker) + await self.producer.start() + await self.producer.send(topic=topic, key=key, value=msg) + except Exception as e: + raise MsgException("Error publishing to {} topic: {}".format(topic, str(e))) + finally: + await self.producer.stop() + + async def aioread(self, topic, loop=None, callback=None, *args): + """ + Asyncio read from one or several topics. It blocks + :param topic: can be str: single topic; or str list: several topics + :param loop: asyncio loop + :callback: callback function that will handle the message in kafka bus + :*args: optional arguments for callback function + :return: topic, key, message + """ + + if not loop: + loop = self.loop + try: + if isinstance(topic, (list, tuple)): + topic_list = topic + else: + topic_list = (topic,) + + self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker) + await self.consumer.start() + self.consumer.subscribe(topic_list) + + async for message in self.consumer: + if callback: + callback(message.topic, yaml.load(message.key), yaml.load(message.value), *args) + else: + return message.topic, yaml.load(message.key), yaml.load(message.value) + except KafkaError as e: + raise MsgException(str(e)) + finally: + await self.consumer.stop() + diff --git a/osm_common/msglocal.py b/osm_common/msglocal.py new file mode 100644 index 0000000..c774f85 --- /dev/null +++ b/osm_common/msglocal.py @@ -0,0 +1,111 @@ +import logging +import os +import yaml +import asyncio +from msgbase import MsgBase, MsgException +from time import sleep + +__author__ = "Alfonso Tierno " + +""" +This emulated kafka bus by just using a shared file system. Useful for testing or devops. +One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer +access to the same file. e.g. same volume if running with docker. +One text line per message is used in yaml format. +""" + +class MsgLocal(MsgBase): + + def __init__(self, logger_name='msg'): + self.logger = logging.getLogger(logger_name) + self.path = None + # create a different file for each topic + self.files = {} + self.buffer = {} + + def connect(self, config): + try: + if "logger_name" in config: + self.logger = logging.getLogger(config["logger_name"]) + self.path = config["path"] + if not self.path.endswith("/"): + self.path += "/" + if not os.path.exists(self.path): + os.mkdir(self.path) + except MsgException: + raise + except Exception as e: # TODO refine + raise MsgException(str(e)) + + def disconnect(self): + for f in self.files.values(): + try: + f.close() + except Exception as e: # TODO refine + pass + + def write(self, topic, key, msg): + """ + Insert a message into topic + :param topic: topic + :param key: key text to be inserted + :param msg: value object to be inserted, can be str, object ... + :return: None or raises and exception + """ + try: + if topic not in self.files: + self.files[topic] = open(self.path + topic, "a+") + yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True, width=20000) + self.files[topic].flush() + except Exception as e: # TODO refine + raise MsgException(str(e)) + + def read(self, topic, blocks=True): + """ + Read from one or several topics. it is non blocking returning None if nothing is available + :param topic: can be str: single topic; or str list: several topics + :param blocks: indicates if it should wait and block until a message is present or returns None + :return: topic, key, message; or None if blocks==True + """ + try: + if isinstance(topic, (list, tuple)): + topic_list = topic + else: + topic_list = (topic, ) + while True: + for single_topic in topic_list: + if single_topic not in self.files: + self.files[single_topic] = open(self.path + single_topic, "a+") + self.buffer[single_topic] = "" + self.buffer[single_topic] += self.files[single_topic].readline() + if not self.buffer[single_topic].endswith("\n"): + continue + msg_dict = yaml.load(self.buffer[single_topic]) + self.buffer[single_topic] = "" + assert len(msg_dict) == 1 + for k, v in msg_dict.items(): + return single_topic, k, v + if not blocks: + return None + sleep(2) + except Exception as e: # TODO refine + raise MsgException(str(e)) + + async def aioread(self, topic, loop): + """ + Asyncio read from one or several topics. It blocks + :param topic: can be str: single topic; or str list: several topics + :param loop: asyncio loop + :return: topic, key, message + """ + try: + while True: + msg = self.read(topic, blocks=False) + if msg: + return msg + await asyncio.sleep(2, loop=loop) + except MsgException: + raise + except Exception as e: # TODO refine + raise MsgException(str(e)) + diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..4a0749f --- /dev/null +++ b/setup.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python3 + +import os +from setuptools import setup + +here = os.path.abspath(os.path.dirname(__file__)) +_name = "osm_common" +VERSION = "4.0.0rc1" +README = open(os.path.join(here, 'README.rst')).read() + +setup( + name=_name, + description='OSM common utilities', + long_description=README, + # version_command=('git describe --tags --long --dirty', 'pep440-git'), + version=VERSION, + # python_requires='>3.5', + author='ETSI OSM', + author_email='alfonso.tiernosepulveda@telefonica.com', + maintainer='Alfonso Tierno', + maintainer_email='alfonso.tiernosepulveda@telefonica.com', + url='https://osm.etsi.org/gitweb/?p=osm/common.git;a=summary', + license='Apache 2.0', + + packages=[_name], + include_package_data=True, + # scripts=['nbi.py'], + + install_requires=[ + 'pymongo', + 'aiokafka', + 'PyYAML', + ], +) diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..ef1f5eb --- /dev/null +++ b/tox.ini @@ -0,0 +1,20 @@ +[tox] +envlist = py27,py3,flake8 +toxworkdir={homedir}/.tox + +[testenv] +deps=nose + mock +commands=nosetests + +[testenv:flake8] +basepython = python3 +deps = flake8 +commands = + flake8 setup.py + +[testenv:build] +basepython = python3 +deps = stdeb + setuptools-version-command +commands = python3 setup.py --command-packages=stdeb.command bdist_deb