From: selvi.j Date: Fri, 28 Apr 2023 06:47:48 +0000 (+0000) Subject: Coverity-CWE 295: Improper Certificate Validation X-Git-Tag: release-v14.0-start X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=9184037181b9fbf39dfa9624657087aed7f1a6cd;p=osm%2FNBI.git Coverity-CWE 295: Improper Certificate Validation Added fix for CWE 295: Improper Certificate Validation (SSL certificate validation disabled) Change-Id: Ibdf84e00a79d42c695a25ce96e13c515e85b11f2 Signed-off-by: selvi.j --- diff --git a/osm_nbi/tests/send_kafka.py b/osm_nbi/tests/send_kafka.py deleted file mode 100755 index d066d14..0000000 --- a/osm_nbi/tests/send_kafka.py +++ /dev/null @@ -1,64 +0,0 @@ -#! /usr/bin/python3 -# -*- coding: utf-8 -*- - -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import sys -import requests -import yaml -from os import getenv - -__author__ = "Alfonso Tierno, alfonso.tiernosepulveda@telefonica.com" -__date__ = "$2019-05-31$" -__version__ = "0.1" -version_date = "May 2019" - - -def usage(): - print("Usage: ", sys.argv[0], "topic key message") - print(" Sends a kafka message using URL test of NBI") - print(" host is defined by env OSMNBI_HOST (localhost by default)") - print(" port is defined by env OSMNBI_PORT (9999 by default)") - return - - -if __name__ == "__main__": - try: - if "--help" in sys.argv: - usage() - exit(0) - - if len(sys.argv) != 4: - print( - "missing parameters. Type --help for more information", file=sys.stderr - ) - exit(1) - - topic, key, message = sys.argv[1:] - host = getenv("OSMNBI_HOST", "localhost") - port = getenv("OSMNBI_PORT", "9999") - url = "https://{host}:{port}/osm/test/message/{topic}".format( - host=host, port=port, topic=topic - ) - print(url) - data = {key: message} - - r = requests.post(url, data=yaml.safe_dump(data), verify=False) - if r.status_code not in (200, 201, 202, 204): - print("Received code={}, content='{}'".format(r.status_code, r.text)) - exit(1) - print("{} -> {}: {}".format(topic, key, message)) - - except Exception: - raise diff --git a/osm_nbi/tests/upload.py b/osm_nbi/tests/upload.py deleted file mode 100755 index dfd7302..0000000 --- a/osm_nbi/tests/upload.py +++ /dev/null @@ -1,117 +0,0 @@ -#! /usr/bin/python3 -# -*- coding: utf-8 -*- - -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import getopt -import sys -import requests -from os.path import getsize, basename -from hashlib import md5 - -__author__ = "Alfonso Tierno, alfonso.tiernosepulveda@telefonica.com" -__date__ = "$2018-01-01$" -__version__ = "0.1" -version_date = "Jan 2018" - - -def usage(): - print("Usage: ", sys.argv[0], "[options]") - print(" --version: prints current version") - print(" -f|--file FILE: file to be sent") - print(" -h|--help: shows this help") - print(" -u|--url URL: complete server URL") - print(" -s|--chunk-size SIZE: size of chunks, by default 1000") - print(" -t|--token TOKEN: Authorizaton token, previously obtained from server") - print(" -v|--verbose print debug information, can be used several times") - return - - -if __name__ == "__main__": - try: - # load parameters and configuration - opts, args = getopt.getopt( - sys.argv[1:], - "hvu:s:f:t:", - ["url=", "help", "version", "verbose", "file=", "chunk-size=", "token="], - ) - url = None - chunk_size = 500 - pkg_file = None - verbose = 0 - token = None - - for o, a in opts: - if o == "--version": - print("upload version " + __version__ + " " + version_date) - sys.exit() - elif o in ("-v", "--verbose"): - verbose += 1 - elif o in ("-h", "--help"): - usage() - sys.exit() - elif o in ("-u", "--url"): - url = a - elif o in ("-s", "--chunk-size"): - chunk_size = int(a) - elif o in ("-f", "--file"): - pkg_file = a - elif o in ("-t", "--token"): - token = a - else: - assert False, "Unhandled option" - total_size = getsize(pkg_file) - index = 0 - transaction_id = None - file_md5 = md5() - with open(pkg_file, "rb") as f: - headers = { - "Content-type": "application/gzip", - "Content-Filename": basename(pkg_file), - "Accept": "application/json", - } - if token: - headers["Authorization"] = token - while index < total_size: - chunk_data = f.read(chunk_size) - file_md5.update(chunk_data) - # payload = {"file_name": pkg_file, "chunk_data": base64.b64encode(chunk_data).decode("utf-8"), - # "chunk_size": chunk_size} - if transaction_id: - headers["Transaction-Id"] = transaction_id - if index + len(chunk_data) == total_size: - headers["Content-File-MD5"] = file_md5.hexdigest() - # payload["id"] = transaction_id - headers["Content-range"] = "bytes {}-{}/{}".format( - index, index + len(chunk_data) - 1, total_size - ) - # refers to rfc2616: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html - if verbose: - print("TX chunk Headers: {}".format(headers)) - r = requests.post(url, data=chunk_data, headers=headers, verify=False) - if r.status_code not in (200, 201): - print("Got {}: {}".format(r.status_code, r.text)) - exit(1) - if verbose > 1: - print("RX {}: {}".format(r.status_code, r.text)) - response = r.json() - if not transaction_id: - transaction_id = response["id"] - index += len(chunk_data) - if verbose <= 1: - print("RX {}: {}".format(r.status_code, r.text)) - if "id" in response: - print("---\nid: {}".format(response["id"])) - except Exception: - raise