I am looking for an efficient method to split overlapping ranges. I have read many similar questions, but none of them solves my problem.
The problem is simple. Given a list of triplets in which the first two elements are integers (the start and end of a range, with the start always less than or equal to the end) and the third element is the data attached to that range, apply the following transformations so that, for every pair of triplets in the output, the start of the later triplet is greater than the end of the earlier one, and two triplets that are adjacent (the start of one equals the end of the other plus one) have different data:
If they have different data, subtract one from the other:
(0, 100, 'A'), (0, 20, 'B') -> (0, 20, 'B'), (21, 100, 'A')
(0, 100, 'A'), (20, 50, 'B') -> (0, 19, 'A'), (20, 50, 'B'), (51, 100, 'A')
(0, 100, 'A'), (50, 100, 'B') -> (0, 49, 'A'), (50, 100, 'B')
(0, 100, 'A'), (200, 300, 'B') -> (0, 100, 'A'), (200, 300, 'B')
(0, 100, 'A'), (50, 300, 'B') -> (0, 49, 'A'), (50, 300, 'B')
Otherwise (they have the same data), if they overlap or are adjacent, merge them:
(0, 100, 'A'), (0, 20, 'A') -> (0, 100, 'A')
(0, 100, 'A'), (20, 50, 'A') -> (0, 100, 'A')
(0, 100, 'A'), (50, 100, 'A') -> (0, 100, 'A')
(0, 100, 'A'), (200, 300, 'A') -> (0, 100, 'A'), (200, 300, 'A')
(0, 100, 'A'), (101, 300, 'A') -> (0, 300, 'A')
(0, 100, 'A'), (50, 300, 'A') -> (0, 300, 'A')
Here is a test case:
[(0, 16, 'red'), (0, 4, 'green'), (2, 9, 'blue'), (2, 7, 'cyan'), (4, 9, 'purple'), (6, 8, 'magenta'), (9, 14, 'yellow'), (11, 13, 'orange'), (18, 21, 'green'), (22, 25, 'green')]
Expected output:
[(0, 1, 'green'), (2, 3, 'cyan'), (4, 5, 'purple'), (6, 8, 'magenta'), (9, 10, 'yellow'), (11, 13, 'orange'), (14, 14, 'yellow'), (15, 16, 'red'), (18, 25, 'green')]
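The two conditions on the output can be checked mechanically. The following is only a minimal sketch: `check_invariants` is a name I made up for illustration, and it assumes the output is listed in ascending order of start, as in the expected output above.

```python
def check_invariants(output):
    """Check the two output conditions on a list of (start, end, data) triplets.

    Assumes the triplets are listed in ascending order of start, so it is
    enough to compare each triplet with its immediate successor.
    """
    # Every triplet must be a valid range.
    if any(start > end for start, end, _ in output):
        return False
    for (s1, e1, d1), (s2, e2, d2) in zip(output, output[1:]):
        # The later triplet must start after the earlier one ends.
        if s2 <= e1:
            return False
        # Adjacent triplets must carry different data, otherwise they
        # should have been merged.
        if s2 == e1 + 1 and d1 == d2:
            return False
    return True


expected = [
    (0, 1, 'green'), (2, 3, 'cyan'), (4, 5, 'purple'), (6, 8, 'magenta'),
    (9, 10, 'yellow'), (11, 13, 'orange'), (14, 14, 'yellow'),
    (15, 16, 'red'), (18, 25, 'green'),
]
print(check_invariants(expected))
```

This only verifies the shape of the output, not that each integer received the right data.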
Graphic representation:
These functions encode the rules in Python:

    def subtract(A, B):
        (As, Ae, _), (Bs, Be, Bd) = A, B
        if As > Be or Bs > Ae:
            return [[Bs, Be, Bd]]
        result = []
        if As > Bs:
            result.append([Bs, As - 1, Bd])
        if Ae < Be:
            result.append([Ae + 1, Be, Bd])
        return result


    def join(A, B):
        (As, Ae, Ad), (Bs, Be, Bd) = A, B
        if Bs > As:
            As, Ae, Bs, Be = Bs, Be, As, Ae
        if Bs <= As and Ae <= Be:
            return [[Bs, Be, Bd]]
        return [[Bs, Ae, Bd]] if As <= Be + 1 else [[Bs, Be, Bd], [As, Ae, Ad]]

I have previously implemented a somewhat efficient function that splits the ranges when not (s1 < s2 < e1 < e2) always holds, that is, when two ranges never partially intersect. But it isn't efficient enough, and it fails to give the correct output if the aforementioned constraint isn't enforced.
The accepted answer does improve performance and can handle cases where ranges intersect, but it isn't as efficient as my data demands. I have literally millions of items to process.
(make_generic_case can be found in the linked question above)

    In [411]: sample = make_generic_case(4096, 65536, 16)

    In [412]: %timeit solve(sample); sample.pop(-1)
    2.14 ms ± 124 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

    In [413]: len(solve(sample))
    Out[413]: 1883

That is about 2140/4096 = 0.5224609375 microseconds per item; 0.25 to 0.4 microseconds per item would be acceptable.
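To make the arithmetic explicit, here is a small sketch that converts between a total run time and a per-item cost. The helper names `per_item_us` and `budget_ms` are made up for illustration. For a 4096-item case, the acceptable range of 0.25 to 0.4 microseconds per item corresponds to a total of roughly 1.0 to 1.6 milliseconds.

```python
def per_item_us(total_ms, n_items):
    """Average cost per item in microseconds, given a total time in milliseconds."""
    return total_ms * 1000 / n_items


def budget_ms(target_us_per_item, n_items):
    """Total time budget in milliseconds for a given per-item target."""
    return target_us_per_item * n_items / 1000


n = 4096
print(f"measured: {per_item_us(2.14, n):.4f} µs per item")
print(f"budget:   {budget_ms(0.25, n):.3f} to {budget_ms(0.4, n):.3f} ms per case")
```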
The following function is the only function I wrote to date that gives the correct output when ranges intersect, but it isn't efficient at all:

    def brute_force_discretize(ranges):
        numbers = {}
        ranges.sort(key=lambda x: (x[0], -x[1]))
        for start, end, data in ranges:
            numbers |= {n: data for n in range(start, end + 1)}
        numbers = list(numbers.items())
        l = len(numbers)
        i = 0
        output = []
        while i < l:
            di = 0
            curn, curv = numbers[i]
            while i != l and curn + di == numbers[i][0] and curv == numbers[i][1]:
                i += 1
                di += 1
            output.append((curn, numbers[i - 1][0], curv))
        return output

I have implemented several other algorithms found here and here, but all of them fail when run on make_generic_case(4096, 65536, 16) and compared against brute_force_discretize:

    def discretize_gen(ranges):
        nodes = get_nodes(ranges)
        stack = []
        for (n1, e1, d1), (n2, e2, _) in zip(nodes, nodes[1:]):
            if e1:
                stack.remove(d1)
            else:
                stack.append(d1)
            start = n1 + e1
            end = n2 - (not e2)
            if start <= end and stack:
                yield start, end, stack[-1]


    def merge(segments):
        start, end, data = next(segments)  # keep one segment buffered
        for start2, end2, data2 in segments:
            if end + 1 == start2 and data == data2:  # adjacent & same data
                end = end2  # merge
            else:
                yield start, end, data
                start, end, data = start2, end2, data2
        yield start, end, data  # flush the buffer


    def subtractRanges(A, B):
        (As, Ae), (Bs, Be) = A, B
        if As > Be or Bs > Ae:
            return [[Bs, Be]]
        result = []
        if As > Bs:
            result.append([Bs, As - 1])
        if Ae < Be:
            result.append([Ae + 1, Be])
        return result


    def merge_list(segments):
        start, end, data = segments.pop(0)
        for start2, end2, data2 in segments:
            if end + 1 == start2 and data == data2:
                end = end2
            else:
                yield start, end, data
                start, end, data = start2, end2, data2
        yield start, end, data


    def discretize(ranges):
        i = 0
        ranges = [list(e) for e in ranges]
        while i < len(ranges):
            for superior in ranges[i + 1 :]:
                if result := subtractRanges(superior[:2], ranges[i][:2]):
                    ranges[i][:2] = result[0]
                    if len(result) > 1:
                        ranges[i + 1 : 0] = ([*result[1], ranges[i][2]],)
                else:
                    ranges.pop(i)
                    i -= 1
                    break
            i += 1
        return list(merge_list(sorted(ranges)))


    def flatten(ranges):
        stack = []
        result = []
        ranges = sorted(ranges, key=lambda x: (x[0], -x[1]))
        for ns, ne, nd in ranges:
            new = [ns, ne, nd]
            append = True
            for old in stack.copy():
                stack.remove(old)
                if nd != old[2]:
                    for item in subtract(new, old):
                        if item[1] < ns:
                            result.append(item)
                        else:
                            stack.append(item)
                else:
                    joined = join(new, old)
                    if len(joined) == 1:
                        stack.extend(joined)
                        append = False
                    else:
                        result.append(old)
            if append:
                stack.append(new)
        if stack:
            result.extend(stack)
        return sorted(result)

All of them fail the correctness check, and none of them is as efficient as my previous implementation to begin with.
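For reference, this is roughly how such a correctness check can be set up on small inputs, so that a failing case is short enough to debug by hand. It is only a sketch: `reference`, `random_case` and `first_failure` are names made up for illustration, `random_case` is a simple stand-in and not the `make_generic_case` from the linked question, and `reference` is a per-integer painter that uses the same ordering as `brute_force_discretize` (sort by start, then by descending end, later ranges overwrite earlier ones).

```python
import random


def reference(ranges):
    """Per-integer reference: paint in (start, -end) order, then regroup runs."""
    painted = {}
    for start, end, data in sorted(ranges, key=lambda x: (x[0], -x[1])):
        for n in range(start, end + 1):
            painted[n] = data  # later ranges overwrite earlier ones
    output = []
    for n in sorted(painted):
        data = painted[n]
        if output and output[-1][1] + 1 == n and output[-1][2] == data:
            output[-1][1] = n  # extend the current run
        else:
            output.append([n, n, data])  # start a new run
    return [tuple(item) for item in output]


def random_case(count, limit, max_len, labels="ABCD", seed=None):
    """Hypothetical generator: `count` random ranges inside [0, limit)."""
    rng = random.Random(seed)
    case = []
    for _ in range(count):
        start = rng.randrange(limit)
        end = min(limit - 1, start + rng.randrange(max_len))
        case.append((start, end, rng.choice(labels)))
    return case


def first_failure(candidate, trials=200, count=8, limit=40, max_len=12):
    """Return the first small case where `candidate` disagrees, else None."""
    for seed in range(trials):
        case = random_case(count, limit, max_len, seed=seed)
        # Pass a copy, since some implementations sort their input in place.
        if list(candidate(list(case))) != reference(case):
            return case
    return None
```

Calling `first_failure` with any of the candidate functions returns either `None` or a small input on which the candidate and the reference disagree.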
What is an efficient method that, for any test case generated using make_generic_case(4096, 65536, 16), gives the same output as brute_force_discretize and takes about 1 millisecond to process such a test case on average? (I have tested thoroughly that the function solve found in the accepted answer gives the correct output, so you can use that to verify the output.)
I have to point out that I really have millions of triplets to process, no exaggeration. The numbers are also very large: they go up to 2^32-1 (IPv4 addresses) and 2^128-1 (IPv6 addresses), so brute-force methods are unacceptable. In fact I wrote one myself and posted it above.
I know how slow it is: on my data it would either take forever or exhaust my RAM.
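To illustrate where numbers of this size come from, here is a minimal sketch that turns a network in CIDR notation into a (start, end, data) triplet of plain integers using the standard-library `ipaddress` module, and back into readable addresses. It assumes the entries are available as CIDR strings, which is my assumption and not something stated above; `network_to_triplet` and `triplet_to_addresses` are names made up for illustration.

```python
import ipaddress


def network_to_triplet(cidr, data):
    """Convert a CIDR string into an inclusive integer range plus its data."""
    net = ipaddress.ip_network(cidr, strict=False)
    return (int(net.network_address), int(net.broadcast_address), data)


def triplet_to_addresses(start, end, data, version=4):
    """Convert an integer range back into dotted/colon address strings."""
    cls = ipaddress.IPv4Address if version == 4 else ipaddress.IPv6Address
    return (str(cls(start)), str(cls(end)), data)


v4 = network_to_triplet("10.0.0.0/8", "X")
v6 = network_to_triplet("2001:db8::/32", "Y")
print(v4)
print(v6[1] - v6[0] + 1)  # number of addresses covered by the IPv6 range
print(triplet_to_addresses(*v4))
```

A single IPv6 range can cover 2^96 addresses or more, which is why any approach that enumerates individual integers is out of the question.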
This is the link to one of the many datasets I have to process: https://git.ipfire.org/?p=location/location-database.git;a=tree (it is the database.txt file).
The file is very large (on the order of a hundred mebibytes) and contains millions of entries; you have to scroll a long way down to see the IP ranges. I already wrote code that processes it. It is very slow, though still much faster than the code posted in the existing answer below.