
ksqlDB

ksqlDB is a purpose-built database for stream processing applications, ingesting data from Apache Kafka.

Available on the Enterprise Premier plan. Contact us for details.

See how you can use ksqlDB and Cube Cloud to power real-time analytics in Power BI:

In this video, the SQL API is used to connect to Power BI. Currently, it’s recommended to use the DAX API.

Prerequisites

  • Hostname for the ksqlDB server
  • Username and password (or an API key) to connect to ksqlDB server

Confluent Cloud

If you are using Confluent Cloud, you need to generate an API key and use the API key name as your username and the API key secret as your password.

You can generate an API key by installing confluent-cli and running the following commands in the command line:

    brew install --cask confluent-cli
    confluent login
    confluent environment use <YOUR-ENVIRONMENT-ID>
    confluent ksql cluster list
    confluent api-key create --resource <YOUR-KSQL-CLUSTER-ID>

Setup

Manual

Add the following to a .env file in your Cube project:

    CUBEJS_DB_TYPE=ksql
    CUBEJS_DB_URL=https://xxxxxx-xxxxx.us-west4.gcp.confluent.cloud:443
    CUBEJS_DB_USER=username
    CUBEJS_DB_PASS=password

Environment Variables

| Environment Variable | Description | Possible Values | Required |
| --- | --- | --- | --- |
| CUBEJS_DB_URL | The host URL for ksqlDB with port | A valid database host URL | |
| CUBEJS_DB_USER | The username used to connect to the ksqlDB. API key for Confluent Cloud. | A valid database username | |
| CUBEJS_DB_PASS | The password used to connect to the ksqlDB. API secret for Confluent Cloud. | A valid database password | |
| CUBEJS_DB_KAFKA_HOST | Kafka broker host(s) for Kafka streams mode. Multiple brokers can be comma-separated. | A valid Kafka broker URL | |
| CUBEJS_DB_KAFKA_USER | Username for Kafka broker authentication (SASL PLAIN) | A valid Kafka username | |
| CUBEJS_DB_KAFKA_PASS | Password for Kafka broker authentication (SASL PLAIN) | A valid Kafka password | |
| CUBEJS_DB_KAFKA_USE_SSL | If true, enables SASL_SSL for the Kafka connection | true, false | |
| CUBEJS_CONCURRENCY | The number of concurrent queries to the data source | A valid number | |

Pre-Aggregations Support

ksqlDB supports only streaming pre-aggregations.

Kafka streams mode

By default, Cube connects to ksqlDB via its REST API. Cube uses this REST API both for metadata (discovering tables and streams) and for streaming data into Cube Store during pre-aggregation builds.

In this default mode, Cube may create tables and streams in ksqlDB as part of the pre-aggregation build process (e.g., CREATE TABLE ... AS SELECT statements for non-read-only pre-aggregations).

When Kafka streams mode is enabled, Cube reads data directly from the underlying Kafka topics instead of going through the ksqlDB REST API for data streaming. ksqlDB is still used for metadata operations such as discovering tables, streams, and their schemas, but Cube Store subscribes to the backing Kafka topic directly.

In this mode, Cube does not create any tables or streams in ksqlDB. All pre-aggregations use the read-only refresh path: Cube discovers the existing ksqlDB objects and their backing Kafka topics, then streams data directly from Kafka into Cube Store.

When to use Kafka streams mode

Kafka streams mode is useful when:

  • You want to prevent Cube from creating any objects in ksqlDB
  • You need higher throughput for data ingestion by reading Kafka directly
  • Your ksqlDB environment has restricted permissions that don’t allow creating tables or streams
  • You prefer Cube Store to consume from Kafka topics without an intermediary

Enabling Kafka streams mode

Set the CUBEJS_DB_KAFKA_HOST environment variable to the address of your Kafka broker(s). This activates Kafka streams mode automatically:

    CUBEJS_DB_TYPE=ksql
    CUBEJS_DB_URL=https://xxxxxx-xxxxx.us-west4.gcp.confluent.cloud:443
    CUBEJS_DB_USER=ksql_username
    CUBEJS_DB_PASS=ksql_password
    CUBEJS_DB_KAFKA_HOST=pkc-xxxxx.us-west4.gcp.confluent.cloud:9092
    CUBEJS_DB_KAFKA_USER=kafka_api_key
    CUBEJS_DB_KAFKA_PASS=kafka_api_secret
    CUBEJS_DB_KAFKA_USE_SSL=true

Multiple Kafka brokers can be specified as a comma-separated list:

CUBEJS_DB_KAFKA_HOST=broker1:9092,broker2:9092,broker3:9092

When using Confluent Cloud, the Kafka credentials are separate from the ksqlDB credentials. Generate an API key for the Kafka cluster (not the ksqlDB cluster) and use it as CUBEJS_DB_KAFKA_USER and CUBEJS_DB_KAFKA_PASS.
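As a rough illustration of the rule above, the following sketch derives the connection mode from a set of environment variables. The helper name `describeKsqlConnection` and the returned object shape are hypothetical and not part of Cube; the only behavior taken from this page is that a non-empty CUBEJS_DB_KAFKA_HOST switches on Kafka streams mode and that brokers are comma-separated.

```javascript
// Hypothetical helper (not a Cube API): classifies a ksqlDB configuration
// according to the rules described on this page.
function describeKsqlConnection(env) {
  // A comma-separated broker list; surrounding whitespace is tolerated here
  // as a convenience of this sketch.
  const brokers = (env.CUBEJS_DB_KAFKA_HOST || "")
    .split(",")
    .map((host) => host.trim())
    .filter((host) => host.length > 0);

  return {
    // Kafka streams mode is active as soon as at least one broker is set.
    mode: brokers.length > 0 ? "kafka-streams" : "rest",
    ksqlUrl: env.CUBEJS_DB_URL,
    brokers,
    // "true" enables SASL_SSL; this sketch treats anything else as disabled.
    useSsl: env.CUBEJS_DB_KAFKA_USE_SSL === "true",
  };
}

const kafkaModeExample = describeKsqlConnection({
  CUBEJS_DB_URL: "https://ksql.example.com:443", // placeholder URL
  CUBEJS_DB_KAFKA_HOST: "broker1:9092,broker2:9092,broker3:9092",
  CUBEJS_DB_KAFKA_USE_SSL: "true",
});
console.log(kafkaModeExample.mode, kafkaModeExample.brokers.length);
```

Without CUBEJS_DB_KAFKA_HOST the same helper reports the default REST mode, which mirrors the "activates automatically" behavior described above.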

How it works

With Kafka streams mode enabled:

  1. Cube uses the ksqlDB REST API to discover available tables and streams and to retrieve their schemas via DESCRIBE.
  2. For each table or stream, Cube resolves the backing Kafka topic name from the ksqlDB metadata.
  3. Instead of streaming data through ksqlDB, Cube Store connects directly to the Kafka broker(s) and consumes from the resolved topic.
  4. Pre-aggregation builds use the read-only refresh strategy. Cube does not issue any CREATE TABLE or CREATE STREAM statements to ksqlDB.
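The four steps above can be sketched as plain functions. This is only an illustration, not Cube's implementation: the function names are hypothetical, the DESCRIBE response is a hand-written and heavily simplified sample (check the ksqlDB REST API reference for the real layout), and the topic name `order_events` is assumed.

```javascript
// Step 1: request body for a DESCRIBE statement sent to ksqlDB's REST API.
function describeRequest(sourceName) {
  return { ksql: `DESCRIBE ${sourceName};` };
}

// Steps 1-2: read column names and the backing topic from a DESCRIBE response.
// The response layout assumed here is simplified for illustration.
function readSourceMetadata(describeResponse) {
  const description = describeResponse[0].sourceDescription;
  return {
    name: description.name,
    columns: description.fields.map((field) => field.name),
    topic: description.topic,
  };
}

// Steps 3-4: a read-only plan names the topic to consume and issues
// no CREATE TABLE or CREATE STREAM statements.
function planReadOnlyRefresh(metadata, brokers) {
  return {
    consumeFrom: { brokers, topic: metadata.topic },
    ksqlStatements: [],
  };
}

// Hand-written sample response (assumed shape and values).
const sampleDescribeResponse = [
  {
    sourceDescription: {
      name: "ORDER_EVENTS_STREAM",
      type: "STREAM",
      topic: "order_events",
      fields: [{ name: "ORDER_ID" }, { name: "STATUS" }],
    },
  },
];

const streamMetadata = readSourceMetadata(sampleDescribeResponse);
const refreshPlan = planReadOnlyRefresh(streamMetadata, ["broker1:9092"]);
console.log(describeRequest("ORDER_EVENTS_STREAM").ksql, refreshPlan.consumeFrom.topic);
```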

Data modeling

ksqlDB is typically used as an additional data source alongside a primary data warehouse. To use Kafka streams mode, configure ksqlDB as a named data source using decorated environment variables and point your cubes to it with the data_source property.

First, declare the data sources and configure the ksqlDB connection with Kafka credentials:

    CUBEJS_DATASOURCES=default,ksql

    CUBEJS_DB_TYPE=postgres
    CUBEJS_DB_HOST=my.postgres.host
    CUBEJS_DB_NAME=my_database
    CUBEJS_DB_USER=postgres_user
    CUBEJS_DB_PASS=postgres_password

    CUBEJS_DS_KSQL_DB_TYPE=ksql
    CUBEJS_DS_KSQL_DB_URL=https://xxxxxx-xxxxx.us-west4.gcp.confluent.cloud:443
    CUBEJS_DS_KSQL_DB_USER=ksql_api_key
    CUBEJS_DS_KSQL_DB_PASS=ksql_api_secret
    CUBEJS_DS_KSQL_DB_KAFKA_HOST=pkc-xxxxx.us-west4.gcp.confluent.cloud:9092
    CUBEJS_DS_KSQL_DB_KAFKA_USER=kafka_api_key
    CUBEJS_DS_KSQL_DB_KAFKA_PASS=kafka_api_secret
    CUBEJS_DS_KSQL_DB_KAFKA_USE_SSL=true
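The naming pattern in the variables above (the `default` source keeps the plain names, the `ksql` source gets a `CUBEJS_DS_KSQL_` prefix) can be expressed as a small helper. The function `decoratedEnvName` is hypothetical and only reproduces the pattern visible in this example.

```javascript
// Hypothetical helper: maps a plain variable name to its decorated form
// for a named data source, following the pattern shown in the example.
function decoratedEnvName(baseName, dataSource) {
  if (dataSource === "default") {
    return baseName; // the default data source uses undecorated names
  }
  const prefix = `CUBEJS_DS_${dataSource.toUpperCase()}_`;
  return baseName.replace(/^CUBEJS_/, prefix);
}

console.log(decoratedEnvName("CUBEJS_DB_KAFKA_HOST", "ksql"));
// prints "CUBEJS_DS_KSQL_DB_KAFKA_HOST"
```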

Then, create cubes that reference your data. A common pattern is to combine a batch cube (reading historical data from your warehouse) with a streaming cube (reading real-time data from ksqlDB via Kafka) using a lambda pre-aggregation.

The batch cube queries the warehouse and builds daily partitions incrementally. The streaming cube points at an existing ksqlDB stream with data_source: ksql and uses a read-only streaming pre-aggregation that consumes from the backing Kafka topic directly. The lambda pre-aggregation in the batch cube merges both, serving historical data from the warehouse rollup and real-time data from the streaming rollup:

    cube("order_events", {
      data_source: "default",
      sql: `
        SELECT order_id, user_id, status, amount, created_at
        FROM ecommerce.order_events
        WHERE ${FILTER_PARAMS.order_events.created_at.filter(
          (from, to) => `created_at >= ${from} AND created_at < ${to}`
        )}
      `,

      measures: {
        count: {
          type: `count`,
        },
        total_amount: {
          sql: `amount`,
          type: `sum`,
        },
        failed_count: {
          sql: `CASE WHEN status = 'failed' THEN 1 ELSE 0 END`,
          type: `sum`,
        },
      },

      dimensions: {
        order_id: {
          sql: `order_id`,
          type: `string`,
          primary_key: true,
        },
        user_id: {
          sql: `user_id`,
          type: `string`,
        },
        status: {
          sql: `status`,
          type: `string`,
        },
        created_at: {
          sql: `created_at`,
          type: `time`,
        },
      },

      pre_aggregations: {
        lambda: {
          type: `rollup_lambda`,
          rollups: [
            order_events.batch,
            order_events_stream.stream,
          ],
        },
        batch: {
          type: `rollup`,
          measures: [CUBE.count, CUBE.total_amount, CUBE.failed_count],
          dimensions: [CUBE.order_id, CUBE.user_id, CUBE.status],
          time_dimension: CUBE.created_at,
          granularity: `second`,
          partition_granularity: `day`,
          build_range_start: {
            sql: `SELECT NOW() - INTERVAL '90 days'`,
          },
          build_range_end: {
            sql: `SELECT NOW()`,
          },
          refresh_key: {
            every: `8 hour`,
            update_window: `1 day`,
            incremental: true,
          },
          indexes: {
            user_status: {
              columns: [CUBE.user_id, CUBE.status],
            },
          },
        },
      },
    });

    cube("order_events_stream", {
      data_source: "ksql",
      sql: `SELECT * FROM ORDER_EVENTS_STREAM`,

      measures: {
        count: {
          type: `count`,
        },
        total_amount: {
          sql: `AMOUNT`,
          type: `sum`,
        },
        failed_count: {
          sql: `CASE WHEN STATUS = 'failed' THEN 1 ELSE 0 END`,
          type: `sum`,
        },
      },

      dimensions: {
        order_id: {
          sql: `ORDER_ID`,
          type: `string`,
          primary_key: true,
        },
        user_id: {
          sql: `USER_ID`,
          type: `string`,
        },
        status: {
          sql: `STATUS`,
          type: `string`,
        },
        created_at: {
          sql: `CREATED_AT`,
          type: `time`,
        },
      },

      pre_aggregations: {
        stream: {
          type: `rollup`,
          read_only: true,
          measures: [CUBE.count, CUBE.total_amount, CUBE.failed_count],
          dimensions: [CUBE.order_id, CUBE.user_id, CUBE.status],
          unique_key_columns: [`order_id`],
          time_dimension: CUBE.created_at,
          granularity: `second`,
          partition_granularity: `day`,
          build_range_start: {
            sql: `SELECT date_trunc('day', DATE_SUB(NOW(), INTERVAL '5 hour'))`,
          },
          build_range_end: {
            sql: `SELECT DATE_ADD(NOW(), INTERVAL '15 minute')`,
          },
          refresh_key: {
            every: `1 minute`,
            update_window: `1 hour`,
            incremental: true,
          },
          indexes: {
            user_status: {
              columns: [CUBE.user_id, CUBE.status],
            },
          },
          stream_offset: `latest`,
        },
      },
    });

Key properties for the streaming pre-aggregation:

  • read_only: true — Cube will not create any objects in ksqlDB. The data is consumed directly from the backing Kafka topic.
  • stream_offset — controls where Cube Store starts consuming from in the Kafka topic. Set to "latest" to only consume new messages arriving after the pre-aggregation is created. Set to "earliest" to replay the topic from the beginning. Defaults to "latest" if not specified. On subsequent refreshes, Cube Store automatically resumes from the last processed offset regardless of this setting.
  • unique_key_columns — columns that uniquely identify a record, used for deduplication (see below).
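The stream_offset rule in the list above can be read as a small decision function. The sketch below is an interpretation of that description, not Cube Store code; the function name `resolveStartPosition` and the returned object shape are hypothetical.

```javascript
// Hypothetical sketch of where consumption starts for a streaming
// pre-aggregation, following the stream_offset description above.
function resolveStartPosition(streamOffset, lastProcessedOffset) {
  // On subsequent refreshes the stored position wins, whatever the setting is.
  if (lastProcessedOffset !== null && lastProcessedOffset !== undefined) {
    return { from: "resume", lastProcessedOffset };
  }
  // First build: fall back to "latest" when the property is not specified.
  const configured = streamOffset ?? "latest";
  if (configured !== "latest" && configured !== "earliest") {
    throw new Error(`Unsupported stream_offset: ${configured}`);
  }
  return { from: configured };
}

console.log(resolveStartPosition(undefined, null).from);  // prints "latest"
console.log(resolveStartPosition("earliest", null).from); // prints "earliest"
console.log(resolveStartPosition("earliest", 42).from);   // prints "resume"
```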

The sql_table or sql value should reference an existing ksqlDB stream or table. Cube discovers its schema automatically. With Kafka streams mode enabled, the streaming pre-aggregation reads the backing Kafka topic directly — no objects are created in ksqlDB.

Unique key columns and deduplication

When unique_key_columns is set, Cube Store appends an internal sequence column (__seq) to the table, populated from the Kafka partition offset. The unique key columns together with __seq form the sort key for all indexes on this table.

Deduplication is not applied at ingestion time — all incoming records are appended as they arrive. Instead, Cube Store deduplicates during reads and compaction: rows are sorted by the unique key columns and then by __seq, and only the last row per unique key (the one with the highest sequence number) is kept. This means that if the same key appears multiple times in the stream, the most recent version is always the one returned by queries.
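A minimal sketch of this read-time deduplication, assuming rows are plain objects that carry a numeric `__seq` field. The function name `dedupeByUniqueKey` is hypothetical, and Cube Store's actual storage and compaction logic is more involved than this in-memory version.

```javascript
// Keeps only the row with the highest __seq for each unique key.
function dedupeByUniqueKey(rows, uniqueKeyColumns) {
  // Serialize the key columns so composite keys compare as one value.
  const keyOf = (row) => JSON.stringify(uniqueKeyColumns.map((c) => row[c]));

  // Sort by unique key first, then by sequence number.
  const sorted = [...rows].sort((a, b) => {
    const keyA = keyOf(a);
    const keyB = keyOf(b);
    if (keyA !== keyB) return keyA < keyB ? -1 : 1;
    return a.__seq - b.__seq;
  });

  // Within each run of equal keys, the last row overwrites the earlier ones.
  const result = [];
  for (const row of sorted) {
    const last = result[result.length - 1];
    if (last && keyOf(last) === keyOf(row)) {
      result[result.length - 1] = row;
    } else {
      result.push(row);
    }
  }
  return result;
}

const appendedRows = [
  { order_id: "a", status: "pending", __seq: 1 },
  { order_id: "b", status: "failed", __seq: 2 },
  { order_id: "a", status: "completed", __seq: 3 },
];
const dedupedRows = dedupeByUniqueKey(appendedRows, ["order_id"]);
console.log(dedupedRows.map((row) => `${row.order_id}:${row.status}`).join(", "));
// prints "a:completed, b:failed"
```

All three rows stay in the appended input; only the read path collapses order `a` to its most recent version.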

For Kafka messages, unique key column values can come from either the message payload (the JSON value) or the message key. If a column listed in unique_key_columns is missing from the payload, Cube Store falls back to the Kafka message key: for a single unique key column, the raw key value is used; for composite keys, the key is expected to be a JSON object with matching field names.
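The fallback described above might look roughly like the following. This is an illustrative sketch, not Cube Store's code: the function name `resolveUniqueKey` is hypothetical, and returning null when neither the payload nor the key supplies a value is an assumption of this sketch.

```javascript
// Resolves unique key column values from the payload first,
// then from the Kafka message key when a column is missing.
function resolveUniqueKey(payload, rawKey, uniqueKeyColumns) {
  const missing = uniqueKeyColumns.filter((c) => payload[c] === undefined);

  let keyFields = {};
  if (missing.length > 0 && rawKey !== null && rawKey !== undefined) {
    if (uniqueKeyColumns.length === 1) {
      // Single unique key column: the raw key value is used as-is.
      keyFields = { [uniqueKeyColumns[0]]: rawKey };
    } else {
      // Composite key: the key is expected to be a JSON object
      // whose field names match the unique key columns.
      keyFields = JSON.parse(rawKey);
    }
  }

  const resolved = {};
  for (const column of uniqueKeyColumns) {
    resolved[column] =
      payload[column] !== undefined ? payload[column] : keyFields[column] ?? null;
  }
  return resolved;
}

console.log(resolveUniqueKey({ STATUS: "failed" }, "ord_1", ["ORDER_ID"]));
console.log(
  resolveUniqueKey(
    { ORDER_ID: "ord_1" },
    '{"ORDER_ID":"ignored","USER_ID":"usr_9"}',
    ["ORDER_ID", "USER_ID"]
  )
);
```

In the second call ORDER_ID is taken from the payload and only USER_ID falls back to the message key.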

Stream format

Cube Store expects Kafka messages to have a JSON object as their value payload, with field names matching the column names defined in the cube. For example, given the streaming cube above, each Kafka message value should look like:

{ "ORDER_ID": "ord_12345", "USER_ID": "usr_789", "STATUS": "completed", "AMOUNT": 49.99, "CREATED_AT": "2025-01-15T10:30:00.000" }

Field names are case-sensitive and must match the column names used in the sql property of each dimension and measure definition. Missing fields default to null.
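To make the case-sensitivity and null-default rules concrete, here is a small sketch that maps a message value onto a fixed list of columns. The function name `messageValueToRow` is hypothetical and this is not how Cube Store is implemented internally; it only mirrors the two rules stated above.

```javascript
// Maps a JSON message value onto the expected columns.
// Lookup is an exact, case-sensitive match; absent fields become null.
function messageValueToRow(jsonValue, columnNames) {
  const payload = JSON.parse(jsonValue);
  const row = {};
  for (const column of columnNames) {
    row[column] = Object.prototype.hasOwnProperty.call(payload, column)
      ? payload[column]
      : null;
  }
  return row;
}

// "status" is lowercase in this message, so it does not match the STATUS column.
const parsedRow = messageValueToRow(
  '{"ORDER_ID": "ord_12345", "status": "completed", "AMOUNT": 49.99}',
  ["ORDER_ID", "STATUS", "AMOUNT"]
);
console.log(parsedRow.ORDER_ID, parsedRow.STATUS, parsedRow.AMOUNT);
// prints: ord_12345 null 49.99
```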

The message key is optional. When present and the value starts with {, it is parsed as a JSON object and used as a fallback source for unique key column values (see above).

Filtering on the stream

When the streaming cube defines a sql property with a SELECT statement (rather than sql_table), Cube Store applies the projection and any WHERE filters from that statement directly on each micro-batch of incoming Kafka messages. This filtering happens inside Cube Store using its query engine — it does not require ksqlDB to process the filter. Only rows that pass the filter are ingested into the pre-aggregation table.

This allows you to define a streaming cube that only ingests a subset of the data from the underlying Kafka topic without creating any server-side filter objects in ksqlDB.
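Conceptually, the per-batch processing resembles the sketch below, which applies an optional filter and then a projection to an array of rows. The statement representation (a `where` predicate and a `select` map from alias to expression function) and the function name `ingestMicroBatch` are hypothetical stand-ins for Cube Store's query engine.

```javascript
// Applies WHERE (if any) and then the SELECT projection to one micro-batch.
function ingestMicroBatch(rows, statement) {
  return rows
    .filter((row) => (statement.where ? statement.where(row) : true))
    .map((row) => {
      const projected = {};
      for (const [alias, expression] of Object.entries(statement.select)) {
        projected[alias] = expression(row);
      }
      return projected;
    });
}

// Stand-in for:
//   SELECT ORDER_ID, AMOUNT FROM ORDER_EVENTS_STREAM WHERE STATUS = 'failed'
const failedOrdersStatement = {
  select: {
    ORDER_ID: (row) => row.ORDER_ID,
    AMOUNT: (row) => row.AMOUNT,
  },
  where: (row) => row.STATUS === "failed",
};

const ingestedRows = ingestMicroBatch(
  [
    { ORDER_ID: "ord_1", STATUS: "completed", AMOUNT: 10 },
    { ORDER_ID: "ord_2", STATUS: "failed", AMOUNT: 25 },
  ],
  failedOrdersStatement
);
console.log(ingestedRows.length, ingestedRows[0].ORDER_ID);
// prints: 1 ord_2
```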

Supported SQL syntax

The SELECT statement must follow a strict shape. Cube Store only accepts plans that resolve to Projection > Filter > TableScan (where the filter is optional). Any other query plan shape is rejected.
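The shape rule can be illustrated with a check over a toy plan tree. The node representation (`node` and `input` fields) and the function name `isSupportedPlan` are assumptions made for this sketch and do not reflect Cube Store's internal plan types.

```javascript
// Accepts only Projection > Filter > TableScan, where Filter is optional.
function isSupportedPlan(plan) {
  if (!plan || plan.node !== "Projection") return false;
  let child = plan.input;
  if (child && child.node === "Filter") {
    child = child.input; // the filter level may be skipped entirely
  }
  // A TableScan must be the leaf: anything below it is rejected.
  return Boolean(child) && child.node === "TableScan" && !child.input;
}

const scan = { node: "TableScan" };
console.log(isSupportedPlan({ node: "Projection", input: scan })); // prints true
console.log(
  isSupportedPlan({ node: "Projection", input: { node: "Filter", input: scan } })
); // prints true
console.log(
  isSupportedPlan({ node: "Projection", input: { node: "Aggregate", input: scan } })
); // prints false
```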

Supported:

  • SELECT with column references (e.g., SELECT col1, col2 FROM topic)
  • SELECT * wildcard
  • Column aliases (SELECT col1 AS my_alias)
  • WHERE clause with comparison operators (=, !=, <, >, <=, >=)
  • Boolean logic in WHERE (AND, OR, NOT)
  • IS NULL and IS NOT NULL
  • IN lists (col IN (1, 2, 3))
  • BETWEEN expressions
  • CASE ... WHEN ... THEN ... ELSE ... END expressions
  • CAST(expr AS type) type conversions
  • EXTRACT(field FROM expr) for date/time parts
  • SUBSTRING(expr FROM start FOR length)
  • Scalar functions (e.g., COALESCE, CONCAT, arithmetic)
  • CONVERT_TZ for timezone conversion (internally rewritten for compatibility)
  • Nested expressions with parentheses

Not supported:

  • JOIN clauses — only a single FROM table is allowed
  • Subqueries in SELECT or WHERE
  • GROUP BY, HAVING, or aggregate functions (SUM, COUNT, AVG, etc.)
  • ORDER BY (rows are consumed in stream order)
  • LIMIT and OFFSET
  • UNION, INTERSECT, EXCEPT
  • Window functions (OVER, PARTITION BY)
  • Multiple FROM or multiple WHERE clauses
  • Common Table Expressions (WITH ... AS)

All column expressions in the SELECT list that are not simple column references must have explicit aliases. Unique key columns may reference the source column through a scalar function (e.g., CAST(id AS VARCHAR) AS id), but not through arbitrary expressions.
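As a rough way to picture the aliasing rule, the sketch below flags select-list items that need an alias but lack one. It works on a pre-split list of `{ expr, alias }` objects rather than parsing SQL, and both that representation and the function name `missingAliases` are hypothetical.

```javascript
// A bare identifier such as ORDER_ID counts as a simple column reference.
const SIMPLE_COLUMN = /^[A-Za-z_][A-Za-z0-9_]*$/;

// Returns the expressions that are neither simple columns nor aliased.
function missingAliases(selectItems) {
  return selectItems
    .filter((item) => item.expr !== "*")
    .filter((item) => !SIMPLE_COLUMN.test(item.expr) && !item.alias)
    .map((item) => item.expr);
}

const unaliased = missingAliases([
  { expr: "ORDER_ID" },
  { expr: "CAST(id AS VARCHAR)", alias: "id" },
  { expr: "AMOUNT * 100" },
]);
console.log(unaliased); // only "AMOUNT * 100" lacks a required alias
```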
