博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
hadoop分布式框架的使用中的问题
阅读量:3942 次
发布时间:2019-05-24

本文共 8364 字,大约阅读时间需要 27 分钟。

文章目录

安转配置问题

  • 对于jdk版本号大于9的可能会出现无法启动resourcemanager 与nodemanager的情况,原因是jdk版本过高的话,与hadoop 的兼容

安装hadoop的配置的问题:

主要是对以下文件进行配置 core-site.xml、hdfs-site.xml 、yarn-site.xml、mapred.site.xml 进行配置
其中core-site.xml 与hdfs-site.xml主要是对hdfs文件系统进行配置,tarm-site.xml与mapred-site.xml对集群进行配置

hadoop为C++ 提供了编程的借口,通过hadoop pipes,主要通过C++ 与java进程之间的通信来实现,不同语言之间的兼容通过进程之间的通信来实现,

编译出现的问题,由于编译阶段链接的库比较多,所以一定要注意链接库的依赖关系,链接器的链接顺序是从从左往右进行扫描,一定要将被依赖项放到依赖项的后面(真鸡儿坑)

C语言接口

//hadoop 的c语言编程接口#include 
#include "hdfs.h"#include
#include
void HdfsWriteTest(hdfsFS pfs);void GetHdfsInfo(){
hdfsFS pfs = NULL; int iRet = 0; tOffset iTmp = 0; pfs = hdfsConnect("hdfs://localhost:9000/",0); if(pfs == NULL) {
//WRITELOGEX(LOG_ERROR,("GetHdfsInfo():hdfsConnectfaield errno=%d"),errno); printf("connect failed\n"); return ; } printf("connect success\n"); //WRITELOGEX(LOG_INFO,("GetHdfsInfo():hdfsConnectsuccess!")); //获取文件系统的容量 iTmp = hdfsGetCapacity(pfs); if(-1 == iTmp) {
//WRITELOGEX(LOG_ERROR, ("GetHdfsInfo():hdfsConnectfaield errno=%d"), errno); printf("hdfsGetCapacity failed\n"); hdfsDisconnect(pfs); pfs = NULL; return ; } printf("system storge is %ld\n", iTmp); //WRITELOGEX(LOG_INFO, ("GetHdfsInfo():hdfsConnectsuccess! offset=%d"), iTmp); //获取文件系统中所有文件的大小 iTmp = hdfsGetUsed(pfs); if(-1 == iTmp) {
//WRITELOGEX(LOG_ERROR, ("GetHdfsInfo():hdfsConnectfaield errno = %d"), errno); printf("hdfsGetUsed failed\n"); hdfsDisconnect(pfs); pfs = NULL; return ; } printf("system already used %ld ", iTmp); HdfsWriteTest(pfs); iRet = hdfsDisconnect(pfs); if(-1 == iRet) {
//WRITELOGEX(LOG_ERROR,("GetHdfsInfo():hdfsDisconnectFailed errno = %d"), errno); printf("hdfsDisconnectFailed\n"); return ; } printf("hdfsDisconnectsuccess\n"); //WRITELOGEX(LOG_INFO, ("GetHdfsInfo():hdfsDisconnectsuccess!")); pfs = NULL; return ;}void HdfsWriteTest(hdfsFS pfs){
int iRet = 0; hdfsFile pfile = NULL; char *szTestFile = "/tmp/user/testfile.txt"; if(pfs == NULL) {
return ; } //打开文件 pfile = hdfsOpenFile(pfs,szTestFile,O_WRONLY || O_CREAT, 0, 0, 0); if(NULL == pfile) {
//WRITELOGEX(LOG_ERROR, ("HdfsWriteTest():hdfsOpenFilefailed! szFilePath = %s,errno=%d"),szTestFile,errno); return ; } //WRITELOGEX(LOG_INFO, ("HdfsWriteTest():hdfsOpenFilesuccess! szFilePath=%s"), szTestFile); //写入数据 iRet = hdfsWrite(pfs, pfile, "ni hao", strlen("ni hao")); if(iRet == -1) {
//WRITELOGEX(LOG_ERROR, ("HdfsWriteTest():hdfsWritefailed! ret = %d, errno = %d"),iRet, errno); hdfsCloseFile(pfs, pfile); pfile = NULL; return ; } //WRITELOGEX(LOG_INFO, ("HdfsWriteTest():hdfsWritesuccess ret = %d",iRet)); //将缓冲去的内容写入磁盘 iRet = hdfsHFlush(pfs, pfile); if(-1 == iRet) {
//WRITELOGEX(LOG_ERROR, ("HdfsWriteTest():hdfsHFlushfailed ret = %d, errno = %d"),iRet,errno); return ; } //WRITELOGEX(LOG_INFO,("HdfsWriteTest():hdfsHFlushsuccess ret = %d"),iRet); //关闭文件 iRet = hdfsCloseFile(pfs, pfile); if(-1 == iRet) {
//WRITELOGEX(LOG_ERROR, ("HdfsWriteTest():hdfsCloseFilefailed ret = %d, errno = %d"), iRet,errno); return ; } pfile = NULL; return ;}int main(){
GetHdfsInfo(); return 0; }

hadooppipes编程接口

#include 
#include
#include
#include
#include "Pipes.hh"#include "TemplateFactory.hh"#include "StringUtils.hh" using namespace std;class MaxTemperatureMapper : public HadoopPipes::Mapper {
public: MaxTemperatureMapper(HadoopPipes::TaskContext& context) {
} void map(HadoopPipes::MapContext& context) {
std::string line = context.getInputValue(); std::string year = line.substr(15, 4); std::string airTemperature = line.substr(87, 5); std::string q = line.substr(92, 1); if (airTemperature != "+9999" && (q == "0" || q == "1" || q == "4" || q == "5" || q == "9")) {
context.emit(year, airTemperature); } } }; class MapTemperatureReducer : public HadoopPipes::Reducer {
public: MapTemperatureReducer(HadoopPipes::TaskContext& context) {
} void reduce(HadoopPipes::ReduceContext& context) {
int maxValue = INT_MIN; while (context.nextValue()) {
maxValue = std::max(maxValue, HadoopUtils::toInt(context.getInputValue())); } context.emit(context.getInputKey(), HadoopUtils::toString(maxValue)); } }; int main(int argc, char *argv[]) {
return HadoopPipes::runTask(HadoopPipes::TemplateFactory
()); }

