分享

Storm入门指南第二章 入门

本帖最后由 pig2 于 2014-7-24 00:56 编辑
问题导读
1、什么是分组和消息流?
2、Storm以什么方式向每个bolt实例发送消息?
3、怎么创建、使用Storm项目?




本章我们将会创建一个Storm工程和我们的第一个Storm topology。

提示:下述假设你已经安装JRE1.6或者更高级版本。推荐使用Oracle提供的JRE:下载地址

操作模式
在开始创建项目之前,了解Storm的操作模式(operation modes)是很重要的。Storm有两种运行方式:

本地模式
在本地模式下,Storm topologies 运行在本地机器的一个JVM中。因为本地模式下查看所有topology组件共同工作最为简单,所以这种模式被用于开发、测试和调试。例如,我们可以调整参数,这使得我们可以看到我们的topology在不同的Storm配置环境中是如何运行的。为了以本地模式运行topologies,我们需要下载Storm的开发依赖包,这是我们开发、测试topologies所需的所有东西。当我们建立自己的第一个Storm工程的时候我们很快就可以看到是怎么回事了。

提示:本地模式下运行一个topology同Storm集群中运行类似。然而,确保所有组件线程安全非常重要,因为当它们被部署到远程模式时,它们可能运行在不同的JVM或者不同的物理机器上,此时,它们之间不能直接交流或者共享内存。

在本章的所有示例中,我们都以本地模式运行。

远程模式
在远程模式下,我们将topology提交到Storm集群中,Storm集群由许多进程组成,这些进程通常运行在不同的机器上。远程模式下不显示调试信息,这也是它被认为是产品模式的原因。然而,在一台机器上创建一个Storm集群也是可能的,并且在部署至产品前这样做还是一个好方法,它可以确保将来在一个成熟的产品环境中运行topology不会出现任何问题。

Hello World Storm
在这个项目中,我们会建立一个简单的topology来统计单词个数,我们可以将它看成是Storm topologies中的“Hello World”。然而,它又是一个非常强大的topology,因为它几乎可以扩展到无限大小,并且经过小小的修改,我们甚至可以使用它创建一个统计系统。例如,我们可以修改本项目来找到Twitter上的热门话题。

为了建立这个topology,我们将使用一个spout来负责从文件中读取单词,第一个bolt来标准化单词,第二个bolt去统计单词个数,如图2-1所示:
1.jpg

topology入门图2-1.topology入门

你可以在https://github.com/storm-book/ex ... rted/zipball/master下载本例源码的ZIP文件。

译者的话:本站有备份:下载地址

提示:如果你使用git(一个分布式的版本控制和源码管理工具),则可以运行命令:git clone git@github.com:storm-book/examplesch02-getting_started.git进入你想要下载的源码所在的目录。

检查Java安装
搭建环境的第一步就是检查正在运行的Java版本。运行java -version命令,我们可以看到类似如下信息:
  1. java -version
  2. java version “1.6.0_26″
  3. Java(TM) SE Runtime Environment(build 1.6.0_26-b03)
  4. Java HotSpot(TM) Server VM (build 20.1-b02,mixed mode)
复制代码


如果没有,检查下你的Java安装。下载地址



创建项目
首先,创建一个文件夹,用于存放这个应用(就像对于任何Java应用一样),该文件夹包含了整个项目的源代码。
接着我们需要下载Storm的依赖包——添加到本应用classpath的jar包集合。可以通过下面两种方式完成:
下载依赖包,解压,并将它们加入classpath路径
使用Apache Maven
提示:Maven是一个软件项目管理工具,可以用于管理一个项目开发周期中的多个方面(从从依赖包到发布构建过程),在本书中我们会广泛使用Maven。可以使用mvn命令检查maven是否安装,如果未安装,可以从下载,地址

下一步我们需要新建一个pom.xml文件(pom:project object model,项目的对象模型)去定义项目的结构,该文件描述了依赖包、封装、源码等等。这里我们将使用由nathanmarz构建的依赖包和Maven库,这些依赖包可以在这里找到。

提示:Storm的Maven依赖包引用了在本地模式下运行Storm所需的所有函数库。

