<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

[TOC]

# MapReduce Advanced Features

## 1. Custom InputFormat

### 1.1 Requirement

> Small files hurt efficiency in both HDFS and MapReduce, yet in practice it is often unavoidable to process large numbers of them, so a corresponding solution is needed.

### 1.2 Analysis

> Small-file optimization essentially comes down to the following approaches:

1) At data-collection time, merge small files or small batches of data into large files before uploading them to HDFS.
2) Before business processing, run a MapReduce program on HDFS to merge the small files.
3) During MapReduce processing, use a combine-style input format (CombineTextInputFormat) to improve efficiency; see the sketch at the end of this section.

### 1.3 Implementation

> This section implements the second approach above.
> Core mechanism of the program:
> Define a custom InputFormat.
> Override the RecordReader so that an entire file is read at once and wrapped into a single KV pair.
> Use SequenceFileOutputFormat on output to write the merged file.
> Code:
> Custom InputFormat

~~~
public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    // Mark every small file as non-splittable so that each file yields exactly one key-value pair
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}
~~~

> Custom RecordReader

~~~
class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            // Read the whole file into a single BytesWritable value
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // do nothing
    }
}
~~~

> MapReduce processing flow

~~~
public class SmallFilesToSequenceFileConverter extends Configured implements Tool {

    static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
        private Text filenameKey;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            filenameKey = new Text(path.toString());
        }

        @Override
        protected void map(NullWritable key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(filenameKey, value);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        System.setProperty("HADOOP_USER_NAME", "hdfs");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: combinefiles <in> <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "combine small files to sequencefile");
        job.setJarByClass(SmallFilesToSequenceFileConverter.class);
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        job.setMapperClass(SequenceFileMapper.class);
        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), args);
        System.exit(exitCode);
    }
}
~~~
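> For the third approach listed in 1.2, no custom code is needed at all: the built-in CombineTextInputFormat packs many small files into a single split. Below is a minimal driver sketch; the 4 MB split-size cap and the class name are illustrative assumptions, not part of the original example.

~~~
// Sketch of approach 3 from section 1.2: CombineTextInputFormat packs many small
// files into one split so fewer map tasks are launched. Values are illustrative.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CombineSmallFilesDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "combine small files with CombineTextInputFormat");
        job.setJarByClass(CombineSmallFilesDriver.class);

        // Replace the default TextInputFormat with CombineTextInputFormat
        job.setInputFormatClass(CombineTextInputFormat.class);
        // Cap each combined split at 4 MB (illustrative value)
        CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024);

        // Mapper/reducer classes would be set here exactly as in a normal text job
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
~~~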
## 2. Custom OutputFormat

### 2.1 Requirement

> Some raw logs need enrichment ("enhancement") during parsing. The flow is:

1) Read data from the raw log files.
2) Use a URL field in each log line to look up information in an external knowledge base and append it to the raw line.
3) If the enrichment succeeds, write the line to the enhanced-result directory; if it fails, extract the URL field from the raw data and write it to the to-crawl list directory.

### 2.2 Analysis

> The key point is that a single MapReduce program must write two kinds of results to different directories depending on the data. This kind of flexible output requirement can be met with a custom OutputFormat.

### 2.3 Implementation

> Key implementation points:

1) Access an external resource from within MapReduce.
2) Define a custom OutputFormat, override its RecordWriter, and in particular override the write() method that actually emits the data.

> Code:
> Utility for loading data from the database

~~~
public class DBLoader {

    public static void dbLoader(HashMap<String, String> ruleMap) {
        Connection conn = null;
        Statement st = null;
        ResultSet res = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://hdp-node01:3306/urlknowledge", "root", "root");
            st = conn.createStatement();
            res = st.executeQuery("select url,content from urlcontent");
            while (res.next()) {
                ruleMap.put(res.getString(1), res.getString(2));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (res != null) {
                    res.close();
                }
                if (st != null) {
                    st.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        HashMap<String, String> map = new HashMap<String, String>();
        DBLoader.dbLoader(map);
        System.out.println(map.size());
    }
}
~~~

> Custom OutputFormat

~~~
public class LogEnhancerOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSystem fs = FileSystem.get(context.getConfiguration());
        Path enhancePath = new Path("hdfs://hdp-node01:9000/flow/enhancelog/enhanced.log");
        Path toCrawlPath = new Path("hdfs://hdp-node01:9000/flow/tocrawl/tocrawl.log");

        FSDataOutputStream enhanceOut = fs.create(enhancePath);
        FSDataOutputStream toCrawlOut = fs.create(toCrawlPath);

        return new MyRecordWriter(enhanceOut, toCrawlOut);
    }

    static class MyRecordWriter extends RecordWriter<Text, NullWritable> {

        FSDataOutputStream enhanceOut = null;
        FSDataOutputStream toCrawlOut = null;

        public MyRecordWriter(FSDataOutputStream enhanceOut, FSDataOutputStream toCrawlOut) {
            this.enhanceOut = enhanceOut;
            this.toCrawlOut = toCrawlOut;
        }

        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            // The framework hands us the data; we are responsible for writing it to its destination (HDFS).
            // Lines tagged "tocrawl" go to the to-crawl output stream, everything else to the enhanced log.
            if (key.toString().contains("tocrawl")) {
                toCrawlOut.write(key.toString().getBytes());
            } else {
                enhanceOut.write(key.toString().getBytes());
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (toCrawlOut != null) {
                toCrawlOut.close();
            }
            if (enhanceOut != null) {
                enhanceOut.close();
            }
        }
    }
}
~~~
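> Note that getRecordWriter above creates two fixed file paths, so if the job ever ran with more than one task, the tasks would collide on the same HDFS files. A minimal per-task variant is sketched below; the directory locations come from the code above, while deriving the file names from the task ID is an assumption added for illustration.

~~~
// Hypothetical variant of the OutputFormat above: each task writes to its own
// pair of files, named after its task ID, so parallel tasks never collide.
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PerTaskLogEnhancerOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSystem fs = FileSystem.get(context.getConfiguration());
        // One file pair per task, e.g. task_xxx_m_000003.log
        String taskId = context.getTaskAttemptID().getTaskID().toString();
        Path enhancePath = new Path("hdfs://hdp-node01:9000/flow/enhancelog/" + taskId + ".log");
        Path toCrawlPath = new Path("hdfs://hdp-node01:9000/flow/tocrawl/" + taskId + ".log");

        FSDataOutputStream enhanceOut = fs.create(enhancePath);
        FSDataOutputStream toCrawlOut = fs.create(toCrawlPath);
        // Reuses the MyRecordWriter defined in LogEnhancerOutputFormat above (same package assumed)
        return new LogEnhancerOutputFormat.MyRecordWriter(enhanceOut, toCrawlOut);
    }
}
~~~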
> MapReduce processing flow

~~~
/**
 * Enhances the user web-access logs produced every hour: the content-analysis
 * result of the page that each line's URL points to is appended to that line.
 */
public class LogEnhancer {

    static class LogEnhancerMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        HashMap<String, String> knowledgeMap = new HashMap<String, String>();

        /**
         * The map task calls setup() once during initialization; use that hook to
         * load the external knowledge base into the memory of the machine running
         * this map task.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            DBLoader.dbLoader(knowledgeMap);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            try {
                String url = fields[26];
                // Look up the content-analysis info for this line's URL in the knowledge base
                String content = knowledgeMap.get(url);

                // Build one of two kinds of output depending on whether a match was found
                String result = "";
                if (null == content) {
                    // Goes to the to-crawl list
                    result = url + "\t" + "tocrawl\n";
                } else {
                    // Goes to the enhanced log
                    result = line + "\t" + content + "\n";
                }

                context.write(new Text(result), NullWritable.get());
            } catch (Exception e) {
                // Malformed lines are simply skipped
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(LogEnhancer.class);
        job.setMapperClass(LogEnhancerMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Plug the custom output format component into the job
        job.setOutputFormatClass(LogEnhancerOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // Even though we use a custom OutputFormat, it extends FileOutputFormat,
        // and FileOutputFormat writes a _SUCCESS file, so an output directory
        // still has to be specified here
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
        System.exit(0);
    }
}
~~~

## 3. Custom GroupingComparator

### 3.1 Requirement

> Given the following order data:

| Order id | Product id | Amount |
| --- | --- | --- |
| Order_0000001 | Pdt_01 | 222.8 |
| Order_0000001 | Pdt_05 | 25.8 |
| Order_0000002 | Pdt_03 | 522.8 |
| Order_0000002 | Pdt_04 | 122.4 |
| Order_0000002 | Pdt_05 | 722.4 |
| Order_0000003 | Pdt_01 | 222.8 |

> Find, for each order, the transaction with the largest amount.

### 3.2 Analysis

1) Using "order id + amount" as the key, all order records read in the map phase can be partitioned by order id, sorted by amount, and sent to reduce.
2) On the reduce side, a GroupingComparator groups the KV pairs with the same order id together; the first record of each group is then the maximum.

### 3.3 Implementation

> Custom GroupingComparator

~~~
/**
 * Controls how the reduce side groups KV pairs during shuffle.
 * @author duanhaitao@itcast.cn
 */
public class ItemidGroupingComparator extends WritableComparator {

    protected ItemidGroupingComparator() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean abean = (OrderBean) a;
        OrderBean bbean = (OrderBean) b;
        // Beans with the same itemid are treated as equal so they end up in one group
        return abean.getItemid().compareTo(bbean.getItemid());
    }
}
~~~

> Order-info bean

~~~
/**
 * Order-info bean implementing Hadoop's serialization mechanism.
 * @author duanhaitao@itcast.cn
 */
public class OrderBean implements WritableComparable<OrderBean> {

    private Text itemid;
    private DoubleWritable amount;

    public OrderBean() {
    }

    public OrderBean(Text itemid, DoubleWritable amount) {
        set(itemid, amount);
    }

    public void set(Text itemid, DoubleWritable amount) {
        this.itemid = itemid;
        this.amount = amount;
    }

    public Text getItemid() {
        return itemid;
    }

    public DoubleWritable getAmount() {
        return amount;
    }

    @Override
    public int compareTo(OrderBean o) {
        int cmp = this.itemid.compareTo(o.getItemid());
        if (cmp == 0) {
            // Same order id: sort by amount in descending order
            cmp = -this.amount.compareTo(o.getAmount());
        }
        return cmp;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(itemid.toString());
        out.writeDouble(amount.get());
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        String readUTF = in.readUTF();
        double readDouble = in.readDouble();
        this.itemid = new Text(readUTF);
        this.amount = new DoubleWritable(readDouble);
    }

    @Override
    public String toString() {
        return itemid.toString() + "\t" + amount.get();
    }
}
~~~

> MapReduce processing flow

~~~
/**
 * Uses the secondary-sort mechanism to output, for each item, the order record
 * with the largest amount.
 * @author duanhaitao@itcast.cn
 */
public class SecondarySort {

    static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

        OrderBean bean = new OrderBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");

            bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[1])));

            context.write(bean, NullWritable.get());
        }
    }

    static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

        // With the GroupingComparator in place, the KV data arriving here looks like:
        // <1001 87.6>,null  <1001 76.5>,null  ...
        // The key parameter of reduce() is the key of the first KV pair in the group: <1001 87.6>
        // To output the largest-amount order of each item, it is enough to output this key
        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(SecondarySort.class);

        job.setMapperClass(SecondarySortMapper.class);
        job.setReducerClass(SecondarySortReducer.class);

        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // GroupingComparator class used during shuffle
        job.setGroupingComparatorClass(ItemidGroupingComparator.class);
        // Partitioner class used during shuffle (see the sketch after this section)
        job.setPartitionerClass(ItemIdPartitioner.class);

        job.setNumReduceTasks(3);

        job.waitForCompletion(true);
    }
}
~~~
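> The driver above references an ItemIdPartitioner class that is not shown in this document. A minimal sketch of what it would typically look like, partitioning on the itemid alone so that all beans of one order land in the same reduce task; the class body is an assumption inferred from how it is used above.

~~~
// Hypothetical ItemIdPartitioner: partition OrderBean records by itemid only,
// so every record of the same order id goes to the same reducer.
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable> {

    @Override
    public int getPartition(OrderBean key, NullWritable value, int numPartitions) {
        // Mask the sign bit so the result is always non-negative
        return (key.getItemid().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
~~~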
## 4. DistributedCache in MapReduce

### 4.1 Map-side join example

#### 4.1.1 Requirement

> Join two "tables", one small and one very large. This scenario is very common in practice, e.g. "order logs" joined with "product info".

#### 4.1.2 Analysis

> Principle:

1. Applicable when one of the tables being joined is small.
2. The small table can be shipped to every map node, so each map node can join the large-table data it reads locally and emit the final result.
3. This greatly increases the parallelism of the join and speeds up processing.

> Approach: preload the small table in the mapper class and do the join there, using the DistributedCache mechanism to distribute the small table's data to every map-task node; each map task can then load the small table from its local disk and perform the join locally.

#### 4.1.3 Implementation

~~~
public class TestDistributedCache {

    static class TestDistributedCacheMapper extends Mapper<LongWritable, Text, Text, Text> {

        FileReader in = null;
        BufferedReader reader = null;
        HashMap<String, String> b_tab = new HashMap<String, String>();
        String localpath = null;
        String uirpath = null;

        // Called once when the map task is initialized
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // These lines fetch the local absolute path of the cache file, for testing/verification
            Path[] files = context.getLocalCacheFiles();
            localpath = files[0].toString();
            URI[] cacheFiles = context.getCacheFiles();

            // Using the cached file: read it with plain local IO.
            // The file sits in the local working directory of the machine running this map task.
            in = new FileReader("b.txt");
            reader = new BufferedReader(in);
            String line = null;
            while (null != (line = reader.readLine())) {
                String[] fields = line.split(",");
                b_tab.put(fields[0], fields[1]);
            }
            IOUtils.closeStream(reader);
            IOUtils.closeStream(in);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Here we read the split (on HDFS) that this map task is responsible for
            String[] fields = value.toString().split("\t");

            String a_itemid = fields[0];
            String a_amount = fields[1];

            String b_name = b_tab.get(a_itemid);

            // Output looks like: 1001  98.9  banan
            context.write(new Text(a_itemid), new Text(a_amount + "\t" + ":" + localpath + "\t" + b_name));
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(TestDistributedCache.class);
        job.setMapperClass(TestDistributedCacheMapper.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Path of the normal input data to process
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // No reducer needed
        job.setNumReduceTasks(0);

        // Ship a file to the working directory of every task process
        job.addCacheFile(new URI("hdfs://hadoop-server01:9000/cachefile/b.txt"));

        // Ship an archive to the classpath of every task process
        // job.addArchiveToClassPath(archive);
        // Ship a jar to the classpath of every task node
        // job.addFileToClassPath(jarfile);

        job.waitForCompletion(true);
    }
}
~~~
## 5. Other MapReduce notes

### 5.1 Counters

> In production code it is common to keep a global count of non-conforming data lines encountered during processing. Requirements like this can be met with the global counters provided by the MapReduce framework.
> Example code:

~~~
public class MultiOutputs {

    // Custom counters defined as an enum
    enum MyCounter { MALFORMED, NORMAL }

    static class CommaMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] words = value.toString().split(",");

            for (String word : words) {
                context.write(new Text(word), new LongWritable(1));
            }
            // Increment the enum-defined custom counter
            context.getCounter(MyCounter.MALFORMED).increment(1);
            // Increment a dynamically named custom counter
            context.getCounter("counterGroupa", "countera").increment(1);
        }
    }
}
~~~

### 5.2 Chaining multiple jobs

> Slightly more complex processing logic often needs several MapReduce programs chained together. Multi-job chaining can be implemented with the framework's JobControl.
> Example code:

~~~
ControlledJob cJob1 = new ControlledJob(job1.getConfiguration());
ControlledJob cJob2 = new ControlledJob(job2.getConfiguration());
ControlledJob cJob3 = new ControlledJob(job3.getConfiguration());

cJob1.setJob(job1);
cJob2.setJob(job2);
cJob3.setJob(job3);

// Set up job dependencies
cJob2.addDependingJob(cJob1);
cJob3.addDependingJob(cJob2);

JobControl jobControl = new JobControl("RecommendationJob");
jobControl.addJob(cJob1);
jobControl.addJob(cJob2);
jobControl.addJob(cJob3);

// Run the jobs added to JobControl on a new thread; start it and wait for all jobs to finish
Thread jobControlThread = new Thread(jobControl);
jobControlThread.start();
while (!jobControl.allFinished()) {
    Thread.sleep(500);
}
jobControl.stop();

return 0;
~~~

### 5.3 Advanced use of the Configuration object
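> Section 5.3 carries only a heading here; a common "advanced" use of the Configuration object is passing per-job parameters from the driver down to the map/reduce tasks. A minimal sketch follows, where the parameter name "log.enhancer.topn" and the surrounding class are purely illustrative assumptions.

~~~
// Sketch: values set on the job's Configuration in the driver travel with the job
// and can be read back inside any task via context.getConfiguration().
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ConfigurationDemo {

    static class DemoMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        private int topN;

        @Override
        protected void setup(Context context) {
            // Read the value set in the driver; 10 is the fallback if it is absent
            topN = context.getConfiguration().getInt("log.enhancer.topn", 10);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // topN would drive the per-record logic here; this demo just echoes the line
            context.write(new Text(value.toString()), NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Anything set here is serialized with the job and visible in every task
        conf.setInt("log.enhancer.topn", 5);

        Job job = Job.getInstance(conf, "configuration demo");
        job.setJarByClass(ConfigurationDemo.class);
        job.setMapperClass(DemoMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(0);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
~~~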
## 6. MapReduce parameter tuning

### 1. Important MapReduce configuration parameters

#### 1.1 Resource-related parameters

> The following parameters take effect when configured in the user's own MR application (a sketch of setting some of them per job appears at the end of this chapter):

1. mapreduce.map.memory.mb: upper limit on the resources (in MB) a Map Task may use; default 1024. If a Map Task actually uses more than this, it is forcibly killed.
2. mapreduce.reduce.memory.mb: upper limit on the resources (in MB) a Reduce Task may use; default 1024. If a Reduce Task actually uses more than this, it is forcibly killed.
3. mapreduce.map.cpu.vcores: maximum number of CPU cores each Map task may use; default 1.
4. mapreduce.reduce.cpu.vcores: maximum number of CPU cores each Reduce task may use; default 1.
5. mapreduce.map.java.opts: JVM options for the Map Task, e.g. to set the default Java heap size: "-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc" (@taskid@ is automatically replaced by the Hadoop framework with the actual task id); default "".
6. mapreduce.reduce.java.opts: JVM options for the Reduce Task, e.g. "-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc"; default "".

> The following must be configured in the server-side configuration files before YARN starts in order to take effect:

7) yarn.scheduler.minimum-allocation-mb 1024 — minimum memory allocated to an application container
8) yarn.scheduler.maximum-allocation-mb 8192 — maximum memory allocated to an application container
9) yarn.scheduler.minimum-allocation-vcores 1
10) yarn.scheduler.maximum-allocation-vcores 32
11) yarn.nodemanager.resource.memory-mb 8192

> Key parameters for shuffle performance; these should also be configured before YARN starts:

12) mapreduce.task.io.sort.mb 100 // size of the shuffle ring buffer, default 100 MB
13) mapreduce.map.sort.spill.percent 0.8 // spill threshold of the ring buffer, default 80%

#### 1.2 Fault-tolerance parameters

1) mapreduce.map.maxattempts: maximum number of retries for each Map Task; once exceeded, the Map Task is considered failed. Default: 4.
2) mapreduce.reduce.maxattempts: maximum number of retries for each Reduce Task; once exceeded, the Reduce Task is considered failed. Default: 4.
3) mapreduce.map.failures.maxpercent: if the proportion of failed Map Tasks exceeds this value, the whole job fails. Default: 0. If your application can tolerate dropping part of its input, set this to a value greater than 0, e.g. 5, meaning the job still counts as successful as long as fewer than 5% of Map Tasks fail (a Map Task counts as failed once its retries exceed mapreduce.map.maxattempts, and its input data then produces no output).
4) mapreduce.reduce.failures.maxpercent: if the proportion of failed Reduce Tasks exceeds this value, the whole job fails. Default: 0.
5) mapreduce.task.timeout: task timeout, a parameter that frequently needs tuning. It means: if a task makes no progress for the given period, i.e. it neither reads new data nor produces output, it is considered blocked and possibly stuck forever. To keep a user program from blocking indefinitely without exiting, this timeout (in milliseconds) is enforced; default 300000. If your program spends a long time on each input record (for example it queries a database or pulls data over the network), increase this value. The typical error when it is too small is "AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.".

#### 1.3 Running MapReduce jobs locally

> Set the following parameters:
> mapreduce.framework.name=local
> mapreduce.jobtracker.address=local
> fs.defaultFS=local

#### 1.4 Efficiency and stability parameters

1) mapreduce.map.speculative: whether speculative execution is enabled for Map Tasks; default true.
2) mapreduce.reduce.speculative: whether speculative execution is enabled for Reduce Tasks; default true.
3) mapreduce.job.user.classpath.first & mapreduce.task.classpath.user.precedence: when the same class appears in both the user jar and the Hadoop jars, which one takes precedence; default false, meaning the class from the Hadoop jars is used first.
4) mapreduce.input.fileinputformat.split.minsize: minimum split size used by FileInputFormat when computing splits.
5) mapreduce.input.fileinputformat.split.maxsize: maximum split size used by FileInputFormat when computing splits (by default a split is the size of one block, i.e. 134217728 bytes with the usual 128 MB block size).
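> Many of the per-job parameters in this chapter can be set programmatically in the driver instead of in configuration files. A minimal sketch with purely illustrative values follows; cluster-level yarn.* parameters still belong in the server-side files, as noted above.

~~~
// Sketch: setting a few of the per-job tunables from this chapter on the job's
// Configuration. All values are illustrative, not recommendations.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class TuningDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Resource-related (section 1.1)
        conf.setInt("mapreduce.map.memory.mb", 2048);
        conf.setInt("mapreduce.reduce.memory.mb", 2048);
        conf.set("mapreduce.map.java.opts", "-Xmx1536m");

        // Fault-tolerance related (section 1.2)
        conf.setInt("mapreduce.map.maxattempts", 4);
        conf.setLong("mapreduce.task.timeout", 600000L);

        // Split sizes (section 1.4)
        conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 268435456L);

        // Local-run switch for quick debugging (section 1.3)
        // conf.set("mapreduce.framework.name", "local");

        Job job = Job.getInstance(conf, "tuning demo");
        // Mapper/reducer classes and input/output paths would be configured here
        // exactly as in the earlier examples.
    }
}
~~~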
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看