第一章 数据仓库的概念及架构选型

数据仓库(Data Warehouse),是为企业制定决策,提供数据支持的。可以帮助企业,改进业务流程、提高产品质量等。

1、数据仓库的输入数据

业务数据

就是各行业在处理事务过程中产生的数据。比如用户在电商网站中登录、下单、支付等过程中,需要和网站后台数据库进行增删改查交互,产生的数据就是业务数据。业务数据通常存储在MySQL、Oracle等数据库中。

用户行为数据

用户在使用产品过程中,通过埋点收集与客户端产品交互过程中产生的数据,并发往日志服务器进行保存。比如页面浏览、点击、停留、评论、点赞、收藏等。用户行为数据通常存储在日志文件中。

爬虫数据

通常是通过技术手段获取其他公司网站的数据。不建议同学们这样去做。

通常来说,业务数据就是后端持久化到数据库的日志记录,例如Mysql的Binlog日志;用户行为数据主要是前端进行埋点采集到的数据,嗯,简单让new bing来介绍下采集流程吧:

前端埋点到大数据采集的完整流程包括数据采集层、数据接入层、数据处理层、数据应用层这四个层次。在数据采集层,可以通过传统的埋点方式,在需要上报的位置组织数据、调用API、将数据传给后端,也可以通过“无埋点”概念,在底层hook所有的点击事件,自动采集全部事件并上报埋点数据。在数据接入层,可以通过Flume、Kafka等工具将数据接入到Hadoop生态圈中。在数据处理层,可以使用Hive、Spark SQL等工具进行数据清洗、转换和计算。在数据应用层,可以使用Tableau、Echarts等工具进行可视化展示。

(1) 一个埋点的求生之路——数据处理全流程解析 - 知乎. https://zhuanlan.zhihu.com/p/133090030

(2) 数据采集:埋点、采集、存储及分析 - Hider1214 - 博客园. https://www.cnblogs.com/hider/p/13967167.html

(3) 前端埋点简单实现方式 - 掘金. https://juejin.cn/post/7047710777507053582

2、项目框架

关于选用版本的详细解释,组件详细介绍单开一栏说明。

框架 版本 说明
jdk 1.8 Java版本,Hadoop体系建议使用1.8,虽然兼容Java11但是Hive除了最新的4.0.0alpha版本以外,其他的都只支持到1.8。Java11以上不支持。
Hadoop 3.1.3 选用视频配套版本,选择太旧的2.x可能出现依赖缺失,最新的3.4.3则存在依赖改名的问题
Zookeeper 3.5.7
Mysql 5.7.16 可以选用8系列,但是不能低于5.7版本,5.7.8之后支持Json,同时对InnoDB和一些强政策进行了更改。详情可看数据库MySQL 5.7版本介绍
Hive 3.1.2 Hive和Spark以及Hadoop版本要互相兼容,采用自编译等方式可以自定义版本
Flume 1.9.0 不建议选用2.0.0以上版本,配置文件有点改动(大概是.properties->.xml)
Kafka 3.0.0
Spark 3.0.0
DataX 3.0.0
Superset 1.4.2 推荐版本很搞,更新之后很多依赖都没了,不建议使用2.0.0,如果需要2.0.0+可以使用网上的一键部署版本,不然自己调整配置会很麻烦(需要一定conda、python知识)
DolphinScheduler 2.0.3
Maxwell 1.29.2

3、测试集群服务器规划

服务名称 子服务 服务器hadoop102 服务器hadoop1 服务器hadoop104
HDFS NameNode
DataNode
SecondaryNameNode
Yarn NodeManager
Resourcemanager
Zookeeper Zookeeper Server
Flume(采集日志) Flume
Kafka Kafka
Flume(消费Kafka日志) Flume
Flume(消费Kafka业务) Flume
Hive
MySQL MySQL
DataX
Spark
DolphinScheduler ApiApplicationServer
AlertServer
MasterServer
WorkerServer
LoggerServer
Superset Superset
服务数总计 16 11 12

第二章 组件及常见问题

0、虚拟机

