六月婷婷AV,国产偷窥猎奇福利二区,日韩三级片。,好吊色网站,日韩成人中文在线视频,国产亚洲午夜啪啪,亚洲欧美另类国产精品,国产成人av1,任你艹在线观看

一文學會如何使用 TDengine 3.0 中的流式計算

小 T 導讀:TDengine 3.0 引入了全新的流式計算引擎,既支持時間驅動的流式計算,也支持事件驅動的流式計算。本文將對新的流式計算引擎的語法規(guī)則進行詳細介紹,方便開發(fā)者及企業(yè)使用。

TDengine 是一款開源、云原生的時序數(shù)據(jù)庫(Time Series Database,TSDB),專為物聯(lián)網(wǎng)、工業(yè)互聯(lián)網(wǎng)、金融、IT 運維監(jiān)控等場景設計并優(yōu)化。近期發(fā)布的 TDengine 3.0,全新的流式計算引擎是其一大亮點。

TDengine 3.0 的流式計算引擎提供了實時處理寫入的數(shù)據(jù)流能力,使用 SQL 定義實時流變換,當數(shù)據(jù)被寫入流的源表后,數(shù)據(jù)會被以定義的方式自動處理,并根據(jù)定義的觸發(fā)模式向目的表推送結果。它提供了替代復雜流處理系統(tǒng)的輕量級解決方案,并能夠在高吞吐的數(shù)據(jù)寫入情況下,提供毫秒級的計算結果延遲。

流式計算可以包含數(shù)據(jù)過濾,標量函數(shù)計算(含 UDF),以及窗口聚合(支持滑動窗口、會話窗口與狀態(tài)窗口),可以以超級表、子表、普通表為源表,寫入到目的超級表。在創(chuàng)建流時,目的超級表將被自動創(chuàng)建,隨后新插入的數(shù)據(jù)會被流定義的方式處理并寫入其中,通過 partition by 子句,可以以表名或標簽劃分 partition,不同的 partition 將寫入到目的超級表的不同子表。

TDengine 的流式計算能夠支持分布在多個 vnode 中的超級表聚合;還能夠處理亂序數(shù)據(jù)的寫入:它提供了 watermark 機制以度量容忍數(shù)據(jù)亂序的程度,并提供了 ignore expired 配置項以決定亂序數(shù)據(jù)的處理策略——丟棄或者重新計算。
下面我們就一起看一下 TDengine 中流式計算相關的 SQL 語法。

流式計算的創(chuàng)建、刪除與展示

創(chuàng)建

CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name AS subquery
stream_options: {
 TRIGGER    [AT_ONCE | WINDOW_CLOSE | MAX_DELAY time]
 WATERMARK   time
}

其中 subquery 是 select 普通查詢語法的子集:

subquery: SELECT select_list
    from_clause
    [WHERE condition]
    [PARTITION BY tag_list]
    [window_clause]

支持會話窗口、狀態(tài)窗口與滑動窗口,其中,會話窗口與狀態(tài)窗口搭配超級表時必須與 partition by tbname 一起使用:

window_clause: {
    SESSION(ts_col, tol_val)
  | STATE_WINDOW(col)
  | INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)]
}

在上述語句中,SESSION 是會話窗口,tol_val 是時間間隔的最大范圍。在 tol_val 時間間隔范圍內(nèi)的數(shù)據(jù)都屬于同一個窗口,如果有連續(xù)兩條數(shù)據(jù)的時間超過 tol_val,則自動開啟下一個窗口。窗口的定義與時序數(shù)據(jù)特色查詢中的定義完全相同,詳見 TDengine 特色查詢

例如,使用如下語句創(chuàng)建流式計算,同時自動創(chuàng)建名為 avg_vol 的超級表,此流計算以一分鐘為時間窗口、30 秒為前向增量統(tǒng)計這些電表的平均電壓,并將來自 meters 表的數(shù)據(jù)的計算結果寫入 avg_vol 表,不同 partition 的數(shù)據(jù)會分別創(chuàng)建子表并寫入不同子表。

