Blog Website

How to Implement User-defined-functions in ClickHouse

在数据库中,通常有三类不同的函数:

  • UDF:用户自定义函数
  • UDAF:用户自定义聚合函数
  • UDTF:用户自定义表函数

函数是什么

数据库中的函数与编程语言中的函数类似,具有参数和返回值,有的函数也可能没有参数,但都会有返回值。因此,我们能够在 SQL 语句中使用这些函数,并获取它的返回值。比如我们最熟悉的summaxmin 等均为函数,这三个函数实际上都是 UDAF,当我们要计算表中某一列的和/最大值/最小值的时候,就可以使用这些函数:

1
2
3
4
5
6
7
8
# 计算某一列的和,列名为 n
SELECT sum(n) FROM t1;

# 计算某一列的最大值,列名为 n
SELECT max(n) FROM t1;

# 计算某一列的最小值,列名为 n
SELECT min(n) FROM t1;

正如上文所说到的,数据库有三类函数:UDF、UDAF 和 UDTF。这三类函数在工作原理上面有比较大的差别。

UDF

UDF 的执行看起来像是独立的处理每一行数据,而实际上,函数不会作用在一个单独的行上,而是作用在以 Block 为单位的数据上,实现向量化查询。UDF 通常不会改变数据的行数(arrayJoin 除外),其对输入的数据列进行计算后,会产生一个新的数据列。以 plus 函数为例,其接收两个参数,然后将对应位置的值相加,返回一个新的列:

1
SELECT plus(m, n) FROM t1

实际上,可以直接写:

1
SELECT m + n FROM t1

m + nParse 阶段会被解析成 plus(m, n)

ClickHouse 中包含许多种类功能丰富的 UDF,能够进行各种算术逻辑运算,字符串处理,时间日期计算以及哈希函数等等,更多信息可参考官方文档

UDAF

聚合函数是状态函数,它们会将传入的值激活到某个状态,从而能够从其中获取值。上文提到的 summaxmin 均为聚合函数,不像 UDF 独立地处理每一行数据,生成一个新的列,UDAF 会对传入其中的数据进行聚合计算,最终只产生一个值,比如一个,一个最大值,或者一个最小值

UDTF

用户自定义表函数,正如名字所示,这类函数会返回一个临时的表(Table),从而可以通过 SELECT 语句从中读取数据,甚至通过 INSERT 语句插入数据,比如 cluster 表函数。

UDTF 与 UDF 和 UDAF 的区别比较大。我们以一个通俗的比喻来说明。

我们把一个表中的两列数据当做 C/C++ 里面的两个数组 ab 那么 plus(a, b) 所做的事情就是把两个数组对应位置的值相加,返回一个新的数组,这里要求 ab 的长度相同,在 ClickHouse 中,一个 Block 中的列行数一定是相等的;而 sum(a) 所做的事情就是把数组 a 中所有的值相加,最后返回一个累加后值;而对于 UDTF,它的参数不会是数据列,而是某些具有特殊含义的常量值,然后其根据函数的具体定义返回一个临时表,以 C/C++ 中的函数来类比,它可能更像这样一个函数: create(100, 0) - 表示要产生一个长度为 100,值为 0 的数组,但是,不是直接返回这个数组,而是返回一个包含这样一个列的表,之后能够通过 SELECT 从中获取数据。

因此,在一个 SQL 语句中,UDF 和 UDAF 会跟在 SELECT 的后面,而 UDTF 则会跟在 FROM 的后面

比如 SELECT * FROM numbers(10)numbers(10) 就是 ClickHouse 中的一个 UDTF,其会返回一个临时表,该表中有一列数据,列名为 number,值为 0 到 9 的所有整数。

整体架构

在 ClickHouse 中,UDF、UDAF 和 UDTF 分别是三个不同的模块。

UDF

UDF 模块的代码位于 src/Functions 目录下,其中定义了 UDF 的接口类 IFunction,因此,当我们要添加新的 UDF 时,通常只需要继承该接口类并实现其中的如下一些重要接口:

1
String getName() const override;

该函数需要返回一个 String 类型的值,其为函数的名字。

1
size_t getNumberOfArguments() const override;

该函数需要返回一个整数,表示函数接收的参数个数。如果函数的参数个数是可变的,那么该函数返回0,同时,继承:

1
bool isVariadic() const override;

函数,并返回 true

