-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsFileManipulation.py
More file actions
379 lines (315 loc) · 10.5 KB
/
sFileManipulation.py
File metadata and controls
379 lines (315 loc) · 10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
import zipfile
import sys, os
import shutil
import multiprocessing
kilobytes = 1024
megabytes = kilobytes * 1024
passwd = 'mypassword'
originalFile = "bigFile.txt"
txtFileExtension = ".txt"
zipFileExtension = ".zip"
# Encode/decode a file by XOR-ing its bytes with a repeating key.
def xor_crypt_string(inputFile, processedFile, key, encode, decode):
    """XOR the contents of ``inputFile`` with ``key`` and write ``processedFile``.

    Parameters:
        inputFile: path of the file to read.
        processedFile: path of the file to (over)write with the result.
        key: non-empty key; text keys are encoded as UTF-8 before use. The
            key is repeated as often as needed to cover the data.
        encode: when true, the XOR-ed bytes are base64-encoded (surrounding
            whitespace stripped) before being written.
        decode: when true, the input is base64-decoded before being XOR-ed.

    Raises:
        ValueError: if ``key`` is empty (an empty key would otherwise
            silently produce an empty output file).
    """
    import base64

    if isinstance(key, (bytes, bytearray)):
        key_bytes = bytearray(key)
    else:
        key_bytes = bytearray(key.encode("utf-8"))
    if not key_bytes:
        raise ValueError("key must not be empty")

    # encodestring/decodestring are the Python 2 names; newer Pythons only
    # provide encodebytes/decodebytes. Both produce the same line-wrapped
    # base64 output, so the on-disk format is unchanged.
    b64_encode = getattr(base64, "encodebytes", None) or base64.encodestring
    b64_decode = getattr(base64, "decodebytes", None) or base64.decodestring

    # Binary mode: the XOR output is arbitrary bytes and must not go through
    # newline translation or text decoding.
    with open(inputFile, "rb") as source:
        data = source.read()
    if decode:
        data = b64_decode(data)

    key_len = len(key_bytes)
    xored = bytes(bytearray(
        byte ^ key_bytes[index % key_len]
        for index, byte in enumerate(bytearray(data))
    ))
    result = b64_encode(xored).strip() if encode else xored

    # The output is opened only after all processing succeeded, so a failure
    # above never leaves a truncated output file behind.
    with open(processedFile, "wb") as target:
        target.write(result)
# Split one file into numbered part files of at most `chunksize` bytes each.
def split(fromfile, todir, chunksize):
    """Write ``fromfile`` into ``todir`` as a sequence of part files.

    ``todir`` is created when missing; otherwise every entry already in it is
    removed first. Parts are named ``<basename of fromfile>partNNNN`` with a
    four-digit, 1-based counter so that a plain name sort restores the order.

    Parameters:
        fromfile: path of the file to split.
        todir: directory that receives the part files.
        chunksize: maximum number of bytes per part.

    Returns:
        The number of part files written.

    Raises:
        AssertionError: if more than 9999 parts would be needed (a fifth
            digit would break the name-sorted order used when merging).
    """
    if not os.path.exists(todir):
        os.mkdir(todir)
    else:
        for fname in os.listdir(todir):
            os.remove(os.path.join(todir, fname))

    # Only the file name goes into the part name: joining a path that has
    # directory components (or is absolute) would place parts outside todir.
    part_prefix = os.path.basename(fromfile)
    partnum = 0
    with open(fromfile, 'rb') as source:        # binary mode: parts are raw bytes
        while True:
            chunk = source.read(chunksize)      # next part, <= chunksize bytes
            if not chunk:                       # empty read means end of file
                break
            if partnum >= 9999:
                # Raised explicitly (not via `assert`) so the check survives
                # `python -O`; the exception type is kept for existing callers.
                raise AssertionError("too many parts: join sort fails if 5 digits")
            partnum += 1
            filename = os.path.join(todir, "%spart%04d" % (part_prefix, partnum))
            with open(filename, 'wb') as part:
                part.write(chunk)
    return partnum
