听说许多同学在进入到正式开发环节中时,面对新建离线任务时的任务类型选择:DDL?数据同步?HBase?到底该选择从什么开始做开发,感到无从下手!脑袋里不停浮现出无数问号:这是啥?这又是啥?都是干啥的?
今天,我们就来跟大家分别介绍一下,这些任务到底有什么不同,分别是用来做什么的。大家也可以根据下方的指导操作,进行相应的测试体验。(下面每种作业类型仅供测试使用,不需要每天周期调度执行,因此统一存放到开发中心 -> 临时作业 中的tmp目录下,且所有作业命名以tmp_
开头。)
1.DDL 任务
DDL类型为平台抽象的一种作业类型,用于书写建表语句,当引擎类型为Hadoop时,DDL 作业本质即为 Hive 作业。
新建tmp_ddl
临时作业,模板选择空模板,输入下述测试代码:
-- 如果表已存在,可以删除掉
-- DROP TABLE IF EXISTS demo_tmp_ddl;
CREATE TABLE IF NOT EXISTS demo_tmp_ddl
(
userid BIGINT COMMENT '用户ID'
, ip STRING COMMENT 'IP地址'
, address STRING COMMENT '地址'
)
COMMENT '临时表'
PARTITIONED BY ( ds STRING )
STORED AS TEXTFILE;
- 内部表模板代码:
-- 如果表已存在,可以删除掉
-- DROP TABLE IF EXISTS demo;
CREATE TABLE IF NOT EXISTS demo
(
userid BIGINT COMMENT '用户ID'
, ip STRING COMMENT 'IP地址'
, address STRING COMMENT '地址'
)
COMMENT '内部表'
PARTITIONED BY ( ds STRING )
STORED AS TEXTFILE;
- 外部表模板代码:
-- 如果表已存在,可以删除掉
-- DROP TABLE IF EXISTS demo;
CREATE EXTERNAL TABLE IF NOT EXISTS demo
(
userid BIGINT COMMENT '用户ID'
, ip STRING COMMENT 'IP地址'
, address STRING COMMENT '地址'
)
COMMENT '外部表'
PARTITIONED BY ( ds STRING )
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/hive/external/demo';
2.数据同步任务
数据同步作业用于数据同步或数据交换,目的是把数据从一个数据源中同步到另一个数据源。
2.1 数据源配置
首先创建一个数据同步作业tmp_datax
,配置源头表信息和目的表信息。(若无可用数据源,请先到数据源管理中添加并授权数据源,并在该环境中添加数据源)
2.2 字段映射
配置源头表和目的表的字段映射关系,映射序号为连线的顺序。
点击fx图标可以为该字段设置转换函数,关于函数的使用方式可在函数开发-系统函数-数据同步作业函数中查看。
如图所示,在content字段上设置dx_substr("0","3"),则从第0位开始,截取长度为3的字符串。
在字段映射时,可选择将写入时间做为源信息写入到目的表的某个字段。目前支持MySQL和Oracle。
字段映射时,当遇到复杂的使用场景时,可通过Transformer来做字段转换功能。代码示例如下,该代码的作用是在序号为0的字段即user_id
的头部加上前缀"111"。
注意:“code”中的代码不能有换行。
{
"name": "dx_groovy",
"parameter": {
"code":"Column column = record.getColumn(0);def str = column.asString();def sb = new StringBuffer(str);def header = sb.insert(0,'111');def strHearder = header.toString();record.setColumn(0, new StringColumn(strHearder));return record",
"extraPackage": ["import groovy.json.JsonSlurper;"]
}
}
2.3 作业配置
配置最大传输速度和容错。
2.4 完成
数据同步作业后,支持修改字段映射和作业配置,不支持修改数据源和表。
2.5 运行
点击运行,开始数据同步,日志中将显示同步结果的统计数据。
可以看到表demo_tmp_ddl
中导入了数据,并按指定函数转换规则进行了转换。
3.非结构化数据同步
非结构化数据同步作业用于同步文件或文件夹,目的是把文件资源从一个数据源中同步到另一个数据源。
创建一个非结构化数据同步作业tmp_sycn
,配置源端和目的端信息。(若无可用数据源,请先到数据源管理中添加并授权数据源到离线开发,并在该环境中添加数据源)
如图所示,将一个user文件夹及其下所有文件同步到tmp文件夹下。
3.1 源端
源端数据源类型支持OSS、FTP和HDFS。选择数据源后,可选填资源路径,当鼠标置于输入框中,显示下一层目录或文件,可在输入框中选择目录或文件。支持模糊搜索,支持通配符*。
点击添加,可增加一行输入框,用于支持批量传输。
通配符*只能用于目的文件夹或文件,不可用于目录中。
3.2 目的端
目的端数据源支持HDFS。选择数据源后,可选填存储路径,当鼠标置于输入框中,显示下一层目录。
冲突策略分为覆盖、重命名、跳过、报错。
-
覆盖:当文件重名时,新文件直接覆盖旧的文件。
-
重命名:当文件重名时,新文件名称拼接timestamp做为文件名。
-
跳过:当文件重名时,忽略新文件。
-
报错:当文件重名时,停止作业并报错。
4. Hive
Hive2类型用于书写Hive SQL。新建tmp_hive
临时作业,输入以下代码。
show tables
5. Shell
Shell类型用于书写shell。新建tmp_shell
临时作业,输入下述测试代码:
echo "离线开发"
# 输出系统时间
date
6. Python
Python类型用于书写Python代码。新建tmp_python
临时作业,输入以下代码。
# -*- coding: utf-8 -*-
"""
*********************************************************************
功能:使用python脚本,打印字符串信息
*********************************************************************
"""
# 引用其他python资源
if __name__ == '__main__':
print "citic"
7. Perl
Perl类型用于书写Perl代码。新建tmp_perl
临时作业,输入以下代码,打印语句。
print "Hello, World!\n";
8. SparkSQL
Spark SQL类型用于书写Spark SQL。新建tmp_sparksql
临时作业,输入以下代码:
show tables
9. Spark
Spark类型用于书写Spark代码,语言支持Scala和Java。由于平台暂不支持在线编译代码,因此书写Spark作业的步骤如下:
9.1 在Idea或者Eclipse中书写如下Scala或Java代码:SparkSqlDemo.scala,并生成jar包:spark_sql_demo.jar
(在Idea中建议安装PackageJars插件,可直接在package上进行打包)
package com.dtwave.spark.sql
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
/**
*
* SparkSQL定义临时函数、操作Hive表
*
* 输出Jar包路径: libs/spark_sql_demo.jar
*
* @author baisong
* @date 18/2/1
*/
object SparkSqlDemo {
//设置日志级别
Logger.getLogger("org").setLevel(Level.ERROR)
def main(args: Array[String]): Unit = {
if (args.length < 2) {
System.err.println("Usage: SparkSqlDemo <dbName> <tableName>")
System.exit(-1)
}
//库名
val dbName = args(0)
// 表名
val tableName = args(1)
/**
* 此处不用设置AppName和Master参数, 平台提交作业会自动添加.
*
* AppName命名格式: 人物名_用户名_实例名
*/
val spark = SparkSession
.builder()
.enableHiveSupport()
.getOrCreate()
// 注册临时UDF
// spark.udf.register("demo_udf", (x: String) => "Hello," + x)
// 查询Hive表数据
val df = spark.sql(s"select * from ${dbName}.${tableName} limit 3")
// 打印输出数据
df.show(false)
}
}
9.2 上传spark_sql_demo.jar
到资源文件,名称为spark_sql_demo
。
9.3 新建临时作业tmp_spark
,jar选择spark_sql_demo
,配置好spark作业的基本信息可参数配置:class为com.dtwave.spark.sql.SparkSqlDemo,master为yarn,deploy-mode为client,app arguments为${dbName} ${tableName},配置作业的属性配置信息,包括作业参数和资源依赖。作业参数增加一个dbName,值为当前环境所使用的数据库名estate
;tableName为表名,例如:demo_tmp_ddl
。
10. PySpark
PySpark类型用于书写Python Spark 代码。新建tmp_pyspark
临时作业,输入下述测试代码:
样例代码1:
import sys
from random import random
from operator import add
from pyspark.sql import SparkSession
if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("PythonPi") \
.getOrCreate()
partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
n = 100000 * partitions
def f(_):
x = random() * 2 - 1
y = random() * 2 - 1
return 1 if x ** 2 + y ** 2 < 1 else 0
count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
print("Pi is roughly %f" % (4.0 * count / n))
spark.stop()
样例代码2:
注意:要使以下代码正常运行,需要先按numpy,安装命令 :pip install numpy。
from __future__ import print_function
from pyspark.ml.feature import Word2Vec
from pyspark.sql import SparkSession
# Word2Vec Example
if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("Word2VecExample")\
.getOrCreate()
# $example on$
# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
("Hi I heard about Spark".split(" "), ),
("I wish Java could use case classes".split(" "), ),
("Logistic regression models are neat".split(" "), )
], ["text"])
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text",
outputCol="result")
model = word2Vec.fit(documentDF)
result = model.transform(documentDF)
for row in result.collect():
text, vector = row
print("Text: [%s] => \nVector: %s\n" % (", ".join(text),
str(vector)))
spark.stop()
11. Tensorflow
新建TensorFlow临时作业tmp_tensorflow
,代码如下:
#!/usr/bin/python
import re
line = "Cats are smarter than dogs"
matchObj = re.match( r'(.*) are (.*?) .*', line, re.M|re.I)
if matchObj:
print "matchObj.group() : ", matchObj.group()
print "matchObj.group(1) : ", matchObj.group(1)
print "matchObj.group(2) : ", matchObj.group(2)
else:
print "No match!!"
12. Hbase
新建tmp_Hbase
临时作业,输入以下代码查询有哪些表:
list