使用这些依赖包,我们可以写一个包含运行topology基本的必要组件的pom.xml文件:
  1. <project xmlns="http://maven.apache.org/POM/4.0.0"
  2.      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3.      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  4.     <modelVersion>4.0.0</modelVersion>
  5.     <groupId>storm.book</groupId>
  6.     <artifactId>Getting-Started</artifactId>
  7.     <version>0.0.1-SNAPSHOT</version>
  8.     <build>
  9.         <plugins>
  10.             <plugin>
  11.                 <groupId>org.apache.maven.plugins</groupId>
  12.                 <artifactId>maven-compiler-plugin</artifactId>
  13.                 <version>2.3.2</version>
  14.                 <configuration>
  15.                     <source>1.6</source>
  16.                     <target>1.6</target>
  17.                     <compilerVersion>1.6</compilerVersion>
  18.                 </configuration>
  19.             </plugin>
  20.         </plugins>
  21.     </build>
  22.     <repositories>
  23.         <!-- Repository where we can found the storm dependencies -->
  24.         <repository>
  25.             <id>clojars.org</id>
  26.             <url>http://clojars.org/repo</url>
  27.         </repository>
  28.     </repositories>
  29.     <dependencies>
  30.         <!-- Storm Dependency -->
  31.         <dependency>
  32.             <groupId>storm</groupId>
  33.             <artifactId>storm</artifactId>
  34.             <version>0.6.0</version>
  35.         </dependency>
  36.     </dependencies>
  37. </project>
复制代码


前几行指定了项目名称、版本;然后我们添加了一个编译器插件,该插件告诉Maven我们的代码应该用Java1.6编译;接着我们定义库(repository)(Maven支持同一个项目的多个库),clojars是Storm依赖包所在的库,Maven会自动下载本地模式运行Storm需要的所有子依赖包。

本项目的目录结构如下,它是一个典型的Maven Java项目。
1.jpg



java目录下的文件夹包含了我们的源代码,并且我们会将我们的单词文件放到resources文件夹中来处理。

创建第一个topology
为建立我们第一个topology,我们要创建运行本例(统计单词个数)的所有的类。本阶段例子中的有些部分不清楚很正常,我们将在接下来的几个章节中进一步解释它们。

Spout(WordReader类)
WordReader类实现了IRichSpout接口,该类负责读取文件并将每一行发送到一个bolt中去。
提示:spout发送一个定义字段(field)的列表,这种架构允许你有多种bolt读取相同的spout流,然后这些bolt可以定义字段(field)供其他bolt消费。
例2-1包含WordReader类的完整代码(后面会对代码中的每个部分进行分析)
例2-1.src/main/java/spouts/WordReader.java
  1. package spouts;
  2. import java.io.BufferedReader;
  3. import java.io.FileNotFoundException;
  4. import java.io.FileReader;
  5. import java.util.Map;
  6. import backtype.storm.spout.SpoutOutputCollector;
  7. import backtype.storm.task.TopologyContext;
  8. import backtype.storm.topology.IRichSpout;
  9. import backtype.storm.topology.OutputFieldsDeclarer;
  10. import backtype.storm.tuple.Fields;
  11. import backtype.storm.tuple.Values;
  12. public class WordReader implements IRichSpout{
  13.     private SpoutOutputCollector collector;
  14.     private FileReader fileReader;
  15.     private boolean completed=false;
  16.     private TopologyContext context;
  17.     public boolean isDistributed(){return false;}
  18.     public void ack(Object msgId) {
  19.         System.out.println("OK:"+msgId);
  20.     }
  21.     public void close(){}
  22.     public void fail(Object msgId) {
  23.         System.out.println("FAIL:"+msgId);
  24.     }
  25.     /**
  26.     * 该方法用于读取文件并发送文件中的每一行
  27.     */
  28.     public void nextTuple() {
  29.     /**
  30.     * The nextuple it is called forever, so if we have beenreaded the file
  31.     * we will wait and then return
  32.     */
  33.         if(completed){
  34.             try {
  35.             Thread.sleep(1000);
  36.             } catch(InterruptedException e) {
  37.             //Do nothing
  38.             }
  39.             return;
  40.         }
  41.         String str;
  42.         //Open the reader
  43.         BufferedReader reader =new BufferedReader(fileReader);
  44.         try{
  45.         //Read all lines
  46.             while((str=reader.readLine())!=null){
  47.         /**
  48.         * By each line emmit a new value with the line as a their
  49.         */
  50.                 this.collector.emit(new Values(str),str);
  51.             }
  52.         }catch(Exception e){
  53.             throw new RuntimeException("Errorreading tuple",e);
  54.         }finally{
  55.             completed = true;
  56.         }
  57.     }
  58.     /**
  59.     * We will create the file and get the collector object
  60.     */
  61.     public void open(Map conf,TopologyContext context,SpoutOutputCollector collector) {
  62.         try {
  63.             this.context=context;
  64.             this.fileReader=new FileReader(conf.get("wordsFile").toString());
  65.         } catch(FileNotFoundException e) {
  66.             throw new RuntimeException("Errorreading file["+conf.get("wordFile")+"]");
  67.         }
  68.         this.collector=collector;
  69.     }
  70.     /**
  71.     * 声明输出字段“line”
  72.     */
  73.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
  74.         declarer.declare(new Fields("line"));
  75.     }
  76. }