能上哪个版本的VMware就上那个版本的VMware,虚拟机版本选用对环境影响并不大,不同Linux发行版之间基本是配置相同,包管理工具不同罢了,值得注意的是:选用CentOS Stream 9的配置项会和CentOS7之类的差别较大,在配置CentOS Stream 9作为虚拟机的时候可以参考以下文章:CentOS Stream9 设置静态IP

!!注意一下内存分配:HiveServer2启动查询功能大概率是将结果暂存在内存中,开8G很容易OOM,而DolphinScheduler更加离谱,全节点启动的情况下,8G内存甚至任务都无法执行,推荐配置16G、8G、8G,最低配置如下:

Hadoop102:8G+(我配的10G,结束时约空闲1~2G)

Hadoop103/Hadoop104:6G+(我配的8G,结束时约空闲1~3G)

0.1、Jdk

版本选择1.8,以所有组件中要求版本最高的那个作为适配版本,所有组件中Hive只能在1.8中启动,故选择1.8版本;当然如果想采用最新版本体验或是仍然想用更高级Java版本中的语法糖,那么可以安装多个Java,让Hive执行其他Java即可。

0.2、Mock模拟数据准备

(1)生成日志数据

这里使用文档中准备的Java程序执行即可,使用基本流程是:

  • 依次启动Zookeeper、Kafka、Flume(采集日志);
  • 执行Java程序,在选定目录下生成Json日志;
  • Flume将生成的日志数据采集到HDFS上。
(2)生成业务数据

这里使用文档中准备的Java程序执行即可,使用基本流程是:

  • 修改application.properties中的mock.clear和mock.clear.user数据
  • 执行Java程序,在MySQL数据库中生成业务数据

0.3、Source环境总览

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#JAVA_HOME
export JAVA_HOME=/opt/module/jdk
export PATH=$PATH:$JAVA_HOME/bin

#HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin

#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

#HIVE_HOME
export HIVE_HOME=/opt/module/hive
export PATH=$PATH:$HIVE_HOME/bin

# SPARK_HOME
export SPARK_HOME=/opt/module/spark
export PATH=$PATH:$SPARK_HOME/bin

1、Hadoop

Hadoop是一个由Apache基金会所开发的分布式系统基础架构,是一个存储系统 + 计算框架的软件框架。主要解决海量数据存储与计算的问题,是大数据技术中的基石。Hadoop以一种可靠、高效、可伸缩的方式进行数据处理,用户可以在不了解分布式底层细节的情况下,开发出分布式程序。

(1)配置流程:
  • 核心配置文件core-site.xml

    • 配置NameNode地址
    • 指定Hadoop数据存储目录
    • 配置HDFS网页登录使用的静态用户
    • 配置用户允许通过代理访问的主机节点
    • 配置用户允许通过代理用户所属组
    • 配置用户允许通过代理的用户
  • HDFS配置文件hdfs-site.xml

    • 配置NameNode Web端访问端口
    • 配置Secondary NameNode Web端访问端口
    • 指定HDFS副本的数量
  • YARN配置文件yarn-site.xml

    • 指定MapReduce走shuffle
    • 指定ResourceManager地址
    • 配置环境变量继承
    • 配置YARN单个容器允许分配的最大最小内存
    • 配置YARN容器允许管理的物理内存大小
    • 关闭YARN对物理内存和虚拟内存的限制检查
    • 开启日志聚集功能
    • 设置日志聚集服务器地址
    • 设置日志保留时间为7天
  • MapReduce配置文件mapred-site.xml

    • 指定MapReduce程序运行在YARN上
    • 配置历史服务器端地址
    • 配置历史服务器Web端地址
  • 配置workers(集群)

  • 分发到所有集群机器上

  • 初始化NameNode节点

1
hdfs namenode -format
(2)常见启停命令
1
2
3
start-dfs.sh                        # 启动HDFS
start-yarn.sh # 启动YARN
mapred --daemon start historyserver # 启动历史日志采集
(3)参数调优
  • hdfs-site.xml

    • NameNode有一个工作线程池,用来处理不同DataNode的并发心跳以及客户端并发的元数据操作。对于大集群或者有大量客户端的集群来说,通常需要增大参数dfs.namenode.handler.count的默认值10。
