UDF(用戶定義函數(shù))
在有些應(yīng)用場景中,應(yīng)用邏輯需要的查詢無法直接使用系統(tǒng)內(nèi)置的函數(shù)來表示。利用 UDF 功能,TDengine 可以插入用戶編寫的處理代碼并在查詢中使用它們,就能夠很方便地解決特殊應(yīng)用場景中的使用需求。 UDF 通常以數(shù)據(jù)表中的一列數(shù)據(jù)做為輸入,同時支持以嵌套子查詢的結(jié)果作為輸入。
從 2.2.0.0 版本開始,TDengine 支持通過 C/C++ 語言進行 UDF 定義。接下來結(jié)合示例講解 UDF 的使用方法。
用 C/C++ 語言來定義 UDF
TDengine 提供 3 個 UDF 的源代碼示例,分別為:
標量函數(shù)
add_one.c 是結(jié)構(gòu)最簡單的 UDF 實現(xiàn)。其功能為:對傳入的一個數(shù)據(jù)列(可能因 WHERE 子句進行了篩選)中的每一項,都輸出 +1 之后的值,并且要求輸入的列數(shù)據(jù)類型為 INT。
這一具體的處理邏輯在函數(shù) void add_one(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBuf, char* tsOutput, int* numOfOutput, short otype, short obytes, SUdfInit* buf) 中定義。這類用于實現(xiàn) UDF 的基礎(chǔ)計算邏輯的函數(shù),我們稱為 udfNormalFunc,也就是對行數(shù)據(jù)塊的標量計算函數(shù)。需要注意的是,udfNormalFunc 的參數(shù)項是固定的,用于按照約束完成與引擎之間的數(shù)據(jù)交換。
- udfNormalFunc 中各參數(shù)的具體含義是:
- data:輸入數(shù)據(jù)。
- itype:輸入數(shù)據(jù)的類型。這里采用的是短整型表示法,與各種數(shù)據(jù)類型對應(yīng)的值可以參見 column_meta 中的列類型說明。例如 4 用于表示 INT 型。
- iBytes:輸入數(shù)據(jù)中每個值會占用的字節(jié)數(shù)。
- numOfRows:輸入數(shù)據(jù)的總行數(shù)。
- ts:主鍵時間戳在輸入中的列數(shù)據(jù)(只讀)。
- dataOutput:輸出數(shù)據(jù)的緩沖區(qū),緩沖區(qū)大小為用戶指定的輸出類型大小 * numOfRows。
- interBuf:中間計算結(jié)果的緩沖區(qū),大小為用戶在創(chuàng)建 UDF 時指定的BUFSIZE大小。通常用于計算中間結(jié)果與最終結(jié)果不一致時使用,由引擎負責(zé)分配與釋放。
- tsOutput:主鍵時間戳在輸出時的列數(shù)據(jù),如果非空可用于輸出結(jié)果對應(yīng)的時間戳。
- numOfOutput:輸出結(jié)果的個數(shù)(行數(shù))。
- oType:輸出數(shù)據(jù)的類型。取值含義與 itype 參數(shù)一致。
- oBytes:輸出數(shù)據(jù)中每個值占用的字節(jié)數(shù)。
- buf:用于在 UDF 與引擎間的狀態(tài)控制信息傳遞塊。
聚合函數(shù)
abs_max.c 實現(xiàn)的是一個聚合函數(shù),功能是對一組數(shù)據(jù)按絕對值取最大值。
其計算過程為:與所在查詢語句相關(guān)的數(shù)據(jù)會被分為多個行數(shù)據(jù)塊,對每個行數(shù)據(jù)塊調(diào)用 udfNormalFunc(在本例的實現(xiàn)代碼中,實際函數(shù)名是 abs_max)來生成每個子表的中間結(jié)果,再將子表的中間結(jié)果調(diào)用 udfMergeFunc(本例中,其實際的函數(shù)名是 abs_max_merge)進行聚合,生成超級表的最終聚合結(jié)果或中間結(jié)果。聚合查詢最后還會通過 udfFinalizeFunc(本例中,其實際的函數(shù)名是 abs_max_finalize)再把超級表的中間結(jié)果處理為最終結(jié)果,最終結(jié)果只能含0或1條結(jié)果數(shù)據(jù)。
值得注意的是,udfNormalFunc、udfMergeFunc、udfFinalizeFunc 之間,函數(shù)名約定使用相同的前綴,此前綴即 udfNormalFunc 的實際函數(shù)名。udfMergeFunc 的函數(shù)名后綴 _merge、udfFinalizeFunc 的函數(shù)名后綴 _finalize,是 UDF 實現(xiàn)規(guī)則的一部分,系統(tǒng)會按照這些函數(shù)名后綴來調(diào)用相應(yīng)功能。
-
udfMergeFunc 用于對計算中間結(jié)果進行聚合,只有針對超級表的聚合查詢才需要調(diào)用該函數(shù)。本例中 udfMergeFunc 對應(yīng)的實現(xiàn)函數(shù)為
void abs_max_merge(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf),其中各參數(shù)的具體含義是:- data:udfNormalFunc 的輸出數(shù)據(jù)數(shù)組,如果使用了 interBuf 那么 data 就是 interBuf 的數(shù)組。
- numOfRows:data 中數(shù)據(jù)的行數(shù)。
- dataOutput:輸出數(shù)據(jù)的緩沖區(qū),大小等于一條最終結(jié)果的大小。如果此時輸出還不是最終結(jié)果,可以選擇輸出到 interBuf 中即data中。
- numOfOutput:輸出結(jié)果的個數(shù)(行數(shù))。
- buf:用于在 UDF 與引擎間的狀態(tài)控制信息傳遞塊。
-
udfFinalizeFunc 用于對計算結(jié)果進行最終計算,通常用于有 interBuf 使用的場景。本例中 udfFinalizeFunc 對應(yīng)的實現(xiàn)函數(shù)為
void abs_max_finalize(char* dataOutput, char* interBuf, int* numOfOutput, SUdfInit* buf),其中各參數(shù)的具體含義是:- dataOutput:輸出數(shù)據(jù)的緩沖區(qū)。
- interBuf:中間結(jié)算結(jié)果緩沖區(qū),可作為輸入。
- numOfOutput:輸出數(shù)據(jù)的個數(shù),對聚合函數(shù)來說只能是0或者1。
- buf:用于在 UDF 與引擎間的狀態(tài)控制信息傳遞塊。
其他典型場景,如協(xié)方差的計算,即可通過定義聚合UDF的方式實現(xiàn)。
其他 UDF 函數(shù)
用戶 UDF 程序除了需要實現(xiàn)上面幾個函數(shù)外,還有兩個用于初始化和釋放 UDF 與引擎間的狀態(tài)控制信息傳遞塊的函數(shù)。具體來說,也即對應(yīng) udfInitFunc 和 udfDestroyFunc。其函數(shù)名命名規(guī)則同樣是采取以 udfNormalFunc 的實際函數(shù)名為前綴,以 _init 和 _destroy 為后綴。系統(tǒng)會在初始化和資源釋放時調(diào)用對應(yīng)名稱的函數(shù)。
-
udfInitFunc 用于初始化狀態(tài)控制信息傳遞塊。上例中 udfInitFunc 對應(yīng)的實現(xiàn)函數(shù)為
int abs_max_init(SUdfInit* buf),其中各參數(shù)的具體含義是:- buf:用于在 UDF 與引擎間的狀態(tài)控制信息傳遞塊。
-
udfDestroyFunc 用于釋放狀態(tài)控制信息傳遞塊。上例中 udfDestroyFunc 對應(yīng)的實現(xiàn)函數(shù)為
void abs_max_destroy(SUdfInit* buf),其中各參數(shù)的具體含義是:- buf:用于在 UDF 與引擎間的狀態(tài)控制信息傳遞塊。
目前該功能暫時沒有實際意義,待后續(xù)擴展使用。
UDF 實現(xiàn)方式的規(guī)則總結(jié)
根據(jù) UDF 函數(shù)類型的不同,用戶所要實現(xiàn)的功能函數(shù)也不同:
- 標量函數(shù):UDF 中需實現(xiàn) udfNormalFunc。
- 聚合函數(shù):UDF 中需實現(xiàn) udfNormalFunc、udfMergeFunc(對超級表查詢)、udfFinalizeFunc。
需要注意的是,如果對應(yīng)的函數(shù)不需要具體的功能,也需要實現(xiàn)一個空函數(shù)。
編譯 UDF
用戶定義函數(shù)的 C 語言源代碼無法直接被 TDengine 系統(tǒng)使用,而是需要先編譯為 .so 鏈接庫,之后才能載入 TDengine 系統(tǒng)。
例如,按照上一章節(jié)描述的規(guī)則準備好了用戶定義函數(shù)的源代碼 add_one.c,那么可以執(zhí)行如下指令編譯得到動態(tài)鏈接庫文件:
gcc -g -O0 -fPIC -shared add_one.c -o add_one.so
這樣就準備好了動態(tài)鏈接庫 add_one.so 文件,可以供后文創(chuàng)建 UDF 時使用了。為了保證可靠的系統(tǒng)運行,編譯器 GCC 推薦使用 7.5及以上版本。
在系統(tǒng)中管理和使用 UDF
創(chuàng)建 UDF
用戶可以通過 SQL 指令在系統(tǒng)中加載客戶端所在主機上的 UDF 函數(shù)庫(不能通過 RESTful 接口或 HTTP 管理界面來進行這一過程)。一旦創(chuàng)建成功,則當(dāng)前 TDengine 集群的所有用戶都可以在 SQL 指令中使用這些函數(shù)。UDF 存儲在系統(tǒng)的 MNode 節(jié)點上,因此即使重啟 TDengine 系統(tǒng),已經(jīng)創(chuàng)建的 UDF 也仍然可用。
在創(chuàng)建 UDF 時,需要區(qū)分標量函數(shù)和聚合函數(shù)。如果創(chuàng)建時聲明了錯誤的函數(shù)類別,則可能導(dǎo)致通過 SQL 指令調(diào)用函數(shù)時出錯。此外, UDF 支持輸入與輸出類型不一致,用戶需要保證輸入數(shù)據(jù)類型與 UDF 程序匹配,UDF 輸出數(shù)據(jù)類型與 OUTPUTTYPE 匹配。
-
創(chuàng)建標量函數(shù):
CREATE FUNCTION ids(X) AS ids(Y) OUTPUTTYPE typename(Z) [ BUFSIZE B ];- ids(X):標量函數(shù)未來在 SQL 指令中被調(diào)用時的函數(shù)名,必須與函數(shù)實現(xiàn)中 udfNormalFunc 的實際名稱一致;
- ids(Y):包含 UDF 函數(shù)實現(xiàn)的動態(tài)鏈接庫的庫文件絕對路徑(指的是庫文件在當(dāng)前客戶端所在主機上的保存路徑,通常是指向一個 .so 文件),這個路徑需要用英文單引號或英文雙引號括起來;
- typename(Z):此函數(shù)計算結(jié)果的數(shù)據(jù)類型,與上文中 udfNormalFunc 的 itype 參數(shù)不同,這里不是使用數(shù)字表示法,而是直接寫類型名稱即可;
- B:中間計算結(jié)果的緩沖區(qū)大小,單位是字節(jié),最小 0,最大 512,如果不使用可以不設(shè)置。
例如,如下語句可以把 add_one.so 創(chuàng)建為系統(tǒng)中可用的 UDF:
CREATE FUNCTION add_one AS "/home/taos/udf_example/add_one.so" OUTPUTTYPE INT; -
創(chuàng)建聚合函數(shù):
CREATE AGGREGATE FUNCTION ids(X) AS ids(Y) OUTPUTTYPE typename(Z) [ BUFSIZE B ];- ids(X):聚合函數(shù)未來在 SQL 指令中被調(diào)用時的函數(shù)名,必須與函數(shù)實現(xiàn)中 udfNormalFunc 的實際名稱一致;
- ids(Y):包含 UDF 函數(shù)實現(xiàn)的動態(tài)鏈接庫的庫文件絕對路徑(指的是庫文件在當(dāng)前客戶端所在主機上的保存路徑,通常是指向一個 .so 文件),這個路徑需要用英文單引號或英文雙引號括起來;
- typename(Z):此函數(shù)計算結(jié)果的數(shù)據(jù)類型,與上文中 udfNormalFunc 的 itype 參數(shù)不同,這里不是使用數(shù)字表示法,而是直接寫類型名稱即可;
- B:中間計算結(jié)果的緩沖區(qū)大小,單位是字節(jié),最小 0,最大 512,如果不使用可以不設(shè)置。
關(guān)于中間計算結(jié)果的使用,可以參考示例程序demo.c
例如,如下語句可以把 demo.so 創(chuàng)建為系統(tǒng)中可用的 UDF:
CREATE AGGREGATE FUNCTION demo AS "/home/taos/udf_example/demo.so" OUTPUTTYPE DOUBLE bufsize 14;
管理 UDF
- 刪除指定名稱的用戶定義函數(shù):
DROP FUNCTION ids(X);- ids(X):此參數(shù)的含義與 CREATE 指令中的 ids(X) 參數(shù)一致,也即要刪除的函數(shù)的名字,例如
DROP FUNCTION add_one;。
- ids(X):此參數(shù)的含義與 CREATE 指令中的 ids(X) 參數(shù)一致,也即要刪除的函數(shù)的名字,例如
- 顯示系統(tǒng)中當(dāng)前可用的所有 UDF:
SHOW FUNCTIONS;
調(diào)用 UDF
在 SQL 指令中,可以直接以在系統(tǒng)中創(chuàng)建 UDF 時賦予的函數(shù)名來調(diào)用用戶定義函數(shù)。例如:
SELECT X(c) FROM table/stable;
表示對名為 c 的數(shù)據(jù)列調(diào)用名為 X 的用戶定義函數(shù)。SQL 指令中用戶定義函數(shù)可以配合 WHERE 等查詢特性來使用。
UDF 的一些使用限制
在當(dāng)前版本下,使用 UDF 存在如下這些限制:
- 在創(chuàng)建和調(diào)用 UDF 時,服務(wù)端和客戶端都只支持 Linux 操作系統(tǒng);
- UDF 不能與系統(tǒng)內(nèi)建的 SQL 函數(shù)混合使用,暫不支持在一條 SQL 語句中使用多個不同名的 UDF ;
- UDF 只支持以單個數(shù)據(jù)列作為輸入;
- UDF 只要創(chuàng)建成功,就會被持久化存儲到 MNode 節(jié)點中;
- 無法通過 RESTful 接口來創(chuàng)建 UDF;
- UDF 在 SQL 中定義的函數(shù)名,必須與 .so 庫文件實現(xiàn)中的接口函數(shù)名前綴保持一致,也即必須是 udfNormalFunc 的名稱,而且不可與 TDengine 中已有的內(nèi)建 SQL 函數(shù)重名。
代碼附件
add_one.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
typedef struct SUdfInit{
int maybe_null; /* 1 if function can return NULL */
int decimals; /* for real functions */
long long length; /* For string functions */
char *ptr; /* free pointer for function data */
int const_item; /* 0 if result is independent of arguments */
} SUdfInit;
void add_one(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBUf, char* tsOutput,
int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
int i;
int r = 0;
// printf("add_one input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf);
if (itype == 4) {
for(i=0;i<numOfRows;++i) {
// printf("input %d - %d", i, *((int *)data + i));
*((int *)dataOutput+i)=*((int *)data + i) + 1;
// printf(", output %d\n", *((int *)dataOutput+i));
if (tsOutput) {
*(long long*)tsOutput=1000000;
}
}
*numOfOutput=numOfRows;
// printf("add_one out, numOfOutput:%d\n", *numOfOutput);
}
}
abs_max.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>
typedef struct SUdfInit{
int maybe_null; /* 1 if function can return NULL */
int decimals; /* for real functions */
int64_t length; /* For string functions */
char *ptr; /* free pointer for function data */
int const_item; /* 0 if result is independent of arguments */
} SUdfInit;
#define TSDB_DATA_INT_NULL 0x80000000L
#define TSDB_DATA_BIGINT_NULL 0x8000000000000000L
void abs_max(char* data, short itype, short ibytes, int numOfRows, int64_t* ts, char* dataOutput, char* interBuf, char* tsOutput,
int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
int i;
int64_t r = 0;
// printf("abs_max input data:%p, type:%d, rows:%d, ts:%p, %" PRId64 ", dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf);
if (itype == 5) {
r=*(int64_t *)dataOutput;
*numOfOutput=0;
for(i=0;i<numOfRows;++i) {
if (*((int64_t *)data + i) == TSDB_DATA_BIGINT_NULL) {
continue;
}
*numOfOutput=1;
//int64_t v = abs(*((int64_t *)data + i));
int64_t v = *((int64_t *)data + i);
if (v < 0) {
v = 0 - v;
}
if (v > r) {
r = v;
}
}
*(int64_t *)dataOutput=r;
// printf("abs_max out, dataoutput:%" PRId64", numOfOutput:%d\n", *(int64_t *)dataOutput, *numOfOutput);
}else {
*numOfOutput=0;
}
}
void abs_max_finalize(char* dataOutput, char* interBuf, int* numOfOutput, SUdfInit* buf) {
int i;
//int64_t r = 0;
// printf("abs_max_finalize dataoutput:%p:%d, numOfOutput:%d, buf:%p\n", dataOutput, *dataOutput, *numOfOutput, buf);
// *numOfOutput=1;
// printf("abs_max finalize, dataoutput:%" PRId64", numOfOutput:%d\n", *(int64_t *)dataOutput, *numOfOutput);
}
void abs_max_merge(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf) {
int64_t r = 0;
if (numOfRows > 0) {
r = *((int64_t *)data);
}
// printf("abs_max_merge numOfRows:%d, dataoutput:%p, buf:%p\n", numOfRows, dataOutput, buf);
for (int i = 1; i < numOfRows; ++i) {
// printf("abs_max_merge %d - %" PRId64"\n", i, *((int64_t *)data + i));
if (*((int64_t*)data + i) > r) {
r= *((int64_t*)data + i);
}
}
*(int64_t*)dataOutput=r;
if (numOfRows > 0) {
*numOfOutput=1;
} else {
*numOfOutput=0;
}
// printf("abs_max_merge, dataoutput:%" PRId64", numOfOutput:%d\n", *(int64_t *)dataOutput, *numOfOutput);
}
int abs_max_init(SUdfInit* buf) {
// printf("abs_max init\n");
return 0;
}
void abs_max_destroy(SUdfInit* buf) {
// printf("abs_max destroy\n");
}
demo.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
typedef struct SUdfInit{
int maybe_null; /* 1 if function can return NULL */
int decimals; /* for real functions */
long long length; /* For string functions */
char *ptr; /* free pointer for function data */
int const_item; /* 0 if result is independent of arguments */
} SUdfInit;
typedef struct SDemo{
double sum;
int num;
short otype;
}SDemo;
#define FLOAT_NULL 0x7FF00000 // it is an NAN
#define DOUBLE_NULL 0x7FFFFF0000000000L // it is an NAN
void demo(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBuf, char* tsOutput,
int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
int i;
double r = 0;
SDemo *p = (SDemo *)interBuf;
SDemo *q = (SDemo *)dataOutput;
printf("demo input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, interBUf:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, interBuf, tsOutput, numOfOutput, buf);
for(i=0;i<numOfRows;++i) {
if (itype == 4) {
r=*((int *)data+i);
} else if (itype == 6) {
r=*((float *)data+i);
} else if (itype == 7) {
r=*((double *)data+i);
}
p->sum += r*r;
}
p->otype = otype;
p->num += numOfRows;
q->sum = p->sum;
q->num = p->num;
q->otype = p->otype;
*numOfOutput=1;
printf("demo out, sum:%f, num:%d, numOfOutput:%d\n", p->sum, p->num, *numOfOutput);
}
void demo_merge(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf) {
int i;
SDemo *p = (SDemo *)data;
SDemo res = {0};
printf("demo_merge input data:%p, rows:%d, dataoutput:%p, numOfOutput:%p, buf:%p\n", data, numOfRows, dataOutput, numOfOutput, buf);
for(i=0;i<numOfRows;++i) {
res.sum += p->sum * p->sum;
res.num += p->num;
p++;
}
p->sum = res.sum;
p->num = res.num;
*numOfOutput=1;
printf("demo out, sum:%f, num:%d, numOfOutput:%d\n", p->sum, p->num, *numOfOutput);
}
void demo_finalize(char* dataOutput, char* interBuf, int* numOfOutput, SUdfInit* buf) {
SDemo *p = (SDemo *)interBuf;
printf("demo_finalize interbuf:%p, numOfOutput:%p, buf:%p, sum:%f, num:%d\n", interBuf, numOfOutput, buf, p->sum, p->num);
if (p->otype == 6) {
if (p->num != 30000) {
*(unsigned int *)dataOutput = FLOAT_NULL;
} else {
*(float *)dataOutput = (float)(p->sum / p->num);
}
printf("finalize values:%f\n", *(float *)dataOutput);
} else if (p->otype == 7) {
if (p->num != 30000) {
*(unsigned long long *)dataOutput = DOUBLE_NULL;
} else {
*(double *)dataOutput = (double)(p->sum / p->num);
}
printf("finalize values:%f\n", *(double *)dataOutput);
}
*numOfOutput=1;
printf("demo finalize, numOfOutput:%d\n", *numOfOutput);
}
int demo_init(SUdfInit* buf) {
printf("demo init\n");
return 0;
}
void demo_destroy(SUdfInit* buf) {
printf("demo destroy\n");
}

