Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/logstream/stream_storage/stream_storage_abc.py: 82.50%

80 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2020 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14""" 

15StreamStorageABC 

16==================== 

17 

18The abstract base class for stream storage providers. 

19""" 

20 

21import abc 

22from typing import Optional, Iterator, NamedTuple 

23from uuid import uuid4 

24 

25from buildgrid._exceptions import OutOfRangeError 

26 

27 

class StreamHandle(NamedTuple):
    """Pair of resource names that identify a single log stream.

    The write name is always the read name followed by one extra
    ``/<uuid>`` path segment, which is what allows the read name to be
    recovered from the write name.
    """

    # Resource name used for read access to the stream.
    name: str
    # Resource name used for writing: `name` plus one random UUID segment.
    write_resource_name: str

    @staticmethod
    def _create_new(parent: str = '', prefix: str = '') -> 'StreamHandle':
        """Build a handle with freshly generated random names.

        Args:
            parent: Text placed before ``/logStreams/`` in the read name.
            prefix: Text placed directly before the random UUID in the
                read name.

        Returns:
            A StreamHandle whose read name is
            ``{parent}/logStreams/{prefix}{uuid4}`` and whose write name
            is that read name followed by ``/{uuid4}``.
        """
        # Formatting a UUID in an f-string already yields str(uuid).
        read_name = f'{parent}/logStreams/{prefix}{uuid4()}'
        write_name = f'{read_name}/{uuid4()}'
        return StreamHandle(read_name, write_name)

    @staticmethod
    def _read_name_from_write_name(write_name: str) -> str:
        """Return the read name embedded in ``write_name``.

        The read name is everything before the last ``/``. A write name
        without any ``/`` has no read name, so an empty string is
        returned for it.
        """
        # rpartition gives '' when there is no separator; slicing with the
        # -1 returned by rfind would silently chop off the last character.
        read_name, _separator, _write_token = write_name.rpartition('/')
        return read_name

41 

42 

class StreamLength(NamedTuple):
    """Size and completion state reported for a stream."""

    # Length of the stream in bytes.
    length: int
    # Whether the stream has finished.
    finished: bool

46 

47 

class StreamChunk(NamedTuple):
    """One piece of stream content returned by a read."""

    # Bytes carried by this chunk.
    data: bytes
    # Amount by which a reader advances its offset after consuming the chunk.
    chunk_length: int
    # Whether the stream has finished.
    finished: bool

52 

53 