1
2
3
4
<property>
<name>dfs.namenode.handler.count</name>
<value>10</value>
</property>

$$
dfs.namenode.handler.count=20 × log_e^ClusterSize, 比如集群规模为8台时,此参数设置为41。
$$

  • yarn-site.xml

    • 数据统计主要用HiveSQL,没有数据倾斜,小文件已经做了合并处理,开启的JVM重用,而且IO没有阻塞,内存用了不到50%。但是还是跑的非常慢,而且数据量洪峰过来时,整个集群都会宕掉。

    • 内存利用率不够。这个一般是Yarn的2个配置造成的,单个任务可以申请的最大内存大小,和Hadoop单个节点可用内存大小。调节这两个参数能提高系统内存的利用率。

      1
      2
      3
      4
      (a)yarn.nodemanager.resource.memory-mb
      表示该节点上YARN可使用的物理内存总量,默认是8192(MB),注意,如果你的节点内存资源不够8GB,则需要调减小这个值,而YARN不会智能的探测节点的物理内存总量。
      (b)yarn.scheduler.maximum-allocation-mb
      单个任务可申请的最多物理内存量,默认是8192(MB)。

2、Zookeeper

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它包含一个简单的原语集,分布式应用程序可以基于它实现同步服务,配置维护和命名服务等。ZooKeeper作为一个分布式的服务框架,主要用来解决分布式集群中应用系统的一致性问题,它能提供基于类似于文件系统的目录节点树方式的数据存储,Zookeeper作用主要是用来维护和监控存储的数据的状态变化,通过监控这些数据状态的变化,从而达到基于数据的集群管理。

(1)配置流程
  • 配置myid文件,每个服务器上的myid是唯一编号,代表单独的Zookeeper server。

  • 配置*/conf目录下的zoo_sample.cfg文件

    • 改名,去掉sample启用配置

    • 修改数据存储路径,配置到myid对应的目录

    • 增加server.*配置。例:

      1
      server.2=hadoop102:2888:3888 # .2就是myid中的server编号
  • 同步配置到其他服务器(一定记得修改myid)

(2)常见启停脚本
1
2
3
zkServer.sh start  # 启动Zookeeper
zkServer.sh status # 查看Zookeeper状态
zkServer.sh stop # 停止Zookeeper进程

3、Kafka

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。Kafka最初由LinkedIn公司发布,使用Scala语言编写,与2010年12月份开源,成为Apache的顶级子项目。

Kafka是一种高吞吐量、持久性、分布式的发布订阅的消息队列系统。它可以在廉价的PC Server上搭建起大规模消息系统。

(1)配置流程
  • 配置*/config/目录下的server.properties文件

    1
    2
    3
    4
    5
    6
    #broker的全局唯一编号,不能重复,只能是数字。每个服务器上的数字不能一样。
    broker.id=0
    #kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
    log.dirs=/opt/module/kafka/datas
    #配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
    zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
  • 分发软件包,记得修改broker.id

(2)常见启停脚本
1
2
3
# 配置过zookeeper.connect之后需要先启动Zookeeper服务
kafka-server-start.sh -daemon config/server.properties # 根据配置文件启动Kafka
kafka-server-stop.sh # 关闭Kafka
(3)常见命令行操作
  • 主题命令操作

    • Kafka中的数据单元是消息,可以把它当作数据库里的一行“数据”或者一条“记录”来理解。Kafka通过主题来进行分类,主题就好比数据库中的表,每个主题包含多个分区,分区可以分布在不同的服务器上,也就是说通过这种方式来实现分布式数据的存储和读取。

      主题是Kafka中的基础概念,是一切消息处理的基础。主题属于Kafka元数据的一部分,会存储在Zookeeper中。生产者发布消息到某一特定主题上,由消费者去消费特定主题的消息。消费者可以订阅一个或多个主题。

    1
    kafka-topics.sh  # 查看操作命令主题
