CREATE TABLE
Use the `CREATE TABLE` command to create a new table. Tables consist of fixed columns and insertable rows.
Rows can be added using the INSERT command. When creating a table, you can specify connector settings and data format.
If you choose not to persist the data from the source in RisingWave, use CREATE SOURCE instead. For more details about the differences between sources and tables, see here.
Syntax
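A simplified sketch of the statement, assembled from the clauses described in the Parameters section below (not an exhaustive grammar):

```sql
CREATE TABLE [ IF NOT EXISTS ] [schema_name.]table_name (
    col_name data_type [ PRIMARY KEY ] [ DEFAULT default_expr ]
    [, col_name AS generation_expression ]
    [, PRIMARY KEY ( col_name [, ...] ) ]
    [, WATERMARK FOR col_name AS expr ]
)
[ APPEND ONLY ]
[ ON CONFLICT conflict_action ]
[ INCLUDE ... ]
[ WITH ( connector_parameter = 'value' [, ...] ) ]
[ FORMAT data_format ENCODE data_encode [ ( ... ) ] ]
[ ENGINE = iceberg ];
```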
Notes
- RisingWave supports NOT NULL constraints in table schemas. For batch inserts and updates, RisingWave throws an error if any row contains NULL in a NOT NULL column; for streaming ingestion, rows with NULL in NOT NULL columns are silently ignored.
- For tables with primary key constraints, if you insert a new data record with an existing key, the new record overwrites the existing record.
- A generated column that is defined with non-deterministic functions cannot be specified as part of the primary key. For example, if `A1` is defined as `current_timestamp()`, it cannot be part of the primary key.
- Names and unquoted identifiers are case-insensitive. Double-quote any of these fields to make them case-sensitive. See also Identifiers.
- The syntax for creating a table with connector settings, and the supported connectors, are the same as for creating a source. See CREATE SOURCE for a full list of supported connectors and data formats.
- To know when a data record is loaded into RisingWave, you can define a column that is generated based on the processing time (`<column_name> timestamptz AS proctime()`) when creating the table or source. See also proctime().
- For a table whose schema comes from an external connector, list `*` first to represent all columns from the external connector, so that you can then define generated columns on the table. Alternatively, you can partially combine the connector's schema with generated columns, or apply the schema directly to define the table structure. See the examples below.
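A sketch of the `*` pattern with a generated column appended after the connector-derived columns (the topic name, broker address, and schema registry URL are illustrative, not from the original):

```sql
CREATE TABLE from_kafka (
    *,  -- all columns derived from the external connector's schema
    ingest_time timestamptz AS proctime()  -- generated processing-time column
)
WITH (
    connector = 'kafka',
    topic = 'events',                            -- illustrative topic name
    properties.bootstrap.server = 'broker:9092'  -- illustrative broker address
)
FORMAT PLAIN ENCODE AVRO (
    schema.registry = 'http://registry:8081'     -- illustrative registry URL
);
```

Using `*` requires an encoding from which RisingWave can derive the schema (for example, Avro with a schema registry); with self-describing but schemaless encodings you define the columns explicitly instead.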
Parameters
| Parameter or clause | Description |
|---|---|
| table_name | The name of the table. If a schema name is given (for example, `CREATE TABLE <schema>.<table> ...`), the table is created in the specified schema; otherwise it is created in the current schema. |
| col_name | The name of a column. |
| data_type | The data type of a column. With the struct data type, you can create a nested table. Elements in a nested table need to be enclosed with angle brackets (`<>`). |
| DEFAULT | The DEFAULT clause assigns a default value to a column. The default value is used when a new row is inserted without an explicit value for that column. `default_expr` is any constant value or variable-free expression that does not reference other columns in the current table or involve subqueries. The data type of `default_expr` must match the data type of the column. |
| generation_expression | The expression for the generated column. For details about generated columns, see Generated columns. |
| watermark_clause | A clause that defines the watermark for a timestamp column. The syntax is `WATERMARK FOR column_name AS expr`. For the watermark clause to be valid, the table must be an append-only table; that is, the `APPEND ONLY` option must be specified. This restriction applies only to tables. See Watermarks below for more. |
| APPEND ONLY | When this option is specified, the table is created as an append-only table. An append-only table cannot have primary keys. `UPDATE` and `DELETE` statements are not valid for append-only tables. See Watermarks below for more. |
| ON CONFLICT | Specify the alternative action to take when a newly inserted record violates the table's PRIMARY KEY constraint. See PK conflict behavior below for more information. |
| INCLUDE clause | Extract fields not included in the payload as separate columns. For more details on its usage, see INCLUDE clause. |
| WITH clause | Specify the connector settings here if you want to store all the source data. See the Data ingestion page for the full list of supported sources, as well as links to connector pages detailing the syntax for each source. |
| FORMAT and ENCODE options | Specify the data format and the encoding format of the source data. To learn about the supported data formats, see Data formats. |
| ENGINE | Specify the Iceberg table engine to store data natively in the Iceberg format. For more information, see Create a table with the Iceberg engine. |
Note the distinction between parameters set in the FORMAT and ENCODE options and those set in the WITH clause. Make sure each parameter is placed in the correct clause.
Append only
An append-only table does not support delete or update operations, nor does it support non-append-only upstream connectors. There are two main reasons for using append-only tables:
- Certain features are exclusively available for append-only tables, such as watermark and TTL (Time-To-Live).
- Streaming jobs created downstream from append-only tables can leverage their append-only property to optimize performance.
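A minimal sketch of an append-only table with a watermark (the column names and the 5-second watermark delay are illustrative):

```sql
CREATE TABLE clicks (
    user_id int,
    event_time timestamptz,
    -- events arriving more than 5 seconds behind the max event_time are considered late
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) APPEND ONLY;
```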
Use Iceberg table engine
In a `CREATE TABLE` statement, you can specify `ENGINE = iceberg` to store table data in the Apache Iceberg format on external object storage. This provides an alternative to RisingWave's default internal row-based storage, making it easier to persist data for long-term use and cross-system access. For more information on the syntax and usage, see Iceberg table engine.
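A minimal sketch, assuming an Iceberg connection and object storage have already been configured for the cluster (table and column names are illustrative):

```sql
CREATE TABLE iceberg_orders (
    order_id bigint PRIMARY KEY,
    amount double precision
) ENGINE = iceberg;
```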
Watermarks
RisingWave supports generating watermarks when creating an append-only streaming table. Watermarks are markers that track the progress of event time, allowing you to process events within their corresponding time windows. For the syntax for creating a watermark, see Watermarks.
TTL of append-only table
Data in an append-only table cannot be deleted with `DELETE` statements. Therefore, RisingWave provides a TTL (Time-To-Live) feature to automatically clean up expired data in the table.
However, note that this cleanup only applies to the data within the table itself; it does not affect downstream materialized views or other streaming jobs. To clean up data in downstream materialized views used in queries, use a Temporal Filter in the downstream streaming job.
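A sketch of a temporal filter in a downstream materialized view (the table name and the one-hour retention window are illustrative):

```sql
-- keep only rows from the last hour in the downstream view
CREATE MATERIALIZED VIEW recent_events AS
SELECT *
FROM events
WHERE event_time > NOW() - INTERVAL '1 hour';
```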
PK conflict behavior
Insert operations can introduce records whose primary key duplicates an existing row in the table. In that case, the alternative action specified by the `ON CONFLICT conflict_action` clause is taken. The conflicting record can come from an `INSERT` statement, from the table's external connector, or from sinks into the table.
`conflict_action` can be one of the following. A version column can additionally be specified for `OVERWRITE` and `DO UPDATE IF NOT NULL`. When a version column is specified, the insert operation takes effect only when the newly inserted row's version column is greater than or equal to the existing row's version column, and is not NULL.
- `IGNORE`: Ignore the newly inserted record.
- `OVERWRITE [WITH VERSION COLUMN(col_name)]`: Full update: replace the existing row in the table with the new row.
- `DO UPDATE IF NOT NULL [WITH VERSION COLUMN(col_name)]`: Partial update: replace only the fields that are non-NULL in the inserted row. A NULL value in the inserted row leaves the existing value unchanged.
Delete and update operations on the table cannot violate its primary key constraint, so the `ON CONFLICT` option does not apply to them.
When the `DO UPDATE IF NOT NULL` behavior is applied, the `DEFAULT` clause is not allowed on the table's columns.
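A sketch combining `ON CONFLICT` with a version column (the table and column names are illustrative):

```sql
CREATE TABLE inventory (
    item_id int PRIMARY KEY,
    quantity int,
    updated_at timestamptz  -- non-primary-key column used as the version column
) ON CONFLICT OVERWRITE WITH VERSION COLUMN(updated_at);
```

With this definition, a conflicting insert replaces the existing row only if its `updated_at` is not NULL and is greater than or equal to the stored row's `updated_at`, so late-arriving data is ignored.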
Version column
A version column is a non-primary-key column that can be used with `OVERWRITE` and `DO UPDATE IF NOT NULL` to tune the conflict handling behavior.
Common use cases include:
- Use an event time column as the version column to ensure temporal ordering and ignore late-arriving data.
- Use an integer column as a logical version number. With an upsert-then-delete pattern, a logical version column can implement a "zero-downtime incremental version upgrade" of the data in the table: insert a new version of the data with a higher version number while keeping the old data, and then, after the upgrade finishes, clean up the old data with a `DELETE` statement. This can be used to implement in-place streaming job modification, or to re-sync external batch data into RisingWave (reverse ETL).

When a row is updated only with a new version number and its other columns remain unchanged, downstream streaming jobs that do not use the version column are unaffected: RisingWave automatically compacts such "no-op" updates to prevent massive updates in downstream jobs. You can create a logical view with `SELECT * EXCEPT(version_col)` to avoid accidentally depending on the version column.
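A sketch of such a view, hiding an assumed version column named `version` (the table and view names are illustrative):

```sql
-- downstream queries read the view and never see the version column
CREATE VIEW inventory_data AS
SELECT * EXCEPT (version)
FROM inventory_versioned;
```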
Example
The statement below creates a table that has three columns.
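For instance (the table and column names are illustrative):

```sql
CREATE TABLE taxi_trips (
    id varchar,
    distance double precision,
    city varchar
);
```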
The statement below creates a table that includes nested tables.
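For instance, using the struct data type with its fields enclosed in angle brackets (the field names are illustrative):

```sql
CREATE TABLE trips (
    id varchar,
    -- nested table: struct fields are enclosed in angle brackets
    fare struct<initial_charge double precision, subsequent_charge double precision>
);
```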
The statement below creates a table with a Kafka broker as the source.
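A sketch of a table backed by a Kafka topic (the topic name, broker address, and column schema are illustrative):

```sql
CREATE TABLE kafka_orders (
    order_id int,
    amount double precision,
    PRIMARY KEY (order_id)
)
WITH (
    connector = 'kafka',
    topic = 'orders',                            -- illustrative topic name
    properties.bootstrap.server = 'broker:9092', -- illustrative broker address
    scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE JSON;
```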