class StreamStorageABC(abc.ABC):
    """Abstract interface that every stream storage backend implements.

    Public methods that take a `write_resource_name` derive the read name
    from it and delegate to the backend hooks, which are keyed by the
    read name.
    """

    def create_stream(self, parent: str, prefix: str) -> StreamHandle:
        """
        Creates a stream for a specific parent with a specific prefix and a
        random UUID, and returns a StreamHandle namedtuple containing:
            name (str): The name of the stream for read access
            write_resource_name (str): The secret name used only for writing
                to the stream
        """
        new_stream_handle = StreamHandle._create_new(parent, prefix)
        self._create_stream(new_stream_handle)
        return new_stream_handle

    @abc.abstractmethod
    def _create_stream(self, stream_handle: StreamHandle):
        """
        Creates a stream with a specific StreamHandle in the StreamStorage
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def stream_exists(self, stream_name: str) -> bool:
        """
        Returns True when there is a stream with the given instance/stream_name.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def stream_finished(self, stream_name: str) -> bool:
        """
        Returns True/False depending on whether the stream has been marked
        as completed.

        Raises:
            NotFoundError when a stream with the given instance/stream_name
                does not exist.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def stream_length(self, stream_name: str) -> StreamLength:
        """
        Returns a namedtuple of type StreamLength for the stream
        with the given instance/stream_name.
            length (int): The length of the stream in bytes
            finished (bool): A boolean indicating whether the stream has finished

        Raises:
            NotFoundError when a stream with the given instance/stream_name
                does not exist.
        """
        raise NotImplementedError()

    def writeable_stream_length(self, write_resource_name: str) -> StreamLength:
        """
        Returns a namedtuple of type StreamLength for the stream
        with the given instance/write_resource_name.
            length (int): The length of the stream in bytes
            finished (bool): A boolean indicating whether the stream has finished

        The read name is derived from `write_resource_name` and the result
        of `stream_length(read_name)` is returned.

        Raises:
            NotFoundError when a stream with the given instance/stream_name
                does not exist.
        """
        read_name = StreamHandle._read_name_from_write_name(write_resource_name)
        return self.stream_length(read_name)

    def append_to_stream(self, write_resource_name: str, message: Optional[bytes] = None, *,
                         mark_finished: Optional[bool] = False) -> None:
        """
        Appends the `message` to the stream with the passed `write_resource_name`
        as long as the stream has not been marked as finished yet and marks the stream as
        finished when the relevant arg is set to True.

        The read name is derived from `write_resource_name` and the work is
        delegated to `_append_to_stream`.

        Raises:
            NotFoundError when a stream with the given instance/write_resource_name
                does not exist.
            StreamFinishedError when the stream was already marked as finished and
                no further writes are allowed.
            StorageFullError when the backing storage is full.
            WriteError when the write_resource_name is correct but
                the write failed for other reasons.
        """
        read_name = StreamHandle._read_name_from_write_name(write_resource_name)
        self._append_to_stream(read_name,
                               write_resource_name,
                               message,
                               mark_finished=mark_finished)

    @abc.abstractmethod
    def _append_to_stream(self, stream_name: str, write_resource_name: str,
                          message: Optional[bytes] = None,
                          *, mark_finished: Optional[bool] = False) -> None:
        """
        Appends the `message` to the stream with the passed `stream_name` and
        `write_resource_name` as long as the stream has not been marked as finished
        yet and marks the stream as finished when the relevant arg is set to True.

        Raises:
            NotFoundError when a stream with the given instance/write_resource_name
                does not exist.
            StreamFinishedError when the stream was already marked as finished and
                no further writes are allowed.
            StorageFullError when the backing storage is full.
            WriteError when the write_resource_name is correct but
                the write failed for other reasons.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def read_stream_chunk_blocking(
            self, stream_name: str, read_offset_bytes: int,
            read_limit_bytes: Optional[int] = None) -> StreamChunk:
        """
        Returns a committed chunk of the stream message or waits until it is available as follows:
            chunk = message[read_offset_bytes, min(read_limit_bytes, finished_stream_length) )

        When the `read_limit_bytes` argument is not set, this method will return the whole message
        starting from the specified offset, blocking until the stream finishes.

        In cases in which the stream has not finished and the chunk offset and size requested was
        not committed yet, this method will block and wait until the chunk is committed and/or
        the stream is finished, returning the appropriate chunk.

        Raises:
            NotFoundError when a stream with the given instance/stream_name
                does not exist.
            OutOfRangeError when a finished stream does not contain a chunk of data
                at the specified offset.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def read_stream_chunk(self, stream_name: str, read_offset_bytes: int,
                          read_limit_bytes: Optional[int] = None) -> StreamChunk:
        """
        Returns a committed chunk of the stream message or raises if it is not available as follows:
            chunk = message[read_offset_bytes, min(read_limit_bytes, finished_stream_length) )

        When the `read_limit_bytes` argument is not set, this method will return the whole message
        starting from the specified offset, raising if the stream hasn't finished.

        Raises:
            NotFoundError when a stream with the given instance/stream_name
                does not exist.
            StreamWritePendingError when the requested chunk is not fully committed.
            OutOfRangeError when a finished stream does not contain a chunk of data
                at the specified offset.
        """
        raise NotImplementedError()

    def read_stream_bytes_blocking_iterator(
            self,
            stream_name: str,
            max_chunk_size: int,
            offset: int = 0) -> Iterator[bytes]:
        """
        An iterator yielding the `data` of successive chunks obtained from
        `read_stream_chunk_blocking`, each requested with `max_chunk_size`
        as the read limit, until a chunk reports the stream as finished.

        Streaming starts at `offset`, which is passed to the backend as the
        read offset and is advanced by each chunk's `chunk_length`.

        An OutOfRangeError raised by the backend read is not propagated:
        it simply ends the iteration.

        Any other exception raised by `read_stream_chunk_blocking` (for
        example when the stream does not exist) propagates to the caller.
        """
        while True:
            try:
                # Keep the guarded region to the backend read only, so the
                # handler cannot mask errors coming from anywhere else.
                chunk = self.read_stream_chunk_blocking(
                    stream_name, offset, max_chunk_size)
            except OutOfRangeError:
                # Nothing left at this offset: treat it as end of stream.
                return
            yield chunk.data
            if chunk.finished:
                return
            offset += chunk.chunk_length

    def new_client_streaming(self, stream_name: str):
        """
        Inform the StreamStorage backend that a new client
        is streaming the stream `stream_name` in case
        it cares about the number of clients streaming a specific
        stream.
        This can be useful for e.g. cleaning up old streams.
        No-op by default.
        Raises:
            NotFoundError when a stream with the given instance/stream_name
                does not exist.
        """

    def streaming_client_left(self, stream_name: str):
        """
        Inform the StreamStorage backend that a client
        streaming the stream `stream_name` has left, in case
        it cares about the number of clients streaming a specific
        stream.
        This can be useful for e.g. cleaning up old streams.
        No-op by default.
        Raises:
            NotFoundError when a stream with the given instance/stream_name
                does not/no longer exist(s).
        """

    def wait_for_streaming_clients(self, write_resource_name: str, timeout: int = 300) -> bool:
        """
        Returns a boolean specifying if at least one client is streaming this stream
        within the specified timeout.

        This allows the writers to only start writing when there are readers
        interested in the stream.
        Note: Accepts `write_resource_name` and calls
        `_wait_for_streaming_clients(read_name)` on the backend.
        Raises:
            NotFoundError when a stream with the given instance/stream_name
                does not exist.
        """
        read_name = StreamHandle._read_name_from_write_name(write_resource_name)
        return self._wait_for_streaming_clients(read_name, timeout)

    @abc.abstractmethod
    def _wait_for_streaming_clients(self, stream_name: str, timeout: int) -> bool:
        """
        Returns a boolean specifying if at least one client is streaming this stream
        within the specified timeout.

        This allows the writers to only start writing when there are readers
        interested in the stream.
        Raises:
            NotFoundError when a stream with the given instance/stream_name
                does not exist.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def delete_stream(self, write_resource_name: str):
        """
        Deletes the stream identified by `write_resource_name`.

        NOTE(review): the exact semantics (e.g. behaviour for unknown or
        still-open streams) are defined by each backend -- confirm there.
        """
        raise NotImplementedError()

    def set_instance_name(self, instance_name: str) -> None:
        """Stores `instance_name` on the storage as `_instance_name`."""
        # This method should always get called, so there's no benefit in
        # adding an __init__ to this abstract class (therefore adding the
        # need for subclasses to call `super()`) just to define a null
        # value for this.
        # pylint: disable=attribute-defined-outside-init
        self._instance_name: Optional[str] = instance_name