CREATE STREAM avg_vol_s INTO avg_vol AS
SELECT _wstartts, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s);

刪除

DROP STREAM [IF NOT EXISTS] stream_name;

僅刪除流式計算任務,由流式計算寫入的數(shù)據(jù)不會被刪除。

展示

SHOW STREAMS;

若要展示更詳細的信息,可以使用:

SELECT * from performance_schema.`perf_streams`;

流式計算的 partition

我們可以使用 PARTITION BY TBNAME 或 PARTITION BY tag 對一個流進行多分區(qū)的計算,每個分區(qū)的時間線與時間窗口是獨立的,會各自聚合,并寫入到目的表中的不同子表。如果不帶 PARTITION BY 選項,那所有的數(shù)據(jù)將寫入到一張子表。

流式計算創(chuàng)建的超級表有唯一的 tag 列 groupId,每個 partition 會被分配唯一 groupId。與 schemaless 寫入一致,我們通過 MD5 計算子表名,并自動創(chuàng)建它。

流式計算的觸發(fā)模式

在創(chuàng)建流時,可以通過 TRIGGER 指令指定流式計算的觸發(fā)模式。

對于非窗口計算,流式計算的觸發(fā)是實時的;對于窗口計算,目前提供如下 3 種觸發(fā)模式:

  1. AT_ONCE:寫入立即觸發(fā)
  2. WINDOW_CLOSE:窗口關閉時觸發(fā)(窗口關閉由事件時間決定,可配合 watermark 使用)
  3. MAX_DELAY time:若窗口關閉,則觸發(fā)計算。若窗口未關閉,且未關閉時長超過 max delay 指定的時間,則觸發(fā)計算。

由于窗口關閉是由事件時間所決定的,如果因事件流中斷、或持續(xù)延遲導致事件時間無法更新,可能無法得到最新的計算結果。因此,流式計算提供了以事件時間結合處理時間計算的 MAX_DELAY 觸發(fā)模式,MAX_DELAY 模式在窗口關閉時會立即觸發(fā)計算。此外,當數(shù)據(jù)寫入后,計算觸發(fā)的時間超過 max delay 指定的時間,則立即觸發(fā)計算。

流式計算的窗口關閉

流式計算以事件時間(插入記錄中的時間戳主鍵)為基準計算窗口關閉,而非以 TDengine 服務器的時間,這樣可以避免客戶端與服務器時間不一致帶來的問題,有效解決亂序數(shù)據(jù)寫入等難題。同時,流式計算還提供了 watermark 來定義容忍的亂序程度。
在創(chuàng)建流時,我們可以在 stream_option 中指定 watermark,它定義了數(shù)據(jù)亂序的容忍上界。流式計算通過 watermark 來度量對亂序數(shù)據(jù)的容忍程度,watermark 默認為 0。

T = 最新事件時間 – watermark

每次寫入的數(shù)據(jù)都會以上述公式更新窗口關閉時間,并將窗口結束時間 < T 的所有打開的窗口關閉,若觸發(fā)模式為 WINDOW_CLOSE 或 MAX_DELAY,則推送窗口聚合結果。

TDengine Database

在上圖中,縱軸表示不同時刻,對于不同時刻,我們畫出其對應的 TDengine 收到的數(shù)據(jù),即為橫軸。已知橫軸上的數(shù)據(jù)點表示已經(jīng)收到的數(shù)據(jù),其中藍色的點表示事件時間(即數(shù)據(jù)中的時間戳主鍵)最后的數(shù)據(jù),該數(shù)據(jù)點減去定義的 watermark 時間,就得到亂序容忍的上界 T。所有結束時間小于 T 的窗口都將被關閉(圖中以灰色方框標記)。

在 T2 時刻,亂序數(shù)據(jù)(黃色的點)到達 TDengine,由于有 watermark 的存在,這些數(shù)據(jù)進入的窗口并未被關閉,因此可以被正確處理。在 T3 時刻,最新事件到達,T 向后推移超過了第二個窗口關閉的時間,該窗口被關閉,亂序數(shù)據(jù)被正確處理。

