高級功能
連續(xù)查詢(Continuous Query)
連續(xù)查詢是 TDengine 定期自動執(zhí)行的查詢,采用滑動窗口的方式進行計算,是一種簡化的時間驅動的流式計算。針對庫中的表或超級表,TDengine 可提供定期自動執(zhí)行的連續(xù)查詢,用戶可讓 TDengine 推送查詢的結果,也可以將結果再寫回到 TDengine 中。每次執(zhí)行的查詢是一個時間窗口,時間窗口隨著時間流動向前滑動。在定義連續(xù)查詢的時候需要指定時間窗口(time window, 參數interval)大小和每次前向增量時間(forward sliding times, 參數sliding)。
TDengine 的連續(xù)查詢采用時間驅動模式,可以直接使用 TAOS SQL 進行定義,不需要額外的操作。使用連續(xù)查詢,可以方便快捷地按照時間窗口生成結果,從而對原始采集數據進行降采樣(down sampling)。用戶通過 TAOS SQL 定義連續(xù)查詢以后,TDengine 自動在最后的一個完整的時間周期末端拉起查詢,并將計算獲得的結果推送給用戶或者寫回 TDengine。
TDengine 提供的連續(xù)查詢與普通流計算中的時間窗口計算具有以下區(qū)別:
- 不同于流計算的實時反饋計算結果,連續(xù)查詢只在時間窗口關閉以后才開始計算。例如時間周期是 1 天,那么當天的結果只會在 23:59:59 以后才會生成。
- 如果有歷史記錄寫入到已經計算完成的時間區(qū)間,連續(xù)查詢并不會重新進行計算,也不會重新將結果推送給用戶。對于寫回 TDengine 的模式,也不會更新已經存在的計算結果。
- 使用連續(xù)查詢推送結果的模式,服務端并不緩存客戶端計算狀態(tài),也不提供 Exactly-Once 的語意保證。如果用戶的應用端崩潰,再次拉起的連續(xù)查詢將只會從再次拉起的時間開始重新計算最近的一個完整的時間窗口。如果使用寫回模式,TDengine 可確保數據寫回的有效性和連續(xù)性。
使用連續(xù)查詢
下面以智能電表場景為例介紹連續(xù)查詢的具體使用方法。假設我們通過下列 SQL 語句創(chuàng)建了超級表和子表:
create table meters (ts timestamp, current float, voltage int, phase float) tags (location binary(64), groupId int);
create table D1001 using meters tags ("Beijing.Chaoyang", 2);
create table D1002 using meters tags ("Beijing.Haidian", 2);
...
我們已經知道,可以通過下面這條 SQL 語句以一分鐘為時間窗口、30秒為前向增量統(tǒng)計這些電表的平均電壓。
select avg(voltage) from meters interval(1m) sliding(30s);
每次執(zhí)行這條語句,都會重新計算所有數據。 如果需要每隔 30 秒執(zhí)行一次來增量計算最近一分鐘的數據,可以把上面的語句改進成下面的樣子,每次使用不同的 startTime 并定期執(zhí)行:
select avg(voltage) from meters where ts > {startTime} interval(1m) sliding(30s);
這樣做沒有問題,但 TDengine 提供了更簡單的方法,只要在最初的查詢語句前面加上 create table {tableName} as 就可以了,例如:
create table avg_vol as select avg(voltage) from meters interval(1m) sliding(30s);
會自動創(chuàng)建一個名為 avg_vol 的新表,然后每隔 30 秒,TDengine 會增量執(zhí)行 as 后面的 SQL 語句,并將查詢結果寫入這個表中,用戶程序后續(xù)只要從 avg_vol 中查詢數據即可。例如:
taos> select * from avg_vol;
ts | avg_voltage_ |
===================================================
2020-07-29 13:37:30.000 | 222.0000000 |
2020-07-29 13:38:00.000 | 221.3500000 |
2020-07-29 13:38:30.000 | 220.1700000 |
2020-07-29 13:39:00.000 | 223.0800000 |
需要注意,查詢時間窗口的最小值是10毫秒,沒有時間窗口范圍的上限。
此外,TDengine 還支持用戶指定連續(xù)查詢的起止時間。如果不輸入開始時間,連續(xù)查詢將從第一條原始數據所在的時間窗口開始;如果沒有輸入結束時間,連續(xù)查詢將永久運行;如果用戶指定了結束時間,連續(xù)查詢在系統(tǒng)時間達到指定的時間以后停止運行。比如使用下面的SQL創(chuàng)建的連續(xù)查詢將運行一小時,之后會自動停止。
create table avg_vol as select avg(voltage) from meters where ts > now and ts <= now + 1h interval(1m) sliding(30s);
需要說明的是,上面例子中的 now 是指創(chuàng)建連續(xù)查詢的時間,而不是查詢執(zhí)行的時間,否則,查詢就無法自動停止了。另外,為了盡量避免原始數據延遲寫入導致的問題,TDengine 中連續(xù)查詢的計算有一定的延遲。也就是說,一個時間窗口過去后,TDengine 并不會立即計算這個窗口的數據,所以要稍等一會(一般不會超過 1 分鐘)才能查到計算結果。
管理連續(xù)查詢
用戶可在控制臺中通過 show streams 命令來查看系統(tǒng)中全部運行的連續(xù)查詢,并可以通過 kill stream 命令殺掉對應的連續(xù)查詢。后續(xù)版本會提供更細粒度和便捷的連續(xù)查詢管理命令。
數據訂閱(Publisher/Subscriber)
基于數據天然的時間序列特性,TDengine 的數據寫入(insert)與消息系統(tǒng)的數據發(fā)布(pub)邏輯上一致,均可視為系統(tǒng)中插入一條帶時間戳的新記錄。同時,TDengine 在內部嚴格按照數據時間序列單調遞增的方式保存數據。本質上來說,TDengine 中里每一張表均可視為一個標準的消息隊列。
TDengine 內嵌支持輕量級的消息訂閱與推送服務。使用系統(tǒng)提供的 API,用戶可使用普通查詢語句訂閱數據庫中的一張或多張表。訂閱的邏輯和操作狀態(tài)的維護均是由客戶端完成,客戶端定時輪詢服務器是否有新的記錄到達,有新的記錄到達就會將結果反饋到客戶。
TDengine 的訂閱與推送服務的狀態(tài)是客戶端維持,TDengine 服務器并不維持。因此如果應用重啟,從哪個時間點開始獲取最新數據,由應用決定。
TDengine 的 API 中,與訂閱相關的主要有以下三個:
taos_subscribe
taos_consume
taos_unsubscribe
這些API的文檔請見 C/C++ Connector,下面仍以智能電表場景為例介紹一下它們的具體用法(超級表和子表結構請參考上一節(jié)“連續(xù)查詢”),完整的示例代碼可以在 這里 找到。
如果我們希望當某個電表的電流超過一定限制(比如 10A)后能得到通知并進行一些處理, 有兩種方法:一是分別對每張子表進行查詢,每次查詢后記錄最后一條數據的時間戳,后續(xù)只查詢這個時間戳之后的數據:
select * from D1001 where ts > {last_timestamp1} and current > 10;
select * from D1002 where ts > {last_timestamp2} and current > 10;
...
這確實可行,但隨著電表數量的增加,查詢數量也會增加,客戶端和服務端的性能都會受到影響,當電表數增長到一定的程度,系統(tǒng)就無法承受了。
另一種方法是對超級表進行查詢。這樣,無論有多少電表,都只需一次查詢:
select * from meters where ts > {last_timestamp} and current > 10;
但是,如何選擇 last_timestamp 就成了一個新的問題。因為,一方面數據的產生時間(也就是數據時間戳)和數據入庫的時間一般并不相同,有時偏差還很大;另一方面,不同電表的數據到達 TDengine 的時間也會有差異。所以,如果我們在查詢中使用最慢的那臺電表的數據的時間戳作為 last_timestamp,就可能重復讀入其它電表的數據;如果使用最快的電表的時間戳,其它電表的數據就可能被漏掉。
TDengine 的訂閱功能為上面這個問題提供了一個徹底的解決方案。
首先是使用 taos_subscribe 創(chuàng)建訂閱:
TAOS_SUB* tsub = NULL;
if (async) {
// create an asynchronized subscription, the callback function will be called every 1s
tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, &blockFetch, 1000);
} else {
// create an synchronized subscription, need to call 'taos_consume' manually
tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0);
}
TDengine 中的訂閱既可以是同步的,也可以是異步的,上面的代碼會根據從命令行獲取的參數 async 的值來決定使用哪種方式。這里,同步的意思是用戶程序要直接調用 taos_consume 來拉取數據,而異步則由 API 在內部的另一個線程中調用 taos_consume,然后把拉取到的數據交給回調函數 subscribe_callback去處理。(注意,subscribe_callback 中不宜做較為耗時的操作,否則有可能導致客戶端阻塞等不可控的問題。)
參數 taos 是一個已經建立好的數據庫連接,在同步模式下無特殊要求。但在異步模式下,需要注意它不會被其它線程使用,否則可能導致不可預計的錯誤,因為回調函數在API的內部線程中被調用,而 TDengine 的部分 API 不是線程安全的。
參數 sql 是查詢語句,可以在其中使用where子句指定過濾條件。在我們的例子中,如果只想訂閱電流超過 10A 時的數據,可以這樣寫:
select * from meters where current > 10;
注意,這里沒有指定起始時間,所以會讀到所有時間的數據。如果只想從一天前的數據開始訂閱,而不需要更早的歷史數據,可以再加上一個時間條件:
select * from meters where ts > now - 1d and current > 10;
訂閱的 topic 實際上是它的名字,因為訂閱功能是在客戶端API中實現的,所以沒必要保證它全局唯一,但需要它在一臺客戶端機器上唯一。
如果名為 topic 的訂閱不存在,參數 restart 沒有意義;但如果用戶程序創(chuàng)建這個訂閱后退出,當它再次啟動并重新使用這個 topic 時,restart 就會被用于決定是從頭開始讀取數據,還是接續(xù)上次的位置進行讀取。本例中,如果 restart 是 true(非零值),用戶程序肯定會讀到所有數據。但如果這個訂閱之前就存在了,并且已經讀取了一部分數據,且 restart 是 false(0),用戶程序就不會讀到之前已經讀取的數據了。
taos_subscribe的最后一個參數是以毫秒為單位的輪詢周期。在同步模式下,如果前后兩次調用 taos_consume 的時間間隔小于此時間,taos_consume 會阻塞,直到間隔超過此時間。異步模式下,這個時間是兩次調用回調函數的最小時間間隔。
taos_subscribe 的倒數第二個參數用于用戶程序向回調函數傳遞附加參數,訂閱 API 不對其做任何處理,只原樣傳遞給回調函數。此參數在同步模式下無意義。
訂閱創(chuàng)建以后,就可以消費其數據了,同步模式下,示例代碼是下面的 else 部分:
if (async) {
getchar();
} else while(1) {
TAOS_RES* res = taos_consume(tsub);
if (res == NULL) {
printf("failed to consume data.");
break;
} else {
print_result(res, blockFetch);
getchar();
}
}
這里是一個 while 循環(huán),用戶每按一次回車鍵就調用一次 taos_consume,而 taos_consume 的返回值是查詢到的結果集,與 taos_use_result 完全相同,例子中使用這個結果集的代碼是函數 print_result:
void print_result(TAOS_RES* res, int blockFetch) {
TAOS_ROW row = NULL;
int num_fields = taos_num_fields(res);
TAOS_FIELD* fields = taos_fetch_fields(res);
int nRows = 0;
if (blockFetch) {
nRows = taos_fetch_block(res, &row);
for (int i = 0; i < nRows; i++) {
char temp[256];
taos_print_row(temp, row + i, fields, num_fields);
puts(temp);
}
} else {
while ((row = taos_fetch_row(res))) {
char temp[256];
taos_print_row(temp, row, fields, num_fields);
puts(temp);
nRows++;
}
}
printf("%d rows consumed.\n", nRows);
}
其中的 taos_print_row 用于處理訂閱到數據,在我們的例子中,它會打印出所有符合條件的記錄。而異步模式下,消費訂閱到的數據則顯得更為簡單:
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
print_result(res, *(int*)param);
}
當要結束一次數據訂閱時,需要調用 taos_unsubscribe:
taos_unsubscribe(tsub, keep);
其第二個參數,用于決定是否在客戶端保留訂閱的進度信息。如果這個參數是false(0),那無論下次調用 taos_subscribe 時的 restart 參數是什么,訂閱都只能重新開始。另外,進度信息的保存位置是 {DataDir}/subscribe/ 這個目錄下,每個訂閱有一個與其 topic 同名的文件,刪掉某個文件,同樣會導致下次創(chuàng)建其對應的訂閱時只能重新開始。
代碼介紹完畢,我們來看一下實際的運行效果。假設:
- 示例代碼已經下載到本地
- TDengine 也已經在同一臺機器上安裝好
- 示例所需的數據庫、超級表、子表已經全部創(chuàng)建好
則可以在示例代碼所在目錄執(zhí)行以下命令來編譯并啟動示例程序:
make
./subscribe -sql='select * from meters where current > 10;'
示例程序啟動后,打開另一個終端窗口,啟動 TDengine 的 shell 向 D1001 插入一條電流為 12A 的數據:
$ taos
> use test;
> insert into D1001 values(now, 12, 220, 1);
這時,因為電流超過了 10A,您應該可以看到示例程序將它輸出到了屏幕上。您可以繼續(xù)插入一些數據觀察示例程序的輸出。
Java 使用數據訂閱功能
訂閱功能也提供了 Java 開發(fā)接口,相關說明請見 Java Connector。需要注意的是,目前 Java 接口沒有提供異步訂閱模式,但用戶程序可以通過創(chuàng)建 TimerTask 等方式達到同樣的效果。
下面以一個示例程序介紹其具體使用方法。它所完成的功能與前面介紹的 C 語言示例基本相同,也是訂閱數據庫中所有電流超過 10A 的記錄。
準備數據
# 創(chuàng)建 power 庫
taos> create database power;
# 切換庫
taos> use power;
# 創(chuàng)建超級表
taos> create table meters(ts timestamp, current float, voltage int, phase int) tags(location binary(64), groupId int);
# 創(chuàng)建表
taos> create table d1001 using meters tags ("Beijing.Chaoyang", 2);
taos> create table d1002 using meters tags ("Beijing.Haidian", 2);
# 插入測試數據
taos> insert into d1001 values("2020-08-15 12:00:00.000", 12, 220, 1),("2020-08-15 12:10:00.000", 12.3, 220, 2),("2020-08-15 12:20:00.000", 12.2, 220, 1);
taos> insert into d1002 values("2020-08-15 12:00:00.000", 9.9, 220, 1),("2020-08-15 12:10:00.000", 10.3, 220, 1),("2020-08-15 12:20:00.000", 11.2, 220, 1);
# 從超級表 meters 查詢電流大于 10A 的記錄
taos> select * from meters where current > 10;
ts | current | voltage | phase | location | groupid |
===========================================================================================================
2020-08-15 12:10:00.000 | 10.30000 | 220 | 1 | Beijing.Haidian | 2 |
2020-08-15 12:20:00.000 | 11.20000 | 220 | 1 | Beijing.Haidian | 2 |
2020-08-15 12:00:00.000 | 12.00000 | 220 | 1 | Beijing.Chaoyang | 2 |
2020-08-15 12:10:00.000 | 12.30000 | 220 | 2 | Beijing.Chaoyang | 2 |
2020-08-15 12:20:00.000 | 12.20000 | 220 | 1 | Beijing.Chaoyang | 2 |
Query OK, 5 row(s) in set (0.004896s)
示例程序
public class SubscribeDemo {
private static final String topic = "topic-meter-current-bg-10";
private static final String sql = "select * from meters where current > 10";
public static void main(String[] args) {
Connection connection = null;
TSDBSubscribe subscribe = null;
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
String jdbcUrl = "jdbc:TAOS://127.0.0.1:6030/power?user=root&password=taosdata";
connection = DriverManager.getConnection(jdbcUrl, properties);
subscribe = ((TSDBConnection) connection).subscribe(topic, sql, true); // 創(chuàng)建訂閱
int count = 0;
while (count < 10) {
TimeUnit.SECONDS.sleep(1); // 等待1秒,避免頻繁調用 consume,給服務端造成壓力
TSDBResultSet resultSet = subscribe.consume(); // 消費數據
if (resultSet == null) {
continue;
}
ResultSetMetaData metaData = resultSet.getMetaData();
while (resultSet.next()) {
int columnCount = metaData.getColumnCount();
for (int i = 1; i <= columnCount; i++) {
System.out.print(metaData.getColumnLabel(i) + ": " + resultSet.getString(i) + "\t");
}
System.out.println();
count++;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != subscribe)
subscribe.close(true); // 關閉訂閱
if (connection != null)
connection.close();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
}
}
運行示例程序,首先,它會消費符合查詢條件的所有歷史數據:
# java -jar subscribe.jar
ts: 1597464000000 current: 12.0 voltage: 220 phase: 1 location: Beijing.Chaoyang groupid : 2
ts: 1597464600000 current: 12.3 voltage: 220 phase: 2 location: Beijing.Chaoyang groupid : 2
ts: 1597465200000 current: 12.2 voltage: 220 phase: 1 location: Beijing.Chaoyang groupid : 2
ts: 1597464600000 current: 10.3 voltage: 220 phase: 1 location: Beijing.Haidian groupid : 2
ts: 1597465200000 current: 11.2 voltage: 220 phase: 1 location: Beijing.Haidian groupid : 2
接著,使用 taos 客戶端向表中新增一條數據:
# taos
taos> use power;
taos> insert into d1001 values("2020-08-15 12:40:00.000", 12.4, 220, 1);
因為這條數據的電流大于10A,示例程序會將其消費:
ts: 1597466400000 current: 12.4 voltage: 220 phase: 1 location: Beijing.Chaoyang groupid: 2
緩存(Cache)
TDengine 采用時間驅動緩存管理策略(First-In-First-Out,FIFO),又稱為寫驅動的緩存管理機制。這種策略有別于讀驅動的數據緩存模式(Least-Recent-Used,LRU),直接將最近寫入的數據保存在系統(tǒng)的緩存中。當緩存達到臨界值的時候,將最早的數據批量寫入磁盤。一般意義上來說,對于物聯網數據的使用,用戶最為關心最近產生的數據,即當前狀態(tài)。TDengine 充分利用了這一特性,將最近到達的(當前狀態(tài))數據保存在緩存中。
TDengine 通過查詢函數向用戶提供毫秒級的數據獲取能力。直接將最近到達的數據保存在緩存中,可以更加快速地響應用戶針對最近一條或一批數據的查詢分析,整體上提供更快的數據庫查詢響應能力。從這個意義上來說,可通過設置合適的配置參數將 TDengine 作為數據緩存來使用,而不需要再部署額外的緩存系統(tǒng),可有效地簡化系統(tǒng)架構,降低運維的成本。需要注意的是,TDengine 重啟以后系統(tǒng)的緩存將被清空,之前緩存的數據均會被批量寫入磁盤,緩存的數據將不會像專門的 key-value 緩存系統(tǒng)再將之前緩存的數據重新加載到緩存中。
TDengine 分配固定大小的內存空間作為緩存空間,緩存空間可根據應用的需求和硬件資源配置。通過適當的設置緩存空間,TDengine 可以提供極高性能的寫入和查詢的支持。TDengine 中每個虛擬節(jié)點(virtual node)創(chuàng)建時分配獨立的緩存池。每個虛擬節(jié)點管理自己的緩存池,不同虛擬節(jié)點間不共享緩存池。每個虛擬節(jié)點內部所屬的全部表共享該虛擬節(jié)點的緩存池。
TDengine 將內存池按塊劃分進行管理,數據在內存塊里是以行(row)的形式存儲。一個 vnode 的內存池是在 vnode 創(chuàng)建時按塊分配好,而且每個內存塊按照先進先出的原則進行管理。在創(chuàng)建內存池時,塊的大小由系統(tǒng)配置參數 cache 決定;每個 vnode 中內存塊的數目則由配置參數blocks決定。因此對于一個 vnode,總的內存大小為:cache * blocks。一個 cache block 需要保證每張表能存儲至少幾十條以上記錄,才會有效率。
你可以通過函數 last_row() 快速獲取一張表或一張超級表的最后一條記錄,這樣很便于在大屏顯示各設備的實時狀態(tài)或采集值。例如:
select last_row(voltage) from meters where location='Beijing.Chaoyang';
該 SQL 語句將獲取所有位于北京朝陽區(qū)的電表最后記錄的電壓值。

