
Importing CSV files into Solr

Posted by desehawk on 2015-3-18 12:42:34 in Hands-on Practice

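Guiding questions

1. When using CSVRequestHandler, how do you avoid the exception caused by blank rows?
2. How does Solr import CSV files?
3. How are field values that themselves contain commas handled?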







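Today I wanted to import a CSV file with DIH (DataImportHandler), so I put together a rough implementation using FileDataSource as the data source plus a custom transformer:

```java
package com.besttone.transformer;

import java.util.Map;

public class CsvTransformer {
    // Reference: http://wiki.apache.org/solr/DIHCustomTransformer
    public Object transformRow(Map<String, Object> row) {
        // "rawLine" holds one whole line of the file (the field LineEntityProcessor emits)
        Object rawLine = row.get("rawLine");
        if (rawLine != null) {
            // Naive split: breaks as soon as a value itself contains a comma
            String[] props = rawLine.toString().split(",");
            if (props.length >= 2) { // guard against short or blank lines
                row.put("id", props[0]);
                row.put("name", props[1]);
            }
        }
        return row;
    }
}
```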
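To see why `split(",")` falls apart, here is a minimal quote-aware line splitter. It is a sketch assuming the common RFC 4180 conventions (a field may be wrapped in double quotes, and `""` inside a quoted field stands for one literal quote); the class name `QuotedCsvSplitter` is hypothetical, and quoted fields spanning several lines are not handled. It only illustrates the kind of parsing a real CSV parser has to do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class QuotedCsvSplitter {

    /** Splits one CSV line, honouring double-quoted fields and "" escapes. */
    public static List<String> splitLine(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (inQuotes) {
                if (c == '"') {
                    if (i + 1 < line.length() && line.charAt(i + 1) == '"') {
                        current.append('"'); // "" inside quotes -> one literal quote
                        i++;
                    } else {
                        inQuotes = false;    // closing quote
                    }
                } else {
                    current.append(c);       // commas inside quotes are data
                }
            } else if (c == '"') {
                inQuotes = true;             // opening quote
            } else if (c == ',') {
                fields.add(current.toString()); // unquoted comma ends a field
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());      // the last field has no trailing comma
        return fields;
    }

    public static void main(String[] args) {
        String line = "1,\"Smith, John\",\"say \"\"hi\"\"\"";
        // The naive split cuts "Smith, John" in two and keeps the quotes
        System.out.println(Arrays.toString(line.split(",")));
        // The quote-aware split yields three clean values
        System.out.println(splitLine(line));
    }
}
```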
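This turned up plenty of problems. For example, a field value may itself contain commas, which a crude transformer like this simply cannot handle. Searching the documentation further, I found that Solr ships with a CSVRequestHandler. It is not configured in solrconfig.xml by default, so register it first:

```xml
<!-- CSV update handler, loaded on demand -->
<requestHandler name="/update/csv" class="solr.CSVRequestHandler" startup="lazy">
</requestHandler>
```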
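Then open this URL in a browser:

http://localhost:8088/solr-src/csv-core/update/csv?stream.file=D:/dpimport/test_data2.csv&stream.contentType=text/plain;charset=utf-8&fieldnames=id,name&commit=true

and the CSV file is imported. Note that stream.file only works when remote streaming is enabled (enableRemoteStreaming="true" on the requestParsers element in solrconfig.xml). My CSV file has two fields, id and name, and I made some test data like this: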
1,aaa
2,bbb
...
Importing consecutive rows works fine, of course. But when there is a blank row in between, a CSV file saved from Office turns into:
1,aaa
,
2,bbb
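In other words, the blank row becomes a single comma. The id field happens to be the schema's unique key, which must not be empty, so building the index fails with an exception. To deal with this I extended the CSVRequestHandler source: I added a parameter, emptyLine, and added the following logic to the load method:

```java
// Whether to skip data rows whose values are all empty
if (emptyLine) {
    int totalLength = 0;
    for (int i = 0; i < vals.length; i++) {
        totalLength += vals[i].length();
    }
    if (totalLength == 0) {
        continue;
    }
}
```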
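The check above treats a row as empty when every value has zero length. Pulled out into a standalone helper (the class `EmptyRowCheck` and method `isEmptyRow` are hypothetical, not part of Solr), it can be tried without a running Solr instance. The sample rows are split with `String.split(",", -1)`, which keeps trailing empty strings, so the Office-style row `,` becomes two empty values, roughly the shape the CSV parser hands to the load loop.

```java
public class EmptyRowCheck {

    /** Mirrors the emptyLine logic: true when every value is zero-length. */
    public static boolean isEmptyRow(String[] vals) {
        int totalLength = 0;
        for (String v : vals) {
            totalLength += v.length();
        }
        return totalLength == 0;
    }

    public static void main(String[] args) {
        String[] lines = {"1,aaa", ",", "2,bbb"};
        for (String line : lines) {
            String[] vals = line.split(",", -1); // -1 keeps trailing empty strings
            if (isEmptyRow(vals)) {
                System.out.println("skipped empty row: \"" + line + "\"");
                continue; // same effect as the continue in load()
            }
            System.out.println("indexing id=" + vals[0] + ", name=" + vals[1]);
        }
    }
}
```

A row of spaces such as ` , ` is not considered empty by this logic; trimming each value before summing the lengths would also catch that case, if desired.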
The modified CSVRequestHandler is shown below (the emptyLine additions are marked with "extension" comments):

```java
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.handler;

import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.update.*;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.internal.csv.CSVStrategy;
import org.apache.solr.internal.csv.CSVParser;
import org.apache.commons.io.IOUtils;

import java.util.regex.Pattern;
import java.util.List;
import java.io.*;

/**
 * @version $Id: CSVRequestHandler.java 1298169 2012-03-07 22:27:54Z uschindler $
 */
public class CSVRequestHandler extends ContentStreamHandlerBase {

  @Override
  protected ContentStreamLoader newLoader(SolrQueryRequest req, UpdateRequestProcessor processor) {
    return new SingleThreadedCSVLoader(req, processor);
  }

  //////////////////////// SolrInfoMBeans methods //////////////////////
  @Override
  public String getDescription() {
    return "Add/Update multiple documents with CSV formatted rows";
  }

  @Override
  public String getVersion() {
    return "$Revision: 1298169 $";
  }

  @Override
  public String getSourceId() {
    return "$Id: CSVRequestHandler.java 1298169 2012-03-07 22:27:54Z uschindler $";
  }

  @Override
  public String getSource() {
    return "$URL: https://svn.apache.org/repos/asf/lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/CSVRequestHandler.java $";
  }
}

abstract class CSVLoader extends ContentStreamLoader {
  public static final String SEPARATOR="separator";
  public static final String FIELDNAMES="fieldnames";
  public static final String HEADER="header";
  public static final String SKIP="skip";
  public static final String SKIPLINES="skipLines";
  public static final String MAP="map";
  public static final String TRIM="trim";
  public static final String EMPTY="keepEmpty";
  public static final String SPLIT="split";
  public static final String ENCAPSULATOR="encapsulator";
  public static final String ESCAPE="escape";
  public static final String OVERWRITE="overwrite";
  public static final String EMPTYLINE="emptyLine"; // extension: skip rows whose values are all empty

  private static Pattern colonSplit = Pattern.compile(":");
  private static Pattern commaSplit = Pattern.compile(",");

  final IndexSchema schema;
  final SolrParams params;
  final CSVStrategy strategy;
  final UpdateRequestProcessor processor;

  String[] fieldnames;
  SchemaField[] fields;
  CSVLoader.FieldAdder[] adders;

  int skipLines;     // number of lines to skip at start of file
  boolean emptyLine; // extension: skip rows whose values are all empty

  final AddUpdateCommand templateAdd;

  /** Add a field to a document unless it's zero length.
   * The FieldAdder hierarchy handles all the complexity of
   * further transforming or splitting field values to keep the
   * main logic loop clean.  All implementations of add() must be
   * MT-safe!
   */
  private class FieldAdder {
    void add(SolrInputDocument doc, int line, int column, String val) {
      if (val.length() > 0) {
        doc.addField(fields[column].getName(),val,1.0f);
      }
    }
  }

  /** add zero length fields */
  private class FieldAdderEmpty extends CSVLoader.FieldAdder {
    @Override
    void add(SolrInputDocument doc, int line, int column, String val) {
      doc.addField(fields[column].getName(),val,1.0f);
    }
  }

  /** trim fields */
  private class FieldTrimmer extends CSVLoader.FieldAdder {
    private final CSVLoader.FieldAdder base;
    FieldTrimmer(CSVLoader.FieldAdder base) { this.base=base; }
    @Override
    void add(SolrInputDocument doc, int line, int column, String val) {
      base.add(doc, line, column, val.trim());
    }
  }

  /** map a single value.
   * for just a couple of mappings, this is probably faster than
   * using a HashMap.
   */
  private class FieldMapperSingle extends CSVLoader.FieldAdder {
    private final String from;
    private final String to;
    private final CSVLoader.FieldAdder base;
    FieldMapperSingle(String from, String to, CSVLoader.FieldAdder base) {
      this.from=from;
      this.to=to;
      this.base=base;
    }
    @Override
    void add(SolrInputDocument doc, int line, int column, String val) {
      if (from.equals(val)) val=to;
      base.add(doc,line,column,val);
    }
  }

  /** Split a single value into multiple values based on
   * a CSVStrategy.
   */
  private class FieldSplitter extends CSVLoader.FieldAdder {
    private final CSVStrategy strategy;
    private final CSVLoader.FieldAdder base;
    FieldSplitter(CSVStrategy strategy, CSVLoader.FieldAdder base) {
      this.strategy = strategy;
      this.base = base;
    }
    @Override
    void add(SolrInputDocument doc, int line, int column, String val) {
      CSVParser parser = new CSVParser(new StringReader(val), strategy);
      try {
        String[] vals = parser.getLine();
        if (vals!=null) {
          for (String v: vals) base.add(doc,line,column,v);
        } else {
          base.add(doc,line,column,val);
        }
      } catch (IOException e) {
        throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,e);
      }
    }
  }

  String errHeader="CSVLoader:";

  CSVLoader(SolrQueryRequest req, UpdateRequestProcessor processor) {
    this.processor = processor;
    this.params = req.getParams();
    schema = req.getSchema();

    templateAdd = new AddUpdateCommand();
    templateAdd.allowDups=false;
    templateAdd.overwriteCommitted=true;
    templateAdd.overwritePending=true;

    if (params.getBool(OVERWRITE,true)) {
      templateAdd.allowDups=false;
      templateAdd.overwriteCommitted=true;
      templateAdd.overwritePending=true;
    } else {
      templateAdd.allowDups=true;
      templateAdd.overwriteCommitted=false;
      templateAdd.overwritePending=false;
    }
    templateAdd.commitWithin = params.getInt(UpdateParams.COMMIT_WITHIN, -1);

    // The last argument (ignoreEmptyLines=true) makes the parser skip completely
    // blank lines; a line such as "," is not blank, hence the emptyLine extension.
    strategy = new CSVStrategy(',', '"', CSVStrategy.COMMENTS_DISABLED, CSVStrategy.ESCAPE_DISABLED, false, false, false, true);
    String sep = params.get(SEPARATOR);
    if (sep!=null) {
      if (sep.length()!=1) throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,"Invalid separator:'"+sep+"'");
      strategy.setDelimiter(sep.charAt(0));
    }

    String encapsulator = params.get(ENCAPSULATOR);
    if (encapsulator!=null) {
      if (encapsulator.length()!=1) throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,"Invalid encapsulator:'"+encapsulator+"'");
    }

    String escape = params.get(ESCAPE);
    if (escape!=null) {
      if (escape.length()!=1) throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,"Invalid escape:'"+escape+"'");
    }

    // if only encapsulator or escape is set, disable the other escaping mechanism
    if (encapsulator == null && escape != null) {
      strategy.setEncapsulator( CSVStrategy.ENCAPSULATOR_DISABLED);
      strategy.setEscape(escape.charAt(0));
    } else {
      if (encapsulator != null) {
        strategy.setEncapsulator(encapsulator.charAt(0));
      }
      if (escape != null) {
        char ch = escape.charAt(0);
        strategy.setEscape(ch);
        if (ch == '\\') {
          // If the escape is the standard backslash, then also enable
          // unicode escapes (it's harmless since 'u' would not otherwise
          // be escaped.
          strategy.setUnicodeEscapeInterpretation(true);
        }
      }
    }

    String fn = params.get(FIELDNAMES);
    fieldnames = fn != null ? commaSplit.split(fn,-1) : null;

    Boolean hasHeader = params.getBool(HEADER);

    skipLines = params.getInt(SKIPLINES,0);
    emptyLine = params.getBool(EMPTYLINE, false); // extension: defaults to off

    if (fieldnames==null) {
      if (null == hasHeader) {
        // assume the file has the headers if they aren't supplied in the args
        hasHeader=true;
      } else if (!hasHeader) {
        throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,"CSVLoader: must specify fieldnames=<fields>* or header=true");
      }
    } else {
      // if the fieldnames were supplied and the file has a header, we need to
      // skip over that header.
      if (hasHeader!=null && hasHeader) skipLines++;

      prepareFields();
    }
  }

  /** create the FieldAdders that control how each field  is indexed */
  void prepareFields() {
    // Possible future optimization: for really rapid incremental indexing
    // from a POST, one could cache all of this setup info based on the params.
    // The link from FieldAdder to this would need to be severed for that to happen.

    fields = new SchemaField[fieldnames.length];
    adders = new CSVLoader.FieldAdder[fieldnames.length];
    String skipStr = params.get(SKIP);
    List<String> skipFields = skipStr==null ? null : StrUtils.splitSmart(skipStr,',');

    CSVLoader.FieldAdder adder = new CSVLoader.FieldAdder();
    CSVLoader.FieldAdder adderKeepEmpty = new CSVLoader.FieldAdderEmpty();

    for (int i=0; i<fields.length; i++) {
      String fname = fieldnames[i];
      // to skip a field, leave the entries in fields and addrs null
      if (fname.length()==0 || (skipFields!=null && skipFields.contains(fname))) continue;

      fields[i] = schema.getField(fname);
      boolean keepEmpty = params.getFieldBool(fname,EMPTY,false);
      adders[i] = keepEmpty ? adderKeepEmpty : adder;

      // Order that operations are applied: split -> trim -> map -> add
      // so create in reverse order.
      // Creation of FieldAdders could be optimized and shared among fields

      String[] fmap = params.getFieldParams(fname,MAP);
      if (fmap!=null) {
        for (String mapRule : fmap) {
          String[] mapArgs = colonSplit.split(mapRule,-1);
          if (mapArgs.length!=2)
            throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "Map rules must be of the form 'from:to' ,got '"+mapRule+"'");
          adders[i] = new CSVLoader.FieldMapperSingle(mapArgs[0], mapArgs[1], adders[i]);
        }
      }

      if (params.getFieldBool(fname,TRIM,false)) {
        adders[i] = new CSVLoader.FieldTrimmer(adders[i]);
      }

      if (params.getFieldBool(fname,SPLIT,false)) {
        String sepStr = params.getFieldParam(fname,SEPARATOR);
        char fsep = sepStr==null || sepStr.length()==0 ? ',' : sepStr.charAt(0);
        String encStr = params.getFieldParam(fname,ENCAPSULATOR);
        char fenc = encStr==null || encStr.length()==0 ? (char)-2 : encStr.charAt(0);
        String escStr = params.getFieldParam(fname,ESCAPE);
        char fesc = escStr==null || escStr.length()==0 ? CSVStrategy.ESCAPE_DISABLED : escStr.charAt(0);

        CSVStrategy fstrat = new CSVStrategy(fsep,fenc,CSVStrategy.COMMENTS_DISABLED,fesc, false, false, false, false);
        adders[i] = new CSVLoader.FieldSplitter(fstrat, adders[i]);
      }
    }
  }

  private void input_err(String msg, String[] line, int lineno) {
    StringBuilder sb = new StringBuilder();
    sb.append(errHeader).append(", line=").append(lineno).append(",").append(msg).append("\n\tvalues={");
    for (String val: line) {
      sb.append("'").append(val).append("',");
    }
    sb.append('}');
    throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,sb.toString());
  }

  private void input_err(String msg, String[] lines, int lineNo, Throwable e) {
    StringBuilder sb = new StringBuilder();
    sb.append(errHeader).append(", line=").append(lineNo).append(",").append(msg).append("\n\tvalues={");
    if (lines != null) {
      for (String val : lines) {
        sb.append("'").append(val).append("',");
      }
    } else {
      sb.append("NO LINES AVAILABLE");
    }
    sb.append('}');
    throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,sb.toString(), e);
  }

  /** load the CSV input */
  @Override
  public void load(SolrQueryRequest req, SolrQueryResponse rsp, ContentStream stream) throws IOException {
    errHeader = "CSVLoader: input=" + stream.getSourceInfo();
    Reader reader = null;
    try {
      reader = stream.getReader();
      if (skipLines>0) {
        if (!(reader instanceof BufferedReader)) {
          reader = new BufferedReader(reader);
        }
        BufferedReader r = (BufferedReader)reader;
        for (int i=0; i<skipLines; i++) {
          r.readLine();
        }
      }

      CSVParser parser = new CSVParser(reader, strategy);

      // parse the fieldnames from the header of the file
      if (fieldnames==null) {
        fieldnames = parser.getLine();
        if (fieldnames==null) {
          throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,"Expected fieldnames in CSV input");
        }
        prepareFields();
      }

      // read the rest of the CSV file
      for(;;) {
        int line = parser.getLineNumber();  // for error reporting in MT mode
        String[] vals = null;
        try {
          vals = parser.getLine();
        } catch (IOException e) {
          //Catch the exception and rethrow it with more line information
          input_err("can't read line: " + line, null, line, e);
        }
        if (vals==null) break;

        // extension: skip rows whose values are all empty (e.g. a line containing only ",")
        if (emptyLine) {
          int totalLength = 0;
          for (int i = 0; i < vals.length; i++) {
            totalLength += vals[i].length();
          }
          if (totalLength == 0) {
            continue;
          }
        }

        if (vals.length != fields.length) {
          input_err("expected "+fields.length+" values but got "+vals.length, vals, line);
        }

        addDoc(line,vals);
      }
    } finally{
      if (reader != null) {
        IOUtils.closeQuietly(reader);
      }
    }
  }

  /** called for each line of values (document) */
  abstract void addDoc(int line, String[] vals) throws IOException;

  /** this must be MT safe... may be called concurrently from multiple threads. */
  void doAdd(int line, String[] vals, SolrInputDocument doc, AddUpdateCommand template) throws IOException {
    // the line number is passed simply for error reporting in MT mode.
    // first, create the lucene document
    for (int i=0; i<vals.length; i++) {
      if (fields[i]==null) continue;  // ignore this field
      String val = vals[i];
      adders[i].add(doc, line, i, val);
    }

    template.solrDoc = doc;
    processor.processAdd(template);
  }
}

class SingleThreadedCSVLoader extends CSVLoader {
  SingleThreadedCSVLoader(SolrQueryRequest req, UpdateRequestProcessor processor) {
    super(req, processor);
  }

  @Override
  void addDoc(int line, String[] vals) throws IOException {
    templateAdd.indexedId = null;
    SolrInputDocument doc = new SolrInputDocument();
    doAdd(line, vals, doc, templateAdd);
  }
}
```
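The FieldAdder classes in the loader are decorators: prepareFields wraps the basic adder in map, trim and split layers in reverse order, so at run time a value flows split -> trim -> map -> add. The sketch below reproduces that wrapping order outside Solr, under simplifying assumptions: the names `ValueStage`, `Collect`, `Trim`, `MapValue` and `build` are hypothetical, a plain `List` stands in for the SolrInputDocument, and splitting and per-field parameters are left out.

```java
import java.util.ArrayList;
import java.util.List;

public class AdderChainSketch {

    /** One stage in the chain; plays the role of CSVLoader.FieldAdder. */
    public interface ValueStage {
        void add(List<String> doc, String val);
    }

    /** Innermost stage: like FieldAdder, drops zero-length values. */
    static class Collect implements ValueStage {
        public void add(List<String> doc, String val) {
            if (val.length() > 0) doc.add(val);
        }
    }

    /** Like FieldTrimmer: trims the value, then delegates. */
    static class Trim implements ValueStage {
        private final ValueStage base;
        Trim(ValueStage base) { this.base = base; }
        public void add(List<String> doc, String val) { base.add(doc, val.trim()); }
    }

    /** Like FieldMapperSingle: replaces one exact value, then delegates. */
    static class MapValue implements ValueStage {
        private final String from;
        private final String to;
        private final ValueStage base;
        MapValue(String from, String to, ValueStage base) {
            this.from = from;
            this.to = to;
            this.base = base;
        }
        public void add(List<String> doc, String val) {
            base.add(doc, from.equals(val) ? to : val);
        }
    }

    /** Built innermost first (add, map, trim) so values flow trim -> map -> add. */
    public static ValueStage build() {
        ValueStage stage = new Collect();
        stage = new MapValue("N/A", "", stage); // map "N/A" to an empty value
        stage = new Trim(stage);                // outermost: runs first
        return stage;
    }

    public static void main(String[] args) {
        List<String> doc = new ArrayList<>();
        ValueStage chain = build();
        chain.add(doc, "  aaa ");
        chain.add(doc, " N/A ");
        System.out.println(doc); // prints [aaa]
    }
}
```

Because trimming wraps the mapper, " N/A " is trimmed to "N/A" before the map rule sees it, is mapped to an empty value, and is then dropped by the innermost stage.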


Now just append &emptyLine=true to the request URL above, and blank rows no longer cause the exception.
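The same request can also be issued from Java instead of a browser. The sketch below only builds the URL: it reuses the host, core path, file and parameters from the example above and URL-encodes each value with java.net.URLEncoder. The class name `CsvImportUrl` and its `build` method are hypothetical, and actually sending the request (for example with HttpURLConnection) is left out.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

public class CsvImportUrl {

    /** Builds the /update/csv request URL; emptyLine is the custom parameter added above. */
    public static String build(String base, String file, boolean emptyLine) {
        Map<String, String> params = new LinkedHashMap<>(); // keeps parameter order
        params.put("stream.file", file);
        params.put("stream.contentType", "text/plain;charset=utf-8");
        params.put("fieldnames", "id,name");
        params.put("commit", "true");
        if (emptyLine) {
            params.put("emptyLine", "true"); // only understood by the patched handler
        }
        StringBuilder url = new StringBuilder(base).append("/update/csv");
        char sep = '?';
        for (Map.Entry<String, String> e : params.entrySet()) {
            url.append(sep).append(e.getKey()).append('=').append(encode(e.getValue()));
            sep = '&';
        }
        return url.toString();
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    public static void main(String[] args) {
        System.out.println(build("http://localhost:8088/solr-src/csv-core",
                "D:/dpimport/test_data2.csv", true));
    }
}
```

Encoding the values means characters such as `:`, `/`, `;` and `,` are sent percent-encoded; the server decodes them back to the same parameter values typed into the browser.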

These changes were made against Solr 3.6; they may not work with other versions.