复制代码

在任何spout中调用的第一个方法都是open()方法,该方法接收3个参数:

TopologyContext:它包含了所有的topology数据
conf对象:在topology定义的时候被创建
SpoutOutputCollector:该类的实例可以让我们发送将被bolts处理的数据。
下面的代码块是open()方法的实现:
  1. public void open(Map conf,TopologyContext context,SpoutOutputCollector collector) {
  2.     try {
  3.         this.context=context;
  4.         this.fileReader=new FileReader(conf.get("wordsFile").toString());
  5.     } catch(FileNotFoundException e) {
  6.         throw new RuntimeException("Errorreading file["+conf.get("wordFile")+"]");
  7.     }
  8.     this.collector=collector;
  9. }
复制代码


在open()方法中,我们也创建了reader,它负责读文件。接着,我们需要实现nextTuple()方法,在该方法中发送要被bolt处理的值(values)。在我们的例子中,这个方法读文件并且每行发送一个值。
  1. public void nextTuple() {
  2.     if(completed){
  3.         try {
  4.         Thread.sleep(1000);
  5.         } catch(InterruptedException e) {
  6.         //Do nothing
  7.         }
  8.         return;
  9.     }
  10.     String str;
  11.     //Open the reader
  12.     BufferedReader reader =new BufferedReader(fileReader);
  13.     try{
  14.     //Read all lines
  15.         while((str=reader.readLine())!=null){
  16.     /**
  17.     * By each line emmit a new value with the line as a their
  18.     */
  19.             this.collector.emit(new Values(str),str);
  20.         }
  21.     }catch(Exception e){
  22.         throw new RuntimeException("Errorreading tuple",e);
  23.     }finally{
  24.         completed = true;
  25.     }
  26. }
复制代码

提示:Values类是ArrayList的一个实现,将列表中的元素传递到构造方法中。

nextTuple()方法被周期性地调用(和ack()、fail()方法相同的循环),当没有工作要做时,nextTuple()方法必须释放对线程的控制,以便其他的方法有机会被调用。因此必须在nextTuple()第一行检查处理是否完成,如果已经完成,在返回前至少应该休眠1秒来降低处理器的负载,如果还有工作要做,则将文件中的每一行读取为一个值并发送出去。

提示:元组(tuple)是一个值的命名列表,它可以是任何类型的Java对象(只要这个对象是可以序列化的)。默认情况下,Storm可以序列化的常用类型有strings、byte arrays、ArrayList、HashMap和HashSet。


Bolt(WordNormalizer&WordCounter类)
上面我们设计了一个spout来读取文件,并且每读取一行发送一个元组(tuple)。现在,我们需要创建两个bolt处理这些元组(见图2-1)。这些bolt实现了IRichBolt接口。
在bolt中,最重要的方法是execute()方法,每当bolt收到一个元组,该方法就会被调用一次,对于每个收到的元组,该bolt处理完之后又会发送几个bolt。
提示:一个spout或bolt可以发送多个tuple,当nextTuple()或execute()方法被调用时,它们可以发送0、1或者多个元组。在第五章中你将会了解到更多。
第一个bolt,WordNormalizer,负责接收每一行,并且将行标准化——它将行分解为一个个的单词后转化成小写,并且消除单词前后的空格。
首先,我们需要声明bolt的输出参数:
  1. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  2.     declarer.declare(new Fields("word"));
  3. }
