How to efficiently split overlapping ranges?

2024/11/17 23:29:10

I am looking for an efficient method to split overlapping ranges, I have read many similar questions but none of them solves my problem.

The problem is simple, given a list of triplets, the first two elements of each triplet are integers and the first number is always less than or equal to the second, apply the following transformations such that for all pairings of the output, the start of the later triplet is always greater than the end of the other, and if the start of one triplet is equal to the end of the other plus one, they have different data:

If they have different data, subtract one from the other:

(0, 100, 'A'), (0, 20, 'B') -> (0, 20, 'B'), (21, 100, 'A')
(0, 100, 'A'), (20, 50, 'B') -> (0, 19, 'A'), (20, 50, 'B'), (51, 100, 'A')
(0, 100, 'A'), (50, 100, 'B') -> (0, 49, 'A'), (50, 100, 'B')
(0, 100, 'A'), (200, 300, 'B') -> (0, 100, 'A'), (200, 300, 'B')
(0, 100, 'A'), (50, 300, 'B') -> (0, 49, 'A'), (50, 300, 'B')

Else if they overlap, merge them:

(0, 100, 'A'), (0, 20, 'A') -> (0, 100, 'A')
(0, 100, 'A'), (20, 50, 'A') -> (0, 100, 'A')
(0, 100, 'A'), (50, 100, 'A') -> (0, 100, 'A')
(0, 100, 'A'), (200, 300, 'A') -> (0, 100, 'A'), (200, 300, 'A')
(0, 100, 'A'), (101, 300, 'A') -> (0, 300, 'A')
(0, 100, 'A'), (50, 300, 'A') -> (0, 300, 'A')

Here is a test case:

[(0, 16, 'red'), (0, 4, 'green'), (2, 9, 'blue'), (2, 7, 'cyan'), (4, 9, 'purple'), (6, 8, 'magenta'), (9, 14, 'yellow'), (11, 13, 'orange'), (18, 21, 'green'), (22, 25, 'green')]

Expected output:

[(0, 1, 'green'), (2, 3, 'cyan'), (4, 5, 'purple'), (6, 8, 'magenta'), (9, 10, 'yellow'), (11, 13, 'orange'), (14, 14, 'yellow'), (15, 16, 'red'), (18, 25, 'green')]

Graphic representation:

enter image description here

These functions encode the rules in Python:

def subtract(A, B):(As, Ae, _), (Bs, Be, Bd) = A, Bif As > Be or Bs > Ae:return [[Bs, Be, Bd]]result = []if As > Bs:result.append([Bs, As - 1, Bd])if Ae < Be:result.append([Ae + 1, Be, Bd])return resultdef join(A, B):(As, Ae, Ad), (Bs, Be, Bd) = A, Bif Bs > As:As, Ae, Bs, Be = Bs, Be, As, Aeif Bs <= As and Ae <= Be:return [[Bs, Be, Bd]]return [[Bs, Ae, Bd]] if As <= Be + 1 else [[Bs, Be, Bd], [As, Ae, Ad]]

I have previously implemented a somewhat efficient function that splits the ranges when not (s1 < s2 < e1 < e2) always holds. But it isn't efficient enough and fails to give the correct output if the aforementioned constraint isn't enforced.

The accepted answer does improve performance and can handle cases where ranges intersect, but it isn't as efficient as my data demands. I have literally millions of items to process.

(make_generic_case can be found in the linked question above)

In [411]: sample = make_generic_case(4096, 65536, 16)In [412]: %timeit solve(sample); sample.pop(-1)
2.14 ms ± 124 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)In [413]: len(solve(sample))
Out[413]: 1883

It takes about 2140/4096 = 0.5224609375 microseconds to process each item, 0.25 to 0.4 microseconds per item is acceptable.

The following function is the only function I wrote to date that gives the correct output when ranges intersect, but it isn't efficient at all:

def brute_force_discretize(ranges):numbers = {}ranges.sort(key=lambda x: (x[0], -x[1]))for start, end, data in ranges:numbers |= {n: data for n in range(start, end + 1)}numbers = list(numbers.items())l = len(numbers)i = 0output = []while i < l:di = 0curn, curv = numbers[i]while i != l and curn + di == numbers[i][0] and curv == numbers[i][1]:i += 1di += 1output.append((curn, numbers[i - 1][0], curv))return output