# Concatenate the files found in `fromdir`, in sorted name order, into `tofile`.
def join(fromdir, tofile, readsize=1024 * 1024):
    """Merge every file in ``fromdir`` into the single file ``tofile``.

    Parameters:
        fromdir: directory whose entries are concatenated in sorted name order.
        tofile: path of the output file; it is overwritten.
        readsize: number of bytes copied per read (defaults to 1 MiB).
    """
    # List the parts before touching the output, so a missing or unreadable
    # directory does not truncate an existing `tofile`.
    parts = sorted(os.listdir(fromdir))
    with open(tofile, 'wb') as output:
        for filename in parts:
            with open(os.path.join(fromdir, filename), 'rb') as part:
                shutil.copyfileobj(part, output, readsize)
def doZip(orig, zipped):
    """Create the zip archive ``zipped`` containing the single file ``orig``.

    Parameters:
        orig: path of the file to add to the archive.
        zipped: path of the archive to create; it is opened in 'w' mode, so
            an existing archive at that path is replaced.
    """
    print(" Zipping...")
    # The context manager closes the archive even when adding the file fails.
    with zipfile.ZipFile(zipped, mode='w') as zf:
        # The original message had mismatched quotes and printed the literal
        # text "', orig, '" instead of the file name.
        print(" adding %s into zip" % (orig,))
        zf.write(orig)
def doUnzip(zipped):
    """Extract every member of the archive ``zipped``.

    ``extractall()`` is called without a target, so the members are written
    relative to the current working directory.
    """
    print(" Unzipping...")
    with zipfile.ZipFile(zipped) as archive:
        archive.extractall()
# test 1: zip/unzip a big file
def test1():
    """Zip a copy of the original file, then unzip it in a child process."""
    workFile = "bigFile1"
    archive = workFile + zipFileExtension
    # work on a copy so the original file itself is never touched
    shutil.copy(originalFile, workFile)
    # zip the copy
    doZip(workFile, archive)
    # unzip in a separate process and block until it has finished
    unzipper = multiprocessing.Process(target=doUnzip, args=(archive,))
    unzipper.start()
    unzipper.join()
    # remove generated files
    for leftover in (workFile, archive):
        os.remove(leftover)
# test 2: encrypt/decrypt a big file
def test2():
    """Encrypt a copy of the original file, then decrypt it in a child process."""
    plainFile = "bigFile2"
    cipherFile = plainFile + 'encrypted' + txtFileExtension
    # work on a copy so the original file itself is never touched
    shutil.copy(originalFile, plainFile)
    # XOR + base64-encode the copy into the encrypted file
    print(" encoding and encrpyting file...")
    xor_crypt_string(plainFile, cipherFile, passwd, True, False)
    # reverse the transformation in a separate process and wait for it
    print(" decrypting and decoding file...")
    decrypter = multiprocessing.Process(
        target=xor_crypt_string,
        args=(cipherFile, plainFile, passwd, False, True),
    )
    decrypter.start()
    decrypter.join()
    # remove generated files
    for leftover in (plainFile, cipherFile):
        os.remove(leftover)
# test 3: split/merge a big file. The split limit used here is 5MB.
def test3():
    """Split a copy of the original file into 5 MB parts and merge them back.

    The merge runs in a child process. Errors are reported on stdout and the
    scenario then continues, so the generated files are still cleaned up.
    """
    test3File = "bigFile3"
    # copy original file to test3 file for testing
    shutil.copy(originalFile, test3File)
    splitFileDir = 'splitFiles'
    # split file into a set of smaller files (5MB)
    try:
        parts = split(test3File, splitFileDir, 5 * megabytes)
    except Exception as exc:
        print('Error during split:')
        print('%s %s' % (type(exc), exc))
    else:
        print(' Split finished: %s' % (parts,))
    # aggregate file in a separate process and wait until it's done
    try:
        proc = multiprocessing.Process(target=join, args=(splitFileDir, test3File,))
        proc.start()
        proc.join()
    except Exception as exc:
        print('Error aggregating files:')
        print('%s %s' % (type(exc), exc))
    else:
        # An exception raised inside the child never reaches this process;
        # the exit code is the only signal that the merge actually worked.
        if proc.exitcode == 0:
            print(' Merge completed')
        else:
            print('Error aggregating files:')
            print('merge process exited with code %s' % (proc.exitcode,))
    # remove generated file and directory
    os.remove(test3File)
    shutil.rmtree(splitFileDir)
