Writing this down is long overdue: it's a common problem, and it applies to any database that needs to scale horizontally, not just Cassandra.
Good partition keys are not always obvious, and it’s easy to create a bad one.
Pros
Cons
Example Idea
-- shard table: lists every shard that exists for an id.
-- id is the partition key, so all shard ids for one id are read from a single partition.
CREATE TABLE IF NOT EXISTS shard_table (
    id uuid,
    shard_id uuid,
    PRIMARY KEY (id, shard_id)
);
-- data table: the partition key is (id, shard_id), so every shard is its own partition.
-- NOTE: the other examples reuse the name my_table with a different schema (int shard_id);
-- CREATE TABLE IF NOT EXISTS silently keeps whichever definition exists first, so run each
-- example in its own keyspace.
CREATE TABLE IF NOT EXISTS my_table (
    id uuid,
    shard_id uuid,
    clustering_id timeuuid,
    data text,
    PRIMARY KEY ((id, shard_id), clustering_id)
);
-- first you have to query the shard table
SELECT shard_id FROM shard_table WHERE id = 38bea878-3bc9-48ea-9b12-38aefcefb749;
-- the shard table returns
-- 7bc5204a-f24f-4a1a-8dcc-e19073d3e4d2
-- f3836d84-a694-47d5-ba1e-65eea128de13
-- then you query each shard (issue these with executeAsync in the driver so they run concurrently)
SELECT clustering_id, data FROM my_table WHERE id = 38bea878-3bc9-48ea-9b12-38aefcefb749 AND shard_id = 7bc5204a-f24f-4a1a-8dcc-e19073d3e4d2;
SELECT clustering_id, data FROM my_table WHERE id = 38bea878-3bc9-48ea-9b12-38aefcefb749 AND shard_id = f3836d84-a694-47d5-ba1e-65eea128de13;
-- ingest has to create a shard first (say, a new shard every 10k rows, or one per worker).
INSERT INTO shard_table (id, shard_id) VALUES (38bea878-3bc9-48ea-9b12-38aefcefb749, 7bc5204a-f24f-4a1a-8dcc-e19073d3e4d2);
-- then insert into my_table using the shard you just created.
INSERT INTO my_table (id, shard_id, clustering_id, data) VALUES (
    38bea878-3bc9-48ea-9b12-38aefcefb749, 7bc5204a-f24f-4a1a-8dcc-e19073d3e4d2, 9eb3f56a-bf24-11e5-9912-ba0be0483c18, 'my data');
INSERT INTO my_table (id, shard_id, clustering_id, data) VALUES (
    38bea878-3bc9-48ea-9b12-38aefcefb749, 7bc5204a-f24f-4a1a-8dcc-e19073d3e4d2, f84532ba-bf24-11e5-9912-ba0be0483c18, 'my other data');
-- repeat inserts until you "fill up" the shard with your calculated max, then create the next shard.
Pros
Cons
Example Idea
-- single table: the shard count is stored alongside the data.
-- shard_count is STATIC, so it is stored once per partition, i.e. once per (id, shard_id);
-- this example only maintains the copy in shard 0, which readers always query first.
-- NOTE: the other examples reuse the name my_table with a different schema; CREATE TABLE
-- IF NOT EXISTS silently keeps whichever definition exists first, so run each example in
-- its own keyspace.
CREATE TABLE IF NOT EXISTS my_table (
    id uuid,
    shard_count int STATIC,
    shard_id int,
    clustering_id timeuuid,
    data text,
    PRIMARY KEY ((id, shard_id), clustering_id)
);
-- first you have to query the first shard (shard 0) to see if there are more.
SELECT shard_count, clustering_id, data FROM my_table WHERE id = 38bea878-3bc9-48ea-9b12-38aefcefb749 AND shard_id = 0;
-- and it returns (the static shard_count is repeated on every row of the partition)
-- | shard_count | clustering_id                        | data            |
-- | 2           | 9eb3f56a-bf24-11e5-9912-ba0be0483c18 | 'my data'       |
-- | 2           | f84532ba-bf24-11e5-9912-ba0be0483c18 | 'my other data' |
-- shard_count is 2, so shards 0 and 1 exist; now query the other shard
SELECT clustering_id, data FROM my_table WHERE id = 38bea878-3bc9-48ea-9b12-38aefcefb749 AND shard_id = 1;
-- Update the count with a separate statement as you create new shards, or set it once up front
-- if you know how many shards you'll end up with before you start ingesting.
-- Only the partition key is needed in the WHERE clause because shard_count is a static column.
UPDATE my_table SET shard_count = 2 WHERE id = 38bea878-3bc9-48ea-9b12-38aefcefb749 AND shard_id = 0;
-- ingest into a given shard.
INSERT INTO my_table (id, shard_id, clustering_id, data) VALUES (
    38bea878-3bc9-48ea-9b12-38aefcefb749, 0, 9eb3f56a-bf24-11e5-9912-ba0be0483c18, 'my data');
