离线数仓总结(上)
第一章 数据仓库的概念及架构选型
数据仓库(Data Warehouse),是为企业制定决策,提供数据支持的。可以帮助企业,改进业务流程、提高产品质量等。
1、数据仓库的输入数据
业务数据
就是各行业在处理事务过程中产生的数据。比如用户在电商网站中登录、下单、支付等过程中,需要和网站后台数据库进行增删改查交互,产生的数据就是业务数据。业务数据通常存储在MySQL、Oracle等数据库中。
用户行为数据
用户在使用产品过程中,通过埋点收集与客户端产品交互过程中产生的数据,并发往日志服务器进行保存。比如页面浏览、点击、停留、评论、点赞、收藏等。用户行为数据通常存储在日志文件中。
爬虫数据
通常是通过技术手段获取其他公司网站的数据。不建议同学们这样去做。
通常来说,业务数据就是后端持久化到数据库的日志记录,例如Mysql的Binlog日志;用户行为数据主要是前端进行埋点采集到的数据,嗯,简单让new bing来介绍下采集流程吧:
前端埋点到大数据采集的完整流程包括数据采集层、数据接入层、数据处理层、数据应用层这四个层次。在数据采集层,可以通过传统的埋点方式,在需要上报的位置组织数据、调用API、将数据传给后端,也可以通过“无埋点”概念,在底层hook所有的点击事件,自动采集全部事件并上报埋点数据。在数据接入层,可以通过Flume、Kafka等工具将数据接入到Hadoop生态圈中。在数据处理层,可以使用Hive、Spark SQL等工具进行数据清洗、转换和计算。在数据应用层,可以使用Tableau、Echarts等工具进行可视化展示。
(1) 一个埋点的求生之路——数据处理全流程解析 - 知乎. https://zhuanlan.zhihu.com/p/133090030
(2) 数据采集:埋点、采集、存储及分析 - Hider1214 - 博客园. https://www.cnblogs.com/hider/p/13967167.html
(3) 前端埋点简单实现方式 - 掘金. https://juejin.cn/post/7047710777507053582
2、项目框架
关于选用版本的详细解释,组件详细介绍单开一栏说明。
框架 | 版本 | 说明 |
---|---|---|
jdk | 1.8 | Java版本,Hadoop体系建议使用1.8,虽然兼容Java11但是Hive除了最新的4.0.0alpha版本以外,其他的都只支持到1.8。Java11以上不支持。 |
Hadoop | 3.1.3 | 选用视频配套版本,选择太旧的2.x可能出现依赖缺失,最新的3.4.3则存在依赖改名的问题 |
Zookeeper | 3.5.7 | |
Mysql | 5.7.16 | 可以选用8系列,但是不能低于5.7版本,5.7.8之后支持Json,同时对InnoDB和一些强政策进行了更改。详情可看数据库MySQL 5.7版本介绍 |
Hive | 3.1.2 | Hive和Spark以及Hadoop版本要互相兼容,采用自编译等方式可以自定义版本 |
Flume | 1.9.0 | 不建议选用2.0.0以上版本,配置文件有点改动(大概是.properties->.xml) |
Kafka | 3.0.0 | |
Spark | 3.0.0 | |
DataX | 3.0.0 | |
Superset | 1.4.2 | 推荐版本很搞,更新之后很多依赖都没了,不建议使用2.0.0,如果需要2.0.0+可以使用网上的一键部署版本,不然自己调整配置会很麻烦(需要一定conda、python知识) |
DolphinScheduler | 2.0.3 | |
Maxwell | 1.29.2 |
3、测试集群服务器规划
服务名称 | 子服务 | 服务器hadoop102 | 服务器hadoop1 | 服务器hadoop104 |
---|---|---|---|---|
HDFS | NameNode | √ | ||
DataNode | √ | √ | √ | |
SecondaryNameNode | √ | |||
Yarn | NodeManager | √ | √ | √ |
Resourcemanager | √ | |||
Zookeeper | Zookeeper Server | √ | √ | √ |
Flume(采集日志) | Flume | √ | √ | |
Kafka | Kafka | √ | √ | √ |
Flume(消费Kafka日志) | Flume | √ | ||
Flume(消费Kafka业务) | Flume | √ | ||
Hive | √ | √ | √ | |
MySQL | MySQL | √ | ||
DataX | √ | √ | √ | |
Spark | √ | √ | √ | |
DolphinScheduler | ApiApplicationServer | √ | ||
AlertServer | √ | |||
MasterServer | √ | |||
WorkerServer | √ | √ | √ | |
LoggerServer | √ | √ | √ | |
Superset | Superset | √ | ||
服务数总计 | 16 | 11 | 12 |
第二章 组件及常见问题
0、虚拟机
能上哪个版本的VMware就上那个版本的VMware,虚拟机版本选用对环境影响并不大,不同Linux发行版之间基本是配置相同,包管理工具不同罢了,值得注意的是:选用CentOS Stream 9的配置项会和CentOS7之类的差别较大,在配置CentOS Stream 9作为虚拟机的时候可以参考以下文章:CentOS Stream9 设置静态IP。
!!注意一下内存分配:HiveServer2启动查询功能大概率是将结果暂存在内存中,开8G很容易OOM,而DolphinScheduler更加离谱,全节点启动的情况下,8G内存甚至任务都无法执行,推荐配置16G、8G、8G,最低配置如下:
Hadoop102:8G+(我配的10G,结束时约空闲1~2G)
Hadoop103/Hadoop104:6G+(我配的8G,结束时约空闲1~3G)
0.1、Jdk
版本选择1.8,以所有组件中要求版本最高的那个作为适配版本,所有组件中Hive只能在1.8中启动,故选择1.8版本;当然如果想采用最新版本体验或是仍然想用更高级Java版本中的语法糖,那么可以安装多个Java,让Hive执行其他Java即可。
0.2、Mock模拟数据准备
(1)生成日志数据
这里使用文档中准备的Java程序执行即可,使用基本流程是:
- 依次启动Zookeeper、Kafka、Flume(采集日志);
- 执行Java程序,在选定目录下生成Json日志;
- Flume将生成的日志数据采集到HDFS上。
(2)生成业务数据
这里使用文档中准备的Java程序执行即可,使用基本流程是:
- 修改application.properties中的mock.clear和mock.clear.user数据
- 执行Java程序,在MySQL数据库中生成业务数据
0.3、Source环境总览
1 | JAVA_HOME |
1、Hadoop
Hadoop是一个由Apache基金会所开发的分布式系统基础架构,是一个存储系统 + 计算框架的软件框架。主要解决海量数据存储与计算的问题,是大数据技术中的基石。Hadoop以一种可靠、高效、可伸缩的方式进行数据处理,用户可以在不了解分布式底层细节的情况下,开发出分布式程序。
(1)配置流程:
核心配置文件core-site.xml
- 配置NameNode地址
- 指定Hadoop数据存储目录
- 配置HDFS网页登录使用的静态用户
- 配置用户允许通过代理访问的主机节点
- 配置用户允许通过代理用户所属组
- 配置用户允许通过代理的用户
HDFS配置文件hdfs-site.xml
- 配置NameNode Web端访问端口
- 配置Secondary NameNode Web端访问端口
- 指定HDFS副本的数量
YARN配置文件yarn-site.xml
- 指定MapReduce走shuffle
- 指定ResourceManager地址
- 配置环境变量继承
- 配置YARN单个容器允许分配的最大最小内存
- 配置YARN容器允许管理的物理内存大小
- 关闭YARN对物理内存和虚拟内存的限制检查
- 开启日志聚集功能
- 设置日志聚集服务器地址
- 设置日志保留时间为7天
MapReduce配置文件mapred-site.xml
- 指定MapReduce程序运行在YARN上
- 配置历史服务器端地址
- 配置历史服务器Web端地址
配置workers(集群)
分发到所有集群机器上
初始化NameNode节点
1 | hdfs namenode -format |
(2)常见启停命令
1 | start-dfs.sh # 启动HDFS |
(3)参数调优
hdfs-site.xml
- NameNode有一个工作线程池,用来处理不同DataNode的并发心跳以及客户端并发的元数据操作。对于大集群或者有大量客户端的集群来说,通常需要增大参数dfs.namenode.handler.count的默认值10。
1 | <property> |
$$
dfs.namenode.handler.count=20 × log_e^ClusterSize, 比如集群规模为8台时,此参数设置为41。
$$
yarn-site.xml
数据统计主要用HiveSQL,没有数据倾斜,小文件已经做了合并处理,开启的JVM重用,而且IO没有阻塞,内存用了不到50%。但是还是跑的非常慢,而且数据量洪峰过来时,整个集群都会宕掉。
内存利用率不够。这个一般是Yarn的2个配置造成的,单个任务可以申请的最大内存大小,和Hadoop单个节点可用内存大小。调节这两个参数能提高系统内存的利用率。
1
2
3
4(a)yarn.nodemanager.resource.memory-mb
表示该节点上YARN可使用的物理内存总量,默认是8192(MB),注意,如果你的节点内存资源不够8GB,则需要调减小这个值,而YARN不会智能的探测节点的物理内存总量。
(b)yarn.scheduler.maximum-allocation-mb
单个任务可申请的最多物理内存量,默认是8192(MB)。
2、Zookeeper
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它包含一个简单的原语集,分布式应用程序可以基于它实现同步服务,配置维护和命名服务等。ZooKeeper作为一个分布式的服务框架,主要用来解决分布式集群中应用系统的一致性问题,它能提供基于类似于文件系统的目录节点树方式的数据存储,Zookeeper作用主要是用来维护和监控存储的数据的状态变化,通过监控这些数据状态的变化,从而达到基于数据的集群管理。
(1)配置流程
配置myid文件,每个服务器上的myid是唯一编号,代表单独的Zookeeper server。
配置*/conf目录下的zoo_sample.cfg文件
改名,去掉sample启用配置
修改数据存储路径,配置到myid对应的目录
增加server.*配置。例:
1
server.2=hadoop102:2888:3888 # .2就是myid中的server编号
同步配置到其他服务器(一定记得修改myid)
(2)常见启停脚本
1 | zkServer.sh start # 启动Zookeeper |
3、Kafka
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。Kafka最初由LinkedIn公司发布,使用Scala语言编写,与2010年12月份开源,成为Apache的顶级子项目。
Kafka是一种高吞吐量、持久性、分布式的发布订阅的消息队列系统。它可以在廉价的PC Server上搭建起大规模消息系统。
(1)配置流程
配置*/config/目录下的server.properties文件
1
2
3
4
5
6#broker的全局唯一编号,不能重复,只能是数字。每个服务器上的数字不能一样。
broker.id=0
#kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas
#配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka分发软件包,记得修改broker.id
(2)常见启停脚本
1 | 配置过zookeeper.connect之后需要先启动Zookeeper服务 |
(3)常见命令行操作
主题命令操作
Kafka中的数据单元是消息,可以把它当作数据库里的一行“数据”或者一条“记录”来理解。Kafka通过主题来进行分类,主题就好比数据库中的表,每个主题包含多个分区,分区可以分布在不同的服务器上,也就是说通过这种方式来实现分布式数据的存储和读取。
主题是Kafka中的基础概念,是一切消息处理的基础。主题属于Kafka元数据的一部分,会存储在Zookeeper中。生产者发布消息到某一特定主题上,由消费者去消费特定主题的消息。消费者可以订阅一个或多个主题。
1
kafka-topics.sh # 查看操作命令主题
参数 | 描述 |
---|---|
–bootstrap-server <String: server toconnect to> | 连接的Kafka Broker主机名称和端口号。 |
–topic <String: topic> | 操作的topic名称。 |
–create | 创建主题。 |
–delete | 删除主题。 |
–alter | 修改主题。 |
–list | 查看所有主题。 |
–describe | 查看主题详细描述。 |
–partitions <Integer: # of partitions> | 设置分区数。 |
–replication-factor<Integer: replication factor> | 设置分区副本。 |
–config <String: name=value> | 更新系统默认的配置。 |
生产者命令操作
- Kafka生产者是指向Kafka集群发送消息的客户端应用程序。生产者将消息封装成一个ProducerRecord向Kafka集群中的某个主题发送消息。发送的消息首先会经过序列化器进行序列化,以便在网络中传输。发送的消息需要经过分区器来决定该消息会分发到主题对应的分区,当然如果指定了分区,那么就不需要分区器来决定。
1
kafka-console-producer.sh # 查看操作生产者
参数 | 描述 |
---|---|
–bootstrap-server <String: server toconnect to> | 连接的Kafka Broker主机名称和端口号。 |
–topic <String: topic> | 操作的topic名称。 |
消费者命令操作
- Kafka消费者是指从Kafka集群中读取消息的客户端应用程序。消费者将从一个或多个主题订阅消息,并在消费者组中进行组织。当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息。消费者可以控制消息的读取位置,从而实现对消息的重复读取或跳过。
1
kafka-console-consumer.sh # 查看操作消费者
参数 | 描述 |
---|---|
–bootstrap-server <String: server toconnect to> | 连接的Kafka Broker主机名称和端口号。 |
–topic <String: topic> | 操作的topic名称。 |
–from-beginning | 从头开始消费。 |
–group <String: consumer group id> | 指定消费者组名称。 |
4、Flume
Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。它的主要作用是将数据从各种数据源(如Web服务器)收集到Hadoop的HDFS中,或者将数据从一个地方传输到另一个地方。Flume的核心概念包括:Source、Channel和Sink。Source是数据源,可以是Avro、Thrift、JMS、Netcat、Exec等;Channel是缓存区,用于存储Source产生的数据;Sink是数据目的地,可以是HDFS、HBase、Solr、Elasticsearch等。
- 删除lib目录下的guava.jar以兼容Hadoop
(1)整体流程
日志采集Flume需要采集日志文件内容,并对日志格式(JSON)进行校验,然后将校验通过的日志发送到Kafka。此处可选择TaildirSource和KafkaChannel,并配置日志校验拦截器。
- TailDirSource相比ExecSource、SpoolingDirectorySource的优势
- TailDirSource:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
- ExecSource可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。
- SpoolingDirectorySource监控目录,支持断点续传。
- Kafka Channel
- 采用Kafka Channel,省去了Sink,提高了效率。
Flume执行job实现日志的采集,因此无需就行过多的原生配置,只需要针对每个任务写对应的job即可。
(2)配置实操
1 | # job/file_to_kafka.conf |
1 | flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console |
5、MySQL
配置流程
- 安装MySQL依赖
- 安装mysql-client
- 安装mysql-server
- 配置MySQL
- 设置复杂密码
- 更改MySQL密码策略
- 设置简单密码
6、Maxwell
Maxwell是一个能实时读取MySQL二进制日志binlog,并生成 JSON 格式的消息,作为生产者发送给 Kafka、Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序³。它的主要作用是将MySQL数据库中的数据变更事件转换成JSON格式的消息,以便于在分布式系统中进行数据同步。Maxwell可以通过配置文件来指定需要同步的表,以及需要过滤掉的字段等信息。
原理:将自己伪装成slave,并遵循MySQL主从复制的协议,从master同步数据。
(1)配置流程
修改MySQL配置,启用Binglog日志(默认不开启)后重启MySQL服务。
1
2
3
4
5
6
7
8
9
10[mysqld]
#数据库id
server-id = 1
#启动binlog,该参数的值会作为binlog的文件名
log-bin=mysql-bin
#binlog类型,maxwell要求为row类型
binlog_format=row
#启用binlog的数据库,需根据实际情况作出修改
binlog-do-db=gmall几种常见的Binlog模式
Statement-based:基于语句,Binlog会记录所有写操作的SQL语句,包括insert、update、delete等。
优点: 节省空间
缺点: 有可能造成数据不一致,例如insert语句中包含now()函数。
Row-based:基于行,Binlog会记录每次写操作后被操作行记录的变化。
优点:保持数据的绝对一致性。
缺点:占用较大空间。
mixed:混合模式,默认是Statement-based,如果SQL语句可能导致数据不一致,就自动切换到Row-based。
Maxwell要求Binlog采用Row-based模式。
配置maxwell目录下的config.properties.example文件(元数据库、生产者、集群配置)
- 重命名为config.properties
1
2
3
4
5
6
7
8
9
10
11
12#Maxwell数据发送目的地,可选配置有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
#目标Kafka集群地址
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092
#目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table}
kafka_topic=maxwell
#MySQL相关配置
host=hadoop102
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
(2)常见启停脚本
1 | maxwell --config /opt/module/maxwell/config.properties --daemon # 启动maxwell |
(3)增量同步和全量同步
增量同步:启用Kafka消费者、maxwell,Mock业务数据即可在Kafka中观察到生成的数据;
全量同步:
- Maxwell提供了bootstrap功能来进行历史数据的全量同步,输出结果为Json格式
1
2maxwell-bootstrap --database gmall --table user_info --config /opt/module/maxwell/config.properties
这段代码是在运行maxwell-bootstrap命令,它会将gmall数据库中的user_info表的数据导入到另一个数据库中。其中,--config /opt/module/maxwell/config.properties是指定了配置文件的路径。
7、DataX
DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。源码地址:https://github.com/alibaba/DataX
为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。
(1)配置流程
解压自检
1
2
3
4
5
6
7
8
9
10
11
12python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json
出现如下反馈代表安装成功
……
2021-10-12 21:51:12.335 [job-0] INFO JobContainer -
任务启动时刻 : 2021-10-12 21:51:02
任务结束时刻 : 2021-10-12 21:51:12
任务总计耗时 : 10s
任务平均流量 : 253.91KB/s
记录写入速度 : 10000rec/s
读出记录总数 : 100000
读写失败总数 : 0
(2)配置说明
配置文件模板为json,json最外层是一个job,job包含setting和content两部分,其中setting用于对整个job进行配置,content用户配置数据源和目的地。
(3)DataX优化
DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在数据库可以承受的范围内达到最佳的同步速度。
参数 | 说明 |
---|---|
job.setting.speed.channel | 并发数 |
job.setting.speed.record | 总record限速 |
job.setting.speed.byte | 总byte限速 |
core.transport.channel.speed.record | 单个channel的record限速,默认值为10000(10000条/s) |
core.transport.channel.speed.byte | 单个channel的byte限速,默认值1024*1024(1M/s) |
注意事项:
1.若配置了总record限速,则必须配置单个channel的record限速
2.若配置了总byte限速,则必须配置单个channe的byte限速
3.若配置了总record限速和总byte限速,channel并发数参数就会失效。因为配置了总record限速和总byte限速之后,实际channel并发数是通过计算得到的:
计算公式为:
min(总byte限速/单个channel的byte限速,总record限速/单个channel的record限速)
8、Hive
Hive是一个基于Hadoop的数据仓库工具,用于处理结构化数据。它可以将结构化的数据文件映射为一张数据库表,并提供类SQL查询功能。Hive的操作接口采用类SQL语法,提供快速开发的能力,避免了写MapReduce,减少开发人员的学习成本,功能扩展很方便。
你可以使用Hive来进行数据提取、转化、加载,这是一种可以存储、查询和分析存储在Hadoop中的大规模数据的机制。你可以在Hive中使用类SQL语句来查询数据,这些语句会被转换成MapReduce任务来执行。你需要安装Hive并配置它以便使用它。你可以在网上找到很多关于如何使用Hive的教程和指南。
配置流程
解决日志包冲突,删除lib下的log4j-slf4j-impl.jar包
拷贝对应版本的mysql驱动到lib目录 mysql-connector-java-5.1.27-bin.jar
新建hive-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hadoop102:3306/metastore?useSSL=false&useUnicode=true&characterEncoding=UTF-8</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>000000</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
<property>
<name>hive.server2.thrift.port</name>
<value>10000</value>
</property>
<property>
<name>hive.server2.thrift.bind.host</name>
<value>hadoop102</value>
</property>
<property>
<name>hive.metastore.event.db.notification.api.auth</name>
<value>false</value>
</property>
<property>
<name>hive.cli.print.header</name>
<value>true</value>
</property>
<property>
<name>hive.cli.print.current.db</name>
<value>true</value>
</property>
</configuration>在MySQL创建Hive元数据库metastore
初始化Hive元数据库
1
schematool -initSchema -dbType mysql -verbose
修改元数据库字符集
- Hive元数据库的字符集默认为Latin1,由于其不支持中文字符,故若建表语句中包含中文注释,会出现乱码现象。修改Hive元数据库中存储注释的字段的字符集为utf-8。
1
2alter table COLUMNS_V2 modify column COMMENT varchar(256) character set utf8;
alter table TABLE_PARAMS modify column PARAM_VALUE mediumtext character set utf8;
9、Spark
Spark是一个快速的、通用的、基于内存的分布式计算系统,用于大规模数据处理。它是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用并行框架,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。
(1)兼容性说明
官网下载的Hive3.1.2和Spark3.0.0默认是不兼容的。因为Hive3.1.2支持的Spark版本是2.4.5,所以需要我们重新编译Hive3.1.2版本。
编译步骤:官网下载Hive3.1.2源码,修改pom文件中引用的Spark版本为3.0.0,如果编译通过,直接打包获取jar包。如果报错,就根据提示,修改相关方法,直到不报错,打包获取jar包。
这里我试过了hive4.0.0alpha去兼容jdk11,但是无疾而终(默认启动的是beeline不是hive,这个时候我还没学hive,不太好大刀阔斧上手)。spark也试了从3.0.0到最新版中的几个版本,但是hive没有自编译,所以总会报错。
另外,hadoop版本也要考虑,Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. RPC channel is closed.这个问题在YARN日志报的$GetFileInfoRequestProto无法强制转换为com.google.protobuf.Message,主要就是hadoop版本导致的。
(2)配置流程
在hive中创建spark配置文件
1
2
3
4
5
6# spark-defaults.conf
spark.master yarn
spark.eventLog.enabled true
spark.eventLog.dir hdfs://hadoop102:8020/spark-history
spark.executor.memory 1g
spark.driver.memory 1g修改hive-site.xml文件,更改为spark引擎
1
2
3
4
5
6
7
8
9
10
11<!--Spark依赖位置(注意:端口号8020必须和namenode的端口号一致)-->
<property>
<name>spark.yarn.jars</name>
<value>hdfs://hadoop102:8020/spark-jars/*</value>
</property>
<!--Hive执行引擎-->
<property>
<name>hive.execution.engine</name>
<value>spark</value>
</property>增加Application Master资源比例
- 修改YARN配置文件yarn-site.xml
1
2
3
4<property>
<name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
<value>0.8</value>
</property
10、DolphinScheduler
Apache DolphinScheduler是一个分布式、易扩展的可视化DAG工作流任务调度平台。致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。
(1)核心架构
MasterServer采用分布式无中心设计理念,MasterServer主要负责 DAG 任务切分、任务提交、任务监控,并同时监听其它MasterServer和WorkerServer的健康状态。
WorkerServer也采用分布式无中心设计理念,WorkerServer主要负责任务的执行和提供日志服务。
ZooKeeper服务,系统中的MasterServer和WorkerServer节点都通过ZooKeeper来进行集群管理和容错。
Alert服务,提供告警相关服务。
API接口层,主要负责处理前端UI层的请求。
UI,系统的前端页面,提供系统的各种可视化操作界面。
(2)前置准备
三台节点均需部署JDK(1.8+),并配置相关环境变量。
需部署数据库,支持MySQL(5.7+)或者PostgreSQL(8.2.15+)。如 MySQL 则需要 JDBC Driver 8.0.16。
需部署Zookeeper(3.4.6+)。
如果启用 HDFS 文件系统,则需要 Hadoop(2.6+)环境。
三台节点均需安装进程管理工具包psmisc。
(3)配置流程
配置一键部署脚本
修改解压目录下的conf/config目录下的install_config.conf文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65ips="hadoop102,hadoop103,hadoop104"
# 将要部署任一 DolphinScheduler 服务的服务器主机名或 ip 列表
masters="hadoop102"
# master 所在主机名列表,必须是 ips 的子集
workers="hadoop102:default,hadoop103:default,hadoop104:default"
# worker主机名及队列,此处的 ip 必须在 ips 列表中
alertServer="hadoop102"
# 告警服务所在服务器主机名
apiServers="hadoop102"
# api服务所在服务器主机名
installPath="/opt/module/dolphinscheduler"
# DS 安装路径,如果不存在会创建
deployUser="atguigu"
# 部署用户,任务执行服务是以 sudo -u {linux-user} 切换不同 Linux 用户的方式来实现多租户运行作业,因此该用户必须有免密的 sudo 权限。
javaHome="/opt/module/jdk"
# JAVA_HOME 路径
DATABASE_TYPE="mysql"
# 数据库类型
SPRING_DATASOURCE_URL="jdbc:mysql://hadoop102:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8"
# 数据库 URL
SPRING_DATASOURCE_USERNAME="dolphinscheduler"
# 数据库用户名
SPRING_DATASOURCE_PASSWORD="dolphinscheduler"
# 数据库密码
registryPluginName="zookeeper"
# 注册中心插件名称,DS 通过注册中心来确保集群配置的一致性
registryServers="hadoop102:2181,hadoop103:2181,hadoop104:2181"
# 注册中心地址,即 Zookeeper 集群的地址
registryNamespace="dolphinscheduler"
# DS 在 Zookeeper 的结点名称
resourceStorageType="HDFS"
# 资源存储类型
resourceUploadPath="/dolphinscheduler"
# 资源上传路径
defaultFS="hdfs://hadoop102:8020"
# 默认文件系统
resourceManagerHttpAddressPort="8088"
# yarn RM http 访问端口
yarnHaIps=
# Yarn RM 高可用 ip,若未启用 RM 高可用,则将该值置空
singleYarnIp="hadoop103"
# Yarn RM 主机名,若启用了 HA 或未启用 RM,保留默认值
hdfsRootUser="atguigu"
# 拥有 HDFS 根目录操作权限的用户
初始化数据库
创建数据库
1
CREATE DATABASE dolphinscheduler DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
创建用户
1
CREATE USER 'dolphinscheduler'@'%' IDENTIFIED BY 'dolphinscheduler';
赋予用户相应权限
1
2GRANT ALL PRIVILEGES ON dolphinscheduler.* TO 'dolphinscheduler'@'%';
flush privileges;拷贝MySQL驱动到DolphinScheduler的解压目录下的lib中
执行数据库初始化脚本
1
script/create-dolphinscheduler.sh
(3)常见启停脚本
1 | ./install.sh # 一键部署并启动 |
对于每个节点的配置在部署后的目录中,针对每个进程都有一个conf文件
11、Superset
强烈斥责Superset的开发人员,兼容性屌差,我曾经尝试过用Docker、Miniconda安装过Superset,但是安装成功的次数寥寥无几,所以在这里着重介绍下。尤其我大一的时候,那时候B站UP主鱼皮还介绍过十分钟搭建Superset,结果放到现在这种方式不太行了(看着官方文档还是可以,但是那个依赖版本维护极差,对小白其实很不友好,尤其当时我还没学数据库),现在Superset最新版都2.0.0+了,不胜唏嘘啊。
(1)安装流程
安装Miniconda
创建虚拟环境
- 配置conda国内镜像
- 创建Python3.8环境
安装依赖
1
sudo yum install -y gcc gcc-c++ libffi-devel python-devel python-pip python-wheel python-setuptools openssl-devel cyrus-sasl-devel openldap-devel
安装(更新)setuptools和pip
1
pip install --upgrade setuptools pip -i https://pypi.douban.com/simple/
指定安装版本:
1
pip install apache-superset==1.4.2 -i https://pypi.douban.com/simple/
依赖降级:
1
2
3
4
5
6pip uninstall werkzeyg
pip uninstall markupsafe
pip install markupsafe==2.0.1
pip install flask==1.1.2
pip install Werkzeug==1.0.1
pip install --upgrade cryptography==3.2初始化数据库之前先执行:export FLASK_APP=superset
初始化Superset
1
superset db upgrade
安装gunicorn
1
pip install gunicorn -i https://pypi.douban.com/simple/
(2)常见启停脚本
1 | gunicorn --workers 5 --timeout 120 --bind hadoop102:8787 "superset.app:create_app()" --daemon |
(3)安装MySQL依赖项
1 | conda install mysqlclient |
对接不同的数据源,需安装不同的依赖,以下地址为官网说明。
https://superset.apache.org/docs/databases/installing-database-drivers