Pig ships with a large number of built-in functions and also exposes a rich set of interfaces to developers. Through a UDF we can easily implement features that Pig does not support out of the box. For example, as shown in my previous posts, we can store the results of a Pig job in all kinds of media rather than only HDFS, or in several destinations at once.
So how do we implement our own storage UDF? This brings us to Pig's load and store functions. A load function reads data from some data source, usually HDFS, while a store function writes the final results, by default back to HDFS. To cover most storage needs, we only have to extend StoreFunc, the base class behind the store functionality, and override its methods. Once we understand this, we can store results anywhere: in a database, in an index, in a local txt or excel file, and so on.
Let's first look at the StoreFunc source code:
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.PigStatusReporter;

/**
 * StoreFuncs take records from Pig's processing and store them into a data store.
 * Most frequently this is an HDFS file, but it could also be an HBase instance,
 * RDBMS, etc.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class StoreFunc implements StoreFuncInterface {

    /**
     * This method is called by the Pig runtime in the front end to convert the
     * output location to an absolute path if the location is relative. The
     * StoreFunc implementation is free to choose how it converts a relative
     * location to an absolute location since this may depend on what the location
     * string represent (hdfs path or some other data source).
     *
     * @param location location as provided in the "store" statement of the script
     * @param curDir the current working direction based on any "cd" statements
     * in the script before the "store" statement. If there are no "cd" statements
     * in the script, this would be the home directory -
     * <pre>/user/<username> </pre>
     * @return the absolute location based on the arguments passed
     * @throws IOException if the conversion is not possible
     */
    @Override
    public String relToAbsPathForStoreLocation(String location, Path curDir)
            throws IOException {
        return LoadFunc.getAbsolutePath(location, curDir);
    }

    /**
     * Return the OutputFormat associated with StoreFunc. This will be called
     * on the front end during planning and on the backend during execution.
     * @return the {@link OutputFormat} associated with StoreFunc
     * @throws IOException if an exception occurs while constructing the
     * OutputFormat
     */
    public abstract OutputFormat getOutputFormat() throws IOException;

    /**
     * Communicate to the storer the location where the data needs to be stored.
     * The location string passed to the {@link StoreFunc} here is the
     * return value of {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}.
     * This method will be called in the frontend and backend multiple times.
     * Implementations should bear in mind that this method is called multiple
     * times and should ensure there are no inconsistent side effects due to the
     * multiple calls. {@link #checkSchema(ResourceSchema)} will be called before
     * any call to {@link #setStoreLocation(String, Job)}.
     *
     * @param location Location returned by
     * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
     * @param job The {@link Job} object
     * @throws IOException if the location is not valid.
     */
    public abstract void setStoreLocation(String location, Job job) throws IOException;

    /**
     * Set the schema for data to be stored. This will be called on the
     * front end during planning if the store is associated with a schema.
     * A Store function should implement this function to
     * check that a given schema is acceptable to it. For example, it
     * can check that the correct partition keys are included;
     * a storage function to be written directly to an OutputFormat can
     * make sure the schema will translate in a well defined way. Default
     * implementation is a no-op.
     * @param s to be checked
     * @throws IOException if this schema is not acceptable. It should include
     * a detailed error message indicating what is wrong with the schema.
     */
    @Override
    public void checkSchema(ResourceSchema s) throws IOException {
        // default implementation is a no-op
    }

    /**
     * Initialize StoreFunc to write data. This will be called during
     * execution on the backend before the call to putNext.
     * @param writer RecordWriter to use.
     * @throws IOException if an exception occurs during initialization
     */
    public abstract void prepareToWrite(RecordWriter writer) throws IOException;

    /**
     * Write a tuple to the data store.
     *
     * @param t the tuple to store.
     * @throws IOException if an exception occurs during the write
     */
    public abstract void putNext(Tuple t) throws IOException;

    /**
     * This method will be called by Pig both in the front end and back end to
     * pass a unique signature to the {@link StoreFunc} which it can use to store
     * information in the {@link UDFContext} which it needs to store between
     * various method invocations in the front end and back end. This method
     * will be called before other methods in {@link StoreFunc}. This is necessary
     * because in a Pig Latin script with multiple stores, the different
     * instances of store functions need to be able to find their (and only their)
     * data in the UDFContext object. The default implementation is a no-op.
     * @param signature a unique signature to identify this StoreFunc
     */
    @Override
    public void setStoreFuncUDFContextSignature(String signature) {
        // default implementation is a no-op
    }

    /**
     * This method will be called by Pig if the job which contains this store
     * fails. Implementations can clean up output locations in this method to
     * ensure that no incorrect/incomplete results are left in the output location.
     * The default implementation deletes the output location if it
     * is a {@link FileSystem} location.
     * @param location Location returned by
     * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
     * @param job The {@link Job} object - this should be used only to obtain
     * cluster properties through {@link Job#getConfiguration()} and not to set/query
     * any runtime job information.
     */
    @Override
    public void cleanupOnFailure(String location, Job job) throws IOException {
        cleanupOnFailureImpl(location, job);
    }

    /**
     * This method will be called by Pig if the job which contains this store
     * is successful, and some cleanup of intermediate resources is required.
     * Implementations can clean up output locations in this method to
     * ensure that no incorrect/incomplete results are left in the output location.
     * @param location Location returned by
     * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
     * @param job The {@link Job} object - this should be used only to obtain
     * cluster properties through {@link Job#getConfiguration()} and not to set/query
     * any runtime job information.
     */
    @Override
    public void cleanupOnSuccess(String location, Job job) throws IOException {
        // DEFAULT: DO NOTHING, user-defined overrides can
        // call cleanupOnFailureImpl(location, job) or ...?
    }

    /**
     * Default implementation for {@link #cleanupOnFailure(String, Job)}
     * and {@link #cleanupOnSuccess(String, Job)}. This removes a file
     * from HDFS.
     * @param location file name (or URI) of file to remove
     * @param job Hadoop job, used to access the appropriate file system.
     * @throws IOException
     */
    public static void cleanupOnFailureImpl(String location, Job job)
            throws IOException {
        Path path = new Path(location);
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
    }

    /**
     * Issue a warning. Warning messages are aggregated and reported to
     * the user.
     * @param msg String message of the warning
     * @param warningEnum type of warning
     */
    public final void warn(String msg, Enum warningEnum) {
        Counter counter = PigStatusReporter.getInstance().getCounter(warningEnum);
        counter.increment(1);
    }
}
The class has many methods, but we do not need to override all of them. In most cases, overriding the following few abstract methods is enough:
(1) getOutputFormat: corresponds to Hadoop's OutputFormat; the final output takes a different physical form depending on which format is returned.
(2) setStoreLocation: defines the path of the generated output; if you are not storing to HDFS, this can largely be ignored.
(3) prepareToWrite: performs initialization work before any data is written.
(4) putNext: receives, one by one, the tuples that Pig ultimately needs to store.
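As a minimal sketch of those four methods working together, a storer that writes each tuple as one tab-separated text line might look like the following. The class name and the choice of Hadoop's TextOutputFormat are my own illustration, not something from a particular library:

```java
package com.pig.example; // hypothetical package

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pig.StoreFunc;
import org.apache.pig.data.Tuple;

/** Illustrative storer: writes each tuple as one tab-separated text line. */
public class SimpleTextStorer extends StoreFunc {

    private RecordWriter<NullWritable, Text> writer;

    @Override
    public OutputFormat getOutputFormat() throws IOException {
        // (1) which Hadoop OutputFormat produces the physical output
        return new TextOutputFormat<NullWritable, Text>();
    }

    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        // (2) where the output goes; may be called several times
        FileOutputFormat.setOutputPath(job, new Path(location));
    }

    @Override
    @SuppressWarnings("unchecked")
    public void prepareToWrite(RecordWriter writer) throws IOException {
        // (3) keep the RecordWriter before any tuple arrives
        this.writer = writer;
    }

    @Override
    public void putNext(Tuple t) throws IOException {
        // (4) called once per tuple to be stored
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < t.size(); i++) {
            if (i > 0) sb.append('\t');
            sb.append(t.get(i));
        }
        try {
            writer.write(NullWritable.get(), new Text(sb.toString()));
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }
}
```

With such a class on the classpath, the script side would simply say STORE result INTO 'out' USING com.pig.example.SimpleTextStorer();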
Step (1) tells us we must supply an OutputFormat class. To do that we extend one of Hadoop's OutputFormat base classes and override its getRecordWriter method; usually we also extend RecordWriter to define our own output form, which might be a line of text, an object, a set of index documents, and so on. The following OutputFormat, for example, supports writing Lucene indexes:
package com.pig.support.lucene;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.index.SerialMergeScheduler;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

/**
 * Extends FileOutputFormat with an OutputFormat strategy that writes
 * Lucene indexes.
 */
public class LuceneOutputFormat extends FileOutputFormat<Writable, Document> {

    String location;
    FileSystem fs;
    String taskid;
    FileOutputCommitter committer;
    AtomicInteger counter = new AtomicInteger();

    public LuceneOutputFormat(String location) {
        this.location = location;
    }

    @Override
    public RecordWriter<Writable, Document> getRecordWriter(TaskAttemptContext ctx)
            throws IOException, InterruptedException {
        Configuration conf = ctx.getConfiguration();
        fs = FileSystem.get(conf);
        // build the index in a local temporary directory first
        File baseDir = new File(System.getProperty("java.io.tmpdir"));
        String baseName = System.currentTimeMillis() + "-";
        File tempDir = new File(baseDir, baseName + counter.getAndIncrement());
        tempDir.mkdirs();
        tempDir.deleteOnExit();
        return new LuceneRecordWriter(
                (FileOutputCommitter) getOutputCommitter(ctx), tempDir);
    }

    /**
     * Write out the Lucene index to a local temporary location.<br/>
     * On commit/close the index is copied to the hdfs output directory.<br/>
     */
    static class LuceneRecordWriter extends RecordWriter<Writable, Document> {

        final IndexWriter writer;
        final FileOutputCommitter committer;
        final File tmpdir;

        public LuceneRecordWriter(FileOutputCommitter committer, File tmpdir) {
            try {
                this.committer = committer;
                this.tmpdir = tmpdir;
                IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_4_10_2,
                        new StandardAnalyzer());
                LogByteSizeMergePolicy mergePolicy = new LogByteSizeMergePolicy();
                mergePolicy.setMergeFactor(10);
                // mergePolicy.setUseCompoundFile(false);
                config.setMergePolicy(mergePolicy);
                config.setMergeScheduler(new SerialMergeScheduler());
                writer = new IndexWriter(FSDirectory.open(tmpdir), config);
            } catch (IOException e) {
                RuntimeException exc = new RuntimeException(e.toString(), e);
                exc.setStackTrace(e.getStackTrace());
                throw exc;
            }
        }

        @Override
        public void close(final TaskAttemptContext ctx) throws IOException,
                InterruptedException {
            // report progress from a separate thread so the task is not
            // killed while the (potentially long) merge below runs
            final Thread th = new Thread() {
                public void run() {
                    while (!Thread.currentThread().isInterrupted()) {
                        ctx.progress();
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                }
            };
            th.start();
            try {
                writer.forceMerge(1);
                writer.close();
                // move all index files to the task's output directory
                Configuration conf = ctx.getConfiguration();
                Path work = committer.getWorkPath();
                Path output = new Path(work, "index-"
                        + ctx.getTaskAttemptID().getTaskID().getId());
                FileSystem fs = FileSystem.get(conf);
                FileUtil.copy(tmpdir, fs, output, true, conf);
            } finally {
                th.interrupt();
            }
        }

        @Override
        public void write(Writable key, Document doc) throws IOException,
                InterruptedException {
            writer.addDocument(doc);
        }
    }
}
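To connect this OutputFormat back to Pig, a matching StoreFunc returns it from getOutputFormat, keeps the RecordWriter in prepareToWrite, and converts each tuple into a Lucene Document in putNext. The article does not show that class, so the sketch below uses assumed names (LuceneStore, field names f0, f1, ...):

```java
package com.pig.support.lucene;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.pig.StoreFunc;
import org.apache.pig.data.Tuple;

/** Hypothetical storer gluing Pig to the LuceneOutputFormat above. */
public class LuceneStore extends StoreFunc {

    private RecordWriter writer;
    private String location;

    @Override
    public OutputFormat getOutputFormat() throws IOException {
        return new LuceneOutputFormat(location);
    }

    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        this.location = location;
        FileOutputFormat.setOutputPath(job, new Path(location));
    }

    @Override
    public void prepareToWrite(RecordWriter writer) throws IOException {
        this.writer = writer;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void putNext(Tuple t) throws IOException {
        // map each tuple field to an indexed text field named f0, f1, ...
        Document doc = new Document();
        for (int i = 0; i < t.size(); i++) {
            Object v = t.get(i);
            if (v != null) {
                doc.add(new TextField("f" + i, v.toString(), Field.Store.YES));
            }
        }
        try {
            writer.write(null, doc);
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }
}
```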
To summarize, the steps for a custom storage format are:
(1) Extend StoreFunc and override its methods.
(2) Extend one of the OutputFormat base classes with your own OutputFormat.
(3) Extend RecordWriter and implement your own write method.
Not all of these steps are strictly required. When storing to a database, for example, we can simply collect the tuples handed to putNext into a buffer, commit the data once the OutputCommitter reports success, and roll it back in the abort method if the job fails.
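That database variant can be sketched roughly as follows. Everything here is illustrative (the class name, the JDBC URL, the `results` table and its single column are assumptions), but it shows the idea: write() only buffers rows, and close() pushes them inside one transaction so a failure rolls everything back instead of leaving partial results:

```java
package com.pig.example; // hypothetical package

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.pig.data.Tuple;

/**
 * Illustrative RecordWriter for a database storer: rows are buffered in
 * memory and inserted in one JDBC transaction when the task closes.
 */
public class BufferedJdbcRecordWriter extends RecordWriter<Object, Tuple> {

    private final List<Tuple> buffer = new ArrayList<Tuple>();
    private final String jdbcUrl; // assumed to point at the target database

    public BufferedJdbcRecordWriter(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    @Override
    public void write(Object key, Tuple value) {
        buffer.add(value); // putNext ends up here: just collect, don't write yet
    }

    @Override
    public void close(TaskAttemptContext ctx) throws IOException {
        try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
            conn.setAutoCommit(false);
            try (PreparedStatement ps =
                    conn.prepareStatement("INSERT INTO results VALUES (?)")) {
                for (Tuple t : buffer) {
                    ps.setString(1, t.get(0).toString());
                    ps.addBatch();
                }
                ps.executeBatch();
                conn.commit();   // all-or-nothing
            } catch (Exception e) {
                conn.rollback(); // nothing half-written on failure
                throw new IOException(e);
            }
        } catch (java.sql.SQLException e) {
            throw new IOException(e);
        }
    }
}
```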