参数 描述
–bootstrap-server <String: server toconnect to> 连接的Kafka Broker主机名称和端口号。
–topic <String: topic> 操作的topic名称。
–create 创建主题。
–delete 删除主题。
–alter 修改主题。
–list 查看所有主题。
–describe 查看主题详细描述。
–partitions <Integer: # of partitions> 设置分区数。
–replication-factor<Integer: replication factor> 设置分区副本。
–config <String: name=value> 更新系统默认的配置。
  • 生产者命令操作

    • Kafka生产者是指向Kafka集群发送消息的客户端应用程序。生产者将消息封装成一个ProducerRecord向Kafka集群中的某个主题发送消息。发送的消息首先会经过序列化器进行序列化,以便在网络中传输。发送的消息需要经过分区器来决定该消息会分发到主题对应的分区,当然如果指定了分区,那么就不需要分区器来决定。
    1
    kafka-console-producer.sh  # 查看操作生产者
参数 描述
–bootstrap-server <String: server toconnect to> 连接的Kafka Broker主机名称和端口号。
–topic <String: topic> 操作的topic名称。
  • 消费者命令操作

    • Kafka消费者是指从Kafka集群中读取消息的客户端应用程序。消费者将从一个或多个主题订阅消息,并在消费者组中进行组织。当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息。消费者可以控制消息的读取位置,从而实现对消息的重复读取或跳过。
    1
    kafka-console-consumer.sh  # 查看操作消费者
参数 描述
–bootstrap-server <String: server toconnect to> 连接的Kafka Broker主机名称和端口号。
–topic <String: topic> 操作的topic名称。
–from-beginning 从头开始消费。
–group <String: consumer group id> 指定消费者组名称。

4、Flume

Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。它的主要作用是将数据从各种数据源(如Web服务器)收集到Hadoop的HDFS中,或者将数据从一个地方传输到另一个地方。Flume的核心概念包括:Source、Channel和Sink。Source是数据源,可以是Avro、Thrift、JMS、Netcat、Exec等;Channel是缓存区,用于存储Source产生的数据;Sink是数据目的地,可以是HDFS、HBase、Solr、Elasticsearch等。

  • 删除lib目录下的guava.jar以兼容Hadoop
(1)整体流程

日志采集Flume需要采集日志文件内容,并对日志格式(JSON)进行校验,然后将校验通过的日志发送到Kafka。此处可选择TaildirSource和KafkaChannel,并配置日志校验拦截器。

  • TailDirSource相比ExecSource、SpoolingDirectorySource的优势
    • TailDirSource:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
    • ExecSource可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。
    • SpoolingDirectorySource监控目录,支持断点续传。
  • Kafka Channel
    • 采用Kafka Channel,省去了Sink,提高了效率。

Flume执行job实现日志的采集,因此无需就行过多的原生配置,只需要针对每个任务写对应的job即可。

(2)配置实操
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# job/file_to_kafka.conf
# 定义组件
a1.sources = r1
a1.channels = c1

# 配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = xxx.ETLInterceptor$Builder # 自定义拦截器jar包

# 配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

# 组装
a1.sources.r1.channels = c1
1
2
flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console
# 这段代码是用于启动Flume的命令行。其中,`flume-ng`是启动Flume的命令,`agent`表示启动的是一个agent,`-n a1`表示agent的名称为a1,`-c conf/`表示配置文件所在的目录为conf,`-f job/file_to_kafka.conf`表示使用名为file_to_kafka.conf的配置文件,`-Dflume.root.logger=info,console`表示设置日志级别为info,并将日志输出到控制台。

5、MySQL

配置流程
  • 安装MySQL依赖
  • 安装mysql-client
  • 安装mysql-server
  • 配置MySQL
    • 设置复杂密码
    • 更改MySQL密码策略
    • 设置简单密码

6、Maxwell