复制代码


这儿,我们声明bolt发送一个命名为“word”的字段。
接着,我们实现execute方法,输入的tuple将会在这个方法中被处理:
  1. public void execute(Tuple input) {
  2.     String sentence = input.getString(0);
  3.     String[]words= sentence.split(" ");
  4.     for(String word:words){
  5.         word =word.trim();
  6.         if(!word.isEmpty()){
  7.             word =word.toLowerCase();
  8.             //Emit the word
  9.             List a =new ArrayList();
  10.             a.add(input);
  11.             collector.emit(a,new Values(word));
  12.         }
  13.     }
  14.     // Acknowledge the tuple
  15.     collector.ack(input);
  16. }
复制代码


第一行读取元组中的值,可以按照位置或者字段命名读取。值被处理后使用collector对象发送出去。当每个元组被处理完之后,就会调用collector的ack()方法,表明该tuple成功地被处理。如果tuple不能被处理,则应该调用collector的fail()方法。

例2-2包含这个类的完整代码。
例2-2.src/main/java/bolts/WordNormalizer.java
  1. package bolts;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import java.util.Map;
  5. import backtype.storm.task.OutputCollector;
  6. import backtype.storm.task.TopologyContext;
  7. import backtype.storm.topology.IRichBolt;
  8. import backtype.storm.topology.OutputFieldsDeclarer;
  9. import backtype.storm.tuple.Fields;
  10. import backtype.storm.tuple.Tuple;
  11. import backtype.storm.tuple.Values;
  12. public class WordNormalizer implements IRichBolt{
  13.     private OutputCollector collector;
  14.     public void cleanup(){}
  15.     /**
  16.     * The bolt will receive the line from the
  17.     * words file and process it to Normalize this line
  18.     *
  19.     * The normalize will be put the words in lower case
  20.     * and split the line to get all words in this
  21.     */
  22.     public void execute(Tuple input) {
  23.         String sentence = input.getString(0);
  24.         String[]words= sentence.split(" ");
  25.         for(String word:words){
  26.             word =word.trim();
  27.             if(!word.isEmpty()){
  28.                 word =word.toLowerCase();
  29.                 //Emit the word
  30.                 List a =new ArrayList();
  31.                 a.add(input);
  32.                 collector.emit(a,new Values(word));
  33.             }
  34.         }
  35.         // Acknowledge the tuple
  36.         collector.ack(input);
  37.     }
  38.     public void prepare(Map stormConf,TopologyContext context,OutputCollector collector) {
  39.         this.collector=collector;
  40.     }
  41.     /**
  42.     * The bolt will only emit the field "word"
  43.     */
  44.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
  45.         declarer.declare(new Fields("word"));
  46.     }
  47. }
复制代码

提示:在这个类中,每调用一次execute()方法,会发送多个元组。例如,当execute()方法收到“This is the Storm book”这个句子时,该方法会发送5个新元组。