1
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override;

在该函数中完成对参数个数和数据类型的检查,同时返回函数返回值对应的 DataType。

1
bool useDefaultImplementationForConstants() const override;

该函数表示如果对于所有参数都为常量的情况下,是否使用系统的默认实现,通常返回true即可。

1
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &result_type, size_t input_rows_count) const override;

该函数实现函数具体的执行逻辑,第一个参数为输入的参数,可通过下标访问对应的参数,第二个参数为函数返回值的类型,第三个参数为输入的行数。在该函数,需要根据输入参数(对应的列),执行相应计算,并构造一个新的列,其中存储计算产生的结果值,最后返回新生成的列,其即为函数作用在这一参数对应的 Block 上面生成的结果。

事实上,IFunction 接口类中还有许多其他的方法,可以通过继承并实现那些方法进一步控制函数的行为和功能等,同时,实现函数并非一定是继承该接口类。更多信息可参考 src/Functions/IFunction.h, src/Functions/IFunctionImpl.h

最后,当实现完函数类之后,需要在 FunctionFactory 中完成该函数的注册,FunctionFactory 使用一个 std::unordered_map 来保存函数名和对应的函数类的实例,从而在执行 SQL 时,能够通过名字来找到对应的函数并执行。

UDAF

UDAF 模块的代码文件位于 src/AggregateFunctions/ 目录下。同样地,当添加一个新的聚合函数时,需要继承接口类IAggregateFunction作为该类的子类来实现。其中,主要需要实现以下几个方法:

1
String getName() const override;

同上面的普通函数,返回函数名字。

1
DataTypePtr getReturnType() const override;

返回该聚合函数的返回值类型。

1
void add(AggregateDataPtr place, const IColumn ** columns, const size_t row_num, Arena *) const override

add方法是聚合函数中的核心方法,其将place指针对应聚合状态数据取出,与columns中对应的第row_num行的数据进行聚合计算。同时,大多数情况下,还需要实现该聚合函数的聚合状态类型,以真正完成数据的聚合计算。

1
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override

merge方法将两个聚合状态进行合并,在并发执行聚合函数的过程中,需要将对应的聚合结果进行合并。

1
2
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override;
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override;

序列化和反序列化方法在分布式查询执行进行网络传输和内存不够的时候被使用。

1
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const override;

通过该方法获取最终的聚合结果。

除了IAggregateFunction接口之外,还有IAggregateFunctionHelper辅助接口,其通过模板的类型派生,将虚函数的调用转换为函数指针的调用,从而提高聚合函数的计算效率。

对于更复杂的聚合函数实现,可能还需要继承实现更多的IAggregateFunction接口,具体可参考src/AggregateFunctions/IAggregateFunction.h

最后,同样需要将新添加的聚合函数在AggregateFunctionFactory中进行注册。

UDTF

UDTF 模块位于 src/TableFunctions/ 目录下。实现TableFunction时需要继承ITableFunction接口,并实现从中继承的几个方法:

1
std::string getName() const override;

同上,返回函数名字。

1
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const String & table_name, ColumnsDescription cached_columns) const override;

该方法根据传入的参数构建出一个临时表,并返回。

1
const char * getStorageTypeName() const override;

该方法返回该 UDTF 创建的临时表对应的底层存储的名字。

1
void parseArguments(const ASTPtr & ast_function, const Context & context) override;

该方法对 AST 节点参数进行解析,以便后续获取真实的表结构。

1
ColumnsDescription getActualTableStructure(const Context & context) const override;

该方法返回 UDTF 对应的表结构。

最后,将其在对应的 Factory 中完成注册。

示例

下面,我们以一些简单的例子来展示如何在 ClickHouse 中实现 UDF、UDAF 和 UDTF。

UDF

下面,我们以一个简单的示例来展示如何在ClickHouse中添加UDF。我们想要添加的这个函数接收一个String类型的参数,并返回String的长度。首先,在src/Functions/目录下新建代码文件StrLen.cpp(由于该函数较为简单,不再将头文件和源文件分开),下面开始编写代码。

首先,引入一些必要的头文件,在实现函数时,需要和 ClickHouse 中的 DataTypeColumn 相关的类型和方法打交道。

1
2
3
4
5
6
7
8
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Functions/IFunctionImpl.h>

