Hive的Transform和UDF

太阳娱乐 21

Hive的Transform和UDF

原标题:MaxCompute重装上阵 第五弹 – SELECT TRANSFOR

UDTF

  • Hive中UDTF编写和使用

2017/12/20 北京云栖大会上阿里云MaxCompute发布了最新的功能Python
UDF,万众期待的功能终于支持啦,我怎么能不一试为快,今天就分享如何通过Studio进行Python
udf开发。

hive自定义函数(udf:user-defined function)

例1:
对于以下数据

1367775,10
1363426,10
1371235,10
1371237,10
1371236,10
1376888,10
1382132,10

1367775   beijing    10
1363426   beijing    10
1371235   shanghai   10
1371237   shanghai   10
1361236   beijing    10
1366888   beijing    10
1382132   shenzhen   10
写一个函数得到省份名

1、开发一个java类,继承UDF(聚合函数继承UDAF)并重载evaluate方法

package bigdata.udf

import org.apache.hadoop.hive.ql.exec.UDF;
                                 //继承类
public class ToLowerCase(GetProvince) extends UDF{
//加载一个字典表
    public static HashMap<Integer,String> provinceMap=new HashMap<Integer,String>
    static {
        provinceMap.put("136","beijing");
        provinceMap.put("137","shanghai");
        provinceMap.put("138","shenzhen");
    }


    //必须是public        //重载evaluate方法根据不同的输入判断调用那个函数
    public String evaluate(String field){
        String result = field.toLowerCase();
        return result;
    }
           //返回值           //输入
    public String evaluate(int phonenbr){
        String pnb = String.valueOf(phonenbr);
        return provinceMap.get(pnb.substring(0,3))== null?"huoxin":provinceMap.get(pnb.substring(0,3));
    }   
}

2、打成jar包上传到服务器
3、将jar包添加到hive的classpath
add JAR /home/hadoop/udf.jar;
4、创建临时函数与开发好的java class 关联
create temporary function getprovince as 'bigdata.udf.ToLowerCase';
5、hql中使用

create table t_flow(phonenbr int,flow int)
row format delimited //使用自带的serde:S erDe是Serialize/Deserilize的简称,目的是用于序列化和反序列化。S erDe能为表指定列,且对列指定相应的数据。
fields terminated by ',';
load data local inpath '/home/hadoop/flow.dat' into table t_flow;

select phonenbr,getprovince(phonenbr),flow from t_flow;

例2:

create table t_json(line string)
row format delimited;
load data local inpath '' into table t_json;
select * from t_json limit 10;

class JsonParser
package bigdata.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
import parquet.org.codehaus.jackson.map.ObjectMapper;
public class JsonParser extends UDF {  //alt+/ctrl+shift+o导包
//Window - Preferences - Java - Editor - Templates,这里你可以看到所有的eclipse的快捷方式
//alt+/补全   
    public String evaluate(String jsonline){  //输入jsonline返回string
        ObjectMapper objectMapper = new ObjectMapper();
        try{
            MovieRateBean bean = ObjectMapper.readValue(jsonline,MovieRateBean);
            return bean.toString();
        }catch(Exception e){
        }
        return "";
    }   
}

MovieRateBean

package bigdata.udf;
public class MovieRateBean{
    private String movie;
    private String rate;
    private String timeStamp;
    private String uid;

    //alt+shift+s
    public String getMovie(){
        return movie;
    }
    public String setMovie(String movie){
        this.movie = movie;
    }
    public String getRate(){
        return rate;
    }
    public void setRate(String rate){
        this.rate = rate;
    }
    public String getTimeStamp(){
        return timestamp;
    }
    public void setTimeStamp(String timeStamp){
        this.timeStamp = timeStamp;
    }
    public String getUid(){
        return uid;
    }
    public void setUid(String uid){
        this.uid = uid;
    }

    public String toString(){

        return this.movie + "t" + this.rate + "t" +this.timeStamp + "t" + this.uid();
    }
}

