"""
client = self.clients[CUSTOM_OBJECT_CLIENT]
try:
- client.create_namespaced_custom_object(
- group=api_group,
- plural=api_plural,
- version=api_version,
- body=manifest_dict,
- namespace=namespace,
- )
+ if namespace:
+ client.create_namespaced_custom_object(
+ group=api_group,
+ plural=api_plural,
+ version=api_version,
+ body=manifest_dict,
+ namespace=namespace,
+ )
+ else:
+ client.create_cluster_custom_object(
+ group=api_group,
+ plural=api_plural,
+ version=api_version,
+ body=manifest_dict,
+ )
except ApiException as e:
info = json.loads(e.body)
if info.get("reason").lower() == "alreadyexists":
"""
client = self.clients[CUSTOM_OBJECT_CLIENT]
try:
- client.delete_namespaced_custom_object(
- group=api_group,
- plural=api_plural,
- version=api_version,
- name=name,
- namespace=namespace,
- )
+ if namespace:
+ client.delete_namespaced_custom_object(
+ group=api_group,
+ plural=api_plural,
+ version=api_version,
+ name=name,
+ namespace=namespace,
+ )
+ else:
+ client.delete_cluster_custom_object(
+ group=api_group,
+ plural=api_plural,
+ version=api_version,
+ name=name,
+ )
except ApiException as e:
info = json.loads(e.body)
if info.get("reason").lower() == "notfound":
"""
client = self.clients[CUSTOM_OBJECT_CLIENT]
try:
- object_dict = client.list_namespaced_custom_object(
- group=api_group,
- plural=api_plural,
- version=api_version,
- namespace=namespace,
- field_selector=f"metadata.name={name}",
- )
+ if namespace:
+ object_dict = client.list_namespaced_custom_object(
+ group=api_group,
+ plural=api_plural,
+ version=api_version,
+ namespace=namespace,
+ field_selector=f"metadata.name={name}",
+ )
+ else:
+ object_dict = client.list_cluster_custom_object(
+ group=api_group,
+ plural=api_plural,
+ version=api_version,
+ field_selector=f"metadata.name={name}",
+ )
if len(object_dict.get("items")) == 0:
return None
return object_dict.get("items")[0]
except ApiException as e:
+ self.logger.debug(f"Exception: {e}")
info = json.loads(e.body)
if info.get("reason").lower() == "notfound":
self.logger.warning("Cannot get custom object: {}".format(e))
+ return None
else:
raise e
"""
client = self.clients[CUSTOM_OBJECT_CLIENT]
try:
- object_dict = client.list_namespaced_custom_object(
- group=api_group,
- plural=api_plural,
- version=api_version,
- namespace=namespace,
- )
+ if namespace:
+ object_dict = client.list_namespaced_custom_object(
+ group=api_group,
+ plural=api_plural,
+ version=api_version,
+ namespace=namespace,
+ )
+ else:
+ object_dict = client.list_cluster_custom_object(
+ group=api_group,
+ plural=api_plural,
+ version=api_version,
+ )
self.logger.debug(f"Object-list: {object_dict.get('items')}")
return object_dict.get("items")
except ApiException as e:
+ self.logger.debug(f"Exception: {e}")
info = json.loads(e.body)
if info.get("reason").lower() == "notfound":
self.logger.warning(
- "Cannot retrieve list of custom objects: {}".format(e)
+ "Cannot find specified custom objects: {}".format(e)
)
+ return []
else:
raise e