Kafka作為一個流行的消息隊(duì)列,以分布式高性能,高可靠性等特點(diǎn)已經(jīng)在多種場景下廣泛使用。在工業(yè)互聯(lián)網(wǎng)、物聯(lián)網(wǎng)時序數(shù)據(jù)存儲的解決方案中也有大量用到。
但在實(shí)際部署過程中,可能會因?yàn)榕渲迷驅(qū)е陆?jīng)過Kafka的數(shù)據(jù)在接收方產(chǎn)生亂序,給后續(xù)處理環(huán)節(jié)帶來排序等工作,造成不必要的處理開銷,降低系統(tǒng)的處理性能和額外排序的工作。
其實(shí)可以通過合理的規(guī)劃設(shè)計(jì)Kafka的配置和方法來避免消息在通過Kafka后亂序的產(chǎn)生,只需要遵循以下原則即可:
對于需要確保順序的一條消息流,發(fā)送到同一個partition上去
Kafka可以在一個topic下設(shè)置多個partition來實(shí)現(xiàn)分布式和負(fù)載均衡,由同一consumer group下的不同consumer去消費(fèi);這樣的機(jī)制能夠支持多線程分布式的處理,帶來高性能,但也帶來了同一消息流走了不同路徑的可能性,如果沒有針對性的規(guī)劃,從架構(gòu)上就無法保證消息的順序。如下圖所示,對于同一個topic的一條消息流,寫入不同的partition,就會產(chǎn)生多條路徑。

為了確保一條消息流的數(shù)據(jù)能夠嚴(yán)格按照時間順序被消費(fèi),則必須遵循一條路徑的原則,這樣才能實(shí)現(xiàn)FIFO(First In First Out)。
根據(jù)Kafka的文檔描述,把哪條記錄發(fā)到哪個partition,是由producer負(fù)責(zé):
Producers
Producers publish data to the topics of their choice. The producer is responsible for choosing which record to assign to which partition within the topic. This can be done in a round-robin fashion simply to balance load or it can be done according to some semantic partition function (say based on some key in the record). More on the use of partitioning in a second!
可見,Kafka已考慮到了確保消息順序的需求,提供了接口來實(shí)現(xiàn)根據(jù)指定的key值發(fā)送到同一partition的方法。 可以看看Kafka相關(guān)源碼:
class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
private val random = new java.util.Random
def partition(key: Any, numPartitions: Int): Int = {
Utils.abs(key.hashCode) % numPartitions
}
}
從源碼上來看,Kafka支持通過Key的hash值對partition的數(shù)量求余來實(shí)現(xiàn)基于Key的分配partition方法。因此我們只要對不同時序消息流,找到他們不同的key,并且這個key是不會發(fā)生變化的,那么就能在發(fā)送到Kafka的時候,確保每一條消息流發(fā)送到同一個partition,走唯一的路徑。因此我們可以通過指定Key的方式,來實(shí)現(xiàn)這種嚴(yán)格的時序關(guān)系。
具體實(shí)現(xiàn)方法
在TDengine Database的應(yīng)用場景下,我們通常會把某一類設(shè)備(超級表)劃分為一個topic。對于每個設(shè)備,會單獨(dú)建表,一個設(shè)備產(chǎn)生的數(shù)據(jù),會只放到一張表里。對于設(shè)備產(chǎn)生的原始數(shù)據(jù),就需要在這個數(shù)據(jù)中找一個能夠代表這個數(shù)據(jù)的ID,而且不會發(fā)生變化的字段,作為Key值,在發(fā)送給Kafka時,帶上這個Key值。這樣就能確保該設(shè)備的所有數(shù)據(jù)流經(jīng)過Kafka時,走唯一的路徑。這個ID或key往往是設(shè)備具有唯一性的設(shè)備編碼,這個編碼不僅可以作為Kafka的Key,也可以作為TDengine Database里的表名。
具體實(shí)現(xiàn)非常簡單,在producer發(fā)送數(shù)據(jù)時,選擇一個key,通過KeyedMessage方法生成消息,然后send。以Java為例,其他語言可以從Kafka文檔中找到相同功能的接口:
producer.send(new KeyedMessage<String, String>(topic,key,record))
這個接口,可以讓使用者非常方便無需增加代碼的情況下來實(shí)現(xiàn)指定每個消息流綁定一個partition的結(jié)果。用戶也可以通過自己實(shí)現(xiàn)一個partition的算法,來實(shí)現(xiàn)更精準(zhǔn)的partition分配控制。具體實(shí)現(xiàn)可以參考”kafka 指定partition生產(chǎn),消費(fèi)“中的介紹。



互聯(lián)網(wǎng).png)



-1.png)







證.png)


伙伴.png)
伙伴.png)
伙伴.png)