javabean:这个类是public的,还要有一个无参数的构造函数。第二,属性是private的,必须通过get
和set 方法进行访问。第三,支持“事件”,例如addXXXXListener(XXXEvent
e),可以处理各种事件,比如鼠标点击,键盘响应等等。第四,提供一个反射机制。第五,可以序列化/反序列化的,这样,我就可以被方便的存储,转移了。

bin/beeline -u jdbc:hive2://localhost:10000 -n hadoop
add JAR /home/hadoop/udf.jar;
create temporary function parsejson as 'bigdata.udf.JsonParser';
select parsejson(line) form t_json limit 10;

但是只有一个字段,如何把它分为四个字段

//insert overwrite table t_rating
create table t_rating as
select split(parsejson(line),'t')[0]as movieid,
split(parsejson(line),'t')[1] as rate,
split(parsejson(line),'t')[2] as timestring,
split(parsejson(line),'t')[3] as uid 
from t_json;

内置json函数
select get_json_object(line,'$.movie') as moive,
get_json_object(line,'$.rate') as rate  from rat_json limit 10;

摘要:
MaxCompute(原ODPS)是阿里云自主研发的具有业界领先水平的分布式大数据处理平台,
尤其在集团内部得到广泛应用,支撑了多个BU的核心业务。
MaxCompute除了持续优化性能外,也致力于提升SQL语言的用户体验和表达能力,提高广大ODPS开发者的生产力。

UDAF

  • Hive
    udaf开发入门和运行过程详解
  • Hive通用型自定义聚合函数(UDAF)

 

Transform实现

提供了在sql中调用自写脚本(python或shell脚本)的功能,适合hive中没有的功能又不想写udf的情况。
1.加载rating.json文件到hive的一个原始表

create table t_json(line string)
row format delimited;
load data local inpath '' into table t_json;
select * from t_json limit 10;

2.需要解析json数据成四个字段,插入一张新表t_rating
内置json函数

set hive.support.sql11.reserved.keywords=false;##不然识不出timeStamp
hive> create table t_rating as
    > select get_json_object(line,'$.movie') as moive,get_json_object(line,'$.rate') as rate,get_json_object(line,'$.timeStamp') as timeStamp,get_json_object(line,'$.uid') as uid from t_json;

3.使用transform+python的方式转换unixtime为weekday
先编辑一个python脚本文件,然后将文件加入hive的classpath下:

vi weekday_mapper.py
#!/bin/python
import sys
import datetime

for line in sys.stdin:
    line = line.strip()//去空格
    movieid,rate,timestring,uid = line.split('t')
    weekday=datetime.datetime.fromtimestamp(float(timestring)).isoweekday()
    print 't'.join([movieid,rating,str(weekday),userid])  //相当于后面用/t串起来

add file weekday_mapper.py;

create table u_data_new(
    movieid int,
    rating int,
    weekday int,
    userid int)
row format delimited
fields terminated by '/t';

insert overwrite table u_data_new
//create table u_data_new as
select
    transform(movieid,rate,timestring,uid)
    using'python weekday_mapper.py'
    as(movieid,rating,weekday,userid)
from t_rating;

报错:生无可恋
ERROR : Ended Job = job_local1691136619_0009 with errors
Error: Error while processing statement: FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask (state=08S01,code=2)

select distinct(weekday) from u_data_new limit 10;

MaxCompute(原ODPS)是阿里云自主研发的具有业界领先水平的分布式大数据处理平台,
尤其在集团内部得到广泛应用,支撑了多个BU的核心业务。
MaxCompute除了持续优化性能外,也致力于提升SQL语言的用户体验和表达能力,提高广大ODPS开发者的生产力。

Hive中的TRANSFORM:使用脚本完成Map/Reduce

转自:
Hive的Transform和UDF。http://www.coder4.com/archives/4052

首先来看一下数据:

hive> select * from test;
OK
1       3
2       2
3       1

假设,我们要输出每一列的md5值。在目前的hive中是没有这个udf的。

