Skip to main content

Ch11: 流式處理

  • ch10 Batch processing中,有一個大假設:輸入是有界的,是已知的有限大小.如mapreduce必須先完成所有輸入才可以產生輸出.(因為可能最後輸入的紀錄次序最低的key,而他就是要輸出的紀錄,有些狀況是user沒辦法等,所以streaming process就是來及時處理每次的事件.

發送事件流

  • 在stream processing中,紀錄通常被稱為事件event,一個小的、自包含的、不可變的物件,它包含在某個時間點所發生的事情的細節
  • 事件可能有生產者(Producer, aka publisher or sender)產生一次,然後可能由多個消費者(consumer, aka subscriber or recipients)處理.
  • 相關的事件會被稱為主題(topic) or 串流(stream)

訊息傳遞系統

Unix pipe or TCP connection是實作的簡單方法,但缺點是指可以連接一個發送方和一個接收方,而這張要討論的訊息傳遞系統是可以有多個生產者發送訊息到相同的topic,並可以有多個接收方.

兩個問題可以幫忙釐清適合用哪一種系統:

  1. 如果生產者發送訊息的速度 > 消費者處理的速度?:

    1. 系統可以丟棄訊息

    2. 將訊息緩衝在佇列中

    3. 啟動背壓機制(backpressure 即阻止生產者發送更多訊息)參考ch8

  2. 如果節點崩潰或暫時離線?是否會有訊息丟失?\ 與資料庫相同,持久話可能需要寫入磁碟,或有某種複製方式支援,但缺點就是較低的效能,若能承受偶爾丟失訊息,可能就可以在相同硬體條件下,獲得更高的吞吐量、更低延遲

    可否丟失訊息,取決於application是否偶爾丟失一些資料點也無仿,因為更新的數據很快就會再發送回來,例如:週期性傳輸的感測器數據(天氣資訊?)

batch processing的優點就是,提供強大的可靠性保證,失敗的任務可以自動重啟,失敗任務的部分輸出也會自動被丟棄.

直接從生產者傳遞給消費者

許多訊息傳遞系統是生產者與消費者之間使用直接的網路通訊,中間沒有節點

  • 金融行業的streams會廣泛使用UDP multicast

  • zeroMQ, nanomsg等無代理(brokerless)也用類似方法,通過TCP or IP的multicast實作發佈、訂閱式的訊息傳遞

  • StatsD和brubeck使用不可靠的UDP訊息來收集網路中所有機器的指標以監控他們

  • 如果消費者在網路上公開一個服務,生產者可以直接發出HTTP or RPC request來將訊息推送給消費者

缺點:對故障容忍度很低,需假設消費者、生產者一直處在線上

訊息代理(message broker)

也稱訊息佇列(message queue),是針對處理message streams而最佳化的資料庫,他作為伺服器運行,生產者和消費者則作為客戶端連接他

將資料持久化的問題轉移到broker身上,有時訊息太多,也會造成消費者需要以非同步的方式工作

訊息代理與資料庫的比較

  1. 資料庫通常會保留資料,直到資料被明確刪除.而大多數message brokers會在訊息成功傳遞給消費者後自動刪除

  2. 因為訊息很快就刪除,所以大多數brokers的working set都很小,若消費者消耗速度很慢,而需要緩衝大量訊息的話,可能會將部分訊息轉到磁碟上,吞吐量受影響

  3. 資料庫通常支援次級索引和各種搜尋方式,而message broker通常只支援以某種方式來訂閱匹配到某種pattern的topic

  4. 查詢資料庫時,結果通常基於某個時間點的資料快照;如果另一個客戶端隨後向資料庫寫入一些改變了查詢結果的內容,則第一個客戶端不會發現其先前結果現已過期(除非它重複查詢或輪詢變更)。相比之下,訊息代理不支援任意查詢,但是當資料發生變化時(即新訊息可用時),它們會通知客戶端。

這是關於訊息代理的傳統觀點,它被封裝在諸如 JMS 和 AMQP 的標準中,並且被諸如 RabbitMQ、ActiveMQ、HornetQ、Qpid、TIBCO 企業訊息服務、IBM MQ、Azure Service Bus 和 Google Cloud Pub/Sub 所實現 。

所以之後會提到AMQP/JMS-style message broker就是如此

多個消費者

當多個消費者從同一主題中讀取訊息時,有兩種主要的訊息傳遞模式,如圖11-1所示:

  • 負載均衡(load balancing)

    每條訊息都被傳遞給消費者 之一,所以處理該主題下訊息的工作能被多個消費者共享。代理可以為消費者任意分配訊息。當處理訊息的代價高昂,希望能並行處理訊息時,此模式非常有用

  • 扇出(fan-out)

    每條訊息都被傳遞給 所有 消費者。扇出允許幾個獨立的消費者各自 “收聽” 相同的訊息廣播,而不會相互影響 —— 這個streaming的概念等效於batch processing中多個不同批處理作業讀取同一份輸入檔案

fig11-1.png

兩種模式可以組合使用:例如,兩個獨立的消費者組可以每組各訂閱同一個主題,每一組都共同收到所有訊息,但在每一組內部,每條訊息僅由單個節點處理。

確認與重新傳遞

訊息代理使用 確認(acknowledgments):客戶端必須顯式告知代理訊息處理完畢的時間,以便代理能將訊息從佇列中移除。

如果與客戶端的連線關閉,或者代理超出一段時間未收到確認,代理則認為訊息沒有被處理,因此它將訊息再遞送給另一個消費者。

(請注意可能發生這樣的情況,訊息 實際上是 處理完畢的,但 確認 在網路中丟失了。需要一種原子提交協議才能處理這種情況,正如在 “++實踐中的分散式事務++” 中所討論的那樣)

fig11-2.png

即使訊息代理試圖保留訊息的順序(如 JMS 和 AMQP 標準所要求的),負載均衡與重傳的組合也不可避免地導致訊息被重新排序。為避免此問題,你可以讓每個消費者使用單獨的佇列(即不使用負載均衡功能)。如果訊息是完全獨立的,則訊息順序重排並不是一個問題。

分區日誌

  • message brokers是基於暫態訊息傳遞的想法所建構的,但資料庫、檔案系統則相反

如果你將新的消費者新增到訊息傳遞系統,通常只能接收到消費者註冊之後開始傳送的訊息。先前的任何訊息都隨風而逝,一去不復返。作為對比,你可以隨時為檔案和資料庫新增新的客戶端,且能讀取任意久遠的資料(只要應用沒有顯式覆蓋或刪除這些資料)。

例如:圖11-1中,在m2的確認(acknowledgments)後才新增consumer3,m1, m2就收不到

為什麼我們不能把它倆雜交一下,既有資料庫的持久儲存方式,又有訊息傳遞的低延遲通知?這就是 基於日誌的訊息代理(log-based message brokers) 背後的想法。

使用日誌來儲存訊息

  • 生產者透過將訊息追加到日誌末尾來發送訊息,而消費者透過依次讀取日誌來接收訊息。如果消費者讀到日誌末尾,則會等待新訊息追加的通知。

fig11-3.png

  • 為了突破單個磁碟所能提供的吞吐量限制,可以對日誌進行分區,每個分區都成為獨立的日誌,可以獨立於其他分區進行讀寫,進而提高吞吐量.

Apache Kafka 、Amazon Kinesis Streams 和 Twitter 的 DistributedLog 都是基於日誌的訊息代理。Google Cloud Pub/Sub 在架構上類似,但對外暴露的是 JMS 風格的 API,而不是日誌抽象。儘管這些訊息代理將所有訊息寫入磁碟,但透過跨多臺機器分割槽,每秒能夠實現數百萬條訊息的吞吐量,並透過複製訊息來實現容錯性。

日誌與傳統訊息傳遞(JMS/AMQP-style)的比較

基於日誌的方法天然支援扇出式訊息傳遞,因為多個消費者可以獨立讀取日誌,而不會相互影響 —— 讀取訊息不會將其從日誌中刪除。為了在一組消費者之間實現負載平衡,代理可以將整個分割槽分配給消費者組中的節點,而不是將單條訊息分配給消費者客戶端。

然後每個客戶端將消費被指派分割槽中的 所有 訊息。通常情況下,當一個使用者被指派了一個日誌分割槽時,它會以簡單的單執行緒方式順序地讀取分割槽中的訊息。這種粗粒度的負載均衡方法有一些缺點:

  • 共享消費主題工作的節點數,最多為該主題中的日誌分割槽數,因為同一個分割槽內的所有訊息被遞送到同一個節點。

  • 如果某條訊息處理緩慢,則它會阻塞該分割槽中後續訊息的處理

  • 因此在訊息處理代價高昂,希望逐條並行處理,以及訊息的順序並沒有那麼重要的情況下,JMS/AMQP 風格的訊息代理是可取的。另一方面,在訊息吞吐量很高,處理迅速,順序很重要的情況下,基於日誌的方法表現得非常好。

消費者偏移量

  • 使用消費者偏移量的好處:

所有偏移量小於消費者的當前偏移量的訊息已經被處理,而具有更大偏移量的訊息還沒有被看到。因此,代理不需要跟蹤確認每條訊息,只需要定期記錄消費者的偏移即可。這種方法減少了額外簿記開銷,而且在批處理和流處理中採用這種方法有助於提高基於日誌的系統的吞吐量。

實際上,這種偏移量與單領導者資料庫複製中常見的日誌序列號非常相似,我們在 “++設定新從庫++” 中討論了這種情況。在資料庫複製中,日誌序列號允許跟隨者斷開連線後,重新連線到領導者,並在不跳過任何寫入的情況下恢復複製。這裡原理完全相同:訊息代理表現得像一個主庫,而消費者就像一個從庫。

如果消費者節點失效,則失效消費者的分割槽將指派給其他節點,並從最後記錄的偏移量開始消費訊息。如果消費者已經處理了後續的訊息,但還沒有記錄它們的偏移量,那麼重啟後這些訊息將被處理兩次。我們將在本章後面討論這個問題的處理方法。

磁碟空間使用情況

如果只追加寫入日誌,則磁碟空間終究會耗盡。為了回收磁碟空間,日誌實際上被分割成段,並不時地將舊段刪除或移動到歸檔儲存。(我們將在後面討論一種更為複雜的磁碟空間釋放方式)

這就意味著如果一個慢消費者跟不上訊息產生的速率而落後得太多,它的消費偏移量指向了刪除的段,那麼它就會錯過一些訊息。實際上,日誌實現了一個有限大小的緩衝區,當緩衝區填滿時會丟棄舊訊息,它也被稱為 迴圈緩衝區(circular buffer) 或 環形緩衝區(ring buffer)。不過由於緩衝區在磁碟上,因此緩衝區可能相當的大。

如果生產者發送訊息的速度 > 消費者處理的速度?:

  1. 系統可以丟棄訊息

  2. 將訊息緩衝在佇列中 -- "為log-based message queue的處理方式"

  3. 啟動背壓機制(backpressure 即阻止生產者發送更多訊息)參考ch8

如果消費者遠遠落後,而所要求的資訊比保留在磁碟上的資訊還要舊,那麼它將不能讀取這些資訊,所以代理實際上丟棄了比緩衝區容量更大的舊資訊。你可以監控消費者落後日誌頭部的距離,如果落後太多就發出報警。由於緩衝區很大,因而有足夠的時間讓運維人員來修復慢消費者,並在訊息開始丟失之前讓其趕上。

即使消費者真的落後太多開始丟失訊息,也只有那個消費者受到影響;它不會中斷其他消費者的服務。這是一個巨大的運維優勢:你可以實驗性地消費生產日誌,以進行開發,測試或除錯,而不必擔心會中斷生產服務。當消費者關閉或崩潰時,會停止消耗資源,唯一剩下的只有消費者偏移量。

這種行為也與傳統的訊息代理(JMS/AMQP-style)形成了鮮明對比,在那種情況下,你需要小心地刪除那些消費者已經關閉的佇列 —— 否則那些佇列就會累積不必要的訊息,從其他仍活躍的消費者那裡佔走記憶體。

重播舊訊息

  • JMS/AMQP-style message broker 處理和確認訊息是破壞性操作

  • log-based message broker則更像是唯讀操作

所以其實消費者可以用更改消費者偏移量的方式來重啟,這個面向使得log-based訊息傳遞更像上一章的batch processing,其中衍生資料透過可重複的轉換過程與輸入資料顯式分離。它允許進行更多的實驗,更容易從錯誤和bug中恢復,使其成為在組織內整合資料流的良好工具。

資料庫與串流

  • 我們已經在訊息代理和資料庫之間進行了一些比較。儘管傳統上它們被視為單獨的工具類別,但是我們看到,log-based message brokers已經成功地從資料庫中獲取靈感並將其應用於訊息傳遞。我們也可以反過來:從訊息傳遞和流中獲取靈感,並將它們應用於資料庫。

保持系統同步

  • 正如我們在本書中所看到的,沒有一個系統能夠滿足所有的資料儲存、查詢和處理需求。在實踐中,大多數重要應用都需要組合使用幾種不同的技術來滿足所有的需求。

由於相同或相關的資料出現在了不同的地方,因此相互間需要保持同步:如果某個專案在資料庫中被更新,它也應當在快取、搜尋索引和資料倉庫中被更新。

如果週期性的完整資料庫轉儲過於緩慢,有時會使用的替代方法是 雙寫(dual write),其中應用程式碼在資料變更時明確寫入每個系統。

但是,雙寫有一些嚴重的問題,其中一個是競爭條件

fig11-4.png

圖 11-4 在資料庫中 X 首先被設定為 A,然後被設定為 B,而在搜尋索引處,寫入以相反的順序到達

這時候就必須有額外的病發檢測機制,如ch.5提到的檢測並發寫入等.

雙重寫入的另一個問題是,其中一個寫入可能會失敗,而另一個成功。這是一個容錯問題,而不是一個併發問題,但也會造成兩個系統互相不一致的結果。

  • 如果實際上只有一個領導者 —— 例如,資料庫 —— 而且我們能讓搜尋索引成為資料庫的追隨者,情況要好得多。但這在實踐中可能嗎?

變更資料的捕獲

  • 變更資料捕獲(change data capture, CDC) 是一種觀察寫入資料庫的所有資料變更,並將其萃取為可以複製到其他系統中的形式的過程,在資料寫入時立即以stream的形式提供出來.

fig11-5.png

CDC實作:

我們可以將日誌消費者叫做 衍生資料系統(derived data systems):儲存在搜尋索引和資料倉庫中的資料,只是 記錄系統 資料的額外檢視。變更資料捕獲是一種機制,可確保對記錄系統所做的所有更改都反映在衍生資料系統中,以便衍生系統具有資料的準確副本。

從本質上說,變更資料捕獲使得一個數據庫成為leader(被捕獲變化的資料庫),並將其他元件變為followers。log-based message broker非常適合從源資料庫傳輸變更事件,因為它保留了訊息的順序(避免了 ++圖 11-2++ 的重新排序問題)。

資料庫觸發器可用來實現變更資料捕獲(請參閱 ch.5),透過註冊觀察所有變更的觸發器,並將相應的變更項寫入變更日誌表中。但是它們往往是脆弱的,而且有顯著的效能開銷。解析複製日誌可能是一種更穩健的方法,但它也很有挑戰,例如如何應對模式變更。

LinkedIn 的 Databus,Facebook 的 Wormhole和 Yahoo! 的 Sherpa大規模地應用這個思路。Bottled Water 使用解碼 WAL 的 API 實現了 PostgreSQL 的 CDC,Maxwell 和 Debezium 透過解析 binlog 對 MySQL 做了類似的事情,Mongoriver 讀取 MongoDB oplog,而 GoldenGate 為 Oracle 提供類似的功能。

  • 註:類似於message brokers,變更資料捕獲通常是非同步的:記錄資料庫系統在提交變更之前不會等待消費者應用變更。這種設計具有的運維優勢是,新增緩慢的消費者不會過度影響記錄系統。不過,所有複製延遲可能有的問題在這裡都可能出現

初始快照

  • 如果你擁有所有對資料庫進行變更的日誌,則可以透過重播日誌,來重建完整資料庫,但是在許多情況下,永遠保留所有更改會耗費太多磁碟空間,

資料庫的初始快照必須與變更日誌中的已知位置或偏移量相對應,以便在處理完快照後知道從哪裡開始應用變更。一些 CDC 工具集成了這種快照功能,而其他工具則把它留給你手動執行。

日誌壓縮

我們之前在 “雜湊索引” 中關於日誌結構儲存引擎的上下文中討論了日誌壓縮(請參閱 圖 3-2 的示例)。原理很簡單:儲存引擎定期在日誌中查詢具有相同鍵的記錄,丟掉所有重複的內容,並只保留每個鍵的最新更新。這個壓縮與合併過程在後臺執行。

在日誌結構儲存引擎中,具有特殊值 NULL(墓碑,即 tombstone)的更新表示該鍵被刪除,並會在日誌壓縮過程中被移除。但只要鍵不被覆蓋或刪除,它就會永遠留在日誌中。這種壓縮日誌所需的磁碟空間僅取決於資料庫的當前內容,而不取決於資料庫中曾經發生的寫入次數。如果相同的鍵經常被覆蓋寫入,則先前的值將最終將被垃圾回收,只有最新的值會保留下來。

現在,無論何時需要重建衍生資料系統(如搜尋索引),你可以從壓縮日誌主題的零偏移量處啟動新的消費者,然後依次掃描日誌中的所有訊息。日誌能保證包含資料庫中每個鍵的最新值(也可能是一些較舊的值)—— 換句話說,你可以使用它來獲取資料庫內容的完整副本,而無需從 CDC 源資料庫取一個快照。

事件溯源

事件溯源(Event Sourcing) | Newbe.Claptrap

我們在這裡討論的CDC想法和 事件溯源(Event Sourcing) 之間有一些相似之處,這是一個在 領域驅動設計(domain-driven design, DDD) 社群中折騰出來的技術。我們將簡要討論事件溯源,因為它包含了一些關於流處理系統的有用想法。

  • 在CDC中,應用以 可變方式(mutable way) 使用資料庫,可以任意更新和刪除記錄。變更日誌是從資料庫的底層提取的(例如,透過解析複製日誌),從而確保從資料庫中提取的寫入順序與實際寫入的順序相匹配,從而避免 圖 11-4 中的競態條件。寫入資料庫的應用不需要知道 CDC 的存在。

  • 在事件溯源中,應用邏輯顯式構建在寫入事件日誌的不可變事件之上。在這種情況下,事件儲存是僅追加寫入的,更新與刪除是不鼓勵的或禁止的。事件被設計為旨在反映應用層面發生的事情,而不是底層的狀態變更。

諸如 Event Store這樣的專業資料庫已經被開發出來,供使用事件溯源的應用使用,但總的來說,這種方法獨立於任何特定的工具。傳統的資料庫或基於日誌的訊息代理也可以用來構建這種風格的應用。

從事件日誌衍生出當前狀態

與CDC一樣,重播事件日誌允許讓你重新構建系統的當前狀態。不過,日誌壓縮需要採用不同的方式處理:

  • 用於記錄更新的 CDC 事件通常包含記錄的 完整新版本,因此主鍵的當前值完全由該主鍵的最近事件確定,而日誌壓縮可以丟棄相同主鍵的先前事件。

  • 另一方面,事件溯源在更高層次進行建模:事件通常表示使用者操作的意圖,而不是因為操作而發生的狀態更新機制。在這種情況下,後面的事件通常不會覆蓋先前的事件,所以你需要完整的歷史事件來重新構建最終狀態。這裡進行同樣的日誌壓縮是不可能的。

命令和事件

事件溯源的哲學是仔細區分 事件(event)命令(command)。當來自使用者的請求剛到達時,它一開始是一個命令:在這個時間點上它仍然可能失敗,比如,因為違反了一些完整性條件。應用必須首先驗證它是否可以執行該命令。如果驗證成功並且命令被接受,則它變為一個持久化且不可變的事件。

例如,如果使用者試圖註冊特定使用者名稱,或預定飛機或劇院的座位,則應用需要檢查使用者名稱或座位是否已被佔用。(先前在 “容錯共識” 中討論過這個例子)當檢查成功時,應用可以生成一個事件,指示特定的使用者名稱是由特定的使用者 ID 註冊的,或者座位已經預留給特定的顧客。

在事件生成的時刻,它就成為了 事實(fact)。即使客戶稍後決定更改或取消預訂,他們之前曾預定了某個特定座位的事實仍然成立,而更改或取消是之後新增的單獨的事件。

狀態、串流和不變性

無論狀態如何變化,總是有一系列事件導致了這些變化。即使事情已經執行與回滾,這些事件出現是始終成立的。關鍵的想法是:可變的狀態與不可變事件的僅追加日誌相互之間並不矛盾:它們是一體兩面,互為陰陽的。所有變化的日誌 —— 變化日誌(changelog),表示了隨時間演變的狀態。

fig11-6.png

如果你持久儲存了變更日誌,那麼重現狀態就非常簡單。如果你認為事件日誌是你的記錄系統,而所有的衍生狀態都從它派生而來,那麼系統中的資料流動就容易理解的多。正如帕特・赫蘭(Pat Helland)所說的【52】:

事務日誌記錄了資料庫的所有變更。高速追加是更改日誌的唯一方法。從這個角度來看,資料庫的內容其實是日誌中記錄最新值的快取。日誌才是真相,資料庫是日誌子集的快取,這一快取子集恰好來自日誌中每條記錄與索引值的最新值。

日誌壓縮(如 “日誌壓縮” 中所述)是連線日誌與資料庫狀態之間的橋樑:它只保留每條記錄的最新版本,並丟棄被覆蓋的版本。

不可變事件的優點

  1. 使用不可變事件的僅追加日誌,診斷問題與故障恢復就要容易的多

會計在幾個世紀以來一直在財務記賬中應用不變性。一筆交易發生時,它被記錄在一個僅追加寫入的分類帳中,如果發生錯誤,會計師不會刪除或更改分類帳中的錯誤交易 —— 而是新增另一筆交易以補償錯誤

  1. 不可變的事件也包含了比當前狀態更多的資訊。

例如在購物網站上,顧客可以將物品新增到他們的購物車,然後再將其移除。雖然從履行訂單的角度,第二個事件取消了第一個事件,但對分析目的而言,知道客戶考慮過某個特定項而之後又反悔,可能是很有用的。也許他們會選擇在未來購買,或者他們已經找到了替代品。這個資訊被記錄在事件日誌中,但對於移出購物車就刪除記錄的資料庫而言,這個資訊在移出購物車時可能就丟失了

從同一事件日誌衍生多個視圖

透過從不變的事件日誌中分離出可變的狀態,你可以針對不同的讀取方式,從相同的事件日誌中衍生出幾種不同的表現形式。效果就像一個流的多個消費者一樣 (fig.11-5)

新增從事件日誌到資料庫的顯式轉換,能夠使應用更容易地隨時間演進:如果你想要引入一個新功能,以新的方式表示現有資料,則可以使用事件日誌來構建一個單獨的、針對新功能的讀取最佳化檢視,無需修改現有系統而與之共存。並行執行新舊系統通常比在現有系統中執行複雜的模式遷移更容易。一旦不再需要舊的系統,你可以簡單地關閉它並回收其資源

資料庫和模式設計的傳統方法是基於這樣一種謬誤,資料必須以與查詢相同的形式寫入。如果可以將資料從針對寫入最佳化的事件日誌轉換為針對讀取最佳化的應用狀態,那麼有關規範化和非規範化的爭論就變得無關緊要了

併發控制

事件溯源和變更資料捕獲的最大缺點是,事件日誌的消費者通常是非同步的

我們之前在 “讀己之寫” 中討論了這個問題以及可能的解決方案。ch.5

一種解決方案是將事件追加到日誌時同步執行讀取檢視的更新。而將這些寫入操作合併為一個原子單元需要 事務,所以要麼將事件日誌和讀取檢視儲存在同一個儲存系統中,要麼就需要跨不同系統進行分散式事務。或者,你也可以使用在 “使用全序廣播實現線性一致的儲存” 中討論的方法。

不變性的侷限性

  1. 效能方面的原因: 在這些情況下,不可變的歷史可能增至難以接受的巨大,碎片化可能成為一個問題,壓縮與垃圾收集的表現對於運維的穩健性變得至關重要

  2. 有出於管理方面的原因需要刪除資料的情況,儘管這些資料都是不可變的。例如,隱私條例可能要求在使用者關閉帳戶後刪除他們的個人資訊,資料保護立法可能要求刪除錯誤的資訊,或者可能需要阻止敏感資訊的意外洩露。

在這種情況下,僅僅在日誌中新增另一個事件來指明先前的資料應該被視為刪除是不夠的 —— 你實際上是想改寫歷史,並假裝資料從一開始就沒有寫入。例如,Datomic 管這個特性叫 切除(excision) ,而 Fossil 版本控制系統有一個類似的概念叫 避免(shunning)

真正刪除資料是非常非常困難的,因為副本可能存在於很多地方:例如,儲存引擎,檔案系統和 SSD 通常會向一個新位置寫入,而不是原地覆蓋舊資料,而備份通常是特意做成不可變的,防止意外刪除或損壞。

串流的處理

  1. 你可以將事件中的資料寫入資料庫、快取、搜尋索引或類似的儲存系統,然後能被其他客戶端查詢。如圖 11-5 所示,這是資料庫與系統其他部分所發生的變更保持同步的好方法 —— 特別是當流消費者是寫入資料庫的唯一客戶端時。如 “批處理工作流的輸出” 中所討論的,它是寫入儲存系統的stream等價物。

  2. 你能以某種方式將事件推送給使用者,例如傳送報警郵件或推送通知,或將事件流式傳輸到可實時顯示的儀表板上。在這種情況下,人是流的最終消費者。

  3. 你可以處理一個或多個輸入流,併產生一個或多個輸出流。流可能會經過由幾個這樣的處理階段組成的流水線,最後再輸出(選項 1 或 2)。

與批次作業相比的一個關鍵區別是,stream不會結束。這種差異會帶來很多隱含的結果。正如本章開始部分所討論的,排序對無界資料集沒有意義,因此無法使用 排序合併連線(請參閱 “Reduce 側連線與分組”)。容錯機制也必須改變:對於已經運行了幾分鐘的批處理作業,可以簡單地從頭開始重啟失敗任務,但是對於已經執行數年的流作業,重啟後從頭開始跑可能並不是一個可行的選項。

流式處理的用途

長期以來,流處理一直用於監控目的,如果某個事件發生,組織希望能得到警報。例如:

  • 欺詐檢測系統需要確定信用卡的使用模式是否有意外地變化,如果信用卡可能已被盜刷,則鎖卡。

  • 交易系統需要檢查金融市場的價格變化,並根據指定的規則進行交易。

  • 製造系統需要監控工廠中機器的狀態,如果出現故障,可以快速定位問題。

  • 軍事和情報系統需要跟蹤潛在侵略者的活動,並在出現襲擊徵兆時發出警報。

複雜事件處理

image.png

複合事件處理(complex event processing, CEP) 是 20 世紀 90 年代為分析事件流而開發出的一種方法,尤其適用於需要搜尋某些事件模式的應用。與正規表達式允許你在字串中搜索特定字元模式的方式類似,CEP 可以讓你指定用於搜尋stream中事件特定模式的規則。

在這些系統中,查詢和資料之間的關係與普通資料庫恰好相反。通常情況下,資料庫會持久儲存資料,並將查詢視為一種暫態操作:當查詢進入時,資料庫搜尋與查詢匹配的資料,然後在查詢完成時丟掉查詢。CEP 引擎反轉了角色:查詢是長期儲存的,而input stream的事件不斷經過它們,以搜尋出匹配查詢的事件.

串流分析

使用stream processing的另一個領域是對stream進行分析。CEP 與streamm分析之間的邊界是模糊的,但一般來說,分析往往對找出特定事件序列並不關心,而更關注大量事件上的聚合與統計指標 —— 例如:

  • 測量某種型別事件的速率(每個時間間隔內發生的頻率)

  • 滾動計算一段時間視窗內某個值的平均值

  • 將當前的統計值與先前的時間區間的值對比(例如,檢測趨勢,當指標與上週同比異常偏高或偏低時報警)

串流分析系統有時會使用機率演算法,例如 Bloom filter(我們在 “效能最佳化” 中遇到過)來管理成員資格,HyperLogLog【72】用於基數估計以及各種百分比估計算法(請參閱 “實踐中的百分位點”)。機率演算法產出近似的結果,但比起精確演算法的優點是記憶體使用要少得多。使用近似演算法有時讓人們覺得流處理系統總是有損的和不精確的,但這是錯誤看法:流處理並沒有任何內在的近似性,而機率演算法只是一種最佳化【73】。

許多開源分散式流處理框架的設計都是針對分析設計的:例如 Apache Storm、Spark Streaming、Flink、Concord、Samza 和 Kafka Streams 【74】。託管服務包括 Google Cloud Dataflow 和 Azure Stream Analytics。

實體化視圖的維護

我們在 “資料庫與串流” 中看到,資料庫的變更串流可以用於維護衍生資料系統(如快取、搜尋索引和資料倉庫),並使其與來源資料庫保持最新。我們可以將這些示例視作維護 物化檢視(materialized view) 的一種具體場景(請參閱 “聚合:資料立方體和物化檢視”):在某個資料集上衍生出一個替代檢視以便高效查詢,並在底層資料變更時更新檢視

對串流進行搜尋

除了允許搜尋由多個事件構成模式的 CEP 外,有時也存在基於複雜標準(例如全文搜尋查詢)來搜尋單個事件的需求。

例如,媒體監測服務會訂閱來自媒體中心的新聞和公告,並搜尋任何關於公司、產品或感興趣的話題的新聞。這是透過預先構建一個搜尋查詢來完成的,然後不斷地將新聞項的流與該查詢進行匹配。在一些網站上也有類似的功能:例如,當市場上出現符合其搜尋條件的新房產時,房地產網站的使用者可以要求網站通知他們。Elasticsearch 的這種過濾器功能,是實現這種流搜尋的一種選擇

傳統的搜尋引擎首先會對documents建立索引,然後在索引上跑查詢。相比之下,搜尋一個數據流則反了過來:查詢被儲存下來,讓所有documents都歷經這個查詢,就像在 CEP 中一樣。最簡單的情況就是,你可以為每個文件測試每個查詢。但是如果你有大量查詢,這可能會變慢。為了最佳化這個過程,可以像對文件一樣,為查詢建立索引。從而縮小可能匹配的查詢集合

串流的時間問題

stream processors通常需要與時間打交道,尤其是用於分析目的時,會頻繁使用時間視窗,例如 “過去五分鐘的平均值”。“過去五分鐘” 的含義看上去似乎是清晰而明確的,但不幸的是,這個概念非常棘手。

聚合的時間間隔稱為 視窗(window)

批次處理在大多數情況下,感興趣的時間線是歷史中的一年,而不是處理中的幾分鐘。

另一方面,許多串流處理框架使用處理機器上的本地系統時鐘(處理時間,即 processing time)來確定 視窗(windowing)【79】。這種方法的優點是簡單,如果事件建立與事件處理之間的延遲可以忽略不計,那也是合理的。然而,如果存在任何顯著的處理延遲 —— 即,事件處理顯著地晚於事件實際發生的時間,這種處理方式就失效了。

事件時間與處理時間

很多原因都可能導致處理延遲:排隊,網路故障(請參閱 “不可靠的網路”),效能問題導致訊息代理 / 訊息處理器出現爭用,流消費者重啟,從故障中恢復時重新處理過去的事件(請參閱 “重播舊訊息”),或者在修復程式碼 BUG 之後。

而且,訊息延遲還可能導致無法預測訊息順序

有一個類比也許能幫助理解,“star wars” 電影:第四集於 1977 年發行,第五集於 1980 年,第六集於 1983 年,緊隨其後的是 1999 年的第一集,2002 年的第二集,和 2005 年的第三集,以及 2015 年的第七集【80】[^ii]。如果你按照按照它們上映的順序觀看電影,你處理電影的順序與它們敘事的順序就是不一致的。(集數編號就像事件timestamp,而你觀看電影的日期就是處理時間)作為人類,我們能夠應對這種不連續性,但是流處理演算法需要專門編寫,以適應這種時序與順序的問題。

fig11-7.png

知道自己何時就緒

例如,假設你將事件分組為一分鐘的視窗,以便統計每分鐘的請求數。你已經計數了一些帶有本小時內第 37 分鐘timestamp的事件,時間流逝,現在進入的主要都是本小時內第 38 和第 39 分鐘的事件。什麼時候才能宣佈你已經完成了第 37 分鐘的視窗計數,並輸出其計數器值?

在一段時間沒有看到任何新的事件之後,你可以超時並宣佈一個視窗已經就緒,但仍然可能發生這種情況:某些事件被緩衝在另一臺機器上,由於網路中斷而延遲。你需要能夠處理這種在視窗宣告完成之後到達的 滯留(straggler) 事件。大體上,你有兩種選擇【1】:

  1. 忽略這些滯留事件,因為在正常情況下它們可能只是事件中的一小部分。你可以將丟棄事件的數量作為一個監控指標,並在出現大量丟訊息的情況時報警。

  2. 釋出一個 更正(correction),一個包括滯留事件的更新視窗值。你可能還需要收回以前的輸出。

你用的是誰的時鐘

  • 普遍來說會比較相信伺服器的時鐘。

要校正不正確的裝置時鐘,一種方法是記錄三個timestamp:

  • 事件發生的時間,取決於裝置時鐘

  • 事件傳送往伺服器的時間,取決於裝置時鐘

  • 事件被伺服器接收的時間,取決於伺服器時鐘

透過從第三個timestamp中減去第二個timestamp,可以估算裝置時鐘和伺服器時鐘之間的偏移(假設網路延遲與所需的timestamp精度相比可忽略不計)。然後可以將該偏移應用於事件timestamp,從而估計事件實際發生的真實時間(假設裝置時鐘偏移在事件發生時與送往伺服器之間沒有變化)。

這並不是stream processing獨有的問題,batch processing有著完全一樣的時間推理問題。只是在串流處理環境中這個問題會更明顯,我們更容易意識到時間的流逝。

事件被伺服器接收的時間(伺服器) - 事件傳送往伺服器的時間(裝置) = 伺服器與裝置的時間差\ 事件發生的時間(裝置) + 伺服器與裝置的時間差 = 事件發生的時間(伺服器)

視窗的型別

  • 滾動視窗(Tumbling Window)

    滾動視窗有著固定的長度,每個事件都僅能屬於一個視窗。例如,假設你有一個 1 分鐘的滾動視窗,則所有時間戳在 10:03:0010:03:59 之間的事件會被分組到一個視窗中,10:04:0010:04:59 之間的事件被分組到下一個視窗,依此類推。透過將每個事件時間戳四捨五入至最近的分鐘來確定它所屬的視窗,可以實現 1 分鐘的滾動視窗。

  • 跳躍視窗(Hopping Window)

    跳躍視窗也有著固定的長度,但允許視窗重疊以提供一些平滑。例如,一個帶有 1 分鐘跳躍步長的 5 分鐘視窗將包含 10:03:0010:07:59 之間的事件,而下一個視窗將覆蓋 10:04:0010:08:59 之間的事件,等等。透過首先計算 1 分鐘的滾動視窗(tunmbling window),然後在幾個相鄰視窗上進行聚合,可以實現這種跳躍視窗。

  • 滑動視窗(Sliding Window)

    滑動視窗包含了彼此間距在特定時長內的所有事件。例如,一個 5 分鐘的滑動視窗應當覆蓋 10:03:3910:08:12 的事件,因為它們相距不超過 5 分鐘(注意滾動視窗與步長 5 分鐘的跳躍視窗可能不會把這兩個事件分組到同一個視窗中,因為它們使用固定的邊界)。透過維護一個按時間排序的事件緩衝區,並不斷從視窗中移除過期的舊事件,可以實現滑動視窗。

  • 會話視窗(Session window)

    與其他視窗型別不同,會話視窗沒有固定的持續時間,而定義為:將同一使用者出現時間相近的所有事件分組在一起,而當用戶一段時間沒有活動時(例如,如果 30 分鐘內沒有事件)視窗結束。會話切分是網站分析的常見需求(請參閱 “分組”)。

串流的Joins

我們區分了流處理中可能出現的三種連線型別:

  • Stream-stream join (window join)

    兩個input streams都由活動事件組成,join會搜尋在某時間視窗內所發生的相關事件。例如,他可以匹配同一個使用者在30分鐘內所採收過的兩個動作.如果你希望在一個串流中找到相關的事件,那麼這兩個join inputs實際上可以是同一個串流(一個self-join)

  • stream-table join (串流加豐)

    一個input streams由活動事件組成,而另一個則是資料庫的change log。Change log保證了資料庫的本地副本是最新的。對於每個活動事件,join operator將查詢資料庫,並輸出一個加豐的活動事件。

    fig10-2.png

  • table-table join (實體化視圖維護)

    兩個input streams都是資料庫changelogs。在這種情況下,一側的每一個變化都與另一側的最新狀態join。結果是一個對兩個tables之間join的實體化視圖的變更串流。

容錯

  • 批次處理框架可以很容易地容錯:如果 MapReduce 作業中的任務失敗,可以簡單地在另一臺機器上再次啟動,並且丟棄失敗任務的輸出。這種透明的重試是可能的,因為輸入檔案是不可變的,每個任務都將其輸出寫入到 HDFS 上的獨立檔案中,而輸出僅當任務成功完成後可見。

  • 在串流處理中也出現了同樣的容錯問題,但是處理起來沒有那麼直觀:等待某個任務完成之後再使其輸出可見並不是一個可行選項,因為你永遠無法處理完一個無限的串流。

微批次處理與檢查點

一個解決方案是將stream分解成小塊,並像微型批處理一樣處理每個塊。這種方法被稱為 微批次處理(microbatching),Spark Streaming也採取了這種方案。批次處理的大小通常約為 1 秒,這是對效能妥協的結果:較小的批次會導致更大的排程與協調開銷,而較大的批次意味著流處理器結果可見之前的延遲要更長。

Apache Flink 則使用不同的方法,它會定期生成狀態的滾動存檔點並將其寫入持久儲存【92,93】。如果流運算元崩潰,它可以從最近的存檔點重啟,並丟棄從最近檢查點到崩潰之間的所有輸出。存檔點會由訊息流中的 壁障(barrier) 觸發,類似於微批次之間的邊界,但不會強制一個特定的視窗大小。

在串流處理框架的範圍內,微批次與存檔點方法提供了與批處理一樣的 恰好一次語義。但是,只要輸出離開流處理器(例如,寫入資料庫,向外部訊息代理傳送訊息,或傳送電子郵件),框架就無法拋棄失敗批次的輸出了。在這種情況下,重啟失敗任務會導致外部副作用發生兩次,只有微批次或存檔點不足以阻止這一問題。

原子提交再現

為了在出現故障時表現出恰好處理一次的樣子,我們需要確保事件處理的所有輸出和副作用 當且僅當 處理成功時才會生效。這些影響包括傳送給下游運算元或外部訊息傳遞系統(包括電子郵件或推送通知)的任何訊息,任何資料庫寫入,對運算元狀態的任何變更,以及對輸入訊息的任何確認(包括在基於日誌的訊息代理中將消費者偏移量前移)。

這些事情要麼都原子地發生,要麼都不發生,但是它們不應當失去同步。如果這種方法聽起來很熟悉,那是因為我們在分散式事務和兩階段提交的上下文中討論過它

然而在限制更為嚴苛的環境中,也是有可能高效實現這種原子提交機制的。Google Cloud Dataflow和 VoltDB 中使用了這種方法,Apache Kafka 有計劃加入類似的功能

冪等性

  • 我們的目標是丟棄任何失敗任務的不完整輸出

一個冪等操作是多次重複執行與單次執行效果相同的操作。例如,將key-value store中某個key對應的value設為某個常數(再次寫入該值,只是用同樣的值替代),而遞增一個計數器不是冪等的(再次執行遞增意味著該value遞增兩次)。

即使一個操作本身不是冪等的,通常也可以透過一些額外的中繼資料使其變得冪等:

例如,在使用來自 Kafka 的訊息時,每條訊息都有一個持久的、單調遞增的偏移量。將值寫入外部資料庫時可以將這個偏移量帶上,這樣你就可以判斷一條更新是不是已經執行過了,因而避免重複執行。

Storm 的 Trident 基於類似的想法來處理狀態。依賴冪等性意味著隱含了一些假設:重啟一個失敗的任務必須以相同的順序重播相同的訊息(基於日誌的訊息代理能做這些事),處理必須是確定性的,沒有其他節點能同時更新相同的值

在失敗後重建狀態

任何需要狀態的stream process —— 例如,任何視窗聚合(例如計數器,平均值和直方圖)以及任何用於連線的表和索引,都必須確保在失敗之後能恢復其狀態。

  • 一種選擇是將狀態儲存在遠端資料儲存中,並進行複製,然而正如在 “stream-table join” 中所述,每個訊息都要查詢遠端資料庫可能會很慢。
  • 另一種方法是在流處理器本地儲存狀態,並定期複製。然後當流處理器從故障中恢復時,新任務可以讀取狀態副本,恢復處理而不丟失資料。

    例如,Flink 定期捕獲運算元狀態的快照,並將它們寫入 HDFS 等持久儲存中。Samza 和 Kafka Streams 透過將狀態變更傳送到具有日誌壓縮功能的專用 Kafka 主題來複制狀態變更,這與變更資料捕獲類似。VoltDB 透過在多個節點上對每個輸入訊息進行冗餘處理來複制狀態

在某些情況下,甚至可能不需要複製狀態,因為可以從input streams重新建構出狀態.

  • 倘若狀態是由相當短的視窗所聚合而成,那麼簡單的重播與該視窗對應的輸入事件,在速度上應該也不會太慢
  • 如果狀態是在本機的資料庫副本,由CDC機制所維護的話,那麼就從日誌壓縮的變更串流(log-compacted change stream)來重新建構出資料庫

然而,所有這些權衡取決於底層基礎架構的效能特徵:在某些系統中,網路延遲可能低於磁碟訪問延遲,網路頻寬也可能與磁碟頻寬相當。沒有針對所有情況的普適理想權衡,隨著儲存和網路技術的發展,本地狀態與遠端狀態的優點也可能會互換。