之后,我们开始实现继承自 IFunction 接口类的子类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class FunctionStrLen : public IFunction
{
public:
static constexpr auto name = "strLen";
String getName() const override { return name; }

size_t getNumberOfArguments() const override { return 1; }

DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
....
}

bool useDefaultImplementationForConstants() const override { return true; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t) const override
{
...
}
}

首先,我们声明一个静态的 String 类型的成员变量 name,值为 strLen,也就是该函数的名字,在 getName 方法中返回该变量。

getNumberOfArguments 函数返回 1,表示函数的参数个数为 1。

useDefaultImplementationForConstants 返回 true 即可。函数参数为 “hello”,”world” 之类的即为常量,而如果参数是某个表中的一个 String 列则不是常量。当返回 true 时,对于常量参数,系统会将其转为一个非常量的列,从而按列的方式进行计算,因此,在计算逻辑中我们只用实现非常量的情况。

下面来看另外两个较为复杂的函数。

1
2
3
4
5
6
7
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (!isString(arguments[0]))
throw Exception(
"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<DataTypeUInt64>();
}

getReturnTypeImpl 函数完成对参数个数和数据类型的检查,同时返回函数返回值对应的 DataType。在这里,我们需要检查函数参数为 String 类型,如果不是,则抛出异常。然后返回函数的返回值类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t) const override
{
const auto & strcolumn = arguments[0].column;

if (const ColumnString * col = checkAndGetColumn<ColumnString>(strcolumn.get()))
{
const auto & offsets = col->getOffsets();

auto col_res = ColumnVector<UInt64>::create();
auto & res_data = col_res->getData();
res_data.resize(offsets.size());

for (size_t i = 0; i < offsets.size(); ++i)
res_data[i] = offsets[i] - offsets[i - 1] - 1;

return col_res;
}
throw Exception(
"Illegal column " + arguments[0].column->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN);
}

executeImpl 函数实现函数具体的执行逻辑,第一个参数为输入的参数,可通过下标访问对应的参数,第二个参数为函数返回值的类型,第三个参数为输入的行数。在该函数,需要根据输入参数(对应的列),执行相应计算,并构造一个新的列,其中存储计算产生的结果值,最后返回新生成的列,其即为函数作用在这一参数对应的 Block 上面生成的结果。

在上面的代码中,输入参数是一个 ColumnString 列,首先从参数中获取到该列。我们要计算的是该列中每一个 String 的长度,并返回一个长度列。ColumnString 的内存表示是两个数组,其中一个数组保存 String 的数组,两个 String 之间有一个 0 值,另一个数组为 offset 数组,其中的值表示每一个 String 在前一个数组中的偏移,为了计算 String 的长度,我们只需访问 offset 数组。首先,创建一个 UInt64 类型的 ColumnVector,并将大小 resize 为和 ColumnString 中的 String 个数相同,然后通过 offset 数组计算出每一个 String 的长度并填到 ColumnVector 列中。最后返回创建的整数列。

最后,当实现完函数类之后,我们需要在 FunctionFactory 中完成该函数的注册。

注册函数时,我们需要在 src/Functions/registerFunctions.cpp 中声明函数 registerFunctionStrLen,并在 StrLen.cpp 文件中实现:

1
void registerFunctionStrLen(FunctionFactory & factory) { factory.registerFunction<FunctionStrLen>(); }

最后,在 src/Functions/registerFunctions.cpp 中的 registerFunctions 函数中调用该函数完成函数的注册。

下面,是 StrLen.cpp 文件的完整内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Functions/IFunctionImpl.h>

namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int TOO_LARGE_STRING_SIZE;
}

namespace
{
class FunctionStrLen : public IFunction
{

public:
static constexpr auto name = "strLen";
static FunctionPtr create(const Context &) { return std::make_shared<FunctionStrLen>(); }

String getName() const override { return name; }

size_t getNumberOfArguments() const override { return 1; }

DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (!isString(arguments[0]))
throw Exception(
"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<DataTypeUInt64>();
}

bool useDefaultImplementationForConstants() const override { return true; }

ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t) const override
{
const auto & strcolumn = arguments[0].column;

if (const ColumnString * col = checkAndGetColumn<ColumnString>(strcolumn.get()))
{
const auto & offsets = col->getOffsets();

auto col_res = ColumnVector<UInt64>::create();
auto & res_data = col_res->getData();
res_data.resize(offsets.size());

for (size_t i = 0; i < offsets.size(); ++i)
res_data[i] = offsets[i] - offsets[i - 1] - 1;

return col_res;
}
throw Exception(
"Illegal column " + arguments[0].column->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN);
}
};

}

