Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/persistence/sql/models.py: 97.39%

115 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

1# Copyright (C) 2019 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14import datetime 

15from typing import List, Optional 

16 

17from sqlalchemy import ( 

18 BigInteger, 

19 Boolean, 

20 Column, 

21 DateTime, 

22 ForeignKey, 

23 Index, 

24 Integer, 

25 LargeBinary, 

26 String, 

27 Table, 

28 UniqueConstraint, 

29 false, 

30) 

31from sqlalchemy.ext.declarative import declarative_base 

32from sqlalchemy.orm import relationship 

33 

34from buildgrid._enums import LeaseState 

35from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action, Digest 

36from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2 

37 

# Shared declarative base; every model in this module registers its table
# on ``Base.metadata``.
Base = declarative_base()


# Association table with a composite primary key of (job_name, platform_id).
# It is passed as ``secondary=`` to the JobEntry <-> PlatformEntry
# relationships.
job_platform_association = Table(
    "job_platforms",
    Base.metadata,
    Column(
        "job_name",
        ForeignKey("jobs.name", onupdate="CASCADE", ondelete="CASCADE"),
        primary_key=True,
    ),
    Column(
        "platform_id",
        ForeignKey("platform_properties.id"),
        primary_key=True,
    ),
)

47 

48 

class PlatformEntry(Base):
    """ORM model for a row of the ``platform_properties`` table.

    A row holds one ``key``/``value`` pair; the unique constraint below
    prevents the same pair from being stored twice.
    """

    __tablename__ = "platform_properties"
    __table_args__ = (UniqueConstraint("key", "value"),)

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    key: str = Column(String)
    value: str = Column(String)

    # Jobs linked to this property through the ``job_platforms`` association
    # table; mirrored by ``JobEntry.platform``.
    jobs: List["JobEntry"] = relationship(
        "JobEntry",
        secondary=job_platform_association,
        back_populates="platform",
    )

58 

59 

class JobEntry(Base):
    """ORM model for a row of the ``jobs`` table.

    Besides the plain columns, the class exposes relationships to the job's
    leases, its operations and its platform properties, and declares two
    partial indexes over the worker timestamps.
    """

    __tablename__ = "jobs"

    # --- Columns (declaration order is the table's column order) ---
    name: str = Column(String, primary_key=True)
    instance_name: Optional[str] = Column(String, index=True)
    action_digest: str = Column(String, nullable=False, index=True)
    action: bytes = Column(LargeBinary)
    priority: int = Column(Integer, nullable=False, index=True, default=1)
    stage: int = Column(Integer, nullable=False, index=True, default=0)
    do_not_cache: bool = Column(Boolean, nullable=False, default=False)
    cancelled: bool = Column(Boolean, nullable=False, default=False)
    queued_timestamp: Optional[datetime.datetime] = Column(DateTime, index=True)
    queued_time_duration: Optional[int] = Column(Integer)
    worker_start_timestamp: Optional[datetime.datetime] = Column(DateTime)
    worker_completed_timestamp: Optional[datetime.datetime] = Column(DateTime)
    result: Optional[str] = Column(String)
    assigned: Optional[bool] = Column(Boolean, default=False)
    n_tries: Optional[int] = Column(Integer, default=0)
    # This is a hash of the platform properties, used for matching jobs to workers
    platform_requirements: Optional[str] = Column(String, nullable=True)
    status_code: Optional[int] = Column(Integer, nullable=True)
    stdout_stream_name: Optional[str] = Column(String, nullable=True)
    stdout_stream_write_name: Optional[str] = Column(String, nullable=True)
    stderr_stream_name: Optional[str] = Column(String, nullable=True)
    stderr_stream_write_name: Optional[str] = Column(String, nullable=True)
    command: Optional[str] = Column(String, nullable=True)

    # --- Relationships ---
    # Every lease attached to this job; the backref adds ``LeaseEntry.job``.
    leases: List["LeaseEntry"] = relationship("LeaseEntry", backref="job")

    # Lease state values that ``active_leases`` filters on. The list is
    # interpolated into the ``primaryjoin`` expression string below, so it
    # must stay a plain list of ints.
    active_states: List[int] = [
        LeaseState.UNSPECIFIED.value,
        LeaseState.PENDING.value,
        LeaseState.ACTIVE.value,
        LeaseState.CANCELLED.value,
    ]

    # Leases of this job whose state is one of ``active_states``, ordered by
    # lease id descending.
    active_leases: List["LeaseEntry"] = relationship(
        "LeaseEntry",
        primaryjoin=f"and_(LeaseEntry.job_name==JobEntry.name, LeaseEntry.state.in_({active_states}))",
        order_by="LeaseEntry.id.desc()",
        overlaps="job,leases",
    )

    # Operations attached to this job; the backref adds ``OperationEntry.job``.
    operations: List["OperationEntry"] = relationship("OperationEntry", backref="job")

    # Platform properties linked through the ``job_platforms`` association
    # table; mirrored by ``PlatformEntry.jobs``.
    platform: List["PlatformEntry"] = relationship(
        "PlatformEntry",
        secondary=job_platform_association,
        back_populates="jobs",
    )

    # Non-unique partial indexes that only cover rows where the respective
    # timestamp is not NULL (declared for both PostgreSQL and SQLite).
    __table_args__ = (
        Index(
            "ix_worker_start_timestamp",
            worker_start_timestamp,
            unique=False,
            sqlite_where=worker_start_timestamp.isnot(None),
            postgresql_where=worker_start_timestamp.isnot(None),
        ),
        Index(
            "ix_worker_completed_timestamp",
            worker_completed_timestamp,
            unique=False,
            sqlite_where=worker_completed_timestamp.isnot(None),
            postgresql_where=worker_completed_timestamp.isnot(None),
        ),
    )

