Druid - A Real-time Analytical Data Store (SIGMOD 2014)

문제정의

  • Druid는 대규모 데이터의 Real-Time 분석을 위한 open source 이다. (OLAP 목적)

    • Column-oriented Storage, distributed, shared-nothing architecture, indexing structure의 특징을 가진다.
    • 대규모의 transactional event (log data)를 탐지하고, ingesting 할 수 있도록 설계되었다.
  • OLAP workflow는 timeseries 데이터 형태를 가진다.

    • timeseries 데이터는 Timestamp, dimension columns(string), metrics(numeric)으로 이루어져 있다.

    1

    • 이러한 데이터를 바탕으로 drill-down, aggregate를 계산한다.
      • 예) wiki 데이터
        • San Francisco 내 남자들로부터 Justin Bieber page가 얼마나 edit가 발생했는지?
        • 한달 단위로 Calgary 내 page를 추가한 character 평균 수는?
  • 즉, Druid를 통해 대규모의 transaction event를 빠르게 수집, interactive query를 수행하고자 함.


구조

2

  • 크게 Real-time Node, Historical Node, Coordinator Node, Broker Node 4개의 Node로 이루어져 있다.

Real-Time Node

  • event streams를 ingest하고 query 할 수 있도록 수행하는 노드
    • 해당 노드를 통해 index 된 event는 즉시 query 할 수 있다.
      • in-memory index buffer를 유지하며, 오로지 small time range의 streams의 query를 위해 사용한다.
      • heap overflow 문제를 피하기 위해 주기적으로 disk에 persist 해야한다.
        • persist 할 때, column oriented storage format 형태로 변환한다.
        • 이후 persist 된 데이터에 query가 필요하면, off-heap memory (GC의 관리대상이 아닌 메모리 영역)에 load 한다.
    • 주기적으로 persist 된 index를 merge 한다.
      • merge 된 index는 immutable block (segment)으로 만든다.
      • 이후, handoff 단계에서 최종 HDFS, S3에 저장한다. (deep storage)
    • 즉, Real-TIme Node는 ingest, persist, merge, handoff 단계를 수행한다.
        1. 데이터를 ingest 해 memory index를 만든다.
        1. 주기적으로 (ex. 10분) memory index를 disk에 persist 한다.
        2. window 기간 이후 persist된 memory index를 merge 한다. 이는 segment 라 한다.
        3. 만들어진 segment는 deep storage에 저장해 historical node가 사용할 수 있다.
    • Zookeeper 와 연동해 본인의 online state나 data를 알려준다.
  • Real-Time Node는 Kafka와 같은 Message Bus와 연동 후 event를 읽는다.

Historical Node

  • immutable block (segment)를 serve, load 하는 노드
    • Real-Time에서 저장한 immutable block (segment)을 통해 배치 처리 하기 위한 용도이다.
      • deep storage에서 segment를 읽는데 아래 단계로 수행한다.
          1. local cache에 찾고자 하는 segment가 있는지 확인한다.
          2. 없다면, deep storage에서 다운로드 받고 아니면, 바로 query에 사용한다.
      • 오로지 immutable block을 다루기 때문에 read에 대한 consistency를 보장한다.
  • Historical Node는 데이터의 종류에 따라 여러 클러스터 (Tiers)로 구성할 수 있다.
    • Hot 데이터에 대해서는 스펙이 높은 Tiers로 구성한다.
    • Cold 데이터에 대해서는 스펙이 상대적으로 낮은 Tiers로 구성한다.
  • 이 또한, Real-Time Node와 마찬가지로 Zookeeper 와 연동해 본인의 online state나 data를 알려준다.
    • Zookeeper가 장애가 발생하면, 신규 데이터에 대해서는 Serving할 수 없지만 기존 데이터는 HTTP를 통해 Serving 할 수 있다.
    • 즉, Zookeeper 장애가 발생하더라도 Availability는 보장된다.

Broker Node

  • Client에 받은 Query를 Historical Node, Real-Time Node에게 전달하는 Router 역할을 수행하는 노드
    • Zookeeper를 통해 metadata를 읽으며, segment가 query가 가능한지, 어느 historical node에 저장되어 있는지 알고 있어야 한다.
      • 그래야만 정확하게 Historical Node들이나 Real-Time Node에게 query를 요청할 수 있다.
      • 또한, 받은 결과를 merge 후 client에게 전달해준다.
    • local heap memory, Memcached같은 external Cache 기술을 통해 Cache 한다.
      • Historical Node의 결과는 Cache하며, Real-Time Node의 결과는 Cache하지 않는다. (Real-Time Node의 결과는 지속적으로 변하기 때문)

Coordinator Node

  • historical node의 distribution, 데이터 관리를 수행하는 노드
    • historical node에게 new data load, outdated data drop, load balance를 위해 데이터 이동 등을 요청한다.
    • Coordinator Node는 MySQL와 연동해 추가적인 operational parameter, configuration을 관리한다.
      • MySQL은 historical node에 의해 serve 된 모든 segment의 리스트를 가지는 Table을 가진다.
        • 해당 Table은 Real-Time Node와 같이 segment를 생성하는 service로 부터 update 된다.
      • 또한 Rule Table을 가지며 이는, 얼마나 segment가 만들어지고, 삭제되고, 복제되는지를 컨트롤 한다.
    • Rules
      • historical segment가 load, drop에 관한 모든 것을 관리할 수 있다.
        • 여러 tiers에 segment가 할당이 되었는지, 기간을 정해 언제 segment를 drop할지 등등에 관한 모든 규칙이 저장되어 있다.
        • 사용자는 Rule을 통해 한달 내 segment를 hot cluster, 일년 내 segment를 cold cluster, 그 이상의 segment를 drop 할 수 있다.
      • Rule에 관한 모든 정보는 MySQL에 Rule Table에 저장한다.
    • Load Balancing
      • Historical Node는 자원의 한계 때문에, 반드시 여러 Historical Node에 segment를 balancing 해야한다.
      • 최적의 distribution을 위해 Query Pattern이나 speed에 대한 몇가지 정보를 알 필요가 있다.
    • Replication
      • 같은 segment를 여러 historical node에 복제함으로써 historical node가 죽더라도 query를 수행할 수 있다.
      • 또한, 이를 통해 Software Update를 수행할 수 있다.

