关键词搜索

源码搜索 ×
×

漫话Redis源码之三十六

发布2021-12-12浏览1498次

详情内容

这里主要是 Pub/Sub 相关的 API，大致浏览一遍、了解它们各自的功能就基本够了：

  1. #include "server.h"
  2. int clientSubscriptionsCount(client *c);
  3. /*-----------------------------------------------------------------------------
  4. * Pubsub client replies API
  5. *----------------------------------------------------------------------------*/
  6. /* Send a pubsub message of type "message" to the client.
  7. * Normally 'msg' is a Redis object containing the string to send as
  8. * message. However if the caller sets 'msg' as NULL, it will be able
  9. * to send a special message (for instance an Array type) by using the
  10. * addReply*() API family. */
  11. void addReplyPubsubMessage(client *c, robj *channel, robj *msg) {
  12. if (c->resp == 2)
  13. addReply(c,shared.mbulkhdr[3]);
  14. else
  15. addReplyPushLen(c,3);
  16. addReply(c,shared.messagebulk);
  17. addReplyBulk(c,channel);
  18. if (msg) addReplyBulk(c,msg);
  19. }
  20. /* Send a pubsub message of type "pmessage" to the client. The difference
  21. * with the "message" type delivered by addReplyPubsubMessage() is that
  22. * this message format also includes the pattern that matched the message. */
  23. void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
  24. if (c->resp == 2)
  25. addReply(c,shared.mbulkhdr[4]);
  26. else
  27. addReplyPushLen(c,4);
  28. addReply(c,shared.pmessagebulk);
  29. addReplyBulk(c,pat);
  30. addReplyBulk(c,channel);
  31. addReplyBulk(c,msg);
  32. }
  33. /* Send the pubsub subscription notification to the client. */
  34. void addReplyPubsubSubscribed(client *c, robj *channel) {
  35. if (c->resp == 2)
  36. addReply(c,shared.mbulkhdr[3]);
  37. else
  38. addReplyPushLen(c,3);
  39. addReply(c,shared.subscribebulk);
  40. addReplyBulk(c,channel);
  41. addReplyLongLong(c,clientSubscriptionsCount(c));
  42. }
  43. /* Send the pubsub unsubscription notification to the client.
  44. * Channel can be NULL: this is useful when the client sends a mass
  45. * unsubscribe command but there are no channels to unsubscribe from: we
  46. * still send a notification. */
  47. void addReplyPubsubUnsubscribed(client *c, robj *channel) {
  48. if (c->resp == 2)
  49. addReply(c,shared.mbulkhdr[3]);
  50. else
  51. addReplyPushLen(c,3);
  52. addReply(c,shared.unsubscribebulk);
  53. if (channel)
  54. addReplyBulk(c,channel);
  55. else
  56. addReplyNull(c);
  57. addReplyLongLong(c,clientSubscriptionsCount(c));
  58. }
  59. /* Send the pubsub pattern subscription notification to the client. */
  60. void addReplyPubsubPatSubscribed(client *c, robj *pattern) {
  61. if (c->resp == 2)
  62. addReply(c,shared.mbulkhdr[3]);
  63. else
  64. addReplyPushLen(c,3);
  65. addReply(c,shared.psubscribebulk);
  66. addReplyBulk(c,pattern);
  67. addReplyLongLong(c,clientSubscriptionsCount(c));
  68. }
  69. /* Send the pubsub pattern unsubscription notification to the client.
  70. * Pattern can be NULL: this is useful when the client sends a mass
  71. * punsubscribe command but there are no pattern to unsubscribe from: we
  72. * still send a notification. */
  73. void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) {
  74. if (c->resp == 2)
  75. addReply(c,shared.mbulkhdr[3]);
  76. else
  77. addReplyPushLen(c,3);
  78. addReply(c,shared.punsubscribebulk);
  79. if (pattern)
  80. addReplyBulk(c,pattern);
  81. else
  82. addReplyNull(c);
  83. addReplyLongLong(c,clientSubscriptionsCount(c));
  84. }
  85. /*-----------------------------------------------------------------------------
  86. * Pubsub low level API
  87. *----------------------------------------------------------------------------*/
  88. /* Return the number of channels + patterns a client is subscribed to. */
  89. int clientSubscriptionsCount(client *c) {
  90. return dictSize(c->pubsub_channels)+
  91. listLength(c->pubsub_patterns);
  92. }
  93. /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
  94. * 0 if the client was already subscribed to that channel. */
  95. int pubsubSubscribeChannel(client *c, robj *channel) {
  96. dictEntry *de;
  97. list *clients = NULL;
  98. int retval = 0;
  99. /* Add the channel to the client -> channels hash table */
  100. if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
  101. retval = 1;
  102. incrRefCount(channel);
  103. /* Add the client to the channel -> list of clients hash table */
  104. de = dictFind(server.pubsub_channels,channel);
  105. if (de == NULL) {
  106. clients = listCreate();
  107. dictAdd(server.pubsub_channels,channel,clients);
  108. incrRefCount(channel);
  109. } else {
  110. clients = dictGetVal(de);
  111. }
  112. listAddNodeTail(clients,c);
  113. }
  114. /* Notify the client */
  115. addReplyPubsubSubscribed(c,channel);
  116. return retval;
  117. }
  118. /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
  119. * 0 if the client was not subscribed to the specified channel. */
  120. int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
  121. dictEntry *de;
  122. list *clients;
  123. listNode *ln;
  124. int retval = 0;
  125. /* Remove the channel from the client -> channels hash table */
  126. incrRefCount(channel); /* channel may be just a pointer to the same object
  127. we have in the hash tables. Protect it... */
  128. if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
  129. retval = 1;
  130. /* Remove the client from the channel -> clients list hash table */
  131. de = dictFind(server.pubsub_channels,channel);
  132. serverAssertWithInfo(c,NULL,de != NULL);
  133. clients = dictGetVal(de);
  134. ln = listSearchKey(clients,c);
  135. serverAssertWithInfo(c,NULL,ln != NULL);
  136. listDelNode(clients,ln);
  137. if (listLength(clients) == 0) {
  138. /* Free the list and associated hash entry at all if this was
  139. * the latest client, so that it will be possible to abuse
  140. * Redis PUBSUB creating millions of channels. */
  141. dictDelete(server.pubsub_channels,channel);
  142. }
  143. }
  144. /* Notify the client */
  145. if (notify) addReplyPubsubUnsubscribed(c,channel);
  146. decrRefCount(channel); /* it is finally safe to release it */
  147. return retval;
  148. }
  149. /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */
  150. int pubsubSubscribePattern(client *c, robj *pattern) {
  151. dictEntry *de;
  152. list *clients;
  153. int retval = 0;
  154. if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
  155. retval = 1;
  156. listAddNodeTail(c->pubsub_patterns,pattern);
  157. incrRefCount(pattern);
  158. /* Add the client to the pattern -> list of clients hash table */
  159. de = dictFind(server.pubsub_patterns,pattern);
  160. if (de == NULL) {
  161. clients = listCreate();
  162. dictAdd(server.pubsub_patterns,pattern,clients);
  163. incrRefCount(pattern);
  164. } else {
  165. clients = dictGetVal(de);
  166. }
  167. listAddNodeTail(clients,c);
  168. }
  169. /* Notify the client */
  170. addReplyPubsubPatSubscribed(c,pattern);
  171. return retval;
  172. }
  173. /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
  174. * 0 if the client was not subscribed to the specified channel. */
  175. int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
  176. dictEntry *de;
  177. list *clients;
  178. listNode *ln;
  179. int retval = 0;
  180. incrRefCount(pattern); /* Protect the object. May be the same we remove */
  181. if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {
  182. retval = 1;
  183. listDelNode(c->pubsub_patterns,ln);
  184. /* Remove the client from the pattern -> clients list hash table */
  185. de = dictFind(server.pubsub_patterns,pattern);
  186. serverAssertWithInfo(c,NULL,de != NULL);
  187. clients = dictGetVal(de);
  188. ln = listSearchKey(clients,c);
  189. serverAssertWithInfo(c,NULL,ln != NULL);
  190. listDelNode(clients,ln);
  191. if (listLength(clients) == 0) {
  192. /* Free the list and associated hash entry at all if this was
  193. * the latest client. */
  194. dictDelete(server.pubsub_patterns,pattern);
  195. }
  196. }
  197. /* Notify the client */
  198. if (notify) addReplyPubsubPatUnsubscribed(c,pattern);
  199. decrRefCount(pattern);
  200. return retval;
  201. }
  202. /* Unsubscribe from all the channels. Return the number of channels the
  203. * client was subscribed to. */
  204. int pubsubUnsubscribeAllChannels(client *c, int notify) {
  205. int count = 0;
  206. if (dictSize(c->pubsub_channels) > 0) {
  207. dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
  208. dictEntry *de;
  209. while((de = dictNext(di)) != NULL) {
  210. robj *channel = dictGetKey(de);
  211. count += pubsubUnsubscribeChannel(c,channel,notify);
  212. }
  213. dictReleaseIterator(di);
  214. }
  215. /* We were subscribed to nothing? Still reply to the client. */
  216. if (notify && count == 0) addReplyPubsubUnsubscribed(c,NULL);
  217. return count;
  218. }
  219. /* Unsubscribe from all the patterns. Return the number of patterns the
  220. * client was subscribed from. */
  221. int pubsubUnsubscribeAllPatterns(client *c, int notify) {
  222. listNode *ln;
  223. listIter li;
  224. int count = 0;
  225. listRewind(c->pubsub_patterns,&li);
  226. while ((ln = listNext(&li)) != NULL) {
  227. robj *pattern = ln->value;
  228. count += pubsubUnsubscribePattern(c,pattern,notify);
  229. }
  230. if (notify && count == 0) addReplyPubsubPatUnsubscribed(c,NULL);
  231. return count;
  232. }
  233. /* Publish a message */
  234. int pubsubPublishMessage(robj *channel, robj *message) {
  235. int receivers = 0;
  236. dictEntry *de;
  237. dictIterator *di;
  238. listNode *ln;
  239. listIter li;
  240. /* Send to clients listening for that channel */
  241. de = dictFind(server.pubsub_channels,channel);
  242. if (de) {
  243. list *list = dictGetVal(de);
  244. listNode *ln;
  245. listIter li;
  246. listRewind(list,&li);
  247. while ((ln = listNext(&li)) != NULL) {
  248. client *c = ln->value;
  249. addReplyPubsubMessage(c,channel,message);
  250. receivers++;
  251. }
  252. }
  253. /* Send to clients listening to matching channels */
  254. di = dictGetIterator(server.pubsub_patterns);
  255. if (di) {
  256. channel = getDecodedObject(channel);
  257. while((de = dictNext(di)) != NULL) {
  258. robj *pattern = dictGetKey(de);
  259. list *clients = dictGetVal(de);
  260. if (!stringmatchlen((char*)pattern->ptr,
  261. sdslen(pattern->ptr),
  262. (char*)channel->ptr,
  263. sdslen(channel->ptr),0)) continue;
  264. listRewind(clients,&li);
  265. while ((ln = listNext(&li)) != NULL) {
  266. client *c = listNodeValue(ln);
  267. addReplyPubsubPatMessage(c,pattern,channel,message);
  268. receivers++;
  269. }
  270. }
  271. decrRefCount(channel);
  272. dictReleaseIterator(di);
  273. }
  274. return receivers;
  275. }
  276. /*-----------------------------------------------------------------------------
  277. * Pubsub commands implementation
  278. *----------------------------------------------------------------------------*/
  279. /* SUBSCRIBE channel [channel ...] */
  280. void subscribeCommand(client *c) {
  281. int j;
  282. if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) {
  283. /**
  284. * A client that has CLIENT_DENY_BLOCKING flag on
  285. * expect a reply per command and so can not execute subscribe.
  286. *
  287. * Notice that we have a special treatment for multi because of
  288. * backword compatibility
  289. */
  290. addReplyError(c, "SUBSCRIBE isn't allowed for a DENY BLOCKING client");
  291. return;
  292. }
  293. for (j = 1; j < c->argc; j++)
  294. pubsubSubscribeChannel(c,c->argv[j]);
  295. c->flags |= CLIENT_PUBSUB;
  296. }
  297. /* UNSUBSCRIBE [channel [channel ...]] */
  298. void unsubscribeCommand(client *c) {
  299. if (c->argc == 1) {
  300. pubsubUnsubscribeAllChannels(c,1);
  301. } else {
  302. int j;
  303. for (j = 1; j < c->argc; j++)
  304. pubsubUnsubscribeChannel(c,c->argv[j],1);
  305. }
  306. if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
  307. }
  308. /* PSUBSCRIBE pattern [pattern ...] */
  309. void psubscribeCommand(client *c) {
  310. int j;
  311. if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) {
  312. /**
  313. * A client that has CLIENT_DENY_BLOCKING flag on
  314. * expect a reply per command and so can not execute subscribe.
  315. *
  316. * Notice that we have a special treatment for multi because of
  317. * backword compatibility
  318. */
  319. addReplyError(c, "PSUBSCRIBE isn't allowed for a DENY BLOCKING client");
  320. return;
  321. }
  322. for (j = 1; j < c->argc; j++)
  323. pubsubSubscribePattern(c,c->argv[j]);
  324. c->flags |= CLIENT_PUBSUB;
  325. }
  326. /* PUNSUBSCRIBE [pattern [pattern ...]] */
  327. void punsubscribeCommand(client *c) {
  328. if (c->argc == 1) {
  329. pubsubUnsubscribeAllPatterns(c,1);
  330. } else {
  331. int j;
  332. for (j = 1; j < c->argc; j++)
  333. pubsubUnsubscribePattern(c,c->argv[j],1);
  334. }
  335. if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
  336. }
  337. /* PUBLISH <channel> <message> */
  338. void publishCommand(client *c) {
  339. int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
  340. if (server.cluster_enabled)
  341. clusterPropagatePublish(c->argv[1],c->argv[2]);
  342. else
  343. forceCommandPropagation(c,PROPAGATE_REPL);
  344. addReplyLongLong(c,receivers);
  345. }
  346. /* PUBSUB command for Pub/Sub introspection. */
  347. void pubsubCommand(client *c) {
  348. if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
  349. const char *help[] = {
  350. "CHANNELS [<pattern>]",
  351. " Return the currently active channels matching a <pattern> (default: '*').",
  352. "NUMPAT",
  353. " Return number of subscriptions to patterns.",
  354. "NUMSUB [<channel> ...]",
  355. " Return the number of subscribers for the specified channels, excluding",
  356. " pattern subscriptions(default: no channels).",
  357. NULL
  358. };
  359. addReplyHelp(c, help);
  360. } else if (!strcasecmp(c->argv[1]->ptr,"channels") &&
  361. (c->argc == 2 || c->argc == 3))
  362. {
  363. /* PUBSUB CHANNELS [<pattern>] */
  364. sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr;
  365. dictIterator *di = dictGetIterator(server.pubsub_channels);
  366. dictEntry *de;
  367. long mblen = 0;
  368. void *replylen;
  369. replylen = addReplyDeferredLen(c);
  370. while((de = dictNext(di)) != NULL) {
  371. robj *cobj = dictGetKey(de);
  372. sds channel = cobj->ptr;
  373. if (!pat || stringmatchlen(pat, sdslen(pat),
  374. channel, sdslen(channel),0))
  375. {
  376. addReplyBulk(c,cobj);
  377. mblen++;
  378. }
  379. }
  380. dictReleaseIterator(di);
  381. setDeferredArrayLen(c,replylen,mblen);
  382. } else if (!strcasecmp(c->argv[1]->ptr,"numsub") && c->argc >= 2) {
  383. /* PUBSUB NUMSUB [Channel_1 ... Channel_N] */
  384. int j;
  385. addReplyArrayLen(c,(c->argc-2)*2);
  386. for (j = 2; j < c->argc; j++) {
  387. list *l = dictFetchValue(server.pubsub_channels,c->argv[j]);
  388. addReplyBulk(c,c->argv[j]);
  389. addReplyLongLong(c,l ? listLength(l) : 0);
  390. }
  391. } else if (!strcasecmp(c->argv[1]->ptr,"numpat") && c->argc == 2) {
  392. /* PUBSUB NUMPAT */
  393. addReplyLongLong(c,dictSize(server.pubsub_patterns));
  394. } else {
  395. addReplySubcommandSyntaxError(c);
  396. }
  397. }

相关技术文章

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信扫描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载