Creating and Modifying Distributed Objects (DDL)

Creating and Distributing Schemas

Citus 12.0 introduced Schema-based sharding that allows a schema to be distributed. Distributed schemas are automatically associated with individual colocation groups such that the tables created in those schemas will be automatically converted to colocated distributed tables without a shard key.

There are two ways in which a schema can be distributed in Citus:

Manually by calling citus_schema_distribute function:

SELECT citus_schema_distribute('user_service');

This method also allows you to convert existing regular schemas into distributed schemas.

Note

You can only distribute schemas that do not contain distributed and reference tables.

Alternative approach is to enable citus.enable_schema_based_sharding (boolean) configuration variable:

SET citus.enable_schema_based_sharding TO ON;

CREATE SCHEMA AUTHORIZATION user_service;

The variable can be changed for the current session or permanently in postgresql.conf. With the parameter set to ON all created schemas will be distributed by default.

The process of distributing the schema will automatically assign and move it to an existing node in the cluster. The background shard rebalancer takes these schemas and all tables within them when rebalancing the cluster, performing the optimal moves and migrating the schemas between the nodes in the cluster.

To convert a schema back into a regular PostgreSQL schema use the citus_schema_undistribute function:

SELECT citus_schema_undistribute('user_service');

The tables and data in the user_service schema will be moved from the current node back to the coordinator node in the cluster.

Creating And Distributing Tables

To create a distributed table, you need to first define the table schema. To do so, you can define a table using the CREATE TABLE statement in the same way as you would do with a regular PostgreSQL table.

CREATE TABLE github_events
(
    event_id bigint,
    event_type text,
    event_public boolean,
    repo_id bigint,
    payload jsonb,
    repo jsonb,
    actor jsonb,
    org jsonb,
    created_at timestamp
);

Next, you can use the create_distributed_table() function to specify the table distribution column and create the worker shards.

SELECT create_distributed_table('github_events', 'repo_id');

This function informs Citus that the github_events table should be distributed on the repo_id column (by hashing the column value). The function also creates shards on the worker nodes using the citus.shard_count configuration value.

This example would create a total of citus.shard_count number of shards where each shard owns a portion of a hash token space. Once the shards are created, this function saves all distributed metadata on the coordinator.

Each created shard is assigned a unique shard id. Each shard is represented on the worker node as a regular PostgreSQL table with name ‘tablename_shardid’ where tablename is the name of the distributed table and shardid is the unique id assigned to that shard. You can connect to the worker postgres instances to view or run commands on individual shards.

You are now ready to insert data into the distributed table and run queries on it. You can also learn more about the UDF used in this section in the Citus Utility Functions of our documentation.

Reference Tables

The above method distributes tables into multiple horizontal shards, but another possibility is distributing tables into a single shard and replicating the shard to every worker node. Tables distributed this way are called reference tables. They are used to store data that needs to be frequently accessed by multiple nodes in a cluster.

Common candidates for reference tables include:

  • Smaller tables which need to join with larger distributed tables.

  • Tables in multi-tenant apps which lack a tenant id column or which aren’t associated with a tenant. (In some cases, to reduce migration effort, users might even choose to make reference tables out of tables associated with a tenant but which currently lack a tenant id.)

  • Tables which need unique constraints across multiple columns and are small enough.

For instance suppose a multi-tenant eCommerce site needs to calculate sales tax for transactions in any of its stores. Tax information isn’t specific to any tenant. It makes sense to consolidate it in a shared table. A US-centric reference table might look like this:

-- a reference table

CREATE TABLE states (
  code char(2) PRIMARY KEY,
  full_name text NOT NULL,
  general_sales_tax numeric(4,3)
);

-- distribute it to all workers

SELECT create_reference_table('states');

Now queries such as one calculating tax for a shopping cart can join on the states table with no network overhead, and can add a foreign key to the state code for better validation.

In addition to distributing a table as a single replicated shard, the create_reference_table UDF marks it as a reference table in the Citus metadata tables. Citus automatically performs two-phase commits (2PC) for modifications to tables marked this way, which provides strong consistency guarantees.

