3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
19 @author Varun Prasad (varun.prasad@riftio.com)
33 gi
.require_version("RwStagingMgmtYang", "1.0")
34 from gi
.repository
import RwStagingMgmtYang
35 import rift
.mano
.dts
as mano_dts
38 from ..protocol
import StagingStorePublisherProtocol
41 class StagingAreaExists(Exception):
44 class InvalidStagingArea(Exception):
47 class StagingStructureError(Exception):
50 class StagingFileStore(StagingStorePublisherProtocol
):
51 """File based store for creating and managing staging areas.
53 META_YAML
= "meta.yaml"
54 DEFAULT_EXPIRY
= 60 * 60
56 def __init__(self
, tasklet
, root_dir
=None):
57 default_path
= os
.path
.join(
58 os
.getenv('RIFT_ARTIFACTS'),
61 self
.root_dir
= root_dir
or default_path
63 if not os
.path
.isdir(self
.root_dir
):
64 os
.makedirs(self
.root_dir
)
66 self
.log
= tasklet
.log
67 self
.tmp_dir
= tempfile
.mkdtemp(dir=self
.root_dir
)
70 self
.tasklet
= tasklet
72 def on_recovery(self
, staging_areas
):
73 for area
in staging_areas
:
74 staging_area
= model
.StagingArea(area
)
75 self
._cache
[area
.area_id
] = staging_area
78 def get_staging_area(self
, area_id
):
79 if area_id
not in self
._cache
:
80 raise InvalidStagingArea
82 return self
._cache
[area_id
]
85 def get_delegate(self
, msg
):
87 proj
= self
.tasklet
.projects
[msg
.project_name
]
88 except Exception as e
:
89 err
= "Project or project name not found {}: {}". \
90 format(msg
.as_dict(), e
)
96 def create_staging_area(self
, staging_area_config
):
97 """Create the staging area
99 staging_area_config (YangInput_RwStagingMgmt_CreateStagingArea): Rpc input
105 StagingAreaExists: if the staging area already exists
107 delegate
= self
.get_delegate(staging_area_config
)
109 area_id
= str(uuid
.uuid4())
111 container_path
= os
.path
.join(self
.root_dir
, str(area_id
))
112 meta_path
= os
.path
.join(container_path
, self
.META_YAML
)
114 if os
.path
.exists(container_path
):
115 raise StagingAreaExists
118 os
.makedirs(container_path
)
120 config_dict
= staging_area_config
.as_dict()
123 "created_time": time
.time(),
125 "path": container_path
128 staging_area
= RwStagingMgmtYang
.StagingArea
.from_dict(config_dict
)
129 staging_area
= model
.StagingArea(staging_area
)
131 self
._cache
[area_id
] = staging_area
135 delegate
.on_staging_area_create(staging_area
.model
)
136 except Exception as e
:
137 self
.log
.exception(e
)
141 def remove_staging_area(self
, staging_area
):
142 """Delete the staging area
144 staging_area (str or model.StagingArea): Staging ID or the
147 delegate
= self
.get_delegate(staging_area_config
)
149 if type(staging_area
) is str:
150 staging_area
= self
.get_staging_area(staging_area
)
152 if os
.path
.isdir(staging_area
.model
.path
):
153 shutil
.rmtree(staging_area
.model
.path
)
155 staging_area
.model
.status
= "EXPIRED"
159 delegate
.on_staging_area_delete(staging_area
.model
)
160 except Exception as e
:
161 self
.log
.exception(e
)