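1. ETL concept
ETL is short for Extract-Transform-Load. It describes the process of moving data from a source to a destination through extraction (extract), transformation (transform), and loading (load).

2. Project goals:
This project focuses on data integration: turning the data in the files into clean, structured data and storing it in Hive.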
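1. Crawl the Weibo data and store it in data files:
user --> user information; comment --> the Weibo posts of each user
2. Format the data so that it can be loaded into Hive tables
3. Raw data structure:
(1) weibodata (folder) --- 房地产 (real estate, folder) --- content (folder) --- TXT files named by uid
    --- user (folder) --- TXT files named by uid
The other category folders have the same layout as 房地产.
(2) Contents of a uid-named TXT file under the user folder: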
User [id=1107717945, screenName=null, name=null, province=0, city=0, location=null, description=null, url=null, profileImageUrl=null, userDomain=null, gender=null, followersCount=0, friendsCount=0, statusesCount=0, favouritesCount=0, createdAt=null, following=false, verified=false, verifiedType=0, allowAllActMsg=false, allowAllComment=false, followMe=false, avatarLarge=null, onlineStatus=0, status=null, biFollowersCount=0, remark=房地产, lang=null, verifiedReason=null]
(3) Contents of a uid-named TXT file under the content folder:
<comment> <content>2014年可以使用,中国中心在64层到69层共2万平米 //@禾臣薛朝阳:#中国中心#也盖出来了?@许立-VANTONE</content> <time>2012-4-3 11:30:08</time> <repostsCount>8</repostsCount> <commentsCount>4</commentsCount> </comment>
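Each user file holds one User [key=value, ...] line like the one in (2). A minimal sketch of turning such a line into a key/value map, assuming no value contains a comma (free-text fields such as description could break this); UserLineSketch is a hypothetical name, not part of the project:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: parses one "User [k1=v1, k2=v2, ...]" line into an ordered map.
// Assumption: no value contains a comma.
class UserLineSketch {
    static Map<String, String> parse(String line) {
        int open = line.indexOf('[');
        int close = line.lastIndexOf(']');
        if (open < 0 || close < open) {
            throw new IllegalArgumentException("not a User [...] line: " + line);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (String kv : line.substring(open + 1, close).split(",")) {
            // split on the first '=' only, so a value may itself contain '='
            String[] pair = kv.trim().split("=", 2);
            if (pair.length == 2) {
                // missing values appear as the literal text "null" in the sample; map them to null
                fields.put(pair[0], "null".equals(pair[1]) ? null : pair[1]);
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        String line = "User [id=1107717945, screenName=null, province=0, remark=房地产]";
        Map<String, String> fields = parse(line);
        System.out.println(fields.get("id")); // 1107717945
        System.out.println(fields.get("remark"));
    }
}
```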
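3. Project approach
2.1. Parse the raw data files into data objects
2.2. Serialize the data objects into output data files
Data file --> User {} --> UserPojo object --> (collect) List<UserPojo> --> fileUtil / IOUtil
          --> Content {} --> ContentPojo object --> (collect) List<ContentPojo> --> fileUtil / IOUtil
3. Hive process (a standard four-step layout: config, create, udf, deal)
3.1 Create the tables: User and Content
3.2 Load the data into User and Content
4. Data structuring done!

4. Project analysis
1. Source data files and their format
Examine the format of the crawled Weibo data: User and Content.
2. Write a Java program that processes the source data files, produces "structured" objects, and writes them to files in a "structured" format.
Writing the Java structuring code:
2.1. The program needs file operations, IO stream reading and writing, possibly date handling, and XML parsing, so write a utility class for each.
2.2. Structure the data files into POJO objects:
User file -- create a UserInfo class
Content file -- create a ContentInfo class
2.3. Structuring needs a managed pipeline:
DataLoadManager class: provides several methods that carry out the formatting step by step.
read files -> parse into User or Content objects -> write structured records to the output files
2.4. Provide a system control class so that the whole job can be started from one place.
SystemController class -- the on/off (entry-point) class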
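3. Load the generated output files into Hive tables.
3.1. Follow the standard four steps: config, create, udf, deal
3.2. Write the scripts

5. Project development
Start from the project layout:
5.1. config: scripts that set the environment variables
5.2. create: table-creation scripts
5.3. udf: holds the Java-built "structured-data generation Jar"
    // create a Maven project -> develop the data-structuring program -> build the Jar
    // run it with: java -jar <jar file> <inputDir>
    // result: two data files, user_pojo_list.txt and content_pojo_list.txt, in the directory where the command is run
5.4. deal: scripts that process the data

6. Jar project architecture:
(1) controller ---> SystemControler -> system control (the user's single entry point)
(2) manager ---> DataLoadManager -> manages the data-structuring pipeline by chaining the data flow (read source file -> objects -> write to file)
(3) pojo -> data models: the User and Content files
(4) util -> utility classes: FileOperatorUtil, IOUtil, DateUtil, XmlParserUtil (or regular expressions)

7. Project code:
(1) utils package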
package com.tl.job002.utils;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

public class IOUtil {
    /**
     * Reads a text file line by line into a list.
     * @param txtFilePath path of the input file
     * @param charset     encoding of the input file, e.g. "gbk"
     * @return one list element per line
     * @throws Exception on IO errors
     */
    public static List<String> getTxtContent(String txtFilePath, String charset) throws Exception {
        List<String> lineList = new ArrayList<String>();
        // try-with-resources closes the reader even if reading fails
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(new FileInputStream(txtFilePath), charset))) {
            String tempLine = null;
            // each line (e.g. one <comment> record) becomes one list element
            while ((tempLine = br.readLine()) != null) {
                lineList.add(tempLine);
            }
        }
        return lineList;
    }

    /**
     * Writes the lines to a file, separated by '\n'.
     * @param lineList       lines to write
     * @param outputFilePath path of the output file
     * @param charset        encoding of the output file, e.g. "utf-8"
     * @return true on success
     * @throws Exception on IO errors
     */
    public static boolean writeListToFile(List<String> lineList, String outputFilePath, String charset) throws Exception {
        try (FileOutputStream fos = new FileOutputStream(outputFilePath)) {
            int lineCounter = 0;
            for (String line : lineList) {
                if (lineCounter > 0) {
                    // write the separator before every line except the first,
                    // so the file does not end with an extra newline
                    fos.write('\n');
                }
                fos.write(line.getBytes(charset));
                lineCounter++;
            }
        }
        return true;
    }

    /** Writes a whole string to a file in the given encoding. */
    public static boolean writeListToFile(String txtContent, String outputFilePath, String charset) throws Exception {
        try (FileOutputStream fos = new FileOutputStream(outputFilePath)) {
            fos.write(txtContent.getBytes(charset));
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        // String txtFilePath = "房地产\\user\\2297199692.txt";
        String txtFilePath = "房地产\\content\\1484018951.txt";
        String inputCharset = "gbk";
        String outputCharset = "utf-8";
        String outputFilePath = "newFile.txt";
        List<String> lineList = getTxtContent(txtFilePath, inputCharset);
        for (String tempLine : lineList) {
            System.out.println(tempLine);
        }
        writeListToFile(lineList, outputFilePath, outputCharset);
    }
}
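IOUtil reads a GBK file line by line and writes the lines back as UTF-8, separated by '\n' with no trailing newline. A minimal sketch of the same re-encoding with java.nio.file.Files (Java 8+), swapped in for the stream code; ReencodeSketch and reencode are hypothetical names:

```java
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

// Hypothetical helper: the same GBK -> UTF-8 re-encoding as IOUtil, using java.nio.file.Files.
class ReencodeSketch {
    static void reencode(Path in, Charset inCharset, Path out, Charset outCharset) throws IOException {
        // readAllLines decodes the whole file and strips the line terminators
        List<String> lines = Files.readAllLines(in, inCharset);
        // String.join puts '\n' only between lines, so there is no trailing newline (as in IOUtil)
        Files.write(out, String.join("\n", lines).getBytes(outCharset));
    }

    public static void main(String[] args) throws IOException {
        // sample path from the text, built without Windows-only "\\" separators
        Path in = Paths.get("房地产", "content", "1484018951.txt");
        reencode(in, Charset.forName("GBK"), Paths.get("newFile.txt"), StandardCharsets.UTF_8);
    }
}
```

One behavioral difference: Files.readAllLines throws MalformedInputException on bytes that are invalid in the input charset, whereas the InputStreamReader in IOUtil silently replaces them.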
package com.tl.job002.utils;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class FileOperatorUtil {
    /**
     * Recursively collects the full paths of all regular files under filePath.
     */
    public static List<String> getAllSubNormalFilePath(String filePath) {
        File file = new File(filePath);
        List<String> resultList = new ArrayList<String>();
        if (file.isDirectory()) {
            // a directory: recurse into each child and collect its files
            File[] children = file.listFiles();
            if (children != null) {
                for (File tempFile : children) {
                    resultList.addAll(getAllSubNormalFilePath(tempFile.toString()));
                }
            }
        } else {
            // a regular file: record its full path
            resultList.add(file.toString());
        }
        return resultList;
    }

    /**
     * Returns the file name without its suffix, e.g. "1855569733" for ".../1855569733.txt".
     * @param inputPath path of the file
     * @return the part of the file name before the first '.'
     */
    public static String getFileNameWithoutSuffix(String inputPath) {
        return new File(inputPath).getName().split("\\.")[0];
    }

    public static void main(String[] args) {
        String inputPath = "房地产\\user\\1855569733.txt";
        System.out.println(getFileNameWithoutSuffix(inputPath));
    }
}
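getAllSubNormalFilePath walks the directory tree recursively by hand. A minimal sketch of the same traversal with Files.walk (Java 8+), assuming a java.nio alternative is acceptable; FileWalkSketch, regularFiles and uidOf are hypothetical names:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper: a java.nio alternative to FileOperatorUtil.
class FileWalkSketch {
    // all regular files under root, at any depth, in a stable sorted order
    static List<Path> regularFiles(Path root) throws IOException {
        // the stream holds open directory handles, so close it with try-with-resources
        try (Stream<Path> paths = Files.walk(root)) {
            return paths.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
    }

    // "1855569733.txt" -> "1855569733": the name before the first '.', like getFileNameWithoutSuffix
    static String uidOf(Path file) {
        String name = file.getFileName().toString();
        int dot = name.indexOf('.');
        return dot < 0 ? name : name.substring(0, dot);
    }
}
```

Because Path uses the platform's separator, this also avoids the Windows-only "\\" paths used in the main methods above.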
package com.tl.job002.utils;

import java.util.List;

public class StringUtil {
    public static String join(List
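The body of StringUtil is cut off in the source. Judging only from how it is called (StringUtil.join(fieldList, "\001") on a list of mixed field values), it concatenates the elements' string forms with a separator. A hypothetical reconstruction under that assumption, not the author's code; StringUtilSketch is an illustrative name:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the truncated StringUtil.join; not the original source.
class StringUtilSketch {
    // joins String.valueOf(element) with the separator; null elements become "null"
    static String join(List<?> list, String separator) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < list.size(); i++) {
            if (i > 0) {
                sb.append(separator); // separator only between elements
            }
            sb.append(String.valueOf(list.get(i)));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<Object> fields = Arrays.asList(1107717945L, "text", 8, 4);
        System.out.println(join(fields, "|")); // 1107717945|text|8|4
    }
}
```

A design point worth checking: this writes a null field as the text null, while Hive's default text format reads \N as NULL, so a stricter version might emit \N for null elements.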
package com.tl.job002.utils;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class DateUtil {
    // HH is the 24-hour clock; hh (12-hour) would write 13:05 as 01:05
    // note: SimpleDateFormat is not thread-safe, which is fine for this single-threaded job
    static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public static Date getDate(String dateString) throws ParseException {
        return dateFormat.parse(dateString);
    }

    public static String formatDate(Date date) {
        return dateFormat.format(date);
    }

    public static void main(String[] args) {
        Date date = new Date();
        System.out.println(formatDate(date));
    }
}
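DateUtil parses the raw time text and formats it for output with one shared SimpleDateFormat. A minimal sketch of the same step with java.time (Java 8+), whose formatters are immutable and thread-safe and which also normalizes one-digit months and days such as 2012-4-3; DateSketch and normalize are hypothetical names:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical alternative to DateUtil using java.time; not part of the project.
class DateSketch {
    // parsing pattern: single-letter M, d and H also accept one-digit values such as "2012-4-3"
    static final DateTimeFormatter IN = DateTimeFormatter.ofPattern("yyyy-M-d H:mm:ss");
    // output pattern: zero-padded, 24-hour clock
    static final DateTimeFormatter OUT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static String normalize(String raw) {
        return LocalDateTime.parse(raw, IN).format(OUT);
    }

    public static void main(String[] args) {
        System.out.println(normalize("2012-4-3 11:30:08")); // 2012-04-03 11:30:08
    }
}
```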
package com.tl.job002.utils;

import java.io.File;
import java.io.StringReader;
import java.util.Iterator;

import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;

public class XmlParserUtil {
    /** Prints the name and text of each child of the root element of an XML file. */
    public static void printXML(String xmlPath) {
        // create a SAXReader
        SAXReader reader = new SAXReader();
        try {
            // read the file into a Document
            Document document = reader.read(new File(xmlPath));
            // get the root element
            Element rootElement = document.getRootElement();
            // iterate over the root's direct children
            Iterator<?> it = rootElement.elementIterator();
            while (it.hasNext()) {
                Element child = (Element) it.next();
                System.out.println("node name: " + child.getName() + " -- node value: " + child.getStringValue());
            }
        } catch (DocumentException e) {
            e.printStackTrace();
        }
    }

    /** Returns the root element of an XML file, or null if it cannot be parsed. */
    public static Element getXmlRootElement(File xmlFile) {
        SAXReader reader = new SAXReader();
        try {
            Document document = reader.read(xmlFile);
            return document.getRootElement();
        } catch (DocumentException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Parses one XML string and returns its root element, or null if it is not well-formed.
     * @param isFilter if true, escape &, ' and " first; '&' must be replaced first,
     *                 or the entities inserted for ' and " would be escaped again
     */
    public static Element getXmlRootElement(String xmlContent, boolean isFilter) {
        SAXReader reader = new SAXReader();
        try {
            if (isFilter) {
                xmlContent = xmlContent.replace("&", "&amp;")
                        .replace("'", "&apos;").replace("\"", "&quot;");
            }
            Document document = reader.read(new StringReader(xmlContent));
            return document.getRootElement();
        } catch (DocumentException e) {
            // malformed line: the caller counts and skips it
        }
        return null;
    }

    public static void main(String[] args) {
        // one content record; getXmlRootElement(..., true) does the escaping,
        // so the string is not escaped here as well (that would double-escape '&')
        String xmlContent = "<comment><content>回复@雷亚雷:在哈佛或MIT,每天都有各种议题的小型研讨会,凡在中午或晚餐时间举行的,一般都会准备便餐。即使如此,有时也会忽略。 //@雷亚雷:王老师,不吃午饭吗?</content>"
                + "<repostsCount>574</repostsCount><commentsCount>290</commentsCount></comment>";
        Element rootElement = getXmlRootElement(xmlContent, true);
        System.out.println(rootElement.elementText("content"));
    }
}

A sample content text from the data (its XML tags were lost in the source). Its bare & characters are not legal in XML text, which is the case the isFilter escaping handles:
誠意推介加拿大殿堂級音樂大師David Foster 的演唱會DVD《Hit Man Returns: David Foster & Friends》!演唱者包括Earth, Wind & Fire、Michael Bolton及Donna Summer等等,全都是星光熠熠的唱家班!就連只得11歲的America's Got Talent參加者Jackie Evancho的女高音亦非常震撼人心,金曲聽出耳油! 1 7
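With isFilter set, getXmlRootElement escapes &, ' and " before parsing, because a bare & (as in "David Foster & Friends") makes a line malformed XML. A minimal sketch of that step, checked against the JDK's built-in DOM parser instead of dom4j; EscapeSketch, escape and contentOf are hypothetical names:

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import org.xml.sax.InputSource;

// Hypothetical sketch of the isFilter step, verified with the JDK's DOM parser.
class EscapeSketch {
    // '&' must be replaced first; otherwise the '&' inside "&apos;" and "&quot;" would be escaped again
    static String escape(String line) {
        return line.replace("&", "&amp;").replace("'", "&apos;").replace("\"", "&quot;");
    }

    // returns the <content> text, or null when the line is not well-formed XML
    static String contentOf(String xmlLine) {
        try {
            return DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xmlLine)))
                    .getDocumentElement()
                    .getElementsByTagName("content").item(0).getTextContent();
        } catch (Exception e) {
            return null;
        }
    }

    public static void main(String[] args) {
        String line = "<comment><content>David Foster & Friends</content></comment>";
        System.out.println(contentOf(line));         // null (the parser also reports the error on stderr)
        System.out.println(contentOf(escape(line))); // David Foster & Friends
    }
}
```

Escaping the whole line works only because the tags themselves contain no &, ' or "; an entity already present in the text (for example &lt;) would come out literally as &lt;.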
(2) pojos package
package com.tl.job002.pojos;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import com.tl.job002.utils.StringUtil;

public class WbUserInfoPojo {
    private long uid;
    private String screenName;
    private String name;
    private int province;
    private int city;
    private String location;
    private String description;
    private String userDomain;
    private String gender;
    private int followersCount;
    private int friendsCount;
    private int statusesCount;
    private int favouritesCount;
    private Date createdAt;
    private boolean verified;
    private String remark;
    private String verifiedReason;

    @Override
    public String toString() {
        return "WbUserInfoPojo [uid=" + uid + ", screenName=" + screenName
                + ", name=" + name + ", province=" + province + ", city=" + city
                + ", location=" + location + ", description=" + description
                + ", userDomain=" + userDomain + ", gender=" + gender
                + ", followersCount=" + followersCount + ", friendsCount=" + friendsCount
                + ", statusesCount=" + statusesCount + ", favouritesCount=" + favouritesCount
                + ", createdAt=" + createdAt + ", verified=" + verified
                + ", remark=" + remark + ", verifiedReason=" + verifiedReason + "]";
    }

    public String toString4FileOutput() {
        List
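WbUserInfoPojo is cut off inside toString4FileOutput. Assuming it mirrors WbContentInfoPojo (collect the fields into a list and join them with \001), a hypothetical shape restricted to the four fields DataLoadManager actually fills (uid, screenName, province, remark); the field order here is an assumption, not the author's:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch, not the original class: only the fields DataLoadManager sets.
class UserOutputSketch {
    long uid;
    String screenName;
    int province;
    String remark;

    // one Hive row: fields in a fixed (assumed) order, separated by \001 (Ctrl-A)
    String toString4FileOutput() {
        List<Object> fieldList = Arrays.asList(uid, screenName, province, remark);
        return fieldList.stream().map(o -> String.valueOf(o)).collect(Collectors.joining("\u0001"));
    }
}
```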
package com.tl.job002.pojos;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import com.tl.job002.utils.DateUtil;
import com.tl.job002.utils.StringUtil;

public class WbContentInfoPojo {
    private long uid;
    private String content;
    private Date time;
    private int repostsCount;
    private int commentsCount;

    /**
     * Formats the object as one line of the output file.
     * Purpose: write the object to the file in a fixed string format.
     */
    public String toString4FileOutput() {
        List<Object> fieldList = new ArrayList<Object>();
        fieldList.add(uid);
        fieldList.add(content);
        fieldList.add(DateUtil.formatDate(time));
        fieldList.add(repostsCount);
        fieldList.add(commentsCount);
        // join the fields with \001 (Ctrl-A), Hive's default field delimiter
        return StringUtil.join(fieldList, "\001");
    }

    public long getUid() {
        return uid;
    }

    public void setUid(long uid) {
        this.uid = uid;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public Date getTime() {
        return time;
    }

    public void setTime(Date time) {
        this.time = time;
    }

    public int getRepostsCount() {
        return repostsCount;
    }

    public void setRepostsCount(int repostsCount) {
        this.repostsCount = repostsCount;
    }

    public int getCommentsCount() {
        return commentsCount;
    }

    public void setCommentsCount(int commentsCount) {
        this.commentsCount = commentsCount;
    }
}
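toString4FileOutput joins the fields with \001, which is also Hive's default field delimiter for text tables, so each output line maps to one table row. A minimal sketch of splitting such a line back into fields, for example to spot-check the output file before loading it; DelimitedLineSketch is a hypothetical name:

```java
// Hypothetical spot-check helper: splits one \001-delimited output line back into fields.
class DelimitedLineSketch {
    static final String FIELD_SEP = "\u0001"; // same separator as toString4FileOutput

    static String[] fields(String line) {
        // limit -1 keeps trailing empty fields, which split() would otherwise drop
        return line.split(FIELD_SEP, -1);
    }

    public static void main(String[] args) {
        String line = String.join(FIELD_SEP, "1107717945", "some text", "2012-04-03 11:30:08", "8", "4");
        String[] f = fields(line);
        System.out.println(f.length);               // 5
        System.out.println(Integer.parseInt(f[3])); // 8
    }
}
```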
package com.tl.job002.pojos;

import java.util.List;

/**
 * Wraps the user list and the content list in one object.
 * @author dell
 */
public class UserAndContentInfoPojo {
    private List<WbUserInfoPojo> userPojoList;
    private List<WbContentInfoPojo> contentPojoList;

    public UserAndContentInfoPojo(List<WbUserInfoPojo> userPojoList, List<WbContentInfoPojo> contentPojoList) {
        super();
        this.userPojoList = userPojoList;
        this.contentPojoList = contentPojoList;
    }

    public List<WbUserInfoPojo> getUserPojoList() {
        return userPojoList;
    }

    public void setUserPojoList(List<WbUserInfoPojo> userPojoList) {
        this.userPojoList = userPojoList;
    }

    public List<WbContentInfoPojo> getContentPojoList() {
        return contentPojoList;
    }

    public void setContentPojoList(List<WbContentInfoPojo> contentPojoList) {
        this.contentPojoList = contentPojoList;
    }
}
(3) manager package
package com.tl.job002.manager;

import java.util.ArrayList;
import java.util.List;

import org.dom4j.Element;

import com.tl.job002.pojos.UserAndContentInfoPojo;
import com.tl.job002.pojos.WbContentInfoPojo;
import com.tl.job002.pojos.WbUserInfoPojo;
import com.tl.job002.utils.DateUtil;
import com.tl.job002.utils.FileOperatorUtil;
import com.tl.job002.utils.IOUtil;
import com.tl.job002.utils.XmlParserUtil;

/**
 * Data structuring pipeline.
 * @author dell
 * Approach: 1. raw data file --> WbUserInfoPojo objects --> list
 *                            --> WbContentInfoPojo objects --> list --> UserAndContentInfoPojo
 *           2. UserAndContentInfoPojo --> user_pojo_list.txt
 *                                     --> content_pojo_list.txt
 * Combined: data file -> {uid, List<String>} -> POJOs
 */
public class DataLoadManager {
    /** Static nested class: one source file's uid plus all of its lines. */
    public static class UidAndListPojo {
        // the uid (taken from the file name)
        private String uid;
        // every line of that uid's file
        private List<String> lineList;

        public String getUid() {
            return uid;
        }

        public void setUid(String uid) {
            this.uid = uid;
        }

        public List<String> getLineList() {
            return lineList;
        }

        public void setLineList(List<String> lineList) {
            this.lineList = lineList;
        }
    }

    /**
     * Reads every source file under inputDir.
     * @param inputDir root of the raw data, e.g. weibodata
     * @param charset  encoding of the raw files
     * @return one UidAndListPojo per file
     * @throws Exception on IO errors
     */
    public static List<UidAndListPojo> getAllFileMapResult(String inputDir, String charset) throws Exception {
        // one entry per file: its uid and its lines
        List<UidAndListPojo> uidAndListPojoList = new ArrayList<UidAndListPojo>();
        List<String> txtFilePathList = FileOperatorUtil.getAllSubNormalFilePath(inputDir);
        for (String txtFilePath : txtFilePathList) {
            List<String> txtLineList = IOUtil.getTxtContent(txtFilePath, charset);
            // the file name without its suffix is the uid
            String uidValue = FileOperatorUtil.getFileNameWithoutSuffix(txtFilePath);
            UidAndListPojo uidAndListPojo = new UidAndListPojo();
            uidAndListPojo.setLineList(txtLineList);
            uidAndListPojo.setUid(uidValue);
            uidAndListPojoList.add(uidAndListPojo);
        }
        return uidAndListPojoList;
    }

    /**
     * Turns the raw lines into user and content POJOs.
     * Lines starting with '<' are content records; all other non-empty lines are user records.
     * Lines that cannot be parsed are counted and skipped.
     */
    public static UserAndContentInfoPojo getConstructInfoPojo(List<UidAndListPojo> uidAndListPojoList) {
        List<WbUserInfoPojo> userPojoList = new ArrayList<WbUserInfoPojo>();
        List<WbContentInfoPojo> contentPojoList = new ArrayList<WbContentInfoPojo>();
        int errorLineCounter4Content = 0;
        int errorLineCounter4User = 0;
        for (UidAndListPojo uidAndListPojo : uidAndListPojoList) {
            String uidValue = uidAndListPojo.getUid();
            for (String line : uidAndListPojo.getLineList()) {
                line = line.trim();
                if (line.length() == 0) {
                    continue;
                }
                if (line.startsWith("<")) {
                    // a content record: parse it (escaping &, ' and ") and get the root element
                    Element rootElement = XmlParserUtil.getXmlRootElement(line, true);
                    if (rootElement == null) {
                        // not well-formed XML: skip it
                        errorLineCounter4Content++;
                        continue;
                    }
                    try {
                        WbContentInfoPojo contentInfoPojo = new WbContentInfoPojo();
                        contentInfoPojo.setUid(Long.parseLong(uidValue));
                        contentInfoPojo.setContent(rootElement.elementText("content"));
                        contentInfoPojo.setTime(DateUtil.getDate(rootElement.elementText("time")));
                        contentInfoPojo.setRepostsCount(Integer.parseInt(rootElement.elementText("repostsCount")));
                        contentInfoPojo.setCommentsCount(Integer.parseInt(rootElement.elementText("commentsCount")));
                        // add the finished object to the content list
                        contentPojoList.add(contentInfoPojo);
                    } catch (Exception e) {
                        // missing element, bad date or bad number: skip this record
                        // instead of aborting the whole run
                        errorLineCounter4Content++;
                    }
                } else {
                    // everything else is a user record: User [key=value, ...]
                    try {
                        String body = line.substring(line.indexOf('[') + 1, line.lastIndexOf(']'));
                        WbUserInfoPojo userInfoPojo = new WbUserInfoPojo();
                        for (String kv : body.split(",")) {
                            // split on the first '=' only; skip fragments without one
                            String[] kvPair = kv.trim().split("=", 2);
                            if (kvPair.length < 2) {
                                continue;
                            }
                            if (kvPair[0].equals("id")) {
                                userInfoPojo.setUid(Long.parseLong(kvPair[1]));
                            } else if (kvPair[0].equals("screenName")) {
                                userInfoPojo.setScreenName(kvPair[1]);
                            } else if (kvPair[0].equals("province")) {
                                userInfoPojo.setProvince(Integer.parseInt(kvPair[1]));
                            } else if (kvPair[0].equals("remark")) {
                                userInfoPojo.setRemark(kvPair[1]);
                            }
                        }
                        userPojoList.add(userInfoPojo);
                    } catch (Exception e) {
                        errorLineCounter4User++;
                    }
                }
            }
        }
        System.out.println("errorLineCounter4Content=" + errorLineCounter4Content);
        System.out.println("errorLineCounter4User=" + errorLineCounter4User);
        return new UserAndContentInfoPojo(userPojoList, contentPojoList);
    }

    /**
     * Writes the user list and the content list to their own output files,
     * one toString4FileOutput() line per object, separated by '\n'.
     * @param userAndContentInfoPojo the two POJO lists
     * @param userOutputFilePath     output file for users
     * @param contentOutputFilePath  output file for contents
     * @param outputCharset          output encoding
     * @return true on success
     * @throws Exception on IO errors
     */
    public static boolean writePojoToFile(UserAndContentInfoPojo userAndContentInfoPojo,
            String userOutputFilePath, String contentOutputFilePath, String outputCharset) throws Exception {
        // 1. write the user POJO list
        List<WbUserInfoPojo> userInfoPojoList = userAndContentInfoPojo.getUserPojoList();
        StringBuilder stringBuilder = new StringBuilder();
        int lineCounter = 0;
        for (WbUserInfoPojo tempPojo : userInfoPojoList) {
            if (lineCounter > 0) {
                stringBuilder.append("\n");
            }
            stringBuilder.append(tempPojo.toString4FileOutput());
            lineCounter++;
        }
        IOUtil.writeListToFile(stringBuilder.toString(), userOutputFilePath, outputCharset);
        // 2. write the content POJO list
        List<WbContentInfoPojo> contentInfoPojoList = userAndContentInfoPojo.getContentPojoList();
        stringBuilder = new StringBuilder();
        lineCounter = 0;
        for (WbContentInfoPojo tempPojo : contentInfoPojoList) {
            if (lineCounter > 0) {
                stringBuilder.append("\n");
            }
            stringBuilder.append(tempPojo.toString4FileOutput());
            lineCounter++;
        }
        IOUtil.writeListToFile(stringBuilder.toString(), contentOutputFilePath, outputCharset);
        return true;
    }

    /**
     * Runs the whole pipeline: read, structure, write.
     * @return true on success, false if any step threw
     */
    public static boolean startProcess(String inputDir, String inputCharset,
            String output4User, String output4Content, String outputCharset) {
        try {
            // read the text files under the given directory into line lists
            List<UidAndListPojo> uidAndListPojoList = getAllFileMapResult(inputDir, inputCharset);
            // convert the line lists into lists of structured POJOs
            UserAndContentInfoPojo userAndContentInfoPojo = getConstructInfoPojo(uidAndListPojoList);
            // write each POJO list to its own text file, encoded as utf-8
            writePojoToFile(userAndContentInfoPojo, output4User, output4Content, outputCharset);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        // String inputDir = "房地产";
        String inputDir = "weibodata";
        String inputCharset = "gbk";
        String output4User = "user_pojo_list.txt";
        String output4Content = "content_pojo_list.txt";
        String outputCharset = "utf-8";
        startProcess(inputDir, inputCharset, output4User, output4Content, outputCharset);
        System.out.println("done!");
    }
}
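writePojoToFile builds each output file in one StringBuilder before writing it, so the whole file sits in memory at once. A minimal sketch of a streaming alternative with Files.newBufferedWriter that keeps the same '\n'-between-lines layout; StreamingWriteSketch and writeLines are hypothetical names:

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical alternative to the StringBuilder approach: stream the lines straight to disk.
class StreamingWriteSketch {
    static void writeLines(Iterable<String> lines, Path out, Charset charset) throws IOException {
        try (BufferedWriter w = Files.newBufferedWriter(out, charset)) {
            boolean first = true;
            for (String line : lines) {
                if (!first) {
                    w.write('\n'); // separator between lines only, no trailing newline
                }
                w.write(line);
                first = false;
            }
        }
    }
}
```

The lines would be the toString4FileOutput() strings of each POJO; only the buffer, not the whole file, is held in memory.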
(4) controler package
package com.tl.job002.controler;

import com.tl.job002.manager.DataLoadManager;

public class SystemControler {
    public static void main(String[] args) throws Exception {
        if (args == null || args.length != 1) {
            System.out.println("usage: java -jar <jar file> <inputDir> (exactly one source data directory is required)");
            // stop here; otherwise args[0] below throws ArrayIndexOutOfBoundsException
            return;
        }
        String inputDir = args[0];
        String inputCharset = "gbk";
        String output4User = "user_pojo_list.txt";
        String output4Content = "content_pojo_list.txt";
        String outputCharset = "utf-8";
        DataLoadManager.startProcess(inputDir, inputCharset, output4User, output4Content, outputCharset);
        System.out.println("done!");
    }
}
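The create and deal steps load the output files into Hive through scripts that this section does not show. A minimal sketch of the statements they might issue for the content file, generated from Java only to stay in one language; the table name weibo_content and all column names are assumptions (post_time sidesteps using time as a column name), and the column order follows WbContentInfoPojo.toString4FileOutput:

```java
// Hypothetical sketch: builds HiveQL for the content output file.
// Table and column names are assumptions; the column order matches toString4FileOutput.
class HiveDdlSketch {
    static String createContentTable(String table) {
        return "CREATE TABLE IF NOT EXISTS " + table + " (\n"
                + "  uid BIGINT,\n"
                + "  content STRING,\n"
                + "  post_time STRING,\n" // text written as yyyy-MM-dd HH:mm:ss
                + "  reposts_count INT,\n"
                + "  comments_count INT\n"
                + ")\n"
                + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\001'\n" // Ctrl-A, as written by the Jar
                + "STORED AS TEXTFILE";
    }

    static String loadLocalFile(String localPath, String table) {
        // LOCAL reads from the client machine's file system; without LOCAL the path refers to HDFS
        return "LOAD DATA LOCAL INPATH '" + localPath + "' INTO TABLE " + table;
    }

    public static void main(String[] args) {
        System.out.println(createContentTable("weibo_content") + ";");
        System.out.println(loadLocalFile("content_pojo_list.txt", "weibo_content") + ";");
    }
}
```

Because the time text follows yyyy-MM-dd HH:mm:ss, the column could also be declared TIMESTAMP, which Hive parses from that text form.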