문제정의
-
Druid는 대규모 데이터의 Real-Time 분석을 위한 open source 이다. (OLAP 목적)
- Column-oriented Storage, distributed, shared-nothing architecture, indexing structure의 특징을 가진다.
- 대규모의 transactional event (log data)를 탐지하고, ingesting 할 수 있도록 설계되었다.
-
OLAP workflow는 timeseries 데이터 형태를 가진다.
- timeseries 데이터는 Timestamp, dimension columns(string), metrics(numeric)으로 이루어져 있다.
- 이러한 데이터를 바탕으로 drill-down, aggregate를 계산한다.
- 예) wiki 데이터
- San Francisco 내 남자들로부터 Justin Bieber page가 얼마나 edit가 발생했는지?
- 한달 단위로 Calgary 내 page를 추가한 character 평균 수는?
- 예) wiki 데이터
-
즉, Druid를 통해 대규모의 transaction event를 빠르게 수집, interactive query를 수행하고자 함.
구조
- 크게 Real-time Node, Historical Node, Coordinator Node, Broker Node 4개의 Node로 이루어져 있다.
Real-Time Node
- event streams를 ingest하고 query 할 수 있도록 수행하는 노드
- 해당 노드를 통해 index 된 event는 즉시 query 할 수 있다.
- in-memory index buffer를 유지하며, 오로지 small time range의 streams의 query를 위해 사용한다.
- heap overflow 문제를 피하기 위해 주기적으로 disk에 persist 해야한다.
- persist 할 때, column oriented storage format 형태로 변환한다.
- 이후 persist 된 데이터에 query가 필요하면, off-heap memory (GC의 관리대상이 아닌 메모리 영역)에 load 한다.
- 주기적으로 persist 된 index를 merge 한다.
- merge 된 index는 immutable block (
segment
)으로 만든다. - 이후, handoff 단계에서 최종 HDFS, S3에 저장한다. (deep storage)
- merge 된 index는 immutable block (
- 즉, Real-TIme Node는
ingest, persist, merge, handoff 단계
를 수행한다.-
- 데이터를 ingest 해 memory index를 만든다.
-
- 주기적으로 (ex. 10분) memory index를 disk에 persist 한다.
- window 기간 이후 persist된 memory index를 merge 한다. 이는
segment
라 한다. - 만들어진 segment는 deep storage에 저장해 historical node가 사용할 수 있다.
-
- Zookeeper 와 연동해 본인의 online state나 data를 알려준다.
- 해당 노드를 통해 index 된 event는 즉시 query 할 수 있다.
- Real-Time Node는 Kafka와 같은 Message Bus와 연동 후 event를 읽는다.
Historical Node
- immutable block (segment)를 serve, load 하는 노드
- Real-Time에서 저장한 immutable block (segment)을 통해 배치 처리 하기 위한 용도이다.
- deep storage에서 segment를 읽는데 아래 단계로 수행한다.
-
- local cache에 찾고자 하는 segment가 있는지 확인한다.
- 없다면, deep storage에서 다운로드 받고 아니면, 바로 query에 사용한다.
-
- 오로지 immutable block을 다루기 때문에 read에 대한 consistency를 보장한다.
- deep storage에서 segment를 읽는데 아래 단계로 수행한다.
- Real-Time에서 저장한 immutable block (segment)을 통해 배치 처리 하기 위한 용도이다.
- Historical Node는 데이터의 종류에 따라 여러 클러스터 (Tiers)로 구성할 수 있다.
- Hot 데이터에 대해서는 스펙이 높은 Tiers로 구성한다.
- Cold 데이터에 대해서는 스펙이 상대적으로 낮은 Tiers로 구성한다.
- 이 또한, Real-Time Node와 마찬가지로 Zookeeper 와 연동해 본인의 online state나 data를 알려준다.
- Zookeeper가 장애가 발생하면, 신규 데이터에 대해서는 Serving할 수 없지만 기존 데이터는 HTTP를 통해 Serving 할 수 있다.
- 즉, Zookeeper 장애가 발생하더라도 Availability는 보장된다.
Broker Node
- Client에 받은 Query를 Historical Node, Real-Time Node에게 전달하는 Router 역할을 수행하는 노드
- Zookeeper를 통해 metadata를 읽으며, segment가 query가 가능한지, 어느 historical node에 저장되어 있는지 알고 있어야 한다.
- 그래야만 정확하게 Historical Node들이나 Real-Time Node에게 query를 요청할 수 있다.
- 또한, 받은 결과를 merge 후 client에게 전달해준다.
- local heap memory, Memcached같은 external Cache 기술을 통해 Cache 한다.
- Historical Node의 결과는 Cache하며, Real-Time Node의 결과는 Cache하지 않는다. (Real-Time Node의 결과는 지속적으로 변하기 때문)
- Zookeeper를 통해 metadata를 읽으며, segment가 query가 가능한지, 어느 historical node에 저장되어 있는지 알고 있어야 한다.
Coordinator Node
- historical node의 distribution, 데이터 관리를 수행하는 노드
- historical node에게 new data load, outdated data drop, load balance를 위해 데이터 이동 등을 요청한다.
- Coordinator Node는 MySQL와 연동해 추가적인 operational parameter, configuration을 관리한다.
- MySQL은 historical node에 의해 serve 된 모든 segment의 리스트를 가지는 Table을 가진다.
- 해당 Table은 Real-Time Node와 같이 segment를 생성하는 service로 부터 update 된다.
- 또한 Rule Table을 가지며 이는, 얼마나 segment가 만들어지고, 삭제되고, 복제되는지를 컨트롤 한다.
- MySQL은 historical node에 의해 serve 된 모든 segment의 리스트를 가지는 Table을 가진다.
- Rules
- historical segment가 load, drop에 관한 모든 것을 관리할 수 있다.
- 여러 tiers에 segment가 할당이 되었는지, 기간을 정해 언제 segment를 drop할지 등등에 관한 모든 규칙이 저장되어 있다.
- 사용자는 Rule을 통해 한달 내 segment를 hot cluster, 일년 내 segment를 cold cluster, 그 이상의 segment를 drop 할 수 있다.
- Rule에 관한 모든 정보는 MySQL에 Rule Table에 저장한다.
- historical segment가 load, drop에 관한 모든 것을 관리할 수 있다.
- Load Balancing
- Historical Node는 자원의 한계 때문에, 반드시 여러 Historical Node에 segment를 balancing 해야한다.
- 최적의 distribution을 위해 Query Pattern이나 speed에 대한 몇가지 정보를 알 필요가 있다.
- Replication
- 같은 segment를 여러 historical node에 복제함으로써 historical node가 죽더라도 query를 수행할 수 있다.
- 또한, 이를 통해 Software Update를 수행할 수 있다.
STORAGE FORMAT
data table (data source, 위에서 봤던 Wiki Table
)는 timestamp를 가진 event의 collection이고, 이를 여러 segment로 partition 한다.
-
segment는 특정 기간의 row data의 collection 이며, 이는 druid에서 사용하는 storage unit 이다.
-
segment partition은 interval을 기반으로 수행하며, 이 외에도 다른 column을 통해 partition을 수행해 segment size를 조정할 수 있다.
- interval은 한시간, 하루, 한달 등을 나타내며, time granularity를 통해 데이터 volumn, time range를 지정할 수 있다.
-
segment를 구분하기 위한 unique 값은 time interval, version이다.
- version은 segment의 최신성을 나타내며, read operation을 수행했을때 가장 최신의 segment를 읽는다.
- 즉, segment는 version 별로 여러 관리가 된다.
-
segment는 column orientation 기반으로 저장된다.
- Row orientation 보다 효율적으로 CPU를 사용하고 필요한 데이터만 load, scan 할 수 있다.
- 또한, String을 그대로 저장하는 것은 비효율적이기 때문에 이는 Dictionary Encoding을 수행한다.
- Justin Bieber -> 0
- Ke$ha -> 1
- 이를 통해 page column은 아래 list로 표현할 수 있다.
- [0, 0, 1, 1]
- 기본 [0, 0, 1, 1] 로 저장하고, 0과 1이 각 무엇인지 저장한다면 상당한 사이즈를 줄일 수 있다.
Filter
-
Filter
- 원하는 dimension만 boolean expression을 통해 filter할 수 있다.
- 데이터를 inverted bitmap index로 만들 수 있는데, 아래와 같이 표현할 수 있다. row 형태의 데이터를 column 형태로 변형한다.
- bieberfever.com -> rows [0,1,2] -> 111000
- ultratrimfast.com -> rows[3,4,5] -> 000111
- 만약 bieberfever.com, ultratrimfast.com를 모두 query하고 싶다면, 두 inverted index를 OR 하면 된다.
- 또한, metric도 위와 같이 표현할 수 있으며, 필요한 metric 만 query에 사용할 수 있다.
- 이는 특정 Column 값이 어느 row에 있는지 바로 찾을 수 있다.
-
예) San Francisco 내 남자들로부터 Justin Bieber page가 얼마나 edit가 발생했는지?
- 간단하게 Boolean expression을 통해 filter할 수 있다.
- page column을 아래와 같이 boolean expression을 통해 표현할 수 있다.
- justin bieber -> 1, 1, 0, 0
- ke$ha -> 0, 0, 1, 1
- 이후 justin bieber, ke$ha를 filter하고 싶다면 두 binary array (bitmap set)를 OR 하면, 원하는 row만 추출할 수 있다.
- 위 방법은 Search Engine에서도 공통적으로 사용한다.
-
허나 위 방법 Druid는 Concise compression 알고리즘을 통해 Size를 기존 bitmap set 보다 대폭 줄였음.
- 알고리즘이 정확히 뭔지는 모르겠지만.. bitmap set 보다 카디널리티 수에 따라 사이즈가 훨씬 적다.
추가 내용
- Segment 개념 및 특징
- DataTable (=Data Source) 의 partition 단위를 뜻함.
- 여기서, DataTable은 timestamped된 event들의 집합이다.
- timestamp column을 통해
data distribution 정책, date retention 정책, first level query pruning
을 할 수 있다. - 약 5-10 million row로 이루어져 있다.
- partition은 time interval 단위로 이루어진다. time interval은 보통 하루, 1시간와 같은 단위이다.
- Segment는 identifier에 의해 여러 version으로 구분된다. (뒤늦게 들어오는 event 들은 새로운 버전의 segment를 구성할것으로 보인다.)
- segment를 읽을 때 항상 최신 version을 읽는다.
- Druid Node는 기본적으로 1개의 thread로 1개의 segment를 읽는다.
- 즉, 이용 가능한 core 수가 많을 수록 많은 segment를 동시에 읽을 수 있다는 의미이다.
- Segment는 immutable한 특성을 가지기 때문에, Read Consistency를 보장한다.
- 그래서 같은 시간 때에 동시에 여러 쓰레드들이 같은 segment를 읽을 수 있다.
- 이는 높은 read throughput을 가능하게 한다.
- Segment는 여러 Query에 사용되어질 수 있고, 이는 보장이 되어야 한다.
- 거대한 Query가 있더라도, 다른 Query가 사용할 수 있어야 한다.
- Segment는 Timestamp (long), dimension (string), metric (int, float, long, double)으로 이루어져 있다.
- 각 Type은 서로다른 압축 방법이 사용될 수 있다.
- metric는 LZ4 압축, dimension은 Dictionary Encode와 비슷한 PowerDrill
- DataTable (=Data Source) 의 partition 단위를 뜻함.
Comments