Skip to content

Commit

Permalink
chore: fix bugs in stream_family (#4237)
Browse files Browse the repository at this point in the history
1. Use transaction time in streams code, similarly to how we do it in other commands.
   Stop using mstime() and delete unused redis code.
2. Check for sequence overflow issue when passing huge sequence ids.
   Add a test.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Dec 2, 2024
1 parent ada96d9 commit 91aff49
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 459 deletions.
2 changes: 0 additions & 2 deletions src/redis/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,7 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current);
void streamIteratorStop(streamIterator *si);
streamCG *streamLookupCG(stream *s, sds groupname);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags);
streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags);
streamCG *streamCreateCG(stream *s, const char *name, size_t namelen, streamID *id, long long entries_read);
streamNACK *streamCreateNACK(streamConsumer *consumer);
void streamEncodeID(void *buf, streamID *id);
void streamDecodeID(void *buf, streamID *id);
int streamCompareID(streamID *a, streamID *b);
Expand Down
47 changes: 0 additions & 47 deletions src/redis/t_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,21 +129,6 @@ int streamDecrID(streamID *id) {
return ret;
}

/* Generate the next stream item ID given the previous one. If the current
* milliseconds Unix time is greater than the previous one, just use this
* as time part and start with sequence part of zero. Otherwise we use the
* previous time (and never go backward) and increment the sequence. */
void streamNextID(streamID *last_id, streamID *new_id) {
uint64_t ms = mstime();
if (ms > last_id->ms) {
new_id->ms = ms;
new_id->seq = 0;
} else {
*new_id = *last_id;
streamIncrID(new_id);
}
}

/* This is a wrapper function for lpGet() to directly get an integer value
* from the listpack (that may store numbers as a string), converting
* the string if needed.
Expand Down Expand Up @@ -1031,16 +1016,6 @@ long long streamCGLag(stream *s, streamCG *cg) {
* Low level implementation of consumer groups
* ----------------------------------------------------------------------- */

/* Create a NACK entry setting the delivery count to 1 and the delivery
* time to the current time. The NACK consumer will be set to the one
* specified as argument of the function. */
streamNACK *streamCreateNACK(streamConsumer *consumer) {
streamNACK *nack = zmalloc(sizeof(*nack));
nack->delivery_time = mstime();
nack->delivery_count = 1;
nack->consumer = consumer;
return nack;
}

/* Free a NACK entry. */
void streamFreeNACK(streamNACK *na) {
Expand Down Expand Up @@ -1093,35 +1068,13 @@ streamCG *streamLookupCG(stream *s, sds groupname) {
return (cg == raxNotFound) ? NULL : cg;
}

/* Create a consumer with the specified name in the group 'cg' and return.
* If the consumer exists, return NULL. As a side effect, when the consumer
* is successfully created, the key space will be notified and dirty++ unless
* the SCC_NO_NOTIFY or SCC_NO_DIRTIFY flags is specified. */
streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags) {
if (cg == NULL) return NULL;
streamConsumer *consumer = zmalloc(sizeof(*consumer));
int success = raxTryInsert(cg->consumers,(unsigned char*)name,
sdslen(name),consumer,NULL);
if (!success) {
zfree(consumer);
return NULL;
}
consumer->name = sdsdup(name);
consumer->pel = raxNew();
consumer->seen_time = mstime();

return consumer;
}

/* Lookup the consumer with the specified name in the group 'cg'. Its last
* seen time is updated unless the SLC_NO_REFRESH flag is specified. */
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) {
if (cg == NULL) return NULL;
int refresh = !(flags & SLC_NO_REFRESH);
streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
sdslen(name));
if (consumer == raxNotFound) return NULL;
if (refresh) consumer->seen_time = mstime();
return consumer;
}

Expand Down
310 changes: 0 additions & 310 deletions src/redis/util.c
Original file line number Diff line number Diff line change
Expand Up @@ -573,313 +573,3 @@ int ld2string(char *buf, size_t len, long double value, ld2string_mode mode) {
buf[l] = '\0';
return l;
}