我们看一下Python的代码:

#!/home/tops/bin/python

import sys
import hashlib

for line in sys.stdin:
    line = line.strip()
    arr = line.split()
    md5_arr = []
    for a in arr:
        md5_arr.append(hashlib.md5(a).hexdigest())
    print "t".join(md5_arr)

在Hive中,使用脚本,首先要将他们加入:

add file /xxxx/test.py

然后,在调用时,使用TRANSFORM语法。

SELECT 
    TRANSFORM (col1, col2) 
    USING './test.py' 
    AS (new1, new2) 
FORM 
    test;

这里,我们使用了AS,指定输出的若干个列,分别对应到哪个列名。如果省略这句,则Hive会将第1个tab前的结果作为key,后面其余作为value。

这里有一个小坑:有时候,我们结合INSERT
OVERWRITE使用上述TRANSFORM,而目标表,其分割副可能不是t。但是请牢记:TRANSFORM的分割符号,传入、传出脚本的,永远是t。不要考虑外面其他的分割符号!

最后,解释一下MAP、REDUCE。

在有的Hive语句中,大家可能会看到SELECT MAP (…) USING ‘xx.py’这样的语法。

然而,在Hive中,MAP、REDUCE只不过是TRANSFORM的别名,Hive不保证一定会在map/reduce中调用脚本。看看官方文档是怎么说的:

Formally, MAP ... and REDUCE ... are syntactic transformations of SELECT TRANSFORM ( ... ). In other words, they serve as comments or notes to the reader of the query. BEWARE: Use of these keywords may be dangerous as (e.g.) typing "REDUCE" does not force a reduce phase to occur and typing "MAP" does not force a new map phase!

所以、混用map
reduce语法关键字,甚至会引起混淆,所以建议大家还是都用TRANSFORM吧。

友情提示:如果脚本不是Python,而是awk、sed等系统内置命令,可以直接使用,而不用add
file。

如果表中有MAP,ARRAY等复杂类型,怎么用TRANSFORM生成?

例如:

CREATE TABLE features
(
    id BIGINT,
    norm_features MAP<STRING, FLOAT> 
);

答案是,要在脚本的输出中,对特殊字段按照HDFS文件中的格式输出即可。

例如,以上面的表结构为例,每行输出应为:

1^Ifeature1^C1.0^Bfeature2^C2.0

其中I是tab键,这是TRANSFORM要求的分割符号。B和^C是Hive存储时MAP类型的KV分割符。

另外,在Hive的TRANSFORM语句的时候,要注意AS中加上类型声明:

SELECT TRANSFORM(stuff)
USING 'script'
AS (thing1 INT, thing2 MAP<STRING, FLOAT>)

前置条件

MaxCompute基于ODPS2.0新一代的SQL引擎,显著提升了SQL语言编译过程的易用性与语言的表达能力。我们在此推出MaxCompute(ODPS2.0)重装上阵系列文章

Hive中的TRANSFORM:自定义Mapper和Reducer完成Map/Reduce

/**
 * Mapper.
 */
public interface Mapper {
  /**
   * Maps a single row into an intermediate rows.
   * 
   * @param record
   *          input record
   * @param output
   *          collect mapped rows.
   * @throws Exception
   *           on error
   */
  void map(String[] record, Output output) throws Exception;
}

可以将一列拆分为多列

使用样例:

public class ExecuteMap {

    private static final String FULL_PATH_CLASS = "com.***.dpop.ods.mr.impl.";