STORAGE FORMAT

data table (data source, 위에서 봤던 Wiki Table)는 timestamp를 가진 event의 collection이고, 이를 여러 segment로 partition 한다.

  • segment는 특정 기간의 row data의 collection 이며, 이는 druid에서 사용하는 storage unit 이다.

    • segment partition은 interval을 기반으로 수행하며, 이 외에도 다른 column을 통해 partition을 수행해 segment size를 조정할 수 있다.

      • interval은 한시간, 하루, 한달 등을 나타내며, time granularity를 통해 데이터 volumn, time range를 지정할 수 있다.
    • segment를 구분하기 위한 unique 값은 time interval, version이다.

      • version은 segment의 최신성을 나타내며, read operation을 수행했을때 가장 최신의 segment를 읽는다.
      • 즉, segment는 version 별로 여러 관리가 된다.

segment는 column orientation 기반으로 저장된다.

  • Row orientation 보다 효율적으로 CPU를 사용하고 필요한 데이터만 load, scan 할 수 있다.
  • 또한, String을 그대로 저장하는 것은 비효율적이기 때문에 이는 Dictionary Encoding을 수행한다.
    • Justin Bieber -> 0
    • Ke$ha -> 1
  • 이를 통해 page column은 아래 list로 표현할 수 있다.
    • [0, 0, 1, 1]
    • 기본 [0, 0, 1, 1] 로 저장하고, 0과 1이 각 무엇인지 저장한다면 상당한 사이즈를 줄일 수 있다.

Filter

  • Filter

    • 원하는 dimension만 boolean expression을 통해 filter할 수 있다.
    • 데이터를 inverted bitmap index로 만들 수 있는데, 아래와 같이 표현할 수 있다. row 형태의 데이터를 column 형태로 변형한다.
      • bieberfever.com -> rows [0,1,2] -> 111000
      • ultratrimfast.com -> rows[3,4,5] -> 000111
    • 만약 bieberfever.com, ultratrimfast.com를 모두 query하고 싶다면, 두 inverted index를 OR 하면 된다.
      • 또한, metric도 위와 같이 표현할 수 있으며, 필요한 metric 만 query에 사용할 수 있다.
    • 이는 특정 Column 값이 어느 row에 있는지 바로 찾을 수 있다.
  • 예) San Francisco 내 남자들로부터 Justin Bieber page가 얼마나 edit가 발생했는지?

    • 간단하게 Boolean expression을 통해 filter할 수 있다.
    • page column을 아래와 같이 boolean expression을 통해 표현할 수 있다.
      • justin bieber -> 1, 1, 0, 0
      • ke$ha -> 0, 0, 1, 1
    • 이후 justin bieber, ke$ha를 filter하고 싶다면 두 binary array (bitmap set)를 OR 하면, 원하는 row만 추출할 수 있다.
    • 위 방법은 Search Engine에서도 공통적으로 사용한다.
  • 허나 위 방법 Druid는 Concise compression 알고리즘을 통해 Size를 기존 bitmap set 보다 대폭 줄였음.

    • 알고리즘이 정확히 뭔지는 모르겠지만.. bitmap set 보다 카디널리티 수에 따라 사이즈가 훨씬 적다.

    1

추가 내용

  • Segment 개념 및 특징
    • DataTable (=Data Source) 의 partition 단위를 뜻함.
      • 여기서, DataTable은 timestamped된 event들의 집합이다.
      • timestamp column을 통해 data distribution 정책, date retention 정책, first level query pruning을 할 수 있다.
      • 약 5-10 million row로 이루어져 있다.
      • partition은 time interval 단위로 이루어진다. time interval은 보통 하루, 1시간와 같은 단위이다.
    • Segment는 identifier에 의해 여러 version으로 구분된다. (뒤늦게 들어오는 event 들은 새로운 버전의 segment를 구성할것으로 보인다.)
      • segment를 읽을 때 항상 최신 version을 읽는다.
    • Druid Node는 기본적으로 1개의 thread로 1개의 segment를 읽는다.
      • 즉, 이용 가능한 core 수가 많을 수록 많은 segment를 동시에 읽을 수 있다는 의미이다.
    • Segment는 immutable한 특성을 가지기 때문에, Read Consistency를 보장한다.
      • 그래서 같은 시간 때에 동시에 여러 쓰레드들이 같은 segment를 읽을 수 있다.
      • 이는 높은 read throughput을 가능하게 한다.
    • Segment는 여러 Query에 사용되어질 수 있고, 이는 보장이 되어야 한다.
      • 거대한 Query가 있더라도, 다른 Query가 사용할 수 있어야 한다.
    • Segment는 Timestamp (long), dimension (string), metric (int, float, long, double)으로 이루어져 있다.
      • 각 Type은 서로다른 압축 방법이 사용될 수 있다.
      • metric는 LZ4 압축, dimension은 Dictionary Encode와 비슷한 PowerDrill

Comments