Creating and Modifying Distributed Tables (DDL)¶
Creating And Distributing Tables¶
To create a distributed table, you need to first define the table schema. To do so, you can define a table using the CREATE TABLE statement in the same way as you would do with a regular PostgreSQL table.
CREATE TABLE github_events ( event_id bigint, event_type text, event_public boolean, repo_id bigint, payload jsonb, repo jsonb, actor jsonb, org jsonb, created_at timestamp );
Next, you can use the create_distributed_table() function to specify the table distribution column and create the worker shards.
SELECT create_distributed_table('github_events', 'repo_id');
This function informs Citus that the github_events table should be distributed on the repo_id column (by hashing the column value). The function also creates shards on the worker nodes using the citus.shard_count and citus.shard_replication_factor configuration values.
This example would create a total of citus.shard_count number of shards where each shard owns a portion of a hash token space and gets replicated based on the default citus.shard_replication_factor configuration value. The shard replicas created on the worker have the same table schema, index, and constraint definitions as the table on the coordinator. Once the replicas are created, this function saves all distributed metadata on the coordinator.
Each created shard is assigned a unique shard id and all its replicas have the same shard id. Each shard is represented on the worker node as a regular PostgreSQL table with name ‘tablename_shardid’ where tablename is the name of the distributed table and shardid is the unique id assigned to that shard. You can connect to the worker postgres instances to view or run commands on individual shards.
You are now ready to insert data into the distributed table and run queries on it. You can also learn more about the UDF used in this section in the Citus Utility Functions of our documentation.
The above method distributes tables into multiple horizontal shards, but another possibility is distributing tables into a single shard and replicating the shard to every worker node. Tables distributed this way are called reference tables. They are used to store data that needs to be frequently accessed by multiple nodes in a cluster.
Common candidates for reference tables include:
- Smaller tables which need to join with larger distributed tables.
- Tables in multi-tenant apps which lack a tenant id column or which aren’t associated with a tenant. (In some cases, to reduce migration effort, users might even choose to make reference tables out of tables associated with a tenant but which currently lack a tenant id.)
- Tables which need unique constraints across multiple columns and are small enough.
For instance suppose a multi-tenant eCommerce site needs to calculate sales tax for transactions in any of its stores. Tax information isn’t specific to any tenant. It makes sense to consolidate it in a shared table. A US-centric reference table might look like this:
-- a reference table CREATE TABLE states ( code char(2) PRIMARY KEY, full_name text NOT NULL, general_sales_tax numeric(4,3) ); -- distribute it to all workers SELECT create_reference_table('states');
Now queries such as one calculating tax for a shopping cart can join on the
states table with no network overhead.
In addition to distributing a table as a single replicated shard, the
create_reference_table UDF marks it as a reference table in the Citus metadata tables. Citus automatically performs two-phase commits (2PC) for modifications to tables marked this way, which provides strong consistency guarantees.
If you have an existing distributed table which has a shard count of one, you can upgrade it to be a recognized reference table by running
For another example of using reference tables in a multi-tenant application, see Sharing Data Between Tenants.
Distributing Coordinator Data¶
If an existing PostgreSQL database is converted into the coordinator node for a Citus cluster, the data in its tables can be distributed efficiently and with minimal interruption to an application.
create_distributed_table function described earlier works on both empty and non-empty tables, and for the latter automatically distributes table rows throughout the cluster. You will know if it does this by the presence of the message, “NOTICE: Copying data from local table...” For example:
CREATE TABLE series AS SELECT i FROM generate_series(1,1000000) i; SELECT create_distributed_table('series', 'i'); NOTICE: Copying data from local table... create_distributed_table -------------------------- (1 row)
Writes on the table are blocked while the data is migrated, and pending writes are handled as distributed queries once the function commits. (If the function fails then the queries become local again.) Reads can continue as normal and will become distributed queries once the function commits.
When distributing a number of tables with foreign keys between them, it’s best to drop the foreign keys before running
create_distributed_table and recreating them after distributing the tables. Foreign keys cannot always be enforced when one table is distributed and the other is not.
When migrating data from an external database, such as from Amazon RDS to Citus Cloud, first create the Citus distributed tables via
create_distributed_table, then copy the data into the table.
Co-location is the practice of dividing data tactically, keeping related information on the same machines to enable efficient relational operations, while taking advantage of the horizontal scalability for the whole dataset. For more information and examples see Table Co-Location.
Tables are co-located in groups. To manually control a table’s co-location group assignment use the optional
colocate_with parameter of
create_distributed_table. If you don’t care about a table’s co-location then omit this parameter. It defaults to the value
'default', which groups the table with any other default co-location table having the same distribution column type, shard count, and replication factor.
-- these tables are implicitly co-located by using the same -- distribution column type and shard count with the default -- co-location group SELECT create_distributed_table('A', 'some_int_col'); SELECT create_distributed_table('B', 'other_int_col');
If you would prefer a table to be in its own co-location group, specify
-- not co-located with other tables SELECT create_distributed_table('A', 'foo', colocate_with => 'none');
To co-locate a number of tables, distribute one and then put the others into its co-location group. For example:
-- distribute stores SELECT create_distributed_table('stores', 'store_id'); -- add to the same group as stores SELECT create_distributed_table('orders', 'store_id', colocate_with => 'stores'); SELECT create_distributed_table('products', 'store_id', colocate_with => 'stores');
Upgrading from Citus 5.x¶
Starting with Citus 6.0, we made co-location a first-class concept, and started tracking tables’ assignment to co-location groups in pg_dist_colocation. Since Citus 5.x didn’t have this concept, tables created with Citus 5 were not explicitly marked as co-located in metadata, even when the tables were physically co-located.
Since Citus uses co-location metadata information for query optimization and pushdown, it becomes critical to inform Citus of this co-location for previously created tables. To fix the metadata, simply mark the tables as co-located:
-- Assume that stores, products and line_items were created in a Citus 5.x database. -- Put products and line_items into store's co-location group SELECT mark_tables_colocated('stores', ARRAY['products', 'line_items']);
This function requires the tables to be distributed with the same method, column type, number of shards, and replication method. It doesn’t re-shard or physically move data, it merely updates Citus metadata.
You can use the standard PostgreSQL DROP TABLE command to remove your distributed tables. As with regular tables, DROP TABLE removes any indexes, rules, triggers, and constraints that exist for the target table. In addition, it also drops the shards on the worker nodes and cleans up their metadata.
DROP TABLE github_events;
Citus automatically propagates many kinds of DDL statements, which means that modifying a distributed table on the coordinator node will update shards on the workers too. Other DDL statements require manual propagation, and certain others are prohibited such as those which would modify a distribution column. Attempting to run DDL that is ineligible for automatic propagation will raise an error and leave tables on the coordinator node unchanged.
Here is a reference of the categories of DDL statements which propagate. Note that automatic propagation can be enabled or disabled with a configuration parameter.
Citus propagates most ALTER TABLE commands automatically. Adding columns or changing their default values work as they would in a single-machine PostgreSQL database:
-- Adding a column ALTER TABLE products ADD COLUMN description text; -- Changing default value ALTER TABLE products ALTER COLUMN price SET DEFAULT 7.77;
Significant changes to an existing column are fine too, except for those applying to the distribution column. This column determines how table data distributes through the Citus cluster and cannot be modified in a way that would change data distribution.
-- Cannot be executed against a distribution column -- Removing a column ALTER TABLE products DROP COLUMN description; -- Changing column data type ALTER TABLE products ALTER COLUMN price TYPE numeric(10,2); -- Renaming a column ALTER TABLE products RENAME COLUMN product_no TO product_number;
Using Citus allows you to continue to enjoy the safety of a relational database, including database constraints (see the PostgreSQL docs). Due to the nature of distributed systems, Citus will not cross-reference uniqueness constraints or referential integrity between worker nodes. Foreign keys must always be declared between colocated tables. To do this, use compound foreign keys that include the distribution column.
This example shows how to create primary and foreign keys on distributed tables.
-- -- Adding a primary key -- -------------------- -- Ultimately we'll distribute these tables on the account id, so the -- ads and clicks tables use compound keys to include it. ALTER TABLE accounts ADD PRIMARY KEY (id); ALTER TABLE ads ADD PRIMARY KEY (account_id, id); ALTER TABLE clicks ADD PRIMARY KEY (account_id, id); -- Next distribute the tables -- (primary keys must be created prior to distribution) SELECT create_distributed_table('accounts', 'id'); SELECT create_distributed_table('ads', 'account_id'); SELECT create_distributed_table('clicks', 'account_id'); -- -- Adding foreign keys -- ------------------- -- Note that this can happen before or after distribution, as long as -- there exists a uniqueness constraint on the target column(s) which -- can only be enforced before distribution. ALTER TABLE ads ADD CONSTRAINT ads_account_fk FOREIGN KEY (account_id) REFERENCES accounts (id); ALTER TABLE clicks ADD CONSTRAINT clicks_account_fk FOREIGN KEY (account_id) REFERENCES accounts (id);
Uniqueness constraints, like primary keys, must be added prior to table distribution.
-- Suppose we want every ad to use a unique image. Notice we can -- enforce it only per account when we distribute by account id. ALTER TABLE ads ADD CONSTRAINT ads_unique_image UNIQUE (account_id, image_url);
Not-null constraints can always be applied because they require no lookups between workers.
ALTER TABLE ads ALTER COLUMN image_url SET NOT NULL;
Citus supports adding and removing indices:
-- Adding an index CREATE INDEX clicked_at_idx ON clicks USING BRIN (clicked_at); -- Removing an index DROP INDEX clicked_at_idx;
Adding an index takes a write lock, which can be undesirable in a multi-tenant “system-of-record.” To minimize application downtime, create the index concurrently instead. This method requires more total work than a standard index build and takes significantly longer to complete. However, since it allows normal operations to continue while the index is built, this method is useful for adding new indexes in a production environment.
-- Adding an index without locking table writes CREATE INDEX CONCURRENTLY clicked_at_idx ON clicks USING BRIN (clicked_at);