Implementing a recursive algorithm in pyspark to find pairings within a dataframe

2024/10/6 8:39:32

I have a Spark dataframe (prof_student_df) that lists student/professor pairs for each timestamp. There are 4 professors and 4 students for each timestamp, and each professor-student pair has a “score” (so there are 16 rows per time frame). For each time frame, I need to find the one-to-one pairing between professors and students that maximizes the overall score. Each professor can be matched with only one student in a single time frame.

For example, here are the pairings/scores for one time frame.

|    time    | professor_id | student_id | score | is_match |
| 1596048041 | p1           | s1         |   0.7 | FALSE    |
| 1596048041 | p1           | s2         |   0.5 | TRUE     |
| 1596048041 | p1           | s3         |   0.3 | FALSE    |
| 1596048041 | p1           | s4         |   0.2 | FALSE    |
| 1596048041 | p2           | s1         |   0.9 | TRUE     |
| 1596048041 | p2           | s2         |   0.1 | FALSE    |
| 1596048041 | p2           | s3         |  0.15 | FALSE    |
| 1596048041 | p2           | s4         |   0.2 | FALSE    |
| 1596048041 | p3           | s1         |   0.2 | FALSE    |
| 1596048041 | p3           | s2         |   0.3 | FALSE    |
| 1596048041 | p3           | s3         |   0.4 | FALSE    |
| 1596048041 | p3           | s4         |   0.8 | TRUE     |
| 1596048041 | p4           | s1         |   0.2 | FALSE    |
| 1596048041 | p4           | s2         |   0.3 | FALSE    |
| 1596048041 | p4           | s3         |  0.35 | TRUE     |
| 1596048041 | p4           | s4         |   0.4 | FALSE    |

The goal is to get this is_match column. It can be a boolean or a 0/1 bit or whatever works.

In the above example, p1 matched with s2, p2 matched with s1, p3 matched with s4 and p4 matched with s3 because that is the combination that maximized the total score (yields a score of 2.55). There is one weird edge case - it is possible to have LESS than 4 professors or students for a given time frame. If there are 4 professors and 3 students then 1 professor would be without a pairing and all of his is_match would be false. Similarly, if there are 3 professors and 4 students, 1 student would be without a pairing and all of his is_match would be false.
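
For a single time frame this small, the expected pairing can be checked by brute force. The following is only a minimal sketch in plain Python, assuming every professor/student combination in the time frame has a score; the function name best_pairing is made up for illustration and is not part of Spark or any library:

```python
from itertools import permutations

def best_pairing(scores):
    """Brute-force the highest-scoring one-to-one pairing.

    scores: dict mapping (professor_id, student_id) -> score.
    Assumes every professor/student combination is present.
    """
    professors = sorted({p for p, _ in scores})
    students = sorted({s for _, s in scores})
    if len(professors) <= len(students):
        # Every professor gets a student; surplus students stay unmatched.
        candidates = [list(zip(professors, perm))
                      for perm in permutations(students, len(professors))]
    else:
        # Every student gets a professor; surplus professors stay unmatched.
        candidates = [list(zip(perm, students))
                      for perm in permutations(professors, len(students))]
    best = max(candidates, key=lambda pairs: sum(scores[pair] for pair in pairs))
    return sum(scores[pair] for pair in best), set(best)

rows = [
    ("p1", "s1", 0.7), ("p1", "s2", 0.5), ("p1", "s3", 0.3), ("p1", "s4", 0.2),
    ("p2", "s1", 0.9), ("p2", "s2", 0.1), ("p2", "s3", 0.15), ("p2", "s4", 0.2),
    ("p3", "s1", 0.2), ("p3", "s2", 0.3), ("p3", "s3", 0.4), ("p3", "s4", 0.8),
    ("p4", "s1", 0.2), ("p4", "s2", 0.3), ("p4", "s3", 0.35), ("p4", "s4", 0.4),
]
scores = {(p, s): v for p, s, v in rows}
total, matched = best_pairing(scores)

# Edge case: drop s4 so that 4 professors compete for 3 students.
total3, matched3 = best_pairing({k: v for k, v in scores.items() if k[1] != "s4"})
```

With at most 4 professors and 4 students there are only 24 candidate pairings per time frame, so this is cheap as a correctness check, although a dedicated assignment solver is the better fit for millions of rows.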

Does anyone know how I might accomplish this? I am thinking I would partition or group by time, feed the data into some UDF that spits out the pairings, and then maybe join that back to the original rows (although I am not sure). I am trying to implement this logic in pyspark and can use Spark SQL or pyspark.

Ideally, I would like this to be as efficient as possible as there will be millions of rows. In the question, I mentioned a recursive algorithm because this is a traditional recursive type problem, but if there is a quicker solution that doesn't use recursion I am open to that.

Many thanks. I am new to Spark and a little stumped about how to do this.

EDIT: To clarify the question, since my example did not specify this: for a single day, there will be up to 14 professors and 14 students to choose from. I am looking at one day at a time, which is why I didn't include the date in the dataframe. At any one time frame, there are at most 4 professors and 4 students. This dataframe shows just one time frame, but in the next time frame the 4 professors might be p5, p1, p7, p9 or something like that. The students might still be s1, s2, s3, s4.


Edit: As discussed in the comments, to fix the issue mentioned in your update, we can convert student_id at each time into a generalized sequence id using dense_rank, go through Steps 1 to 3 (using the student column), and then use a join to convert student at each time back to the original student_id; see Step-0 and Step-4 below. If there are fewer than 4 professors in a time unit, the matrix is padded to 4 rows on the NumPy side (using np.vstack() and np.zeros()); see the updated function find_assigned.
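
To make the dense_rank idea concrete outside Spark, here is a rough plain-Python sketch of the same mapping. The helper name dense_rank_students and the sample ids (s7, s9, t1, t2) are invented for illustration; the real work is done by dense_rank() over a window partitioned by time:

```python
from collections import defaultdict

def dense_rank_students(rows):
    """Map each student_id to a 1-based rank within its time frame.

    rows: iterable of (time, professor_id, student_id, score).
    Returns {time: {student_id: rank}}, ranks following sorted student_id order.
    """
    ids_by_time = defaultdict(set)
    for time, _professor_id, student_id, _score in rows:
        ids_by_time[time].add(student_id)
    return {
        time: {sid: rank for rank, sid in enumerate(sorted(ids), start=1)}
        for time, ids in ids_by_time.items()
    }

rows = [
    ("t1", "p1", "s1", 0.7), ("t1", "p1", "s7", 0.5),
    ("t2", "p5", "s3", 0.4), ("t2", "p5", "s9", 0.1), ("t2", "p7", "s3", 0.2),
]
ranks = dense_rank_students(rows)

# The reverse lookup plays the role of the join back to student_id.
back = {time: {rank: sid for sid, rank in m.items()} for time, m in ranks.items()}
```

Whatever ids appear in a time frame, the ranks always run from 1 up to the number of distinct students, which is what lets the pivot use a fixed list of columns.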

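You can try pandas_udf and scipy.optimize.linear_sum_assignment (note: as mentioned by @cronoik in the main comments, this is the assignment problem that the Hungarian algorithm solves; recent SciPy releases document their implementation as a modified Jonker-Volgenant algorithm); see below: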

from pyspark.sql.functions import pandas_udf, PandasUDFType, first, expr, dense_rank
from pyspark.sql.types import StructType
from scipy.optimize import linear_sum_assignment
from pyspark.sql import Window
import numpy as np

df = spark.createDataFrame([
    ('1596048041', 'p1', 's1', 0.7), ('1596048041', 'p1', 's2', 0.5), ('1596048041', 'p1', 's3', 0.3), ('1596048041', 'p1', 's4', 0.2),
    ('1596048041', 'p2', 's1', 0.9), ('1596048041', 'p2', 's2', 0.1), ('1596048041', 'p2', 's3', 0.15), ('1596048041', 'p2', 's4', 0.2),
    ('1596048041', 'p3', 's1', 0.2), ('1596048041', 'p3', 's2', 0.3), ('1596048041', 'p3', 's3', 0.4), ('1596048041', 'p3', 's4', 0.8),
    ('1596048041', 'p4', 's1', 0.2), ('1596048041', 'p4', 's2', 0.3), ('1596048041', 'p4', 's3', 0.35), ('1596048041', 'p4', 's4', 0.4)
], ['time', 'professor_id', 'student_id', 'score'])

N = 4
cols_student = [*range(1,N+1)]

Step-0: add an extra column student, and create a new dataframe df3 with all unique combos of time + student_id + student.

w1 = Window.partitionBy('time').orderBy('student_id')

df = df.withColumn('student', dense_rank().over(w1))
|      time|professor_id|student_id|score|student|
|1596048041|          p1|        s1|  0.7|      1|
|1596048041|          p2|        s1|  0.9|      1|
|1596048041|          p3|        s1|  0.2|      1|
|1596048041|          p4|        s1|  0.2|      1|
|1596048041|          p1|        s2|  0.5|      2|
|1596048041|          p2|        s2|  0.1|      2|
|1596048041|          p3|        s2|  0.3|      2|
|1596048041|          p4|        s2|  0.3|      2|
|1596048041|          p1|        s3|  0.3|      3|
|1596048041|          p2|        s3| 0.15|      3|
|1596048041|          p3|        s3|  0.4|      3|
|1596048041|          p4|        s3| 0.35|      3|
|1596048041|          p1|        s4|  0.2|      4|
|1596048041|          p2|        s4|  0.2|      4|
|1596048041|          p3|        s4|  0.8|      4|
|1596048041|          p4|        s4|  0.4|      4|

df3 = df.select('time','student_id','student').dropDuplicates()
|      time|student_id|student|
|1596048041|        s1|      1|
|1596048041|        s2|      2|
|1596048041|        s3|      3|
|1596048041|        s4|      4|

Step-1: use pivot to build the matrix of professors vs. students. Note that we store the negated scores as the pivot values so that scipy.optimize.linear_sum_assignment, which minimizes total cost by default, finds the assignment with the maximum total score:

df1 = df.groupby('time','professor_id').pivot('student', cols_student).agg(-first('score'))
|      time|professor_id|   1|   2|    3|   4|
|1596048041|          p4|-0.2|-0.3|-0.35|-0.4|
|1596048041|          p2|-0.9|-0.1|-0.15|-0.2|
|1596048041|          p1|-0.7|-0.5| -0.3|-0.2|
|1596048041|          p3|-0.2|-0.3| -0.4|-0.8|
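
As a rough illustration of what the pivot produces, the sketch below mimics it in plain Python. The helper pivot_negated is hypothetical, and the sample rows deliberately omit student 4 to show that a fixed column list leaves a missing student as None (NULL in Spark):

```python
def pivot_negated(rows, student_cols):
    """Mimic groupby('time','professor_id').pivot('student', student_cols).agg(-first('score')).

    rows: iterable of (time, professor_id, student_rank, score).
    Returns {(time, professor_id): [negated score or None per student column]}.
    """
    table = {}
    for time, professor_id, student, score in rows:
        cells = table.setdefault((time, professor_id), [None] * len(student_cols))
        if student in student_cols:
            pos = student_cols.index(student)
            if cells[pos] is None:  # keep the first score seen, like first('score')
                cells[pos] = -score
    return table

cols_student = [1, 2, 3, 4]
rows = [
    ("1596048041", "p1", 1, 0.7), ("1596048041", "p1", 2, 0.5), ("1596048041", "p1", 3, 0.3),
    ("1596048041", "p2", 1, 0.9), ("1596048041", "p2", 2, 0.1), ("1596048041", "p2", 3, 0.15),
]
matrix = pivot_negated(rows, cols_student)
```

Those None cells are why the later function calls fillna(0) before handing the matrix to the solver.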

Step-2: use pandas_udf and scipy.optimize.linear_sum_assignment to get column indices and then assign the corresponding column name to a new column assigned:

# returnSchema contains one more StringType column `assigned` than schema from the input pdf:
schema = StructType.fromJson(df1.schema.jsonValue()).add('assigned', 'string')

# since the # of students are always N, we can use np.vstack to set the N*N matrix
# below `n` is the number of professors/rows in pdf
# sz is the size of input Matrix, sz=4 in this example
def __find_assigned(pdf, sz):
    cols = pdf.columns[2:]
    n = pdf.shape[0]
    n1 = pdf.iloc[:,2:].fillna(0).values
    _, idx = linear_sum_assignment(np.vstack((n1,np.zeros((sz-n,sz)))))
    return pdf.assign(assigned=[cols[i] for i in idx][:n])

find_assigned = pandas_udf(lambda x: __find_assigned(x,N), schema, PandasUDFType.GROUPED_MAP)

df2 = df1.groupby('time').apply(find_assigned)
|      time|professor_id|   1|   2|    3|   4|assigned|
|1596048041|          p4|-0.2|-0.3|-0.35|-0.4|       3|
|1596048041|          p2|-0.9|-0.1|-0.15|-0.2|       1|
|1596048041|          p1|-0.7|-0.5| -0.3|-0.2|       2|
|1596048041|          p3|-0.2|-0.3| -0.4|-0.8|       4|

Note: per a suggestion from @OluwafemiSule, we can use the parameter maximize instead of negating the score values (in that case the scores should not be negated in Step-1 and Step-3). This parameter is available in SciPy 1.4.0+:

  _, idx = linear_sum_assignment(np.vstack((n1,np.zeros((N-n,N)))), maximize=True)
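
The padding and the maximize option can be tried outside Spark. This is a small sketch rather than the UDF itself: it assumes NumPy and SciPy 1.4.0+ are installed, uses the un-negated scores from the example, and drops p4 so that the zero-padding row is actually exercised:

```python
import numpy as np
from scipy.optimize import linear_sum_assignment

N = 4  # fixed number of student columns

# Un-negated scores for three professors only (p4 left out on purpose).
scores = np.array([
    [0.7, 0.5, 0.3, 0.2],   # p1
    [0.9, 0.1, 0.15, 0.2],  # p2
    [0.2, 0.3, 0.4, 0.8],   # p3
])
n = scores.shape[0]

# Pad with all-zero rows so the solver sees an N x N matrix.
padded = np.vstack((scores, np.zeros((N - n, N))))
row_idx, col_idx = linear_sum_assignment(padded, maximize=True)

# Only the first n entries belong to real professors; +1 gives 1-based student ranks.
assigned = [int(c) + 1 for c in col_idx[:n]]
total = float(scores[np.arange(n), col_idx[:n]].sum())
```

Here p1 gets student 2, p2 gets student 1 and p3 gets student 4, while student 3 is absorbed by the padding row and so stays unmatched.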

Step-3: use the Spark SQL stack function to unpivot (normalize) df2 above, negate the score values back, and filter out rows where score is NULL. The desired is_match column is true where assigned == student:

df_new = df2.selectExpr('time','professor_id','assigned','stack({},{}) as (student, score)'.format(len(cols_student), ','.join("int('{0}'), -`{0}`".format(c) for c in cols_student))
) \
.filter("score is not NULL") \
.withColumn('is_match', expr("assigned=student"))
|      time|professor_id|assigned|student|score|is_match|
|1596048041|          p4|       3|      1|  0.2|   false|
|1596048041|          p4|       3|      2|  0.3|   false|
|1596048041|          p4|       3|      3| 0.35|    true|
|1596048041|          p4|       3|      4|  0.4|   false|
|1596048041|          p2|       1|      1|  0.9|    true|
|1596048041|          p2|       1|      2|  0.1|   false|
|1596048041|          p2|       1|      3| 0.15|   false|
|1596048041|          p2|       1|      4|  0.2|   false|
|1596048041|          p1|       2|      1|  0.7|   false|
|1596048041|          p1|       2|      2|  0.5|    true|
|1596048041|          p1|       2|      3|  0.3|   false|
|1596048041|          p1|       2|      4|  0.2|   false|
|1596048041|          p3|       4|      1|  0.2|   false|
|1596048041|          p3|       4|      2|  0.3|   false|
|1596048041|          p3|       4|      3|  0.4|   false|
|1596048041|          p3|       4|      4|  0.8|    true|
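
What the stack expression does to a single pivoted row can be mimicked in plain Python. The helper unpivot below is a hypothetical stand-in, and the sample row sets student 4 to None purely to show the NULL filter:

```python
def unpivot(row, student_cols):
    """Turn one wide row (one negated score per student column) into long rows.

    row: dict with 'time', 'professor_id', 'assigned' and one key per student column.
    """
    out = []
    for c in student_cols:
        negated = row[str(c)]
        if negated is None:
            continue  # mirrors .filter("score is not NULL")
        out.append({
            "time": row["time"],
            "professor_id": row["professor_id"],
            "student": c,
            "score": -negated,                       # undo the negation from the pivot
            "is_match": int(row["assigned"]) == c,   # assigned is a string column
        })
    return out

wide = {"time": "1596048041", "professor_id": "p4",
        "1": -0.2, "2": -0.3, "3": -0.35, "4": None, "assigned": "3"}
long_rows = unpivot(wide, [1, 2, 3, 4])
```

Each wide row fans out into one row per student that actually had a score, and exactly one of them carries is_match = true.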

Step-4: use join to convert student back to student_id (use broadcast join if possible):

df_new = df_new.join(df3, on=["time", "student"])
|      time|student|professor_id|assigned|score|is_match|student_id|
|1596048041|      1|          p1|       2|  0.7|   false|        s1|
|1596048041|      2|          p1|       2|  0.5|    true|        s2|
|1596048041|      3|          p1|       2|  0.3|   false|        s3|
|1596048041|      4|          p1|       2|  0.2|   false|        s4|
|1596048041|      1|          p2|       1|  0.9|    true|        s1|
|1596048041|      2|          p2|       1|  0.1|   false|        s2|
|1596048041|      3|          p2|       1| 0.15|   false|        s3|
|1596048041|      4|          p2|       1|  0.2|   false|        s4|
|1596048041|      1|          p3|       4|  0.2|   false|        s1|
|1596048041|      2|          p3|       4|  0.3|   false|        s2|
|1596048041|      3|          p3|       4|  0.4|   false|        s3|
|1596048041|      4|          p3|       4|  0.8|    true|        s4|
|1596048041|      1|          p4|       3|  0.2|   false|        s1|
|1596048041|      2|          p4|       3|  0.3|   false|        s2|
|1596048041|      3|          p4|       3| 0.35|    true|        s3|
|1596048041|      4|          p4|       3|  0.4|   false|        s4|

df_new = df_new.drop("student", "assigned")

