Assume I have the following dataframe:
dummy_data = [('a',1),('b',25),('c',3),('d',8),('e',1)]
df = sc.parallelize(dummy_data).toDF(['letter','number'])
And I want to create the following dataframe:
[('a',0),('b',2),('c',1),('d',3),('e',0)]
What I do is convert it to an RDD, use the zipWithIndex function, and then join the results:
convertDF = (df.select('number')
               .distinct()
               .rdd
               .zipWithIndex()
               .map(lambda x: (x[0].number, x[1]))
               .toDF(['old', 'new']))

finalDF = (df.join(convertDF, df.number == convertDF.old)
             .select(df.letter, convertDF.new))
Is there a similar function to zipWithIndex for dataframes? Is there another, more efficient way to do this task?
Please check https://issues.apache.org/jira/browse/SPARK-23074 for direct functionality parity in dataframes; upvote that JIRA if you're interested in seeing this in Spark at some point.
In the meantime, here's a workaround in PySpark:
from pyspark.sql.types import LongType, StructField, StructType

def dfZipWithIndex(df, offset=1, colName="rowId"):
    '''
    Enumerates dataframe rows in native order, like rdd.zipWithIndex(),
    but on a dataframe, and preserves the schema.

    :param df: source dataframe
    :param offset: adjustment to zipWithIndex()'s index
    :param colName: name of the index column
    '''
    new_schema = StructType(
        [StructField(colName, LongType(), True)]  # new field added in front
        + df.schema.fields                        # previous schema
    )

    zipped_rdd = df.rdd.zipWithIndex()

    # each element is (Row, index); prepend the (offset-adjusted) index to the row values
    new_rdd = zipped_rdd.map(lambda args: [args[1] + offset] + list(args[0]))

    return spark.createDataFrame(new_rdd, new_schema)
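For example, here's a minimal usage sketch against the question's dataframe (assuming spark is an existing SparkSession and dfZipWithIndex is defined as above):

dummy_data = [('a', 1), ('b', 25), ('c', 3), ('d', 8), ('e', 1)]
df = spark.createDataFrame(dummy_data, ['letter', 'number'])

indexed = dfZipWithIndex(df, offset=0, colName="rowId")
# 'indexed' has columns rowId, letter, number, with rowId assigned
# in the dataframe's native row order starting at 0.
indexed.show()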
That's also available in the abalon package.