但要注意,在 window_close 或 max_delay 模式下,窗口關閉直接影響推送結果。在 at_once 模式下,窗口關閉只與內(nèi)存占用有關。

流式計算的過期數(shù)據(jù)處理策略

對于已關閉的窗口,再次落入該窗口中的數(shù)據(jù)就會被標記為過期數(shù)據(jù)。TDengine 對于過期數(shù)據(jù)提供兩種處理方式,由 IGNORE EXPIRED 選項指定:

  1. 重新計算,即 IGNORE EXPIRED 0:默認配置,從 TSDB 中重新查找對應窗口的所有數(shù)據(jù)并重新計算得到最新結果
  2. 直接丟棄,即 IGNORE EXPIRED 1:忽略過期數(shù)據(jù)

無論在哪種模式下,watermark 都應該被妥善設置,來得到正確結果(直接丟棄模式)或避免頻繁觸發(fā)重算帶來的性能開銷(重新計算模式)。

示例

企業(yè)電表的數(shù)據(jù)經(jīng)常都是成百上千億條的,想要將這些分散、凌亂的數(shù)據(jù)清洗或轉換都需要比較長的時間,很難做到高效性和實時性。在如下例子中,通過 TDengine 流計算可以將電表電壓大于 220V 的數(shù)據(jù)清洗掉,然后以 5 秒為窗口整合并計算出每個窗口中電流的最大值,最后將結果輸出到指定的數(shù)據(jù)表中。

創(chuàng)建 Database 和原始數(shù)據(jù)表

首先準備數(shù)據(jù),完成建庫、建一張超級表和多張子表操作:

DROP DATABASE IF EXISTS power;
CREATE DATABASE power;
USE power;

CREATE STABLE meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);

CREATE TABLE d1001 USING meters TAGS ("California.SanFrancisco", 2);
CREATE TABLE d1002 USING meters TAGS ("California.SanFrancisco", 3);
CREATE TABLE d1003 USING meters TAGS ("California.LosAngeles", 2);
CREATE TABLE d1004 USING meters TAGS ("California.LosAngeles", 3);

創(chuàng)建流

create stream current_stream into current_stream_output_stb as select _wstart as start, _wend as end, max(current) as max_current from meters where voltage <= 220 interval (5s);

寫入數(shù)據(jù)

insert into d1001 values("2018-10-03 14:38:05.000", 10.30000, 219, 0.31000);
insert into d1001 values("2018-10-03 14:38:15.000", 12.60000, 218, 0.33000);
insert into d1001 values("2018-10-03 14:38:16.800", 12.30000, 221, 0.31000);
insert into d1002 values("2018-10-03 14:38:16.650", 10.30000, 218, 0.25000);
insert into d1003 values("2018-10-03 14:38:05.500", 11.80000, 221, 0.28000);
insert into d1003 values("2018-10-03 14:38:16.600", 13.40000, 223, 0.29000);
insert into d1004 values("2018-10-03 14:38:05.000", 10.80000, 223, 0.29000);
insert into d1004 values("2018-10-03 14:38:06.500", 11.50000, 221, 0.35000);

查詢以觀察結果

taos> select start, end, max_current from current_stream_output_stb;
          start          |           end           |     max_current      |
===========================================================================
 2018-10-03 14:38:05.000 | 2018-10-03 14:38:10.000 |             10.30000 |
 2018-10-03 14:38:15.000 | 2018-10-03 14:38:20.000 |             12.60000 |
Query OK, 2 rows in database (0.018762s)

寫在最后

如果大家能夠運用好 TDengine 3.0 提供的流計算引擎,就不需要再部署其他的第三方流處理系統(tǒng),這樣一來,不僅降低了系統(tǒng)的復雜度,還大大減少了研發(fā)和運維成本。在實際操作中應用 TDengine 流計算引擎時,上述的詳細語法會帶給你很多幫助,如果還產(chǎn)生了其他更為復雜的應用問題,你也可以進入 TDengine 社區(qū)向技術人員尋求幫助。