⑴ kafka 架構及其原理
kafka是一個 分布式 的、支持 分區的(partition )、多副本的 (replica ),基於 zookeeper 協調的 分布式消息系統。
從上面的描述中我們可以知道kafka的核心知識點:partition、replica
一個topic可以認為一個一類消息,每個topic將被分成多個partition。
在上圖中我們的生產者會決定發送到哪個 Partition:
如果沒有 Key 值則進行輪詢發送。
如果有 Key 值,對 Key 值進行 Hash,然後對分區數量取余,保證了同一個 Key 值的會被路由到同一個分區。(所有系統的partition都是同一個路數)
在上圖我們也可以看到,offset是跟partition走的,每個partition都有自己的offset。
總所周知,topic在物理層面以partition為分組,一個topic可以分成若干個partition,那麼topic以及partition又是怎麼存儲的呢?
其實partition還可以細分為logSegment,一個partition物理上由多個logSegment組成,那麼這些segment又是什麼呢?
LogSegment 文件由兩部分組成,分別為「.index」文件和「.log」文件,分別表示為 Segment 索引文件和數據文件。
這兩個文件的命令規則為:partition全局的第一個segment從0開始,後續每個segment文件名為上一個segment文件最後一條消息的offset值,數值大小為64位,20位數字字元長度,沒有數字用0填充,如下:
如上圖,「.index」索引文件存儲大量的元數據,「.log」數據文件存儲大量的消息,索引文件中的元數據指向對應數據文件中message的物理偏移地址。其中以「.index」索引文件中的元數據[3, 348]為例,在「.log」數據文件表示第3個消息,即在全局partition中表示170410+3=170413個消息,該消息的物理偏移地址為348。
那麼如何從partition中通過offset查找message呢?
以上圖為例,讀取offset=170418的消息,首先查找segment文件,其中00000000000000000000.index為最開始的文件,第二個文件為00000000000000170410.index(起始偏移為170410+1=170411),而第三個文件為00000000000000239430.index(起始偏移為239430+1=239431),所以這個offset=170418就落到了第二個文件之中。其他後續文件可以依次類推,以其實偏移量命名並排列這些文件,然後根據二分查找法就可以快速定位到具體文件位置。其次根據00000000000000170410.index文件中的[8,1325]定位到00000000000000170410.log文件中的1325的位置進行讀取。
要是讀取offset=170418的消息,從00000000000000170410.log文件中的1325的位置進行讀取,那麼怎麼知道何時讀完本條消息,否則就讀到下一條消息的內容了?
這個就需要聯繫到消息的物理結構了,消息都具有固定的物理結構,包括:嫌此offset(8 Bytes)、消息體的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等欄位,可以確定一條消息的芹賀迅大小,即讀取到哪裡截止。
Kafka 的副本機制是多個服務端節點對其他節點的主題分區拍碰的日誌進行復制。當集群中的某個節點出現故障,訪問故障節點的請求會被轉移到其他正常節點(這一過程通常叫 Reblance)。
Kafka 每個主題的每個分區都有一個主副本以及 0 個或者多個副本,副本保持和主副本的數據同步,當主副本出故障時就會被替代。
當procer向leader發送數據時,可以通過request.required.acks參數來設置數據可靠性的級別:
在kafka系統中,會涉及到多處選舉機制,主要有這三方面:
⑵ kafka——消費者原理解析
kafka採用發布訂閱模式:一對多。發布訂閱模式又分兩種:
Kafka為這兩種模型提供了單一的消費者抽象模型: 消費者組 (consumer group)。 消費者用一個消費者組名標記自己。 一缺昌個發布在Topic上消息被分發給此消費者組中的一個消費者。 假如所有的消費者都在一個組中,那麼這就變成了隊列模型。 假如所有的消費者都在不同的組中,那麼就完全變成了發布-訂閱模型。 一個消費者組中消費者訂閱同一個Topic,每個消費者接受Topic的一部分分區的消息,從而實現對消費者的橫向擴展,對消息進行分流。
注意:當單個消費者無法跟上數據生成的速度,就可以增加更多的消費者分擔負載,每個消費者只處理部分partition的消息,從而實現單個應用程序的橫向伸縮。但是不要讓消費者的數量多於partition的數量,此時多餘的消費者會空閑。此外,Kafka還允許多個應用程序從同一個Topic讀取所有的消息,此時只要保證每個應用程序有自己的消費者組即可。
消費者組的概念就是:當有多個應用程序都需要從Kafka獲取消息時,讓每個app對應一個消費者組,從而使每個應用程序都能獲取一個或多個Topic的全部消息;在每個消費者組中,往消費者組中添加消費者來伸縮讀取能力和處理能力,消費者組中的每個消費者只處理每個Topic的一部分的消息,每個消費者對應一個線程。
在同一個群組中,無法讓一個線程運行多個消費者,也無法讓多線線程安全地共享一個消費者。按照規則,一個消費者使用一個線程,如果要在同一個消費者組中運行多個消費者,需要讓每個消費者運行在自己的線程中。最好把消費者的邏輯封裝在自己的對象中,然後使用java的ExecutorService啟動多個線程,使每個消費者運行在自己的線程上,可參考 https://www.confluent.io/blog
一個 consumer group 中有多個 consumer,一個 topic 有多個 partition,所以必然會涉及到 partition 的分配問題,即確定哪個 partition 由哪個 consumer 來消費。
關於如何設置partition值需要考慮的因素
Kafka 有兩種分配策略,一個是 RoundRobin,一個是 Range,默認為Range,當消費者組內消費者發生變化時,會觸發分區分配策略(方法重新分配)。
以上三種現象會使partition的所有權在消費者之間轉移,這樣的行為叫作再均衡。
再均衡的優點 :
再均尺慎衡的缺點 :
RoundRobin 輪詢方式將分區所有作為一個整體進行 Hash 排序,消費者組內分配分區個數最大差別為 1,是按照組來分的,可以解決多個消費者消費數據不均衡的問題。
但是,當消費者組內訂閱不同主題時,可能造成消費混亂,如下圖所示,Consumer0 訂閱主題 A,Consumer1 訂閱主題 B。
將 A、B 主題的分區排序後分配給消費者組,TopicB 分區中的數據可能 分配到 Consumer0 中。
Range 方式是按照主題來分的,不會產生輪詢方式的消費混亂問題。
但是,如下圖所示,Consumer0、Consumer1 同時訂閱了主題 A 和伏困扒 B,可能造成消息分配不對等問題,當消費者組內訂閱的主題越多,分區分配可能越不均衡。
由於 consumer 在消費過程中可能會出現斷電宕機等故障,consumer 恢復後,需要從故障前的位置繼續消費,所以 consumer 需要實時記錄自己消費到了哪個 offset,以便故障恢復後繼續消費。
consumer group +topic + partition 唯一確定一個offest
Kafka 0.9 版本之前,consumer 默認將 offset 保存在 Zookeeper 中,從 0.9 版本開始,
consumer 默認將 offset 保存在 Kafka 一個內置的 topic 中,該 topic 為__consumer_offsets。
你如果特別好奇,實在想看看offset什麼的,也可以執行下面操作:
修改配置文件 consumer.properties
再啟動一個消費者
當消費者崩潰或者有新的消費者加入,那麼就會觸發再均衡(rebalance),完成再均衡後,每個消費者可能會分配到新的分區,而不是之前處理那個,為了能夠繼續之前的工作,消費者需要讀取每個partition最後一次提交的偏移量,然後從偏移量指定的地方繼續處理。
case1:如果提交的偏移量小於客戶端處理的最後一個消息的偏移量,那麼處於兩個偏移量之間的消息就會被重復處理。
case2:如果提交的偏移量大於客戶端處理的最後一個消息的偏移量,那麼處於兩個偏移量之間的消息將會丟失。
自動提交的優點是方便,但是可能會重復處理消息
不足:broker在對提交請求作出回應之前,應用程序會一直阻塞,會限制應用程序的吞吐量。
因此,在消費者關閉之前一般會組合使用commitAsync和commitSync提交偏移量。
ConsumerRebalanceListener需要實現的兩個方法
下面的例子演示如何在失去partition的所有權之前通過onPartitionRevoked()方法來提交偏移量。
Consumer有個Rebalance的特性,即重新負載均衡,該特性依賴於一個協調器來實現。每當Consumer Group中有Consumer退出或有新的Consumer加入都會觸發Rebalance。
之所以要重新負載均衡,是為了將退出的Consumer所負責處理的數據再重新分配到組內的其他Consumer上進行處理。或當有新加入的Consumer時,將組內其他Consumer的負載壓力,重新進均勻分配,而不會說新加入一個Consumer就閑在那。
下面就用幾張圖簡單描述一下,各種情況觸發Rebalance時,組內成員是如何與協調器進行交互的。
Tips :圖中的Coordinator是協調器,而generation則類似於樂觀鎖中的版本號,每當成員入組成功就會更新,也是起到一個並發控制的作用。
參考:
https://blog.csdn.net/weixin_46122692/article/details/109270433
http://www.dockone.io/article/9956
https://www.cnblogs.com/sodawoods-blogs/p/8969774.html
https://blog.csdn.net/weixin_44367006/article/details/103075173
https://blog.51cto.com/zero01/2498017
⑶ kafka原理
Kafka 是一個消息系統,原本開發自 LinkedIn,用作 LinkedIn 的 活動流數據 (Activity Stream)和 運營數據 處理管道(Pipeline)的基礎。現在它已被多家公司作為多種類型的數據管道和消息系統使用。
Kafka通常用於應用中的兩種廣播類型:
由此可見,kafka給自身的定位並不只是一個消息系統,而是通過發布訂閱消息這種機制實現了流平台。
Kafka和大多數消息系統一樣,搭建好kafka集群後,生產者向特定的topic生產消息,而消費者通過訂閱topic,能夠准實時的拉取到該topic新消息,進行消費。如下圖:
kafka主要有以下特性:
尤其是高吞吐量,是他的最大賣點。kafka之所以能夠實現高吞吐量,是基於他自身優良的設計,及集群的可擴展性。
Kafka應用場景
一個Topic可以認為是一類消息,每個topic將被分成多個partition(區),每個partition在存儲層面是append log文件。任何發布到此partition的消息滲瞎模都會被直接追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),offset為一個long型數字,它用來唯一標記某個分區內的一條消息。kafka並沒有提供其它額外的索引機制來存儲offset,因為在kafka中幾乎不允許對消息進行「隨機讀寫」。
Kafka和其它消息系統有一個不一樣的設計,在consumer之上加了一層group。同一個group的consumer可以並行消費同一個topic的消息,但是同group的consumer,不會重復消費。這就好比多個consumer組成了一個團隊,一起幹活,當然幹活的速度就上來了。group中的consumer是如何配合協調的,其實和topic的分區相關聯,後面我們會詳細論述。
如果同一個topic需要被多次消費,可以通過設立多個consumer group來實現。每個group分別消費,互不影響。
在kafka中,一個partition中的消息只會被group中的一個consumer消費(同一時刻),每個group中consumer消息消費互相獨立神兆,我們可以認為一個group是一個"訂閱"者。一個Topic中的每個partions只會被一個"訂閱者"中的一個consumer消費,不過一個consumer可以同時消費多個partitions中的消息。
kafka只能保證一個partition中的消叢緩息被某個consumer消費時是順序的。事實上,從Topic角度來說,,當有多個partitions時,消息仍不是全局有序的。
⑷ Kafka架構及基本原理簡析
Kafka是一個由Scala和Java編寫的企業級的消息發布和訂閱系統,最早是由Linkedin公司開發,最終開源到Apache軟體基金會的項目。Kafka是一個分布式的,支持分區的,多副本的和多訂閱者的高吞吐量的消息系統,被廣泛應用在應用解耦、非同步處理、限流削峰和消息驅動等場景。本文將針對Kafka的架構和相關組件進行簡單的介紹。在介紹Kafka的架構之前,我們先了解一下Kafk的核心概念。
在詳細介紹Kafka的架構和基本組件之前,需要先了解一下Kafka的一些核心概念。
Procer: 消息的生產者,負責往Kafka集群中發送消息;
Consumer: 消息的消費者,主動從Kafka集群中拉取消息。
Consumer Group: 每個Consumer屬於一個特定的Consumer Group,新建Consumer的時候需要指定對應的Consumer Group ID。
Broker: Kafka集群中的服務實例,也稱之為節點,每個Kafka集群包含一個或者多個Broker(一個Broker就是一個伺服器或節點)。
Message: 通過Kafka集群進行傳遞的對象實體,存儲需要傳送的信息。
Topic: 消息的類別,主要用於對消息進行邏輯上的區分,每條發送到Kafka集群的消息都需要有一個指定的Topic,消費者根據Topic對指定的消息進行消費。
Partition: 消息的分區,Partition是一個物理上的概念,相當於一個文件夾,Kafka會為每個topic的每個分區創建一個文件夾,一個Topic的消息會存儲在一個或者多個Partition中。
Segment: 一個partition當中存在多個segment文件段(分段存儲),每個Segment分為兩部分,.log文件和 .index 文件,其中 .index 文件是索引文件,主要用於快速查詢.log 文件當中數據的偏移量位置;
.log文件: 存放Message的數據文件,在Kafka中把數據文件就叫做日誌文件。一個分區下面默認有n多個.log文件(分段存儲)。一個.log文件大掘悄塵默認1G,消息會不斷追加在.log文件中,當.log文件的大小超過1G的時候,會自動新建一個新的.log文件。
.index文件: 存放.log文件的索引數據,每個.index文件有一個對應同名的.log文件。
後面我們會對上面的一些核心概念進行更深入的介紹。在介紹完Kafka的核心概念之後,我們來看一下Kafka的對外提供的基本功能,組件及架構設計。
如上圖所示,Kafka主要包含四個主要的API組件:
1. Procer API
應用程序通過Procer API向Kafka集群發送一個或多個Topic的消息。
2. Consumer API
應用程序通過Consumer API,向Kafka集群訂閱一個或多個Topic的消運螞息,並處理這些Topic下接收到的消息。
3. Streams API
應用程序通過使用Streams API充當流處理器(Stream Processor),從一個或者多個判禪Topic獲取輸入流,並生產一個輸出流到一個或者多個Topic,能夠有效地將輸入流進行轉變後變成輸出流輸出到Kafka集群。
4. Connect API
允許應用程序通過Connect API構建和運行可重用的生產者或者消費者,能夠把kafka主題連接到現有的應用程序或數據系統。Connect實際上就做了兩件事情:使用Source Connector從數據源(如:DB)中讀取數據寫入到Topic中,然後再通過Sink Connector讀取Topic中的數據輸出到另一端(如:DB),以實現消息數據在外部存儲和Kafka集群之間的傳輸。
接下來我們將從Kafka的架構出發,重點介紹Kafka的主要組件及實現原理。Kafka支持消息持久化,消費端是通過主動拉取消息進行消息消費的,訂閱狀態和訂閱關系由客戶端負責維護,消息消費完後不會立刻刪除,會保留歷史消息,一般默認保留7天,因此可以通過在支持多訂閱者時,消息無需復制多分,只需要存儲一份就可以。下面將詳細介紹每個組件的實現原理。
1. Procer
Procer是Kafka中的消息生產者,主要用於生產帶有特定Topic的消息,生產者生產的消息通過Topic進行歸類,保存在Kafka 集群的Broker上,具體的是保存在指定的partition 的目錄下,以Segment的方式(.log文件和.index文件)進行存儲。
2. Consumer
Consumer是Kafka中的消費者,主要用於消費指定Topic的消息,Consumer是通過主動拉取的方式從Kafka集群中消費消息,消費者一定屬於某一個特定的消費組。
3. Topic
Kafka中的消息是根據Topic進行分類的,Topic是支持多訂閱的,一個Topic可以有多個不同的訂閱消息的消費者。Kafka集群Topic的數量沒有限制,同一個Topic的數據會被劃分在同一個目錄下,一個Topic可以包含1至多個分區,所有分區的消息加在一起就是一個Topic的所有消息。
4. Partition
在Kafka中,為了提升消息的消費速度,可以為每個Topic分配多個Partition,這也是就之前我們說到的,Kafka是支持多分區的。默認情況下,一個Topic的消息只存放在一個分區中。Topic的所有分區的消息合並起來,就是一個Topic下的所有消息。每個分區都有一個從0開始的編號,每個分區內的數據都是有序的,但是不同分區直接的數據是不能保證有序的,因為不同的分區需要不同的Consumer去消費,每個Partition只能分配一個Consumer,但是一個Consumer可以同時一個Topic的多個Partition。
5. Consumer Group
Kafka中的每一個Consumer都歸屬於一個特定的Consumer Group,如果不指定,那麼所有的Consumer都屬於同一個默認的Consumer Group。Consumer Group由一個或多個Consumer組成,同一個Consumer Group中的Consumer對同一條消息只消費一次。每個Consumer Group都有一個唯一的ID,即Group ID,也稱之為Group Name。Consumer Group內的所有Consumer協調在一起訂閱一個Topic的所有Partition,且每個Partition只能由一個Consuemr Group中的一個Consumer進行消費,但是可以由不同的Consumer Group中的一個Consumer進行消費。如下圖所示:
在層級關繫上來說Consumer Group好比是跟Topic對應的,而Consumer就對應於Topic下的Partition。Consumer Group中的Consumer數量和Topic下的Partition數量共同決定了消息消費的並發量,且Partition數量決定了最終並發量,因為一個Partition只能由一個Consumer進行消費。當一個Consumer Group中Consumer數量超過訂閱的Topic下的Partition數量時,Kafka會為每個Partition分配一個Consumer,多出來的Consumer會處於空閑狀態。當Consumer Group中Consumer數量少於當前定於的Topic中的Partition數量是,單個Consumer將承擔多個Partition的消費工作。如上圖所示,Consumer Group B中的每個Consumer需要消費兩個Partition中的數據,而Consumer Group C中會多出來一個空閑的Consumer4。總結下來就是:同一個Topic下的Partition數量越多,同一時間可以有越多的Consumer進行消費,消費的速度就會越快,吞吐量就越高。同時,Consumer Group中的Consumer數量需要控制為小於等於Partition數量,且最好是整數倍:如1,2,4等。
6. Segment
考慮到消息消費的性能,Kafka中的消息在每個Partition中是以分段的形式進行存儲的,即每1G消息新建一個Segment,每個Segment包含兩個文件:.log文件和.index文件。之前我們已經說過,.log文件就是Kafka實際存儲Procer生產的消息,而.index文件採用稀疏索引的方式存儲.log文件中對應消息的邏輯編號和物理偏移地址(offset),以便於加快數據的查詢速度。.log文件和.index文件是一一對應,成對出現的。下圖展示了.log文件和.index文件在Partition中的存在方式。
Kafka裡面每一條消息都有自己的邏輯offset(相對偏移量)以及存在物理磁碟上面實際的物理地址便宜量Position,也就是說在Kafka中一條消息有兩個位置:offset(相對偏移量)和position(磁碟物理偏移地址)。在kafka的設計中,將消息的offset作為了Segment文件名的一部分。Segment文件命名規則為:Partition全局的第一個Segment從0開始,後續每個segment文件名為上一個Partition的最大offset(Message的offset,非實際物理地偏移地址,實際物理地址需映射到.log中,後面會詳細介紹在.log文件中查詢消息的原理)。數值最大為64位long大小,由20位數字表示,前置用0填充。
上圖展示了.index文件和.log文件直接的映射關系,通過上圖,我們可以簡單介紹一下Kafka在Segment中查找Message的過程:
1. 根據需要消費的下一個消息的offset,這里假設是7,使用二分查找在Partition中查找到文件名小於(一定要小於,因為文件名編號等於當前offset的文件里存的都是大於當前offset的消息)當前offset的最大編號的.index文件,這里自然是查找到了00000000000000000000.index。
2. 在.index文件中,使用二分查找,找到offset小於或者等於指定offset(這里假設是7)的最大的offset,這里查到的是6,然後獲取到index文件中offset為6指向的Position(物理偏移地址)為258。
3. 在.log文件中,從磁碟位置258開始順序掃描,直到找到offset為7的Message。
至此,我們就簡單介紹完了Segment的基本組件.index文件和.log文件的存儲和查詢原理。但是我們會發現一個問題:.index文件中的offset並不是按順序連續存儲的,為什麼Kafka要將索引文件設計成這種不連續的樣子?這種不連續的索引設計方式稱之為稀疏索引,Kafka中採用了稀疏索引的方式讀取索引,kafka每當.log中寫入了4k大小的數據,就往.index里以追加的寫入一條索引記錄。使用稀疏索引主要有以下原因:
(1) 索引稀疏存儲,可以大幅降低.index文件佔用存儲空間大小。
(2) 稀疏索引文件較小,可以全部讀取到內存中,可以避免讀取索引的時候進行頻繁的IO磁碟操作,以便通過索引快速地定位到.log文件中的Message。
7. Message
Message是實際發送和訂閱的信息是實際載體,Procer發送到Kafka集群中的每條消息,都被Kafka包裝成了一個Message對象,之後再存儲在磁碟中,而不是直接存儲的。Message在磁碟中的物理結構如下所示。
其中 key 和 value 存儲的是實際的Message內容,長度不固定,而其他都是對Message內容的統計和描述,長度固定。因此在查找實際Message過程中,磁碟指針會根據Message的 offset 和 message length 計算移動位數,以加速Message的查找過程。之所以可以這樣加速,因為Kafka的.log文件都是順序寫的,往磁碟上寫數據時,就是追加數據,沒有隨機寫的操作。
8.Partition Replicas
最後我們簡單聊一下Kafka中的Partition Replicas(分區副本)機制,0.8版本以前的Kafka是沒有副本機制的。創建Topic時,可以為Topic指定分區,也可以指定副本個數。kafka 中的分區副本如下圖所示:
Kafka通過副本因子(replication-factor)控制消息副本保存在幾個Broker(伺服器)上,一般情況下副本數等於Broker的個數,且同一個副本因子不能放在同一個Broker中。副本因子是以分區為單位且區分角色;主副本稱之為Leader(任何時刻只有一個),從副本稱之為 Follower(可以有多個),處於同步狀態的副本叫做in-sync-replicas(ISR)。Leader負責讀寫數據,Follower不負責對外提供數據讀寫,只從Leader同步數據,消費者和生產者都是從leader讀寫數據,不與follower交互,因此Kafka並不是讀寫分離的。同時使用Leader進行讀寫的好處是,降低了數據同步帶來的數據讀取延遲,因為Follower只能從Leader同步完數據之後才能對外提供讀取服務。
如果一個分區有三個副本因子,就算其中一個掛掉,那麼只會剩下的兩個中,選擇一個leader,如下圖所示。但不會在其他的broker中,另啟動一個副本(因為在另一台啟動的話,必然存在數據拷貝和傳輸,會長時間佔用網路IO,Kafka是一個高吞吐量的消息系統,這個情況不允許發生)。如果指定分區的所有副本都掛了,Consumer如果發送數據到指定分區的話,將寫入不成功。Consumer發送到指定Partition的消息,會首先寫入到Leader Partition中,寫完後還需要把消息寫入到ISR列表裡面的其它分區副本中,寫完之後這個消息才能提交offset。
到這里,差不多把Kafka的架構和基本原理簡單介紹完了。Kafka為了實現高吞吐量和容錯,還引入了很多優秀的設計思路,如零拷貝,高並發網路設計,順序存儲,以後有時間再說。
⑸ kafka原理分析
作為一款典型的消息中間件產品,kafka系統仍然由procer、broker、consumer三部分組成。kafka涉及的幾個常用概念和組件簡派薯單介紹如下:
當consumer group的狀態發生變化(如有consumer故障、增減consumer成員等)或consumer group消費的topic狀態發生變化(如增加了partition,消費的topic發生變化),kafka集群會自動調整和重新分配consumer消費的partition,這個過程就叫做rebalance(再平衡)。
__consumer_offsets是kafka集群自己維護的一個特殊的topic,它裡面存儲的是每個consumer group已經消費了每個topic partition的offset。__consumer_offsets中offset消息的key由group id,topic name,partition id組成,格式為 {topic name}-${partition id},value值就是consumer提交的已消費的topic partition offset值。__consumer_offsets的分區數和副本數分別由offsets.topic.num.partitions(默認值為50)和offsets.topic.replication.factor(默認值為1)參數配置。我們通過公式 hash(group id) % offsets.topic.num.partitions 就可以計算出指定consumer group的已提交offset存儲的partition。由於consumer group提交的offset消息只有最後一條消息有意義,所以__consumer_offsets是一個compact topic,kafka集群會周期性的對__consumer_offsets執行compact操作,只保留最新的一次提交offset。
group coordinator運行在kafka某個broker上,負責consumer group內所有的consumer成員管理、所有的消費的topic的partition的消費關系分配、offset管理、觸發rebalance等功能。group coordinator管理partition分配時,會指定consumer group內某個consumer作為group leader執行具體的partition分配任務。存儲某個consumer group已提交offset的__consumer_offsets partition leader副本所在的broker就是該consumer group的協調器運行的broker。
跟大多數分布式系統一樣,集群有一個master角色管理整個集群,協調集群中各個成員的行為。kafka集群中的controller就相當於其它分布式系統的master,用來負責集群topic的分區分配,分區leader選舉以及維護集群的所有partition的ISR等集群協調功能。集群中哪個borker是controller也是通過一致性協議選舉產生的,2.8版本之前通腔銷過zookeeper進行選主,2.8版本後通過kafka raft協議進行選舉。如果controller崩潰,集群會重新選舉一個broker作為新的controller,並增加controller epoch值(相當於zookeeper ZAB協議的epoch,raft協議的term值)
當kafka集群新建了topic或為一個topic新增了partition,controller需要為這些新增加的partition分配到具體的broker上,並把分配結果記錄下來,供procer和consumer查詢獲取。
因為只有partition的leader副本才會處理procer和consumer的讀寫請求,而partition的其他follower副本需要從相應的leader副本同步消息,為了盡量保證集群中所有broker的負載是均衡的,controller在進行集群全局partition副本伍羨游分配時需要使partition的分布情況是如下這樣的:
在默認情況下,kafka採用輪詢(round-robin)的方式分配partition副本。由於partition leader副本承擔的流量比follower副本大,kafka會先分配所有topic的partition leader副本,使所有partition leader副本全局盡量平衡,然後再分配各個partition的follower副本。partition第一個follower副本的位置是相應leader副本的下一個可用broker,後面的副本位置依此類推。
舉例來說,假設我們有兩個topic,每個topic有兩個partition,每個partition有兩個副本,這些副本分別標記為1-1-1,1-1-2,1-2-1,1-2-2,2-1-1,2-1-2,2-2-1,2-2-2(編碼格式為topic-partition-replia,編號均從1開始,第一個replica是leader replica,其他的是follower replica)。共有四個broker,編號是1-4。我們先對broker按broker id進行排序,然後分配leader副本,最後分配foller副本。
1)沒有配置broker.rack的情況
現將副本1-1-1分配到broker 1,然後1-2-1分配到broker 2,依此類推,2-2-1會分配到broker 4。partition 1-1的leader副本分配在broker 1上,那麼下一個可用節點是broker 2,所以將副本1-1-2分配到broker 2上。同理,partition 1-2的leader副本分配在broker 2上,那麼下一個可用節點是broker 3,所以將副本1-1-2分配到broker 3上。依此類推分配其他的副本分片。最後分配的結果如下圖所示:
2)配置了broker.rack的情況
假設配置了兩個rack,broker 1和broker 2屬於Rack 1,broker 3和broker 4屬於Rack 2。我們對rack和rack內的broker分別排序。然後先將副本1-1-1分配到Rack 1的broker 1,然後將副本1-2-1分配到下一個Rack的第一個broker,即Rack 2的broker 3。其他的parttition leader副本依此類推。然後分配follower副本,partition 1-1的leader副本1-1-1分配在Rack 1的broker上,下一個可用的broker是Rack 2的broker 3,所以分配到broker 3上,其他依此類推。最後分配的結果如下圖所示:
kafka除了按照集群情況自動分配副本,也提供了reassign工具人工分配和遷移副本到指定broker,這樣用戶可以根據集群實際的狀態和各partition的流量情況分配副本
kafka集群controller的一項功能是在partition的副本中選擇一個副本作為leader副本。在topic的partition創建時,controller首先分配的副本就是leader副本,這個副本又叫做preference leader副本。
當leader副本所在broker失效時(宕機或網路分區等),controller需要為在該broker上的有leader副本的所有partition重新選擇一個leader,選擇方法就是在該partition的ISR中選擇第一個副本作為新的leader副本。但是,如果ISR成員只有一個,就是失效的leader自身,其餘的副本都落後於leader怎麼辦?kafka提供了一個unclean.leader.election配置參數,它的默認值為true。當unclean.leader.election值為true時,controller還是會在非ISR副本中選擇一個作為leader,但是這時候使用者需要承擔數據丟失和數據不一致的風險。當unclean.leader.election值為false時,則不會選擇新的leader,該partition處於不可用狀態,只能恢復失效的leader使partition重新變為可用。
當preference leader失效後,controller重新選擇一個新的leader,但是preference leader又恢復了,而且同步上了新的leader,是ISR的成員,這時候preference leader仍然會成為實際的leader,原先的新leader變為follower。因為在partition leader初始分配時,使按照集群副本均衡規則進行分配的,這樣做可以讓集群盡量保持平衡。
為了保證topic的高可用,topic的partition往往有多個副本,所有的follower副本像普通的consumer一樣不斷地從相應的leader副本pull消息。每個partition的leader副本會維護一個ISR列表存儲到集群信息庫里,follower副本成為ISR成員或者說與leader是同步的,需要滿足以下條件:
1)follower副本處於活躍狀態,與zookeeper(2.8之前版本)或kafka raft master之間的心跳正常
2)follower副本最近replica.lag.time.max.ms(默認是10秒)時間內從leader同步過最新消息。需要注意的是,一定要拉取到最新消息,如果最近replica.lag.time.max.ms時間內拉取過消息,但不是最新的,比如落後follower在追趕leader過程中,也不會成為ISR。
follower在同步leader過程中,follower和leader都會維護幾個參數,來表示他們之間的同步情況。leader和follower都會為自己的消息隊列維護LEO(Last End Offset)和HW(High Watermark)。leader還會為每一個follower維護一個LEO。LEO表示leader或follower隊列寫入的最後一條消息的offset。HW表示的offset對應的消息寫入了所有的ISR。當leader發現所有follower的LEO的最小值大於HW時,則會增加HW值到這個最小值LEO。follower拉取leader的消息時,同時能獲取到leader維護的HW值,如果follower發現自己維護的HW值小於leader發送過來的HW值,也會增加本地的HW值到leader的HW值。這樣我們可以得到一個不等式: follower HW <= leader HW <= follower LEO <= leader LEO 。HW對應的log又叫做committed log,consumer消費partititon的消息時,只能消費到offset值小於或等於HW值的消息的,由於這個原因,kafka系統又稱為分布式committed log消息系統。
kafka的消息內容存儲在log.dirs參數配置的目錄下。kafka每個partition的數據存放在本地磁碟log.dirs目錄下的一個單獨的目錄下,目錄命名規范為 ${topicName}-${partitionId} ,每個partition由多個LogSegment組成,每個LogSegment由一個數據文件(命名規范為: {baseOffset}.index)和一個時間戳索引文件(命名規范為:${baseOffset}.timeindex)組成,文件名的baseOffset就是相應LogSegment中第一條消息的offset。.index文件存儲的是消息的offset到該消息在相應.log文件中的偏移,便於快速在.log文件中快速找到指定offset的消息。.index是一個稀疏索引,每隔一定間隔大小的offset才會建立相應的索引(比如每間隔10條消息建立一個索引)。.timeindex也是一個稀疏索引文件,這樣可以根據消息的時間找到對應的消息。
可以考慮將消息日誌存放到多個磁碟中,這樣多個磁碟可以並發訪問,增加消息讀寫的吞吐量。這種情況下,log.dirs配置的是一個目錄列表,kafka會根據每個目錄下partition的數量,將新分配的partition放到partition數最少的目錄下。如果我們新增了一個磁碟,你會發現新分配的partition都出現在新增的磁碟上。
kafka提供了兩個參數log.segment.bytes和log.segment.ms來控制LogSegment文件的大小。log.segment.bytes默認值是1GB,當LogSegment大小達到log.segment.bytes規定的閾值時,kafka會關閉當前LogSegment,生成一個新的LogSegment供消息寫入,當前供消息寫入的LogSegment稱為活躍(Active)LogSegment。log.segment.ms表示最大多長時間會生成一個新的LogSegment,log.segment.ms沒有默認值。當這兩個參數都配置了值,kafka看哪個閾值先達到,觸發生成新的LogSegment。
kafka還提供了log.retention.ms和log.retention.bytes兩個參數來控制消息的保留時間。當消息的時間超過了log.retention.ms配置的閾值(默認是168小時,也就是一周),則會被認為是過期的,會被kafka自動刪除。或者是partition的總的消息大小超過了log.retention.bytes配置的閾值時,最老的消息也會被kafka自動刪除,使相應partition保留的總消息大小維持在log.retention.bytes閾值以下。這個地方需要注意的是,kafka並不是以消息為粒度進行刪除的,而是以LogSegment為粒度刪除的。也就是說,只有當一個LogSegment的最後一條消息的時間超過log.retention.ms閾值時,該LogSegment才會被刪除。這兩個參數都配置了值時,也是只要有一個先達到閾值,就會執行相應的刪除策略
當我們使用KafkaProcer向kafka發送消息時非常簡單,只要構造一個包含消息key、value、接收topic信息的ProcerRecord對象就可以通過KafkaProcer的send()向kafka發送消息了,而且是線程安全的。KafkaProcer支持通過三種消息發送方式
KafkaProcer客戶端雖然使用簡單,但是一條消息從客戶端到topic partition的日誌文件,中間需要經歷許多的處理過程。KafkaProcer的內部結構如下所示:
從圖中可以看出,消息的發送涉及兩類線程,一類是調用KafkaProcer.send()方法的應用程序線程,因為KafkaProcer.send()是多線程安全的,所以這樣的線程可以有多個;另一類是與kafka集群通信,實際將消息發送給kafka集群的Sender線程,當我們創建一個KafkaProcer實例時,會創建一個Sender線程,通過該KafkaProcer實例發送的所有消息最終通過該Sender線程發送出去。RecordAccumulator則是一個消息隊列,是應用程序線程與Sender線程之間消息傳遞的橋梁。當我們調用KafkaProcer.send()方法時,消息並沒有直接發送出去,只是寫入了RecordAccumulator中相應的隊列中,最終需要Sender線程在適當的時機將消息從RecordAccumulator隊列取出來發送給kafka集群。
消息的發送過程如下:
在使用KafkaConsumer實例消費kafka消息時,有一個特性我們要特別注意,就是KafkaConsumer不是多線程安全的,KafkaConsumer方法都在調用KafkaConsumer的應用程序線程中運行(除了consumer向kafka集群發送的心跳,心跳在一個專門的單獨線程中發送),所以我們調用KafkaConsumer的所有方法均需要保證在同一個線程中調用,除了KafkaConsumer.wakeup()方法,它設計用來通過其它線程向consumer線程發送信號,從而終止consumer執行。
跟procer一樣,consumer要與kafka集群通信,消費kafka消息,首先需要獲取消費的topic partition leader replica所在的broker地址等信息,這些信息可以通過向kafka集群任意broker發送Metadata請求消息獲取。
我們知道,一個consumer group有多個consumer,一個topic有多個partition,而且topic的partition在同一時刻只能被consumer group內的一個consumer消費,那麼consumer在消費partition消息前需要先確定消費topic的哪個partition。partition的分配通過group coordinator來實現。基本過程如下:
我們可以通過實現介面org.apache.kafka.clients.consumer.internals.PartitionAssignor自定義partition分配策略,但是kafka已經提供了三種分配策略可以直接使用。
partition分配完後,每個consumer知道了自己消費的topic partition,通過metadata請求可以獲取相應partition的leader副本所在的broker信息,然後就可以向broker poll消息了。但是consumer從哪個offset開始poll消息?所以consumer在第一次向broker發送FetchRequest poll消息之前需要向Group Coordinator發送OffsetFetchRequest獲取消費消息的起始位置。Group Coordinator會通過key {topic}-${partition}查詢 __consumer_offsets topic中是否有offset的有效記錄,如果存在,則將consumer所屬consumer group最近已提交的offset返回給consumer。如果沒有(可能是該partition是第一次分配給該consumer group消費,也可能是該partition長時間沒有被該consumer group消費),則根據consumer配置參數auto.offset.reset值確定consumer消費的其實offset。如果auto.offset.reset值為latest,表示從partition的末尾開始消費,如果值為earliest,則從partition的起始位置開始消費。當然,consumer也可以隨時通過KafkaConsumer.seek()方法人工設置消費的起始offset。
kafka broker在收到FetchRequest請求後,會使用請求中topic partition的offset查一個skiplist表(該表的節點key值是該partition每個LogSegment中第一條消息的offset值)確定消息所屬的LogSegment,然後繼續查LogSegment的稀疏索引表(存儲在.index文件中),確定offset對應的消息在LogSegment文件中的位置。為了提升消息消費的效率,consumer通過參數fetch.min.bytes和max.partition.fetch.bytes告訴broker每次拉取的消息總的最小值和每個partition的最大值(consumer一次會拉取多個partition的消息)。當kafka中消息較少時,為了讓broker及時將消息返回給consumer,consumer通過參數fetch.max.wait.ms告訴broker即使消息大小沒有達到fetch.min.bytes值,在收到請求後最多等待fetch.max.wait.ms時間後,也將當前消息返回給consumer。fetch.min.bytes默認值為1MB,待fetch.max.wait.ms默認值為500ms。
為了提升消息的傳輸效率,kafka採用零拷貝技術讓內核通過DMA把磁碟中的消息讀出來直接發送到網路上。因為kafka寫入消息時將消息寫入內存中就返回了,如果consumer跟上了procer的寫入速度,拉取消息時不需要讀磁碟,直接從內存獲取消息發送出去就可以了。
為了避免發生再平衡後,consumer重復拉取消息,consumer需要將已經消費完的消息的offset提交給group coordinator。這樣發生再平衡後,consumer可以從上次已提交offset出繼續拉取消息。
kafka提供了多種offset提交方式
partition offset提交和管理對kafka消息系統效率來說非常關鍵,它直接影響了再平衡後consumer是否會重復拉取消息以及重復拉取消息的數量。如果offset提交的比較頻繁,會增加consumer和kafka broker的消息處理負載,降低消息處理效率;如果offset提交的間隔比較大,再平衡後重復拉取的消息就會比較多。還有比較重要的一點是,kafka只是簡單的記錄每次提交的offset值,把最後一次提交的offset值作為最新的已提交offset值,作為再平衡後消息的起始offset,而什麼時候提交offset,每次提交的offset值具體是多少,kafka幾乎不關心(這個offset對應的消息應該存儲在kafka中,否則是無效的offset),所以應用程序可以先提交3000,然後提交2000,再平衡後從2000處開始消費,決定權完全在consumer這邊。
kafka中的topic partition與consumer group中的consumer的消費關系其實是一種配對關系,當配對雙方發生了變化時,kafka會進行再平衡,也就是重新確定這種配對關系,以提升系統效率、高可用性和伸縮性。當然,再平衡也會帶來一些負面效果,比如在再平衡期間,consumer不能消費kafka消息,相當於這段時間內系統是不可用的。再平衡後,往往會出現消息的重復拉取和消費的現象。
觸發再平衡的條件包括:
需要注意的是,kafka集群broker的增減或者topic partition leader重新選主這類集群狀態的變化並不會觸發在平衡
有兩種情況與日常應用開發比較關系比較密切:
consumer在調用subscribe()方法時,支持傳入一個ConsumerRebalanceListener監聽器,ConsumerRebalanceListener提供了兩個方法,onPartitionRevoked()方法在consumer停止消費之後,再平衡開始之前被執行。可以發現,這個地方是提交offset的好時機。onPartitonAssigned()方法則會在重新進行partition分配好了之後,但是新的consumer還未消費之前被執行。
我們在提到kafka時,首先想到的是它的吞吐量非常大,這也是很多人選擇kafka作為消息傳輸組件的重要原因。
以下是保證kafka吞吐量大的一些設計考慮:
但是kafka是不是總是這么快?我們同時需要看到kafka為了追求快舍棄了一些特性:
所以,kafka在消息獨立、允許少量消息丟失或重復、不關心消息順序的場景下可以保證非常高的吞吐量,但是在需要考慮消息事務、嚴格保證消息順序等場景下procer和consumer端需要進行復雜的考慮和處理,可能會比較大的降低kafka的吞吐量,例如對可靠性和保序要求比較高的控制類消息需要非常謹慎的權衡是否適合使用kafka。
我們通過procer向kafka集群發送消息,總是期望消息能被consumer成功消費到。最不能忍的是procer收到了kafka集群消息寫入的正常響應,但是consumer仍然沒有消費到消息。
kafka提供了一些機制來保證消息的可靠傳遞,但是有一些因素需要仔細權衡考慮,這些因素往往會影響kafka的吞吐量,需要在可靠性與吞吐量之間求得平衡:
kafka只保證partition消息順序,不保證topic級別的順序,而且保證的是partition寫入順序與讀取順序一致,不是業務端到端的保序。
如果對保序要求比較高,topic需要只設置一個partition。這時可以把參數max.in.flight.requests.per.connection設置為1,而retries設置為大於1的數。這樣即使發生了可恢復型錯誤,仍然能保證消息順序,但是如果發生不可恢復錯誤,應用層進行重試的話,就無法保序了。也可以採用同步發送的方式,但是這樣也極大的降低了吞吐量。如果消息攜帶了表示順序的欄位,可以在接收端對消息進行重新排序以保證最終的有序。