# Encrypt the file and zip it. Then unzip and decrypt it.
def test4():
    """Encrypt and zip a copy of the original file, then unzip and decrypt it.

    Unzipping and decrypting each run in their own child process.
    """
    plainFile = "bigFile4"
    archive = plainFile + zipFileExtension
    cipherFile = plainFile + 'encrypted' + txtFileExtension
    # work on a copy so the original file itself is never touched
    shutil.copy(originalFile, plainFile)
    # XOR + base64-encode the copy into the encrypted file
    print(" encoding and encrpyting file...")
    xor_crypt_string(plainFile, cipherFile, passwd, True, False)
    # zip the encrypted file
    doZip(cipherFile, archive)
    # unzip in a separate process and block until it has finished
    unzipper = multiprocessing.Process(target=doUnzip, args=(archive,))
    unzipper.start()
    unzipper.join()
    # reverse the encryption in a separate process and wait for it
    print(" decrypting and decoding file...")
    decrypter = multiprocessing.Process(
        target=xor_crypt_string,
        args=(cipherFile, plainFile, passwd, False, True),
    )
    decrypter.start()
    decrypter.join()
    # remove generated files
    for leftover in (plainFile, cipherFile, archive):
        os.remove(leftover)
# Encrypt the file and split it into smaller files. Then merge back and decrypt.
def test5():
    """Encrypt a copy of the original file, split/merge it, then decrypt it.

    The merge and the decryption each run in their own child process. Errors
    in the split/merge steps are reported on stdout and the scenario then
    continues.
    """
    test5File = "bigFile5"
    # copy original file to test5 file for testing
    shutil.copy(originalFile, test5File)
    encryptedFile = test5File + "encrypted" + txtFileExtension
    splitFileDir = "splitFiles"
    # encode the file and output the encoded file
    print(" encoding and encrpyting file...")
    xor_crypt_string(test5File, encryptedFile, passwd, True, False)
    # split file into a set of smaller files (5MB)
    try:
        parts = split(encryptedFile, splitFileDir, 5 * megabytes)
    except Exception as exc:
        print('Error during split:')
        print('%s %s' % (type(exc), exc))
    else:
        print(' Split finished: %s' % (parts,))
    # aggregate file in a separate process and wait until it's done
    try:
        proc = multiprocessing.Process(target=join, args=(splitFileDir, encryptedFile,))
        proc.start()
        proc.join()
    except Exception as exc:
        print('Error aggregating files:')
        print('%s %s' % (type(exc), exc))
    else:
        # An exception raised inside the child never reaches this process;
        # the exit code is the only signal that the merge actually worked.
        if proc.exitcode == 0:
            print(' Merge completed')
        else:
            print('Error aggregating files:')
            print('merge process exited with code %s' % (proc.exitcode,))
    # decode the file and output the original test file
    print(" decrypting and decoding file...")
    proc = multiprocessing.Process(target=xor_crypt_string, args=(encryptedFile, test5File, passwd, False, True,))
    proc.start()
    proc.join()
    # remove generated files
    os.remove(test5File)
    os.remove(encryptedFile)
    shutil.rmtree(splitFileDir)
# Zip the file and split it. Then merge back and unzip.
def test6():
    """Zip a copy of the original file, split/merge the archive, then unzip it.

    The merge and the unzip each run in their own child process. Errors in
    the split/merge steps are reported on stdout and the scenario then
    continues.
    """
    test6File = "bigFile6"
    # copy original file to test6 file for testing
    shutil.copy(originalFile, test6File)
    zippedFile = test6File + zipFileExtension
    splitFileDir = 'splitFiles'
    # zip file
    doZip(test6File, zippedFile)
    # split file into a set of smaller files (5MB)
    try:
        parts = split(zippedFile, splitFileDir, 5 * megabytes)
    except Exception as exc:
        print('Error during split:')
        print('%s %s' % (type(exc), exc))
    else:
        print(' Split finished: %s' % (parts,))
    # aggregate file in a separate process and wait until it's done
    try:
        proc = multiprocessing.Process(target=join, args=(splitFileDir, zippedFile,))
        proc.start()
        proc.join()
    except Exception as exc:
        print('Error aggregating files:')
        print('%s %s' % (type(exc), exc))
    else:
        # An exception raised inside the child never reaches this process;
        # the exit code is the only signal that the merge actually worked.
        if proc.exitcode == 0:
            print(' Merge completed')
        else:
            print('Error aggregating files:')
            print('merge process exited with code %s' % (proc.exitcode,))
    # unzip file using a different process and wait until it's done
    proc = multiprocessing.Process(target=doUnzip, args=(zippedFile,))
    proc.start()
    proc.join()
    # remove generated files
    os.remove(test6File)
    os.remove(zippedFile)
    shutil.rmtree(splitFileDir)
