I am trying to run an example of the FPGrowth algorithm in Spark, but I am running into an error. This is my code:

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.fpm.{FPGrowth, FPGrowthModel}

val transactions: RDD[Array[String]] = sc.textFile("path/transations.txt").map(_.split(" ")).cache()

val fpg = new FPGrowth().setMinSupport(0.2).setNumPartitions(10)

val model = fpg.run(transactions)

model.freqItemsets.collect().foreach { itemset => println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)}


The code works up until the last line where I get the error:

WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 16, ip-10-0-0-###.us-west-1.compute.internal): 
com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Can not set 
final scala.collection.mutable.ListBuffer field org.apache.spark.mllib.fpm.FPTree$Summary.nodes to scala.collection.mutable.ArrayBuffer
Serialization trace:
nodes (org.apache.spark.mllib.fpm.FPTree$Summary)

I have even tried to use the solution that was proposed here: SPARK-7483

I haven't had any luck with this either. Has anyone found a solution to this? Or does anyone know of a way to just view the results or save them to a text file?

Any help would be greatly appreciated!

I also found the full source code for this algorithm - http://mail-archives.apache.org/mod_mbox/spark-commits/201502.mbox/%3C1cfe817dfdbf47e3bbb657ab343dcf82@git.apache.org%3E



I got the same error. It is caused by the Spark version: the problem is fixed in Spark 1.5.2, but I was using 1.3. I worked around it as follows:


I switched from spark-shell to spark-submit and changed the Kryo serializer configuration by registering the ArrayBuffer and ListBuffer classes. Here is my code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.fpm.FPGrowth
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ListBuffer

object fpgrowth {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark FPGrowth")
    // Register the two buffer classes named in the KryoException with Kryo
    conf.registerKryoClasses(Array(classOf[ArrayBuffer[String]], classOf[ListBuffer[String]]))

    val sc = new SparkContext(conf)

    val data = sc.textFile("<path to file.txt>")

    // One transaction per line, items separated by single spaces
    val transactions: RDD[Array[String]] = data.map(s => s.trim.split(' '))

    // FPGrowth with its default parameters
    val fpg = new FPGrowth()
    val model = fpg.run(transactions)

    // Print each frequent itemset followed by its frequency
    model.freqItemsets.collect().foreach { itemset =>
      println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
    }
  }
}
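
The question also asks about saving the results to a text file instead of printing them. One possible approach is to format each itemset on the executors and write the RDD out with saveAsTextFile, which avoids collecting everything on the driver. This is only a minimal sketch: the object name ItemsetOutput, the helpers formatItemset and saveItemsets, and the output directory are hypothetical names chosen for illustration, and it assumes a model of type FPGrowthModel[String] like the one produced by the code above.

```scala
import org.apache.spark.mllib.fpm.FPGrowthModel

object ItemsetOutput {
  // Hypothetical helper: renders one itemset in the same format as the
  // println call above, e.g. items a and b with frequency 3 give "[a,b], 3".
  def formatItemset(items: Array[String], freq: Long): String =
    items.mkString("[", ",", "]") + ", " + freq

  // Hypothetical helper: writes one line per frequent itemset.
  // outputDir is an assumed path that must not exist yet; Spark creates it
  // as a directory and writes one part file per partition into it.
  def saveItemsets(model: FPGrowthModel[String], outputDir: String): Unit =
    model.freqItemsets
      .map(itemset => formatItemset(itemset.items, itemset.freq))
      .saveAsTextFile(outputDir)
}
```

With the model from the code above, a call such as ItemsetOutput.saveItemsets(model, "<output dir>") would be placed after fpg.run(transactions). If the Kryo error still occurs on an unpatched Spark version, the class registration shown above is still needed, since the failure happens while the itemsets are being computed, not while they are printed.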