]> git.proxmox.com Git - mirror_edk2.git/blob - BaseTools/Source/Python/UPT/Core/FileHook.py
BaseTools: Replace BSD License with BSD+Patent License
[mirror_edk2.git] / BaseTools / Source / Python / UPT / Core / FileHook.py
1 ## @file
2 # This file hooks file and directory creation and removal
3 #
4 # Copyright (c) 2014 - 2018, Intel Corporation. All rights reserved.<BR>
5 #
6 # SPDX-License-Identifier: BSD-2-Clause-Patent
7 #
8
9 '''
10 File hook
11 '''
12
13 import os
14 import stat
15 import time
16 import zipfile
17 from time import sleep
18 from Library import GlobalData
19
# References to the genuine os / builtin callables. They are captured here so
# the hook functions in this module can still reach the real implementations
# once os.remove, os.mkdir, os.rmdir and os.chmod have been rebound.
__built_in_remove__ = os.remove
__built_in_mkdir__ = os.mkdir
__built_in_rmdir__ = os.rmdir
__built_in_chmod__ = os.chmod
__built_in_open__ = open

# Action codes stored in each journal entry (_PathInfo.action).
_RMFILE, _MKFILE, _RMDIR, _MKDIR, _CHMOD = range(5)

# Name of the backup archive kept in the workspace root.
gBACKUPFILE = 'file.backup'
# Workspace-relative paths that are never journaled.
gEXCEPTION_LIST = [
    os.path.join('Conf', 'DistributionPackageDatabase.db'),
    '.tmp',
    gBACKUPFILE,
]
34
class _PathInfo:
    """One journal entry describing a recorded file-system change.

    action -- one of the _RMFILE/_MKFILE/_RMDIR/_MKDIR/_CHMOD codes
    path   -- the path the action was applied to
    mode   -- permission bits to restore for _CHMOD entries; -1 when unused
    """

    def __init__(self, action, path, mode=-1):
        self.action = action
        self.path = path
        self.mode = mode

    def __repr__(self):
        # Readable form so journal contents can be inspected while debugging.
        return '_PathInfo(action=%r, path=%r, mode=%r)' % (
            self.action, self.path, self.mode)
40
class RecoverMgr:
    """Journal file-system changes made under a workspace so they can be undone.

    Removed files are copied into a zip archive in the workspace root before
    deletion; every tracked change is appended to ``rlist`` as a _PathInfo.
    rollback() replays the journal backwards, commit() discards the backup.
    """

    def __init__(self, workspace):
        """Create a manager for the directory tree rooted at *workspace*."""
        self.rlist = []
        self.zip = None
        self.workspace = os.path.normpath(workspace)
        self.backupfile = gBACKUPFILE
        self.zipfile = os.path.join(self.workspace, gBACKUPFILE)

    def _workspace_prefix(self):
        """Return the workspace path guaranteed to end with a separator."""
        if str(self.workspace).endswith(os.sep):
            return self.workspace
        return self.workspace + os.sep

    def _createzip(self):
        """Open the backup archive for writing unless one is already open."""
        if self.zip:
            return
        self.zip = zipfile.ZipFile(self.zipfile, 'w', zipfile.ZIP_DEFLATED)

    def _save(self, tmp, path):
        """Append a journal entry with action *tmp* if *path* is tracked."""
        if not self._tryhook(path):
            return
        self.rlist.append(_PathInfo(tmp, path))

    def bkrmfile(self, path):
        """Remove *path*, backing it up first when it is a tracked file."""
        arc = self._tryhook(path)
        if arc and os.path.isfile(path):
            self._createzip()
            # The archive name is passed as text: zipfile keeps member names
            # as str, and a bytes name breaks both write() and getinfo().
            self.zip.write(path, arc)
            oldmode = stat.S_IMODE(os.stat(path).st_mode)
            # Order matters: rollback walks the journal backwards, so the
            # file is restored first and its mode re-applied afterwards.
            self.rlist.append(_PathInfo(_CHMOD, path, oldmode))
            self.rlist.append(_PathInfo(_RMFILE, path))
        __built_in_remove__(path)

    def bkmkfile(self, path, mode, bufsize):
        """Open *path*, journaling it as newly created if it did not exist.

        Returns the file object produced by the real open().
        """
        if not os.path.exists(path):
            self._save(_MKFILE, path)
        return __built_in_open__(path, mode, bufsize)

    def bkrmdir(self, path):
        """Remove directory *path*, journaling its mode and removal if tracked."""
        # Only tracked paths are journaled, matching bkrmfile and bkchmod.
        if self._tryhook(path) and os.path.exists(path):
            oldmode = stat.S_IMODE(os.stat(path).st_mode)
            self.rlist.append(_PathInfo(_CHMOD, path, oldmode))
            self._save(_RMDIR, path)
        __built_in_rmdir__(path)

    def bkmkdir(self, path, mode):
        """Create directory *path*, journaling it if it did not exist."""
        if not os.path.exists(path):
            self._save(_MKDIR, path)
        __built_in_mkdir__(path, mode)

    def bkchmod(self, path, mode):
        """Change the mode of *path*, journaling the previous mode if tracked."""
        if self._tryhook(path) and os.path.exists(path):
            oldmode = stat.S_IMODE(os.stat(path).st_mode)
            self.rlist.append(_PathInfo(_CHMOD, path, oldmode))
        __built_in_chmod__(path, mode)

    def _restorefile(self, path):
        """Recreate *path* from the backup archive, including its mtime."""
        if not self.zip:
            self.zip = zipfile.ZipFile(self.zipfile, 'r', zipfile.ZIP_DEFLATED)
        # Same workspace-relative name bkrmfile used when archiving the file.
        arcname = self._tryhook(path)
        if os.sep != "/" and os.sep in arcname:
            arcname = arcname.replace(os.sep, '/')
        mtime = self.zip.getinfo(arcname).date_time
        content = self.zip.read(arcname)
        with __built_in_open__(path, "wb") as filep:
            filep.write(content)
        # isdst=-1 lets mktime decide whether DST applied at that local time;
        # forcing 0 shifts the restored timestamp by an hour during DST.
        intime = time.mktime(mtime + (0, 0, -1))
        os.utime(path, (intime, intime))

    def rollback(self):
        """Undo every journaled change, newest first, then drop the backup."""
        had_backup = bool(self.zip)
        if had_backup:
            # Finish the write-mode archive so it can be reopened for reading.
            self.zip.close()
            self.zip = None
        for item in reversed(self.rlist):
            exist = os.path.exists(item.path)
            if item.action == _MKFILE and exist:
                __built_in_remove__(item.path)
            elif item.action == _RMFILE and not exist:
                self._restorefile(item.path)
            elif item.action == _MKDIR and exist:
                # Retries until the removal succeeds.
                while True:
                    try:
                        __built_in_rmdir__(item.path)
                        break
                    except OSError:
                        # Sleep a short time and try again
                        # The anti-virus software may delay the file removal in this directory
                        sleep(0.1)
            elif item.action == _RMDIR and not exist:
                __built_in_mkdir__(item.path)
            elif item.action == _CHMOD and exist:
                try:
                    __built_in_chmod__(item.path, item.mode)
                except EnvironmentError:
                    # Best effort: a failed mode restore does not stop rollback.
                    pass
        if had_backup and not self.zip:
            # No file needed restoring, so the archive was never reopened;
            # delete it here or it would be left behind in the workspace.
            __built_in_remove__(self.zipfile)
        self.commit()

    def commit(self):
        """Discard the backup archive and forget the journal."""
        if self.zip:
            self.zip.close()
            # Reset so a later bkrmfile() opens a fresh archive instead of
            # writing to the closed one.
            self.zip = None
            __built_in_remove__(self.zipfile)
        # The backups are gone, so the recorded entries can no longer be undone.
        self.rlist = []

    # Check if path needs to be hooked
    def _tryhook(self, path):
        """Return *path* relative to the workspace, or '' if it is not tracked.

        A path is untracked when it lies outside the workspace or when it is
        an entry of gEXCEPTION_LIST, lies beneath one, or is its parent
        directory.
        """
        path = os.path.normpath(path)
        works = self._workspace_prefix()
        if not path.startswith(works):
            return ''
        for exceptdir in gEXCEPTION_LIST:
            full = os.path.join(self.workspace, exceptdir)
            if full == path or path.startswith(full + os.sep) or os.path.split(full)[0] == path:
                return ''
        # Slice by the separator-terminated prefix so a workspace that already
        # ends with a separator (e.g. the filesystem root) does not lose the
        # first character of the relative path.
        return path[len(works):]