def test7():
    """Encrypt, zip and split a copy of the original file, then undo each step.

    The merge, the unzip and the decryption each run in their own child
    process. Errors in the split/merge steps are reported on stdout and the
    scenario then continues.
    """
    test7File = "bigFile7"
    # copy original file to test7 file for testing
    shutil.copy(originalFile, test7File)
    zippedFile = test7File + zipFileExtension
    splitFileDir = 'splitFiles'
    encryptedFile = test7File + 'encrypted' + txtFileExtension
    # encode the file and output the encoded file
    print(" encoding and encrpyting file...")
    xor_crypt_string(test7File, encryptedFile, passwd, True, False)
    # zip file
    doZip(encryptedFile, zippedFile)
    # split file into a set of smaller files (5MB)
    try:
        parts = split(zippedFile, splitFileDir, 5 * megabytes)
    except Exception as exc:
        print('Error during split:')
        print('%s %s' % (type(exc), exc))
    else:
        print(' Split finished: %s' % (parts,))
    # aggregate file in a separate process and wait until it's done
    try:
        proc = multiprocessing.Process(target=join, args=(splitFileDir, zippedFile,))
        proc.start()
        proc.join()
    except Exception as exc:
        print('Error aggregating files:')
        print('%s %s' % (type(exc), exc))
    else:
        # An exception raised inside the child never reaches this process;
        # the exit code is the only signal that the merge actually worked.
        if proc.exitcode == 0:
            print(' Merge completed')
        else:
            print('Error aggregating files:')
            print('merge process exited with code %s' % (proc.exitcode,))
    # unzip file using a different process and wait until it's done.
    # The archive is unzipped once, in the child process only; the extra
    # in-process doUnzip() call that followed was a leftover and is removed.
    proc = multiprocessing.Process(target=doUnzip, args=(zippedFile,))
    proc.start()
    proc.join()
    # decode the file and output the original test file
    print(" decrypting and decoding file...")
    proc = multiprocessing.Process(target=xor_crypt_string, args=(encryptedFile, test7File, passwd, False, True,))
    proc.start()
    proc.join()
    # remove generated files
    os.remove(test7File)
    os.remove(zippedFile)
    os.remove(encryptedFile)
    shutil.rmtree(splitFileDir)
if __name__ == '__main__':
    # Banner describing the scenarios this script runs.
    banner = (
        "This script is useful to generate test cases for forwardtracking",
        "Test cases currently include the combination of the following file/process manipulations",
        "Test 1 Zip/Unzip: Zip and unzip a given big file",
        "Test 2 Encrypt/Decrypt: Encrypt and decrypt a given big file",
        "Test 3 Split/Merge: Split a given big file into multiple parts and merge back",
        "Test 4 Encrypt and Zip/Unzip and Decrypt: Combination of test 1 and test 2",
        "Test 5 Encrypt and Split/Merge and Decrypt: Combination of test 2 and test 3",
        "Test 6 Zip and Split/Merge and Unzip: Combination of test 1 and test 3",
        "Test 7 Encrypt, Zip & Split/Merge, Unzip and Decrypt: Combination of test 1, test 2 and test 3",
    )
    for line in banner:
        print(line)

    # Run every scenario in order, announcing each one before it starts.
    scenarios = (
        ("\nDoing test 1...", test1),
        ("\nDoing test 2...", test2),
        ("\nDoing test 3...", test3),
        ("\nDoing test 4...", test4),
        ("\nDoing test 5...", test5),
        ("\nDoing test 6...", test6),
        ("\nDoing test 7...", test7),
    )
    for announcement, scenario in scenarios:
        print(announcement)
        scenario()