Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/auth/config.py: 96.67%

30 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

1# Copyright (C) 2023 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import os 

16import re 

17from typing import Dict, List, Literal, Optional, Union 

18 

19import yaml 

20from pydantic import BaseModel, Field 

21 

22 

23class Acl(BaseModel): 

24 actor: Optional[str] = Field(default=None) 

25 requests: Optional[List[str]] = Field(default=None) 

26 subject: Optional[str] = Field(default=None) 

27 workflow: Optional[str] = Field(default=None) 

28 

29 def is_authorized( 

30 self, 

31 request_name: str, 

32 actor: Optional[str] = None, 

33 subject: Optional[str] = None, 

34 workflow: Optional[str] = None, 

35 ) -> bool: 

36 if self.actor is not None and not re.match(self.actor, actor or ""): 

37 return False 

38 

39 if self.subject is not None and not re.match(self.subject, subject or ""): 

40 return False 

41 

42 if self.workflow is not None and not re.match(self.workflow, workflow or ""): 

43 return False 

44 

45 if self.requests is not None and request_name not in self.requests: 

46 return False 

47 

48 return True 

49 

50 

51class InstanceAuthorizationConfig(BaseModel): 

52 allow: Union[Literal["all"], List[Acl]] 

53 

54 def is_authorized( 

55 self, 

56 request_name: str, 

57 actor: Optional[str] = None, 

58 subject: Optional[str] = None, 

59 workflow: Optional[str] = None, 

60 ) -> bool: 

61 if self.allow == "all": 

62 return True 

63 

64 return any(acl.is_authorized(request_name, actor, subject, workflow) for acl in self.allow) 

65 

66 

67def parse_auth_config(path: Union[str, bytes, "os.PathLike[str]"]) -> Dict[str, InstanceAuthorizationConfig]: 

68 with open(path) as config_file: 

69 config = yaml.safe_load(config_file) 

70 return { 

71 instance_name: InstanceAuthorizationConfig(**instance_config) 

72 for instance_name, instance_config in config.items() 

73 }