spark2.1hbasejava的简单介绍

两台服务器手动部署大数据平台

两台服务器手动部署大数据平台

##### 初始服务器数量

– 2台centos7

##### 建议配置

– 32G(RAM)

– 24cpu

– 10t(SATA)

### 1.环境

– 系统centos7

– jdk:1.8.0_171(64位)

– zookeeper:3.4.8

– spark-2.1.0-bin-hadoop2.6

– kafka_2.10-0.10.2.1

– hadoop-2.7.0

– hbase-1.2.6

– elasticsearch-6.3.0

### 2.系统准备

对应的安装包文件:

elasticsearch-6.3.0.tar.gz

hadoop-2.7.0.tar.gz

hbase-1.2.6-bin.tar.gz

jdk-8u171-linux-x64.tar.gz

kafka_2.10-0.10.2.1.tgz

mysql-5.7.23-1.el7.x86_64.rpm-bundle.tar

spark2.1.0hadoop2.6.tgz.gz

zookeeper-3.4.8.tar.gz

一、 配置好hosts

“`

两台设备的host

ip1 hello1

ip2 hello2

关闭防火墙

systemctl stop firewalld

systemctl disable firewalld

二、机器之间做好免密

1. 在hello1服务器中,cd /root/

2. ssh-keygen -trsa  (全部按回车,走默认配置)

3. cat ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys

4. chmod 600 ~/.ssh/authorized_keys

5. scp ~/.ssh/authorized_keys root@hello2:~/.ssh/

到此处时可以实现hello1机器上通过root账户登录到hello2中,但从hello2中无法通过免密码登录到hello1服务器。

6. 在hello2服务器中,cd /root/

7. ssh-keygen -trsa  (全部按回车,走默认配置)

8. cat ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys

9. scp ~/.ssh/authorized_keys root@hello1:~/.ssh/

到此处时可以实现hello1机器与hello2机器之间免密码互通

三、建立一个用户操作elasticsearch用户,后期所有安装软件放在该目录下(当前使用root账户安装)

1.添加用户:

useradd -m -s /bin/bash es

2.为该用户设置密码:

password es

四、安装JDK

如果系统自带openjdk,先将其卸载掉!

1.创建jdk安装路径(hello1、hello2都执行)

执行: mkdir /usr/java

2.解压缩jdk到安装目录

执行: tar -zxvf jdk-8u171-linux-x64.tar.gz -C /usr/java/

3.添加环境变量

vi /etc/profile,添加以下语句

export JAVA_HOME=/usr/java/jdk1.8.0_171

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export PATH=$PATH:$JAVA_HOME/bin

执行:source /etc/profile

4.复制安装包和数据目录到hello2

scp  -r /usr/java/jdk1.8.0_171 hello2:/usr/java/

scp /etc/profile hello2:/etc/

登录到hello2上,进入/home/es目录

执行: source /etc/profile

5、验证:

两台服务器上分别执行: java -version,查看输出的版本是否与安装的版本一致。

五、安装mysql

1.如果centos系统中自带mariadb,先卸载mariadb。

2.解压mysql安装包程序

执行:tar -xvf  mysql-5.7.23-1.el7.x86_64.rpm-bundle.tar

3.依次安装里面rpm包组建

rpm -ivh mysql-community-common-5.7.23-1.el7.x86_64.rpm

rpm -ivh mysql-community-libs-5.7.23-1.el7.x86_64.rpm

rpm -ivh mysql-community-client-5.7.23-1.el7.x86_64.rpm

rpm -ivh mysql-community-server-5.7.23-1.el7.x86_64.rpm

rpm -ivh mysql-community-devel-5.7.23-1.el7.x86_64.rpm

4.启动MySQL

执行: systemctl start mysqld

5.登录mysql服务器

这种方式安装好后,会再my.cnf文件中自动生成一个密码,

执行:cat /var/log/mysqld.log | grep password, 出现如下记录:

2017-09-15T01:58:11.863301Z 1 [Note] A temporary password is generated for root@www.easyaq.com: m-NdrSG4ipuO

其中“m-NdrSG4ipuO”为mysql root账户的初始密码。

登录:

执行: mysql -uroot -p

输入密码: m-NdrSG4ipuO,即可进入mysql服务器。

后续可自行修改root密码,创建新账户等操作。

六、安装zookeeper

1.解压zookeeper安装包到指定目录(/home/es)

tar -zxvf zookeeper-3.4.8.tar.gz -C /home/es

2.创建程序软连接

cd /home/es/

ln -s zookeeper-3.4.8 zookeeper

3.添加执行路径环境

vi /etc/profile

添加

export ZOOKEEPER_HOME=/home/es/zookeeper

export PATH=$PATH:$ZOOKEEPER_HOME/bin

执行

source /etc/profile

4.修改配置文件

cd /home/es/zookeeper

cp conf/zoo_sample.cfg conf/zoo.cfg

在/home/data下创建对应的zookeeper数据存储目录

mkdir /home/data/zookeeper

mkdir /home/data/zookeeper/data

mkdir /home/data/zookeeper/log

修改配置文件:conf/zoo.cfg,添加以下语句

dataDir=/home/data/zookeeper/data

dataLogDir=/home/data/zookeeper/log

server.1=hello1:2888:3888

server.2=hello2:2888:3888

5.创建server表示符文件

touch /home/data/zookeeper/data/myid

echo echo 1/home/data/zookeeper/data/myid

6.复制安装包和数据目录到hello2

scp -r /home/es/zookeeper-3.4.8 es@hello2:/home/es

scp -r /home/data/zookeeper es@hello2:/home/data

scp  /etc/profile es@hello2:/etc

登录到hello2上

cd /home/es

ln -s zookeeper-3.4.8 zookeeper

echo echo 2/home/data/zookeeper/data/myid

执行

source /etc/profile

7.两台机器上分别执行

zkServer.sh start

8.验证

jps | grep QuorumPeerMain,查看是否有该进程

zkServer.sh status,查看服务状态

六、安装kafka

1.解压kafka安装包到指定目录(/home/es)

tar -zxvf kafka_2.10-0.10.2.1.tgz -C /home/es

2.创建程序软连接

cd /home/es/

ln -s kafka_2.10-0.10.2.1 kafka

3.修改配置文件

备份:

cp config/server.properties config/server.properties.bak

创建kafka日志目录:

mkdir /home/data/kafka

mkdir /home/data/kafka/kafka-logs

修改:config/server.properties,具体对应字段如下:

broker.id=0

delete.topic.enable=true

num.network.threads=10

num.io.threads=32

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

log.dirs=/home/data/kafka/kafka-logs

num.partitions=1

num.recovery.threads.per.data.dir=1

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

zookeeper.connect=hello1:2181,hello2:2181

zookeeper.connection.timeout.ms=6000

6.复制安装包和数据目录到hello2

scp -r /home/es/kafka_2.10-0.10.2.1 es@hello2:/home/es

scp -r /home/data/kafka es@hello2:/home/data

修改hello2中的配置

登录到hello2上,cd /home/es/kafka,修改config/server.properties中broker.id值为2.

7.启动kafka

在两台机器的/home/es/kafka中,创建一个日志存放目录:mkdir start_log,执行以下命令:

nohup bin/kafka-server-start.sh config/server.properties start_log/kafka_start_log 21

8.验证运行情况

jps | grep Kafka,查看进程

通过kafka命令查看topic。

七、安装hadoop

1.解压hadoop安装包到指定目录(/home/es)

tar -zxvf hadoop-2.7.0.tar.gz -C /home/es

2.创建程序软连接

cd /home/es/

ln -s hadoop-2.7.0 hadoop

3.创建数据存放目录

mkdir /home/data/hadoop

mkdir /home/data/hadoop/tmp

mkdir /home/data/hadoop/dfs

mkdir /home/data/hadoop/dfs/data

mkdir /home/data/hadoop/dfs/name

4.修改配置文件

修改/home/es/hadoop/etc/hadoop/core-site.xml

configuration

property

namefs.defaultFS/name

valuehdfs://hello1:9000/value

/property

property

namehadoop.tmp.dir/name

valuefile:/home/data/hadoop/tmp/value

/property

property

nameio.file.buffer.size/name

value131702/value

/property

/configuration

修改/home/es/hadoop/etc/hadoop/hdfs-site.xml

configuration

property

namedfs.namenode.name.dir/name

valuefile:/home/data/hadoop/dfs/name/value

/property

property

namedfs.datanode.data.dir/name

valuefile:/home/data/hadoop/dfs/data/value

/property

property

namedfs.replication/name

value2/value

/property

property

namedfs.namenode.secondary.http-address/name

valuehello1:9001/value

/property

property

namedfs.webhdfs.enabled/name

valuetrue/value

/property

/configuration

修改/home/es/hadoop/etc/hadoop/mapred-site.xml

configuration

property

namemapreduce.framework.name/name

valueyarn/value

/property

property

namemapreduce.jobhistory.address/name

valuehello1:10020/value

/property

property

namemapreduce.jobhistory.webapp.address/name

valuehello1:19888/value

/property

/configuration

修改/home/es/hadoop/etc/hadoop/yarn-site.xml

configuration

!– Site specific YARN configuration properties —

property

nameyarn.nodemanager.aux-services/name

valuemapreduce_shuffle/value

/property

property

nameyarn.nodemanager.auxservices.mapreduce.shuffle.class/name

valueorg.apache.hadoop.mapred.ShuffleHandler/value

/property

property

nameyarn.resourcemanager.address/name

valuehello1:8032/value

/property

property

nameyarn.resourcemanager.scheduler.address/name

valuehello1:8030/value

/property

property

nameyarn.resourcemanager.resource-tracker.address/name

valuehello1:8031/value

/property

property

nameyarn.resourcemanager.admin.address/name

valuehello1:8033/value

/property

property

nameyarn.resourcemanager.webapp.address/name

valuehello1:8088/value

/property

property

nameyarn.nodemanager.resource.memory-mb/name

value768/value

/property

/configuration

配置/home/es/hadoop/etc/hadoop目录下hadoop-env.sh、yarn-env.sh的JAVA_HOME(不设置的话,启动不了)

export JAVA_HOME=/usr/java/jdk1.8.0_171

配置/home/es/hadoop/etc/hadoop目录下的slaves,删除默认的www.easyaq.com,增加2个从节点,

hello1

hello2

5、将配置好的Hadoop复制到各个节点对应位置上,通过scp传送

scp  -r /home/es/hadoop-2.7.0 hello2:/home/es/

scp  -r /home/data/hadoop hello2:/home/data/

登录到hello2上,进入/home/es目录

执行: ln -s hadoop-2.7.0 hadoop

6、格式化nameNode及启动hadoop

在主服务器启动hadoop,从节点会自动启动,进入/home/es/hadoop目录

初始化,输入命令,bin/hdfs namenode -format

全部启动sbin/start-all.sh,也可以分开sbin/start-dfs.sh、sbin/start-yarn.sh

输入命令,jps,可以看到相关信息

7、验证hadoop运行情况

浏览器打开

浏览器打开

8、添加hadoop环境变量到/etc/profile

export HADOOP_HOME=/home/es/hadoop export PATH=$PATH:$HADOOP_HOME/sbin

export PATH=$PATH:$HADOOP_HOME/bin

export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native

export HADOOP_OPTS=”-Djava.library.path=$HADOOP_HOME/lib/native”

执行: source /etc/profile

八、安装Hbase

1.解压hbase安装包到指定目录(/home/es)

tar -zxvf hbase-1.2.6-bin.tar.gz -C /home/es

2.创建程序软连接

cd /home/es/

ln -s hbase-1.2.6 hbase

3.添加hbase环境变量到/etc/profile

export  HBASE_HOME=/home/es/hbase

export  PATH=$HBASE_HOME/bin:$PATH

执行:source /etc/profile

4.修改HBASE配置文件

vi /home/es/hbase/conf/hbase-env.sh

增加: export JAVA_HOME=/usr/java/jdk1.8.0_171

修改: export HBASE_MANAGES_ZK=false

vi /home/es/hbase/conf/hbase-site.xml

修改类容:

configuration

property

namehbase.rootdir/name !– hbase存放数据目录 —

valuehdfs://hello1:9000/hbase/hbase_db/value

!– 端口要和Hadoop的fs.defaultFS端口一致–

/property

property

namehbase.cluster.distributed/name !– 是否分布式部署 —

valuetrue/value

/property

property

namehbase.zookeeper.quorum/name !– list of  zookooper —

valuehello1,hello2/value

/property

 property!–zookooper配置、日志等的存储位置 —

namehbase.zookeeper.property.dataDir/name

value/home/es/hbase/zookeeper/value

 /property

/configuration

配置regionservers,vi /home/es/hbase/conf/regionservers

去掉默认的www.easyaq.com,加入hello1、hello2

5、将配置好的hbase复制到各个节点对应位置上,通过scp传送

scp  -r /home/es/hbase-1.2.6 hello2:/home/es/

scp /etc/profile hello2:/etc/

登录到hello2上,进入/home/es目录

执行: ln -s hbase-1.2.6 hbase

source /etc/profile

6、hbase的启动

hello1中执行: start-hbase.sh

7、验证hbase运行情况

输入jps命令查看进程是否启动成功,若 hello1上出现HMaster、HRegionServer、HQuormPeer,hello2上出现HRegionServer、HQuorumPeer,就是启动成功了。

输入hbase shell 命令 进入hbase命令模式,输入status命令,查看运行状态。

在浏览器中输入就可以在界面上看到hbase的配置

注意事项:

正常安装后,创建普通不带压缩表可以正常读写,当使用snappy进行压缩创建表时,该表无法再regionServer中启动!

解决方法:

1.在hbase-site.xml文件中添加一下属性

property

                namehbase.regionserver.codecs/name

                valuesnappy/value

        /property

2.每台机器中将hadoop_native.zip解压缩到hbase安装目录的lib下,执行 unzip hadoop_native.zip $HBASE_HOME/lib/

3.在$HBASE_HOME/conf/hbase-env.sh 中添加:export HBASE_LIBRARY_PATH=/home/es/hbase/lib/native

4.重启Hbase服务即可

九、Spark安装

1.解压hbase安装包到指定目录(/home/es)

tar -zxvf spark2.1.0hadoop2.6.tgz.gz -C /home/es

2.创建程序软连接

cd /home/es/

ln -s spark2.1.0hadoop2.6 spark

3.修改配置文件

mv /home/es/spark/conf/spark-env.sh.template  /home/es/spark/conf/spark-env.sh

vi /home/es/spark/conf/spark-env.sh

修改对应配置:

export JAVA_HOME=/usr/java/jdk1.8.0_171

export SPARK_MASTER_IP=hello1

export SPARK_MASTER_PORT=7077

export SPARK_LOCAL_IP=hello1

修改slaves文件

mv /home/es/spark/conf/slaves.template  /home/es/spark/conf/slaves

vi /home/es/spark/conf/slaves

将www.easyaq.com修改成:

hello1

hello2

5、将配置好的hbase复制到各个节点对应位置上,通过scp传送

scp  -r /home/es/spark2.1.0hadoop2.6 hello2:/home/es/

登录到hello2上,进入/home/es目录

执行: ln -s spark2.1.0hadoop2.6 spark

在hello2中修改/home/es/spark/conf/spark-env.sh

export JAVA_HOME=/usr/java/jdk1.8.0_171

export SPARK_MASTER_IP=hello1

export SPARK_MASTER_PORT=7077

export SPARK_LOCAL_IP=hello2

6、启动spark

cd /home/es/spark

执行: sbin/start-all.sh

7、检测执行结果

jps | grep Worker,看是否有相应的进程。

十、安装elasticsearch

由于elasticsearch,用root账户无法启动,故该组件用es账户安装

1、切换到es账户: su es

2、解压hbase安装包到指定目录(/home/es)

tar -zxvf elasticsearch-6.3.0.tar.gz -C /home/es/

创建程序软连接

cd /home/es/

ln -s elasticsearch-6.3.0 elasticsearch

3、修改配置文件

vi /home/es/elasticsearch/config/elasticsearch.yml

# 集群的名字 

cluster.name: crrc-health

# 节点名字 

node.name: node-1 

# 数据存储目录(多个路径用逗号分隔) 

path.data: /home/data1/elasticsearch/data

# 日志目录 

path.logs: /home/data1/elasticsearch/logs

#本机的ip地址

network.host: hello1 

#设置集群中master节点的初始列表,可以通过这些节点来自动发现新加入集群的节点

discovery.zen.ping.unicast.hosts: [“hello1”, “hello2”]

# 设置节点间交互的tcp端口(集群),(默认9300) 

transport.tcp.port: 9300

# 监听端口(默认) 

http.port: 9200

# 增加参数,使head插件可以访问es 

http.cors.enabled: true

http.cors.allow-origin: “*”

4、创建elasticsearch数据和存储目录

mkdir /home/data1/elasticsearch

mkdir /home/data1/elasticsearch/data

mkdir /home/data1/elasticsearch/logs

5、修改linux系统的默认硬限制参数

切换至root用户: su root

vim /etc/security/limits.conf

添加:

es soft nofile 65536

es hard nofile 65536

退出es登录,重新用es账户登录,使用命令:ulimit -Hn查看硬限制参数。

vi /etc/sysctl.conf

添加:

vm.max_map_count=655360

执行:

sysctl -p

6、将配置好的elasticsearch复制到各个节点对应位置上,通过scp传送

scp  -r /home/es/elasticsearch-6.3.0 hello2:/home/es/

scp  -r /home/data1/elasticsearch hello2:/home/data1/

登录到hello2上,进入/home/es目录

执行: ln -s elasticsearch-6.3.0 elasticsearch-6.3.0

在hello2中修改/home/es/elasticsearch/config/elasticsearch.yml

修改: network.host: hello2

7、启动elasticsearch

使用es账户

执行:

/home/es/elasticsearch/bin/elasticsearch -d

8、验证

控制台中输入:curl

Spark 读取 Hbase 数据

下面这种方式是全表扫描,Spark如果通过RS来访问Hbase数据进行数据分析,对RS会产生很大的压力。不太建议使用下面的方式

在本地测试时返现运行的很慢,后来看到以下日志

由于Hbase表中只有两个region,所以只启动两个Task,此时并行度为二!

那么也就是说Spark读取Hbase的并行度取决于这个表有多少个region。然后根据region的startkey和endkey来获取数据

SparkSQL同步Hbase数据到Hive表

spark 2.3.0

hive 3.0.0

hbase 2.0.0

常规操作 hbase数据同步到hive是通过再hive端建立hbase的映射表。

但是由于集群组件问题,建立的映射表不能进行

insert into A select * from hbase映射表

操作。报错!

org.apache.hadoop.hbase.client.RetriesExhaustedException: Can’t get the location for replica 0

at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:332)

spark读取hbase数据形成RDD,构建schma信息,形成DF

通过sparkSQL 将df数据写入到指定的hive表格中。

hadoop本地环境版本一定要与依赖包版本保持一直,不然报如下错误

java.lang.IllegalArgumentException: Unrecognized Hadoop major version number: 3.1.1

hbase 1.X与2.X有很大差距,所以再看案例参考是一定要结合自己的hbase版本。

笔者程序编译中遇到

Cannot Resolve symbol TableInputFormat HBase找不到TableInputFormat

因为:新版本2.1.X版本的HBASE又把mapreduce.TableInputFormat单独抽取出来了

需要导入依赖

dependency

groupIdorg.apache.hbase/groupId

artifactIdhbase-mapreduce/artifactId

version${hbase.version}/version

/dependency

一定要把hbase相关的包都cp 到spark的jars文件下面。然后重启spark服务。

不然你会遇到此类错误

Class org.apache.hadoop.hive.hbase.HBaseSerDe not found

或者

java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration

这些都是缺少jar包的表现。

spark2.1hbasejava的简单介绍

spark on hbase 读写

本文主要讲述了spark对hbase进行独写的两种方式,这两种方式分别为:

1.利用spark提供的 newAPIHadoopRDD api 对hbase进行读写

2.SparkOnHbase,这种方式其实是利用Cloudera-labs开源的一个HbaseContext的工具类来支持spark用RDD的方式批量读写hbase

hbase 表格式如下:

部分数据集如下:

文中的spark 的版本为2.3.2,hbase 的版本为1.2.6

因为hbase数据集的数据都是序列化的,所以spark 默认读取Hbase的数据时会报数据序列化的错误,不管哪种方式,在读取hbase数据之前,为spark配置序列化方式,如图所示:

主要是利用TableInputFormat,TableOutPutFormat的方式对hbase进行读写。

下边是对hbase进行读

运行结果如图:

通过maven 将hbase-spark jar 报 导入

由于hbase-spark 运用的spark 版本为1.6 而实际的spark 版本为2.3.2,所以执行spark 任务会报 没有 org.apache.spark.logging 类没有定义,这是因为 spark 2.3.2 这个类名已经改变,因此需要重新构造这个类并打成jar包放入到spark 的jar目录里即可

以下为读方式:

sparkOnHbase 对于第一种方式的优势在于:

1无缝的使用Hbase connection

2和Kerberos无缝集成

3通过get或者scan直接生成rdd

4利用RDD支持hbase的任何组合操作

5为通用操作提供简单的方法,同时通过API允许不受限制的未知高级操作

6支持java和scala

7为spark和 spark streaming提供相似的API

如何使用scala+spark读写hbase

如何使用scala+spark读写Hbase

软件版本如下:

scala2.11.8

spark2.1.0

hbase1.2.0

公司有一些实时数据处理的项目,存储用的是hbase,提供实时的检索,当然hbase里面存储的数据模型都是简单的,复杂的多维检索的结果是在es里面存储的,公司也正在引入Kylin作为OLAP的数据分析引擎,这块后续有空在研究下。

接着上面说的,hbase存储着一些实时的数据,前两周新需求需要对hbase里面指定表的数据做一次全量的update以满足业务的发展,平时操作hbase都是单条的curd,或者插入一个批量的list,用的都是hbase的java api比较简单,但这次涉及全量update,所以如果再用原来那种单线程的操作api,势必速度回慢上许多。

关于批量操作Hbase,一般我们都会用MapReduce来操作,这样可以大大加快处理效率,原来也写过MR操作Hbase,过程比较繁琐,最近一直在用scala做spark的相关开发,所以就直接使用scala+spark来搞定这件事了,当然底层用的还是Hbase的TableOutputFormat和TableOutputFormat这个和MR是一样的,在spark里面把从hbase里面读取的数据集转成rdd了,然后做一些简单的过滤,转化,最终在把结果写入到hbase里面。

整个流程如下:

(1)全量读取hbase表的数据

(2)做一系列的ETL

(3)把全量数据再写回hbase

核心代码如下:

//获取conf

val conf=HBaseConfiguration.create() //设置读取的表

conf.set(TableInputFormat.INPUT_TABLE,tableName) //设置写入的表

conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)//创建sparkConf

val sparkConf=new SparkConf() //设置spark的任务名

sparkConf.setAppName(“read and write for hbase “) //创建spark上下文

val sc=new SparkContext(sparkConf)

//为job指定输出格式和输出表名

val newAPIJobConfiguration1 = Job.getInstance(conf)

newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tableName)

newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

//全量读取hbase表

val rdd=sc.newAPIHadoopRDD(conf,classOf[TableInputFormat]

,classOf[ImmutableBytesWritable]

,classOf[Result]

)

//过滤空数据,然后对每一个记录做更新,并转换成写入的格式

val final_rdd= rdd.filter(checkNotEmptyKs).map(forDatas)

//转换后的结果,再次做过滤

val save_rdd=final_rdd.filter(checkNull)

//最终在写回hbase表

save_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration)

sc.stop()

从上面的代码可以看出来,使用spark+scala操作hbase是非常简单的。下面我们看一下,中间用到的几个自定义函数:

第一个:checkNotEmptyKs

作用:过滤掉空列簇的数据

def checkNotEmptyKs(f:((ImmutableBytesWritable,Result))):Boolean={ val r=f._2 val rowkey=Bytes.toString(r.getRow) val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(Bytes.toBytes(“ks”)).asScala if(map.isEmpty) false else true

}

第二个:forDatas

作用:读取每一条数据,做update后,在转化成写入操作

def forDatas(f: (ImmutableBytesWritable,Result)): (ImmutableBytesWritable,Put)={ val r=f._2 //获取Result

val put:Put=new Put(r.getRow) //声明put

val ks=Bytes.toBytes(“ks”) //读取指定列簇

val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(ks).asScala

map.foreach(kv={//遍历每一个rowkey下面的指定列簇的每一列的数据做转化

val kid= Bytes.toString(kv._1)//知识点id

var value=Bytes.toString(kv._2)//知识点的value值

value=”修改后的value”

put.addColumn(ks,kv._1,Bytes.toBytes(value))//放入put对象

}

) if(put.isEmpty) null else (new ImmutableBytesWritable(),put)

}

第三个:checkNull 作用:过滤最终结果里面的null数据

def checkNull(f:((ImmutableBytesWritable,Put))):Boolean={ if(f==null) false else true

}

上面就是整个处理的逻辑了,需要注意的是对hbase里面的无效数据作过滤,跳过无效数据即可,逻辑是比较简单的,代码量也比较少。

除了上面的方式,还有一些开源的框架,也封装了相关的处理逻辑,使得spark操作hbase变得更简洁,有兴趣的朋友可以了解下,github链接如下:

如何使用Spark/Scala读取Hbase的数据

使用Spark/Scala读取Hbase的数据 必须使用高亮参数启动Spark-shell,否则当你遍历RDD时会出现如下的Exception java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable spark-shell–conf spark.serializer=org.apache.spark.serializer.KryoSerializer 以下代码,经过MaprDB实测通过 import org.apache.spark._ import org.apache.spark.rdd.NewHadoopRDD import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} import org.apache.hadoop.hbase.client.HBaseAdmin import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HColumnDescriptor import org.apache.hadoop.hbase.util.Bytes import org如何使用Spark/Scala读取Hbase的数据

本文来自投稿,不代表【】观点,发布者:【

本文地址: ,如若转载,请注明出处!

举报投诉邮箱:253000106@qq.com

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2024年3月24日 13:00:44
下一篇 2024年3月24日 13:08:04

相关推荐

  • c语言mallloc使用的简单介绍

    C语言中使用malloc必须加#includemallo.h? 1、在C语言中使用malloc函数进行动态内存分配。malloc的全称是memory allocation,中文叫动态内存分配。原型:extern void malloc(unsigned int num_bytes);功能:分配长度为num_bytes字节的内存块。 2、你可以看一下C语言那本…

    2024年5月23日
    4500
  • javascriptcanvas的简单介绍

    如何使用js在画布上绘制图形 1、可以使用 drawImage()方法把一幅图像绘制到画布上。 以使用三种不同的参数组合。最简单的调用方式是传入一个 HTML 元素,以及绘制该图像的起点的 x 和 y 坐标。 2、效果图:使用JavaScript在画布中绘制文本图形首先我们来看看要在画布上绘制文本图形,需要用到的最重要的属性和方法:font属性:定义文本的字…

    2024年5月23日
    4200
  • cortexm4linux的简单介绍

    Cortex-M4的主要功能 Cortex-M4提供了无可比拟的功能,以将[1] 32位控制与领先的数字信号处理技术集成来满足需要很高能效级别的市场。 Cortex-M4核心具有浮点单元(FPU)单精度,支持所有Arm单精度数据处理指令和数据类型。它还实现了一套完整的DSP指令和一个提高应用程序安全性的内存保护单元(MPU)。 主要是m4比m3多了dsp的支…

    2024年5月23日
    4400
  • 3desjavaphp的简单介绍

    php的3des加密结果与java不一致 他们的加密算法都是通用的,是可以解开的,只要你des的模式,加密长度,初始向量什么的都一样就可以。 JAVA写RSA加密,私钥都是一样的,公钥每次加密的结果不一样跟对数据的padding(填充)有关。Padding(填充)属性定义元素边框与元素内容之间的空间。padding简写属性在一个声明中设置所有内边距属性。 要…

    2024年5月23日
    4900
  • 黑客代码软件学习推荐歌曲的简单介绍

    我想自学编程代码,,目地是“黑”网站,开发出破解代码。有没有这方面的… 这个迭代周期不应该以周为周期或以月为周期发生,而是应该以日为周期。知识等待使用的时间越久,知识这把斧头就越钝。等待学习新知识的时间越长,你就越难以将其融入到代码中。 我认为这个问题问得本身就显得有点矛盾,想学却担心自己看不懂代码学不来,试问哪个编程人员不是从零开始的。坚定信念…

    2024年5月23日
    4800
  • java8种基本类型范围的简单介绍

    java中常用的数据类型有哪些 1、java数据类型分为基本数据类型和引用数据类型,基本数据类型有boolean 、long 、int 、char、byte、short、double、float。引用数据类型有类类型、接口类型和数组类型。 2、java中包含的基本数据类型介绍:\x0d\x0aJava共支持8种内置数据类型。内置类型由Java语言预先定义好,…

    2024年5月23日
    4900
  • linux系统与gpt的关系的简单介绍

    linux下查看分区是不是gpt 看分区会报错,比如:WARNING: GPT (GUID Partition Table) detected on /dev/sda! The util fdisk doesnt support GPT. Use GNU Parted.所以这个sda就是gpt的。 选择“管理”;在“磁盘管理”中,右键“磁盘0”,在弹出的右键…

    2024年5月23日
    5400
  • 北京黑客学习培训的简单介绍

    现在学什么好啊 包括建筑设计、服装设计、珠宝首饰设计等,选择适合自己的设计专业,好好学习,就业容易且收入高。学前教育专业:很多女生喜欢小孩子,且女生一般细心且有耐心,教育行业假期宽裕,工作环境单纯,就业机会多。 现在比较热门好就业的专业有人工智能、机械专业、电子商务专业、人力资源专业、金融学专业、小语种类专业等等。人工智能:人工智能领域的研究包括机器人、语言…

    2024年5月23日
    4200
  • javaee要学那些东西的简单介绍

    java主要学习哪些内容 1、学java最重要的是下面四个内容:掌握Java语言的使用:语言语法、程序逻辑,OOP(面向对象)思想,封装、继承、多态,集合框架、泛型、File I\O技术,多线程技术、socket网络编程,XML技术。 2、Java基础:了解Java的基本语法、数据类型、控制流程、数组、字符串等基础概念。学习面向对象编程(OOP)的原则和概念…

    2024年5月23日
    4300
  • excel自杀的简单介绍

    excel表格中宏代码,具有自杀功能,密码输入错误3次就自动删除表格_百度… 我给楼主一个建议,利用excel自身的密码保护,保护此工作簿的结构,将sheet提前隐藏。这样如果有人将同一份excel打开时,如果不能输入正确的密码,就不能将sheet取消隐藏,就能达到楼主所说的保护了。 如果你还有源文件的话,可以这样试试:打开其他的excel文件,…

    2024年5月23日
    8100

发表回复

登录后才能评论



关注微信