If you have an existing distributed table, you can change it to a reference table by running:

SELECT undistribute_table('table_name');
SELECT create_reference_table('table_name');

For another example of using reference tables in a multi-tenant application, see Sharing Data Between Tenants.

Distributing Coordinator Data

If an existing PostgreSQL database is converted into the coordinator node for a Citus cluster, the data in its tables can be distributed efficiently and with minimal interruption to an application.

The create_distributed_table function described earlier works on both empty and non-empty tables, and for the latter it automatically distributes table rows throughout the cluster. You will know if it does this by the presence of the message, “NOTICE: Copying data from local table…” For example:

CREATE TABLE series AS SELECT i FROM generate_series(1,1000000) i;
SELECT create_distributed_table('series', 'i');
NOTICE:  Copying data from local table...
NOTICE:  copying the data has completed
DETAIL:  The local data in the table is no longer visible, but is still on disk.
HINT:  To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.series$$)
 create_distributed_table
 --------------------------

 (1 row)

Writes on the table are blocked while the data is migrated, and pending writes are handled as distributed queries once the function commits. (If the function fails then the queries become local again.) Reads can continue as normal and will become distributed queries once the function commits.

When distributing tables A and B, where A has a foreign key to B, distribute the key destination table B first. Doing it in the wrong order will cause an error:

ERROR:  cannot create foreign key constraint
DETAIL:  Referenced table must be a distributed table or a reference table.

If it’s not possible to distribute in the correct order then drop the foreign keys, distribute the tables, and recreate the foreign keys.

After the tables are distributed, use the truncate_local_data_after_distributing_table function to remove local data. Leftover local data in distributed tables is inaccessible to Citus queries, and can cause irrelevant constraint violations on the coordinator.

When migrating data from an external database, such as from Amazon RDS to our Managed Service, first create the Citus distributed tables via create_distributed_table, then copy the data into the table. Copying into distributed tables avoids running out of space on the coordinator node.

Co-Locating Tables

Co-location is the practice of dividing data tactically, keeping related information on the same machines to enable efficient relational operations, while taking advantage of the horizontal scalability for the whole dataset. For more information and examples see Table Co-Location.

Tables are co-located in groups. To manually control a table’s co-location group assignment use the optional colocate_with parameter of create_distributed_table. If you don’t care about a table’s co-location then omit this parameter. It defaults to the value 'default', which groups the table with any other default co-location table having the same distribution column type, and shard count. If you want to break or update this implicit colocation, you can use update_distributed_table_colocation().

-- these tables are implicitly co-located by using the same
-- distribution column type and shard count with the default
-- co-location group

SELECT create_distributed_table('A', 'some_int_col');
SELECT create_distributed_table('B', 'other_int_col');

When a new table is not related to others in its would-be implicit co-location group, specify colocated_with => 'none'.

-- not co-located with other tables

SELECT create_distributed_table('A', 'foo', colocate_with => 'none');

Splitting unrelated tables into their own co-location groups will improve shard rebalancing performance, because shards in the same group have to be moved together.

When tables are indeed related (for instance when they will be joined), it can make sense to explicitly co-locate them. The gains of appropriate co-location are more important than any rebalancing overhead.

To explicitly co-locate multiple tables, distribute one and then put the others into its co-location group. For example:

-- distribute stores
SELECT create_distributed_table('stores', 'store_id');

-- add to the same group as stores
SELECT create_distributed_table('orders', 'store_id', colocate_with => 'stores');
SELECT create_distributed_table('products', 'store_id', colocate_with => 'stores');

Information about co-location groups is stored in the pg_dist_colocation table, while pg_dist_partition reveals which tables are assigned to which groups.

Dropping Tables

You can use the standard PostgreSQL DROP TABLE command to remove your distributed tables. As with regular tables, DROP TABLE removes any indexes, rules, triggers, and constraints that exist for the target table. In addition, it also drops the shards on the worker nodes and cleans up their metadata.

DROP TABLE github_events;

Modifying Tables

