关键词搜索

源码搜索 ×
×

漫话Redis源码之六十八

发布2022-01-23浏览634次

详情内容

这里主要是写操作相关的函数

  1. /* Lookup a key for write operations, and as a side effect, if needed, expires
  2. * the key if its TTL is reached.
  3. *
  4. * Returns the linked value object if the key exists or NULL if the key
  5. * does not exist in the specified DB. */
  6. robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) {
  7. expireIfNeeded(db,key);
  8. return lookupKey(db,key,flags);
  9. }
  10. robj *lookupKeyWrite(redisDb *db, robj *key) {
  11. return lookupKeyWriteWithFlags(db, key, LOOKUP_NONE);
  12. }
  13. void SentReplyOnKeyMiss(client *c, robj *reply){
  14. serverAssert(sdsEncodedObject(reply));
  15. sds rep = reply->ptr;
  16. if (sdslen(rep) > 1 && rep[0] == '-'){
  17. addReplyErrorObject(c, reply);
  18. } else {
  19. addReply(c,reply);
  20. }
  21. }
  22. robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
  23. robj *o = lookupKeyRead(c->db, key);
  24. if (!o) SentReplyOnKeyMiss(c, reply);
  25. return o;
  26. }
  27. robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
  28. robj *o = lookupKeyWrite(c->db, key);
  29. if (!o) SentReplyOnKeyMiss(c, reply);
  30. return o;
  31. }
  32. /* Add the key to the DB. It's up to the caller to increment the reference
  33. * counter of the value if needed.
  34. *
  35. * The program is aborted if the key already exists. */
  36. void dbAdd(redisDb *db, robj *key, robj *val) {
  37. sds copy = sdsdup(key->ptr);
  38. int retval = dictAdd(db->dict, copy, val);
  39. serverAssertWithInfo(NULL,key,retval == DICT_OK);
  40. signalKeyAsReady(db, key, val->type);
  41. if (server.cluster_enabled) slotToKeyAdd(key->ptr);
  42. }
  43. /* This is a special version of dbAdd() that is used only when loading
  44. * keys from the RDB file: the key is passed as an SDS string that is
  45. * retained by the function (and not freed by the caller).
  46. *
  47. * Moreover this function will not abort if the key is already busy, to
  48. * give more control to the caller, nor will signal the key as ready
  49. * since it is not useful in this context.
  50. *
  51. * The function returns 1 if the key was added to the database, taking
  52. * ownership of the SDS string, otherwise 0 is returned, and is up to the
  53. * caller to free the SDS string. */
  54. int dbAddRDBLoad(redisDb *db, sds key, robj *val) {
  55. int retval = dictAdd(db->dict, key, val);
  56. if (retval != DICT_OK) return 0;
  57. if (server.cluster_enabled) slotToKeyAdd(key);
  58. return 1;
  59. }
  60. /* Overwrite an existing key with a new value. Incrementing the reference
  61. * count of the new value is up to the caller.
  62. * This function does not modify the expire time of the existing key.
  63. *
  64. * The program is aborted if the key was not already present. */
  65. void dbOverwrite(redisDb *db, robj *key, robj *val) {
  66. dictEntry *de = dictFind(db->dict,key->ptr);
  67. serverAssertWithInfo(NULL,key,de != NULL);
  68. dictEntry auxentry = *de;
  69. robj *old = dictGetVal(de);
  70. if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
  71. val->lru = old->lru;
  72. }
  73. /* Although the key is not really deleted from the database, we regard
  74. overwrite as two steps of unlink+add, so we still need to call the unlink
  75. callback of the module. */
  76. moduleNotifyKeyUnlink(key,old);
  77. dictSetVal(db->dict, de, val);
  78. if (server.lazyfree_lazy_server_del) {
  79. freeObjAsync(key,old);
  80. dictSetVal(db->dict, &auxentry, NULL);
  81. }
  82. dictFreeVal(db->dict, &auxentry);
  83. }
  84. /* High level Set operation. This function can be used in order to set
  85. * a key, whatever it was existing or not, to a new object.
  86. *
  87. * 1) The ref count of the value object is incremented.
  88. * 2) clients WATCHing for the destination key notified.
  89. * 3) The expire time of the key is reset (the key is made persistent),
  90. * unless 'keepttl' is true.
  91. *
  92. * All the new keys in the database should be created via this interface.
  93. * The client 'c' argument may be set to NULL if the operation is performed
  94. * in a context where there is no clear client performing the operation. */
  95. void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal) {
  96. if (lookupKeyWrite(db,key) == NULL) {
  97. dbAdd(db,key,val);
  98. } else {
  99. dbOverwrite(db,key,val);
  100. }
  101. incrRefCount(val);
  102. if (!keepttl) removeExpire(db,key);
  103. if (signal) signalModifiedKey(c,db,key);
  104. }
  105. /* Common case for genericSetKey() where the TTL is not retained. */
  106. void setKey(client *c, redisDb *db, robj *key, robj *val) {
  107. genericSetKey(c,db,key,val,0,1);
  108. }
  109. /* Return a random key, in form of a Redis object.
  110. * If there are no keys, NULL is returned.
  111. *
  112. * The function makes sure to return keys not already expired. */
  113. robj *dbRandomKey(redisDb *db) {
  114. dictEntry *de;
  115. int maxtries = 100;
  116. int allvolatile = dictSize(db->dict) == dictSize(db->expires);
  117. while(1) {
  118. sds key;
  119. robj *keyobj;
  120. de = dictGetFairRandomKey(db->dict);
  121. if (de == NULL) return NULL;
  122. key = dictGetKey(de);
  123. keyobj = createStringObject(key,sdslen(key));
  124. if (dictFind(db->expires,key)) {
  125. if (allvolatile && server.masterhost && --maxtries == 0) {
  126. /* If the DB is composed only of keys with an expire set,
  127. * it could happen that all the keys are already logically
  128. * expired in the slave, so the function cannot stop because
  129. * expireIfNeeded() is false, nor it can stop because
  130. * dictGetRandomKey() returns NULL (there are keys to return).
  131. * To prevent the infinite loop we do some tries, but if there
  132. * are the conditions for an infinite loop, eventually we
  133. * return a key name that may be already expired. */
  134. return keyobj;
  135. }
  136. if (expireIfNeeded(db,keyobj)) {
  137. decrRefCount(keyobj);
  138. continue; /* search for another key. This expired. */
  139. }
  140. }
  141. return keyobj;
  142. }
  143. }
  144. /* Delete a key, value, and associated expiration entry if any, from the DB */
  145. int dbSyncDelete(redisDb *db, robj *key) {
  146. /* Deleting an entry from the expires dict will not free the sds of
  147. * the key, because it is shared with the main dictionary. */
  148. if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
  149. dictEntry *de = dictUnlink(db->dict,key->ptr);
  150. if (de) {
  151. robj *val = dictGetVal(de);
  152. /* Tells the module that the key has been unlinked from the database. */
  153. moduleNotifyKeyUnlink(key,val);
  154. dictFreeUnlinkedEntry(db->dict,de);
  155. if (server.cluster_enabled) slotToKeyDel(key->ptr);
  156. return 1;
  157. } else {
  158. return 0;
  159. }
  160. }
  161. /* This is a wrapper whose behavior depends on the Redis lazy free
  162. * configuration. Deletes the key synchronously or asynchronously. */
  163. int dbDelete(redisDb *db, robj *key) {
  164. return server.lazyfree_lazy_server_del ? dbAsyncDelete(db,key) :
  165. dbSyncDelete(db,key);
  166. }
  167. /* Prepare the string object stored at 'key' to be modified destructively
  168. * to implement commands like SETBIT or APPEND.
  169. *
  170. * An object is usually ready to be modified unless one of the two conditions
  171. * are true:
  172. *
  173. * 1) The object 'o' is shared (refcount > 1), we don't want to affect
  174. * other users.
  175. * 2) The object encoding is not "RAW".
  176. *
  177. * If the object is found in one of the above conditions (or both) by the
  178. * function, an unshared / not-encoded copy of the string object is stored
  179. * at 'key' in the specified 'db'. Otherwise the object 'o' itself is
  180. * returned.
  181. *
  182. * USAGE:
  183. *
  184. * The object 'o' is what the caller already obtained by looking up 'key'
  185. * in 'db', the usage pattern looks like this:
  186. *
  187. * o = lookupKeyWrite(db,key);
  188. * if (checkType(c,o,OBJ_STRING)) return;
  189. * o = dbUnshareStringValue(db,key,o);
  190. *
  191. * At this point the caller is ready to modify the object, for example
  192. * using an sdscat() call to append some data, or anything else.
  193. */
  194. robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
  195. serverAssert(o->type == OBJ_STRING);
  196. if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) {
  197. robj *decoded = getDecodedObject(o);
  198. o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr));
  199. decrRefCount(decoded);
  200. dbOverwrite(db,key,o);
  201. }
  202. return o;
  203. }
  204. /* Remove all keys from the database(s) structure. The dbarray argument
  205. * may not be the server main DBs (could be a backup).
  206. *
  207. * The dbnum can be -1 if all the DBs should be emptied, or the specified
  208. * DB index if we want to empty only a single database.
  209. * The function returns the number of keys removed from the database(s). */
  210. long long emptyDbStructure(redisDb *dbarray, int dbnum, int async,
  211. void(callback)(void*))
  212. {
  213. long long removed = 0;
  214. int startdb, enddb;
  215. if (dbnum == -1) {
  216. startdb = 0;
  217. enddb = server.dbnum-1;
  218. } else {
  219. startdb = enddb = dbnum;
  220. }
  221. for (int j = startdb; j <= enddb; j++) {
  222. removed += dictSize(dbarray[j].dict);
  223. if (async) {
  224. emptyDbAsync(&dbarray[j]);
  225. } else {
  226. dictEmpty(dbarray[j].dict,callback);
  227. dictEmpty(dbarray[j].expires,callback);
  228. }
  229. /* Because all keys of database are removed, reset average ttl. */
  230. dbarray[j].avg_ttl = 0;
  231. dbarray[j].expires_cursor = 0;
  232. }
  233. return removed;
  234. }
  235. /* Remove all keys from all the databases in a Redis server.
  236. * If callback is given the function is called from time to time to
  237. * signal that work is in progress.
  238. *
  239. * The dbnum can be -1 if all the DBs should be flushed, or the specified
  240. * DB number if we want to flush only a single Redis database number.
  241. *
  242. * Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or
  243. * EMPTYDB_ASYNC if we want the memory to be freed in a different thread
  244. * and the function to return ASAP.
  245. *
  246. * On success the function returns the number of keys removed from the
  247. * database(s). Otherwise -1 is returned in the specific case the
  248. * DB number is out of range, and errno is set to EINVAL. */
  249. long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
  250. int async = (flags & EMPTYDB_ASYNC);
  251. RedisModuleFlushInfoV1 fi = {REDISMODULE_FLUSHINFO_VERSION,!async,dbnum};
  252. long long removed = 0;
  253. if (dbnum < -1 || dbnum >= server.dbnum) {
  254. errno = EINVAL;
  255. return -1;
  256. }
  257. /* Fire the flushdb modules event. */
  258. moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
  259. REDISMODULE_SUBEVENT_FLUSHDB_START,
  260. &fi);
  261. /* Make sure the WATCHed keys are affected by the FLUSH* commands.
  262. * Note that we need to call the function while the keys are still
  263. * there. */
  264. signalFlushedDb(dbnum, async);
  265. /* Empty redis database structure. */
  266. removed = emptyDbStructure(server.db, dbnum, async, callback);
  267. /* Flush slots to keys map if enable cluster, we can flush entire
  268. * slots to keys map whatever dbnum because only support one DB
  269. * in cluster mode. */
  270. if (server.cluster_enabled) slotToKeyFlush(async);
  271. if (dbnum == -1) flushSlaveKeysWithExpireList();
  272. /* Also fire the end event. Note that this event will fire almost
  273. * immediately after the start event if the flush is asynchronous. */
  274. moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
  275. REDISMODULE_SUBEVENT_FLUSHDB_END,
  276. &fi);
  277. return removed;
  278. }
  279. /* Store a backup of the database for later use, and put an empty one
  280. * instead of it. */
  281. dbBackup *backupDb(void) {
  282. dbBackup *backup = zmalloc(sizeof(dbBackup));
  283. /* Backup main DBs. */
  284. backup->dbarray = zmalloc(sizeof(redisDb)*server.dbnum);
  285. for (int i=0; i<server.dbnum; i++) {
  286. backup->dbarray[i] = server.db[i];
  287. server.db[i].dict = dictCreate(&dbDictType,NULL);
  288. server.db[i].expires = dictCreate(&dbExpiresDictType,NULL);
  289. }
  290. /* Backup cluster slots to keys map if enable cluster. */
  291. if (server.cluster_enabled) {
  292. backup->slots_to_keys = server.cluster->slots_to_keys;
  293. memcpy(backup->slots_keys_count, server.cluster->slots_keys_count,
  294. sizeof(server.cluster->slots_keys_count));
  295. server.cluster->slots_to_keys = raxNew();
  296. memset(server.cluster->slots_keys_count, 0,
  297. sizeof(server.cluster->slots_keys_count));
  298. }
  299. moduleFireServerEvent(REDISMODULE_EVENT_REPL_BACKUP,
  300. REDISMODULE_SUBEVENT_REPL_BACKUP_CREATE,
  301. NULL);
  302. return backup;
  303. }

相关技术文章

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载