很多時候我們會自定義一些讀取資料的格式,而hadoop默認的RecordReader是LineRecordReader
會一行一行的讀取資料,key為偏移量,value為整行內容
現在嘗試客製化一個新的RecordReader,在此使用LineRecrodReader為基礎
並使用上一篇BuyPhoneJob的運算結果封裝成BuyPhoneBean,實現RecordReader後,可自定義資料格式傳給map
第一步建立BuyPhoneBean,一樣implements Writable,做序列化和反序列化
customerIds雖然為多個值組成,但在這裡當整個字串處理
procuctName | productPrice | customerIds |
---|---|---|
iPhone 8 Plus | 449.97 | 001,026,022,015,007 |
iPhone 7 | 207.0 | 004,029,024,018,010 |
… | … | … |
1 | public class BuyPhoneBean implements Writable { |
第二步自定義RecordReader,讀取依舊是使用LineRecordReader,只是將結果改為key是productName,而value是BuyPhoneBean
在繼承RecordReader後需要Override一些methods,運作方式會在之後做說明
這裡需要注意的是LineRecordReader使用完後在close method上進行關閉
1 | public class BuyPhoneRecordReader extends RecordReader<Text, BuyPhoneBean> { |
運作方式可看hadoop Mapper的source code (org.apache.hadoop.mapreduce.Mapper)
這裡擷取裡面136~151行的code,這裡可以看到 while(context.netKeyValue) 就是執行RecordReader的nextKeyValue method
而 map(context.getCurrentKey(), context.getCurrentValue(), context) 就是執行RecordReader的getCurrentKey()和getCurrentValue() method
在每次執行context.nextKeyValue()時判斷是否有下筆資料,因為我是借用LineRecordReader讀資料,所以直接使用它的nextKeyValue
只是將資料重新封裝成key是productName,value是BuyPhoneBean
1 | /** |
第三步將客製化的RecordReader覆寫到InputFormat,才可以讓hadoop使用自己做的RecordReader
1 | public class BuyPhoneInputFormat extends FileInputFormat<Text, BuyPhoneBean> { |
執行job前需要指定setInputFormatClass為我們上一步建立的BuyPhoneInputFormat
1 | public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { |
這裡mapper與reducer沒做特別處理,只是單純的輸出,主要還是測試自己寫的RecordReader