这里主要是写操作相关的函数
- /* Lookup a key for write operations, and as a side effect, if needed, expires
- * the key if its TTL is reached.
- *
- * Returns the linked value object if the key exists or NULL if the key
- * does not exist in the specified DB. */
- robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) {
- expireIfNeeded(db,key);
- return lookupKey(db,key,flags);
- }
-
- robj *lookupKeyWrite(redisDb *db, robj *key) {
- return lookupKeyWriteWithFlags(db, key, LOOKUP_NONE);
- }
- void SentReplyOnKeyMiss(client *c, robj *reply){
- serverAssert(sdsEncodedObject(reply));
- sds rep = reply->ptr;
- if (sdslen(rep) > 1 && rep[0] == '-'){
- addReplyErrorObject(c, reply);
- } else {
- addReply(c,reply);
- }
- }
- robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
- robj *o = lookupKeyRead(c->db, key);
- if (!o) SentReplyOnKeyMiss(c, reply);
- return o;
- }
-
- robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
- robj *o = lookupKeyWrite(c->db, key);
- if (!o) SentReplyOnKeyMiss(c, reply);
- return o;
- }
-
- /* Add the key to the DB. It's up to the caller to increment the reference
- * counter of the value if needed.
- *
- * The program is aborted if the key already exists. */
- void dbAdd(redisDb *db, robj *key, robj *val) {
- sds copy = sdsdup(key->ptr);
- int retval = dictAdd(db->dict, copy, val);
-
- serverAssertWithInfo(NULL,key,retval == DICT_OK);
- signalKeyAsReady(db, key, val->type);
- if (server.cluster_enabled) slotToKeyAdd(key->ptr);
- }
-
- /* This is a special version of dbAdd() that is used only when loading
- * keys from the RDB file: the key is passed as an SDS string that is
- * retained by the function (and not freed by the caller).
- *
- * Moreover this function will not abort if the key is already busy, to
- * give more control to the caller, nor will signal the key as ready
- * since it is not useful in this context.
- *
- * The function returns 1 if the key was added to the database, taking
- * ownership of the SDS string, otherwise 0 is returned, and is up to the
- * caller to free the SDS string. */
- int dbAddRDBLoad(redisDb *db, sds key, robj *val) {
- int retval = dictAdd(db->dict, key, val);
- if (retval != DICT_OK) return 0;
- if (server.cluster_enabled) slotToKeyAdd(key);
- return 1;
- }
-
- /* Overwrite an existing key with a new value. Incrementing the reference
- * count of the new value is up to the caller.
- * This function does not modify the expire time of the existing key.
- *
- * The program is aborted if the key was not already present. */
- void dbOverwrite(redisDb *db, robj *key, robj *val) {
- dictEntry *de = dictFind(db->dict,key->ptr);
-
- serverAssertWithInfo(NULL,key,de != NULL);
- dictEntry auxentry = *de;
- robj *old = dictGetVal(de);
- if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
- val->lru = old->lru;
- }
- /* Although the key is not really deleted from the database, we regard
- overwrite as two steps of unlink+add, so we still need to call the unlink
- callback of the module. */
- moduleNotifyKeyUnlink(key,old);
- dictSetVal(db->dict, de, val);
-
- if (server.lazyfree_lazy_server_del) {
- freeObjAsync(key,old);
- dictSetVal(db->dict, &auxentry, NULL);
- }
-
- dictFreeVal(db->dict, &auxentry);
- }
-
- /* High level Set operation. This function can be used in order to set
- * a key, whatever it was existing or not, to a new object.
- *
- * 1) The ref count of the value object is incremented.
- * 2) clients WATCHing for the destination key notified.
- * 3) The expire time of the key is reset (the key is made persistent),
- * unless 'keepttl' is true.
- *
- * All the new keys in the database should be created via this interface.
- * The client 'c' argument may be set to NULL if the operation is performed
- * in a context where there is no clear client performing the operation. */
- void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal) {
- if (lookupKeyWrite(db,key) == NULL) {
- dbAdd(db,key,val);
- } else {
- dbOverwrite(db,key,val);
- }
- incrRefCount(val);
- if (!keepttl) removeExpire(db,key);
- if (signal) signalModifiedKey(c,db,key);
- }
-
- /* Common case for genericSetKey() where the TTL is not retained. */
- void setKey(client *c, redisDb *db, robj *key, robj *val) {
- genericSetKey(c,db,key,val,0,1);
- }
-
- /* Return a random key, in form of a Redis object.
- * If there are no keys, NULL is returned.
- *
- * The function makes sure to return keys not already expired. */
- robj *dbRandomKey(redisDb *db) {
- dictEntry *de;
- int maxtries = 100;
- int allvolatile = dictSize(db->dict) == dictSize(db->expires);
-
- while(1) {
- sds key;
- robj *keyobj;
-
- de = dictGetFairRandomKey(db->dict);
- if (de == NULL) return NULL;
-
- key = dictGetKey(de);
- keyobj = createStringObject(key,sdslen(key));
- if (dictFind(db->expires,key)) {
- if (allvolatile && server.masterhost && --maxtries == 0) {
- /* If the DB is composed only of keys with an expire set,
- * it could happen that all the keys are already logically
- * expired in the slave, so the function cannot stop because
- * expireIfNeeded() is false, nor it can stop because
- * dictGetRandomKey() returns NULL (there are keys to return).
- * To prevent the infinite loop we do some tries, but if there
- * are the conditions for an infinite loop, eventually we
- * return a key name that may be already expired. */
- return keyobj;
- }
- if (expireIfNeeded(db,keyobj)) {
- decrRefCount(keyobj);
- continue; /* search for another key. This expired. */
- }
- }
- return keyobj;
- }
- }
-
- /* Delete a key, value, and associated expiration entry if any, from the DB */
- int dbSyncDelete(redisDb *db, robj *key) {
- /* Deleting an entry from the expires dict will not free the sds of
- * the key, because it is shared with the main dictionary. */
- if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
- dictEntry *de = dictUnlink(db->dict,key->ptr);
- if (de) {
- robj *val = dictGetVal(de);
- /* Tells the module that the key has been unlinked from the database. */
- moduleNotifyKeyUnlink(key,val);
- dictFreeUnlinkedEntry(db->dict,de);
- if (server.cluster_enabled) slotToKeyDel(key->ptr);
- return 1;
- } else {
- return 0;
- }
- }
-
- /* This is a wrapper whose behavior depends on the Redis lazy free
- * configuration. Deletes the key synchronously or asynchronously. */
- int dbDelete(redisDb *db, robj *key) {
- return server.lazyfree_lazy_server_del ? dbAsyncDelete(db,key) :
- dbSyncDelete(db,key);
- }
-
- /* Prepare the string object stored at 'key' to be modified destructively
- * to implement commands like SETBIT or APPEND.
- *
- * An object is usually ready to be modified unless one of the two conditions
- * are true:
- *
- * 1) The object 'o' is shared (refcount > 1), we don't want to affect
- * other users.
- * 2) The object encoding is not "RAW".
- *
- * If the object is found in one of the above conditions (or both) by the
- * function, an unshared / not-encoded copy of the string object is stored
- * at 'key' in the specified 'db'. Otherwise the object 'o' itself is
- * returned.
- *
- * USAGE:
- *
- * The object 'o' is what the caller already obtained by looking up 'key'
- * in 'db', the usage pattern looks like this:
- *
- * o = lookupKeyWrite(db,key);
- * if (checkType(c,o,OBJ_STRING)) return;
- * o = dbUnshareStringValue(db,key,o);
- *
- * At this point the caller is ready to modify the object, for example
- * using an sdscat() call to append some data, or anything else.
- */
- robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
- serverAssert(o->type == OBJ_STRING);
- if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) {
- robj *decoded = getDecodedObject(o);
- o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr));
- decrRefCount(decoded);
- dbOverwrite(db,key,o);
- }
- return o;
- }
-
- /* Remove all keys from the database(s) structure. The dbarray argument
- * may not be the server main DBs (could be a backup).
- *
- * The dbnum can be -1 if all the DBs should be emptied, or the specified
- * DB index if we want to empty only a single database.
- * The function returns the number of keys removed from the database(s). */
- long long emptyDbStructure(redisDb *dbarray, int dbnum, int async,
- void(callback)(void*))
- {
- long long removed = 0;
- int startdb, enddb;
-
- if (dbnum == -1) {
- startdb = 0;
- enddb = server.dbnum-1;
- } else {
- startdb = enddb = dbnum;
- }
-
- for (int j = startdb; j <= enddb; j++) {
- removed += dictSize(dbarray[j].dict);
- if (async) {
- emptyDbAsync(&dbarray[j]);
- } else {
- dictEmpty(dbarray[j].dict,callback);
- dictEmpty(dbarray[j].expires,callback);
- }
- /* Because all keys of database are removed, reset average ttl. */
- dbarray[j].avg_ttl = 0;
- dbarray[j].expires_cursor = 0;
- }
-
- return removed;
- }
-
- /* Remove all keys from all the databases in a Redis server.
- * If callback is given the function is called from time to time to
- * signal that work is in progress.
- *
- * The dbnum can be -1 if all the DBs should be flushed, or the specified
- * DB number if we want to flush only a single Redis database number.
- *
- * Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or
- * EMPTYDB_ASYNC if we want the memory to be freed in a different thread
- * and the function to return ASAP.
- *
- * On success the function returns the number of keys removed from the
- * database(s). Otherwise -1 is returned in the specific case the
- * DB number is out of range, and errno is set to EINVAL. */
- long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
- int async = (flags & EMPTYDB_ASYNC);
- RedisModuleFlushInfoV1 fi = {REDISMODULE_FLUSHINFO_VERSION,!async,dbnum};
- long long removed = 0;
-
- if (dbnum < -1 || dbnum >= server.dbnum) {
- errno = EINVAL;
- return -1;
- }
-
- /* Fire the flushdb modules event. */
- moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
- REDISMODULE_SUBEVENT_FLUSHDB_START,
- &fi);
-
- /* Make sure the WATCHed keys are affected by the FLUSH* commands.
- * Note that we need to call the function while the keys are still
- * there. */
- signalFlushedDb(dbnum, async);
-
- /* Empty redis database structure. */
- removed = emptyDbStructure(server.db, dbnum, async, callback);
-
- /* Flush slots to keys map if enable cluster, we can flush entire
- * slots to keys map whatever dbnum because only support one DB
- * in cluster mode. */
- if (server.cluster_enabled) slotToKeyFlush(async);
-
- if (dbnum == -1) flushSlaveKeysWithExpireList();
-
- /* Also fire the end event. Note that this event will fire almost
- * immediately after the start event if the flush is asynchronous. */
- moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
- REDISMODULE_SUBEVENT_FLUSHDB_END,
- &fi);
-
- return removed;
- }
-
- /* Store a backup of the database for later use, and put an empty one
- * instead of it. */
- dbBackup *backupDb(void) {
- dbBackup *backup = zmalloc(sizeof(dbBackup));
-
- /* Backup main DBs. */
- backup->dbarray = zmalloc(sizeof(redisDb)*server.dbnum);
- for (int i=0; i<server.dbnum; i++) {
- backup->dbarray[i] = server.db[i];
- server.db[i].dict = dictCreate(&dbDictType,NULL);
- server.db[i].expires = dictCreate(&dbExpiresDictType,NULL);
- }
-
- /* Backup cluster slots to keys map if enable cluster. */
- if (server.cluster_enabled) {
- backup->slots_to_keys = server.cluster->slots_to_keys;
- memcpy(backup->slots_keys_count, server.cluster->slots_keys_count,
- sizeof(server.cluster->slots_keys_count));
- server.cluster->slots_to_keys = raxNew();
- memset(server.cluster->slots_keys_count, 0,
- sizeof(server.cluster->slots_keys_count));
- }
-
- moduleFireServerEvent(REDISMODULE_EVENT_REPL_BACKUP,
- REDISMODULE_SUBEVENT_REPL_BACKUP_CREATE,
- NULL);
-
- return backup;
- }