How to explode multiple columns, different types and different lengths?

I've got a DF with columns of different time cycles (1/6, 3/6, 6/6 etc.) and would like to "explode" all the columns to create a new DF in which each row is a 1/6 cycle.

from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, arrays_zip, col

spark = SparkSession.builder \
    .appName('DataFrame') \
    .master('local[*]') \
    .getOrCreate()

df = spark.createDataFrame([Row(a=1, b=[1, 2, 3, 4, 5, 6], c=[11, 22, 33], d=['foo'])])

+---+------------------+------------+-----+
|  a|                 b|           c|    d|
+---+------------------+------------+-----+
|  1|[1, 2, 3, 4, 5, 6]|[11, 22, 33]|[foo]|
+---+------------------+------------+-----+

I'm doing the explode:

df2 = (df.withColumn("tmp", arrays_zip("b", "c", "d"))
         .withColumn("tmp", explode("tmp"))
         .select("a", col("tmp.b"), col("tmp.c"), "d"))

But the output is not what I want:

|  a|  b|   c|    d|
+---+---+----+-----+
|  1|  1|  11|[foo]|
|  1|  2|  22|[foo]|
|  1|  3|  33|[foo]|
|  1|  4|null|[foo]|
|  1|  5|null|[foo]|
|  1|  6|null|[foo]|
+---+---+----+-----+
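
This happens because arrays_zip pads the shorter arrays with null up to the length of the longest one, so c effectively becomes [11, 22, 33, null, null, null] before the explode. A quick way to see this, using the df above:

    # Show the zipped structs before exploding: the shorter arrays
    # c and d are padded with null to b's length of 6.
    df.select(arrays_zip("b", "c", "d").alias("tmp")).show(truncate=False)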

I would want it to look like this:

|  a|  b|  c|  d|
+---+---+---+---+
|  1|  1| 11|foo|
|   |  2|   |   |
|   |  3| 22|   |
|   |  4|   |   |
|   |  5| 33|   |
|   |  6|   |   |
+---+---+---+---+

I am new to Spark, and right from the start I've run into complicated topics! :)

Update 2019-07-15: Does anyone have a solution that avoids UDFs? -> answered by @jxc

Update 2019-07-17: Does anyone have a solution for arranging the null <-> value sequences in a more complicated order? For example, in column c: null, 11, null, 22, null, 33; or a more complex case where in column d the first value should be null, then foo, then null, null, null:

|  a|  b|  c|  d|
+---+---+---+---+
|  1|  1|   |   |
|   |  2| 11|foo|
|   |  3|   |   |
|   |  4| 22|   |
|   |  5|   |   |
|   |  6| 33|   |
+---+---+---+---+
Answer

Here is one way without using a UDF:

UPDATE on 2019/07/17: adjusted the SQL statement and added N=6 as a parameter to the SQL.

UPDATE on 2019/07/16: removed the temporary column t and replaced it with a constant array(0,1,2,3,4,5) in the transform function. This way, we can operate on the values of the array elements directly instead of on their indexes.

UPDATE: I removed the original method, which used string functions and converted all array elements to strings, and was less efficient. The Spark SQL higher-order functions available in Spark 2.4+ should be a better approach.

Setup

from pyspark.sql import functions as F, Row

df = spark.createDataFrame([ Row(a=1, b=[1, 2, 3, 4, 5, 6], c=['11', '22', '33'], d=['foo'], e=[111, 222]) ])

>>> df.show()
+---+------------------+------------+-----+----------+
|  a|                 b|           c|    d|         e|
+---+------------------+------------+-----+----------+
|  1|[1, 2, 3, 4, 5, 6]|[11, 22, 33]|[foo]|[111, 222]|
+---+------------------+------------+-----+----------+

# columns you want to array-explode
cols = df.columns

# number of array elements to set (rows after the explode)
N = 6

Using SQL higher-order function: transform

Using the Spark SQL higher-order function transform(), do the following:

  1. Create the following Spark SQL template, where {0} will be replaced by the column name and {1} by N:

    stmt = '''
       CASE
         WHEN '{0}' in ('d') THEN
           transform(sequence(0,{1}-1), x -> IF(x == 1, `{0}`[0], NULL))
         WHEN size(`{0}`) <= {1}/2 AND size(`{0}`) > 1 THEN
           transform(sequence(0,{1}-1), x -> IF(((x+1)*size(`{0}`))%{1} == 0, `{0}`[int((x-1)*size(`{0}`)/{1})], NULL))
         ELSE `{0}`
       END AS `{0}`
    '''
    

    Note: the array transformation is only applied when an array contains more than one and at most N/2 elements (in this example, 1 < size <= 3), unless the column is handled by its own WHEN clause (like d here). Arrays of any other size are kept as-is.
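
    To see what the template renders to for one column, print a formatted instance (a quick check, not part of the original steps):

    # Render the SQL expression for column 'c' with N=6;
    # {0} becomes the column name and {1} becomes N.
    print(stmt.format('c', 6))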

  2. Run the above SQL with selectExpr() for all required columns:

    df1 = df.withColumn('a', F.array('a')) \
            .selectExpr(*[ stmt.format(c, N) for c in cols ])

    >>> df1.show()
    +---+------------------+----------------+-----------+---------------+
    |  a|                 b|               c|          d|              e|
    +---+------------------+----------------+-----------+---------------+
    |[1]|[1, 2, 3, 4, 5, 6]|[, 11,, 22,, 33]|[, foo,,,,]|[,, 111,,, 222]|
    +---+------------------+----------------+-----------+---------------+
    
  3. Run arrays_zip and explode:

    df_new = df1.withColumn('vals', F.explode(F.arrays_zip(*cols))) \
                .select('vals.*') \
                .fillna('', subset=cols)

    >>> df_new.show()
    +----+---+---+---+----+
    |   a|  b|  c|  d|   e|
    +----+---+---+---+----+
    |   1|  1|   |   |null|
    |null|  2| 11|foo|null|
    |null|  3|   |   | 111|
    |null|  4| 22|   |null|
    |null|  5|   |   |null|
    |null|  6| 33|   | 222|
    +----+---+---+---+----+
    

    Note: fillna('', subset=cols) only affects the string-typed columns, which is why column e keeps its nulls.
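
    If blanks are also wanted for the integer column e, one possible variant (a sketch, not part of the original answer) is to cast e to an array of strings before zipping, so that fillna('') applies to it as well:

    # Hypothetical variant: cast e to array<string> so fillna('')
    # can blank out its null rows too.
    df1b = df1.withColumn('e', F.col('e').cast('array<string>'))
    df_newb = df1b.withColumn('vals', F.explode(F.arrays_zip(*cols))) \
                  .select('vals.*') \
                  .fillna('', subset=cols)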

In one method chain:

df_new = df.withColumn('a', F.array('a')) \
           .selectExpr(*[ stmt.format(c, N) for c in cols ]) \
           .withColumn('vals', F.explode(F.arrays_zip(*cols))) \
           .select('vals.*') \
           .fillna('', subset=cols)

Explanation of the transform function:

The transform function below reflects an older revision of the requirements:

transform(sequence(0,5), x -> IF((x*size({0}))%6 == 0, {0}[int(x*size({0})/6)], NULL))

As mentioned above, {0} is replaced with the column name. Here we use column c, which contains 3 elements, as an example:

  • In the transform function, sequence(0,5) creates a constant array array(0,1,2,3,4,5) with 6 elements, and the rest defines a lambda function with one argument x that takes the value of each element.
  • IF(condition, true_value, false_value) is a standard SQL function.
  • The condition applied is: (x*size(c))%6 == 0, where size(c)=3. If this condition is true, it returns c[int(x*size(c)/6)]; otherwise, it returns NULL. So for x from 0 to 5, we have:

    ((0*3)%6 == 0) true   -->  c[int(0*3/6)] = c[0]
    ((1*3)%6 == 0) false  -->  NULL
    ((2*3)%6 == 0) true   -->  c[int(2*3/6)] = c[1]
    ((3*3)%6 == 0) false  -->  NULL
    ((4*3)%6 == 0) true   -->  c[int(4*3/6)] = c[2]
    ((5*3)%6 == 0) false  -->  NULL
    

The same logic applies to column e, which contains a 2-element array.
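
As a sanity check, the placement arithmetic of the adjusted condition, ((x+1)*size)%N == 0 with index int((x-1)*size/N), can be reproduced in plain Python:

    # Mirror the element placement produced by the final SQL statement.
    def placements(arr, N=6):
        size = len(arr)
        return [arr[(x - 1) * size // N] if ((x + 1) * size) % N == 0 else None
                for x in range(N)]

    print(placements(['11', '22', '33']))  # [None, '11', None, '22', None, '33']
    print(placements([111, 222]))          # [None, None, 111, None, None, 222]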