hadoop 操作相关的命令

//使用流式操作hadoop jar ./share/hadoop/tools/lib/hadoop-streaming-3.1.2.jar -input /tmp/count/word -output /tmp/count/res -mapper /home/hadoop/testhadoop/map -reducer /home/hadoop/testhadoop/reduce//使用管道操作hadoop pipes -D hadoop.pipes.java.recordreader=true -D hadoop.pipes.java.recordwriter=true -input /tmp/user/test -output /tmp/user/output12 -program /tmp/user/firsthadoop//g++编译命令g++ -m64 -o firsthadoop firsthadoop.cpp -I/usr/local/hadoop/include -L/usr/local/hadoop/lib/native  -lhdfs -lhadooppipes -lhadooputils -lpthread -lcrypto -lssl -std=c++11

hdfs文件系统

  1. namenode节点

    负责数据元的操作,namenode节点上存储的是数据元信息(文件名,文件大小,文件的属性,创建之间,权限,副本数)当client发出请求的时候,会先向namenode 请求,namenode节点告诉client他所请求的数据在那个datanode上,这时候client会直接与datanode 节点进行通信,与文件内容有关的数据不经过namenode节点,否则namenode节点会成为系统的瓶颈,
    当namenode节点收到请求时候,会返回给客户端一个最近的datanode节点,以减轻网络的负载量
    namenode 中存储者数据的分布情况与数据的备份的情况,datanode节点会定期向datanode节点发送心跳包,以告知namenode节点自己当前还在存活状态,当namenode节点一段事件后收不到其中一个datanode节点的心跳包了,则namenode节点认为其中一个datanode节点掉线了,这时候,namenode节点会将该datanode节点上存储的内容分布到其他的datanode节点上,以保证数据的备份的数量。
    如果只有一个namenode节点的情况下,当namenode节点出现故障,则所有的操作都不能进行,存在单点故障问题,
    在hadoop2中,运行者两个namenode节点,在同一个时刻只有一个namenode节点服务,另一个节点作为备用,双namenode节点的工作原理:

    • 内存中各自保存一份元数据
    • edits日志只能有一份,只有Adtive状态的namenode节点可以做写操作
    • 两个节点都可读取edits
    • 共享的edits放在一个共享存储管理
      active namenode:
      接受client的rpc请并处理,同时写自己的editlog和共享存储上的editlog,接受dayanode的block report、block localtion updates和heartbeat;
      standby namenode:
      同样接受datanode 的block report 、block localtion updates和heartbeat; 同时会从共享存储上的editlog上读取并执行这些操作,使得自己的namenode中的元数据都是和active namenode中的元数据是同步的,所以说standby模式的namenode是一个热备,一旦切换成active node节点,马上就能提供服务
      hadoop2 对与两个namenode节点处理单点故障的问题,但是随之而来是两个节点切换的时候造成的脑裂问题。
      (脑裂问题:发生在namenode自动切换的时候,当active 节点准备切换的时候,先向standby namenode节点发送一个消息,收到消息后standby namenode节点变为活跃状态,但是如果此时原来的活跃的namenode节点还没有下线,此时就会出现两个active namenode节点共同处理的情况,
      解决方法:在standby 与 active 之间添加一个心跳包,只有收不到心跳包的时候才进行状态的切换)
  2. datanode节

    负责具体的文件内容的操作