    private static final Map<String, Mapper> mappers = new HashMap<String, Mapper>();

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new Exception("Process class must be given");
        }

        new GenericMR().map(System.in, System.out,
                getMapper(args[0], Arrays.copyOfRange(args, 1, args.length)));
    }

    private static Mapper getMapper(String parserClass, String[] args)
            throws ClassNotFoundException {
        if (mappers.containsKey(parserClass)) {
            return mappers.get(parserClass);
        }

        Class[] classes = new Class[args.length];
        for (int i = 0; i < classes.length; ++i) {
            classes[i] = String.class;
        }
        try {
            Mapper mapper = (Mapper) Class.forName(FULL_PATH_CLASS + parserClass).getConstructor(classes).newInstance(args);
            mappers.put(parserClass, mapper);
            return mapper;
        } catch (ClassNotFoundException e) {
            throw new ClassNotFoundException("Unknown MapperClass:" + parserClass, e);
        } catch (Exception e) {
            throw new  ClassNotFoundException("Error Constructing processor", e);
        }

    }
}

MR_USING=" USING 'java -Xmx512m -Xms512m -cp ods-mr-1.0.jar:hive-contrib-2.3.33.jar com.***.dpop.ods.mr.api.ExecuteMap "

COMMAND="FROM dw_rtb.event_fact_adx_auction "
COMMAND="${COMMAND} INSERT overwrite TABLE dw_rtb.event_fact_mid_adx_auction_ad PARTITION(yymmdd=${CURRENT_DATE}) SELECT transform(search_id, print_time, pthread_id, ad_s) ${MR_USING} EventFactMidAdxAuctionAdMapper' as search_id, print_time, pthread_id, ad_s, ssp_id WHERE $INSERT_PARTITION and original = 'exinternal' "

 

第一弹 – 善用MaxCompute编译器的错误和警告

Hive Python Streaming的原理及写法

http://www.tuicool.com/articles/vmumUjA

了解到,虽然功能发布,不过还在公测阶段,如果想要使用,还得申请开通:。这里我就不介绍申请开通具体流程了。

第二弹 – 新的基本数据类型与内建函数

环境准备

MaxCompute Studio支持Python UDF开发,前提需要安装python,
pyodps和idea的python插件。

  1. 安装Python:可以Google或者百度搜索下如何安装。
  2. 安装pyodps:可以参考python
    sdk文档的安装步骤。即,在
    Python 2.6 以上(包括 Python 3),系统安装 pip 后,只需运行下 pip
    install pyodps,PyODPS 的相关依赖便会自动安装。
  3. Intellij IDEA中安装Python插件。搜索Python Community
    Edition插件并安装
  4. 太阳娱乐 1
  5. 配置studio module对python的依赖。

  6.  

    • File -> Project structure,添加python sdk:
    • 太阳娱乐 2
    • File -> Project structure,添加python facets:
      太阳娱乐 3
    • File -> Project structure,配置module依赖python facets:
      太阳娱乐 4

第三弹 – 复杂类型

开发Python UDF

环境都准备好后,既可在对应依赖的module里创建进行python udf开发。

第四弹 – CTE,VALUES,SEMIJOIN

新建python脚本。

右键 new | MaxCompute Python,弹框里输入脚本名称,选择类型为python udf:

太阳娱乐 5

生成的模板已自动填充框架代码,只需要编写UDF的入参出参,以及函数逻辑:
太阳娱乐 6

上次向您介绍了CTE,VALUES,SEMIJOIN,本篇向您介绍MaxCompute对其他脚本语言的支持

本地调试

