flowchart TB
subgraph msk["MSK"]
    data[("Kafka Cluster A")]
    event[("Kafka Cluster B")]
    exchange[("Kafka Cluster C")]
    etc[("...")]
end

subgraph eks["EKS"]
    direction TB
    ksql["ksql-cluster"]
    ui["kafbat"]
    ui -.-> |used| ksql
end

client --> |create TABLE/STREAM w/ SQL| ui
data --> |consume| ksql --> |publish| data

재료

집계 Table 생성에 필요한 Stream 생성하기

ksql 에서는 table/stream 개념이 존재하는데, bounded(state)/unbounded 으로 이해하면 좋다. 컨플루언트 문서를 살펴보면 좋을 것 같아 링크로 남긴다. 데비지움 메세지 포맷을 파싱한 trade_raw Stream 을 kafbat 에서 생성해보자.

CREATE STREAM trade_raw (
  schema STRUCT<
    type STRING,
    fields ARRAY<STRUCT<
      type STRING,
      optional BOOLEAN,
      field STRING,
      name STRING,
      version INTEGER,
      default STRING,
      parameters STRUCT<
        allowed STRING
      >
    >>,
    optional BOOLEAN,
    name STRING,
    version INTEGER
  >,
  payload STRUCT<
    before STRUCT<
      id BIGINT,
      ...
    >,
    after STRUCT<
      id BIGINT,
      ...
    >,
    source STRUCT<
      version STRING,
      `connector` STRING,
      name STRING,
      ts_ms BIGINT,
      snapshot STRING,
      db STRING,
      sequence STRING,
      `table` STRING,
      server_id BIGINT,
      gtid STRING,
      file STRING,
      pos BIGINT,
      row INTEGER,
      thread BIGINT,
      `query` STRING
    >,
    op STRING,
    ts_ms BIGINT,
    transaction STRUCT<
      id STRING,
      total_order BIGINT,
      data_collection_order BIGINT
    >
  >
) WITH (
  KAFKA_TOPIC='{your_source_topic}',
  VALUE_FORMAT='JSON'
);

이렇게 소스토픽을 바라보는 스트림 혹은 테이블을 생성하면 바로 토픽이 생성되겠지? 생각했지만 아직 생성되지 않았다. (끝 부분에 확인할 수 있는데 소스와 직접 연결된 리소스를 이용하면 그때 모두 생성한다.)


검색되지 않는 Stream

trade_raw 전처리한 Stream 생성하기

struct 로 뭉쳐있는 각 값들을 destrcut 하고 timestamp 형변환된 스트림을 생성해보자.

CREATE STREAM trade_history WITH (
  TIMESTAMP='timestamp',
  TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ss''Z'''
) AS
SELECT
  payload->after->id AS id,
  ...
  payload->after->timestamp AS timestamp
FROM trade_raw
EMIT CHANGES;

-> 와 같은 문법을 사용한다. 독특하다. ksqldb 탭에 가면 스트림이 생성된 것을 확인할 수 있다.

리파티셔닝한 Stream 생성하기

Windowing 기능은 GROUP BY 와 함께 사용해야 한다. GROUP BY 는 메세지 키로 집계한다. 현재 생성된 스트림에는 키가 없기 때문에, 더미 키를 가지도록 스트림을 생성한다.

CREATE STREAM trade_repartitioned AS
SELECT *
FROM trade_history
PARTITION BY 'ALL';

리파티셔닝한 스트림 생성 시 이전에 정의한 Stream 들이 토픽으로 생성된 것을 확인할 수 있다.

집계 Table 생성하기

window 기능과 함께 분 단위 집계결과를 Table 로 생성한다.

CREATE TABLE trades_per_minute AS
SELECT
  'ALL' AS group_key,
  TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss') AS window_start,
  COUNT(*) AS trade_count
FROM trade_repartitioned
WINDOW TUMBLING (SIZE 1 MINUTE, GRACE PERIOD 10 SECONDS)
GROUP BY 'ALL'
EMIT FINAL;
EMIT FINAL & GRADE PERIOD

EMIT CHANGES 와 EMIT FINAL 이 존재한다. EMIT FINAL 인 경우
윈도우가 닫힐 때만 최종 결과를 발행한다. EMIT CHANGES 는 윈도우가 닫히든 말든 모든 변경사항을 확인하는 경우 사용하기 때문에 이 예제에서는 FINAL 을 선택했다.
https://docs.confluent.io/platform/current/ksqldb/developer-guide/ksqldb-reference/select-push-query.html#emit

윈도우는 다음 메세지가 도착할 때 닫히므로 윈도우 끝 이후 메세지가 늦게 도착할 수 있음을 감안하여 GRADE PERIOD 옵션을 사용했다.
https://docs.confluent.io/platform/current/ksqldb/developer-guide/ksqldb-reference/quick-reference.html#grace-period

운영 시 고려해야할 혹은 더 봐야할 것들

(추가) 카프카 클러스터 옵션 변경

References