-
Notifications
You must be signed in to change notification settings - Fork 0
/
sm.py
166 lines (132 loc) · 5.59 KB
/
sm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import re
import os
import shutil
import threading
def thread(func, *args, **kwargs):
thread = threading.Thread(target=func, args=args, kwargs=kwargs)
thread.start()
return thread
def acfgetreg(string, key):
'''ACF property extracting regular expression'''
found = re.findall('"%s"\s+"(.+)"' % key, string)
if found:
return found[0]
else:
return False
def dirsize(path):
'''Gets size of given directory.'''
if os.path.isfile(path):
return os.path.getsize(path)
total = 0
for dirpath, dirs, files in os.walk(path):
for file in files:
file = os.path.join(dirpath, file)
try:
total += os.path.getsize(file)
except OSError:
pass
return total
def bytesize(num, binary=True):
'''Given a number of bytes, human-formats as binary(kibi, mebi; *1024) or non-binary (kilo, mega; *1000) up to yotta/yobibytes.'''
if binary:
divisor = 1024.0
else:
divisor = 1000.0
for unit in ['','K','M','G','T','P','E','Z']:
if abs(num) < divisor:
return "%3.1f %s%sB" % (num, unit, 'i'*binary*bool(unit))
num /= divisor
return "%.1f Y%sB" % (num, 'i' * binary)
def getpath(path):
'''Attempts to resolve to Steam library path.'''
base = os.path.basename(path)
if os.path.exists(os.path.join(path, 'steam.dll')):
pass #If DLL is present, it's valid
elif base == 'steamapps': #Work down directories
path = os.path.join(path, '..')
elif base in ('common', 'downloading', 'sourcemods', 'temp'):
path = os.path.join(path, '..', '..')
else:
downpath = os.path.join(path, 'Steam')
if os.path.exists(downpath): #Try to work one up
path = downpath
else:
walk = os.walk(path)
for path_, dirs, files in walk:
if ('steam.dll' in files or 'Steam.dll' in files) and 'steamapps' in dirs:
path = path_
break
else: #No result
return False
return os.path.realpath(path)
class Operation:
'''Folder copying operation w/ status. Does not attempt to see if free space is available.'''
def _copy(self, src, dst):
'''Copies SRC to DST, updates internal stats.'''
relsrc = os.path.relpath(src, self.src) # Relative source path for display
size = os.path.getsize(src) #Size of file to be copied
self._status('Copying file %s' % relsrc)
shutil.copy2(src, dst, follow_symlinks=False)
self.copied += size
def start(self):
'''Starts operation.'''
self._status('Scanning tree')
for dirpath, dirs, files in self.list:
for d in dirs:
d = os.path.join(dirpath, d) #Make absolute
relpath = os.path.relpath(d, self.src) #Get relative to top source
newpath = os.path.join(self.dst, relpath) #Get new destination folder
if not os.path.exists(newpath):
os.makedirs(newpath) #Make directory if nonexistent
relpath = os.path.relpath(dirpath, self.src)
newpath = os.path.join(self.dst, relpath) #Get destination again
for file in files:
file = os.path.realpath(os.path.join(dirpath, file))
self._copy(file, newpath)
def _status(self, status):
'''Runs callback with status and attempts to stop divison by zero errors.'''
self.status = status
if callable(self.callback):
try:
self.callback(status)
except ZeroDivisionError: # Presumably, (deleted/size) was referenced; it must ergo be zero
self.size = self.copied = 1
self.callback(status)
def __init__(self, src, dst, callback=None):
'''Copies folder SRC to DST. Whenever status updates, calls CALLBACK with it, if existent.'''
if os.path.exists(dst):
raise FileExistsError
return None
self.src = src
self.dst = dst
self.callback = callback
self.list = os.walk(self.src) #List of files
self.size = dirsize(src) #Bytes to copy
self.copied = 0 # Bytes copied
def move(sender, game, library, callback=None):
'''Moves game with ID `game` from library `sender` to `library`.
If callback is set, every operation done does callback(statusmsg, percentdone)'''
srcpath = os.path.join(sender['path'], game['path'])
gamepath = os.path.basename(game['path']) # Name of folder
dstpath = os.path.join(library['path'], 'steamapps', 'common', gamepath)
if os.path.isdir(dstpath):
callback('Deleting existent paths', 0)
shutil.rmtree(dstpath)
copyop = Operation(srcpath, dstpath, library['path'])
if callable(callback):
copyop.callback = lambda status: callback(status, 100*(copyop.copied/copyop.size))
copyop.start()
if callable(callback):
callback('Copying metadata file', 100)
shutil.copy2(game['acfpath'], os.path.join(library['path'], 'steamapps')) # Copy ACF file
if delete:
callback('Deleting original files', 100)
delete(game)
def delete(library, game):
'''Deletes GAME from LIBRARY. No callback.'''
path = os.path.join(library['path'], game['path'])
if os.path.exists(path):
shutil.rmtree(path)
acfpath = os.path.join(path, os.pardir, os.pardir, 'appmanifest_%s.acf' % game['id'])
if os.path.exists(acfpath):
os.remove(acfpath)