Citus automatically propagates many kinds of DDL statements, which means that modifying a distributed table on the coordinator node will update shards on the workers too. Other DDL statements require manual propagation, and certain others are prohibited such as those which would modify a distribution column. Attempting to run DDL that is ineligible for automatic propagation will raise an error and leave tables on the coordinator node unchanged.

Here is a reference of the categories of DDL statements which propagate. Note that automatic propagation can be enabled or disabled with a configuration parameter.

Adding/Modifying Columns

Citus propagates most ALTER TABLE commands automatically. Adding columns or changing their default values work as they would in a single-machine PostgreSQL database:

-- Adding a column

ALTER TABLE products ADD COLUMN description text;

-- Changing default value

ALTER TABLE products ALTER COLUMN price SET DEFAULT 7.77;

Significant changes to an existing column like renaming it or changing its data type are fine too. However, the data type of the distribution column cannot be altered. This column determines how table data distributes through the Citus cluster, and modifying its data type would require moving the data.

Attempting to do so causes an error:

-- assuming store_id is the distribution column
-- for products, and that it has type integer

ALTER TABLE products
ALTER COLUMN store_id TYPE text;

/*
ERROR:  cannot execute ALTER TABLE command involving partition column
*/

As a workaround, you can consider changing the distribution column, updating it, and changing it back.

Adding/Removing Constraints

Using Citus allows you to continue to enjoy the safety of a relational database, including database constraints (see the PostgreSQL docs). Due to the nature of distributed systems, Citus will not cross-reference uniqueness constraints or referential integrity between worker nodes.

To set up a foreign key between colocated distributed tables, always include the distribution column in the key. This may involve making the key compound.

Foreign keys may be created in these situations:

Foreign keys from reference tables to distributed tables are not supported.

Citus supports all referential actions on foreign keys from local to reference tables, but does not support support ON DELETE/UPDATE CASCADE in the reverse direction (reference to local).

Note

Primary keys and uniqueness constraints must include the distribution column. Adding them to a non-distribution column will generate an error (see Cannot create uniqueness constraint).

This example shows how to create primary and foreign keys on distributed tables:

--
-- Adding a primary key
-- --------------------

-- We'll distribute these tables on the account_id. The ads and clicks
-- tables must use compound keys that include account_id.

ALTER TABLE accounts ADD PRIMARY KEY (id);
ALTER TABLE ads ADD PRIMARY KEY (account_id, id);
ALTER TABLE clicks ADD PRIMARY KEY (account_id, id);

-- Next distribute the tables

SELECT create_distributed_table('accounts', 'id');
SELECT create_distributed_table('ads',      'account_id');
SELECT create_distributed_table('clicks',   'account_id');

--
-- Adding foreign keys
-- -------------------

-- Note that this can happen before or after distribution, as long as
-- there exists a uniqueness constraint on the target column(s) which
-- can only be enforced before distribution.

ALTER TABLE ads ADD CONSTRAINT ads_account_fk
  FOREIGN KEY (account_id) REFERENCES accounts (id);
ALTER TABLE clicks ADD CONSTRAINT clicks_ad_fk
  FOREIGN KEY (account_id, ad_id) REFERENCES ads (account_id, id);

Similarly, include the distribution column in uniqueness constraints:

-- Suppose we want every ad to use a unique image. Notice we can
-- enforce it only per account when we distribute by account id.

ALTER TABLE ads ADD CONSTRAINT ads_unique_image
  UNIQUE (account_id, image_url);

Not-null constraints can be applied to any column (distribution or not) because they require no lookups between workers.

ALTER TABLE ads ALTER COLUMN image_url SET NOT NULL;

Using NOT VALID Constraints

In some situations it can be useful to enforce constraints for new rows, while allowing existing non-conforming rows to remain unchanged. Citus supports this feature for CHECK constraints and foreign keys, using PostgreSQL’s “NOT VALID” constraint designation.

For example, consider an application which stores user profiles in a reference table.

-- we're using the "text" column type here, but a real application
-- might use "citext" which is available in a postgres contrib module