第二个bolt,WordCounter,负责统计每个单词个数。当topology结束时(cleanup()方法被调用时),显示每个单词的个数。
提示:第二个bolt中什么也不发送,本例中,将数据添加到一个map对象中,但是现实生活中,bolt可以将数据存储到一个数据库中。
  1. package bolts;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import backtype.storm.task.OutputCollector;
  5. import backtype.storm.task.TopologyContext;
  6. import backtype.storm.topology.IRichBolt;
  7. import backtype.storm.topology.OutputFieldsDeclarer;
  8. import backtype.storm.tuple.Tuple;
  9. public class WordCounter implements IRichBolt{
  10.     Integer id;
  11.     String name;
  12.     Map<String,Integer>counters;
  13.     private OutputCollector collector;
  14.     /**
  15.     * At the end of the spout (when the cluster is shutdown
  16.     * We will show the word counters
  17.     */
  18.     @Override
  19.     public void cleanup(){
  20.         System.out.println("-- Word Counter ["+name+"-"+id+"]--");
  21.         for(Map.Entry<String,Integer>entry: counters.entrySet()){
  22.             System.out.println(entry.getKey()+": "+entry.getValue());
  23.         }
  24.     }
  25.     /**
  26.     * On each word We will count
  27.     */
  28.     @Override
  29.     public void execute(Tuple input) {
  30.         String str =input.getString(0);
  31.         /**
  32.         * If the word dosn't exist in the map we will create
  33.         * this, if not We will add 1
  34.         */
  35.         if(!counters.containsKey(str)){
  36.             counters.put(str,1);
  37.         }else{
  38.             Integer c =counters.get(str) +1;
  39.             counters.put(str,c);
  40.         }
  41.         //Set the tuple as Acknowledge
  42.         collector.ack(input);
  43.     }
  44.     /**
  45.     * On create
  46.     */
  47.     @Override
  48.     public void prepare(Map stormConf,TopologyContext context,OutputCollector collector) {
  49.         this.counters=newHashMap<String,Integer>();
  50.         this.collector=collector;
  51.         this.name=context.getThisComponentId();
  52.         this.id=context.getThisTaskId();
  53.     }
  54.     @Override
  55.     public void declareOutputFields(OutputFieldsDeclarer declarer) {}
  56. }
复制代码

execute()方法使用一个映射(Map类型)采集单词并统计这些单词个数。当topology结束的时候,cleanup()方法被调用并且打印出counter映射。(这仅仅是个例子,通常情况下,当topology关闭时,你应该使用cleanup()方法关闭活动链接和其他资源。)

主类
在主类中,你将创建topology和一个LocalCluster对象,LocalCluster对象使你可以在本地测试和调试topology。LocalCluster结合Config对象允许你尝试不同的集群配置。例如,如果不慎使用一个全局变量或者类变量,当配置不同数量的worker测试topology的时候,你将会发现这个错误。(关于config对象在第三章会有更多介绍)

提示:所有的topology结点应该可以在进程间没有数据共享的情形下独立运行(也就是说没有全局或者类变量),因为当topology运行在一个真实的集群上时,这些进程可能运行在不同的机器上。

你将使用TopologyBuilder创建topology,TopologyBuilder会告诉Storm怎么安排节点顺序、它们怎么交换数据。
  1. TopologyBuilder builder =new TopologyBuilder();
  2. builder.setSpout("word-reader",new WordReader());
  3. builder.setBolt("word-normalizer",new WordNormalizer()).shuffleGrouping("word-reader");
  4. builder.setBolt("word-counter",new WordCounter(),2).fieldsGrouping("word-normalizer",new Fields("word"));
复制代码


本例中spout和bolt之间使用随机分组(shuffleGrouping)连接,这种分组类型告诉Storm以随机分布的方式从源节点往目标节点发送消息。

接着,创建一个包含topology配置信息的Config对象,该配置信息在运行时会与集群配置信息合并,并且通过prepare()方法发送到所有节点。
  1. Config conf =new Config();
  2. conf.put("wordsFile",args[0]);
  3. conf.setDebug(false);
复制代码


将wordFile属性设置为将要被spout读取的文件名称(文件名在args参数中传入),并将debug属性设置为true,因为你在开发过程中,当debug为true时,Storm会打印节点间交换的所有消息和其他调试数据,这些信息有助于理解topology是如何运行的。

前面提到,你将使用LocalCluster来运行topology。在一个产品环境中,topology会持续运行,但是在本例中,你仅需运行topology几秒钟就能看到结果。
  1. LocalCluster cluster =new LocalCluster();
  2. cluster.submitTopology("Getting-Started-Toplogie",conf,builder.createTopology());
  3. Thread.sleep(1000);
  4. cluster.shutdown();
复制代码

使用createTopology和submitTopology创建、运行topology,睡眠两秒(topology运行在不同的线程中),然后通过关闭集群来停止topology。

