`
weitao1026
  • 浏览: 993958 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

如何把Pig的结果存储到Solr中

pig 
阅读更多

如何把Pig的结果存储到Solr中,那么可能就会有朋友问了,为什么不存到数据库呢? 不支持还是? 其实只要我们愿意,我们可以存储它的结果集到任何地方,只需要重写我们自己的StoreFunc类即可。

关于如何将Pig分析完的结果存储到数据库,在pig的piggy贡献组织里,已经有了对应的UDF了,piggybank是非apache官方提供的工具函数,里面的大部分的UDF都是,其他公司或着个人在后来使用时贡献的,这些工具类,虽然没有正式划入pig的源码包里,但是pig每次发行的时候,都会以扩展库的形式附带,编译后会放在pig根目录下一个叫contrib的目录下,
piggybank的地址是
https://cwiki.apache.org/confluence/display/PIG/PiggyBank
,感兴趣的朋友们,可以看一看。

将pig分析完的结果存入到数据库,也是非常简单的,需要的条件有:


(1)piggybank.jar的jar包
(2)依赖数据库的对应的驱动jar


有一点需要注意下,在将结果存储到数据库之前,一定要确保有访问和写入数据库的权限,否则任务就会失败!
散仙在存储到远程的MySQL上,就是由于权限的问题,而写入失败了,具体的异常是这样描述的:

Java代码 复制代码 收藏代码
  1. Access denied for user 'root'@'localhost'   
Access denied for user 'root'@'localhost' 


当出现上面异常的时候,就意味着权限写入有问题,我们使用以下的授权方法,来给目标机赋予权限:
(1)允许所有的机器ip访问
GRANT ALL PRIVILEGES ON *.* TO 'myuser'@'%' IDENTIFIED BY 'mypassword' WITH GRANT OPTION; 
(2)允许指定的机器ip访问:
1. GRANT ALL PRIVILEGES ON *.* TO 'myuser'@'192.168.1.3' IDENTIFIED BY    'mypassword' WITH GRANT OPTION;  


确定有权限之后,我们就可以造一份数据,测试是否可以将HDFS上的数据存储到数据库中,测试数据如下:

Java代码 复制代码 收藏代码
  1. 1,2,3  
  2. 1,2,4  
  3. 2,2,4  
  4. 3,4,2  
  5. 8,2,4  
1,2,3
1,2,4
2,2,4
3,4,2
8,2,4


提前在对应的MySQL上,建库建表建字段,看下散仙测试表的结构:


最后,在来看下我们的pig脚本是如何定义和使用的:

Java代码 复制代码 收藏代码
  1. --注册数据库驱动包和piggybank的jar  
  2. register ./dependfiles/mysql-connector-java-5.1.23-bin.jar;  
  3. register ./dependfiles/piggybank.jar  
  4.   
  5. --为了能使schemal和数据库对应起来,建议在这个地方给数据加上列名  
  6. a = load '/tmp/dongliang/g.txt' using PigStorage(',') as (id:int,name:chararray,count:int) ;  
  7.   
  8.   
  9. --过滤出id大于2的数据  
  10.   
  11. a = filter a by id > 2;  
  12.   
  13. --存储结果到数据库里  
  14. STORE a INTO '/tmp/dbtest' using org.apache.pig.piggybank.storage.DBStorage('com.mysql.jdbc.Driver''jdbc:mysql://192.168.146.63/user''root''pwd',  
  15.     'INSERT into pig(id,name,count) values (?,?,?)');  
  16. ~                                                             
--注册数据库驱动包和piggybank的jar
register ./dependfiles/mysql-connector-java-5.1.23-bin.jar;
register ./dependfiles/piggybank.jar

--为了能使schemal和数据库对应起来,建议在这个地方给数据加上列名
a = load '/tmp/dongliang/g.txt' using PigStorage(',') as (id:int,name:chararray,count:int) ;


--过滤出id大于2的数据

a = filter a by id > 2;

--存储结果到数据库里
STORE a INTO '/tmp/dbtest' using org.apache.pig.piggybank.storage.DBStorage('com.mysql.jdbc.Driver', 'jdbc:mysql://192.168.146.63/user', 'root', 'pwd',
    'INSERT into pig(id,name,count) values (?,?,?)');
~                                                           



执行成功后,我们再去查看数据库发现已经将pig处理后的数据正确的写入到了数据库中:



最后,附上DBStore类的源码:

Java代码 复制代码 收藏代码
  1. /* 
  2.  * Licensed to the Apache Software Foundation (ASF) under one 
  3.  * or more contributor license agreements.  See the NOTICE file 
  4.  * distributed with this work for additional information 
  5.  * regarding copyright ownership.  The ASF licenses this file 
  6.  * to you under the Apache License, Version 2.0 (the 
  7.  * "License"); you may not use this file except in compliance 
  8.  * with the License.  You may obtain a copy of the License at 
  9.  * 
  10.  *     http://www.apache.org/licenses/LICENSE-2.0 
  11.  * 
  12.  * Unless required by applicable law or agreed to in writing, software 
  13.  * distributed under the License is distributed on an "AS IS" BASIS, 
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
  15.  * See the License for the specific language governing permissions and 
  16.  * limitations under the License. 
  17.  */  
  18. package org.apache.pig.piggybank.storage;  
  19.   
  20. import org.joda.time.DateTime;  
  21.   
  22. import org.apache.commons.logging.Log;  
  23. import org.apache.commons.logging.LogFactory;  
  24. import org.apache.hadoop.io.NullWritable;  
  25. import org.apache.hadoop.mapreduce.Job;  
  26. import org.apache.hadoop.mapreduce.JobContext;  
  27. import org.apache.hadoop.mapreduce.OutputCommitter;  
  28. import org.apache.hadoop.mapreduce.OutputFormat;  
  29. import org.apache.hadoop.mapreduce.RecordWriter;  
  30. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  31. import org.apache.pig.StoreFunc;  
  32. import org.apache.pig.backend.executionengine.ExecException;  
  33. import org.apache.pig.data.DataByteArray;  
  34. import org.apache.pig.data.DataType;  
  35. import org.apache.pig.data.Tuple;  
  36.   
  37. import java.io.IOException;  
  38. import java.sql.*;  
  39.   
  40. public class DBStorage extends StoreFunc {  
  41.   private final Log log = LogFactory.getLog(getClass());  
  42.   
  43.   private PreparedStatement ps;  
  44.   private Connection con;  
  45.   private String jdbcURL;  
  46.   private String user;  
  47.   private String pass;  
  48.   private int batchSize;  
  49.   private int count = 0;  
  50.   private String insertQuery;  
  51.   
  52.   public DBStorage(String driver, String jdbcURL, String insertQuery) {  
  53.     this(driver, jdbcURL, nullnull, insertQuery, "100");  
  54.   }  
  55.   
  56.   public DBStorage(String driver, String jdbcURL, String user, String pass,  
  57.       String insertQuery) throws SQLException {  
  58.     this(driver, jdbcURL, user, pass, insertQuery, "100");  
  59.   }  
  60.   
  61.   public DBStorage(String driver, String jdbcURL, String user, String pass,  
  62.       String insertQuery, String batchSize) throws RuntimeException {  
  63.     log.debug("DBStorage(" + driver + "," + jdbcURL + "," + user + ",XXXX,"  
  64.         + insertQuery + ")");  
  65.     try {  
  66.       Class.forName(driver);  
  67.     } catch (ClassNotFoundException e) {  
  68.       log.error("can't load DB driver:" + driver, e);  
  69.       throw new RuntimeException("Can't load DB Driver", e);  
  70.     }  
  71.     this.jdbcURL = jdbcURL;  
  72.     this.user = user;  
  73.     this.pass = pass;  
  74.     this.insertQuery = insertQuery;  
  75.     this.batchSize = Integer.parseInt(batchSize);  
  76.   }  
  77.   
  78.   /** 
  79.    * Write the tuple to Database directly here. 
  80.    */  
  81.   public void putNext(Tuple tuple) throws IOException {  
  82.     int sqlPos = 1;  
  83.     try {  
  84.       int size = tuple.size();  
  85.       for (int i = 0; i < size; i++) {  
  86.         try {  
  87.           Object field = tuple.get(i);  
  88.   
  89.           switch (DataType.findType(field)) {  
  90.           case DataType.NULL:  
  91.             ps.setNull(sqlPos, java.sql.Types.VARCHAR);  
  92.             sqlPos++;  
  93.             break;  
  94.   
  95.           case DataType.BOOLEAN:  
  96.             ps.setBoolean(sqlPos, (Boolean) field);  
  97.             sqlPos++;  
  98.             break;  
  99.   
  100.           case DataType.INTEGER:  
  101.             ps.setInt(sqlPos, (Integer) field);  
  102.             sqlPos++;  
  103.             break;  
  104.   
  105.           case DataType.LONG:  
  106.             ps.setLong(sqlPos, (Long) field);  
  107.             sqlPos++;  
  108.             break;  
  109.   
  110.           case DataType.FLOAT:  
  111.             ps.setFloat(sqlPos, (Float) field);  
  112.             sqlPos++;  
  113.             break;  
  114.   
  115.           case DataType.DOUBLE:  
  116.             ps.setDouble(sqlPos, (Double) field);  
  117.             sqlPos++;  
  118.             break;  
  119.   
  120.           case DataType.DATETIME:  
  121.             ps.setDate(sqlPos, new Date(((DateTime) field).getMillis()));  
  122.             sqlPos++;  
  123.             break;  
  124.   
  125.           case DataType.BYTEARRAY:  
  126.             byte[] b = ((DataByteArray) field).get();  
  127.             ps.setBytes(sqlPos, b);  
  128.   
  129.             sqlPos++;  
  130.             break;  
  131.           case DataType.CHARARRAY:  
  132.             ps.setString(sqlPos, (String) field);  
  133.             sqlPos++;  
  134.             break;  
  135.           case DataType.BYTE:  
  136.             ps.setByte(sqlPos, (Byte) field);  
  137.             sqlPos++;  
  138.             break;  
  139.   
  140.           case DataType.MAP:  
  141.           case DataType.TUPLE:  
  142.           case DataType.BAG:  
  143.             throw new RuntimeException("Cannot store a non-flat tuple "  
  144.                 + "using DbStorage");  
  145.   
  146.           default:  
  147.             throw new RuntimeException("Unknown datatype "  
  148.                 + DataType.findType(field));  
  149.   
  150.           }  
  151.   
  152.         } catch (ExecException ee) {  
  153.           throw new RuntimeException(ee);  
  154.         }  
  155.   
  156.       }  
  157.       ps.addBatch();  
  158.       count++;  
  159.       if (count > batchSize) {  
  160.         count = 0;  
  161.         ps.executeBatch();  
  162.         ps.clearBatch();  
  163.         ps.clearParameters();  
  164.       }  
  165.     } catch (SQLException e) {  
  166.       try {  
  167.         log  
  168.             .error("Unable to insert record:" + tuple.toDelimitedString("\t"),  
  169.                 e);  
  170.       } catch (ExecException ee) {  
  171.         // do nothing  
  172.       }  
  173.       if (e.getErrorCode() == 1366) {  
  174.         // errors that come due to utf-8 character encoding  
  175.         // ignore these kind of errors TODO: Temporary fix - need to find a  
  176.         // better way of handling them in the argument statement itself  
  177.       } else {  
  178.         throw new RuntimeException("JDBC error", e);  
  179.       }  
  180.     }  
  181.   }  
  182.   
  183.   class MyDBOutputFormat extends OutputFormat<NullWritable, NullWritable> {  
  184.   
  185.     @Override  
  186.     public void checkOutputSpecs(JobContext context) throws IOException,  
  187.         InterruptedException {  
  188.       // IGNORE  
  189.     }  
  190.   
  191.     @Override  
  192.     public OutputCommitter getOutputCommitter(TaskAttemptContext context)  
  193.         throws IOException, InterruptedException {  
  194.       return new OutputCommitter() {  
  195.   
  196.         @Override  
  197.         public void abortTask(TaskAttemptContext context) throws IOException {  
  198.           try {  
  199.             if (ps != null) {  
  200.               ps.close();  
  201.             }  
  202.             if (con != null) {  
  203.               con.rollback();  
  204.               con.close();  
  205.             }  
  206.           } catch (SQLException sqe) {  
  207.             throw new IOException(sqe);  
  208.           }  
  209.         }  
  210.   
  211.         @Override  
  212.         public void commitTask(TaskAttemptContext context) throws IOException {  
  213.           if (ps != null) {  
  214.             try {  
  215.               ps.executeBatch();  
  216.               con.commit();  
  217.               ps.close();  
  218.               con.close();  
  219.               ps = null;  
  220.               con = null;  
  221.             } catch (SQLException e) {  
  222.               log.error("ps.close", e);  
  223.               throw new IOException("JDBC Error", e);  
  224.             }  
  225.           }  
  226.         }  
  227.   
  228.         @Override  
  229.         public boolean needsTaskCommit(TaskAttemptContext context)  
  230.             throws IOException {  
  231.           return true;  
  232.         }  
  233.   
  234.         @Override  
  235.         public void cleanupJob(JobContext context) throws IOException {  
  236.           // IGNORE  
  237.         }  
  238.   
  239.         @Override  
  240.         public void setupJob(JobContext context) throws IOException {  
  241.           // IGNORE  
  242.         }  
  243.   
  244.         @Override  
  245.         public void setupTask(TaskAttemptContext context) throws IOException {  
  246.           // IGNORE  
  247.         }  
  248.       };  
  249.     }  
  250.   
  251.     @Override  
  252.     public RecordWriter<NullWritable, NullWritable> getRecordWriter(  
  253.         TaskAttemptContext context) throws IOException, InterruptedException {  
  254.       // We don't use a record writer to write to database  
  255.         return new RecordWriter<NullWritable, NullWritable>() {  
  256.                   @Override  
  257.                   public void close(TaskAttemptContext context) {  
  258.                       // Noop  
  259.                       }  
  260.                       @Override  
  261.                       public void write(NullWritable k, NullWritable v) {  
  262.                           // Noop  
  263.                       }  
  264.                   };  
  265.     }  
  266.   
  267.   }  
  268.   
  269.   @SuppressWarnings("unchecked")  
  270.   @Override  
  271.   public OutputFormat getOutputFormat()  
  272.       throws IOException {  
  273.     return new MyDBOutputFormat();  
  274.   }  
  275.   
  276.   /** 
  277.    * Initialise the database connection and prepared statement here. 
  278.    */  
  279.   @SuppressWarnings("unchecked")  
  280.   @Override  
  281.   public void prepareToWrite(RecordWriter writer)  
  282.       throws IOException {  
  283.     ps = null;  
  284.     con = null;  
  285.     if (insertQuery == null) {  
  286.       throw new IOException("SQL Insert command not specified");  
  287.     }  
  288.     try {  
  289.       if (user == null || pass == null) {  
  290.         con = DriverManager.getConnection(jdbcURL);  
  291.       } else {  
  292.         con = DriverManager.getConnection(jdbcURL, user, pass);  
  293.       }  
  294.       con.setAutoCommit(false);  
  295.       ps = con.prepareStatement(insertQuery);  
  296.     } catch (SQLException e) {  
  297.       log.error("Unable to connect to JDBC @" + jdbcURL);  
  298.       throw new IOException("JDBC Error", e);  
  299.     }  
  300.     count = 0;  
  301.   }  
  302.   
  303.   @Override  
  304.   public void setStoreLocation(String location, Job job) throws IOException {  
  305.     // IGNORE since we are writing records to DB.  
  306.   }  
  307. }  
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics