分享

为ZooKeeper, Kafka 和 Spark 应用编写单元测试示例


问题导读


1.如何为zookeeper编写单元测试?
2.如何为Kafka编写单元测试?
3.如何为Spark编写单元测试?








ZooKeeper, Kafka和Spark是当下流行的大数据平台工具之一。这两年得到飞速的发展,国内厂商也越来越多的使用它们。 本站有多篇文章介绍了它们的开发指南, 如: 跟着实例学习ZooKeeper的用法 Kafka和Spring集成实践 Kafka快速入门 Spark Streaming 集成 Kafka 总结 Spark 开发指南 ......官方网站提供了很多的代码例子,互联网上也有很多的开发例子,你可以很容易的学习如果编写基于这些平台框架的技术。 但是如何为这些应用编写单元测试呢? 本文提供了几种编写单元测试的技术。
所有的例子都可以在 这里 找到。
为ZooKeeper应用编写单元测试
一般我们不直接使用ZooKeeper的API编写Client代码,而是使用Curator或者i0itec-zkclient等包装的API开发。 原因是ZooKeeper本身提供的API太过底层,我们需要处理各种异常,并且API使用不方便。
有几种方式可以实现一个ZooKeeper simulator。
  • Curator TestingServer Curator提供了一个TestingServer类,可以模拟ZooKeeper,用来测试Curator应用或者其它使用ZooKeeper的应用。 你需要在pom.xml中引用curator-test:
  1. <dependency>
  2.         <groupId>org.apache.curator</groupId>
  3.         <artifactId>curator-test</artifactId>
  4.         <version>${curator.version}</version>
  5. </dependency>
复制代码


测试代码如下:
  1. public class CuratorAppTest {
  2.         private TestingServer server;
  3.        
  4.         @BeforeClass
  5.         public void setUp() throws Exception {
  6.                 server = new TestingServer();
  7.                 server.start();
  8.         }
  9.         @AfterClass
  10.         public void tearDown() throws IOException {
  11.                 server.stop();
  12.         }
  13.        
  14.         @Test
  15.         public void testSetAndGetData() {               
  16.                 CuratorApp app = new CuratorApp();
  17.                 String payload = System.currentTimeMillis() + "";
  18.                 String result = app.setAndGetData(server.getConnectString(), payload);
  19.                 assertEquals(result, payload);
  20.         }
  21.        
  22.         @Test
  23.         public void testWatch() throws Exception {               
  24.                 CuratorApp app = new CuratorApp();
  25.                 app.watch(server.getConnectString());
  26.         }
  27. }
复制代码


  • EmbeddedZookeeper Kafka为了它的单元测试实现一个嵌入式的ZooKeeper,代码相当简单:
  1. package kafka.zk
  2. import org.apache.zookeeper.server.ZooKeeperServer
  3. import org.apache.zookeeper.server.NIOServerCnxnFactory
  4. import kafka.utils.TestUtils
  5. import java.net.InetSocketAddress
  6. import kafka.utils.Utils
  7. import org.apache.kafka.common.utils.Utils.getPort
  8. class EmbeddedZookeeper(val connectString: String) {
  9.   val snapshotDir = TestUtils.tempDir()
  10.   val logDir = TestUtils.tempDir()
  11.   val tickTime = 500
  12.   val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime)
  13.   val factory = new NIOServerCnxnFactory()
  14.   factory.configure(new InetSocketAddress("127.0.0.1", getPort(connectString)), 0)
  15.   factory.startup(zookeeper)
  16.   def shutdown() {
  17.     Utils.swallow(zookeeper.shutdown())
  18.     Utils.swallow(factory.shutdown())
  19.     Utils.rm(logDir)
  20.     Utils.rm(snapshotDir)
  21.   }
  22.   
  23. }
复制代码