I have implemented several other algorithms found here and here, but all of them fail to pass make_generic_case(4096, 65536, 16) and comparing against brute_force_discretize test:

def discretize_gen(ranges):nodes = get_nodes(ranges)stack = []for (n1, e1, d1), (n2, e2, _) in zip(nodes, nodes[1:]):if e1:stack.remove(d1)else:stack.append(d1)start = n1 + e1end = n2 - (not e2)if start <= end and stack:yield start, end, stack[-1]def merge(segments):start, end, data = next(segments)  # keep one segment bufferedfor start2, end2, data2 in segments:if end + 1 == start2 and data == data2:  # adjacent & same dataend = end2  # mergeelse:yield start, end, datastart, end, data = start2, end2, data2yield start, end, data  # flush the bufferdef subtractRanges(A, B):(As, Ae), (Bs, Be) = A, Bif As > Be or Bs > Ae:return [[Bs, Be]]result = []if As > Bs:result.append([Bs, As - 1])if Ae < Be:result.append([Ae + 1, Be])return resultdef merge_list(segments):start, end, data = segments.pop(0)for start2, end2, data2 in segments:if end + 1 == start2 and data == data2:end = end2else:yield start, end, datastart, end, data = start2, end2, data2yield start, end, datadef discretize(ranges):i = 0ranges = [list(e) for e in ranges]while i < len(ranges):for superior in ranges[i + 1 :]:if result := subtractRanges(superior[:2], ranges[i][:2]):ranges[i][:2] = result[0]if len(result) > 1:ranges[i + 1 : 0] = ([*result[1], ranges[i][2]],)else:ranges.pop(i)i -= 1breaki += 1return list(merge_list(sorted(ranges)))def flatten(ranges):stack = []result = []ranges = sorted(ranges, key=lambda x: (x[0], -x[1]))for ns, ne, nd in ranges:new = [ns, ne, nd]append = Truefor old in stack.copy():stack.remove(old)if nd != old[2]:for item in subtract(new, old):if item[1] < ns:result.append(item)else:stack.append(item)else:joined = join(new, old)if len(joined) == 1:stack.extend(joined)append = Falseelse:result.append(old)if append:stack.append(new)if stack:result.extend(stack)return sorted(result)

All of them fail the correctness check, and none of them is as efficient as my previous implementation to begin with.

What is an efficient method, that for any test case generated using make_generic_case(4096, 65536, 16), gives the same output as brute_force_discretize and takes about 1 millisecond to process such test case on average? (I have tested thoroughly that the function solve found in the accepted answer gives the correct output, so you can use that to verify the output).


I have to point out that I really have millions of triplets to process, no exaggeration. And the numbers are very large, they are under 2^32-1 (IPv4 address) and 2^128-1 (IPv6), so bruteforce methods are unacceptable. I in fact wrote one myself and I posted it above.

And I know how slow it is, it will take forever to process my data, or will burn my RAM.

This is the link to one of many datasets I have to process: https://git.ipfire.org/?p=location/location-database.git;a=tree, it is the database.txt file.

The file is very large (hundred mebibytes) and contains millions of entries, you have to scroll down (like really down) to see the IP ranges. I already wrote code that processes it, but it was very slow but much faster than the one posted in the existing answer below.

Answer

If your ranges are integers and the values arent too large, you can use a "paint" technique where you override color positions in a list with the color specified by each range. Find the color breaks in the mask to identify the new range breaks and combine them using zip to reform the final list of merged/overlayed ranges

R = [(0, 16, 'red'), (0, 4, 'green'), (2, 9, 'blue'), (2, 7, 'cyan'), (4, 9, 'purple'), (6, 8, 'magenta'), (9, 14, 'yellow'), (11, 13, 'orange'), (18, 21, 'green'), (22, 25, 'green')]M = [None]*(max(e for _,e,_ in R)+1) # paint mask (0 ... max end)
for s,e,c in R:M[s:e+1] = [c]*(e-s+1)           # paint range with colorpos = [i for i,(a,b) in enumerate(zip(M,M[1:]),1) if a!=b]
R2  = [(s,e-1,M[s]) for s,e in zip([0]+pos,pos+[len(M)]) if M[s]]

ouput:

