[TOC]

# Introduction

By default, Hive only supports single-byte field delimiters in data files; the default delimiter is `\001`. You can specify a different delimiter when creating a table, but it must still be a single character. Suppose the data file uses a multi-character delimiter:

~~~
01||zhangsan
02||lisi
03||wangwu
~~~

We create a table:

~~~
hive> create table t_test(id string,name string)
    > row format delimited
    > fields terminated by '||';
~~~

load the data above, and query:

~~~
hive> select * from t_test;
OK
01
02
03
~~~

The name column comes back empty. Hive's default SerDe honors only the first character of the declared delimiter, so each line is split on single `|` characters: `01||zhangsan` becomes `01`, an empty string, and `zhangsan`, and only the first two values land in the table's two columns.

**Background: how Hive reads data**

1. First, a concrete implementation of InputFormat reads the file and returns records one at a time (a record can be a physical line, or whatever counts as a "line" in your logic).
2. Then a concrete implementation of SerDe (default: `org.apache.hadoop.hive.serde2.LazySimpleSerDe`) splits each record into fields.

# RegexSerDe

RegexSerDe extracts fields with a regular expression.

1. Create the table:

~~~
create table t_bi_reg(id string,name string)
row format serde 'org.apache.hadoop.hive.serde2.RegexSerDe'
with serdeproperties(
    'input.regex'='(.*)\\|\\|(.*)',
    'output.format.string'='%1$s%2$s'
)
stored as textfile;
~~~

`(.*)` matches any run of characters; `%1$s` refers to the first capture group of the regex.

2. Load the data (file contents, then the load command):

~~~
01||zhangsan
02||lisi
03||wangwu
~~~

~~~
load data local inpath '/root/lianggang.txt' into table t_bi_reg;
~~~

3. Query:

~~~
hive> select * from t_bi_reg;
OK
01	zhangsan
02	lisi
03	wangwu
~~~
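To make the two serde properties concrete, here is a minimal standalone Java sketch of the same pattern (the `RegexSerDeDemo` class name is only for illustration; RegexSerDe performs the equivalent match on every record):

~~~
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexSerDeDemo {
    public static void main(String[] args) {
        // The same pattern as 'input.regex' after Hive unescapes the DDL
        // string: two greedy groups separated by a literal "||"
        Pattern p = Pattern.compile("(.*)\\|\\|(.*)");
        Matcher m = p.matcher("01||zhangsan");
        if (m.matches()) {
            System.out.println("id   = " + m.group(1)); // %1$s -> 01
            System.out.println("name = " + m.group(2)); // %2$s -> zhangsan
        }
    }
}
~~~

Because both groups are greedy, a line splits at its last `||`; with exactly one delimiter per row, as here, that yields exactly the two columns.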
# Custom InputFormat

A custom InputFormat is another way to handle a special delimiter. The idea: while the InputFormat reads each line, it replaces the "multi-byte delimiter" in the data with Hive's default delimiter (Ctrl+A, i.e. `\001`) or with some other single-character substitute, so that the SerDe stage can extract fields with a single-byte delimiter as usual.

Looking at the relevant source:

![](https://box.kancloud.cn/a0678de28fe6d70ad8863af0c85fedc7_1542x774.png)

## First approach

~~~
package com.hive;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class BiDelimiterInputFormat extends TextInputFormat {

    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit,
            JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(genericSplit.toString());
        MyDemoRecordReader reader = new MyDemoRecordReader(
                new LineRecordReader(job, (FileSplit) genericSplit));
        // To use the second approach below, swap in:
        // BiRecordReader reader = new BiRecordReader(job, (FileSplit) genericSplit);
        return reader;
    }

    public static class MyDemoRecordReader implements RecordReader<LongWritable, Text> {

        LineRecordReader reader;
        Text text;

        // Wrap the stock LineRecordReader
        public MyDemoRecordReader(LineRecordReader reader) {
            this.reader = reader;
            text = reader.createValue();
        }

        // The methods below simply delegate to the wrapped reader
        @Override
        public void close() throws IOException {
            reader.close();
        }

        @Override
        public LongWritable createKey() {
            return reader.createKey();
        }

        @Override
        public Text createValue() {
            return new Text();
        }

        @Override
        public long getPos() throws IOException {
            return reader.getPos();
        }

        @Override
        public float getProgress() throws IOException {
            return reader.getProgress();
        }

        @Override
        public boolean next(LongWritable key, Text value) throws IOException {
            // Read the raw line into the buffer defined above
            boolean next = reader.next(key, text);
            if (next) {
                // Replace every "||" in the line with a single "|" (note the escaping)
                String replaceText = text.toString().replaceAll("\\|\\|", "\\|");
                // Write the cleaned-up line back into the output value
                value.set(replaceText);
            }
            return next;
        }
    }
}
~~~

1. Package it as a jar and put it under `$HIVE_HOME/lib` (a session-scoped alternative is shown after this list).
2. Create the table, declaring the custom inputformat; the outputformat stays the default:

~~~
-- show a table's current formats; the last argument is the table name
desc formatted t_test;
~~~

~~~
create table t_lianggang(id string,name string)
row format delimited fields terminated by '|'
stored as
inputformat 'com.hive.BiDelimiterInputFormat'
outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
~~~

3. Load the data (file contents, then the load command):

~~~
01||zhangsan
02||lisi
03||wangwu
~~~

~~~
load data local inpath '/root/lianggang.txt' into table t_lianggang;
~~~

4. Query:

~~~
hive> select * from t_lianggang;
OK
01	zhangsan
02	lisi
03	wangwu
~~~
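A note on step 1: instead of copying the jar into `$HIVE_HOME/lib`, it can also be registered per session from the Hive shell. This is a sketch that assumes the jar was built as `/root/bi-inputformat.jar`, a placeholder name:

~~~
hive> add jar /root/bi-inputformat.jar;
hive> list jars;
~~~

A jar added this way is visible only to the current session; the `$HIVE_HOME/lib` route makes the InputFormat available to every session.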
## Second approach

~~~
package com.hive;

import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

public class BiRecordReader implements RecordReader<LongWritable, Text> {
    private static final Log LOG = LogFactory.getLog(LineRecordReader.class
            .getName());

    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader in;
    int maxLineLength;
    private Seekable filePosition;
    private CompressionCodec codec;
    private Decompressor decompressor;

    /**
     * A class that provides a line reader from an input stream.
     *
     * @deprecated Use {@link org.apache.hadoop.util.LineReader} instead.
     */
    @Deprecated
    public static class LineReader extends org.apache.hadoop.util.LineReader {
        LineReader(InputStream in) {
            super(in);
        }

        LineReader(InputStream in, int bufferSize) {
            super(in, bufferSize);
        }

        public LineReader(InputStream in, Configuration conf) throws IOException {
            super(in, conf);
        }
    }

    public BiRecordReader(Configuration job, FileSplit split) throws IOException {
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        codec = compressionCodecs.getCodec(file);

        // open the file and seek to the start of the split
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());
        if (isCompressedInput()) {
            decompressor = CodecPool.getDecompressor(codec);
            if (codec instanceof SplittableCompressionCodec) {
                final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec)
                        .createInputStream(fileIn, decompressor, start, end,
                                SplittableCompressionCodec.READ_MODE.BYBLOCK);
                in = new LineReader(cIn, job);
                start = cIn.getAdjustedStart();
                end = cIn.getAdjustedEnd();
                filePosition = cIn; // take pos from compressed stream
            } else {
                in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
                filePosition = fileIn;
            }
        } else {
            fileIn.seek(start);
            in = new LineReader(fileIn, job);
            filePosition = fileIn;
        }
        // If this is not the first split, we always throw away first record
        // because we always (except the last split) read one extra line in
        // next() method.
        if (start != 0) {
            start += in.readLine(new Text(), 0, maxBytesToConsume(start));
        }
        this.pos = start;
    }

    private boolean isCompressedInput() {
        return (codec != null);
    }

    private int maxBytesToConsume(long pos) {
        return isCompressedInput() ? Integer.MAX_VALUE : (int) Math.min(
                Integer.MAX_VALUE, end - pos);
    }

    private long getFilePosition() throws IOException {
        long retVal;
        if (isCompressedInput() && null != filePosition) {
            retVal = filePosition.getPos();
        } else {
            retVal = pos;
        }
        return retVal;
    }

    public BiRecordReader(InputStream in, long offset, long endOffset,
            int maxLineLength) {
        this.maxLineLength = maxLineLength;
        this.in = new LineReader(in);
        this.start = offset;
        this.pos = offset;
        this.end = endOffset;
        this.filePosition = null;
    }

    public BiRecordReader(InputStream in, long offset, long endOffset,
            Configuration job) throws IOException {
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                Integer.MAX_VALUE);
        this.in = new LineReader(in, job);
        this.start = offset;
        this.pos = offset;
        this.end = endOffset;
        this.filePosition = null;
    }

    public LongWritable createKey() {
        return new LongWritable();
    }

    public Text createValue() {
        return new Text();
    }

    /**
     * Read a line. The only change from the stock LineRecordReader is the
     * delimiter replacement inside this method.
     */
    public synchronized boolean next(LongWritable key, Text value)
            throws IOException {
        // We always read one extra line, which lies outside the upper
        // split limit i.e. (end - 1)
        while (getFilePosition() <= end) {
            key.set(pos);

            int newSize = in.readLine(value, maxLineLength,
                    Math.max(maxBytesToConsume(pos), maxLineLength));
            if (newSize == 0) {
                return false;
            }
            // Replace every "||" with a single "|"
            String str = value.toString().replaceAll("\\|\\|", "\\|");
            value.set(str);
            pos += newSize;
            if (newSize < maxLineLength) {
                return true;
            }

            // line too long. try again
            LOG.info("Skipped line of size " + newSize + " at pos "
                    + (pos - newSize));
        }

        return false;
    }

    /**
     * Get the progress within the split
     */
    public float getProgress() throws IOException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f,
                    (getFilePosition() - start) / (float) (end - start));
        }
    }

    public synchronized long getPos() throws IOException {
        return pos;
    }

    public synchronized void close() throws IOException {
        try {
            if (in != null) {
                in.close();
            }
        } finally {
            if (decompressor != null) {
                CodecPool.returnDecompressor(decompressor);
            }
        }
    }
}
~~~

# Summary

With only a few fields, the regex approach is the convenient choice; with many fields, the regex incurs a larger performance cost.