Coverity-CWE 295: Improper Certificate Validation
[osm/NBI.git] / osm_nbi / descriptor_topics.py
index 2814653..d986963 100644 (file)
@@ -20,6 +20,7 @@ import copy
 import os
 import shutil
 import functools
+import re
 
 # import logging
 from deepdiff import DeepDiff
@@ -51,10 +52,14 @@ from osm_nbi import utils
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 
+valid_helm_chart_re = re.compile(
+    r"^[a-z0-9]([-a-z0-9]*[a-z0-9]/)?([a-z0-9]([-a-z0-9]*[a-z0-9])?)*$"
+)
+
 
 class DescriptorTopic(BaseTopic):
     def __init__(self, db, fs, msg, auth):
-        BaseTopic.__init__(self, db, fs, msg, auth)
+        super().__init__(db, fs, msg, auth)
 
     def _validate_input_new(self, indata, storage_params, force=False):
         return indata
@@ -437,7 +442,7 @@ class DescriptorTopic(BaseTopic):
                 indata = json.load(content)
             else:
                 error_text = "Invalid yaml format "
-                indata = yaml.load(content, Loader=yaml.SafeLoader)
+                indata = yaml.safe_load(content)
 
             # Need to close the file package here so it can be copied from the
             # revision to the current, unrevisioned record
@@ -847,9 +852,23 @@ class VnfdTopic(DescriptorTopic):
         self.validate_internal_virtual_links(indata)
         self.validate_monitoring_params(indata)
         self.validate_scaling_group_descriptor(indata)
+        self.validate_helm_chart(indata)
 
         return indata
 
+    @staticmethod
+    def validate_helm_chart(indata):
+        kdus = indata.get("kdu", [])
+        for kdu in kdus:
+            helm_chart_value = kdu.get("helm-chart")
+            if not helm_chart_value:
+                continue
+            if not valid_helm_chart_re.match(helm_chart_value):
+                raise EngineException(
+                    "helm-chart '{}' is not valid".format(helm_chart_value),
+                    http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+                )
+
     @staticmethod
     def validate_mgmt_interface_connection_point(indata):
         if not indata.get("vdu"):
@@ -1323,11 +1342,9 @@ class VnfdTopic(DescriptorTopic):
             with self.fs.file_open(
                 (old_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
             ) as old_descriptor_file:
-
                 with self.fs.file_open(
                     (new_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
                 ) as new_descriptor_file:
-
                     old_content = yaml.safe_load(old_descriptor_file.read())
                     new_content = yaml.safe_load(new_descriptor_file.read())
 
@@ -1377,7 +1394,7 @@ class NsdTopic(DescriptorTopic):
     topic_msg = "nsd"
 
     def __init__(self, db, fs, msg, auth):
-        DescriptorTopic.__init__(self, db, fs, msg, auth)
+        super().__init__(db, fs, msg, auth)
 
     def pyangbind_validation(self, item, data, force=False):
         if self._descriptor_data_is_in_old_format(data):
@@ -1719,11 +1736,9 @@ class NsdTopic(DescriptorTopic):
             with self.fs.file_open(
                 (old_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
             ) as old_descriptor_file:
-
                 with self.fs.file_open(
                     (new_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
                 ) as new_descriptor_file:
-
                     old_content = yaml.safe_load(old_descriptor_file.read())
                     new_content = yaml.safe_load(new_descriptor_file.read())