print(R2)[(0, 1, 'green'), (2, 3, 'cyan'), (4, 5, 'purple'), (6, 8, 'magenta'), (9, 10, 'yellow'), (11, 13, 'orange'), (14, 14, 'yellow'), (15, 16, 'red'), (17, 17, None), (18, 25, 'green')]

You could also use groupby from itertools to reform the groups (could be slightly faster):

from itertools import groupby
R2 = [(g[0],g[-1],c) for c,(*g,) in groupby(range(len(M)),key=lambda i:M[i]) if c]

[EDIT]

For large value ranges, this simple "paint" logic isn't ideal.

Here is a different approach that creates a bit mask of range indexes for each starting and stopping positions. The ranges that stack on top of each other will add up as we go through position and subtract when reaching the position following their last. Since each range only starts once and ends once, instead of counting, we can use a bit mask and determine the layers at each position using an XOR bitwise operation on the masks (also works using sets but bit masks are faster):

from collections import defaultdict
def bitMasks(R):R = sorted(R,key=lambda t:(t[0],t[0]-t[1])) # see noteposMask = defaultdict(int)for i,(s,e,_) in enumerate(R):posMask[s]   |= 1<<iposMask[e+1] |= 1<<i        result       = []start,color  = 0, Nonelayers       = 0for pos in sorted(posMask):layers  ^= posMask[pos]newColor = R[layers.bit_length()-1][-1] if layers else Noneif color == newColor: continueif color is not None:result.append((start,pos-1,color))start,color = pos,newColorreturn result

Note: the ranges that start at the same position are sorted in decreasing size so the smaller overlays always have precedence over the larger ones. This allows use of the high bit position (i.e. last occurrence) to be used as the top layer index.

output:

bitMasks(R)[(0, 1, 'green'), (2, 3, 'cyan'), (4, 5, 'purple'), (6, 8, 'magenta'), (9, 10, 'yellow'), (11, 13, 'orange'), (14, 14, 'yellow'), (15, 16, 'red'), (18, 25, 'green')]

I benchmarked it against the solve() solution and found that bitMasks is 75% slower so not as good but it may provide some inspiration.

https://en.xdnf.cn/q/118712.html

Related Q&A

pass 2D array to linear regression (sklearn)

I want to pass 2D array to linear regression: x = [[1, 0, 1, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 1, 0, 0, 1, 3, 1, 1, 0, 0, 0, 0, 1, 0, 0, 1],[0, 0, 0, 0, 0, 0, 0, 0, 1, 0], [0, 0, 0,…

How do I fix this OverflowError?

I keep getting a "OverflowError: math range error". No matter what I input, the result is the same. Im running Python 3.3, and its finding the problem at the last line. How do I fix this? (A…

Pyinstaller subprocess.check_output error

Ive bundled my app with pyinstaller to 2 *.exegui_app.exe (onefile) config.ini \libs (onedir)winservice.exe+ all DLLs and libsWhen I manually install service with command winservice.exe install everyth…

Exception handler to check if inline script for variable worked

I need to add exception handling that considers if line 7 fails because there is no intersection between the query and array brands. Im new to using exception handlers and would appreciate any advice o…

Parameter list with single argument

When testing Python parameter list with a single argument, I found some weird behavior with print.>>> def hi(*x): ... print(x) ... >>> hi() () >>> hi(1,2) (1, 2) >>…

Scatter plot of values in pandas dataframe

I have a pandas dataframe in the following format. I am trying to plot this data based on ClusterAssigned, with probably different colors for 0 and 1. Distance ClusterAssigned23 135 120 …

String Delimiter in Python

I want to do split a string using "},{" as the delimiter. I have tried various things but none of them work.string="2,1,6,4,5,1},{8,1,4,9,6,6,7,0},{6,1,2,3,9},{2,3,5,4,3 "Split it i…

Wrong encoding of email attachment

I have a python 2.7 script running on windows. It logs in gmail, checks for new e-mails and attachments:#!/usr/bin/env python # -*- coding: utf-8 -*-file_types = ["pdf", "doc", &quo…

Blank lines in txt files in Python

I want to write sensor values to a text file with Python. All is working fine but one thing; in the text file, there are blank lines between each value. Its really annoying because I cant put the value…

Python 3 - decode spectroscopy data (Base64, IEEE754)

Im a chemist and working with spectroscopic data that was stored as a list (501 pairs of X,Y data) of Base64-encoded floating point values according to IEEE754.I tried to get an array of X, Y data to w…