Spark-Junit Test

스파크로 개발 후 주요 기능에 대해 테스트를 진행해보자.

먼저 pom.xml에 아래 dependency를 추가한다.

<dependency>  
 <groupId>junit</groupId>  
 <artifactId>junit</artifactId>  
 <version>4.7</version>  
 <scope>test</scope>  
</dependency>  
 <groupId>org.scalatest</groupId>  
 <artifactId>scalatest_2.11</artifactId>  
 <version>3.0.5</version>  
 <scope>test</scope>  
</dependency>

아래 wordCount 메서드를 테스트해보자.

package com.example.test

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

object WordCount {  
  // 아래 메서드를 테스트해보자.
  def wordCount(fileRDD: RDD) = {  
    fileRDD.flatMap(line => line.split(" "))  
      .map(word => (word, 1)).reduceByKey(_ + _)  
  }  
  
  def main(args: Array[String]): Unit =  
  {  
    val inputPath = args(0)  
    val outputPath = args(1)  
      
    val conf = new SparkConf().setMaster("master").setAppName("wordCount")  
    val sc = new SparkContext(conf)  
    sc.setLogLevel("ERROR")  
    val fileRDD = sc.textFile(inputPath)  
      
    val resultRDD = wordCount(fileRDD)  
    resultRDD.saveAsTextFile(outputPath)  
  }  
}

test 폴더에 test를 위한 클래스를 생성 후 ScalaTest가 제공하는 FunSuite를 통해 쉽게 테스트를 할 수 있다.

import org.scalatest._  
import org.junit.Assert._ 

class WordCount extends FunSuite with SharedSparkContext {  
  test("testWordCount") {  // 테스트 함수 이름
    try {  
      val s = "Hi hi hi bye bye bye word count"  
	  val seq = s.split(" ")  
      val rdd = sc2.parallelize(seq)  
      val result = WordCount.wordCount(rdd)  // wordCount 수행
      assertEquals(8, result.count())  // 결과 확인
        
    } catch {  
      case e: Exception =>  
        e.printStackTrace()  
    }  
  }  
}

여기서 SharedSparkContext는 아래와 같이 정의한다.

SharedSparkContext는 테스트에서 SparkContext 호출이 필요한 메서드들이 공통으로 사용할 수 있다.

import org.apache.spark.SparkContext  
import org.scalatest._  
  
trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>  
  @transient private var _sc: SparkContext = _  
  
  def sc: SparkContext = _sc  
  
  override def beforeAll() {  
    _sc = SparkContext("local[4]", "test")  
    super.beforeAll()  
  }  
  
  override def afterAll(): Unit = {  
    _sc.stop()  
    _sc = null  
 super.afterAll()  
  }  
}

실행 후 테스트 결과를 확인한다.

private method 테스트 방법

만약 위 wordCount 기능이 private으로 선언되어있다면, 외부 Test Class에서 호출할 수 없다.

이를 위해 리플렉션을 통해 해당 메서드를 접근할 수 있다.

import org.scalatest._  
import org.junit.Assert._ 

class WordCount extends FunSuite with SharedSparkContext {  
  test("testWordCount") {  
    try {  
      val s = "Hi hi hi bye bye bye word count"  
  val seq = s.split(" ")  
      val rdd = sc.parallelize(seq)\  
  
      val method = WordCount.getClass().getDeclaredMethod("wordCount", classOf[RDD])  
      method.setAccessible(true)  // accessible을 true로 설정함.
      val result = method.invoke(WordCount, rdd).asInstanceOf[DataFrame]  
      
      assertEquals(8, result.count())  
  
    } catch {  
      case e: Exception =>  
        e.printStackTrace()  
    }  
  }  
}

참고

http://www.scalatest.org/getting_started_with_fun_suite

https://www.slideshare.net/SparkSummit/spark-summit-eu-talk-by-ted-malaska

Comments