123 

124 

class LeaseEntry(Base):
    """ORM model for a row of the ``leases`` table."""

    __tablename__ = "leases"

    # Annotation only: the attribute itself is supplied by the ``job``
    # backref declared on ``JobEntry.leases``.
    job: JobEntry
    id: int = Column(Integer, primary_key=True)
    job_name: str = Column(
        String,
        ForeignKey("jobs.name", onupdate="CASCADE", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    status: Optional[int] = Column(Integer)
    state: int = Column(Integer, nullable=False)
    worker_name: Optional[str] = Column(String, index=True, default=None)

    def to_protobuf(self) -> bots_pb2.Lease:
        """Convert this row into a ``bots_pb2.Lease`` message.

        The lease id is the job name. The payload is the job's stored
        ``Action`` when the serialized action is present, otherwise the
        ``Digest`` parsed from the job's ``action_digest`` string. The status
        code is only set when ``status`` is not ``None``.

        Returns:
            The populated ``bots_pb2.Lease`` message.
        """
        lease = bots_pb2.Lease()
        lease.id = self.job_name

        serialized_action = self.job.action
        if serialized_action is None:
            # No stored action: fall back to packing the action digest.
            payload = string_to_digest(self.job.action_digest)
        else:
            payload = Action()
            payload.ParseFromString(serialized_action)
        lease.payload.Pack(payload)

        lease.state = self.state  # type: ignore[assignment]
        if self.status is not None:
            lease.status.code = self.status
        return lease

152 

153 

class ClientIdentityEntry(Base):
    """ORM model for a row of the ``client_identities`` table.

    The combination of ``instance``, ``workflow``, ``actor`` and ``subject``
    is unique across the table.
    """

    __tablename__ = "client_identities"
    __table_args__ = (UniqueConstraint("instance", "workflow", "actor", "subject"),)

    id: int = Column(Integer, autoincrement=True, primary_key=True)
    instance: str = Column(String, nullable=False)
    workflow: str = Column(String, nullable=False)
    actor: str = Column(String, nullable=False)
    subject: str = Column(String, nullable=False)

    def __str__(self) -> str:
        """Return a one-line description listing the four identity fields."""
        description = (
            f"ClientIdentity: [instance={self.instance} workflow={self.workflow}"
            f" actor={self.actor} subject={self.subject}]"
        )
        return description

169 

170 

class OperationEntry(Base):
    """ORM model for a row of the ``operations`` table."""

    __tablename__ = "operations"

    # Annotation only: the attribute itself is supplied by the ``job``
    # backref declared on ``JobEntry.operations``.
    job: JobEntry
    name: str = Column(String, primary_key=True)
    job_name: str = Column(
        String,
        ForeignKey("jobs.name", onupdate="CASCADE", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    cancelled: bool = Column(Boolean, nullable=False, default=False)
    tool_name: Optional[str] = Column(String, nullable=True)
    tool_version: Optional[str] = Column(String, nullable=True)
    invocation_id: Optional[str] = Column(String, nullable=True)
    correlated_invocations_id: Optional[str] = Column(String, nullable=True)
    # Optional link to a ``client_identities`` row, plus the loaded entity.
    client_identity_id: Optional[int] = Column(
        Integer,
        ForeignKey("client_identities.id"),
        nullable=True,
    )
    client_identity: Optional[ClientIdentityEntry] = relationship("ClientIdentityEntry")

186 

187 

class IndexEntry(Base):
    """ORM model for a row of the ``index`` table.

    Rows are keyed by ``digest_hash`` and record the blob size, the last
    access timestamp, a ``deleted`` flag and optional inline blob bytes.
    """

    __tablename__ = "index"

    digest_hash: str = Column(String, primary_key=True, index=True, nullable=False)
    digest_size_bytes: int = Column(BigInteger, nullable=False)
    accessed_timestamp: datetime.datetime = Column(DateTime, nullable=False, index=True)
    # The default is applied by the database (server-side ``false``).
    deleted: bool = Column(Boolean, server_default=false(), nullable=False)
    inline_blob: Optional[bytes] = Column(LargeBinary, nullable=True)

196 

197 

class BotEntry(Base):
    """ORM model for a row of the ``bots`` table, holding bot session state.

    The assigned lease is stored here directly (``lease_id``) rather than
    through an SQLAlchemy relationship to the 'leases' table, because the
    'leases' table is dependent on the type of data store selected and might
    never be populated.
    """

    __tablename__ = "bots"

    name: str = Column(String, primary_key=True, index=True, nullable=False)
    bot_id: str = Column(String, index=True, nullable=False)
    last_update_timestamp: datetime.datetime = Column(DateTime, nullable=False, index=True)
    bot_status: int = Column(Integer, nullable=False)
    expiry_time: Optional[datetime.datetime] = Column(DateTime, nullable=True, index=True)
    lease_id: Optional[str] = Column(String, nullable=True)
    instance_name: Optional[str] = Column(String, nullable=True)

212 

213 

class BlobEntry(Base):
    """ORM model for a row of the ``blobs`` table.

    This table is used by the SQLStorage CAS backend to store blobs in a
    database. Each row holds the blob's digest hash, its size in bytes and
    the raw data.
    """

    __tablename__ = "blobs"

    digest_hash: str = Column(String, primary_key=True)
    digest_size_bytes: int = Column(BigInteger, nullable=False)
    data: bytes = Column(LargeBinary, nullable=False)

222 

223 

def digest_to_string(digest: Digest) -> str:
    """Render *digest* as the string ``"<hash>/<size_bytes>"``."""
    return "/".join((digest.hash, str(digest.size_bytes)))

226 

227 

def string_to_digest(string: str) -> Digest:
    """Parse a ``"<hash>/<size_bytes>"`` string into a ``Digest``.

    The string is split on the first ``/`` only. A ``ValueError`` is raised
    when there is no ``/`` or when the size part is not a valid integer.
    """
    pieces = string.split("/", 1)
    digest_hash, size_text = pieces
    return Digest(hash=digest_hash, size_bytes=int(size_text))