yarn架构

  1. Client向Yarn提交Application,这里我们假设是一个MapReduce作业。
  2. ResourceManager向NodeManager通信,为该Application分配第一个容器。并在这个容器中运行这个应用程序对应的ApplicationMaster。
  3. ApplicationMaster启动以后,对作业(也就是Application)进行拆分,拆分task出来,这些task可以运行在一个或多个容器中。然后向ResourceManager申请要运行程序的容器,并定时向ResourceManager发送心跳
  4. 申请到容器后,ApplicationMaster会去和容器对应的NodeManager通信,而后将作业分发到对应的NodeManager中的容器去运行,这里会将拆分后的MapReduce进行分发,对应容器中运行的可能是Map任务,也可能是Reduce任务。
  5. 容器中运行的任务会向ApplicationMaster发送心跳,汇报自身情况。当程序运行完成后,ApplicationMaster再向ResourceManager注销并释放容器资源。

mapreduce 详解

  1. 输入文件的分割过程
    hdfs底层是按照块进行存储的,在进行数据的处理之前,先将输入的文件按照固定大小的块进行分割,如果一个文件的大小小于一个块的大小,则该文件占据一个块,如果文件大小大于一个块的大小,将文件分割成若干个小于块大小的文件,每个块调用一个map()函数,在数据存储的时候并不是一个文件占据一个单独的块,可能一个文件分成了很多的块存放在不同的系统中。
  2. 对输入片中的记录按照一定的规则解析成键值对,有个默认的规则是把一行的内容解析成键值对,首地址作为键,行的内容作为值
  3. 调用map() 方法,对第二步解析出来的键值对调用一个map方法,每个键值对调用一次,这个阶段也是编程者可以控制的阶段,往往是通过该阶段来对输入的文件进行解析的
  4. 按照一定的规则对第三步得到的数据进行分区,通过分区可以进行大致的排序,一个分区确定了一个范围,默认情况下只有一个分区,分区的数量就是reduce运行的数量
  5. 对同一个分区的键值对进行排序,首先按照键进行排序,键值相同的按照值进行排序
  6. 对数据进行规约处理,也就是reduce 操作,通常会有comper操作,键值相同的会调用一次reduce操作,经过这一阶段,数据量减少,规约后的数据会存储到本地系统中

reduce 的过程

(1)reducer会从主动mapper 阶段复制键值对,由于mapper会有很多,所以reducer会复制多个mapper的输出
(2)把复制到reduce本地的数据全部进行合并,把分散的数据合并成一个打的数据,对合并后的数据进行排序
(3)对合并后的数据调用reduce ,对与键值相同的键值对调用reduce,每次调用会产生或多个键值对,将键值对存储到本地的hdfs系统中
shuffle 派发
整个mapreduce的核心过程,指mapper产生的直接输出的结果,经过一系列的处理之后,成为reducer的直接输入结果的过程,
(1)mapper端的shuffle:
由Mapper 产生的结果并不会直接写入磁盘中,而是先存储在内存中,当内存中的数据量达到某个阈值的时候,一次性写入本地磁盘。并且同时进行sort,commbine,(合并)、partition(分片),其中sort是吧mapper产生的结果按照key值进行排序,combine是把key值相同的记录进行合并;partition是把数据均衡的分给reducer
(2)reducer端的shuffle
 mapper个reducer往往不在同一个节点上运行,所以reducer需要从多个节点上下载mapper的结果数据,并对这些数据进行处理,然后才能被reducer处理

转载地址:http://manwi.baihongyu.com/

你可能感兴趣的文章
移动端多页面应用(MPA)的开发(二)
查看>>
移动端多页面应用(MPA)的开发(三)
查看>>
移动端多页面APP(MPA)开发体验
查看>>
基于深度学习知识追踪研究进展(综述)数据集模型方法
查看>>
linux常见命令与FileZilla
查看>>
PostgreSQL和ElasticSearch学习笔记
查看>>
java反射
查看>>
paint 和 paintcomponent的区别
查看>>
JSP字节码的存放路径问题
查看>>
对RMQ的理解
查看>>
LCA的离线算法
查看>>
spark学习与资料
查看>>
Java_SSM问题
查看>>
sql-数据库操作
查看>>
推荐CTR预估-几个基础模型FM \FFM\GBDT+LR
查看>>
推荐系统基础
查看>>
redis
查看>>
word2vec参数
查看>>
python的collections
查看>>
LDA和PCA
查看>>