CREATE TABLE users ( email text PRIMARY KEY );
SELECT create_reference_table('users');

In the course of time imagine that a few non-addresses get into the table.

INSERT INTO users VALUES
   ('foo@example.com'), ('hacker12@aol.com'), ('lol');

We would like to validate the addresses, but PostgreSQL does not ordinarily allow us to add a CHECK constraint that fails for existing rows. However, it does allow a constraint marked not valid:

ALTER TABLE users
ADD CONSTRAINT syntactic_email
CHECK (email ~
   '^[a-zA-Z0-9.!#$%&''*+/=?^_`{|}~-]+@[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$'
) NOT VALID;

This succeeds, and new rows are protected.

INSERT INTO users VALUES ('fake');

/*
ERROR:  new row for relation "users_102010" violates
        check constraint "syntactic_email_102010"
DETAIL:  Failing row contains (fake).
*/

Later, during non-peak hours, a database administrator can attempt to fix the bad rows and re-validate the constraint.

-- later, attempt to validate all rows
ALTER TABLE users
VALIDATE CONSTRAINT syntactic_email;

The PostgreSQL documentation has more information about NOT VALID and VALIDATE CONSTRAINT in the ALTER TABLE section.

Adding/Removing Indices

Citus supports adding and removing indices:

-- Adding an index

CREATE INDEX clicked_at_idx ON clicks USING BRIN (clicked_at);

-- Removing an index

DROP INDEX clicked_at_idx;

Adding an index takes a write lock, which can be undesirable in a multi-tenant “system-of-record.” To minimize application downtime, create the index concurrently instead. This method requires more total work than a standard index build and takes significantly longer to complete. However, since it allows normal operations to continue while the index is built, this method is useful for adding new indexes in a production environment.

-- Adding an index without locking table writes

CREATE INDEX CONCURRENTLY clicked_at_idx ON clicks USING BRIN (clicked_at);

Types and Functions

Creating custom SQL types and user-defined functions propogates to worker nodes. However, creating such database objects in a transaction with distributed operations involves tradeoffs.

Citus parallelizes operations such as create_distributed_table() across shards using multiple connections per worker. Whereas, when creating a database object, Citus propagates it to worker nodes using a single connection per worker. Combining the two operations in a single transaction may cause issues, because the parallel connections will not be able to see the object that was created over a single connection but not yet committed.

Consider a transaction block that creates a type, a table, loads data, and distributes the table:

BEGIN;

-- type creation over a single connection:
CREATE TYPE coordinates AS (x int, y int);
CREATE TABLE positions (object_id text primary key, position coordinates);

-- data loading thus goes over a single connection:
SELECT create_distributed_table(‘positions’, ‘object_id’);
\COPY positions FROM ‘positions.csv’

COMMIT;

Prior to Citus 11.0, Citus would defer creating the type on the worker nodes, and commit it separately when creating the distributed table. This enabled the data copying in create_distributed_table() to happen in parallel. However, it also meant that the type was not always present on the Citus worker nodes – or if the transaction rolled back, the type would remain on the worker nodes.

With Citus 11.0, the default behaviour changes to prioritize schema consistency between coordinator and worker nodes. The new behavior has a downside: if object propagation happens after a parallel command in the same transaction, then the transaction can no longer be completed, as highlighted by the ERROR in the code block below:

BEGIN;
CREATE TABLE items (key text, value text);
-- parallel data loading:
SELECT create_distributed_table(‘items’, ‘key’);
\COPY items FROM ‘items.csv’
CREATE TYPE coordinates AS (x int, y int);

ERROR:  cannot run type command because there was a parallel operation on a distributed table in the transaction

If you run into this issue, there are two simple workarounds:

  1. Use set citus.create_object_propagation to deferred to return to the old object propagation behavior, in which case there may be some inconsistency between which database objects exist on different nodes.

  2. Use set citus.multi_shard_modify_mode to sequential to disable per-node parallelism. Data load in the same transaction might be slower.

Manual Modification

Most DDL commands are auto-propagated. For any others, you can propagate the changes manually. See Manual Query Propagation.