I am writing a parallel rainbow table generator using Parallel Python and multiple machines. So far, I have it working on a single machine: it creates all possible passwords, hashes them, and saves them to a file. It takes max_pass_len and a file as arguments; the charset is predefined. Here's the code:
from crypt import crypt
from itertools import combinations_with_replacement

def hashAndSave(listOfComb, fileObject):
    for item in listOfComb:
        hashedVal = crypt(item, 'po')
        fileObject.write("%s:%s\n" % (hashedVal, item))

def gen_rt_save(max_pw_len, file):
    charset = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
    for i in range(3, max_pw_len):
        lista = [''.join(j) for j in combinations_with_replacement(charset, i)]
        hashAndSave(lista, file)
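A minimal call could look something like this (the output filename below is just a placeholder):

with open('rainbow_table.txt', 'w') as f:  # placeholder filename
    gen_rt_save(5, f)  # hashes every password of length 3 and 4 (range(3, 5))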
In order to parallelize it, I need to split the work between several machines. Each machine needs to know where to start and where to stop generating passwords.
I figured I need a function which takes two parameters - a start and a stop point. The charset is global and has to be fully used in those combinations.
The simplest way would be to pick the subset defined by two specific combinations from the list of all possible combinations for the given charset and length range. However, that takes too much time and space, and I need to avoid it.
Example:
charset='abcdefghi' #minified charset, normally 62 characters
ranged_comb('abf', 'defg')
result -> # it is not a combination between two lists! There are specific functions for that, and they would not use the full charset, only what's in the lists
abf
abg
abh
abi
aca
acb
...
defd
defe
deff
defg
I thought about using lists of indexes of the charset letters as parameters, to use them in for loops. Yet I can't really use nested for loops, because their number might vary. How do I create such a function?
Since for brute-forcing passwords/generating rainbow tables you don't need a strict lexicographic order, as long as you go through all permutations (with repetition), this is quite simple:
def get_permutation_by_index(source, size, index):
    result = []
    for _ in range(size):
        result.append(source[index % len(source)])
        index = index // len(source)
    return result
Then all you need is the index of a permutation to get it from your iterable (strings of characters work, too). What it does is essentially loop through every possible element position for the given size, offset by the passed index, and store each picked element in the result list. You can use return "".join(result) instead if you're interested in getting a string out of it.
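As a quick sanity check - a minimal illustration, assuming the function above - walking the index from 0 to len(source) ** size - 1 yields every permutation exactly once:

charset = "abc"
size = 2
for i in range(len(charset) ** size):  # 3**2 = 9 permutations in total
    print(i, "".join(get_permutation_by_index(charset, size, i)))
# prints 'aa', 'ba', 'ca', 'ab', 'bb', 'cb', 'ac', 'bc', 'cc' - the first position
# varies fastest, so the order is not lexicographic, but every combination is covered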
Now your workers can use this function to generate their 'password' range chunks. The simplest way of distributing the work would be for each worker to receive a single index from a distributor, perform its task, and wait for the next index; however, unless your hashing function is excruciatingly slow, the overhead of spawning workers, transferring data and so on might end up slower than executing everything linearly in the main process. That's why you'd ideally want your workers to crunch over larger chunks at a time to justify distributing the whole process. Thus, you'd want your worker to accept a range and do something like:
def worker(source, size, start, end):
    result = []
    for i in range(start, end):
        result.append(get_permutation_by_index(source, size, i))  # add to the result
    return result
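For instance, worker("ab", 2, 0, 3) would return the first three 2-character permutations as lists of characters (a tiny assumed example):

print(worker("ab", 2, 0, 3))
# [['a', 'a'], ['b', 'a'], ['a', 'b']]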
Then all you need is a 'distributor' - a central dispatcher that will conduct the workers and split the workload among them. Since our workers don't accept varying sizes (making them do so is an exercise I'll leave for you to figure out - you have all the ingredients), your 'distributor' will need to advance through the sizes and keep track of which chunks it sends to your workers. This means that for smaller and edge chunks your workers will receive a lesser workload than defined, but in the grand scheme of things this won't matter much for your use case. So, a simple distributor would look like:
def distributor(source, start, end, chunk_size=1000):
    result = []
    for size in range(start, end + 1):  # for each size in the given range...
        total = len(source) ** size  # max number of permutations for this size
        for chunk in range(0, total, chunk_size):  # for each chunk...
            data = worker(source, size, chunk, min(chunk + chunk_size, total))  # process...
            result.append(data)  # store the result...
    return result
Where start and end represent the range of permutation sizes (password lengths) you want your workers to go through, and chunk_size represents the number of permutations each worker should process in an ideal case - as mentioned, this won't hold if the total number of permutations for a given size is lower than chunk_size, or if there are fewer unprocessed permutations left for a given size than the chunk_size value, but those are edge cases and I'll leave it for you to figure out how to distribute the work even more evenly. Also, keep in mind that the returned result will be a list of the lists returned from our workers - you'll have to flatten it if you want to treat all the results equally.
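For example, a small run over an assumed three-character charset, flattened into plain strings, would look like this:

chunks = distributor("abc", 1, 2, chunk_size=4)  # all 1- and 2-character permutations, 4 per chunk
flat = ["".join(p) for chunk in chunks for p in chunk]  # flatten the list of lists
print(flat)
# ['a', 'b', 'c', 'aa', 'ba', 'ca', 'ab', 'bb', 'cb', 'ac', 'bc', 'cc']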
But wait, isn't this just linear execution in a single process? Well, of course it is! What we did here is effectively decouple the workers from the distributor, so now we can add as many arbitrary levels of separation and/or parallelization in between without affecting our execution. For example, here's how to make our workers run in parallel:
from multiprocessing import Pool
import time

def get_permutation_by_index(source, size, index):
    result = []
    for _ in range(size):
        result.append(source[index % len(source)])
        index = index // len(source)
    return result

# let's have our worker perform a naive ascii-shift Caesar cipher
def worker(source, size, start, end):
    result = []
    for i in range(start, end):
        time.sleep(0.2)  # simulate a long operation by adding 200 milliseconds of pause
        permutation = get_permutation_by_index(source, size, i)
        # naive Caesar cipher - simple ascii shift by +4 places
        result.append("".join([chr(ord(x) + 4) for x in permutation]))
    return result

def distributor(source, start, end, workers=10, chunk_size=10):
    pool = Pool(processes=workers)  # initiate our Pool with a specified number of workers
    jobs = set()  # store our worker result references
    for size in range(start, end + 1):  # for each size in the given range...
        total = len(source) ** size  # max number of permutations for this size
        for chunk in range(0, total, chunk_size):  # for each chunk...
            # add a call to the worker to our Pool
            r = pool.apply_async(worker, (source, size, chunk, min(chunk + chunk_size, total)))
            jobs.add(r)  # add our ApplyResult to the jobs set for a later checkup
    result = []
    while jobs:  # loop as long as we're waiting for results...
        for job in jobs:
            if job.ready():  # current worker finished processing...
                result.append(job.get())  # store our result...
                jobs.remove(job)
                break
        time.sleep(0.05)  # let other threads get a chance to breathe a little...
    return result  # keep in mind that this is NOT an ordered result

if __name__ == "__main__":  # important protection for cross-platform use
    # call 6 workers to sift through all 2 and 3-letter permutations
    # of "abcd", using the default chunk size ('ciphers per worker') of 10
    caesar_permutations = distributor("abcd", 2, 3, 6)
    print([perm for x in caesar_permutations for perm in x])  # print flattened results

# ['gg', 'hg', 'eh', 'fh', 'gh', 'hh', 'eff', 'fff', 'gff', 'hff', 'egf', 'fgf', 'ggf',
# 'hgf', 'ehf', 'fhf', 'ghf', 'hhf', 'eeg', 'feg', 'geg', 'heg', 'efg', 'ffg', 'gfg',
# 'hfg', 'eee', 'fee', 'gee', 'hee', 'efe', 'ffe', 'gfe', 'hfe', 'ege', 'fge', 'ee',
# 'fe', 'ge', 'he', 'ef', 'ff', 'gf', 'hf', 'eg', 'fg', 'gge', 'hge', 'ehe', 'fhe',
# 'ghe', 'hhe', 'eef', 'fef', 'gef', 'hef', 'ehh', 'fhh', 'ghh', 'hhh', 'egg', 'fgg',
# 'ggg', 'hgg', 'ehg', 'fhg', 'ghg', 'hhg', 'eeh', 'feh', 'geh', 'heh', 'efh', 'ffh',
# 'gfh', 'hfh', 'egh', 'fgh', 'ggh', 'hgh']
Voila! Everything executed in parallel (and over multiple cores, if the underlying OS did its scheduling properly). This should be sufficient for your use case - all you need is to add your communication or I/O code to the worker function, let the actual work be executed by a receiver on the other side, and return the results back to the distributor once you get them. You can also write your tables directly in distributor() instead of waiting for everything to finish.
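A minimal sketch of that last idea, reusing the crypt-based hashing from the question (the helper name and the file handle are assumptions, not part of the code above):

from crypt import crypt

def write_chunk(fileObject, permutations):
    # hash each generated password and append a "hash:password" line, as in the question's code
    for permutation in permutations:
        password = "".join(permutation)
        fileObject.write("%s:%s\n" % (crypt(password, 'po'), password))

# inside distributor()'s collection loop, instead of result.append(job.get()):
#     write_chunk(table_file, job.get())  # table_file is an assumed open file handle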
And if you're going to execute this exclusively over a network, you don't really need it in a multiprocessing setting - threads are sufficient to handle the I/O latency - so just replace the multiprocessing import with: from multiprocessing.pool import ThreadPool as Pool (don't let the module name fool you, this is a threading interface, not a multiprocessing one!).