Maxwell是一个能实时读取MySQL二进制日志binlog,并生成 JSON 格式的消息,作为生产者发送给 Kafka、Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序³。它的主要作用是将MySQL数据库中的数据变更事件转换成JSON格式的消息,以便于在分布式系统中进行数据同步。Maxwell可以通过配置文件来指定需要同步的表,以及需要过滤掉的字段等信息。

原理:将自己伪装成slave,并遵循MySQL主从复制的协议,从master同步数据。

(1)配置流程
  • 修改MySQL配置,启用Binglog日志(默认不开启)后重启MySQL服务。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    [mysqld]

    #数据库id
    server-id = 1
    #启动binlog,该参数的值会作为binlog的文件名
    log-bin=mysql-bin
    #binlog类型,maxwell要求为row类型
    binlog_format=row
    #启用binlog的数据库,需根据实际情况作出修改
    binlog-do-db=gmall
    • 几种常见的Binlog模式

      • Statement-based:基于语句,Binlog会记录所有写操作的SQL语句,包括insert、update、delete等。

        优点: 节省空间

        缺点: 有可能造成数据不一致,例如insert语句中包含now()函数。

      • Row-based:基于行,Binlog会记录每次写操作后被操作行记录的变化。

        优点:保持数据的绝对一致性。

        缺点:占用较大空间。

      • mixed:混合模式,默认是Statement-based,如果SQL语句可能导致数据不一致,就自动切换到Row-based。

    • Maxwell要求Binlog采用Row-based模式。

  • 配置maxwell目录下的config.properties.example文件(元数据库、生产者、集群配置)

    • 重命名为config.properties
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    #Maxwell数据发送目的地,可选配置有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
    producer=kafka
    #目标Kafka集群地址
    kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092
    #目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table}
    kafka_topic=maxwell

    #MySQL相关配置
    host=hadoop102
    user=maxwell
    password=maxwell
    jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
(2)常见启停脚本
1
2
maxwell --config /opt/module/maxwell/config.properties --daemon                         # 启动maxwell
ps -ef | grep maxwell | grep -v grep | grep maxwell | awk '{print $2}' | xargs kill -9 # 停止maxwell
(3)增量同步和全量同步
  • 增量同步:启用Kafka消费者、maxwell,Mock业务数据即可在Kafka中观察到生成的数据;

  • 全量同步:

    • Maxwell提供了bootstrap功能来进行历史数据的全量同步,输出结果为Json格式
    1
    2
    maxwell-bootstrap --database gmall --table user_info --config /opt/module/maxwell/config.properties
    # 这段代码是在运行maxwell-bootstrap命令,它会将gmall数据库中的user_info表的数据导入到另一个数据库中。其中,--config /opt/module/maxwell/config.properties是指定了配置文件的路径。

7、DataX

DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。源码地址:https://github.com/alibaba/DataX

为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

(1)配置流程
  • 解压自检

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json

    # 出现如下反馈代表安装成功
    ……
    2021-10-12 21:51:12.335 [job-0] INFO JobContainer -
    任务启动时刻 : 2021-10-12 21:51:02
    任务结束时刻 : 2021-10-12 21:51:12
    任务总计耗时 : 10s
    任务平均流量 : 253.91KB/s
    记录写入速度 : 10000rec/s
    读出记录总数 : 100000
    读写失败总数 : 0
(2)配置说明

配置文件模板为json,json最外层是一个job,job包含setting和content两部分,其中setting用于对整个job进行配置,content用户配置数据源和目的地。

(3)DataX优化

DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在数据库可以承受的范围内达到最佳的同步速度。

参数 说明
job.setting.speed.channel 并发数
job.setting.speed.record 总record限速
job.setting.speed.byte 总byte限速
core.transport.channel.speed.record 单个channel的record限速,默认值为10000(10000条/s)
core.transport.channel.speed.byte 单个channel的byte限速,默认值1024*1024(1M/s)

注意事项:

1.若配置了总record限速,则必须配置单个channel的record限速

2.若配置了总byte限速,则必须配置单个channe的byte限速

3.若配置了总record限速和总byte限速,channel并发数参数就会失效。因为配置了总record限速和总byte限速之后,实际channel并发数是通过计算得到的:

