Coverage for sources/mimeogram/fsprotect/cache.py: 95%

96 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-05 19:15 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Cache for filesystem protection checks. ''' 

22 

23 

24from . import __ 

25from . import core as _core 

26 

27 

28_scribe = __.produce_scribe( __name__ ) 

29 

30 

class Rule( __.immut.DataclassObject ):
    ''' Protection rule: explicit locations plus optional glob patterns. '''

    # Locations covered by the rule; a path at or beneath any of them
    # is treated as a match by the cache.
    paths: frozenset[ __.Path ]
    # Glob patterns tested against candidate paths; none by default.
    patterns: frozenset[ str ] = frozenset( )

36 

37 

class Cache( _core.Protector ):
    ''' Platform-specific cache of protected locations and patterns.

        Instances are normally produced by :py:meth:`from_configuration`
        and then consulted through :py:meth:`verify`.
    '''

    # Protection rules, keyed by the reason reported when one matches.
    rules: dict[ _core.Reasons, Rule ]
    # Path components which switch protection off when they appear
    # anywhere in a candidate path.
    defaults_disablement: frozenset[ str ]
    # Per-directory overrides:
    #   directory -> ( ignore patterns, protect patterns )
    rules_supercession: __.immut.Dictionary[
        __.Path, tuple[ frozenset[ str ], frozenset[ str ] ] ]

    @classmethod
    def from_configuration( selfclass, auxdata: __.Globals ) -> __.typx.Self:
        ''' Builds cache from platform defaults and user configuration.

            The rule mapping is filled in place by the platform,
            credentials, and project providers, in that order, before the
            user configuration is applied on top.
        '''
        _scribe.debug( 'Initializing protection cache.' )
        rules: dict[ _core.Reasons, Rule ] = { }
        discover_platform_locations( auxdata, rules )
        provide_credentials_locations( rules )
        provide_project_locations( rules )
        disablement, supercession = _process_configuration( auxdata, rules )
        return selfclass(
            rules = rules,
            defaults_disablement = disablement,
            rules_supercession = supercession )

    def verify( self, path: __.Path ) -> _core.Status:
        ''' Reports whether path is protected, according to cached rules.

            Checks are made in order of precedence:

            1. Any path component listed in ``defaults_disablement``
               yields an inactive status.
            2. For each supercession directory containing the path, the
               ignore patterns are tried before the protect patterns; the
               first hit decides the result.
            3. Each rule is tried, first by location and then by pattern.

            A path matching nothing yields an inactive status.
        '''
        path = _normalize_path( path )
        _scribe.debug( f"Path: {path}" )

        if any(
            component in self.defaults_disablement
            for component in path.parts
        ):
            return _core.Status( path = path, active = False )

        overrides = self.rules_supercession.items( )
        for directory, ( ignores, protects ) in overrides:
            anchor = _normalize_path( directory )
            if path.is_relative_to( anchor ):
                # Override patterns are relative to their directory.
                relative = path.relative_to( anchor )
                if _check_path_patterns( relative, ignores ):
                    return _core.Status( path = path, active = False )
                if _check_path_patterns( relative, protects ):
                    return _core.Status(
                        path = path,
                        reason = _core.Reasons.PlatformSensitive,
                        active = True )

        for reason, rule in self.rules.items( ):
            # Location check first; patterns are only consulted when no
            # location of the rule contains the path.
            within_location = any(
                path.is_relative_to( _normalize_path( location ) )
                for location in rule.paths )
            if (
                within_location
                or _check_path_patterns( path, rule.patterns )
            ):
                return _core.Status(
                    path = path, reason = reason, active = True )

        return _core.Status( path = path, active = False )

95 

96 

def provide_credentials_locations(
    rules: dict[ _core.Reasons, Rule ]
) -> None:
    ''' Registers common locations of credentials and other secrets.

        Each location reported by the ``home`` module is joined onto the
        home directory of the current user. The resulting rule is stored
        in ``rules`` under ``Reasons.Credentials``, replacing any entry
        already present for that reason.
    '''
    from . import home as module
    home = __.Path.home( )
    rules[ _core.Reasons.Credentials ] = Rule(
        paths = frozenset(
            home / location
            for location in module.discover_sensitive_locations( ) ) )

107 

108 

def provide_project_locations(
    rules: dict[ _core.Reasons, Rule ]
) -> None:
    ''' Registers IDE and VCS locations relative to project.

        Every name reported by the ``project`` module is wrapped into a
        recursive glob, so that it matches at any depth. The rule carries
        patterns only and is stored in ``rules`` under
        ``Reasons.Concealment``.
    '''
    from . import project as module
    # TODO: Consider whether these patterns are compatible with Windows.
    globs = frozenset(
        f"**/{path}/**" for path in module.discover_sensitive_locations( ) )
    rules[ _core.Reasons.Concealment ] = Rule(
        paths = frozenset( ), patterns = globs )

120 

121 

def _check_path_patterns( path: __.Path, patterns: frozenset[ str ] ) -> bool:
    ''' Tests path, as a string, against a collection of glob patterns.

        Matching is delegated to ``wcmatch`` with the ``GLOBSTAR`` flag
        enabled.
    '''
    from wcmatch import glob
    candidates = list( patterns )
    return glob.globmatch( str( path ), candidates, flags = glob.GLOBSTAR )

127 

128 

def discover_platform_locations(
    auxdata: __.Globals, rules: dict[ _core.Reasons, Rule ]
) -> None:
    ''' Registers system and user locations for the running platform.

        Stores two rules in ``rules``: ``Reasons.OsDirectory`` for the
        system paths and ``Reasons.UserConfiguration`` for the user paths
        together with the user configuration directory of the application.
    '''
    platform = __.sys.platform
    if platform == 'darwin':
        from . import macos as module
        sys_paths = module.discover_system_paths( )
        user_paths = module.discover_user_paths( )
    elif platform == 'win32':
        from . import windows as module
        sys_paths = module.discover_system_paths( )
        user_paths = module.discover_user_paths( )
    else:
        # Every other platform uses the Unix module, which contributes
        # system paths only.
        from . import unix as module
        sys_paths = module.discover_system_paths( )
        user_paths: frozenset[ __.Path ] = frozenset( )
    rules[ _core.Reasons.OsDirectory ] = Rule( paths = frozenset( sys_paths ) )
    user_config = __.Path( auxdata.directories.user_config_path )
    rules[ _core.Reasons.UserConfiguration ] = Rule(
        paths = frozenset( user_paths | { user_config, } ) )

151 

152 

def _expand_location( path: str ) -> __.Path:
    ''' Expands environment variables and home directory in a location.

        Environment variables are expanded before the home directory
        marker. On Windows, a location with exactly one leading slash has
        its leading slashes removed and, if still not absolute, is placed
        on the drive of the current working directory. The result is
        resolved and normalized.
    '''
    expanded = __.os.path.expanduser( __.os.path.expandvars( path ) )
    rooted_on_windows = (
        __.sys.platform == 'win32'
        and expanded.startswith( '/' )
        and not expanded.startswith( '//' ) ) # Skip UNC paths
    if not rooted_on_windows:
        return _normalize_path( __.Path( expanded ).resolve( ) )
    expanded = expanded.lstrip( '/' )
    location = __.Path( expanded )
    if not location.is_absolute( ):
        location = __.Path( __.Path.cwd( ).drive + '/' + expanded )
    return _normalize_path( location.resolve( ) )

166 

167 

def _normalize_path( path: __.Path ) -> __.Path:
    ''' Resolves path into a form comparable across platforms.

        On Windows, the drive portion of the resolved path is lowercased;
        elsewhere, or when there is no drive, the resolved path is
        returned as is.
    '''
    canonical = path.resolve( )
    if __.sys.platform != 'win32' or not canonical.drive:
        return canonical
    drive = canonical.drive
    remainder = str( canonical )[ len( drive ): ]
    return __.Path( drive.lower( ) + remainder )

176 

177 

def _process_configuration(
    auxdata: __.Globals,
    rules: dict[ _core.Reasons, Rule ],
) -> tuple[
    frozenset[ str ],
    __.immut.Dictionary[
        __.Path, tuple[ frozenset[ str ], frozenset[ str ] ] ],
]:
    ''' Applies the ``protection`` section of the configuration.

        When the section is missing or empty, ``rules`` is left untouched
        and empty results are returned. Otherwise, a
        ``Reasons.CustomAddition`` rule is stored in ``rules`` from the
        ``additional-locations`` and ``additional-patterns`` entries.

        Returns the ``defaults-disablement`` entries and a mapping from
        each expanded ``rules-supercession`` directory to its pair of
        ignore and protect patterns.
    '''
    config = auxdata.configuration.get( 'protection', { } )
    if not config: return frozenset( ), __.immut.Dictionary( )
    # Additional locations and patterns.
    rules[ _core.Reasons.CustomAddition ] = Rule(
        paths = frozenset(
            _expand_location( location )
            for location in config.get( 'additional-locations', ( ) ) ),
        patterns = frozenset( config.get( 'additional-patterns', ( ) ) ) )
    # Override defaults.
    disablement = frozenset( config.get( 'defaults-disablement', ( ) ) )
    # Process directory overrides.
    supercession: dict[
        __.Path, tuple[ frozenset[ str ], frozenset[ str ] ] ] = {
        _expand_location( directory ): (
            frozenset( overrides.get( 'ignore', [ ] ) ),
            frozenset( overrides.get( 'protect', [ ] ) ) )
        for directory, overrides
        in config.get( 'rules-supercession', { } ).items( ) }
    return disablement, __.immut.Dictionary( supercession )

208 return patterns_remove, __.immut.Dictionary( supercession_rules )