测试工具类:
  1. package kafka.zk
  2. import org.scalatest.junit.JUnit3Suite
  3. import org.I0Itec.zkclient.ZkClient
  4. import kafka.utils.{ZKStringSerializer, TestZKUtils, Utils}
  5. trait ZooKeeperTestHarness extends JUnit3Suite {
  6.   val zkConnect: String = TestZKUtils.zookeeperConnect
  7.   var zookeeper: EmbeddedZookeeper = null
  8.   var zkClient: ZkClient = null
  9.   val zkConnectionTimeout = 6000
  10.   val zkSessionTimeout = 6000
  11.   override def setUp() {
  12.     super.setUp
  13.     zookeeper = new EmbeddedZookeeper(zkConnect)
  14.     zkClient = new ZkClient(zookeeper.connectString, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
  15.   }
  16.   override def tearDown() {
  17.     Utils.swallow(zkClient.close())
  18.     Utils.swallow(zookeeper.shutdown())
  19.     super.tearDown
  20.   }
  21. }
复制代码


  • 自己实现 Kafka的类无法直接在java中使用,你可以使用一些花招, 比如把trait改为abstract class改造使用。 我们可以参考Kafka的scala代码使用java自己实现一个, 主要例如org.apache.zookeeper.server.ZooKeeperServer和org.apache.zookeeper.server.NIOServerCnxnFactory。 如这个代码: gist 或者如stackoverflow上讨论的:
  1. Properties startupProperties = ...
  2. QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
  3. try {
  4.     quorumConfiguration.parseProperties(startupProperties);
  5. } catch(Exception e) {
  6.     throw new RuntimeException(e);
  7. }
  8. zooKeeperServer = new ZooKeeperServerMain();
  9. final ServerConfig configuration = new ServerConfig();
  10. configuration.readFrom(quorumConfiguration);
  11. new Thread() {
  12.     public void run() {
  13.         try {
  14.             zooKeeperServer.runFromConfig(configuration);
  15.         } catch (IOException e) {
  16.             log.error("ZooKeeper Failed", e);
  17.         }
  18.     }
  19. }.start();
复制代码


为Kafka应用编写单元测试
Kafka代码本身就提供了单元测试,所以我们编写Kafka producer应用和consumer应用时可以参考这些应用。 你可以在线查看这些单元测试以及它们可以重用的文件: kafka单元测试类 KafkaServerTestHarness 提供了一个基本测试trait:
  1. trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
  2.   val configs: List[KafkaConfig]
  3.   var servers: List[KafkaServer] = null
  4.   override def setUp() {
  5.     super.setUp
  6.     if(configs.size <= 0)
  7.       throw new KafkaException("Must suply at least one server config.")
  8.     servers = configs.map(TestUtils.createServer(_))
  9.   }
  10.   override def tearDown() {
  11.     servers.map(server => server.shutdown())
  12.     servers.map(server => server.config.logDirs.map(Utils.rm(_)))
  13.     super.tearDown
  14.   }
  15. }
复制代码


以及提供一个初始化好producer和consumer的Trait:
  1. trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness {
  2.     val port: Int
  3.     val host = "localhost"
  4.     var producer: Producer[String, String] = null
  5.     var consumer: SimpleConsumer = null
  6.   override def setUp() {
  7.       super.setUp
  8.       val props = TestUtils.getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), "kafka.utils.StaticPartitioner")
  9.       producer = new Producer(new ProducerConfig(props))
  10.       consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
  11.     }
  12.    override def tearDown() {
  13.      producer.close()
  14.      consumer.close()
  15.      super.tearDown
  16.    }
  17. }
复制代码



如果你想在代码中使用它们,你需要引入:
  1. <dependency>
  2.         <groupId>org.apache.kafka</groupId>
  3.         <artifactId>kafka_2.10</artifactId>
  4.         <classifier>test</classifier>
  5.         <scope>test</scope>
  6.         <version>${kafka.version}</version>
  7. </dependency>
复制代码