#ifdef ROMAN_DISABLE_CODE
/* Get random bytes, attempts to get an initial seed from /dev/urandom and
* the uses a one way hash function in counter mode to generate a random
* stream. However if /dev/urandom is not available, a weaker seed is used.
*
* This function is not thread safe, since the state is global. */
void getRandomBytes(unsigned char *p, size_t len) {
/* Global state. */
static int seed_initialized = 0;
static unsigned char seed[64]; /* 512 bit internal block size. */
static uint64_t counter = 0; /* The counter we hash with the seed. */

if (!seed_initialized) {
/* Initialize a seed and use SHA1 in counter mode, where we hash
* the same seed with a progressive counter. For the goals of this
* function we just need non-colliding strings, there are no
* cryptographic security needs. */
FILE *fp = fopen("/dev/urandom","r");
if (fp == NULL || fread(seed,sizeof(seed),1,fp) != 1) {
/* Revert to a weaker seed, and in this case reseed again
* at every call.*/
for (unsigned int j = 0; j < sizeof(seed); j++) {
struct timeval tv;
gettimeofday(&tv,NULL);
pid_t pid = getpid();
seed[j] = tv.tv_sec ^ tv.tv_usec ^ pid ^ (long)fp;
}
} else {
seed_initialized = 1;
}
if (fp) fclose(fp);
}

while(len) {
/* This implements SHA256-HMAC. */
unsigned char digest[SHA256_BLOCK_SIZE];
unsigned char kxor[64];
unsigned int copylen =
len > SHA256_BLOCK_SIZE ? SHA256_BLOCK_SIZE : len;

/* IKEY: key xored with 0x36. */
memcpy(kxor,seed,sizeof(kxor));
for (unsigned int i = 0; i < sizeof(kxor); i++) kxor[i] ^= 0x36;

/* Obtain HASH(IKEY||MESSAGE). */
SHA256_CTX ctx;
sha256_init(&ctx);
sha256_update(&ctx,kxor,sizeof(kxor));
sha256_update(&ctx,(unsigned char*)&counter,sizeof(counter));
sha256_final(&ctx,digest);

/* OKEY: key xored with 0x5c. */
memcpy(kxor,seed,sizeof(kxor));
for (unsigned int i = 0; i < sizeof(kxor); i++) kxor[i] ^= 0x5C;

/* Obtain HASH(OKEY || HASH(IKEY||MESSAGE)). */
sha256_init(&ctx);
sha256_update(&ctx,kxor,sizeof(kxor));
sha256_update(&ctx,digest,SHA256_BLOCK_SIZE);
sha256_final(&ctx,digest);

/* Increment the counter for the next iteration. */
counter++;

memcpy(p,digest,copylen);
len -= copylen;
p += copylen;
}
}

/* Generate the Redis "Run ID", a SHA1-sized random number that identifies a
* given execution of Redis, so that if you are talking with an instance
* having run_id == A, and you reconnect and it has run_id == B, you can be
* sure that it is either a different instance or it was restarted. */
void getRandomHexChars(char *p, size_t len) {
char *charset = "0123456789abcdef";
size_t j;

getRandomBytes((unsigned char*)p,len);
for (j = 0; j < len; j++) p[j] = charset[p[j] & 0x0F];
}



/* Given the filename, return the absolute path as an SDS string, or NULL
* if it fails for some reason. Note that "filename" may be an absolute path
* already, this will be detected and handled correctly.
*
* The function does not try to normalize everything, but only the obvious
* case of one or more "../" appearing at the start of "filename"
* relative path. */
sds getAbsolutePath(char *filename) {
char cwd[1024];
sds abspath;
sds relpath = sdsnew(filename);

relpath = sdstrim(relpath," \r\n\t");
if (relpath[0] == '/') return relpath; /* Path is already absolute. */

/* If path is relative, join cwd and relative path. */
if (getcwd(cwd,sizeof(cwd)) == NULL) {
sdsfree(relpath);
return NULL;
}
abspath = sdsnew(cwd);
if (sdslen(abspath) && abspath[sdslen(abspath)-1] != '/')
abspath = sdscat(abspath,"/");

/* At this point we have the current path always ending with "/", and
* the trimmed relative path. Try to normalize the obvious case of
* trailing ../ elements at the start of the path.
*
* For every "../" we find in the filename, we remove it and also remove
* the last element of the cwd, unless the current cwd is "/". */
while (sdslen(relpath) >= 3 &&
relpath[0] == '.' && relpath[1] == '.' && relpath[2] == '/')
{
sdsrange(relpath,3,-1);
if (sdslen(abspath) > 1) {
char *p = abspath + sdslen(abspath)-2;
int trimlen = 1;

while(*p != '/') {
p--;
trimlen++;
}
sdsrange(abspath,0,-(trimlen+1));
}
}

/* Finally glue the two parts together. */
abspath = sdscatsds(abspath,relpath);
sdsfree(relpath);
return abspath;
}

#endif

/* Return the UNIX time in microseconds */
long long ustime(void) {
struct timeval tv;
long long ust;

gettimeofday(&tv, NULL);
ust = ((long long)tv.tv_sec)*1000000;
ust += tv.tv_usec;
return ust;
}