void registerFunctionStrLen(FunctionFactory & factory)
{
factory.registerFunction<FunctionStrLen>();
}
}

UDAF

接下来,我们来实现一个简单的聚合函数 aggStrLen,该函数也是获取 String 的长度,但与上面的函数不同的是,它获取的是所有 String 长度的和。

首先,在 src/AggregateFunctions/ 目录下创建两个新的文件 AggregateFunctionStrLen.hAggregateFunctionStrLen.cpp。在头文件中,我们完成函数实现,在源文件实现注册函数。

同样地,首先引入必要的文件:

1
2
3
4
5
6
7
8
#include <type_traits>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <AggregateFunctions/IAggregateFunction.h>

接下来,由于聚合函数是状态函数,我们需要先定义一个结构,保存该聚合函数的中间状态值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
struct AggregateFunctionStrLenData
{
UInt64 sum{};

void ALWAYS_INLINE add(UInt64 value) { sum += value; }

void merge(const AggregateFunctionStrLenData & rhs) { sum += rhs.sum; }

void write(WriteBuffer & buf) const
{
writeBinary(sum, buf);
}

void read(ReadBuffer & buf)
{
readBinary(sum, buf);
}

UInt64 get() const { return sum; }
};

由于该聚合函数计算的是 String 的长度,因此用一个 UInt64 的值来保存状态值即可。add 方法将一个新的值加到 sum 中,merge 方法完成两个状态的聚合。

下面,开始实现函数类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
template <typename Data>
class AggregateFunctionStrLen final : public IAggregateFunctionDataHelper<Data, AggregateFunctionStrLen<Data>>
{
public:
String getName() const override { return "aggStrLen"; }

AggregateFunctionStrLen(const DataTypes & argument_types_)
: IAggregateFunctionDataHelper<Data, AggregateFunctionStrLen<Data>>(argument_types_, {})
{}

DataTypePtr getReturnType() const override { return std::make_shared<DataTypeUInt64>(); }

void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{
const auto & column = static_cast<const ColumnString &>(*columns[0]);
const auto & offsets = column.getOffsets();
this->data(place).add(offsets[row_num] - offsets[row_num - 1] - 1);
}

void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).merge(this->data(rhs));
}

void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).write(buf);
}

void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).read(buf);
}

void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
auto & column = static_cast<ColumnVector<UInt64> &>(to);
column.getData().push_back(this->data(place).get());
}
};

该聚合函数继承自 IAggregateFunctionDataHelper 接口类,getName 方法返回函数的名字 aggStrLen,防止和上面的 strLen 混淆。

getReturnType 方法返回函数的返回值类型。

add 方法将 columns 中对应列的 row_num 行的数据取出来,和 place 指针指向的聚合状态进行聚合计算,place 指向的即为一个上面定义的 struct 对象。

merge 方法将 rhs 对应的聚合状态取出来,和 place 对应的聚合状态进行聚合。

serializedeserialize 方法完成聚合状态的序列化和反序列化。序列化和反序列化方法在分布式查询执行进行网络传输和内存不够的时候被使用。

最后 insertResultInto 方法将最终计算得到的值插入到 IColumn to 中。

