initial commit 11/6011/3
author tierno <alfonso.tiernosepulveda@telefonica.com>
Thu, 19 Apr 2018 14:01:59 +0000 (16:01 +0200)
committer tierno <alfonso.tiernosepulveda@telefonica.com>
Fri, 20 Apr 2018 14:40:58 +0000 (16:40 +0200)
Change-Id: Ia40148fdc2cabbbacb0b67aaed8442ed0ecf0bc2
Signed-off-by: tierno <alfonso.tiernosepulveda@telefonica.com>
17 files changed:
.gitignore-common [new file with mode: 0644]
LICENSE [new file with mode: 0644]
README.rst [new file with mode: 0644]
devops-stages/stage-archive.sh [new file with mode: 0755]
devops-stages/stage-build.sh [new file with mode: 0755]
devops-stages/stage-test.sh [new file with mode: 0755]
osm_common/__init__.py [new file with mode: 0644]
osm_common/dbbase.py [new file with mode: 0644]
osm_common/dbmemory.py [new file with mode: 0644]
osm_common/dbmongo.py [new file with mode: 0644]
osm_common/fsbase.py [new file with mode: 0644]
osm_common/fslocal.py [new file with mode: 0644]
osm_common/msgbase.py [new file with mode: 0644]
osm_common/msgkafka.py [new file with mode: 0644]
osm_common/msglocal.py [new file with mode: 0644]
setup.py [new file with mode: 0644]
tox.ini [new file with mode: 0644]

diff --git a/.gitignore-common b/.gitignore-common
new file mode 100644 (file)
index 0000000..a1e47df
--- /dev/null
+++ b/.gitignore-common
@@ -0,0 +1,35 @@
+# This is a template with common files to be ignored; after cloning, copy it to .gitignore
+# cp .gitignore-common .gitignore
+
+*.pyc
+*.pyo
+
+#auto-ignore
+.gitignore
+
+#logs
+logs 
+
+#pycharm
+.idea
+
+#eclipse
+.project     
+.pydevproject
+.settings
+
+# local stuff: files ending in ".local" or folders named "local"
+*.local
+osm_common/local
+osm_common/test/local
+
+# temporary stuff: files ending in ".temp" or folders named "temp"
+*.temp
+osm_common/temp
+osm_common/test/temp
+
+#distribution and package generation
+build
+dist
+*.egg-info
+
diff --git a/LICENSE b/LICENSE
new file mode 100644 (file)
index 0000000..8dada3e
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,201 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "{}"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright {yyyy} {name of copyright owner}
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/README.rst b/README.rst
new file mode 100644 (file)
index 0000000..3273af4
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,11 @@
+===========
+osm-common
+===========
+
+osm-common contains the common modules used by OSM for database, storage and message access.
+The goal is that all OSM modules use this same library, so that migrating to a different technology
+(another database, object-storage or messaging system) only needs to be done in one place.
+For database: MongoDB and memory (volatile) are implemented.
+For messaging: Kafka and the local file system are implemented.
+For storage: only the local file system is implemented.
+
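
A minimal usage sketch (not part of this commit) of how the modules below can be wired together.
Class names and config keys follow the osm_common implementations; paths, table names and values
are illustrative:

    # illustrative example: in-memory database, local storage and local message bus
    from osm_common.dbmemory import DbMemory
    from osm_common.fslocal import FsLocal
    from osm_common.msglocal import MsgLocal

    db = DbMemory()
    db.db_connect({"logger_name": "osm.db"})
    db.create("projects", {"_id": "p1", "name": "demo"})
    print(db.get_one("projects", {"name": "demo"}))    # -> {'_id': 'p1', 'name': 'demo'}

    fs = FsLocal()
    fs.fs_connect({"path": "/tmp/osm-storage"})         # the base path must already exist

    msg = MsgLocal()
    msg.connect({"path": "/tmp/osm-bus"})               # the folder is created if missing
    msg.write("alarms", "create", {"severity": "warning"})
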
diff --git a/devops-stages/stage-archive.sh b/devops-stages/stage-archive.sh
new file mode 100755 (executable)
index 0000000..727c020
--- /dev/null
+++ b/devops-stages/stage-archive.sh
@@ -0,0 +1,10 @@
+#!/bin/sh
+MDG=common
+rm -rf pool
+rm -rf dists
+mkdir -p pool/$MDG
+mv deb_dist/*.deb pool/$MDG/
+mkdir -p dists/unstable/$MDG/binary-amd64/
+apt-ftparchive packages pool/$MDG > dists/unstable/$MDG/binary-amd64/Packages
+gzip -9fk dists/unstable/$MDG/binary-amd64/Packages
+echo "dists/**,pool/$MDG/*.deb"
diff --git a/devops-stages/stage-build.sh b/devops-stages/stage-build.sh
new file mode 100755 (executable)
index 0000000..bf7602b
--- /dev/null
+++ b/devops-stages/stage-build.sh
@@ -0,0 +1,3 @@
+#!/bin/sh
+rm -rf deb_dist
+tox -e build
diff --git a/devops-stages/stage-test.sh b/devops-stages/stage-test.sh
new file mode 100755 (executable)
index 0000000..0333d84
--- /dev/null
+++ b/devops-stages/stage-test.sh
@@ -0,0 +1,2 @@
+#!/bin/sh
+#tox
diff --git a/osm_common/__init__.py b/osm_common/__init__.py
new file mode 100644 (file)
index 0000000..df7b893
--- /dev/null
+++ b/osm_common/__init__.py
@@ -0,0 +1,2 @@
+version = '0.1.3'
+date_version = '2018-04-19'
\ No newline at end of file
diff --git a/osm_common/dbbase.py b/osm_common/dbbase.py
new file mode 100644 (file)
index 0000000..aa9c24e
--- /dev/null
+++ b/osm_common/dbbase.py
@@ -0,0 +1,37 @@
+from http import HTTPStatus
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class DbException(Exception):
+
+    def __init__(self, message, http_code=HTTPStatus.NOT_FOUND):
+        self.http_code = http_code
+        Exception.__init__(self, "database exception " + message)
+
+
+class DbBase(object):
+
+    def __init__(self):
+        pass
+
+    def db_connect(self, config):
+        pass
+
+    def db_disconnect(self):
+        pass
+
+    def get_list(self, table, filter={}):
+        pass
+
+    def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True):
+        pass
+
+    def create(self, table, indata):
+        pass
+
+    def del_list(self, table, filter={}):
+        pass
+
+    def del_one(self, table, filter={}, fail_on_empty=True):
+        pass
diff --git a/osm_common/dbmemory.py b/osm_common/dbmemory.py
new file mode 100644 (file)
index 0000000..cdb0482
--- /dev/null
+++ b/osm_common/dbmemory.py
@@ -0,0 +1,124 @@
+import logging
+from osm_common.dbbase import DbException, DbBase
+from http import HTTPStatus
+from uuid import uuid4
+from copy import deepcopy
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class DbMemory(DbBase):
+
+    def __init__(self, logger_name='db'):
+        self.logger = logging.getLogger(logger_name)
+        self.db = {}
+
+    def db_connect(self, config):
+        if "logger_name" in config:
+            self.logger = logging.getLogger(config["logger_name"])
+
+    @staticmethod
+    def _format_filter(filter):
+        return filter    # TODO
+
+    def _find(self, table, filter):
+        for i, row in enumerate(self.db.get(table, ())):
+            match = True
+            if filter:
+                for k, v in filter.items():
+                    if k not in row or v != row[k]:
+                        match = False
+            if match:
+                yield i, row
+
+    def get_list(self, table, filter={}):
+        try:
+            l = []
+            for _, row in self._find(table, self._format_filter(filter)):
+                l.append(deepcopy(row))
+            return l
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True):
+        try:
+            l = None
+            for _, row in self._find(table, self._format_filter(filter)):
+                if not fail_on_more:
+                    return deepcopy(row)
+                if l:
+                    raise DbException("Found more than one entry with filter='{}'".format(filter),
+                                      HTTPStatus.CONFLICT.value)
+                l = row
+            if not l and fail_on_empty:
+                raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND)
+            return deepcopy(l)
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def del_list(self, table, filter={}):
+        try:
+            id_list = []
+            for i, _ in self._find(table, self._format_filter(filter)):
+                id_list.append(i)
+            deleted = len(id_list)
+            for i in id_list:
+                del self.db[table][i]
+            return {"deleted": deleted}
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def del_one(self, table, filter={}, fail_on_empty=True):
+        try:
+            for i, _ in self._find(table, self._format_filter(filter)):
+                break
+            else:
+                if fail_on_empty:
+                    raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND)
+                return None
+            del self.db[table][i]
+            return {"deleted": 1}
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def replace(self, table, filter, indata, fail_on_empty=True):
+        try:
+            for i, _ in self._find(table, self._format_filter(filter)):
+                break
+            else:
+                if fail_on_empty:
+                    raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND)
+                return None
+            self.db[table][i] = deepcopy(indata)
+            return {"upadted": 1}
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def create(self, table, indata):
+        try:
+            id = indata.get("_id")
+            if not id:
+                id = str(uuid4())
+                indata["_id"] = id
+            if table not in self.db:
+                self.db[table] = []
+            self.db[table].append(deepcopy(indata))
+            return id
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+
+if __name__ == '__main__':
+    # some test code
+    db = DbMemory()
+    db.create("test", {"_id": 1, "data": 1})
+    db.create("test", {"_id": 2, "data": 2})
+    db.create("test", {"_id": 3, "data": 3})
+    print("must be 3 items:", db.get_list("test"))
+    print("must return item 2:", db.get_list("test", {"_id": 2}))
+    db.del_one("test", {"_id": 2})
+    print("must be emtpy:", db.get_list("test", {"_id": 2}))
diff --git a/osm_common/dbmongo.py b/osm_common/dbmongo.py
new file mode 100644 (file)
index 0000000..582773a
--- /dev/null
+++ b/osm_common/dbmongo.py
@@ -0,0 +1,191 @@
+
+import logging
+from pymongo import MongoClient, errors
+from osm_common.dbbase import DbException, DbBase
+from http import HTTPStatus
+from time import time, sleep
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+# TODO consider use this decorator for database access retries
+# @retry_mongocall
+# def retry_mongocall(call):
+#     def _retry_mongocall(*args, **kwargs):
+#         retry = 1
+#         while True:
+#             try:
+#                 return call(*args, **kwargs)
+#             except pymongo.AutoReconnect as e:
+#                 if retry == 4:
+#                     raise DbException(str(e))
+#                 sleep(retry)
+#     return _retry_mongocall
+
+
+class DbMongo(DbBase):
+    conn_initial_timeout = 120
+    conn_timeout = 10
+
+    def __init__(self, logger_name='db'):
+        self.logger = logging.getLogger(logger_name)
+
+    def db_connect(self, config):
+        try:
+            if "logger_name" in config:
+                self.logger = logging.getLogger(config["logger_name"])
+            self.client = MongoClient(config["host"], config["port"])
+            self.db = self.client[config["name"]]
+            if "loglevel" in config:
+                self.logger.setLevel(getattr(logging, config['loglevel']))
+            # get data to try a connection
+            now = time()
+            while True:
+                try:
+                    self.db.users.find_one({"username": "admin"})
+                    return
+                except errors.ConnectionFailure as e:
+                    if time() - now >= self.conn_initial_timeout:
+                        raise
+                    self.logger.info("Waiting for the database to be up: {}".format(e))
+                    sleep(2)
+        except errors.PyMongoError as e:
+            raise DbException(str(e))
+
+    def db_disconnect(self):
+        pass  # TODO
+
+    @staticmethod
+    def _format_filter(filter):
+        try:
+            db_filter = {}
+            for query_k, query_v in filter.items():
+                dot_index = query_k.rfind(".")
+                if dot_index > 1 and query_k[dot_index+1:] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont",
+                                                               "ncont", "neq"):
+                    operator = "$" + query_k[dot_index+1:]
+                    if operator == "$neq":
+                        operator = "$ne"
+                    k = query_k[:dot_index]
+                else:
+                    operator = "$eq"
+                    k = query_k
+
+                v = query_v
+                if isinstance(v, list):
+                    if operator in ("$eq", "$cont"):
+                        operator = "$in"
+                        v = query_v
+                    elif operator in ("$ne", "$ncont"):
+                        operator = "$nin"
+                        v = query_v
+                    else:
+                        v = ",".join(query_v)
+
+                if operator in ("$eq", "$cont"):
+                    # v cannot be a comma separated list, because operator would have been changed to $in
+                    db_filter[k] = v
+                elif operator == "$ncount":
+                    # v cannot be a comma separated list, because operator would have been changed to $nin
+                    db_filter[k] = {"$ne": v}
+                else:
+                    # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8"
+                    if k not in db_filter:
+                        db_filter[k] = {}
+                    db_filter[k][operator] = v
+
+            return db_filter
+        except Exception as e:
+            raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
+                              http_code=HTTPStatus.BAD_REQUEST)
+
+    def get_list(self, table, filter={}):
+        try:
+            l = []
+            collection = self.db[table]
+            rows = collection.find(self._format_filter(filter))
+            for row in rows:
+                l.append(row)
+            return l
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True):
+        try:
+            if filter:
+                filter = self._format_filter(filter)
+            collection = self.db[table]
+            if not (fail_on_empty and fail_on_more):
+                return collection.find_one(filter)
+            rows = collection.find(filter)
+            if rows.count() == 0:
+                if fail_on_empty:
+                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
+                                      HTTPStatus.NOT_FOUND)
+                return None
+            elif rows.count() > 1:
+                if fail_on_more:
+                    raise DbException("Found more than one {} with filter='{}'".format(table[:-1], filter),
+                                      HTTPStatus.CONFLICT)
+            return rows[0]
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def del_list(self, table, filter={}):
+        try:
+            collection = self.db[table]
+            rows = collection.delete_many(self._format_filter(filter))
+            return {"deleted": rows.deleted_count}
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def del_one(self, table, filter={}, fail_on_empty=True):
+        try:
+            collection = self.db[table]
+            rows = collection.delete_one(self._format_filter(filter))
+            if rows.deleted_count == 0:
+                if fail_on_empty:
+                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
+                                      HTTPStatus.NOT_FOUND)
+                return None
+            return {"deleted": rows.deleted_count}
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def create(self, table, indata):
+        try:
+            collection = self.db[table]
+            data = collection.insert_one(indata)
+            return data.inserted_id
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def set_one(self, table, filter, update_dict, fail_on_empty=True):
+        try:
+            collection = self.db[table]
+            rows = collection.update_one(self._format_filter(filter), {"$set": update_dict})
+            if rows.matched_count == 0:
+                if fail_on_empty:
+                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
+                                      HTTPStatus.NOT_FOUND)
+                return None
+            return {"modified": rows.modified_count}
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def replace(self, table, id, indata, fail_on_empty=True):
+        try:
+            _filter = {"_id": id}
+            collection = self.db[table]
+            rows = collection.replace_one(_filter, indata)
+            if rows.matched_count == 0:
+                if fail_on_empty:
+                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], _filter),
+                                      HTTPStatus.NOT_FOUND)
+                return None
+            return {"replace": rows.modified_count}
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
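
For reference, a sketch (not part of the commit) of the filter convention handled by DbMongo._format_filter
above; the collection names, values and host/port are illustrative and a reachable MongoDB is assumed:

    from osm_common.dbmongo import DbMongo

    # key suffixes select the operator, as implemented in _format_filter:
    #   {"name": "vnf1"}                -> {"name": "vnf1"}
    #   {"name.ne": "vnf1"}             -> {"name": {"$ne": "vnf1"}}
    #   {"count.gt": 5, "count.lt": 8}  -> {"count": {"$gt": 5, "$lt": 8}}
    #   {"name": ["a", "b"]}            -> {"name": {"$in": ["a", "b"]}}
    db = DbMongo()
    db.db_connect({"host": "mongo", "port": 27017, "name": "osm"})
    recent = db.get_list("nsrs", {"_admin.created.gt": 1524000000})
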
diff --git a/osm_common/fsbase.py b/osm_common/fsbase.py
new file mode 100644 (file)
index 0000000..7b6cd0c
--- /dev/null
+++ b/osm_common/fsbase.py
@@ -0,0 +1,43 @@
+
+from http import HTTPStatus
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class FsException(Exception):
+    def __init__(self, message, http_code=HTTPStatus.INTERNAL_SERVER_ERROR):
+        self.http_code = http_code
+        Exception.__init__(self, "storage exception " + message)
+
+
+class FsBase(object):
+    def __init__(self):
+        pass
+
+    def get_params(self):
+        return {}
+
+    def fs_connect(self, config):
+        pass
+
+    def fs_disconnect(self):
+        pass
+
+    def mkdir(self, folder):
+        pass
+
+    def file_exists(self, storage):
+        pass
+
+    def file_size(self, storage):
+        pass
+
+    def file_extract(self, tar_object, path):
+        pass
+
+    def file_open(self, storage, mode):
+        pass
+
+    def file_delete(self, storage, ignore_non_exist=False):
+        pass
+
diff --git a/osm_common/fslocal.py b/osm_common/fslocal.py
new file mode 100644 (file)
index 0000000..b7dd839
--- /dev/null
+++ b/osm_common/fslocal.py
@@ -0,0 +1,142 @@
+import os
+import logging
+import tarfile
+from http import HTTPStatus
+from shutil import rmtree
+from osm_common.fsbase import FsBase, FsException
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class FsLocal(FsBase):
+
+    def __init__(self, logger_name='fs'):
+        self.logger = logging.getLogger(logger_name)
+        self.path = None
+
+    def get_params(self):
+        return {"fs": "local", "path": self.path}
+
+    def fs_connect(self, config):
+        try:
+            if "logger_name" in config:
+                self.logger = logging.getLogger(config["logger_name"])
+            self.path = config["path"]
+            if not self.path.endswith("/"):
+                self.path += "/"
+            if not os.path.exists(self.path):
+                raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
+                    config["path"]))
+        except FsException:
+            raise
+        except Exception as e:  # TODO refine
+            raise FsException(str(e))
+
+    def fs_disconnect(self):
+        pass  # TODO
+
+    def mkdir(self, folder):
+        """
+        Creates a folder or parent object location
+        :param folder: folder name, relative to the configured base path
+        :return: None or raises an exception
+        """
+        try:
+            os.mkdir(self.path + folder)
+        except Exception as e:
+            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+    def file_exists(self, storage, mode=None):
+        """
+        Indicates if "storage" file exist
+        :param storage: can be a str or a str list
+        :param mode: 'file' checks for a regular file; 'dir' checks for a directory; None just checks existence
+        :return: True, False
+        """
+        if isinstance(storage, str):
+            f = storage
+        else:
+            f = "/".join(storage)
+        if os.path.exists(self.path + f):
+            if mode is None:
+                return True
+            if mode == "file" and os.path.isfile(self.path + f):
+                return True
+            if mode == "dir" and os.path.isdir(self.path + f):
+                return True
+        return False
+
+    def file_size(self, storage):
+        """
+        return file size
+        :param storage: can be a str or a str list
+        :return: file size
+        """
+        if isinstance(storage, str):
+            f = storage
+        else:
+            f = "/".join(storage)
+        return os.path.getsize(self.path + f)
+
+    def file_extract(self, tar_object, path):
+        """
+        extract a tar file
+        :param tar_object: object of type tar
+        :param path: can be a str or a str list, or a tar object where to extract the tar_object
+        :return: None
+        """
+        if isinstance(path, str):
+            f = self.path + path
+        else:
+            f = self.path + "/".join(path)
+        tar_object.extractall(path=f)
+
+    def file_open(self, storage, mode):
+        """
+        Open a file
+        :param storage: can be a str or list of str
+        :param mode: file mode
+        :return: file object
+        """
+        try:
+            if isinstance(storage, str):
+                f = storage
+            else:
+                f = "/".join(storage)
+            return open(self.path + f, mode)
+        except FileNotFoundError:
+            raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
+        except IOError:
+            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)
+
+    def dir_ls(self, storage):
+        """
+        return folder content
+        :param storage: can be a str or list of str
+        :return: folder content
+        """
+        try:
+            if isinstance(storage, str):
+                f = storage
+            else:
+                f = "/".join(storage)
+            return os.listdir(self.path + f)
+        except FileNotFoundError:
+            raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
+        except NotADirectoryError:
+            raise FsException("{} is not a directory".format(f), http_code=HTTPStatus.BAD_REQUEST)
+        except IOError:
+            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)
+
+    def file_delete(self, storage, ignore_non_exist=False):
+        """
+        Delete storage content recursively
+        :param storage: can be a str or list of str
+        :param ignore_non_exist: not raise exception if storage does not exist
+        :return: None
+        """
+
+        if isinstance(storage, str):
+            f = self.path + storage
+        else:
+            f = self.path + "/".join(storage)
+        if os.path.exists(f):
+            rmtree(f)
+        elif not ignore_non_exist:
+            raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.NOT_FOUND)
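
A short sketch (not part of the commit) of the FsLocal conventions: "storage" may be a plain string or a
list of path components, and every path is relative to the base path given at fs_connect time; the values
below are illustrative:

    import os
    from osm_common.fslocal import FsLocal

    os.makedirs("/tmp/osm-packages", exist_ok=True)    # fs_connect requires an existing base path
    fs = FsLocal()
    fs.fs_connect({"path": "/tmp/osm-packages"})
    fs.mkdir("vnfd1")                                   # raises FsException if it already exists
    with fs.file_open(["vnfd1", "descriptor.yaml"], "w") as f:
        f.write("vnfd: {}\n")
    print(fs.file_exists("vnfd1", mode="dir"))          # True
    print(fs.dir_ls("vnfd1"))                           # ['descriptor.yaml']
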
diff --git a/osm_common/msgbase.py b/osm_common/msgbase.py
new file mode 100644 (file)
index 0000000..25e8c80
--- /dev/null
+++ b/osm_common/msgbase.py
@@ -0,0 +1,47 @@
+
+import asyncio
+from http import HTTPStatus
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class MsgException(Exception):
+    """
+    Base Exception class for all msgXXXX exceptions
+    """
+
+    def __init__(self, message, http_code=HTTPStatus.INTERNAL_SERVER_ERROR):
+        """
+        General exception
+        :param message:  descriptive text
+        :param http_code: <http.HTTPStatus> type. It contains ".value" (http error code) and ".name" (http error name)
+        """
+        self.http_code = http_code
+        Exception.__init__(self, "messaging exception " + message)
+
+
+class MsgBase(object):
+    """
+    Base class for all msgXXXX classes
+    """
+
+    def __init__(self):
+        pass
+
+    def connect(self, config):
+        pass
+
+    def disconnect(self):
+        pass
+
+    def write(self, topic, key, msg):
+        pass
+
+    def read(self, topic):
+        pass
+
+    async def aiowrite(self, topic, key, msg, loop):
+        pass
+
+    async def aioread(self, topic, loop):
+        pass
diff --git a/osm_common/msgkafka.py b/osm_common/msgkafka.py
new file mode 100644 (file)
index 0000000..c819c81
--- /dev/null
+++ b/osm_common/msgkafka.py
@@ -0,0 +1,107 @@
+import logging
+import asyncio
+import yaml
+from aiokafka import AIOKafkaConsumer
+from aiokafka import AIOKafkaProducer
+from aiokafka.errors import KafkaError
+from osm_common.msgbase import MsgBase, MsgException
+#import json
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, " \
+             "Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
+
+
+class MsgKafka(MsgBase):
+    def __init__(self, logger_name='msg'):
+        self.logger = logging.getLogger(logger_name)
+        self.host = None
+        self.port = None
+        self.consumer = None
+        self.producer = None
+        self.loop = None
+        self.broker = None
+
+    def connect(self, config):
+        try:
+            if "logger_name" in config:
+                self.logger = logging.getLogger(config["logger_name"])
+            self.host = config["host"]
+            self.port = config["port"]
+            self.loop = asyncio.get_event_loop()
+            self.broker = str(self.host) + ":" + str(self.port)
+
+        except Exception as e:  # TODO refine
+            raise MsgException(str(e))
+
+    def disconnect(self):
+        try:
+            self.loop.close()
+        except Exception as e:  # TODO refine
+            raise MsgException(str(e))
+
+    def write(self, topic, key, msg):
+        try:
+            self.loop.run_until_complete(self.aiowrite(topic=topic, key=key,
+                                                       msg=yaml.safe_dump(msg, default_flow_style=True),
+                                                       loop=self.loop))
+
+        except Exception as e:
+            raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
+
+    def read(self, topic):
+        """
+        Read from one or several topics. It blocks until a message is available
+        :param topic: can be str: single topic; or str list: several topics
+        :return: topic, key, message
+        """
+        try:
+            return self.loop.run_until_complete(self.aioread(topic, self.loop))
+        except MsgException:
+            raise
+        except Exception as e:
+            raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
+
+    async def aiowrite(self, topic, key, msg, loop=None):
+
+        if not loop:
+            loop = self.loop
+        try:
+            self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
+                                             bootstrap_servers=self.broker)
+            await self.producer.start()
+            await self.producer.send(topic=topic, key=key, value=msg)
+        except Exception as e:
+            raise MsgException("Error publishing to {} topic: {}".format(topic, str(e)))
+        finally:
+            await self.producer.stop()
+
+    async def aioread(self, topic, loop=None, callback=None, *args):
+        """
+        Asyncio read from one or several topics. It blocks
+        :param topic: can be str: single topic; or str list: several topics
+        :param loop: asyncio loop
+        :callback: callback function that will handle the message in kafka bus
+        :*args: optional arguments for callback function
+        :return: topic, key, message
+        """
+
+        if not loop:
+            loop = self.loop
+        try:
+            if isinstance(topic, (list, tuple)):
+                topic_list = topic
+            else:
+                topic_list = (topic,)
+
+            self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker)
+            await self.consumer.start()
+            self.consumer.subscribe(topic_list)
+
+            async for message in self.consumer:
+                if callback:
+                    callback(message.topic, yaml.load(message.key), yaml.load(message.value), *args)
+                else:
+                    return message.topic, yaml.load(message.key), yaml.load(message.value)
+        except KafkaError as e:
+            raise MsgException(str(e))
+        finally:
+            await self.consumer.stop()
+
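
A usage sketch for MsgKafka (not part of the commit), assuming a Kafka broker reachable at the given
host/port; write() serializes the value as yaml, and read() blocks until a message arrives on the topic:

    from osm_common.msgkafka import MsgKafka

    msg = MsgKafka()
    msg.connect({"host": "kafka", "port": 9092})         # broker address is illustrative
    msg.write("ns", "instantiate", {"nsr_id": "1234"})   # value is yaml-serialized before sending

    # another module (e.g. LCM) would consume with the same API; only messages
    # published after the consumer subscribes are received:
    topic, key, value = msg.read("ns")
    print(topic, key, value)
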
diff --git a/osm_common/msglocal.py b/osm_common/msglocal.py
new file mode 100644 (file)
index 0000000..c774f85
--- /dev/null
+++ b/osm_common/msglocal.py
@@ -0,0 +1,111 @@
+import logging
+import os
+import yaml
+import asyncio
+from osm_common.msgbase import MsgBase, MsgException
+from time import sleep
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+"""
+This module emulates a kafka bus using a shared file system. Useful for testing or devops.
+One file is used per topic. Only one producer and one consumer are allowed per topic; both access
+the same file, e.g. the same volume if running with docker.
+One text line per message is used, in yaml format.
+"""
+
+class MsgLocal(MsgBase):
+
+    def __init__(self, logger_name='msg'):
+        self.logger = logging.getLogger(logger_name)
+        self.path = None
+        # create a different file for each topic
+        self.files = {}
+        self.buffer = {}
+
+    def connect(self, config):
+        try:
+            if "logger_name" in config:
+                self.logger = logging.getLogger(config["logger_name"])
+            self.path = config["path"]
+            if not self.path.endswith("/"):
+                self.path += "/"
+            if not os.path.exists(self.path):
+                os.mkdir(self.path)
+        except MsgException:
+            raise
+        except Exception as e:  # TODO refine
+            raise MsgException(str(e))
+
+    def disconnect(self):
+        for f in self.files.values():
+            try:
+                f.close()
+            except Exception as e:  # TODO refine
+                pass
+
+    def write(self, topic, key, msg):
+        """
+        Insert a message into topic
+        :param topic: topic
+        :param key: key text to be inserted
+        :param msg: value object to be inserted, can be str, object ...
+        :return: None or raises and exception
+        """
+        try:
+            if topic not in self.files:
+                self.files[topic] = open(self.path + topic, "a+")
+            yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True, width=20000)
+            self.files[topic].flush()
+        except Exception as e:  # TODO refine
+            raise MsgException(str(e))
+
+    def read(self, topic, blocks=True):
+        """
+        Read from one or several topics. Depending on 'blocks' it waits until a message is available or returns None
+        :param topic: can be str: single topic; or str list: several topics
+        :param blocks: if True, wait until a message is present; if False, return None when nothing is pending
+        :return: topic, key, message; or None if blocks==False and nothing is available
+        """
+        try:
+            if isinstance(topic, (list, tuple)):
+                topic_list = topic
+            else:
+                topic_list = (topic, )
+            while True:
+                for single_topic in topic_list:
+                    if single_topic not in self.files:
+                        self.files[single_topic] = open(self.path + single_topic, "a+")
+                        self.buffer[single_topic] = ""
+                    self.buffer[single_topic] += self.files[single_topic].readline()
+                    if not self.buffer[single_topic].endswith("\n"):
+                        continue
+                    msg_dict = yaml.load(self.buffer[single_topic])
+                    self.buffer[single_topic] = ""
+                    assert len(msg_dict) == 1
+                    for k, v in msg_dict.items():
+                        return single_topic, k, v
+                if not blocks:
+                    return None
+                sleep(2)
+        except Exception as e:  # TODO refine
+            raise MsgException(str(e))
+
+    async def aioread(self, topic, loop):
+        """
+        Asyncio read from one or several topics. It blocks
+        :param topic: can be str: single topic; or str list: several topics
+        :param loop: asyncio loop
+        :return: topic, key, message
+        """
+        try:
+            while True:
+                msg = self.read(topic, blocks=False)
+                if msg:
+                    return msg
+                await asyncio.sleep(2, loop=loop)
+        except MsgException:
+            raise
+        except Exception as e:  # TODO refine
+            raise MsgException(str(e))
+
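
A sketch (not part of the commit) of the file-based bus: each topic is a plain file under the configured
path, one yaml line per message, and a reader only sees messages written after it has opened the topic
file; the path and values are illustrative:

    from osm_common.msglocal import MsgLocal

    consumer = MsgLocal()
    consumer.connect({"path": "/tmp/osm-bus"})
    consumer.read("ns", blocks=False)                     # opens/creates the topic file; nothing pending -> None

    producer = MsgLocal()                                 # typically another process sharing the same path
    producer.connect({"path": "/tmp/osm-bus"})
    producer.write("ns", "instantiate", {"nsr_id": "1234"})
    # /tmp/osm-bus/ns now holds one line such as: {instantiate: {nsr_id: '1234'}}

    print(consumer.read("ns", blocks=False))              # -> ('ns', 'instantiate', {'nsr_id': '1234'})
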
diff --git a/setup.py b/setup.py
new file mode 100644 (file)
index 0000000..4a0749f
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,34 @@
+#!/usr/bin/env python3
+
+import os
+from setuptools import setup
+
+here = os.path.abspath(os.path.dirname(__file__))
+_name = "osm_common"
+VERSION = "4.0.0rc1" 
+README = open(os.path.join(here, 'README.rst')).read()
+
+setup(
+    name=_name,
+    description='OSM common utilities',
+    long_description=README,
+    # version_command=('git describe --tags --long --dirty', 'pep440-git'),
+    version=VERSION,
+    # python_requires='>3.5',
+    author='ETSI OSM',
+    author_email='alfonso.tiernosepulveda@telefonica.com',
+    maintainer='Alfonso Tierno',
+    maintainer_email='alfonso.tiernosepulveda@telefonica.com',
+    url='https://osm.etsi.org/gitweb/?p=osm/common.git;a=summary',
+    license='Apache 2.0',
+
+    packages=[_name],
+    include_package_data=True,
+    # scripts=['nbi.py'],
+
+    install_requires=[
+        'pymongo',
+        'aiokafka',
+        'PyYAML',
+    ],
+)
diff --git a/tox.ini b/tox.ini
new file mode 100644 (file)
index 0000000..ef1f5eb
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,20 @@
+[tox]
+envlist = py27,py3,flake8
+toxworkdir={homedir}/.tox
+
+[testenv]
+deps=nose
+     mock
+commands=nosetests
+
+[testenv:flake8]
+basepython = python3
+deps = flake8
+commands =
+    flake8 setup.py
+
+[testenv:build]
+basepython = python3
+deps = stdeb
+       setuptools-version-command
+commands = python3 setup.py --command-packages=stdeb.command bdist_deb