代码开发好后,可以在Studio中进行本地调试。Studio支持下载表的部分sample数据到本地运行,进行debug,步骤如下:

  1. 右键python udf类,点击”运行”菜单,弹出run
    configuration对话框。UDF|UDAF|UDTF一般作用于select子句中表的某些列,此处需配置MaxCompute
    project,table和column(元数据来源于project
    explorer窗口和warehouse下的example项目):
    太阳娱乐 7
  2. 点击OK后,通过tunnel自动下载指定表的sample数据到本地warehouse目录(若之前已下载过,则不会再次重复下载,否则利用tunnel服务下载数据。默认下载100条,如需更多数据测试,可自行使用console的tunnel命令或者studio的表下载功能)。下载完成后,可以在warehouse目录看到下载的sample数据。这里用户也可以使用warehouse里的数据进行调试,具体可参考java
    udf开发中的关于本地运行的warehouse目录”部分)。
  3. 太阳娱乐 8
  4. 然后本地运行框架会根据指定的列,获取data文件里指定列的数据,调用UDF本地运行。
    太阳娱乐 9
  • SELECT TRANSFORM。

  • 场景1

  • 我的系统要迁移到MaxCompute平台上,系统中原来有很多功能是使用脚本来完成的,包括python,shell,ruby等脚本。
    要迁移到MaxCompute上,我需要把这些脚本全部都改造成UDF/UDAF/UDTF。改造过程不仅需要耗费时间人力,还需要做一遍又一遍的测试,从而保证改造成的udf和原来的脚本在逻辑上是等价的。我希望能有更简单的迁移方式。
  • 场景2
  • 太阳娱乐,SQL比较擅长的是集合操作,而我需要做的事情要对一条数据做更多的精细的计算,现有的内置函数不能方便的实现我想要的功能,而UDF的框架不够灵活,并且Java/Python我都不太熟悉。相比之下我更擅长写脚本。我就希望能够写一个脚本,数据全都输入到我的脚本里来,我自己来做各种计算,然后把结果输出。而MaxCompute平台就负责帮我把数据做好切分,让我的脚本能够分布式执行,负责数据的输入表和输出表的管理,负责JOIN,UNION等关系操作就好了。

注册发布Python UDF

  1. 代码调试好后,将python脚本添加为MaxCompute的Resource:
    太阳娱乐 10

注意此处选择的MaxCompute project必须是已经申请开通python
udf的project。

  1. 注册python 函数:
    太阳娱乐 11
  2. 在sql脚本中编辑MaxCompute sql试用python udf:
    太阳娱乐 12

原文链接:

上述功能可以使用SELECT TRANSFORM来实现

SELECT TRANSFORM 介绍

此文中采用MaxCompute Studio作展示,首先,安装MaxCompute
Studio,导入测试MaxCompute项目,创建工程,建立一个新的MaxCompute脚本文件, 如下

太阳娱乐 13

提交作业可以看到执行计划(全部展开后的视图):

太阳娱乐 14

Select
transform允许sql用户指定在服务器上执行一句shell命令,将上游数据各字段用tab分隔,每条记录一行,逐行输入shell命令的stdin,并从stdout读取数据作为输出,送到下游。Shell命令的本质是调用Unix的一些utility,因此可以启动其他的脚本解释器。包括python,java,php,awk,ruby等。

该命令兼容Hive的Transform功能,可以参考Hive的文档。一些需要注意的点如下:

  1. Using
    子句指定的是要执行的命令,而非资源列表,这一点和大多数的MaxCompute
    SQL语法不一样,这么做是为了和hive的语法保持兼容。

  2. 输入从stdin传入,输出从stdout传出;

  3. 可以配置分隔符,默认使用 t 分隔列,用换行分隔行;

  4. 可以自定义reader/writer,但用内置的reader/writer会快很多

  5. 使用自定义的资源(脚本文件,数据文件等),可以使用 set
    odps.sql.session.resources=foo.sh,bar.txt;
    来指定。可以指定多个resource文件,用逗号隔开(因此不允许resource名字中包含逗号和分号)。此外我们还提供了resources子句,可以在using
    子句后面指定 resources ‘foo.sh’, ‘bar.txt’
    来指定资源,两种方式是等价的(参考“用odps跑测试”的例子);

6.
资源文件会被下载到执行指定命令的工作目录,可以使用文件接口打开./bar.txt文件。

目前odps select transform完全兼容了hive的语法、功能和行为,包括
input/output row format 以及
reader/writer。Hive上的脚本,大部分可以直接拿来运行,部分脚本只需要经过少许改动即可运行。另外我们很多功能都用比hive更高执行效率的语言
(C++) 重构,用以优化性能。

应用场景举例

