Joe Gregorio
October 2008, updated January 2013
This is part four of a five-part series on effectively scaling your App Engine-based apps. To see the other articles in the series, see Related links.
When developing an efficient application on Google App Engine, you need to pay attention to how often an entity is updated. While App Engine's datastore scales to support a huge number of entities, it is important to note that you can only expect to update any single entity or entity group about five times a second. That is an estimate, and the actual update rate for an entity depends on several attributes of the entity, including how many properties it has, how large it is, and how many indexes need updating. While a single entity or entity group has a limit on how quickly it can be updated, App Engine excels at handling many parallel requests distributed across distinct entities, and we can take advantage of this by using sharding.
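As a rough sizing aid, the five-updates-per-second estimate translates directly into a minimum shard count. If writes are spread evenly over N shards, each shard absorbs about R/N updates per second for a target rate R. N must therefore be at least R/5, rounded up. The Python sketch below makes that arithmetic explicit. The function name shards_needed is hypothetical, and the per-shard rate is only the estimate quoted above, not a guaranteed limit.

```python
import math

# Assumed per-entity update budget, taken from the rough estimate above; the
# real figure depends on entity size, property count, and index updates.
PER_ENTITY_UPDATES_PER_SEC = 5


def shards_needed(target_updates_per_sec, per_shard_rate=PER_ENTITY_UPDATES_PER_SEC):
    """Return the minimum number of shards for a target update rate.

    Hypothetical helper: assumes writes are spread evenly, so each of N
    shards receives target / N updates per second.
    """
    if target_updates_per_sec <= 0:
        return 1  # a counter always needs at least one shard
    return math.ceil(target_updates_per_sec / per_shard_rate)


print(shards_needed(100))  # 20
print(shards_needed(101))  # 21
```

Random shard selection is never perfectly even, so in practice you would leave some headroom above this minimum.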
The question is, what if you had an entity that you wanted to update faster than five times a second? For example, you might count the number of votes in a poll, the number of comments, or even the number of visitors to your site. Take this simple example:
Python
from google.appengine.ext import ndb


class Counter(ndb.Model):
    count = ndb.IntegerProperty()
Java
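import javax.jdo.annotations.IdGeneratorStrategy;
import javax.jdo.annotations.IdentityType;
import javax.jdo.annotations.PersistenceCapable;
import javax.jdo.annotations.Persistent;
import javax.jdo.annotations.PrimaryKey;

@PersistenceCapable(identityType = IdentityType.APPLICATION)
public class Counter {
    @PrimaryKey
    @Persistent(valueStrategy = IdGeneratorStrategy.IDENTITY)
    private Long id;

    @Persistent
    private Integer count;

    public Long getId() {
        return id;
    }

    public Integer getCount() {
        return count;
    }

    // ...
}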
Go
type Counter struct {
	Count int
}
If you had a single entity that was the counter and the update rate was too fast, then you would have contention, as the serialized writes would stack up and start to time out. The way to solve this problem is a little counter-intuitive if you are coming from a relational database; the solution relies on the fact that reads from the App Engine datastore are extremely fast and cheap. The way to reduce the contention is to build a sharded counter: break the counter up into N different counters. When you want to increment the counter, you pick one of the shards at random and increment it. When you want to know the total count, you read all of the counter shards and sum up their individual counts. The more shards you have, the higher the throughput you will have for increments on your counter. This technique works for a lot more than just counters, and an important skill to learn is spotting the entities in your application with a lot of writes and then finding good ways to shard them.
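The same idea can be tried without a datastore. The following minimal in-memory Python sketch is not App Engine code. The class name InMemoryShardedCounter and the hammer helper are hypothetical. Each threading.Lock stands in for the one-writer-at-a-time behavior of a single entity. An increment touches only one randomly chosen shard, while a read sums every shard.

```python
import random
import threading


class InMemoryShardedCounter:
    """Hypothetical in-memory sharded counter.

    Each shard has its own lock, standing in for the serialized writes
    of a single datastore entity.
    """

    def __init__(self, num_shards=20):
        self._counts = [0] * num_shards
        self._locks = [threading.Lock() for _ in range(num_shards)]

    def increment(self):
        # Pick one shard at random; only writers that pick the same shard wait.
        index = random.randrange(len(self._counts))
        with self._locks[index]:
            self._counts[index] += 1

    def get_count(self):
        # Read every shard and add up the individual counts. While writes are
        # in flight this is not atomic across shards, much like reading the
        # shard entities one after another.
        return sum(self._counts)


def hammer(counter, times):
    """Call increment() repeatedly (helper for the demo below)."""
    for _ in range(times):
        counter.increment()


counter = InMemoryShardedCounter(num_shards=20)
workers = [threading.Thread(target=hammer, args=(counter, 1000))
           for _ in range(8)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(counter.get_count())  # 8000
```

A writer only waits when another writer happens to pick the same shard, so adding shards spreads the write load. The cost is that every read has to visit all N shards.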
Here is a very simple implementation of a sharded counter:
Python
import random

from google.appengine.ext import ndb

NUM_SHARDS = 20


class SimpleCounterShard(ndb.Model):
    """Shards for the counter"""
    count = ndb.IntegerProperty(default=0)


def get_count():
    """Retrieve the value for a given sharded counter.

    Returns:
        Integer; the cumulative count of all sharded counters.
    """
    total = 0
    for counter in SimpleCounterShard.query():
        total += counter.count
    return total


@ndb.transactional
def increment():
    """Increment the value for a given sharded counter."""
    shard_string_index = str(random.randint(0, NUM_SHARDS - 1))
    counter = SimpleCounterShard.get_by_id(shard_string_index)
    if counter is None:
        counter = SimpleCounterShard(id=shard_string_index)
    counter.count += 1
    counter.put()
Java
import com.google.appengine.api.datastore.DatastoreService;
import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.EntityNotFoundException;
import com.google.appengine.api.datastore.Key;
import com.google.appengine.api.datastore.KeyFactory;
import com.google.appengine.api.datastore.Query;
import com.google.appengine.api.datastore.Transaction;

import java.util.ConcurrentModificationException;
import java.util.Random;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * This initial implementation simply counts all instances of the
 * SimpleCounterShard kind in the datastore. The only way to increment the
 * number of shards is to add another shard by creating another entity in the
 * datastore.
 */
public class ShardedCounter {

    /**
     * DatastoreService object for Datastore access.
     */
    private static final DatastoreService DS = DatastoreServiceFactory
            .getDatastoreService();

    /**
     * Default number of shards.
     */
    private static final int NUM_SHARDS = 20;

    /**
     * A random number generator, for distributing writes across shards.
     */
    private final Random generator = new Random();

    /**
     * A logger object.
     */
    private static final Logger LOG = Logger.getLogger(ShardedCounter.class
            .getName());

    /**
     * Retrieve the value of this sharded counter.
     *
     * @return Summed total of all shards' counts
     */
    public final long getCount() {
        long sum = 0;

        Query query = new Query("SimpleCounterShard");
        for (Entity e : DS.prepare(query).asIterable()) {
            sum += (Long) e.getProperty("count");
        }

        return sum;
    }

    /**
     * Increment the value of this sharded counter.
     */
    public final void increment() {
        int shardNum = generator.nextInt(NUM_SHARDS);
        Key shardKey = KeyFactory.createKey("SimpleCounterShard",
                Integer.toString(shardNum));

        Transaction tx = DS.beginTransaction();
        Entity shard;
        try {
            try {
                shard = DS.get(tx, shardKey);
                long count = (Long) shard.getProperty("count");
                shard.setUnindexedProperty("count", count + 1L);
            } catch (EntityNotFoundException e) {
                shard = new Entity(shardKey);
                shard.setUnindexedProperty("count", 1L);
            }
            DS.put(tx, shard);
            tx.commit();
        } catch (ConcurrentModificationException e) {
            LOG.log(Level.WARNING,
                    "You may need more shards. Consider adding more shards.");
            LOG.log(Level.WARNING, e.toString(), e);
        } catch (Exception e) {
            LOG.log(Level.WARNING, e.toString(), e);
        } finally {
            if (tx.isActive()) {
                tx.rollback();
            }
        }
    }
}
Java (JDO)
import PMF;

import java.util.List;
import java.util.Random;
import javax.jdo.PersistenceManager;
import javax.jdo.Query;

/**
 * This initial implementation simply counts all instances of the
 * SimpleCounterShard class in the datastore. The only way to increment the
 * number of shards is to add another shard by creating another entity in the
 * datastore.
 */
public class ShardedCounter {
    private static final int NUM_SHARDS = 20;

    /**
     * Retrieve the value of this sharded counter.
     *
     * @return Summed total of all shards' counts
     */
    public int getCount() {
        int sum = 0;

        PersistenceManager pm = PMF.get().getPersistenceManager();
        try {
            String query = "select from " + SimpleCounterShard.class.getName();
            List<SimpleCounterShard> shards =
                    (List<SimpleCounterShard>) pm.newQuery(query).execute();
            if (shards != null && !shards.isEmpty()) {
                for (SimpleCounterShard shard : shards) {
                    sum += shard.getCount();
                }
            }
        } finally {
            pm.close();
        }

        return sum;
    }

    /**
     * Increment the value of this sharded counter.
     */
    public void increment() {
        PersistenceManager pm = PMF.get().getPersistenceManager();

        Random generator = new Random();
        int shardNum = generator.nextInt(NUM_SHARDS);

        try {
            Query shardQuery = pm.newQuery(SimpleCounterShard.class);
            shardQuery.setFilter("shardNumber == numParam");
            shardQuery.declareParameters("int numParam");

            List<SimpleCounterShard> shards =
                    (List<SimpleCounterShard>) shardQuery.execute(shardNum);
            SimpleCounterShard shard;

            // If the shard with the passed shard number exists, increment its count
            // by 1. Otherwise, create a new shard object, set its count to 1, and
            // persist it.
            if (shards != null && !shards.isEmpty()) {
                shard = shards.get(0);
                shard.setCount(shard.getCount() + 1);
            } else {
                shard = new SimpleCounterShard();
                shard.setShardNumber(shardNum);
                shard.setCount(1);
            }

            pm.makePersistent(shard);
        } finally {
            pm.close();
        }
    }
}
Go
package sharded_counter

import (
	"appengine"
	"appengine/datastore"
	"fmt"
	"math/rand"
)

type simpleCounterShard struct {
	Count int
}

const (
	numShards = 20
	shardKind = "SimpleCounterShard"
)

// Count retrieves the value of the counter.
func Count(c appengine.Context) (int, error) {
	total := 0
	q := datastore.NewQuery(shardKind)
	for t := q.Run(c); ; {
		var s simpleCounterShard
		_, err := t.Next(&s)
		if err == datastore.Done {
			break
		}
		if err != nil {
			return total, err
		}
		total += s.Count
	}
	return total, nil
}

// Increment increments the counter.
func Increment(c appengine.Context) error {
	return datastore.RunInTransaction(c, func(c appengine.Context) error {
		shardName := fmt.Sprintf("shard%d", rand.Intn(numShards))
		key := datastore.NewKey(c, shardKind, shardName, 0, nil)
		var s simpleCounterShard
		err := datastore.Get(c, key, &s)
		// A missing entity and a present entity will both work.
		if err != nil && err != datastore.ErrNoSuchEntity {
			return err
		}
		s.Count++
		_, err = datastore.Put(c, key, &s)
		return err
	}, nil)
}
In get_count() (Python), getCount() (Java), and Count() (Go), we simply loop over all the shards and add up the individual shard counts. In increment() (Python and Java) and Increment() (Go), we choose a shard at random and then read, increment, and write it back to the datastore.
Note that we create the shards lazily, only creating them when they are first incremented. The lazy creation of the shards allows the number of shards to be increased (but never decreased) in the future if more are needed. The value of NUM_SHARDS/numShards could be doubled and the results from get_count()/Count() would not change, since the query only selects the shards that have been added to the datastore, and increment()/Increment() will lazily create shards that aren't there.
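Here is a dictionary-backed Python sketch of that lazy-creation argument. The LazyShardStore class is hypothetical and stands in for the datastore. A shard entry exists only after its first increment, and the total is the sum of whatever shards exist. Raising num_shards later therefore leaves the existing total untouched.

```python
import random


class LazyShardStore:
    """Hypothetical stand-in for the datastore: a shard entry exists only
    after that shard has been incremented at least once."""

    def __init__(self, num_shards):
        self.num_shards = num_shards
        self.shards = {}  # shard id (str) -> count

    def increment(self):
        shard_id = str(random.randint(0, self.num_shards - 1))
        # Create the shard on first use, like the "counter is None" branch.
        self.shards[shard_id] = self.shards.get(shard_id, 0) + 1

    def get_count(self):
        # Like the kind query, this only sees shards that have been created.
        return sum(self.shards.values())


store = LazyShardStore(num_shards=20)
for _ in range(500):
    store.increment()
before = store.get_count()

store.num_shards = 40                # double the shard count
assert store.get_count() == before   # the existing total is unchanged

for _ in range(10):
    store.increment()                # may lazily create shards 20..39
print(store.get_count() - before)    # 10
```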
That is useful as an example to learn from, but a more general-purpose counter would allow you to create named counters on the fly, increase the number of shards dynamically, and use memcache to speed up reads to shards. By default, ndb uses memcache to automatically speed up reads. The code below is an updated implementation of the example that Brett Slatkin gave in his Google I/O talk, along with a function to increase the number of shards for a particular counter:
Python
import random

from google.appengine.api import memcache
from google.appengine.ext import ndb

SHARD_KEY_TEMPLATE = 'shard-{}-{:d}'


class GeneralCounterShardConfig(ndb.Model):
    """Tracks the number of shards for each named counter."""
    num_shards = ndb.IntegerProperty(default=20)

    @classmethod
    def all_keys(cls, name):
        """Returns all possible keys for the counter name given the config.

        Args:
            name: The name of the counter.

        Returns:
            The full list of ndb.Key values corresponding to all the possible
                counter shards that could exist.
        """
        config = cls.get_or_insert(name)
        shard_key_strings = [SHARD_KEY_TEMPLATE.format(name, index)
                             for index in range(config.num_shards)]
        return [ndb.Key(GeneralCounterShard, shard_key_string)
                for shard_key_string in shard_key_strings]


class GeneralCounterShard(ndb.Model):
    """Shards for each named counter."""
    count = ndb.IntegerProperty(default=0)


def get_count(name):
    """Retrieve the value for a given sharded counter.

    Args:
        name: The name of the counter.

    Returns:
        Integer; the cumulative count of all sharded counters for the given
            counter name.
    """
    total = memcache.get(name)
    if total is None:
        total = 0
        all_keys = GeneralCounterShardConfig.all_keys(name)
        for counter in ndb.get_multi(all_keys):
            if counter is not None:
                total += counter.count
        memcache.add(name, total, 60)
    return total


def increment(name):
    """Increment the value for a given sharded counter.

    Args:
        name: The name of the counter.
    """
    config = GeneralCounterShardConfig.get_or_insert(name)
    _increment(name, config.num_shards)


@ndb.transactional
def _increment(name, num_shards):
    """Transactional helper to increment the value for a given sharded counter.

    Also takes a number of shards to determine which shard will be used.

    Args:
        name: The name of the counter.
        num_shards: How many shards to use.
    """
    index = random.randint(0, num_shards - 1)
    shard_key_string = SHARD_KEY_TEMPLATE.format(name, index)
    counter = GeneralCounterShard.get_by_id(shard_key_string)
    if counter is None:
        counter = GeneralCounterShard(id=shard_key_string)
    counter.count += 1
    counter.put()
    # Memcache increment does nothing if the name is not a key in memcache
    memcache.incr(name)


@ndb.transactional
def increase_shards(name, num_shards):
    """Increase the number of shards for a given sharded counter.

    Will never decrease the number of shards.

    Args:
        name: The name of the counter.
        num_shards: How many shards to use.
    """
    config = GeneralCounterShardConfig.get_or_insert(name)
    if config.num_shards < num_shards:
        config.num_shards = num_shards
        config.put()
Java
import com.google.appengine.api.datastore.DatastoreService;
import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.EntityNotFoundException;
import com.google.appengine.api.datastore.Key;
import com.google.appengine.api.datastore.KeyFactory;
import com.google.appengine.api.datastore.Query;
import com.google.appengine.api.datastore.Transaction;
import com.google.appengine.api.memcache.Expiration;
import com.google.appengine.api.memcache.MemcacheService;
import com.google.appengine.api.memcache.MemcacheService.SetPolicy;
import com.google.appengine.api.memcache.MemcacheServiceFactory;

import java.util.ConcurrentModificationException;
import java.util.Random;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A counter which can be incremented rapidly.
 *
 * Capable of incrementing the counter and increasing the number of shards. When
 * incrementing, a random shard is selected to prevent a single shard from being
 * written too frequently. If increments are being made too quickly, increase
 * the number of shards to divide the load. Performs datastore operations using
 * the low level datastore API.
 */
public class ShardedCounter {

    /**
     * Convenience class which contains constants related to a named sharded
     * counter. The counter name provided in the constructor is used as the
     * entity key.
     */
    private static final class Counter {
        /**
         * Entity kind representing a named sharded counter.
         */
        private static final String KIND = "Counter";

        /**
         * Property to store the number of shards in a given {@value #KIND}
         * named sharded counter.
         */
        private static final String SHARD_COUNT = "shard_count";
    }

    /**
     * Convenience class which contains constants related to the counter shards.
     * The shard number (as a String) is used as the entity key.
     */
    private static final class CounterShard {
        /**
         * Entity kind prefix, which is concatenated with the counter name to
         * form the final entity kind, which represents counter shards.
         */
        private static final String KIND_PREFIX = "CounterShard_";

        /**
         * Property to store the current count within a counter shard.
         */
        private static final String COUNT = "count";
    }

    /**
     * DatastoreService object for Datastore access.
     */
    private static final DatastoreService DS = DatastoreServiceFactory
            .getDatastoreService();

    /**
     * Default number of shards.
     */
    private static final int INITIAL_SHARDS = 5;

    /**
     * Cache duration for memcache.
     */
    private static final int CACHE_PERIOD = 60;

    /**
     * The name of this counter.
     */
    private final String counterName;

    /**
     * A random number generator, for distributing writes across shards.
     */
    private final Random generator = new Random();

    /**
     * The counter shard kind for this counter.
     */
    private String kind;

    /**
     * Memcache service object for Memcache access.
     */
    private final MemcacheService mc = MemcacheServiceFactory
            .getMemcacheService();

    /**
     * A logger object.
     */
    private static final Logger LOG = Logger.getLogger(ShardedCounter.class
            .getName());

    /**
     * Constructor which creates a sharded counter using the provided counter
     * name.
     *
     * @param name
     *            name of the sharded counter
     */
    public ShardedCounter(final String name) {
        counterName = name;
        kind = CounterShard.KIND_PREFIX + counterName;
    }

    /**
     * Increase the number of shards for a given sharded counter. Will never
     * decrease the number of shards.
     *
     * @param count
     *            Number of new shards to build and store
     */
    public final void addShards(final int count) {
        Key counterKey = KeyFactory.createKey(Counter.KIND, counterName);
        incrementPropertyTx(counterKey, Counter.SHARD_COUNT, count,
                INITIAL_SHARDS + count);
    }

    /**
     * Retrieve the value of this sharded counter.
     *
     * @return Summed total of all shards' counts
     */
    public final long getCount() {
        Long value = (Long) mc.get(kind);
        if (value != null) {
            return value;
        }

        long sum = 0;
        Query query = new Query(kind);
        for (Entity shard : DS.prepare(query).asIterable()) {
            sum += (Long) shard.getProperty(CounterShard.COUNT);
        }
        mc.put(kind, sum, Expiration.byDeltaSeconds(CACHE_PERIOD),
                SetPolicy.ADD_ONLY_IF_NOT_PRESENT);

        return sum;
    }

    /**
     * Increment the value of this sharded counter.
     */
    public final void increment() {
        // Find how many shards are in this counter.
        int numShards = getShardCount();

        // Choose the shard randomly from the available shards.
        long shardNum = generator.nextInt(numShards);

        Key shardKey = KeyFactory.createKey(kind, Long.toString(shardNum));
        incrementPropertyTx(shardKey, CounterShard.COUNT, 1, 1);
        mc.increment(kind, 1);
    }

    /**
     * Get the number of shards in this counter.
     *
     * @return shard count
     */
    private int getShardCount() {
        try {
            Key counterKey = KeyFactory.createKey(Counter.KIND, counterName);
            Entity counter = DS.get(counterKey);
            Long shardCount = (Long) counter.getProperty(Counter.SHARD_COUNT);
            return shardCount.intValue();
        } catch (EntityNotFoundException ignore) {
            return INITIAL_SHARDS;
        }
    }

    /**
     * Increment datastore property value inside a transaction. If the entity
     * with the provided key does not exist, instead create an entity with the
     * supplied initial property value.
     *
     * @param key
     *            the entity key to update or create
     * @param prop
     *            the property name to be incremented
     * @param increment
     *            the amount by which to increment
     * @param initialValue
     *            the value to use if the entity does not exist
     */
    private void incrementPropertyTx(final Key key, final String prop,
            final long increment, final long initialValue) {
        Transaction tx = DS.beginTransaction();
        Entity thing;
        long value;
        try {
            try {
                thing = DS.get(tx, key);
                value = (Long) thing.getProperty(prop) + increment;
            } catch (EntityNotFoundException e) {
                thing = new Entity(key);
                value = initialValue;
            }
            thing.setUnindexedProperty(prop, value);
            DS.put(tx, thing);
            tx.commit();
        } catch (ConcurrentModificationException e) {
            LOG.log(Level.WARNING,
                    "You may need more shards. Consider adding more shards.");
            LOG.log(Level.WARNING, e.toString(), e);
        } catch (Exception e) {
            LOG.log(Level.WARNING, e.toString(), e);
        } finally {
            if (tx.isActive()) {
                tx.rollback();
            }
        }
    }
}
Java (JDO)
import PMF;

import java.util.List;
import java.util.Random;
import javax.jdo.PersistenceManager;
import javax.jdo.Query;

/**
 * A counter which can be incremented rapidly.
 *
 * Capable of incrementing the counter and increasing the number of shards.
 * When incrementing, a random shard is selected to prevent a single shard
 * from being written to too frequently. If increments are being made too
 * quickly, increase the number of shards to divide the load. Performs
 * datastore operations using JDO.
 */
public class ShardedCounter {
    private String counterName;

    public ShardedCounter(String counterName) {
        this.counterName = counterName;
    }

    public String getCounterName() {
        return counterName;
    }

    /**
     * Retrieve the value of this sharded counter.
     *
     * @return Summed total of all shards' counts
     */
    public int getCount() {
        int sum = 0;
        PersistenceManager pm = PMF.get().getPersistenceManager();

        try {
            Query shardsQuery = pm.newQuery(GeneralCounterShard.class,
                    "counterName == nameParam");
            shardsQuery.declareParameters("String nameParam");

            List<GeneralCounterShard> shards =
                    (List<GeneralCounterShard>) shardsQuery.execute(counterName);
            if (shards != null && !shards.isEmpty()) {
                for (GeneralCounterShard current : shards) {
                    sum += current.getCount();
                }
            }
        } finally {
            pm.close();
        }

        return sum;
    }

    /**
     * Increment the value of this sharded counter.
     */
    public void increment() {
        PersistenceManager pm = PMF.get().getPersistenceManager();

        // Find how many shards are in this counter.
        int shardCount = 0;
        try {
            Counter current = getThisCounter(pm);
            shardCount = current.getShardCount();
        } finally {
            pm.close();
        }

        // Choose the shard randomly from the available shards.
        Random generator = new Random();
        int shardNum = generator.nextInt(shardCount);

        pm = PMF.get().getPersistenceManager();
        try {
            Query randomShardQuery = pm.newQuery(GeneralCounterShard.class);
            randomShardQuery.setFilter(
                    "counterName == nameParam && shardNumber == numParam");
            randomShardQuery.declareParameters("String nameParam, int numParam");
            List<GeneralCounterShard> shards = (List<GeneralCounterShard>)
                    randomShardQuery.execute(counterName, shardNum);
            if (shards != null && !shards.isEmpty()) {
                GeneralCounterShard shard = shards.get(0);
                shard.increment(1);
                pm.makePersistent(shard);
            }
        } finally {
            pm.close();
        }
    }

    /**
     * Increase the number of shards for a given sharded counter.
     * Will never decrease the number of shards.
     *
     * @param count Number of new shards to build and store
     * @return Total number of shards
     */
    public int addShards(int count) {
        PersistenceManager pm = PMF.get().getPersistenceManager();

        // Find the initial shard count for this counter.
        int numShards = 0;
        try {
            Counter current = getThisCounter(pm);
            if (current != null) {
                numShards = current.getShardCount().intValue();
                current.setShardCount(numShards + count);
                // Save the increased shard count for this counter.
                pm.makePersistent(current);
            }
        } finally {
            pm.close();
        }

        // Create new shard objects for this counter.
        pm = PMF.get().getPersistenceManager();
        try {
            for (int i = 0; i < count; i++) {
                GeneralCounterShard newShard =
                        new GeneralCounterShard(getCounterName(), numShards);
                pm.makePersistent(newShard);
                numShards++;
            }
        } finally {
            pm.close();
        }

        return numShards;
    }

    /**
     * @return Counter datastore object matching this object's counterName value
     */
    private Counter getThisCounter(PersistenceManager pm) {
        Counter current = null;

        Query thisCounterQuery = pm.newQuery(Counter.class,
                "counterName == nameParam");
        thisCounterQuery.declareParameters("String nameParam");

        List<Counter> counter =
                (List<Counter>) thisCounterQuery.execute(counterName);
        if (counter != null && !counter.isEmpty()) {
            current = counter.get(0);
        }

        return current;
    }
}
Go
package sharded_counter

import (
	"appengine"
	"appengine/datastore"
	"appengine/memcache"
	"fmt"
	"math/rand"
	"time"
)

type counterConfig struct {
	Shards int
}

type shard struct {
	Name  string
	Count int
}

const (
	defaultShards = 20
	configKind    = "GeneralCounterShardConfig"
	shardKind     = "GeneralCounterShard"
)

func memcacheKey(name string) string {
	return shardKind + ":" + name
}

// Count retrieves the value of the named counter.
func Count(c appengine.Context, name string) (int, error) {
	total := 0
	mkey := memcacheKey(name)
	if _, err := memcache.JSON.Get(c, mkey, &total); err == nil {
		return total, nil
	}
	q := datastore.NewQuery(shardKind).Filter("Name =", name)
	for t := q.Run(c); ; {
		var s shard
		_, err := t.Next(&s)
		if err == datastore.Done {
			break
		}
		if err != nil {
			return total, err
		}
		total += s.Count
	}
	memcache.JSON.Set(c, &memcache.Item{
		Key:        mkey,
		Object:     &total,
		Expiration: 60 * time.Second, // Expiration is a time.Duration
	})
	return total, nil
}

// Increment increments the named counter.
func Increment(c appengine.Context, name string) error {
	// Get counter config.
	var cfg counterConfig
	ckey := datastore.NewKey(c, configKind, name, 0, nil)
	err := datastore.RunInTransaction(c, func(c appengine.Context) error {
		err := datastore.Get(c, ckey, &cfg)
		if err == datastore.ErrNoSuchEntity {
			cfg.Shards = defaultShards
			_, err = datastore.Put(c, ckey, &cfg)
		}
		return err
	}, nil)
	if err != nil {
		return err
	}
	err = datastore.RunInTransaction(c, func(c appengine.Context) error {
		// Include the counter name so each named counter gets its own shards.
		shardName := fmt.Sprintf("%s-shard%d", name, rand.Intn(cfg.Shards))
		key := datastore.NewKey(c, shardKind, shardName, 0, nil)
		var s shard
		err := datastore.Get(c, key, &s)
		// A missing entity and a present entity will both work.
		if err != nil && err != datastore.ErrNoSuchEntity {
			return err
		}
		s.Name = name // required by the Name filter in Count
		s.Count++
		_, err = datastore.Put(c, key, &s)
		return err
	}, nil)
	if err != nil {
		return err
	}
	// Only bump a cached total that already exists; if nothing is cached,
	// Count rebuilds the total from the shards.
	memcache.IncrementExisting(c, memcacheKey(name), 1)
	return nil
}

// IncreaseShards increases the number of shards for the named counter to n.
// It will never decrease the number of shards.
func IncreaseShards(c appengine.Context, name string, n int) error {
	ckey := datastore.NewKey(c, configKind, name, 0, nil)
	return datastore.RunInTransaction(c, func(c appengine.Context) error {
		var cfg counterConfig
		mod := false
		err := datastore.Get(c, ckey, &cfg)
		if err == datastore.ErrNoSuchEntity {
			cfg.Shards = defaultShards
			mod = true
		} else if err != nil {
			return err
		}
		if cfg.Shards < n {
			cfg.Shards = n
			mod = true
		}
		if mod {
			_, err = datastore.Put(c, ckey, &cfg)
		}
		return err
	}, nil)
}
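The moving parts of these general-purpose counters can be seen in isolation in the following in-memory Python sketch. It is an approximation with hypothetical class names (TTLCache, NamedShardedCounters), not App Engine code. A per-name config holds the shard count, and shard keys follow the same 'shard-{name}-{index}' template as the Python sample. A tiny TTL cache mimics the two memcache behaviors the samples rely on: a total is added only when none is cached, and a cached total is incremented only when one is present.

```python
import random
import time

# Same key scheme as the Python sample: 'shard-<name>-<index>'.
SHARD_KEY_TEMPLATE = 'shard-{}-{:d}'


class TTLCache:
    """Hypothetical stand-in for memcache: entries expire after a TTL,
    add() stores only when nothing is cached, incr() updates only an
    existing entry."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._items = {}  # key -> (value, expires_at)

    def get(self, key):
        item = self._items.get(key)
        if item is None or item[1] <= self._clock():
            self._items.pop(key, None)  # drop expired entries
            return None
        return item[0]

    def add(self, key, value, ttl):
        if self.get(key) is None:
            self._items[key] = (value, self._clock() + ttl)

    def incr(self, key):
        if self.get(key) is not None:
            value, expires_at = self._items[key]
            self._items[key] = (value + 1, expires_at)


class NamedShardedCounters:
    """Hypothetical named counters with a per-name shard count."""

    def __init__(self, default_shards=20, cache_ttl=60):
        self.config = {}  # counter name -> number of shards
        self.shards = {}  # shard key -> count; created lazily
        self.cache = TTLCache()
        self.default_shards = default_shards
        self.cache_ttl = cache_ttl

    def _num_shards(self, name):
        # Like get_or_insert: create the config with the default on first use.
        return self.config.setdefault(name, self.default_shards)

    def _shard_keys(self, name):
        return [SHARD_KEY_TEMPLATE.format(name, i)
                for i in range(self._num_shards(name))]

    def increment(self, name):
        key = random.choice(self._shard_keys(name))
        self.shards[key] = self.shards.get(key, 0) + 1
        self.cache.incr(name)  # no-op when no total is cached

    def get_count(self, name):
        total = self.cache.get(name)
        if total is None:
            # Cache miss: sum every possible shard, treating missing ones as 0.
            total = sum(self.shards.get(k, 0) for k in self._shard_keys(name))
            self.cache.add(name, total, self.cache_ttl)
        return total

    def increase_shards(self, name, num_shards):
        # Never decrease: get_count only sums indices below the shard count,
        # so shrinking it would silently drop existing shards from the total.
        if self._num_shards(name) < num_shards:
            self.config[name] = num_shards


counters = NamedShardedCounters()
for _ in range(5):
    counters.increment('votes')
print(counters.get_count('votes'))  # 5 (computed, then cached)
counters.increment('votes')         # also bumps the cached total
print(counters.get_count('votes'))  # 6
```

Because a cache miss only ever sums shard indices below the configured count, this sketch also shows why increase_shards, like the samples, refuses to shrink the shard count.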
Source
The Python source and Java source for both counters described above are available in our samples repository. While the web interface in the examples isn't much to look at, it's instructive to use the admin interface and inspect the data models after you have incremented both counters a few times.
Conclusion
Sharding is one of many important techniques in building a scalable application, and hopefully these examples will give you ideas of where you can apply the technique in your application. The code in these articles is available under the Apache 2 license, so feel free to start with it as you build your solutions.
More Info
Watch Brett Slatkin's Google I/O talk "Building Scalable Web Applications with Google App Engine".