下面,我们需要在 AggregateFunctionStrLen.cpp 中实现注册函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
AggregateFunctionPtr createAggregateFunctionStrLen(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
assertNoParameters(name, parameters);
assertUnary(name, argument_types);

DataTypePtr data_type = argument_types[0];
if (!isString(data_type))
throw Exception(
"Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);

return std::make_shared<AggregateFunctionStrLen<AggregateFunctionStrLenData>>(argument_types);
}

void registerAggregateFunctionStrLen(AggregateFunctionFactory & factory)
{
factory.registerFunction("aggStrLen", createAggregateFunctionStrLen);
}

在该文件中,我们新实现一个函数 createAggregateFunctionStrLen,该函数检查聚合函数的参数是否符合要求,然后创建一个 AggregateFunctionStrLen 对象返回。

registerAggregateFunctionStrLen 函数是在 src/AggregateFunctions/registerAggregateFunctions.cpp 中声明的,最后,我们同样需要在 registerAggregateFunctions 中调用该函数以完成该聚合函数的注册。

下面,是 AggregateFunctionStrLen.hAggregateFunctionStrLen.cpp 的完整代码:

AggregateFunctionStrLen.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#pragma once

#include <type_traits>

#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>

#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>

#include <AggregateFunctions/IAggregateFunction.h>

namespace DB
{
struct AggregateFunctionStrLenData
{
UInt64 sum{};

void ALWAYS_INLINE add(UInt64 value) { sum += value; }

void merge(const AggregateFunctionStrLenData & rhs) { sum += rhs.sum; }

void write(WriteBuffer & buf) const
{
writeBinary(sum, buf);
}

void read(ReadBuffer & buf)
{
readBinary(sum, buf);
}

UInt64 get() const { return sum; }
};

template <typename Data>
class AggregateFunctionStrLen final : public IAggregateFunctionDataHelper<Data, AggregateFunctionStrLen<Data>>
{
public:
String getName() const override { return "aggStrLen"; }

AggregateFunctionStrLen(const DataTypes & argument_types_)
: IAggregateFunctionDataHelper<Data, AggregateFunctionStrLen<Data>>(argument_types_, {})
{}

DataTypePtr getReturnType() const override { return std::make_shared<DataTypeUInt64>(); }

void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{
const auto & column = static_cast<const ColumnString &>(*columns[0]);
const auto & offsets = column.getOffsets();
this->data(place).add(offsets[row_num] - offsets[row_num - 1] - 1);
}

void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).merge(this->data(rhs));
}

void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).write(buf);
}

void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).read(buf);
}

void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
auto & column = static_cast<ColumnVector<UInt64> &>(to);
column.getData().push_back(this->data(place).get());
}
};

}

AggregateFunctionStrLen.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionStrLen.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <AggregateFunctions/Helpers.h>
#include "registerAggregateFunctions.h"


namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}

namespace
{
AggregateFunctionPtr createAggregateFunctionStrLen(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
assertNoParameters(name, parameters);
assertUnary(name, argument_types);

DataTypePtr data_type = argument_types[0];
if (!isString(data_type))
throw Exception(
"Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);

return std::make_shared<AggregateFunctionStrLen<AggregateFunctionStrLenData>>(argument_types);
}
}

void registerAggregateFunctionStrLen(AggregateFunctionFactory & factory)
{
factory.registerFunction("aggStrLen", createAggregateFunctionStrLen);
}

}

UDTF

最后,我们来实现一个简单的 UDTF。事实上,当实现一个 UDTF 时,首先需要实现对应的表引擎,在这里,我们暂时不实现新的表引擎,使用一个系统中已有的表引擎:StorageSystemContributors,该表引擎文件位于 src/Storages/System/ 目录下,其包含一个名为 nameString 列,列中的每一个值为一个 ClickHouse Contributor 的 GitHub 名字。

我们来实现这样一个简单的 UDTF,该函数返回一个临时的 StorageSystemContributors 表。

首先,在 src/TableFunctions/ 目录下创建两个新的文件:TableFunctionContributors.hTableFunctionContributors.cpp

在头文件中,完成类的定义:

1
2
3
4
5
6
7
8
9
10
class TableFunctionContributors : public ITableFunction
{
public:
static constexpr auto name = "contributors";
std::string getName() const override { return name; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const String & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "StorageSystemContributors"; }

ColumnsDescription getActualTableStructure(const Context & context) const override;

UDTF 需要继承 ITableFunction 接口。同样地,getName 返回该函数的名字:contributorsgetStorageTypeName 返回表引擎类型名字。

下面,我们在源文件中实现另外两个个函数以及注册函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ColumnsDescription TableFunctionContributors::getActualTableStructure(const Context & /*context*/) const
{
return ColumnsDescription{{{"name", std::make_shared<DataTypeString>()}}};
}

StoragePtr TableFunctionContributors::executeImpl(
const ASTPtr & /*ast_function*/, const Context & , const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
auto res = StorageSystemContributors::create(StorageID(getDatabaseName(), table_name));
res->startup();
return res;
}

void registerTableFunctionContributors(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionContributors>();
}

getActualTableStructure 函数返回 ColumnDescription,即返回的临时表的列列信息(列名以及列的数据类型)。

executeImpl 函数实现表函数的具体执行逻辑,在这儿,只需要创建一个对应的 StorageSystemContributors 表并返回即可。

registerTableFunctionContributors 函数同样是在 src/TableFunctions/registerTableFunctions.h 中声明中,在这儿实现,然后在 src/TableFunctions/registerTableFunctions/ 中的 registerTableFunctions 中调用完成函数注册。

下面,是这两个文件的完整内容:

TableFunctionContributors.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#pragma once

#include <TableFunctions/ITableFunction.h>
#include <Core/Types.h>

namespace DB
{
class TableFunctionContributors : public ITableFunction
{
public:
static constexpr auto name = "contributors";
std::string getName() const override { return name; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const String & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "Contributors"; }

ColumnsDescription getActualTableStructure(const Context & context) const override;

};

}

TableFunctionContributors.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Storages/System/StorageSystemContributors.h>
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionContributors.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
#include "registerTableFunctions.h"


namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}

