--- /dev/null
+# This is a template with common files to be igonored, after clone make a copy to .gitignore
+# cp .gitignore-common .gitignore
+
+*.pyc
+*.pyo
+
+#auto-ignore
+.gitignore
+
+#logs
+logs
+
+#pycharm
+.idea
+
+#eclipse
+.project
+.pydevproject
+.settings
+
+#local stuff files that end in ".local" or folders called "local"
+*.local
+osm_common/local
+osm_common/test/local
+
+#local stuff files that end in ".temp" or folders called "temp"
+*.temp
+osm_common/temp
+osm_common/test/temp
+
+#distribution and package generation
+build
+dist
+*.egg-info
+
--- /dev/null
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "{}"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright {yyyy} {name of copyright owner}
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
--- /dev/null
+===========
+osm-common
+===========
+
+Contains general modules for lightweight build database, storage and message access.
+The target is to use same library for OSM modules, in order to easy migration to other technologies, that is
+different database or storage object system.
+For database: mongo and memory (volatile) are implemented.
+For message: Kafka and local file system are implemented.
+For storage: only local file system is implemented.
+
--- /dev/null
+#!/bin/sh
+MDG=common
+rm -rf pool
+rm -rf dists
+mkdir -p pool/$MDG
+mv deb_dist/*.deb pool/$MDG/
+mkdir -p dists/unstable/$MDG/binary-amd64/
+apt-ftparchive packages pool/$MDG > dists/unstable/$MDG/binary-amd64/Packages
+gzip -9fk dists/unstable/$MDG/binary-amd64/Packages
+echo "dists/**,pool/$MDG/*.deb"
--- /dev/null
+#!/bin/sh
+rm -rf deb_dist
+tox -e build
--- /dev/null
+#!/bin/sh
+#tox
--- /dev/null
+version = '0.1.3'
+date_version = '2018-04-19'
\ No newline at end of file
--- /dev/null
+from http import HTTPStatus
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class DbException(Exception):
+
+ def __init__(self, message, http_code=HTTPStatus.NOT_FOUND):
+ self.http_code = http_code
+ Exception.__init__(self, "database exception " + message)
+
+
+class DbBase(object):
+
+ def __init__(self):
+ pass
+
+ def db_connect(self, config):
+ pass
+
+ def db_disconnect(self):
+ pass
+
+ def get_list(self, table, filter={}):
+ pass
+
+ def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True):
+ pass
+
+ def create(self, table, indata):
+ pass
+
+ def del_list(self, table, filter={}):
+ pass
+
+ def del_one(self, table, filter={}, fail_on_empty=True):
+ pass
--- /dev/null
+import logging
+from dbbase import DbException, DbBase
+from http import HTTPStatus
+from uuid import uuid4
+from copy import deepcopy
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class DbMemory(DbBase):
+
+ def __init__(self, logger_name='db'):
+ self.logger = logging.getLogger(logger_name)
+ self.db = {}
+
+ def db_connect(self, config):
+ if "logger_name" in config:
+ self.logger = logging.getLogger(config["logger_name"])
+
+ @staticmethod
+ def _format_filter(filter):
+ return filter # TODO
+
+ def _find(self, table, filter):
+ for i, row in enumerate(self.db.get(table, ())):
+ match = True
+ if filter:
+ for k, v in filter.items():
+ if k not in row or v != row[k]:
+ match = False
+ if match:
+ yield i, row
+
+ def get_list(self, table, filter={}):
+ try:
+ l = []
+ for _, row in self._find(table, self._format_filter(filter)):
+ l.append(deepcopy(row))
+ return l
+ except DbException:
+ raise
+ except Exception as e: # TODO refine
+ raise DbException(str(e))
+
+ def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True):
+ try:
+ l = None
+ for _, row in self._find(table, self._format_filter(filter)):
+ if not fail_on_more:
+ return deepcopy(row)
+ if l:
+ raise DbException("Found more than one entry with filter='{}'".format(filter),
+ HTTPStatus.CONFLICT.value)
+ l = row
+ if not l and fail_on_empty:
+ raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND)
+ return deepcopy(l)
+ except Exception as e: # TODO refine
+ raise DbException(str(e))
+
+ def del_list(self, table, filter={}):
+ try:
+ id_list = []
+ for i, _ in self._find(table, self._format_filter(filter)):
+ id_list.append(i)
+ deleted = len(id_list)
+ for i in id_list:
+ del self.db[table][i]
+ return {"deleted": deleted}
+ except DbException:
+ raise
+ except Exception as e: # TODO refine
+ raise DbException(str(e))
+
+ def del_one(self, table, filter={}, fail_on_empty=True):
+ try:
+ for i, _ in self._find(table, self._format_filter(filter)):
+ break
+ else:
+ if fail_on_empty:
+ raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND)
+ return None
+ del self.db[table][i]
+ return {"deleted": 1}
+ except Exception as e: # TODO refine
+ raise DbException(str(e))
+
+ def replace(self, table, filter, indata, fail_on_empty=True):
+ try:
+ for i, _ in self._find(table, self._format_filter(filter)):
+ break
+ else:
+ if fail_on_empty:
+ raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND)
+ return None
+ self.db[table][i] = deepcopy(indata)
+ return {"upadted": 1}
+ except Exception as e: # TODO refine
+ raise DbException(str(e))
+
+ def create(self, table, indata):
+ try:
+ id = indata.get("_id")
+ if not id:
+ id = str(uuid4())
+ indata["_id"] = id
+ if table not in self.db:
+ self.db[table] = []
+ self.db[table].append(deepcopy(indata))
+ return id
+ except Exception as e: # TODO refine
+ raise DbException(str(e))
+
+
+if __name__ == '__main__':
+ # some test code
+ db = dbmemory()
+ db.create("test", {"_id": 1, "data": 1})
+ db.create("test", {"_id": 2, "data": 2})
+ db.create("test", {"_id": 3, "data": 3})
+ print("must be 3 items:", db.get_list("test"))
+ print("must return item 2:", db.get_list("test", {"_id": 2}))
+ db.del_one("test", {"_id": 2})
+ print("must be emtpy:", db.get_list("test", {"_id": 2}))
--- /dev/null
+
+import logging
+from pymongo import MongoClient, errors
+from dbbase import DbException, DbBase
+from http import HTTPStatus
+from time import time, sleep
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+# TODO consider use this decorator for database access retries
+# @retry_mongocall
+# def retry_mongocall(call):
+# def _retry_mongocall(*args, **kwargs):
+# retry = 1
+# while True:
+# try:
+# return call(*args, **kwargs)
+# except pymongo.AutoReconnect as e:
+# if retry == 4:
+# raise DbException(str(e))
+# sleep(retry)
+# return _retry_mongocall
+
+
+class DbMongo(DbBase):
+ conn_initial_timout = 120
+ conn_timout = 10
+
+ def __init__(self, logger_name='db'):
+ self.logger = logging.getLogger(logger_name)
+
+ def db_connect(self, config):
+ try:
+ if "logger_name" in config:
+ self.logger = logging.getLogger(config["logger_name"])
+ self.client = MongoClient(config["host"], config["port"])
+ self.db = self.client[config["name"]]
+ if "loglevel" in config:
+ self.logger.setLevel(getattr(logging, config['loglevel']))
+ # get data to try a connection
+ now = time()
+ while True:
+ try:
+ self.db.users.find_one({"username": "admin"})
+ return
+ except errors.ConnectionFailure as e:
+ if time() - now >= self.conn_initial_timout:
+ raise
+ self.logger.info("Waiting to database up {}".format(e))
+ sleep(2)
+ except errors.PyMongoError as e:
+ raise DbException(str(e))
+
+ def db_disconnect(self):
+ pass # TODO
+
+ @staticmethod
+ def _format_filter(filter):
+ try:
+ db_filter = {}
+ for query_k, query_v in filter.items():
+ dot_index = query_k.rfind(".")
+ if dot_index > 1 and query_k[dot_index+1:] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont",
+ "ncont", "neq"):
+ operator = "$" + query_k[dot_index+1:]
+ if operator == "$neq":
+ operator = "$ne"
+ k = query_k[:dot_index]
+ else:
+ operator = "$eq"
+ k = query_k
+
+ v = query_v
+ if isinstance(v, list):
+ if operator in ("$eq", "$cont"):
+ operator = "$in"
+ v = query_v
+ elif operator in ("$ne", "$ncont"):
+ operator = "$nin"
+ v = query_v
+ else:
+ v = query_v.join(",")
+
+ if operator in ("$eq", "$cont"):
+ # v cannot be a comma separated list, because operator would have been changed to $in
+ db_filter[k] = v
+ elif operator == "$ncount":
+ # v cannot be a comma separated list, because operator would have been changed to $nin
+ db_filter[k] = {"$ne": v}
+ else:
+ # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8"
+ if k not in db_filter:
+ db_filter[k] = {}
+ db_filter[k][operator] = v
+
+ return db_filter
+ except Exception as e:
+ raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
+ http_code=HTTPStatus.BAD_REQUEST)
+
+ def get_list(self, table, filter={}):
+ try:
+ l = []
+ collection = self.db[table]
+ rows = collection.find(self._format_filter(filter))
+ for row in rows:
+ l.append(row)
+ return l
+ except DbException:
+ raise
+ except Exception as e: # TODO refine
+ raise DbException(str(e))
+
+ def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True):
+ try:
+ if filter:
+ filter = self._format_filter(filter)
+ collection = self.db[table]
+ if not (fail_on_empty and fail_on_more):
+ return collection.find_one(filter)
+ rows = collection.find(filter)
+ if rows.count() == 0:
+ if fail_on_empty:
+ raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
+ HTTPStatus.NOT_FOUND)
+ return None
+ elif rows.count() > 1:
+ if fail_on_more:
+ raise DbException("Found more than one {} with filter='{}'".format(table[:-1], filter),
+ HTTPStatus.CONFLICT)
+ return rows[0]
+ except Exception as e: # TODO refine
+ raise DbException(str(e))
+
+ def del_list(self, table, filter={}):
+ try:
+ collection = self.db[table]
+ rows = collection.delete_many(self._format_filter(filter))
+ return {"deleted": rows.deleted_count}
+ except DbException:
+ raise
+ except Exception as e: # TODO refine
+ raise DbException(str(e))
+
+ def del_one(self, table, filter={}, fail_on_empty=True):
+ try:
+ collection = self.db[table]
+ rows = collection.delete_one(self._format_filter(filter))
+ if rows.deleted_count == 0:
+ if fail_on_empty:
+ raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
+ HTTPStatus.NOT_FOUND)
+ return None
+ return {"deleted": rows.deleted_count}
+ except Exception as e: # TODO refine
+ raise DbException(str(e))
+
+ def create(self, table, indata):
+ try:
+ collection = self.db[table]
+ data = collection.insert_one(indata)
+ return data.inserted_id
+ except Exception as e: # TODO refine
+ raise DbException(str(e))
+
+ def set_one(self, table, filter, update_dict, fail_on_empty=True):
+ try:
+ collection = self.db[table]
+ rows = collection.update_one(self._format_filter(filter), {"$set": update_dict})
+ if rows.updated_count == 0:
+ if fail_on_empty:
+ raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
+ HTTPStatus.NOT_FOUND)
+ return None
+ return {"deleted": rows.deleted_count}
+ except Exception as e: # TODO refine
+ raise DbException(str(e))
+
+ def replace(self, table, id, indata, fail_on_empty=True):
+ try:
+ _filter = {"_id": id}
+ collection = self.db[table]
+ rows = collection.replace_one(_filter, indata)
+ if rows.matched_count == 0:
+ if fail_on_empty:
+ raise DbException("Not found any {} with filter='{}'".format(table[:-1], _filter),
+ HTTPStatus.NOT_FOUND)
+ return None
+ return {"replace": rows.modified_count}
+ except Exception as e: # TODO refine
+ raise DbException(str(e))
--- /dev/null
+
+from http import HTTPStatus
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class FsException(Exception):
+ def __init__(self, message, http_code=HTTPStatus.INTERNAL_SERVER_ERROR):
+ self.http_code = http_code
+ Exception.__init__(self, "storage exception " + message)
+
+
+class FsBase(object):
+ def __init__(self):
+ pass
+
+ def get_params(self):
+ return {}
+
+ def fs_connect(self, config):
+ pass
+
+ def fs_disconnect(self):
+ pass
+
+ def mkdir(self, folder):
+ pass
+
+ def file_exists(self, storage):
+ pass
+
+ def file_size(self, storage):
+ pass
+
+ def file_extract(self, tar_object, path):
+ pass
+
+ def file_open(self, storage, mode):
+ pass
+
+ def file_delete(self, storage, ignore_non_exist=False):
+ pass
+
--- /dev/null
+import os
+import logging
+import tarfile
+from http import HTTPStatus
+from shutil import rmtree
+from fsbase import FsBase, FsException
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class FsLocal(FsBase):
+
+ def __init__(self, logger_name='fs'):
+ self.logger = logging.getLogger(logger_name)
+ self.path = None
+
+ def get_params(self):
+ return {"fs": "local", "path": self.path}
+
+ def fs_connect(self, config):
+ try:
+ if "logger_name" in config:
+ self.logger = logging.getLogger(config["logger_name"])
+ self.path = config["path"]
+ if not self.path.endswith("/"):
+ self.path += "/"
+ if not os.path.exists(self.path):
+ raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
+ config["path"]))
+ except FsException:
+ raise
+ except Exception as e: # TODO refine
+ raise FsException(str(e))
+
+ def fs_disconnect(self):
+ pass # TODO
+
+ def mkdir(self, folder):
+ """
+ Creates a folder or parent object location
+ :param folder:
+ :return: None or raises and exception
+ """
+ try:
+ os.mkdir(self.path + folder)
+ except Exception as e:
+ raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+ def file_exists(self, storage, mode=None):
+ """
+ Indicates if "storage" file exist
+ :param storage: can be a str or a str list
+ :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
+ :return: True, False
+ """
+ if isinstance(storage, str):
+ f = storage
+ else:
+ f = "/".join(storage)
+ if os.path.exists(self.path + f):
+ if mode == "file" and os.path.isfile(self.path + f):
+ return True
+ if mode == "dir" and os.path.isdir(self.path + f):
+ return True
+ return False
+
+ def file_size(self, storage):
+ """
+ return file size
+ :param storage: can be a str or a str list
+ :return: file size
+ """
+ if isinstance(storage, str):
+ f = storage
+ else:
+ f = "/".join(storage)
+ return os.path.getsize(self.path + f)
+
+ def file_extract(self, tar_object, path):
+ """
+ extract a tar file
+ :param tar_object: object of type tar
+ :param path: can be a str or a str list, or a tar object where to extract the tar_object
+ :return: None
+ """
+ if isinstance(path, str):
+ f = self.path + path
+ else:
+ f = self.path + "/".join(path)
+ tar_object.extractall(path=f)
+
+ def file_open(self, storage, mode):
+ """
+ Open a file
+ :param storage: can be a str or list of str
+ :param mode: file mode
+ :return: file object
+ """
+ try:
+ if isinstance(storage, str):
+ f = storage
+ else:
+ f = "/".join(storage)
+ return open(self.path + f, mode)
+ except FileNotFoundError:
+ raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
+ except IOError:
+ raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)
+
+ def dir_ls(self, storage):
+ """
+ return folder content
+ :param storage: can be a str or list of str
+ :return: folder content
+ """
+ try:
+ if isinstance(storage, str):
+ f = storage
+ else:
+ f = "/".join(storage)
+ return os.listdir(self.path + f)
+ except NotADirectoryError:
+ raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
+ except IOError:
+ raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)
+
+ def file_delete(self, storage, ignore_non_exist=False):
+ """
+ Delete storage content recursivelly
+ :param storage: can be a str or list of str
+ :param ignore_non_exist: not raise exception if storage does not exist
+ :return: None
+ """
+
+ if isinstance(storage, str):
+ f = self.path + storage
+ else:
+ f = self.path + "/".join(storage)
+ if os.path.exists(f):
+ rmtree(f)
+ elif not ignore_non_exist:
+ raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.NOT_FOUND)
--- /dev/null
+
+import asyncio
+from http import HTTPStatus
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class MsgException(Exception):
+ """
+ Base Exception class for all msgXXXX exceptions
+ """
+
+ def __init__(self, message, http_code=HTTPStatus.INTERNAL_SERVER_ERROR):
+ """
+ General exception
+ :param message: descriptive text
+ :param http_code: <http.HTTPStatus> type. It contains ".value" (http error code) and ".name" (http error name
+ """
+ self.http_code = http_code
+ Exception.__init__(self, "messaging exception " + message)
+
+
+class MsgBase(object):
+ """
+ Base class for all msgXXXX classes
+ """
+
+ def __init__(self):
+ pass
+
+ def connect(self, config):
+ pass
+
+ def disconnect(self):
+ pass
+
+ def write(self, topic, key, msg):
+ pass
+
+ def read(self, topic):
+ pass
+
+ async def aiowrite(self, topic, key, msg, loop):
+ pass
+
+ async def aioread(self, topic, loop):
+ pass
--- /dev/null
+import logging
+import asyncio
+import yaml
+from aiokafka import AIOKafkaConsumer
+from aiokafka import AIOKafkaProducer
+from aiokafka.errors import KafkaError
+from msgbase import MsgBase, MsgException
+#import json
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, " \
+ "Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
+class MsgKafka(MsgBase):
+ def __init__(self, logger_name='msg'):
+ self.logger = logging.getLogger(logger_name)
+ self.host = None
+ self.port = None
+ self.consumer = None
+ self.producer = None
+ self.loop = None
+ self.broker = None
+
+ def connect(self, config):
+ try:
+ if "logger_name" in config:
+ self.logger = logging.getLogger(config["logger_name"])
+ self.host = config["host"]
+ self.port = config["port"]
+ self.loop = asyncio.get_event_loop()
+ self.broker = str(self.host) + ":" + str(self.port)
+
+ except Exception as e: # TODO refine
+ raise MsgException(str(e))
+
+ def disconnect(self):
+ try:
+ self.loop.close()
+ except Exception as e: # TODO refine
+ raise MsgException(str(e))
+
+ def write(self, topic, key, msg):
+ try:
+ self.loop.run_until_complete(self.aiowrite(topic=topic, key=key,
+ msg=yaml.safe_dump(msg, default_flow_style=True),
+ loop=self.loop))
+
+ except Exception as e:
+ raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
+
+ def read(self, topic):
+ """
+ Read from one or several topics. it is non blocking returning None if nothing is available
+ :param topic: can be str: single topic; or str list: several topics
+ :return: topic, key, message; or None
+ """
+ try:
+ return self.loop.run_until_complete(self.aioread(topic, self.loop))
+ except MsgException:
+ raise
+ except Exception as e:
+ raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
+
+ async def aiowrite(self, topic, key, msg, loop=None):
+
+ if not loop:
+ loop = self.loop
+ try:
+ self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
+ bootstrap_servers=self.broker)
+ await self.producer.start()
+ await self.producer.send(topic=topic, key=key, value=msg)
+ except Exception as e:
+ raise MsgException("Error publishing to {} topic: {}".format(topic, str(e)))
+ finally:
+ await self.producer.stop()
+
+ async def aioread(self, topic, loop=None, callback=None, *args):
+ """
+ Asyncio read from one or several topics. It blocks
+ :param topic: can be str: single topic; or str list: several topics
+ :param loop: asyncio loop
+ :callback: callback function that will handle the message in kafka bus
+ :*args: optional arguments for callback function
+ :return: topic, key, message
+ """
+
+ if not loop:
+ loop = self.loop
+ try:
+ if isinstance(topic, (list, tuple)):
+ topic_list = topic
+ else:
+ topic_list = (topic,)
+
+ self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker)
+ await self.consumer.start()
+ self.consumer.subscribe(topic_list)
+
+ async for message in self.consumer:
+ if callback:
+ callback(message.topic, yaml.load(message.key), yaml.load(message.value), *args)
+ else:
+ return message.topic, yaml.load(message.key), yaml.load(message.value)
+ except KafkaError as e:
+ raise MsgException(str(e))
+ finally:
+ await self.consumer.stop()
+
--- /dev/null
+import logging
+import os
+import yaml
+import asyncio
+from msgbase import MsgBase, MsgException
+from time import sleep
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+"""
+This emulated kafka bus by just using a shared file system. Useful for testing or devops.
+One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer
+access to the same file. e.g. same volume if running with docker.
+One text line per message is used in yaml format.
+"""
+
+class MsgLocal(MsgBase):
+
+ def __init__(self, logger_name='msg'):
+ self.logger = logging.getLogger(logger_name)
+ self.path = None
+ # create a different file for each topic
+ self.files = {}
+ self.buffer = {}
+
+ def connect(self, config):
+ try:
+ if "logger_name" in config:
+ self.logger = logging.getLogger(config["logger_name"])
+ self.path = config["path"]
+ if not self.path.endswith("/"):
+ self.path += "/"
+ if not os.path.exists(self.path):
+ os.mkdir(self.path)
+ except MsgException:
+ raise
+ except Exception as e: # TODO refine
+ raise MsgException(str(e))
+
+ def disconnect(self):
+ for f in self.files.values():
+ try:
+ f.close()
+ except Exception as e: # TODO refine
+ pass
+
+ def write(self, topic, key, msg):
+ """
+ Insert a message into topic
+ :param topic: topic
+ :param key: key text to be inserted
+ :param msg: value object to be inserted, can be str, object ...
+ :return: None or raises and exception
+ """
+ try:
+ if topic not in self.files:
+ self.files[topic] = open(self.path + topic, "a+")
+ yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True, width=20000)
+ self.files[topic].flush()
+ except Exception as e: # TODO refine
+ raise MsgException(str(e))
+
+ def read(self, topic, blocks=True):
+ """
+ Read from one or several topics. it is non blocking returning None if nothing is available
+ :param topic: can be str: single topic; or str list: several topics
+ :param blocks: indicates if it should wait and block until a message is present or returns None
+ :return: topic, key, message; or None if blocks==True
+ """
+ try:
+ if isinstance(topic, (list, tuple)):
+ topic_list = topic
+ else:
+ topic_list = (topic, )
+ while True:
+ for single_topic in topic_list:
+ if single_topic not in self.files:
+ self.files[single_topic] = open(self.path + single_topic, "a+")
+ self.buffer[single_topic] = ""
+ self.buffer[single_topic] += self.files[single_topic].readline()
+ if not self.buffer[single_topic].endswith("\n"):
+ continue
+ msg_dict = yaml.load(self.buffer[single_topic])
+ self.buffer[single_topic] = ""
+ assert len(msg_dict) == 1
+ for k, v in msg_dict.items():
+ return single_topic, k, v
+ if not blocks:
+ return None
+ sleep(2)
+ except Exception as e: # TODO refine
+ raise MsgException(str(e))
+
+ async def aioread(self, topic, loop):
+ """
+ Asyncio read from one or several topics. It blocks
+ :param topic: can be str: single topic; or str list: several topics
+ :param loop: asyncio loop
+ :return: topic, key, message
+ """
+ try:
+ while True:
+ msg = self.read(topic, blocks=False)
+ if msg:
+ return msg
+ await asyncio.sleep(2, loop=loop)
+ except MsgException:
+ raise
+ except Exception as e: # TODO refine
+ raise MsgException(str(e))
+
--- /dev/null
+#!/usr/bin/env python3
+
+import os
+from setuptools import setup
+
+here = os.path.abspath(os.path.dirname(__file__))
+_name = "osm_common"
+VERSION = "4.0.0rc1"
+README = open(os.path.join(here, 'README.rst')).read()
+
+setup(
+ name=_name,
+ description='OSM common utilities',
+ long_description=README,
+ # version_command=('git describe --tags --long --dirty', 'pep440-git'),
+ version=VERSION,
+ # python_requires='>3.5',
+ author='ETSI OSM',
+ author_email='alfonso.tiernosepulveda@telefonica.com',
+ maintainer='Alfonso Tierno',
+ maintainer_email='alfonso.tiernosepulveda@telefonica.com',
+ url='https://osm.etsi.org/gitweb/?p=osm/common.git;a=summary',
+ license='Apache 2.0',
+
+ packages=[_name],
+ include_package_data=True,
+ # scripts=['nbi.py'],
+
+ install_requires=[
+ 'pymongo',
+ 'aiokafka',
+ 'PyYAML',
+ ],
+)
--- /dev/null
+[tox]
+envlist = py27,py3,flake8
+toxworkdir={homedir}/.tox
+
+[testenv]
+deps=nose
+ mock
+commands=nosetests
+
+[testenv:flake8]
+basepython = python3
+deps = flake8
+commands =
+ flake8 setup.py
+
+[testenv:build]
+basepython = python3
+deps = stdeb
+ setuptools-version-command
+commands = python3 setup.py --command-packages=stdeb.command bdist_deb