例2-3将上面代码拼凑到一起。
例2-3.src/main/java/TopologyMain.java
  1. import spouts.WordReader;
  2. import bolts.WordCounter;
  3. import bolts.WordNormalizer;
  4. import backtype.storm.Config;
  5. import backtype.storm.LocalCluster;
  6. import backtype.storm.topology.TopologyBuilder;
  7. import backtype.storm.tuple.Fields;
  8. public class TopologyMain{
  9.     public static void main(String[]args)throws InterruptedException{
  10.     //Topology definition
  11.         TopologyBuilder builder =new TopologyBuilder();
  12.         builder.setSpout("word-reader",new WordReader());
  13.         builder.setBolt("word-normalizer",new WordNormalizer()).shuffleGrouping("word-reader");
  14.         builder.setBolt("word-counter",new WordCounter(),2).fieldsGrouping("word-normalizer",new Fields("word"));
  15.     //Configuration
  16.         Config conf =new Config();
  17.         conf.put("wordsFile",args[0]);
  18.         conf.setDebug(false);
  19.     //Topology run
  20.         conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING,1);
  21.         LocalCluster cluster =new LocalCluster();
  22.         cluster.submitTopology("Getting-Started-Toplogie",conf,builder.createTopology());
  23.         Thread.sleep(1000);
  24.         cluster.shutdown();
  25.     }
  26. }
复制代码


运行本项目
现在开始准备运行第一个topology!如果你新建一个文本文件(src/main/resources/words.txt)并且每行一个单词,则可以通过如下命令运行这个topology:
  1. mvn exec:java -Dexec.mainClass=”TopologyMain” -Dexec.args=”src/main/resources/words.txt”
复制代码

例如,如果你使用如下words.txt文件:
  1. Storm
  2. test
  3. are
  4. great
  5. is
  6. an
  7. Storm
  8. simple
  9. application
  10. but
  11. very
  12. powerful
  13. really
  14. Storm
  15. is
  16. great
复制代码

在日志中,你将会看到类似如下信息:
  1. is: 2
  2. application: 1
  3. but: 1
  4. great: 1
  5. test: 1
  6. simple: 1
  7. Storm: 3
  8. really: 1
  9. are: 1
  10. great: 1
  11. an: 1
  12. powerful: 1
  13. very: 1
复制代码

在本例中,你只使用了每个结点的一个单一实例,假如此时有一个非常大的日志文件怎么去统计每个单词的个数?此时可以很方便地改系统中节点数量来并行工作,如创建WordCounter的两个实例:
  1. builder.setBolt("word-counter",new WordCounter(),2).shuffleGrouping("word-normalizer");
复制代码

重新运行这个程序,你将看到:
  1. – Word Counter [word-counter-2] –
  2. application: 1
  3. is: 1
  4. great: 1
  5. are: 1
  6. powerful: 1
  7. Storm: 3
  8. – Word Counter [word-counter-3] –
  9. really: 1
  10. is: 1
  11. but: 1
  12. great: 1
  13. test: 1
  14. simple: 1
  15. an: 1
  16. very: 1
复制代码

太棒了!改变并行度,so easy(当然,在实际生活中,每个实例运行在不同的机器中)。但仔细一看似乎还有点问题:“is”和“great”这两个单词在每个WordCounter实例中都被计算了一次。Why?当使用随机分组(shuffleGrouping)时,Storm以随机分布的方式向每个bolt实例发送每条消息。在这个例子中,将相同的单词发送到同一个WordCounter实例是更理想的。为了实现这个,你可以将shuffleGrounping(“word-normalizer”)改成fieldsGrouping(“word-normalizer”,new Fields(“word”))。尝试一下并重新运行本程序来确认结果。后面的章节你将看到更多关于分组和消息流的内容。

总结
本章我们讨论了Storm的本地操作模式和远程操作模式的不同,以及用Storm开发的强大和简便。同时也学到了更多关于Storm的基本概念,我们将在接下来的章节深入解释这些概念。



上一篇:Storm入门指南第一章 基础知识





最后,感谢原作者的分享:东风化宇

已有(4)人评论

跳转到指定楼层
quenlang 发表于 2014-10-10 17:52:23
讲的很好,很适合入门,感谢楼主的分享
回复

使用道具 举报

datapro 发表于 2015-6-15 21:43:15
帮了大忙,大爱你!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条