Pig ships with a large number of built-in utility functions


Pig ships with a large number of built-in utility functions and also exposes many interfaces for developers. Through UDFs we can easily implement features that Pig does not support out of the box, for example (as in my previous few posts) storing the results of a Pig job into all kinds of media instead of only HDFS, or of course into several of them at the same time.

So how do we implement our own storage UDF? To answer that we have to look at Pig's load and store functions. A load function reads data from some data source, usually HDFS, while a store function writes the finished analysis results back out, usually to HDFS. For most storage needs it is therefore enough to extend the store-side base class StoreFunc and override its methods. Once we understand this, we can store the results anywhere: in a database, in an index, in a local txt or excel file, and so on.

Let's first take a look at the source code of StoreFunc:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;

import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.PigStatusReporter;


/**
* StoreFuncs take records from Pig's processing and store them into a data store.  Most frequently
* this is an HDFS file, but it could also be an HBase instance, RDBMS, etc.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class StoreFunc implements StoreFuncInterface {

    /**
     * This method is called by the Pig runtime in the front end to convert the
     * output location to an absolute path if the location is relative. The
     * StoreFunc implementation is free to choose how it converts a relative 
     * location to an absolute location since this may depend on what the location
     * string represent (hdfs path or some other data source). 
     *  
     * 
     * @param location location as provided in the "store" statement of the script
     * @param curDir the current working direction based on any "cd" statements
     * in the script before the "store" statement. If there are no "cd" statements
     * in the script, this would be the home directory - 
     * <pre>/user/<username> </pre>
     * @return the absolute location based on the arguments passed
     * @throws IOException if the conversion is not possible
     */
    @Override
    public String relToAbsPathForStoreLocation(String location, Path curDir) 
    throws IOException {
        return LoadFunc.getAbsolutePath(location, curDir);
    }

    /**
     * Return the OutputFormat associated with StoreFunc.  This will be called
     * on the front end during planning and on the backend during
     * execution. 
     * @return the {@link OutputFormat} associated with StoreFunc
     * @throws IOException if an exception occurs while constructing the 
     * OutputFormat
     *
     */
    public abstract OutputFormat getOutputFormat() throws IOException;

    /**
     * Communicate to the storer the location where the data needs to be stored.  
     * The location string passed to the {@link StoreFunc} here is the 
     * return value of {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)} 
     * This method will be called in the frontend and backend multiple times. Implementations
     * should bear in mind that this method is called multiple times and should
     * ensure there are no inconsistent side effects due to the multiple calls.
     * {@link #checkSchema(ResourceSchema)} will be called before any call to
     * {@link #setStoreLocation(String, Job)}.
     * 

     * @param location Location returned by 
     * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
     * @param job The {@link Job} object
     * @throws IOException if the location is not valid.
     */
    public abstract void setStoreLocation(String location, Job job) throws IOException;
 
    /**
     * Set the schema for data to be stored.  This will be called on the
     * front end during planning if the store is associated with a schema.
     * A Store function should implement this function to
     * check that a given schema is acceptable to it.  For example, it
     * can check that the correct partition keys are included;
     * a storage function to be written directly to an OutputFormat can
     * make sure the schema will translate in a well defined way.  Default implementation
     * is a no-op.
     * @param s to be checked
     * @throws IOException if this schema is not acceptable.  It should include
     * a detailed error message indicating what is wrong with the schema.
     */
    @Override
    public void checkSchema(ResourceSchema s) throws IOException {
        // default implementation is a no-op
    }

    /**
     * Initialize StoreFunc to write data.  This will be called during
     * execution on the backend before the call to putNext.
     * @param writer RecordWriter to use.
     * @throws IOException if an exception occurs during initialization
     */
    public abstract void prepareToWrite(RecordWriter writer) throws IOException;

    /**
     * Write a tuple to the data store.
     * 
     * @param t the tuple to store.
     * @throws IOException if an exception occurs during the write
     */
    public abstract void putNext(Tuple t) throws IOException;
    
    /**
     * This method will be called by Pig both in the front end and back end to
     * pass a unique signature to the {@link StoreFunc} which it can use to store
     * information in the {@link UDFContext} which it needs to store between
     * various method invocations in the front end and back end. This method 
     * will be called before other methods in {@link StoreFunc}.  This is necessary
     * because in a Pig Latin script with multiple stores, the different
     * instances of store functions need to be able to find their (and only their)
     * data in the UDFContext object.  The default implementation is a no-op.
     * @param signature a unique signature to identify this StoreFunc
     */
    @Override
    public void setStoreFuncUDFContextSignature(String signature) {
        // default implementation is a no-op
    }
    
    /**
     * This method will be called by Pig if the job which contains this store
     * fails. Implementations can clean up output locations in this method to
     * ensure that no incorrect/incomplete results are left in the output location.
     * The default implementation  deletes the output location if it
     * is a {@link FileSystem} location.
     * @param location Location returned by 
     * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
     * @param job The {@link Job} object - this should be used only to obtain 
     * cluster properties through {@link Job#getConfiguration()} and not to set/query
     * any runtime job information. 
     */
    @Override
    public void cleanupOnFailure(String location, Job job) 
    throws IOException {
        cleanupOnFailureImpl(location, job);
    }

    /**
     * This method will be called by Pig if the job which contains this store
     * is successful, and some cleanup of intermediate resources is required.
     * Implementations can clean up output locations in this method to
     * ensure that no incorrect/incomplete results are left in the output location.
     * @param location Location returned by 
     * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
     * @param job The {@link Job} object - this should be used only to obtain 
     * cluster properties through {@link Job#getConfiguration()} and not to set/query
     * any runtime job information. 
     */
    @Override
    public void cleanupOnSuccess(String location, Job job) 
    throws IOException {
        // DEFAULT: DO NOTHING, user-defined overrides can
        // call cleanupOnFailureImpl(location, job) or ...?
    }
    
    /**
     * Default implementation for {@link #cleanupOnFailure(String, Job)}
     * and {@link #cleanupOnSuccess(String, Job)}.  This removes a file
     * from HDFS.
     * @param location file name (or URI) of file to remove
     * @param job Hadoop job, used to access the appropriate file system.
     * @throws IOException
     */
    public static void cleanupOnFailureImpl(String location, Job job) 
    throws IOException {        
        Path path = new Path(location);
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        if(fs.exists(path)){
            fs.delete(path, true);
        }    
    }
    
    /**
     * Issue a warning.  Warning messages are aggregated and reported to
     * the user.
     * @param msg String message of the warning
     * @param warningEnum type of warning
     */
    public final void warn(String msg, Enum warningEnum) {
        Counter counter = PigStatusReporter.getInstance().getCounter(warningEnum);
        counter.increment(1);
    }
}




The class has quite a few methods, but we do not need to redefine all of them. In general it is enough to override the following abstract methods (a minimal sketch follows the list):

(1) getOutputFormat: corresponds to a Hadoop OutputFormat; the final output is produced in whatever form the returned format defines.
(2) setStoreLocation: defines the path where the output is written; if you are not storing to HDFS this can largely be ignored.
(3) prepareToWrite: performs initialization work before any data is written.
(4) putNext: receives, tuple by tuple, the data that Pig ultimately needs to store.
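
As a concrete illustration, here is a minimal sketch of such a subclass. It is not part of the original post: the class name, the package, and the choice to emit tuples as tab-separated text through Hadoop's TextOutputFormat are assumptions made purely for this example.

package com.example.pig; // hypothetical package

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pig.StoreFunc;
import org.apache.pig.data.Tuple;

/** Hypothetical example: stores each tuple as one tab-separated text line. */
public class SimpleTextStore extends StoreFunc {

    private RecordWriter<NullWritable, Text> writer;

    @Override
    public OutputFormat getOutputFormat() throws IOException {
        // (1) reuse Hadoop's TextOutputFormat; a custom format could be returned here instead
        return new TextOutputFormat<NullWritable, Text>();
    }

    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        // (2) location is the path from the STORE statement; hand it to the OutputFormat
        FileOutputFormat.setOutputPath(job, new Path(location));
    }

    @SuppressWarnings("unchecked")
    @Override
    public void prepareToWrite(RecordWriter writer) throws IOException {
        // (3) keep the RecordWriter created from getOutputFormat() for use in putNext()
        this.writer = writer;
    }

    @Override
    public void putNext(Tuple t) throws IOException {
        // (4) join the tuple's fields with tabs and emit one line per tuple
        try {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < t.size(); i++) {
                if (i > 0) sb.append('\t');
                Object field = t.get(i);
                sb.append(field == null ? "" : field.toString());
            }
            writer.write(NullWritable.get(), new Text(sb.toString()));
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }
}

// In a Pig Latin script this would be used roughly as:
//   STORE result INTO '/tmp/out' USING com.example.pig.SimpleTextStore();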




From step (1) we know that we need to provide an OutputFormat class. That means extending one of Hadoop's OutputFormat base classes and overriding its getRecordWriter method. We will usually also extend RecordWriter to define our own output form, which might be a line of text, a serialized object, an index segment, and so on. The following OutputFormat, for example, writes a Lucene index:

package com.pig.support.lucene;



import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.index.SerialMergeScheduler;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

/**
 * Extends FileOutputFormat to provide an OutputFormat that writes Lucene indexes.
 */
public class LuceneOutputFormat extends FileOutputFormat<Writable, Document> {

	String location;
	FileSystem fs;
	String taskid;

	FileOutputCommitter committer;
	AtomicInteger counter = new AtomicInteger();

	public LuceneOutputFormat(String location) {
		this.location = location;
	}
	
	@Override
	public RecordWriter<Writable, Document> getRecordWriter(
			TaskAttemptContext ctx) throws IOException, InterruptedException {

		Configuration conf = ctx.getConfiguration();
		fs = FileSystem.get(conf);

		File baseDir = new File(System.getProperty("java.io.tmpdir"));
		String baseName = System.currentTimeMillis() + "-";
		File tempDir = new File(baseDir, baseName + counter.getAndIncrement());
		tempDir.mkdirs();
		tempDir.deleteOnExit();

		return new LuceneRecordWriter(
				(FileOutputCommitter) getOutputCommitter(ctx), tempDir);
	}

	/**
	 * Write out the LuceneIndex to a local temporary location.<br/>
	 * On commit/close the index is copied to the hdfs output directory.<br/>
	 *
	 */
	static class LuceneRecordWriter extends RecordWriter<Writable, Document> {

		final IndexWriter writer;
		final FileOutputCommitter committer;
		final File tmpdir;

		public LuceneRecordWriter(FileOutputCommitter committer, File tmpdir) {
			try {
				this.committer = committer;
				this.tmpdir = tmpdir;
				IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_4_10_2,
						new StandardAnalyzer());
				LogByteSizeMergePolicy mergePolicy = new LogByteSizeMergePolicy();
				mergePolicy.setMergeFactor(10);
				//mergePolicy.setUseCompoundFile(false);
				config.setMergePolicy(mergePolicy);
				config.setMergeScheduler(new SerialMergeScheduler());

				writer = new IndexWriter(FSDirectory.open(tmpdir),
						config);
				
			} catch (IOException e) {
				RuntimeException exc = new RuntimeException(e.toString(), e);
				exc.setStackTrace(e.getStackTrace());
				throw exc;
			}
		}

		@Override
		public void close(final TaskAttemptContext ctx) throws IOException,
				InterruptedException {
			// use a background thread to keep reporting progress while the
			// (possibly long) forceMerge and HDFS copy below run, so the
			// framework does not kill the task as unresponsive
			final Thread th = new Thread() {
				public void run() {
					while (!Thread.currentThread().isInterrupted()) {
						ctx.progress();
						try {
							Thread.sleep(500);
						} catch (InterruptedException e) {
							Thread.currentThread().interrupt();
							return;
						}
					}
				}
			};
			th.start();
			try {
				writer.forceMerge(1);
				writer.close();
				// move all files to part
				Configuration conf = ctx.getConfiguration();

				Path work = committer.getWorkPath();
				Path output = new Path(work, "index-"
						+ ctx.getTaskAttemptID().getTaskID().getId());
				FileSystem fs = FileSystem.get(conf);

				FileUtil.copy(tmpdir, fs, output, true, conf);
			} finally {
				th.interrupt();
			}
		}

		@Override
		public void write(Writable key, Document doc) throws IOException,
				InterruptedException {
			writer.addDocument(doc);

		}

	}
}




Finally, to summarize, the steps for a custom storage (output) format are:

(1) Extend StoreFunc and override its methods.
(2) Extend an OutputFormat base class to implement your own OutputFormat.
(3) Extend a RecordWriter and implement your own write method.

A sketch that wires the Lucene OutputFormat above into a StoreFunc follows.
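
To make the wiring concrete, here is a sketch of the StoreFunc that would sit on top of the LuceneOutputFormat shown above. It is not from the original post: the class name LuceneStore, the per-field naming scheme, and the use of TextField are assumptions made for illustration only.

package com.pig.support.lucene; // alongside the LuceneOutputFormat above

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.pig.StoreFunc;
import org.apache.pig.data.Tuple;

/** Hypothetical glue class: converts Pig tuples into Lucene Documents for LuceneOutputFormat. */
public class LuceneStore extends StoreFunc {

    private RecordWriter<Writable, Document> writer;
    private String location;

    @Override
    public OutputFormat getOutputFormat() throws IOException {
        // hand the output location to the Lucene-aware OutputFormat defined above
        return new LuceneOutputFormat(location);
    }

    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        this.location = location;
        FileOutputFormat.setOutputPath(job, new Path(location));
    }

    @SuppressWarnings("unchecked")
    @Override
    public void prepareToWrite(RecordWriter writer) throws IOException {
        this.writer = writer;
    }

    @Override
    public void putNext(Tuple t) throws IOException {
        try {
            // index every field as a stored text field named "f0", "f1", ... (naming is illustrative)
            Document doc = new Document();
            for (int i = 0; i < t.size(); i++) {
                Object v = t.get(i);
                doc.add(new TextField("f" + i, v == null ? "" : v.toString(), Field.Store.YES));
            }
            writer.write(null, doc);
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }
}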


Of course, not all of these steps are always necessary. For example, when storing to a database, we can simply collect the records handed to putNext into a buffer, commit the data once the OutputCommitter reports success, and roll it back in the abort method if the save fails.
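
As a sketch of that database variant (not from the original post; the class name, JDBC URL handling, and SQL are assumptions), the buffering and the all-or-nothing behaviour can live in the RecordWriter itself: rows are added to a JDBC batch in write(), and close() either commits everything or rolls it back. A fuller implementation could move the final commit/rollback into a custom OutputCommitter's commit and abort hooks, as described above.

package com.pig.support.jdbc; // hypothetical package, table, and connection details

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.pig.data.Tuple;

/**
 * Hypothetical RecordWriter for a database-backed StoreFunc:
 * rows are buffered as a JDBC batch and only committed when the task
 * finishes cleanly; any failure rolls the whole batch back.
 */
public class JdbcBatchRecordWriter extends RecordWriter<NullWritable, Tuple> {

    private final Connection conn;
    private final PreparedStatement stmt;

    public JdbcBatchRecordWriter(String jdbcUrl, String insertSql) throws IOException {
        try {
            conn = DriverManager.getConnection(jdbcUrl);
            conn.setAutoCommit(false);               // defer the real commit
            stmt = conn.prepareStatement(insertSql); // e.g. "INSERT INTO t(c1,c2) VALUES(?,?)"
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }

    @Override
    public void write(NullWritable key, Tuple t) throws IOException {
        try {
            for (int i = 0; i < t.size(); i++) {
                stmt.setObject(i + 1, t.get(i));
            }
            stmt.addBatch();                         // buffer, do not hit the DB yet
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }

    @Override
    public void close(TaskAttemptContext ctx) throws IOException {
        try {
            stmt.executeBatch();
            conn.commit();                           // success: make the rows visible
        } catch (SQLException e) {
            try {
                conn.rollback();                     // failure: leave nothing half-written
            } catch (SQLException ignored) {
            }
            throw new IOException(e);
        } finally {
            try { conn.close(); } catch (SQLException ignored) { }
        }
    }
}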