计算公式为:

min(总byte限速/单个channel的byte限速,总record限速/单个channel的record限速)

8、Hive

Hive是一个基于Hadoop的数据仓库工具,用于处理结构化数据。它可以将结构化的数据文件映射为一张数据库表,并提供类SQL查询功能。Hive的操作接口采用类SQL语法,提供快速开发的能力,避免了写MapReduce,减少开发人员的学习成本,功能扩展很方便。

你可以使用Hive来进行数据提取、转化、加载,这是一种可以存储、查询和分析存储在Hadoop中的大规模数据的机制。你可以在Hive中使用类SQL语句来查询数据,这些语句会被转换成MapReduce任务来执行。你需要安装Hive并配置它以便使用它。你可以在网上找到很多关于如何使用Hive的教程和指南。

配置流程
  • 解决日志包冲突,删除lib下的log4j-slf4j-impl.jar包

  • 拷贝对应版本的mysql驱动到lib目录 mysql-connector-java-5.1.27-bin.jar

  • 新建hive-site.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
    <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://hadoop102:3306/metastore?useSSL=false&amp;useUnicode=true&amp;characterEncoding=UTF-8</value>
    </property>

    <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    </property>

    <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>root</value>
    </property>

    <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>000000</value>
    </property>

    <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/user/hive/warehouse</value>
    </property>

    <property>
    <name>hive.metastore.schema.verification</name>
    <value>false</value>
    </property>

    <property>
    <name>hive.server2.thrift.port</name>
    <value>10000</value>
    </property>

    <property>
    <name>hive.server2.thrift.bind.host</name>
    <value>hadoop102</value>
    </property>

    <property>
    <name>hive.metastore.event.db.notification.api.auth</name>
    <value>false</value>
    </property>

    <property>
    <name>hive.cli.print.header</name>
    <value>true</value>
    </property>

    <property>
    <name>hive.cli.print.current.db</name>
    <value>true</value>
    </property>
    </configuration>
  • 在MySQL创建Hive元数据库metastore

  • 初始化Hive元数据库

    1
    schematool -initSchema -dbType mysql -verbose
  • 修改元数据库字符集

    • Hive元数据库的字符集默认为Latin1,由于其不支持中文字符,故若建表语句中包含中文注释,会出现乱码现象。修改Hive元数据库中存储注释的字段的字符集为utf-8。
    1
    2
    alter table COLUMNS_V2 modify column COMMENT varchar(256) character set utf8;
    alter table TABLE_PARAMS modify column PARAM_VALUE mediumtext character set utf8;

9、Spark

Spark是一个快速的、通用的、基于内存的分布式计算系统,用于大规模数据处理。它是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用并行框架,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

(1)兼容性说明

官网下载的Hive3.1.2和Spark3.0.0默认是不兼容的。因为Hive3.1.2支持的Spark版本是2.4.5,所以需要我们重新编译Hive3.1.2版本。

编译步骤:官网下载Hive3.1.2源码,修改pom文件中引用的Spark版本为3.0.0,如果编译通过,直接打包获取jar包。如果报错,就根据提示,修改相关方法,直到不报错,打包获取jar包。

这里我试过了hive4.0.0alpha去兼容jdk11,但是无疾而终(默认启动的是beeline不是hive,这个时候我还没学hive,不太好大刀阔斧上手)。spark也试了从3.0.0到最新版中的几个版本,但是hive没有自编译,所以总会报错。

另外,hadoop版本也要考虑,Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. RPC channel is closed.这个问题在YARN日志报的$GetFileInfoRequestProto无法强制转换为com.google.protobuf.Message,主要就是hadoop版本导致的。

