Before attempting workarounds consider whether Citus is appropriate for your situation. Citus’ current version works well for real-time analytics and multi-tenant use cases.
Citus supports all SQL statements in the multi-tenant use-case. Even in the real-time analytics use-cases, with queries that span across nodes, Citus supports the majority of statements. The few types of unsupported queries are listed in Are there any PostgreSQL features not supported by Citus? Many of the unsupported features have workarounds; below are a number of the most useful.
JOIN a local and a distributed table¶
Attempting to execute a JOIN between a local table “local” and a distributed table “dist” causes an error:
SELECT * FROM local JOIN dist USING (id); /* ERROR: relation local is not distributed STATEMENT: SELECT * FROM local JOIN dist USING (id); ERROR: XX000: relation local is not distributed LOCATION: DistributedTableCacheEntry, metadata_cache.c:711 */
Although you can’t join such tables directly, by wrapping the local table in a subquery or CTE you can make Citus’ recursive query planner copy the local table data to worker nodes. By colocating the data this allows the query to proceed.
-- either SELECT * FROM (SELECT * FROM local) AS x JOIN dist USING (id); -- or WITH x AS (SELECT * FROM local) SELECT * FROM x JOIN dist USING (id);
Remember that the coordinator will send the results in the subquery or CTE to all workers which require it for processing. Thus it’s best to either add the most specific filters and limits to the inner query as possible, or else aggregate the table. That reduces the network overhead which such a query can cause. More about this in Subquery/CTE Network Overhead.
Currently Citus does not have out-of-the-box support for window functions on cross-shard queries, but there is a straightforward workaround. Window functions will work across shards on a distributed table if
- The window function is in a subquery and
- It includes a
PARTITION BYclause containing the table’s distribution column
Suppose you have table called
github_events, distributed by the column
user_id. This query will not work directly:
-- won't work, see workaround SELECT repo_id, org->'id' as org_id, count(*) OVER (PARTITION BY user_id) FROM github_events;
You can make it work by moving the window function into a subquery like this:
SELECT * FROM ( SELECT repo_id, org->'id' as org_id, count(*) OVER (PARTITION BY user_id) FROM github_events ) windowed;
Remember that it specifies
PARTITION BY user_id, the distribution column.
Temp Tables: the Last Resort¶
There are still a few queries that are unsupported even with the use of push-pull execution via subqueries. One of them is running window functions that partition by a non-distribution column. For example, if we update the example from the previous example to partition on
repo_id the workaround mentioned in the previous section will no longer work:
-- this won't work, not even with the subquery workaround SELECT repo_id, org->'id' as org_id, count(*) OVER (PARTITION BY repo_id) -- repo_id is not distribution column FROM github_events WHERE repo_id IN (8514, 15435, 19438, 21692);
There is another trick though. We can pull the relevant information to the coordinator as a temporary table:
-- grab the data, minus the aggregate, into a local table CREATE TEMP TABLE results AS ( SELECT repo_id, org->'id' as org_id FROM github_events WHERE repo_id IN (8514, 15435, 19438, 21692) ); -- now run the aggregate locally SELECT repo_id, org_id, count(*) OVER (PARTITION BY repo_id) FROM results;
Creating a temporary table on the coordinator is a last resort. It is limited by the disk size and CPU of the node.