156
def _hookrm(path):
    """Replacement for os.remove: delete *path* through the recover manager.

    Falls back to the real os.remove when no recover manager is registered.
    """
    mgr = GlobalData.gRECOVERMGR
    if not mgr:
        __built_in_remove__(path)
        return
    mgr.bkrmfile(path)
162
def _hookmkdir(path, mode=0o777):
    """Replacement for os.mkdir: create *path* through the recover manager.

    Falls back to the real os.mkdir when no recover manager is registered.
    """
    mgr = GlobalData.gRECOVERMGR
    if not mgr:
        __built_in_mkdir__(path, mode)
        return
    mgr.bkmkdir(path, mode)
168
def _hookrmdir(path):
    """Replacement for os.rmdir: remove *path* through the recover manager.

    Falls back to the real os.rmdir when no recover manager is registered.
    """
    mgr = GlobalData.gRECOVERMGR
    if not mgr:
        __built_in_rmdir__(path)
        return
    mgr.bkrmdir(path)
174
def _hookmkfile(path, mode='r', bufsize=-1):
    """Open *path* through the recover manager when one is registered.

    Returns the file object from the manager's bkmkfile(), or from the real
    open() when no recover manager is registered.
    """
    mgr = GlobalData.gRECOVERMGR
    opener = mgr.bkmkfile if mgr else __built_in_open__
    return opener(path, mode, bufsize)
179
def _hookchmod(path, mode):
    """Replacement for os.chmod: change the mode through the recover manager.

    Falls back to the real os.chmod when no recover manager is registered.
    """
    mgr = GlobalData.gRECOVERMGR
    if not mgr:
        __built_in_chmod__(path, mode)
        return
    mgr.bkchmod(path, mode)
185
def SetRecoverMgr(mgr):
    """Register *mgr* as the recover manager consulted by the file hooks.

    The value is stored in GlobalData.gRECOVERMGR; a false value makes the
    hooks call the original os functions directly.
    """
    GlobalData.gRECOVERMGR = mgr
188
# Install the hooks: from here on these os functions go through the hook
# wrappers, which consult GlobalData.gRECOVERMGR on every call.
os.remove = _hookrm
os.mkdir = _hookmkdir
os.rmdir = _hookrmdir
os.chmod = _hookchmod
# open() itself is not rebound; the hooked opener is published under this name.
__FileHookOpen__ = _hookmkfile