• 阅读: 927 回复: 0
    学习委员

    数栖云中的各个作业类型,到底有什么不同?

    楼主 发表于 2019-08-08 18:56:39

    听说许多同学在进入到正式开发环节中时,面对新建离线任务时的任务类型选择:DDL?数据同步?HBase?到底该选择从什么开始做开发,感到无从下手!脑袋里不停浮现出无数问号:这是啥?这又是啥?都是干啥的?

    今天,我们就来跟大家分别介绍一下,这些任务到底有什么不同,分别是用来做什么的。大家也可以根据下方的指导操作,进行相应的测试体验。(下面每种作业类型仅供测试使用,不需要每天周期调度执行,因此统一存放到开发中心 -> 临时作业 中的tmp目录下,且所有作业命名以tmp_开头。)

    1.DDL 任务

    DDL类型为平台抽象的一种作业类型,用于书写建表语句,当引擎类型为Hadoop时,DDL 作业本质即为 Hive 作业。

    新建tmp_ddl临时作业,模板选择空模板,输入下述测试代码:

    -- 如果表已存在,可以删除掉
    -- DROP TABLE IF EXISTS demo_tmp_ddl;
    
    CREATE TABLE IF NOT EXISTS demo_tmp_ddl 
    ( 
          userid    BIGINT COMMENT '用户ID'
        , ip        STRING COMMENT 'IP地址'
        , address    STRING COMMENT '地址'
    )
    COMMENT '临时表'
    PARTITIONED BY ( ds STRING )
    STORED AS TEXTFILE;

    1560519614462

    • 内部表模板代码:
    -- 如果表已存在,可以删除掉
    -- DROP TABLE IF EXISTS demo;
    
    CREATE TABLE IF NOT EXISTS demo 
    ( 
          userid    BIGINT COMMENT '用户ID'
        , ip        STRING COMMENT 'IP地址'
        , address    STRING COMMENT '地址'
    )
    COMMENT '内部表'
    PARTITIONED BY ( ds STRING )
    STORED AS TEXTFILE;
    • 外部表模板代码:
    -- 如果表已存在,可以删除掉
    -- DROP TABLE IF EXISTS demo;
    
    CREATE EXTERNAL TABLE IF NOT EXISTS demo 
    ( 
          userid        BIGINT COMMENT '用户ID'
        , ip            STRING COMMENT 'IP地址'
        , address        STRING COMMENT '地址'
    ) 
    COMMENT '外部表'
    PARTITIONED BY ( ds STRING )
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    STORED AS TEXTFILE
    LOCATION '/user/hive/external/demo';

    2.数据同步任务

    数据同步作业用于数据同步或数据交换,目的是把数据从一个数据源中同步到另一个数据源。

    2.1 数据源配置

    首先创建一个数据同步作业tmp_datax,配置源头表信息和目的表信息。(若无可用数据源,请先到数据源管理中添加并授权数据源,并在该环境中添加数据源)

    1560521662179

    2.2 字段映射

    配置源头表和目的表的字段映射关系,映射序号为连线的顺序。

    1560686669345

    点击fx图标可以为该字段设置转换函数,关于函数的使用方式可在函数开发-系统函数-数据同步作业函数中查看。

    如图所示,在content字段上设置dx_substr("0","3"),则从第0位开始,截取长度为3的字符串。

    15606867109421560686742855

    在字段映射时,可选择将写入时间做为源信息写入到目的表的某个字段。目前支持MySQL和Oracle。

    字段映射时,当遇到复杂的使用场景时,可通过Transformer来做字段转换功能。代码示例如下,该代码的作用是在序号为0的字段即user_id的头部加上前缀"111"。

    注意:“code”中的代码不能有换行。  

    {
        "name": "dx_groovy",
        "parameter": {
        "code":"Column column = record.getColumn(0);def str = column.asString();def sb = new StringBuffer(str);def header = sb.insert(0,'111');def strHearder = header.toString();record.setColumn(0, new StringColumn(strHearder));return record",
        "extraPackage": ["import groovy.json.JsonSlurper;"]
        }
     }

    2.3 作业配置

    配置最大传输速度和容错。

    1560684925211

    2.4 完成

    数据同步作业后,支持修改字段映射和作业配置,不支持修改数据源和表。

    1560684967263

    2.5 运行

    点击运行,开始数据同步,日志中将显示同步结果的统计数据。

    1560686931050

    可以看到表demo_tmp_ddl中导入了数据,并按指定函数转换规则进行了转换。

    1560845854580

    3.非结构化数据同步

    非结构化数据同步作业用于同步文件或文件夹,目的是把文件资源从一个数据源中同步到另一个数据源。

    创建一个非结构化数据同步作业tmp_sycn,配置源端和目的端信息。(若无可用数据源,请先到数据源管理中添加并授权数据源到离线开发,并在该环境中添加数据源)

    如图所示,将一个user文件夹及其下所有文件同步到tmp文件夹下。

    1560687311646

    3.1 源端

    源端数据源类型支持OSS、FTP和HDFS。选择数据源后,可选填资源路径,当鼠标置于输入框中,显示下一层目录或文件,可在输入框中选择目录或文件。支持模糊搜索,支持通配符*。

    点击添加,可增加一行输入框,用于支持批量传输。

    通配符*只能用于目的文件夹或文件,不可用于目录中。

    3.2 目的端

    目的端数据源支持HDFS。选择数据源后,可选填存储路径,当鼠标置于输入框中,显示下一层目录。

    冲突策略分为覆盖、重命名、跳过、报错。

    • 覆盖:当文件重名时,新文件直接覆盖旧的文件。

    • 重命名:当文件重名时,新文件名称拼接timestamp做为文件名。

    • 跳过:当文件重名时,忽略新文件。

    • 报错:当文件重名时,停止作业并报错。

    4. Hive

    Hive2类型用于书写Hive SQL。新建tmp_hive临时作业,输入以下代码。

    show tables
     1560519669452

    5. Shell

    Shell类型用于书写shell。新建tmp_shell临时作业,输入下述测试代码:

    echo "离线开发"
    
    # 输出系统时间
    date
    
     1560518985256

    6. Python

    Python类型用于书写Python代码。新建tmp_python临时作业,输入以下代码。

    # -*- coding: utf-8 -*-
    
    """
    *********************************************************************
    功能:使用python脚本,打印字符串信息
    *********************************************************************
    """
    
    # 引用其他python资源
    
    if __name__ == '__main__':
        print "citic"
    
     1560519061637

    7. Perl

    Perl类型用于书写Perl代码。新建tmp_perl临时作业,输入以下代码,打印语句。

    print "Hello, World!\n";
    
     1560519414552

    8. SparkSQL

    Spark SQL类型用于书写Spark SQL。新建tmp_sparksql临时作业,输入以下代码:

    show tables

    1560519749012

    9. Spark

    Spark类型用于书写Spark代码,语言支持Scala和Java。由于平台暂不支持在线编译代码,因此书写Spark作业的步骤如下:

    9.1 在Idea或者Eclipse中书写如下Scala或Java代码:SparkSqlDemo.scala,并生成jar包:spark_sql_demo.jar(在Idea中建议安装PackageJars插件,可直接在package上进行打包)

    package com.dtwave.spark.sql
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.SparkSession
    
    /**
      *
      * SparkSQL定义临时函数、操作Hive表
      *
      * 输出Jar包路径: libs/spark_sql_demo.jar
      *
      * @author baisong
      * @date 18/2/1
      */
    object SparkSqlDemo {
    
      //设置日志级别
      Logger.getLogger("org").setLevel(Level.ERROR)
    
      def main(args: Array[String]): Unit = {
    
        if (args.length < 2) {
          System.err.println("Usage: SparkSqlDemo <dbName> <tableName>")
          System.exit(-1)
        }
        //库名
        val dbName = args(0)
        // 表名
        val tableName = args(1)
    
        /**
          * 此处不用设置AppName和Master参数, 平台提交作业会自动添加.
          *
          * AppName命名格式: 人物名_用户名_实例名
          */
        val spark = SparkSession
          .builder()
          .enableHiveSupport()
          .getOrCreate()
    
        // 注册临时UDF
        // spark.udf.register("demo_udf", (x: String) => "Hello," + x)
    
        // 查询Hive表数据
        val df = spark.sql(s"select * from ${dbName}.${tableName} limit 3")
    
        // 打印输出数据
        df.show(false)
      }
    }

     9.2 上传spark_sql_demo.jar到资源文件,名称为spark_sql_demo

    1560651188847

    9.3 新建临时作业tmp_spark,jar选择spark_sql_demo,配置好spark作业的基本信息可参数配置:class为com.dtwave.spark.sql.SparkSqlDemo,master为yarn,deploy-mode为client,app arguments为${dbName} ${tableName},配置作业的属性配置信息,包括作业参数和资源依赖。作业参数增加一个dbName,值为当前环境所使用的数据库名estate;tableName为表名,例如:demo_tmp_ddl

    1560520503309

    10. PySpark

    PySpark类型用于书写Python Spark 代码。新建tmp_pyspark临时作业,输入下述测试代码:

    样例代码1:

    import sys
    from random import random
    from operator import add
    
    from pyspark.sql import SparkSession
    
    
    if __name__ == "__main__":
        spark = SparkSession\
            .builder\
            .appName("PythonPi") \
            .getOrCreate()
    
        partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
        n = 100000 * partitions
    
        def f(_):
            x = random() * 2 - 1
            y = random() * 2 - 1
            return 1 if x ** 2 + y ** 2 < 1 else 0
    
        count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
        print("Pi is roughly %f" % (4.0 * count / n))
    
        spark.stop()
    
     1560849440604

    样例代码2:

    注意:要使以下代码正常运行,需要先按numpy,安装命令 :pip install numpy。

    from __future__ import print_function
      from pyspark.ml.feature import Word2Vec
      from pyspark.sql import SparkSession
      # Word2Vec Example
      if __name__ == "__main__":
          spark = SparkSession\
              .builder\
              .appName("Word2VecExample")\
              .getOrCreate()
          # $example on$
          # Input data: Each row is a bag of words from a sentence or document.
          documentDF = spark.createDataFrame([
              ("Hi I heard about Spark".split(" "), ),
              ("I wish Java could use case classes".split(" "), ),
              ("Logistic regression models are neat".split(" "), )
    ], ["text"])
          # Learn a mapping from words to Vectors.
          word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text",
      outputCol="result")
          model = word2Vec.fit(documentDF)
          result = model.transform(documentDF)
          for row in result.collect():
              text, vector = row
              print("Text: [%s] => \nVector: %s\n" % (", ".join(text),
      str(vector)))
          spark.stop()

    11. Tensorflow

    新建TensorFlow临时作业tmp_tensorflow,代码如下:

    #!/usr/bin/python
    
    import re
    
    line = "Cats are smarter than dogs"
    
    matchObj = re.match( r'(.*) are (.*?) .*', line, re.M|re.I)
    
    if matchObj:
       print "matchObj.group() : ", matchObj.group()
       print "matchObj.group(1) : ", matchObj.group(1)
       print "matchObj.group(2) : ", matchObj.group(2)
    else:
       print "No match!!"
    
     1560650818687

    12. Hbase

    新建tmp_Hbase临时作业,输入以下代码查询有哪些表:

    list
    
     1560520856139
    
  • 未登录

    回复楼主

    登录后可回复
    /1000