Skip to content

Commit

Permalink
support NOMKSTREAM option in xadd command (redis#7910)
Browse files Browse the repository at this point in the history
introduces a NOMKSTREAM option for xadd command, this would be useful for some
use cases when we do not want to create new stream by default:

XADD key [MAXLEN [~|=] <count>] [NOMKSTREAM] <ID or *> [field value] [field value]
  • Loading branch information
hwware authored Oct 18, 2020
1 parent f6010e1 commit f328194
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 3 deletions.
13 changes: 10 additions & 3 deletions src/t_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -1130,10 +1130,14 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start

/* Look the stream at 'key' and return the corresponding stream object.
* The function creates a key setting it to an empty stream if needed. */
robj *streamTypeLookupWriteOrCreate(client *c, robj *key) {
robj *streamTypeLookupWriteOrCreate(client *c, robj *key, int no_create) {
robj *o = lookupKeyWrite(c->db,key);
if (checkType(c,o,OBJ_STREAM)) return NULL;
if (o == NULL) {
if (no_create) {
addReplyNull(c);
return NULL;
}
o = createStreamObject();
dbAdd(c->db,key,o);
}
Expand Down Expand Up @@ -1214,14 +1218,15 @@ void streamRewriteApproxMaxlen(client *c, stream *s, int maxlen_arg_idx) {
decrRefCount(maxlen_obj);
}

/* XADD key [MAXLEN [~|=] <count>] <ID or *> [field value] [field value] ... */
/* XADD key [MAXLEN [~|=] <count>] [NOMKSTREAM] <ID or *> [field value] [field value] ... */
void xaddCommand(client *c) {
streamID id;
int id_given = 0; /* Was an ID different than "*" specified? */
long long maxlen = -1; /* If left to -1 no trimming is performed. */
int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so
the maximum length is not applied verbatim. */
int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */
int no_mkstream = 0; /* if set to 1 do not create new stream */

/* Parse options. */
int i = 2; /* This is the first argument position where we could
Expand Down Expand Up @@ -1252,6 +1257,8 @@ void xaddCommand(client *c) {
}
i++;
maxlen_arg_idx = i;
} else if (!strcasecmp(opt,"nomkstream")) {
no_mkstream = 1;
} else {
/* If we are here is a syntax error or a valid ID. */
if (streamParseStrictIDOrReply(c,c->argv[i],&id,0) != C_OK) return;
Expand All @@ -1278,7 +1285,7 @@ void xaddCommand(client *c) {
/* Lookup the stream at key. */
robj *o;
stream *s;
if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1],no_mkstream)) == NULL) return;
s = o->ptr;

/* Return ASAP if the stream has reached the last possible ID */
Expand Down
12 changes: 12 additions & 0 deletions tests/unit/type/stream.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,18 @@ start_server {
}
}

test {XADD with NOMKSTREAM option} {
r DEL mystream
assert_equal "" [r XADD mystream NOMKSTREAM * item 1 value a]
assert_equal 0 [r EXISTS mystream]
r XADD mystream * item 1 value a
r XADD mystream NOMKSTREAM * item 2 value b
assert_equal 2 [r XLEN mystream]
set items [r XRANGE mystream - +]
assert_equal [lindex $items 0 1] {item 1 value a}
assert_equal [lindex $items 1 1] {item 2 value b}
}

test {XADD mass insertion and XLEN} {
r DEL mystream
r multi
Expand Down

0 comments on commit f328194

Please sign in to comment.