INSERT INTO my_table (id, shard_id, clustering_id, data) VALUES (
    38bea878-3bc9-48ea-9b12-38aefcefb749, 0, f84532ba-bf24-11e5-9912-ba0be0483c18, 'my other data');
-- ingest into another shard.
-- Every row needs its own clustering_id: an INSERT with an existing primary key overwrites that row.
INSERT INTO my_table (id, shard_id, clustering_id, data) VALUES (
    38bea878-3bc9-48ea-9b12-38aefcefb749, 1, f84532ba-bf24-11e5-9912-ba0be0483c18, 'my shard 2 data');
INSERT INTO my_table (id, shard_id, clustering_id, data) VALUES (
    38bea878-3bc9-48ea-9b12-38aefcefb749, 1, 0c1d4e2a-bf25-11e5-9912-ba0be0483c18, 'my other shard 2 data');
Pros
Cons
Example Idea
-- single table with a fixed number of shards, so no shard lookup is needed.
-- NOTE: the other examples reuse the name my_table with a different schema; CREATE TABLE
-- IF NOT EXISTS silently keeps whichever definition exists first, so run each example in
-- its own keyspace.
CREATE TABLE IF NOT EXISTS my_table (
    id uuid,
    shard_id int,
    clustering_id timeuuid,
    data text,
    PRIMARY KEY ((id, shard_id), clustering_id)
);
-- You always assume there are 5 shards (0-4). This is useful when you just want to shrink the
-- maximum partition size, so you spread an id's records across all 5 shards
-- (e.g. a different shard for each successive record).
-- Every read must query all 5 shards; issue them asynchronously so they run concurrently.
SELECT clustering_id, data FROM my_table WHERE id = 38bea878-3bc9-48ea-9b12-38aefcefb749 AND shard_id = 0;
SELECT clustering_id, data FROM my_table WHERE id = 38bea878-3bc9-48ea-9b12-38aefcefb749 AND shard_id = 1;
SELECT clustering_id, data FROM my_table WHERE id = 38bea878-3bc9-48ea-9b12-38aefcefb749 AND shard_id = 2;
SELECT clustering_id, data FROM my_table WHERE id = 38bea878-3bc9-48ea-9b12-38aefcefb749 AND shard_id = 3;
SELECT clustering_id, data FROM my_table WHERE id = 38bea878-3bc9-48ea-9b12-38aefcefb749 AND shard_id = 4;
-- ingest into a shard.
INSERT INTO my_table (id, shard_id, clustering_id, data) VALUES (
    38bea878-3bc9-48ea-9b12-38aefcefb749, 0, 9eb3f56a-bf24-11e5-9912-ba0be0483c18, 'my data');
INSERT INTO my_table (id, shard_id, clustering_id, data) VALUES (
    38bea878-3bc9-48ea-9b12-38aefcefb749, 0, f84532ba-bf24-11e5-9912-ba0be0483c18, 'my other data');
-- inserting into another shard.
-- Every row needs its own clustering_id: an INSERT with an existing primary key overwrites that row.
INSERT INTO my_table (id, shard_id, clustering_id, data) VALUES (
    38bea878-3bc9-48ea-9b12-38aefcefb749, 1, f84532ba-bf24-11e5-9912-ba0be0483c18, 'my shard 2 data');
INSERT INTO my_table (id, shard_id, clustering_id, data) VALUES (
    38bea878-3bc9-48ea-9b12-38aefcefb749, 1, 0c1d4e2a-bf25-11e5-9912-ba0be0483c18, 'my other shard 2 data');
To take full advantage of a sharded data model you need to use async queries, and since many developers struggle with the async query API, a sample is useful. What follows is a simple example with integer-based shards, written against version 3.11 of the DataStax Java driver:
public class CassandraDAO {
private final Cluster cluster;
private final Session session;
private final PreparedStatement query;
/**
 * Connects to a Cassandra node on localhost and prepares the per-shard read query once,
 * so each lookup only has to bind values.
 *
 * NOTE(review): the query uses the unqualified table name my_table, so the session must be
 * using the keyspace that holds it - TODO confirm which keyspace and select it explicitly.
 * NOTE(review): nothing visible here closes cluster; add a close() that calls cluster.close().
 */
public CassandraDAO() {
    // Cluster.builder() is the static factory; Cluster.Builder is the nested builder class.
    cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
    session = cluster.newSession();
    // findDataById only reads the data column, so select just that column.
    query = session.prepare("SELECT data FROM my_table WHERE id = ? AND shard_id = ?");
}
public List<String> findDataById(UUID id) {
List<ResultSetFuture> futures = new ArrayList<>();
int totalShards = 5
for (int i = 0; i < totalShards; i++) {
futures.add(session.executeAsync(query.bind(id, i)));
}
List<String> dataResults = new ArrayList<>();
for(ResultSetFuture future: futures) {
Result result = future.getUninterruptibly();
dataResults.add(result.getString("data"));
}
return dataResults;
}