本文共 8364 字,大约阅读时间需要 27 分钟。
安装hadoop的配置的问题:
主要是对以下文件进行配置 core-site.xml、hdfs-site.xml 、yarn-site.xml、mapred.site.xml 进行配置 其中core-site.xml 与hdfs-site.xml主要是对hdfs文件系统进行配置,tarm-site.xml与mapred-site.xml对集群进行配置hadoop为C++ 提供了编程的借口,通过hadoop pipes,主要通过C++ 与java进程之间的通信来实现,不同语言之间的兼容通过进程之间的通信来实现,
编译出现的问题,由于编译阶段链接的库比较多,所以一定要注意链接库的依赖关系,链接器的链接顺序是从从左往右进行扫描,一定要将被依赖项放到依赖项的后面(真鸡儿坑)
//hadoop 的c语言编程接口#include#include "hdfs.h"#include #include void HdfsWriteTest(hdfsFS pfs);void GetHdfsInfo(){ hdfsFS pfs = NULL; int iRet = 0; tOffset iTmp = 0; pfs = hdfsConnect("hdfs://localhost:9000/",0); if(pfs == NULL) { //WRITELOGEX(LOG_ERROR,("GetHdfsInfo():hdfsConnectfaield errno=%d"),errno); printf("connect failed\n"); return ; } printf("connect success\n"); //WRITELOGEX(LOG_INFO,("GetHdfsInfo():hdfsConnectsuccess!")); //获取文件系统的容量 iTmp = hdfsGetCapacity(pfs); if(-1 == iTmp) { //WRITELOGEX(LOG_ERROR, ("GetHdfsInfo():hdfsConnectfaield errno=%d"), errno); printf("hdfsGetCapacity failed\n"); hdfsDisconnect(pfs); pfs = NULL; return ; } printf("system storge is %ld\n", iTmp); //WRITELOGEX(LOG_INFO, ("GetHdfsInfo():hdfsConnectsuccess! offset=%d"), iTmp); //获取文件系统中所有文件的大小 iTmp = hdfsGetUsed(pfs); if(-1 == iTmp) { //WRITELOGEX(LOG_ERROR, ("GetHdfsInfo():hdfsConnectfaield errno = %d"), errno); printf("hdfsGetUsed failed\n"); hdfsDisconnect(pfs); pfs = NULL; return ; } printf("system already used %ld ", iTmp); HdfsWriteTest(pfs); iRet = hdfsDisconnect(pfs); if(-1 == iRet) { //WRITELOGEX(LOG_ERROR,("GetHdfsInfo():hdfsDisconnectFailed errno = %d"), errno); printf("hdfsDisconnectFailed\n"); return ; } printf("hdfsDisconnectsuccess\n"); //WRITELOGEX(LOG_INFO, ("GetHdfsInfo():hdfsDisconnectsuccess!")); pfs = NULL; return ;}void HdfsWriteTest(hdfsFS pfs){ int iRet = 0; hdfsFile pfile = NULL; char *szTestFile = "/tmp/user/testfile.txt"; if(pfs == NULL) { return ; } //打开文件 pfile = hdfsOpenFile(pfs,szTestFile,O_WRONLY || O_CREAT, 0, 0, 0); if(NULL == pfile) { //WRITELOGEX(LOG_ERROR, ("HdfsWriteTest():hdfsOpenFilefailed! szFilePath = %s,errno=%d"),szTestFile,errno); return ; } //WRITELOGEX(LOG_INFO, ("HdfsWriteTest():hdfsOpenFilesuccess! szFilePath=%s"), szTestFile); //写入数据 iRet = hdfsWrite(pfs, pfile, "ni hao", strlen("ni hao")); if(iRet == -1) { //WRITELOGEX(LOG_ERROR, ("HdfsWriteTest():hdfsWritefailed! ret = %d, errno = %d"),iRet, errno); hdfsCloseFile(pfs, pfile); pfile = NULL; return ; } //WRITELOGEX(LOG_INFO, ("HdfsWriteTest():hdfsWritesuccess ret = %d",iRet)); //将缓冲去的内容写入磁盘 iRet = hdfsHFlush(pfs, pfile); if(-1 == iRet) { //WRITELOGEX(LOG_ERROR, ("HdfsWriteTest():hdfsHFlushfailed ret = %d, errno = %d"),iRet,errno); return ; } //WRITELOGEX(LOG_INFO,("HdfsWriteTest():hdfsHFlushsuccess ret = %d"),iRet); //关闭文件 iRet = hdfsCloseFile(pfs, pfile); if(-1 == iRet) { //WRITELOGEX(LOG_ERROR, ("HdfsWriteTest():hdfsCloseFilefailed ret = %d, errno = %d"), iRet,errno); return ; } pfile = NULL; return ;}int main(){ GetHdfsInfo(); return 0; }
#include#include #include #include #include "Pipes.hh"#include "TemplateFactory.hh"#include "StringUtils.hh" using namespace std;class MaxTemperatureMapper : public HadoopPipes::Mapper { public: MaxTemperatureMapper(HadoopPipes::TaskContext& context) { } void map(HadoopPipes::MapContext& context) { std::string line = context.getInputValue(); std::string year = line.substr(15, 4); std::string airTemperature = line.substr(87, 5); std::string q = line.substr(92, 1); if (airTemperature != "+9999" && (q == "0" || q == "1" || q == "4" || q == "5" || q == "9")) { context.emit(year, airTemperature); } } }; class MapTemperatureReducer : public HadoopPipes::Reducer { public: MapTemperatureReducer(HadoopPipes::TaskContext& context) { } void reduce(HadoopPipes::ReduceContext& context) { int maxValue = INT_MIN; while (context.nextValue()) { maxValue = std::max(maxValue, HadoopUtils::toInt(context.getInputValue())); } context.emit(context.getInputKey(), HadoopUtils::toString(maxValue)); } }; int main(int argc, char *argv[]) { return HadoopPipes::runTask(HadoopPipes::TemplateFactory ()); }
//使用流式操作hadoop jar ./share/hadoop/tools/lib/hadoop-streaming-3.1.2.jar -input /tmp/count/word -output /tmp/count/res -mapper /home/hadoop/testhadoop/map -reducer /home/hadoop/testhadoop/reduce//使用管道操作hadoop pipes -D hadoop.pipes.java.recordreader=true -D hadoop.pipes.java.recordwriter=true -input /tmp/user/test -output /tmp/user/output12 -program /tmp/user/firsthadoop//g++编译命令g++ -m64 -o firsthadoop firsthadoop.cpp -I/usr/local/hadoop/include -L/usr/local/hadoop/lib/native -lhdfs -lhadooppipes -lhadooputils -lpthread -lcrypto -lssl -std=c++11
namenode节点
负责数据元的操作,namenode节点上存储的是数据元信息(文件名,文件大小,文件的属性,创建之间,权限,副本数)当client发出请求的时候,会先向namenode 请求,namenode节点告诉client他所请求的数据在那个datanode上,这时候client会直接与datanode 节点进行通信,与文件内容有关的数据不经过namenode节点,否则namenode节点会成为系统的瓶颈, 当namenode节点收到请求时候,会返回给客户端一个最近的datanode节点,以减轻网络的负载量 namenode 中存储者数据的分布情况与数据的备份的情况,datanode节点会定期向datanode节点发送心跳包,以告知namenode节点自己当前还在存活状态,当namenode节点一段事件后收不到其中一个datanode节点的心跳包了,则namenode节点认为其中一个datanode节点掉线了,这时候,namenode节点会将该datanode节点上存储的内容分布到其他的datanode节点上,以保证数据的备份的数量。 如果只有一个namenode节点的情况下,当namenode节点出现故障,则所有的操作都不能进行,存在单点故障问题, 在hadoop2中,运行者两个namenode节点,在同一个时刻只有一个namenode节点服务,另一个节点作为备用,双namenode节点的工作原理:datanode节
负责具体的文件内容的操作reduce 的过程
(1)reducer会从主动mapper 阶段复制键值对,由于mapper会有很多,所以reducer会复制多个mapper的输出 (2)把复制到reduce本地的数据全部进行合并,把分散的数据合并成一个打的数据,对合并后的数据进行排序 (3)对合并后的数据调用reduce ,对与键值相同的键值对调用reduce,每次调用会产生或多个键值对,将键值对存储到本地的hdfs系统中 shuffle 派发 整个mapreduce的核心过程,指mapper产生的直接输出的结果,经过一系列的处理之后,成为reducer的直接输入结果的过程, (1)mapper端的shuffle: 由Mapper 产生的结果并不会直接写入磁盘中,而是先存储在内存中,当内存中的数据量达到某个阈值的时候,一次性写入本地磁盘。并且同时进行sort,commbine,(合并)、partition(分片),其中sort是吧mapper产生的结果按照key值进行排序,combine是把key值相同的记录进行合并;partition是把数据均衡的分给reducer (2)reducer端的shuffle mapper个reducer往往不在同一个节点上运行,所以reducer需要从多个节点上下载mapper的结果数据,并对这些数据进行处理,然后才能被reducer处理转载地址:http://manwi.baihongyu.com/