如果你想在Java直接实现这些trait是不可以的,这些Trait都有实现方法,在Java中无法直接使用,可能你需要把trait改为abstract class。 或者你使用它的TestUtils创建。
  1. @BeforeClass
  2. public void setup() {
  3.         ......
  4. }
  5. @AfterClass
  6. public void teardown() {
  7.         ......
  8. }
  9. @Test
  10. public void testReceive() {
  11.         Properties consumerProps = TestUtils.createConsumerProperties(zkServer.connectString(), "group_1", "consumer_id", 1000);
  12.         consumerProps.putAll(kafkaProps);
  13.         ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
  14.         ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
  15.         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
  16.         topicCountMap.put(topic, new Integer(1));
  17.         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
  18.         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
  19.         // now launch all the threads
  20.         ExecutorService executor = Executors.newFixedThreadPool(2);
  21.         // now create an object to consume the messages
  22.         int threadNumber = 0;
  23.         for (final KafkaStream<byte[], byte[]> stream : streams) {
  24.                 executor.submit(new ConsumerThread(stream, threadNumber));
  25.                 threadNumber++;
  26.         }
  27.         // setup producer
  28.         Properties properties = TestUtils.getProducerConfig("localhost:" + port);
  29.         properties.put("serializer.class", StringEncoder.class.getCanonicalName());
  30.         ProducerConfig pConfig = new ProducerConfig(properties);
  31.         Producer<Integer, String> producer = new Producer<>(pConfig);
  32.         // send message
  33.         for (int i = 0; i < 10; i++) {
  34.                 KeyedMessage<Integer, String> data = new KeyedMessage<>(topic, "test-message-" + i);
  35.                 List<KeyedMessage<Integer, String>> messages = new ArrayList<KeyedMessage<Integer, String>>();
  36.                 messages.add(data);
  37.                 // producer.send(scala.collection.JavaConversions.asScalaBuffer(messages));
  38.                 producer.send(scala.collection.JavaConversions.asScalaBuffer(messages));
  39.         }
  40.         try {
  41.                 Thread.sleep(20000);
  42.         } catch (InterruptedException e) {
  43.                 e.printStackTrace();
  44.         }
  45.         executor.shutdownNow();
  46.         producer.close();
  47. }
复制代码

为Spark应用编写单元测试
如果是为普通的Spark应用编写单元测试代码, 比较简单,创建SparkContext时只需将master设为local即可。确保在finally或者teardown方法中调用SparkContext.stop(),因为Spark不允许在同一个程序中拥有两个SparkContext。 以SparkPi为例。
  1. public final class JavaSparkPi {
  2.         public static void main(String[] args) throws Exception {
  3.                 double pi = calculatePi(args);               
  4.                 System.out.println("Pi is roughly " + pi);
  5.                
  6.         }
  7.         public static double calculatePi(String[] args) {
  8.                 SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
  9.                 JavaSparkContext jsc = new JavaSparkContext(sparkConf);
  10.                 int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
  11.                 int n = 100000 * slices;
  12.                 List<Integer> l = new ArrayList<Integer>(n);
  13.                 for (int i = 0; i < n; i++) {
  14.                         l.add(i);
  15.                 }
  16.                 JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
  17.                 int count = dataSet.map(new Function<Integer, Integer>() {
  18.                         @Override
  19.                         public Integer call(Integer integer) {
  20.                                 double x = Math.random() * 2 - 1;
  21.                                 double y = Math.random() * 2 - 1;
  22.                                 return (x * x + y * y < 1) ? 1 : 0;
  23.                         }
  24.                 }).reduce(new Function2<Integer, Integer, Integer>() {
  25.                         @Override
  26.                         public Integer call(Integer integer, Integer integer2) {
  27.                                 return integer + integer2;
  28.                         }
  29.                 });
  30.                 double pi = 4.0 * count / n;
  31.                 jsc.stop();
  32.                 return pi;
  33.         }
  34. }
复制代码

单元测试类:
  1. public class JavaSparkPiTest {
  2.         @Test
  3.         public void testPi() {
  4.                 Properties props = System.getProperties();               
  5.                 props.setProperty("spark.master", "local[4]");
  6.                 props.setProperty("spark.rdd.compress", "true");
  7.                 props.setProperty("spark.executor.memory", "1g");
  8.                 try {
  9.                         double pi = JavaSparkPi.calculatePi(new String[]{"1"});
  10.                         assertTrue((pi -3.14) < 1);
  11.                 } catch (Exception e) {
  12.                         fail(e.getMessage(),e);
  13.                 }
  14.         }
  15. }
复制代码


对于Spark Streaming应用, 单元测试相对复杂,因为你需要集成其它的框架。 如果你的应用集成Kafka,你可以使用上面的Kafka的测试类, 如果你的应用集成Mongo,你可以使用真实的Mongo或者fake Mongo如Fongo, 如果你的应用集成TCP 流, 你需要实现一个TCP server simulator, 基本上你应该寻找或者实现要集成的框架的simulator。




已有(1)人评论

跳转到指定楼层
hb1984 发表于 2015-2-3 17:59:14
谢谢楼主分享。     
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条