理论上select transform能实现的功能udtf都能实现,但是select
transform比udtf要灵活得多。且select
transform不仅支持java和python,还支持shell,perl等其它脚本和工具。
且编写的过程要简单,特别适合adhoc功能的实现。举几个例子:

  1. 无中生有造数据

太阳娱乐 15

或者使用python

太阳娱乐 16

上面的语句造出一份有50行的数据表,值是从1到50;
测试时候的数据就可以方便造出来了。功能看似简单,但以前是odps的一个痛点,没有方便的办法造数据,就不方便测试以及初学者的学习和探索。当然这也可以通过udtf来实现,但是需要复杂的流程:进入ide->写udtf->打包->add
jar/python->create function->执行->drop function->drop
resource。

  1. awk 用户会很喜欢这个功能

太阳娱乐 17

上面的语句仅仅是把value原样输出,但是熟悉awk的用户,从此过上了写awk脚本不写sql的日子

  1. 用odps跑测试

太阳娱乐 18

或者

太阳娱乐 19

这个例子是为了说明,很多java的utility可以直接拿来运行。java和python虽然有现成的udtf框架,但是用select
transform编写更简单,并且不需要额外依赖,也没有格式要求,甚至可以实现离线脚本拿来直接就用。

  1. 支持其他脚本语言

select transform (key, value) using “perl -e ‘while($input =
<STDIN>){print $input;}'” from src;

上面用的是perl。这其实不仅仅是语言支持的扩展,一些简单的功能,awk,
python, perl, shell
都支持直接在命令里面写脚本,不需要写脚本文件,上传资源等过程,开发过程更简单。另外,由于目前我们计算集群上没有php和ruby,所以这两种脚本不支持。

  1. 可以串联着用,使用 distribute by和 sort by对输入数据做预处理

太阳娱乐 20

或者用map,reduce的关键字会让逻辑显得清楚一些

太阳娱乐 21

理论上OpenMR的模型都可以映射到上面的计算过程。注意,使用map,reduce,select
transform这几个语法其实语义是一样的,用哪个关键字,哪种写法,不影响直接过程和结果。

性能

性能上,SELECT TRANSFORM 与UDTF
各有千秋。经过多种场景对比测试,数据量较小时,大多数场景下select
transform有优势,而数据量大时UDTF有优势。由于transform的开发更加简便,所以select
transform非常适合做adhoc的数据分析。

UDTF的优势:

  1. UDTF是有类型,而Transform的子进程基于stdin/stdout传输数据,所有数据都当做string处理,因此transform多了一步类型转换;
  2. Transform数据传输依赖于操作系统的管道,而目前管道的buffer仅有4KB,且不能设置,
    transform读/写 空/满 的pipe会导致进程被挂起;
  3. UDTF的常量参数可以不用传输,而Transform没办法利用这个优化。

SELECT TRANSFORM 的优势:

  1. 子进程和父进程是两个进程,而UDTF是单线程的,如果计算占比比较高,数据吞吐量比较小,可以利用服务器的多核特性
  2. 数据的传输通过更底层的系统调用来读写,效率比java高
  3. SELECT
    TRANSFORM支持的某些工具,如awk,是natvie代码实现的,和java相比理论上可能会有性能优势。

小结

MaxCompute基于ODPS2.0的SQL引擎,提供了SELECT
TRANSFORM功能,可以明显简化对脚本代码的引用,与此同时,也提高了性能!我们推荐您尽量使用SELECT
TRANSFORM。

标注

  • 注一,USING
    后面的字符串,在后台是直接起的子进程来调起命令,没有起shell,所以shell的某些语法,如输入输出重定向,管道等是不支持的。如果用户需要可以以
    shell 作为命令,真正的命令作为数据输入,参考“无中生有造数据”的例子;
  • 注二,JAVA 和 PYTHON 的实际路径,可以从JAVA_HOME 和 PYTHON_HOME
    环境变量中得到作业;

作者:隐林

本文为云栖社区原创内容,未经允许不得转载。返回搜狐,查看更多

责任编辑:

admin

网站地图xml地图