ColumnsDescription TableFunctionContributors::getActualTableStructure(const Context & /*context*/) const
{
return ColumnsDescription{{{"name", std::make_shared<DataTypeString>()}}};
}

StoragePtr TableFunctionContributors::executeImpl(
const ASTPtr & /*ast_function*/, const Context & , const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
auto res = StorageSystemContributors::create(StorageID(getDatabaseName(), table_name));
res->startup();
return res;
}

void registerTableFunctionContributors(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionContributors>();
}
}

使用

添加完这些函数之后,需要重新编译 ClickHouse 并重启。

下面,我们开始展示如何使用上面实现的这些函数。

strLen

1
2
3
4
5
6
7
8
9
10
11
:) select strLen('hello,world')

SELECT strLen('hello,world')

Query id: 4f3be5f4-5001-4f5e-95dd-620edbd99132

┌─strLen('hello,world')─┐
11
└───────────────────────┘

1 rows in set. Elapsed: 0.002 sec.

strLen 函数接收一个参数,返回输入参数的长度

aggStrLen

1
2
3
4
5
6
7
8
9
10
11
:) select aggStrLen('hello,world')

SELECT aggStrLen('hello,world')

Query id: ff829043-e241-429e-9db1-a68d6b439457

┌─aggStrLen('hello,world')─┐
11
└──────────────────────────┘

1 rows in set. Elapsed: 0.002 sec.

同样地,aggStrLen 也接收 String 参数,在这里,我们看到返回值和上面的 strLen 一样,这是因为参数为常量,只有一个 String,故两个函数的返回值相同。

contributors

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
:) select * from contributors() limit 10

SELECT *
FROM contributors()
LIMIT 10

Query id: fd2efbc2-ca5b-4414-a6d0-76ae7c88987f

┌─name─────────────┐
│ franklee │
│ tiger.yan │
│ ikopylov │
│ alexey-milovidov │
│ Andrey Mironov │
│ Alexander Lukin │
│ maxulan │
│ ageraab │
│ Nikolay Kirsh │
│ Stepan Herold │
└──────────────────┘

10 rows in set. Elapsed: 0.002 sec.

上面展示了 contributors 表函数的使用,其会返回一个临时表,我们可以通过 SELECT 从中获取数据,从上面的结果中可以看出,临时表含有一个 name 列,列中的每一个值为一个人名,即为 ClickHouse contributor。

下面,我们展示一下如何将这三个函数结合使用,strLenaggStrLen 可以作用在非常量列上面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
:) select strLen(name) from contributors() limit 10

SELECT strLen(name)
FROM contributors()
LIMIT 10

Query id: ec0abeb5-28df-41b2-9db0-47932a2d9202

┌─strLen(name)─┐
20
4
6
8
12
9
12
15
6
6
└──────────────┘

10 rows in set. Elapsed: 0.002 sec.

:) select aggStrLen(name) from contributors()

SELECT aggStrLen(name)
FROM contributors()

Query id: e5fa1c7d-4d53-4a64-babf-90c058682b69

┌─aggStrLen(name)─┐
9727
└─────────────────┘

1 rows in set. Elapsed: 0.003 sec.

从上面的结果中,我们能够清晰地看出 strLenaggStrLen 的区别了,前者计算每一个名字的长度,返回一个新的列,后者计算所有名字长度的和,最后只返回一个聚合后的值。

总结

从上文中,我们可以看到,ClickHouse 具有良好的代码结构,易于扩展,这使得我们可以很容易地在其中添加自定义函数,完成各种各样丰富的功能。