PySpark reversing StringIndexer in nested array

2024/4/15 1:05:23

I'm using PySpark to do collaborative filtering with ALS. My original user and item IDs are strings, so I used StringIndexer to convert them to numeric indices (PySpark's ALS model requires numeric IDs).

After I've fitted the model, I can get the top 3 recommendations for each user like so:

recs = model.recommendForAllUsers(3)

The recs dataframe looks like so:

+-----------+--------------------+
|userIdIndex|     recommendations|
+-----------+--------------------+
|       1580|[[10096,3.6725707...|
|       4900|[[10096,3.0137873...|
|       5300|[[10096,2.7274625...|
|       6620|[[10096,2.4493625...|
|       7240|[[10096,2.4928937...|
+-----------+--------------------+
only showing top 5 rows

root
 |-- userIdIndex: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- productIdIndex: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)

I want to create a huge JSON dump of this dataframe, which I can already do.

A sample of the resulting JSON is:

{"userIdIndex": 1580, "recommendations": [{"productIdIndex": 10096, "rating": 3.6725707}, {"productIdIndex": 10141, "rating": 3.61542}, {"productIdIndex": 11591, "rating": 3.536216}]}

The userIdIndex and productIdIndex keys are due to the StringIndexer transformation.

How can I get the original values of these columns back? I suspect I must use the IndexToString transformer, but I can't quite figure out how, since the data is nested in an array inside the recs DataFrame.

I tried to use a Pipeline (stages=[StringIndexer, ALS, IndexToString]), but it looks like it doesn't support these indexers.



In both cases you'll need access to the list of labels. You can get it either from a StringIndexerModel

user_indexer_model = ...  # type: StringIndexerModel
user_labels = user_indexer_model.labels

product_indexer_model = ...  # type: StringIndexerModel
product_labels = product_indexer_model.labels

or column metadata.
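As a rough sketch of the metadata route: StringIndexer records its labels in the output column's metadata under `ml_attr` → `vals`. The helper name `labels_from_metadata` and the sample values below are made up for illustration, and the function works on a plain dict so it can be tried without a cluster.

```python
def labels_from_metadata(metadata):
    """Return the label list stored in a StringIndexer output column's metadata."""
    attr = metadata.get("ml_attr", {})
    if "vals" not in attr:
        raise ValueError("column has no nominal label metadata")
    return list(attr["vals"])


# Hypothetical metadata, shaped like what StringIndexer attaches to its output column.
sample_metadata = {
    "ml_attr": {"type": "nominal", "name": "userIdIndex", "vals": ["u_a", "u_b", "u_c"]}
}
user_labels = labels_from_metadata(sample_metadata)
```

With a real DataFrame you would pass something like indexed_df.schema["userIdIndex"].metadata, where indexed_df (an assumed name) is the output of the indexer. The recs DataFrame returned by ALS may not carry this metadata, so read it from the indexed input instead.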

For userIdIndex you can just apply IndexToString:

from pyspark.ml.feature import IndexToString

user_id_to_label = IndexToString(inputCol="userIdIndex", outputCol="userId", labels=user_labels)

For recommendations you'll need either a UDF or an expression like this:

from pyspark.sql.functions import array, col, lit, struct

n = 3  # Same as numItems

product_labels_ = array(*[lit(x) for x in product_labels])
recommendations = array(*[struct(
    product_labels_[col("recommendations")[i]["productIdIndex"]].alias("productId"),
    col("recommendations")[i]["rating"].alias("rating")
) for i in range(n)])

recs.withColumn("recommendations", recommendations)
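The UDF alternative mentioned above could look roughly like the sketch below. The function name `relabel` and the sample data are hypothetical. The mapping itself is plain Python, so it behaves the same on dicts as on the Row objects a UDF would receive, since both support lookup by field name.

```python
def relabel(recommendations, labels):
    """Swap each productIdIndex for its original label, keeping the rating."""
    return [(labels[r["productIdIndex"]], float(r["rating"])) for r in recommendations]


# Hypothetical labels and one user's recommendations, for illustration only.
sample_labels = ["p_x", "p_y", "p_z"]
sample_recs = [
    {"productIdIndex": 2, "rating": 3.5},
    {"productIdIndex": 0, "rating": 3.0},
]
relabeled = relabel(sample_recs, sample_labels)

# In Spark this might be wrapped as follows (assumes product_labels and recs exist):
#   from pyspark.sql.functions import udf
#   from pyspark.sql.types import (ArrayType, FloatType, StringType,
#                                  StructField, StructType)
#   schema = ArrayType(StructType([
#       StructField("productId", StringType()),
#       StructField("rating", FloatType()),
#   ]))
#   relabel_udf = udf(lambda rs: relabel(rs, product_labels), schema)
#   recs.withColumn("recommendations", relabel_udf("recommendations"))
```

Unlike the expression version, this does not depend on n, but a Python UDF is usually slower than native column expressions, so it is mainly worth considering when the number of recommendations per user varies.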