(2)配置流程
  • 在hive中创建spark配置文件

    1
    2
    3
    4
    5
    6
    # spark-defaults.conf
    spark.master yarn
    spark.eventLog.enabled true
    spark.eventLog.dir hdfs://hadoop102:8020/spark-history
    spark.executor.memory 1g
    spark.driver.memory 1g
  • 修改hive-site.xml文件,更改为spark引擎

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    <!--Spark依赖位置(注意:端口号8020必须和namenode的端口号一致)-->
    <property>
    <name>spark.yarn.jars</name>
    <value>hdfs://hadoop102:8020/spark-jars/*</value>
    </property>

    <!--Hive执行引擎-->
    <property>
    <name>hive.execution.engine</name>
    <value>spark</value>
    </property>
  • 增加Application Master资源比例

    • 修改YARN配置文件yarn-site.xml
    1
    2
    3
    4
    <property>
    <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
    <value>0.8</value>
    </property

10、DolphinScheduler

Apache DolphinScheduler是一个分布式、易扩展的可视化DAG工作流任务调度平台。致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。

(1)核心架构
  • MasterServer采用分布式无中心设计理念,MasterServer主要负责 DAG 任务切分、任务提交、任务监控,并同时监听其它MasterServer和WorkerServer的健康状态。

  • WorkerServer也采用分布式无中心设计理念,WorkerServer主要负责任务的执行和提供日志服务。

  • ZooKeeper服务,系统中的MasterServer和WorkerServer节点都通过ZooKeeper来进行集群管理和容错。

  • Alert服务,提供告警相关服务。

  • API接口层,主要负责处理前端UI层的请求。

  • UI,系统的前端页面,提供系统的各种可视化操作界面。

(2)前置准备
  • 三台节点均需部署JDK(1.8+),并配置相关环境变量。

  • 需部署数据库,支持MySQL(5.7+)或者PostgreSQL(8.2.15+)。如 MySQL 则需要 JDBC Driver 8.0.16。

  • 需部署Zookeeper(3.4.6+)。

  • 如果启用 HDFS 文件系统,则需要 Hadoop(2.6+)环境。

  • 三台节点均需安装进程管理工具包psmisc。

(3)配置流程
  • 配置一键部署脚本

    • 修改解压目录下的conf/config目录下的install_config.conf文件

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      ips="hadoop102,hadoop103,hadoop104" 
      # 将要部署任一 DolphinScheduler 服务的服务器主机名或 ip 列表

      masters="hadoop102"
      # master 所在主机名列表,必须是 ips 的子集

      workers="hadoop102:default,hadoop103:default,hadoop104:default"
      # worker主机名及队列,此处的 ip 必须在 ips 列表中

      alertServer="hadoop102"
      # 告警服务所在服务器主机名

      apiServers="hadoop102"
      # api服务所在服务器主机名

      installPath="/opt/module/dolphinscheduler"
      # DS 安装路径,如果不存在会创建

      deployUser="atguigu"
      # 部署用户,任务执行服务是以 sudo -u {linux-user} 切换不同 Linux 用户的方式来实现多租户运行作业,因此该用户必须有免密的 sudo 权限。

      javaHome="/opt/module/jdk"
      # JAVA_HOME 路径

      DATABASE_TYPE="mysql"
      # 数据库类型

      SPRING_DATASOURCE_URL="jdbc:mysql://hadoop102:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8"
      # 数据库 URL

      SPRING_DATASOURCE_USERNAME="dolphinscheduler"
      # 数据库用户名

      SPRING_DATASOURCE_PASSWORD="dolphinscheduler"
      # 数据库密码

      registryPluginName="zookeeper"
      # 注册中心插件名称,DS 通过注册中心来确保集群配置的一致性

      registryServers="hadoop102:2181,hadoop103:2181,hadoop104:2181"
      # 注册中心地址,即 Zookeeper 集群的地址

      registryNamespace="dolphinscheduler"
      # DS 在 Zookeeper 的结点名称

      resourceStorageType="HDFS"
      # 资源存储类型

      resourceUploadPath="/dolphinscheduler"
      # 资源上传路径

      defaultFS="hdfs://hadoop102:8020"
      # 默认文件系统

      resourceManagerHttpAddressPort="8088"
      # yarn RM http 访问端口

      yarnHaIps=
      # Yarn RM 高可用 ip,若未启用 RM 高可用,则将该值置空

      singleYarnIp="hadoop103"
      # Yarn RM 主机名,若启用了 HA 或未启用 RM,保留默认值

      hdfsRootUser="atguigu"
      # 拥有 HDFS 根目录操作权限的用户
  • 初始化数据库

    • 创建数据库

      1
      CREATE DATABASE dolphinscheduler DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
    • 创建用户

      1
      CREATE USER 'dolphinscheduler'@'%' IDENTIFIED BY 'dolphinscheduler';
    • 赋予用户相应权限

      1
      2
      GRANT ALL PRIVILEGES ON dolphinscheduler.* TO 'dolphinscheduler'@'%';
      flush privileges;
    • 拷贝MySQL驱动到DolphinScheduler的解压目录下的lib中

    • 执行数据库初始化脚本

      1
      script/create-dolphinscheduler.sh