#ifdef REDIS_TEST
#include <assert.h>

static void test_string2ll(void) {
char buf[32];
long long v;

/* May not start with +. */
strcpy(buf,"+1");
assert(string2ll(buf,strlen(buf),&v) == 0);

/* Leading space. */
strcpy(buf," 1");
assert(string2ll(buf,strlen(buf),&v) == 0);

/* Trailing space. */
strcpy(buf,"1 ");
assert(string2ll(buf,strlen(buf),&v) == 0);

/* May not start with 0. */
strcpy(buf,"01");
assert(string2ll(buf,strlen(buf),&v) == 0);

strcpy(buf,"-1");
assert(string2ll(buf,strlen(buf),&v) == 1);
assert(v == -1);

strcpy(buf,"0");
assert(string2ll(buf,strlen(buf),&v) == 1);
assert(v == 0);

strcpy(buf,"1");
assert(string2ll(buf,strlen(buf),&v) == 1);
assert(v == 1);

strcpy(buf,"99");
assert(string2ll(buf,strlen(buf),&v) == 1);
assert(v == 99);

strcpy(buf,"-99");
assert(string2ll(buf,strlen(buf),&v) == 1);
assert(v == -99);

strcpy(buf,"-9223372036854775808");
assert(string2ll(buf,strlen(buf),&v) == 1);
assert(v == LLONG_MIN);

strcpy(buf,"-9223372036854775809"); /* overflow */
assert(string2ll(buf,strlen(buf),&v) == 0);

strcpy(buf,"9223372036854775807");
assert(string2ll(buf,strlen(buf),&v) == 1);
assert(v == LLONG_MAX);

strcpy(buf,"9223372036854775808"); /* overflow */
assert(string2ll(buf,strlen(buf),&v) == 0);
}

static void test_string2l(void) {
char buf[32];
long v;

/* May not start with +. */
strcpy(buf,"+1");
assert(string2l(buf,strlen(buf),&v) == 0);

/* May not start with 0. */
strcpy(buf,"01");
assert(string2l(buf,strlen(buf),&v) == 0);

strcpy(buf,"-1");
assert(string2l(buf,strlen(buf),&v) == 1);
assert(v == -1);

strcpy(buf,"0");
assert(string2l(buf,strlen(buf),&v) == 1);
assert(v == 0);

strcpy(buf,"1");
assert(string2l(buf,strlen(buf),&v) == 1);
assert(v == 1);

strcpy(buf,"99");
assert(string2l(buf,strlen(buf),&v) == 1);
assert(v == 99);

strcpy(buf,"-99");
assert(string2l(buf,strlen(buf),&v) == 1);
assert(v == -99);

#if LONG_MAX != LLONG_MAX
strcpy(buf,"-2147483648");
assert(string2l(buf,strlen(buf),&v) == 1);
assert(v == LONG_MIN);

strcpy(buf,"-2147483649"); /* overflow */
assert(string2l(buf,strlen(buf),&v) == 0);

strcpy(buf,"2147483647");
assert(string2l(buf,strlen(buf),&v) == 1);
assert(v == LONG_MAX);

strcpy(buf,"2147483648"); /* overflow */
assert(string2l(buf,strlen(buf),&v) == 0);
#endif
}

static void test_ll2string(void) {
char buf[32];
long long v;
int sz;

v = 0;
sz = ll2string(buf, sizeof buf, v);
assert(sz == 1);
assert(!strcmp(buf, "0"));

v = -1;
sz = ll2string(buf, sizeof buf, v);
assert(sz == 2);
assert(!strcmp(buf, "-1"));

v = 99;
sz = ll2string(buf, sizeof buf, v);
assert(sz == 2);
assert(!strcmp(buf, "99"));

v = -99;
sz = ll2string(buf, sizeof buf, v);
assert(sz == 3);
assert(!strcmp(buf, "-99"));

v = -2147483648;
sz = ll2string(buf, sizeof buf, v);
assert(sz == 11);
assert(!strcmp(buf, "-2147483648"));

v = LLONG_MIN;
sz = ll2string(buf, sizeof buf, v);
assert(sz == 20);
assert(!strcmp(buf, "-9223372036854775808"));

v = LLONG_MAX;
sz = ll2string(buf, sizeof buf, v);
assert(sz == 19);
assert(!strcmp(buf, "9223372036854775807"));
}

#define UNUSED(x) (void)(x)
int utilTest(int argc, char **argv, int accurate) {
UNUSED(argc);
UNUSED(argv);
UNUSED(accurate);

test_string2ll();
test_string2l();
test_ll2string();
return 0;
}
#endif
Loading

0 comments on commit 91aff49

Please sign in to comment.