Transform map to mapPartition using pyspark

2024/10/14 6:17:39

I am trying to load a TensorFlow model from disk and use it to predict values.


    import tensorflow as tf

    def get_value(row):
        print("**********************************************")
        graph = tf.Graph()
        rowkey = row[0]
        checkpoint_file = "/home/sahil/Desktop/Relation_Extraction/data/1485336002/checkpoints/model-300"
        print("Loading model................................")
        with graph.as_default():
            session_conf = tf.ConfigProto(
                allow_soft_placement=allow_soft_placement,
                log_device_placement=log_device_placement)
            sess = tf.Session(config=session_conf)
            with sess.as_default():
                # Load the saved meta graph and restore variables
                saver = tf.train.import_meta_graph("{}.meta".format(checkpoint_file))
                saver.restore(sess, checkpoint_file)
                input_x = graph.get_operation_by_name("X_train").outputs[0]
                dropout_keep_prob = graph.get_operation_by_name("dropout_keep_prob").outputs[0]
                predictions = graph.get_operation_by_name("output/predictions").outputs[0]
                batch_predictions = sess.run(predictions, {input_x: [row[1]], dropout_keep_prob: 1.0})
                print(batch_predictions)
                return (rowkey, batch_predictions)

I have a RDD which consists of a tuple (rowkey, input_vector). I want to use the loaded model to predict the score/class of the input.

Code to call get_value() (rdd here stands for the RDD of tuples described above):

    result = rdd.map(lambda iter: get_value(iter))

The problem is that when I call map, the model is reloaded for every single tuple, which takes a lot of time.

I am thinking of loading the model inside mapPartitions and then calling get_value for each row of the partition. I have no clue how to convert the code to mapPartitions so that the TensorFlow model is loaded only once per partition and the running time is reduced.

Thanks in advance.


I am not sure I have understood your question correctly, but we can optimise your code a bit here.

    import cPickle
    import tensorflow as tf

    graph = tf.Graph()
    checkpoint_file = "/home/sahil/Desktop/Relation_Extraction/data/1485336002/checkpoints/model-300"

    # Build the session and restore the model once, on the driver.
    # The import must happen inside graph.as_default() so the ops land in `graph`.
    with graph.as_default():
        session_conf = tf.ConfigProto(
            allow_soft_placement=allow_soft_placement,
            log_device_placement=log_device_placement)
        sess = tf.Session(config=session_conf)
        saver = tf.train.import_meta_graph("{}.meta".format(checkpoint_file))
        saver.restore(sess, checkpoint_file)
        input_x = graph.get_operation_by_name("X_train").outputs[0]
        dropout_keep_prob = graph.get_operation_by_name("dropout_keep_prob").outputs[0]
        predictions = graph.get_operation_by_name("output/predictions").outputs[0]

    # Serialize the session so it can be passed to get_value
    session_pickle = cPickle.dumps(sess)

    def get_value(key, vector, session_pickle):
        sess = cPickle.loads(session_pickle)
        rowkey = key
        batch_predictions = sess.run(predictions, {input_x: [vector], dropout_keep_prob: 1.0})
        print(batch_predictions)
        return (rowkey, batch_predictions)

    result = rdd.map(lambda (key, row): get_value(key=key, vector=row, session_pickle=session_pickle))

The idea is to serialize your TensorFlow session once and reuse it for every row. I haven't tested this code, though, so run it and leave a comment.
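
If pickling the session does not work for you (a tf.Session wraps native handles, so I would expect cPickle.dumps(sess) to fail), the mapPartitions approach from the question can be sketched roughly as below. Treat it as a minimal sketch under assumptions, not a tested solution: make_partition_predictor, load_tf_model and fake_loader are hypothetical names introduced here, the TensorFlow part assumes the TF 1.x API and the tensor names from the question and uses a default tf.Session() rather than the question's ConfigProto settings, and the demo at the bottom uses a stand-in model so that it runs without Spark or TensorFlow.

```python
def make_partition_predictor(load_model):
    """Build a function that can be passed to rdd.mapPartitions.

    load_model is a zero-argument callable returning a predict(vector)
    function. It is called at most once per partition, not once per row.
    """
    def predict_partition(rows):
        predict = None
        for rowkey, vector in rows:
            if predict is None:
                # Expensive step: runs on the first row of the partition only,
                # and is skipped entirely for empty partitions.
                predict = load_model()
            yield (rowkey, predict(vector))
    return predict_partition


def load_tf_model(checkpoint_file):
    """Hypothetical TF 1.x loader mirroring the question's code (assumption)."""
    import tensorflow as tf  # imported lazily, so it is resolved on the worker

    graph = tf.Graph()
    with graph.as_default():
        sess = tf.Session()
        saver = tf.train.import_meta_graph("{}.meta".format(checkpoint_file))
        saver.restore(sess, checkpoint_file)
        input_x = graph.get_operation_by_name("X_train").outputs[0]
        dropout_keep_prob = graph.get_operation_by_name("dropout_keep_prob").outputs[0]
        predictions = graph.get_operation_by_name("output/predictions").outputs[0]

    def predict(vector):
        return sess.run(predictions, {input_x: [vector], dropout_keep_prob: 1.0})
    return predict


# Demo without Spark or TensorFlow: a stand-in loader that counts its calls.
load_count = {"n": 0}

def fake_loader():
    load_count["n"] += 1
    return lambda vector: sum(vector)  # stand-in for the real model

predict_partition = make_partition_predictor(fake_loader)
demo = list(predict_partition(iter([("a", [1, 2]), ("b", [3, 4])])))
print(demo, load_count["n"])
```

With Spark, I would expect the call to look like result = rdd.mapPartitions(make_partition_predictor(lambda: load_tf_model(checkpoint_file))), so the checkpoint is read once per partition instead of once per row. The checkpoint path has to be readable on every worker for this to work.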