(3)常见启停脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
./install.sh  # 一键部署并启动
./bin/start-all.sh # 启动所有服务(区别于Hadoop)
./bin/stop-all.sh # 关闭所有服务(区别于Hadoop)

# 启停Master
./bin/dolphinscheduler-daemon.sh start master-server
./bin/dolphinscheduler-daemon.sh stop master-server

# 启停Worker
./bin/dolphinscheduler-daemon.sh start worker-server
./bin/dolphinscheduler-daemon.sh stop worker-server

# 启停Api
./bin/dolphinscheduler-daemon.sh start api-server
./bin/dolphinscheduler-daemon.sh stop api-server

# 启动Logger
./bin/dolphinscheduler-daemon.sh start logger-server
./bin/dolphinscheduler-daemon.sh stop logger-server

# 启停Alert
./bin/dolphinscheduler-daemon.sh start alert-server
./bin/dolphinscheduler-daemon.sh stop alert-server

对于每个节点的配置在部署后的目录中,针对每个进程都有一个conf文件

11、Superset

强烈斥责Superset的开发人员,兼容性屌差,我曾经尝试过用Docker、Miniconda安装过Superset,但是安装成功的次数寥寥无几,所以在这里着重介绍下。尤其我大一的时候,那时候B站UP主鱼皮还介绍过十分钟搭建Superset,结果放到现在这种方式不太行了(看着官方文档还是可以,但是那个依赖版本维护极差,对小白其实很不友好,尤其当时我还没学数据库),现在Superset最新版都2.0.0+了,不胜唏嘘啊。

(1)安装流程
  • 安装Miniconda

  • 创建虚拟环境

    • 配置conda国内镜像
    • 创建Python3.8环境
  • 安装依赖

    1
    sudo yum install -y gcc gcc-c++ libffi-devel python-devel python-pip python-wheel python-setuptools openssl-devel cyrus-sasl-devel openldap-devel
  • 安装(更新)setuptools和pip

    1
    pip install --upgrade setuptools pip -i https://pypi.douban.com/simple/
  • 指定安装版本:

    1
    pip install apache-superset==1.4.2 -i https://pypi.douban.com/simple/
  • 依赖降级:

    1
    2
    3
    4
    5
    6
    pip uninstall werkzeyg
    pip uninstall markupsafe
    pip install markupsafe==2.0.1
    pip install flask==1.1.2
    pip install Werkzeug==1.0.1
    pip install --upgrade cryptography==3.2
  • 初始化数据库之前先执行:export FLASK_APP=superset

  • 初始化Superset

    1
    superset db upgrade
  • 安装gunicorn

    1
    pip install gunicorn -i https://pypi.douban.com/simple/
(2)常见启停脚本
1
2
3
4
5
6
gunicorn --workers 5 --timeout 120 --bind hadoop102:8787  "superset.app:create_app()" --daemon
# 启动Superset
# workers:指定进程个数
# timeout:worker进程超时时间,超时会自动重启
# bind:绑定本机地址,即为Superset访问地址
# daemon:后台运行
(3)安装MySQL依赖项
1
conda install mysqlclient

对接不同的数据源,需安装不同的依赖,以下地址为官网说明。

https://superset.apache.org/docs/databases/installing-database-drivers