Jinja2 sets autoescape to False, disabling SSL certificate checks, use of unsafe yaml load,
Try-Except-Pass detected, use of assert detected.
Removing openmanoconfig.py.
Change-Id: I3353208e150ae6c2f91befa1a3bbed33ed0c528d
Signed-off-by: aticig <gulsum.atici@canonical.com>
# See the License for the specific language governing permissions and
# limitations under the License.
##
+import logging
version = "8.0.1.post0"
version_date = "2020-06-29"
from pkg_resources import get_distribution
version = get_distribution("osm_ng_ro").version
-except Exception:
- pass
+except Exception as error:
+ logging.exception(f"{error} occured while getting the ro version")
from cryptography.hazmat.primitives.asymmetric import rsa
from jinja2 import (
Environment,
+ select_autoescape,
StrictUndefined,
TemplateError,
TemplateNotFound,
if text_id:
return text_id
- except Exception:
- pass
+ except Exception as error:
+ logging.exception(f"{error} occured while getting process id")
# Return a random id
return "".join(random_choice("0123456789abcdef") for _ in range(12))
def _parse_jinja2(self, cloud_init_content, params, context):
try:
- env = Environment(undefined=StrictUndefined)
+ env = Environment(
+ undefined=StrictUndefined,
+ autoescape=select_autoescape(default_for_string=True, default=True),
+ )
template = env.from_string(cloud_init_content)
return template.render(params or {})
vim_info_info = yaml.safe_load(vim_info["vim_info"])
if vim_info_info.get("name"):
vim_info["name"] = vim_info_info["name"]
- except Exception:
- pass
+ except Exception as vim_info_error:
+ self.logger.exception(
+ f"{vim_info_error} occured while getting the vim_info from yaml"
+ )
except vimconn.VimConnException as e:
# Mark all tasks at VIM_ERROR status
self.logger.error(
flavor_data = task["find_params"]["flavor_data"]
vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
except vimconn.VimConnNotFoundException:
- pass
+ self.logger.exception("VimConnNotFoundException occured.")
if not vim_flavor_id and task.get("params"):
# CREATE
try:
mkdir(file_name)
except FileExistsError:
- pass
+ self.logger.exception(
+ "FileExistsError occured while processing vim_config."
+ )
file_name = file_name + "/ca_cert"
self.logger.info("Unloaded {}".format(target_id))
rmtree("{}:{}".format(target_id, self.worker_index))
except FileNotFoundError:
- pass # this is raised by rmtree if folder does not exist
+ # This is raised by rmtree if folder does not exist.
+ self.logger.exception("FileNotFoundError occured while unloading VIM.")
except Exception as e:
self.logger.error("Cannot unload {}: {}".format(target_id, e))
cherrypy.request.headers.pop("Content-File-MD5", None)
elif "application/yaml" in cherrypy.request.headers["Content-Type"]:
error_text = "Invalid yaml format "
- indata = yaml.load(
- cherrypy.request.body, Loader=yaml.SafeLoader
- )
+ indata = yaml.safe_load(cherrypy.request.body)
cherrypy.request.headers.pop("Content-File-MD5", None)
elif (
"application/binary" in cherrypy.request.headers["Content-Type"]
# "Only 'Content-Type' of type 'application/json' or
# 'application/yaml' for input format are available")
error_text = "Invalid yaml format "
- indata = yaml.load(
- cherrypy.request.body, Loader=yaml.SafeLoader
- )
+ indata = yaml.safe_load(cherrypy.request.body)
cherrypy.request.headers.pop("Content-File-MD5", None)
else:
error_text = "Invalid yaml format "
- indata = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader)
+ indata = yaml.safe_load(cherrypy.request.body)
cherrypy.request.headers.pop("Content-File-MD5", None)
if not indata:
kwargs[k] = None
elif format_yaml:
try:
- kwargs[k] = yaml.load(v, Loader=yaml.SafeLoader)
- except Exception:
- pass
+ kwargs[k] = yaml.safe_load(v)
+ except Exception as yaml_error:
+ logging.exception(
+ f"{yaml_error} occured while parsing the yaml"
+ )
elif (
k.endswith(".gt")
or k.endswith(".lt")
except Exception:
try:
kwargs[k] = float(v)
- except Exception:
- pass
+ except Exception as keyword_error:
+ logging.exception(
+ f"{keyword_error} occured while getting the keyword arguments"
+ )
elif v.find(",") > 0:
kwargs[k] = v.split(",")
elif isinstance(v, (list, tuple)):
v[index] = None
elif format_yaml:
try:
- v[index] = yaml.load(v[index], Loader=yaml.SafeLoader)
- except Exception:
- pass
+ v[index] = yaml.safe_load(v[index])
+ except Exception as error:
+ logging.exception(
+ f"{error} occured while parsing the yaml"
+ )
return indata
except (ValueError, yaml.YAMLError) as exc:
try:
if cherrypy.request.method == "POST":
- to_send = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader)
+ to_send = yaml.safe_load(cherrypy.request.body)
for k, v in to_send.items():
self.ns.msg.write(main_topic, k, v)
return_text += " {}: {}\n".format(k, v)
elif cherrypy.request.method == "GET":
for k, v in kwargs.items():
- self.ns.msg.write(
- main_topic, k, yaml.load(v, Loader=yaml.SafeLoader)
- )
- return_text += " {}: {}\n".format(
- k, yaml.load(v, Loader=yaml.SafeLoader)
- )
+ self.ns.msg.write(main_topic, k, yaml.safe_load(v))
+ return_text += " {}: {}\n".format(k, yaml.safe_load(v))
except Exception as e:
return_text += "Error: " + str(e)
elif o in ("-c", "--config"):
config_file = a
else:
- assert False, "Unhandled option"
+ raise ValueError("Unhandled option")
if config_file:
if not path.isfile(config_file):
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+
+import unittest
+from unittest.mock import Mock, patch
+
+from jinja2 import (
+ Environment,
+ select_autoescape,
+ StrictUndefined,
+ TemplateError,
+ TemplateNotFound,
+ UndefinedError,
+)
+from osm_ng_ro.ns import Ns, NsException
+
+
+class TestNs(unittest.TestCase):
+ def setUp(self):
+ self.ns = Ns()
+
+ @patch("jinja2.Environment.__init__")
+ def test__parse_jinja2_undefined_error(self, env_mock: Mock):
+ cloud_init_content = None
+ params = None
+ context = None
+
+ env_mock.side_effect = UndefinedError("UndefinedError occurred.")
+
+ with self.assertRaises(NsException):
+ self.ns._parse_jinja2(
+ cloud_init_content=cloud_init_content, params=params, context=context
+ )
+
+ @patch("jinja2.Environment.__init__")
+ def test__parse_jinja2_template_error(self, env_mock: Mock):
+ cloud_init_content = None
+ params = None
+ context = None
+
+ env_mock.side_effect = TemplateError("TemplateError occurred.")
+
+ with self.assertRaises(NsException):
+ self.ns._parse_jinja2(
+ cloud_init_content=cloud_init_content, params=params, context=context
+ )
+
+ @patch("jinja2.Environment.__init__")
+ def test__parse_jinja2_template_not_found(self, env_mock: Mock):
+ cloud_init_content = None
+ params = None
+ context = None
+
+ env_mock.side_effect = TemplateNotFound("TemplateNotFound occurred.")
+
+ with self.assertRaises(NsException):
+ self.ns._parse_jinja2(
+ cloud_init_content=cloud_init_content, params=params, context=context
+ )
+
+ def test_rendering_jinja2_temp_without_special_characters(self):
+ cloud_init_content = """
+ disk_setup:
+ ephemeral0:
+ table_type: {{type}}
+ layout: True
+ overwrite: {{is_override}}
+ runcmd:
+ - [ ls, -l, / ]
+ - [ sh, -xc, "echo $(date) '{{command}}'" ]
+ """
+ params = {
+ "type": "mbr",
+ "is_override": "False",
+ "command": "; mkdir abc",
+ }
+ context = "cloud-init for VM"
+ expected_result = """
+ disk_setup:
+ ephemeral0:
+ table_type: mbr
+ layout: True
+ overwrite: False
+ runcmd:
+ - [ ls, -l, / ]
+ - [ sh, -xc, "echo $(date) '; mkdir abc'" ]
+ """
+ result = self.ns._parse_jinja2(
+ cloud_init_content=cloud_init_content, params=params, context=context
+ )
+ self.assertEqual(result, expected_result)
+
+ def test_rendering_jinja2_temp_with_special_characters(self):
+ cloud_init_content = """
+ disk_setup:
+ ephemeral0:
+ table_type: {{type}}
+ layout: True
+ overwrite: {{is_override}}
+ runcmd:
+ - [ ls, -l, / ]
+ - [ sh, -xc, "echo $(date) '{{command}}'" ]
+ """
+ params = {
+ "type": "mbr",
+ "is_override": "False",
+ "command": "& rm -rf",
+ }
+ context = "cloud-init for VM"
+ expected_result = """
+ disk_setup:
+ ephemeral0:
+ table_type: mbr
+ layout: True
+ overwrite: False
+ runcmd:
+ - [ ls, -l, / ]
+ - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
+ """
+ result = self.ns._parse_jinja2(
+ cloud_init_content=cloud_init_content, params=params, context=context
+ )
+ self.assertNotEqual(result, expected_result)
+
+ def test_rendering_jinja2_temp_with_special_characters_autoescape_is_false(self):
+ with patch("osm_ng_ro.ns.Environment") as mock_environment:
+ mock_environment.return_value = Environment(
+ undefined=StrictUndefined,
+ autoescape=select_autoescape(default_for_string=False, default=False),
+ )
+ cloud_init_content = """
+ disk_setup:
+ ephemeral0:
+ table_type: {{type}}
+ layout: True
+ overwrite: {{is_override}}
+ runcmd:
+ - [ ls, -l, / ]
+ - [ sh, -xc, "echo $(date) '{{command}}'" ]
+ """
+ params = {
+ "type": "mbr",
+ "is_override": "False",
+ "command": "& rm -rf /",
+ }
+ context = "cloud-init for VM"
+ expected_result = """
+ disk_setup:
+ ephemeral0:
+ table_type: mbr
+ layout: True
+ overwrite: False
+ runcmd:
+ - [ ls, -l, / ]
+ - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
+ """
+ result = self.ns._parse_jinja2(
+ cloud_init_content=cloud_init_content,
+ params=params,
+ context=context,
+ )
+ self.assertEqual(result, expected_result)
self.logger.error("renew_locks task exception: {}".format(exc))
self.aiomain_task_renew_lock = None
except asyncio.CancelledError:
- pass
+ self.logger.exception("asyncio.CancelledError occured.")
except Exception as e:
if self.to_terminate:
try:
self.__addMetadata(s_uid, service_type, s_connInf["vlan_id"])
- except Exception:
- pass
+ except Exception as e:
+ self.logger.exception(f"{e} occured.")
return (s_uid, s_connInf)
except CvpLoginError as e:
self.logger.error(
"Error removing configlets from device {}: {}".format(s, e)
)
- pass
for s in self.switches:
if allLeafConfigured[s]:
found_in_cvp = True
except CvpApiError as error:
if "Entity does not exist" in error.msg:
- pass
+ self.logger.exception(f"{error} occured.")
else:
raise error
found_in_cvp = True
except CvpApiError as error:
if "Entity does not exist" in error.msg:
- pass
+ self.logger.exception(f"{error} occured.")
else:
raise error
service_uuid, str(e)
)
)
- pass
def __removeMetadata(self, service_uuid):
"""Removes the connectivity service from 'OSM_metadata' configLet"""
found_in_cvp = True
except CvpApiError as error:
if "Entity does not exist" in error.msg:
- pass
+ self.logger.exception(f"{error} occured.")
else:
raise error
service_uuid, str(e)
)
)
- pass
def edit_connectivity_service(
self, service_uuid, conn_info=None, connection_points=None, **kwargs
found_in_cvp = True
except CvpApiError as error:
if "Entity does not exist" in error.msg:
- pass
+ self.logger.exception(f"{error} occured.")
else:
raise error
found_in_cvp = True
except CvpApiError as error:
if "Entity does not exist" in error.msg:
- pass
+ self.logger.exception(f"{error} occured.")
else:
raise error
found_in_cvp = True
except CvpApiError as error:
if "Entity does not exist" in error.msg:
- pass
+ self.logger.exception(f"{error} occured.")
else:
raise error
try:
if flavor_data[0] == "@": # read from a file
with open(flavor_data[1:], "r") as stream:
- self.flavor_info = yaml.load(stream, Loader=yaml.Loader)
+ self.flavor_info = yaml.safe_load(stream)
else:
- self.flavor_info = yaml.load(flavor_data, Loader=yaml.Loader)
+ self.flavor_info = yaml.safe_load(flavor_data)
except yaml.YAMLError as e:
self.flavor_info = None
return
except CloudError as e:
if e.error.error and "notfound" in e.error.error.lower():
- pass
+ self.logger.exception("CloudError Exception occured.")
# continue and create it
else:
self._format_vimconn_exception(e)
try:
self.of_connector.del_flow(flow_id)
except OpenflowConnNotFoundException:
- pass
+ self.logger.exception("OpenflowConnNotFoundException occured.")
except OpenflowConnException as e:
error_text = "Cannot remove rule '{}': {}".format(flow_id, e)
error_list.append(error_text)
+++ /dev/null
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-##
-# Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U.
-# This file is part of openmano
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact with: nfvlabs@tid.es
-##
-
-
-"""
-Read openmanod.cfg file and creates envioronment variables for openmano client
-Call it wusing execution quotes, or copy paste the output to set your shell envioronment
-It read database to look for a ninc tenant / datacenter
-"""
-
-from __future__ import print_function
-from os import environ
-from openmanod import load_configuration
-#from socket import gethostname
-from db_base import db_base_Exception
-import nfvo_db
-import getopt
-import sys
-
-
-__author__="Alfonso Tierno, Gerardo Garcia, Pablo Montes"
-__date__ ="$26-aug-2014 11:09:29$"
-__version__="0.0.1-r509"
-version_date="Oct 2016"
-database_version="0.16" #expected database schema version
-
-
-def usage():
- print("Usage: ", sys.argv[0], "[options]")
- print(" -v|--version: prints current version")
- print(" -c|--config [configuration_file]: loads the configuration file (default: openmanod.cfg)")
- print(" -h|--help: shows this help")
- return
-
-
-if __name__ == "__main__":
- # Read parameters and configuration file
- try:
- # load parameters and configuration
- opts, args = getopt.getopt(sys.argv[1:], "vhc:",
- ["config=", "help", "version"])
- config_file = 'openmanod.cfg'
-
- for o, a in opts:
- if o in ("-v", "--version"):
- print("openmanoconfig.py version " + __version__ + ' ' + version_date)
- print("(c) Copyright Telefonica")
- exit()
- elif o in ("-h", "--help"):
- usage()
- exit()
- elif o in ("-c", "--config"):
- config_file = a
- else:
- assert False, "Unhandled option"
- global_config = load_configuration(config_file)
- if global_config["http_host"] == "0.0.0.0":
- global_config["http_host"] = "localhost" #gethostname()
- environ["OPENMANO_HOST"]=global_config["http_host"]
- print("export OPENMANO_HOST='{}'".format(global_config["http_host"]))
- environ["OPENMANO_PORT"] = str(global_config["http_port"])
- print("export OPENMANO_PORT={}".format(global_config["http_port"]))
-
- mydb = nfvo_db.nfvo_db();
- mydb.connect(global_config['db_host'], global_config['db_user'], global_config['db_passwd'], global_config['db_name'])
- try:
- tenants = mydb.get_rows(FROM="nfvo_tenants")
- if not tenants:
- print("#No tenant found", file=sys.stderr)
- elif len(tenants) > 1:
- print("#Found several tenants export OPENMANO_TENANT=", file=sys.stderr, end="")
- for tenant in tenants:
- print(" '{}'".format(tenant["name"]), file=sys.stderr, end="")
- print("")
- else:
- environ["OPENMANO_TENANT"] = tenants[0]["name"]
- print("export OPENMANO_TENANT='{}'".format(tenants[0]["name"]))
-
- dcs = mydb.get_rows(FROM="datacenters")
- if not dcs:
- print("#No datacenter found", file=sys.stderr)
- elif len(dcs) > 1:
- print("#Found several datacenters export OPENMANO_DATACENTER=", file=sys.stderr, end="")
- for dc in dcs:
- print(" '{}'".format(dc["name"]), file=sys.stderr, end="")
- print("")
- else:
- environ["OPENMANO_DATACENTER"] = dcs[0]["name"]
- print("export OPENMANO_DATACENTER='{}'".format(dcs[0]["name"]))
-
- except db_base_Exception as e:
- print("#DATABASE is not a MANO one or it is a '0.0' version. Try to upgrade to version '{}' with \
- './database_utils/migrate_mano_db.sh'".format(database_version), file=sys.stderr)
- exit(-1)
-
-
-
- except db_base_Exception as e:
- print("#"+str(e), file=sys.stderr)
- exit(-1)
-
- except SystemExit:
- pass
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+---
+security:
+ - |
+ Fixing following RO security vulnerabilities. Improper Certificate Validation, jinja2 sets autoescape to False,
+ disabling SSL certificate checks, use of unsafe yaml load, try-except-pass detected, use of assert detected.
+