/*
 * cachedb/redis.c - cachedb redis module
 *
 * Copyright (c) 2018, NLnet Labs. All rights reserved.
 *
 * This software is open source.
 * 
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 
 * Redistributions of source code must retain the above copyright notice,
 * this list of conditions and the following disclaimer.
 * 
 * Redistributions in binary form must reproduce the above copyright notice,
 * this list of conditions and the following disclaimer in the documentation
 * and/or other materials provided with the distribution.
 * 
 * Neither the name of the NLNET LABS nor the names of its contributors may
 * be used to endorse or promote products derived from this software without
 * specific prior written permission.
 * 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/**
 * \file
 *
 * This file contains a module that uses the redis database to cache
 * dns responses.
 */

#include "config.h"
#ifdef USE_CACHEDB
#include "cachedb/redis.h"
#include "cachedb/cachedb.h"
#include "util/alloc.h"
#include "util/config_file.h"
#include "util/locks.h"
#include "util/timeval_func.h"
#include "sldns/sbuffer.h"

#ifdef USE_REDIS
#include "hiredis/hiredis.h"

struct redis_moddata {
	/* thread-specific redis contexts */
	redisContext** ctxs;
	redisContext** replica_ctxs;
	/* number of ctx entries */
	int numctxs;
	/* server's IP address or host name */
	const char* server_host;
	const char* replica_server_host;
	/* server's TCP port */
	int server_port;
	int replica_server_port;
	/* server's unix path, or "", NULL if unused */
	const char* server_path;
	const char* replica_server_path;
	/* server's AUTH password, or "", NULL if unused */
	const char* server_password;
	const char* replica_server_password;
	/* timeout for commands */
	struct timeval command_timeout;
	struct timeval replica_command_timeout;
	/* timeout for connection setup */
	struct timeval connect_timeout;
	struct timeval replica_connect_timeout;
	/* the reconnect interval time. */
	struct timeval reconnect_interval;
	struct timeval replica_reconnect_interval;
	/* reconnect attempts, 0 if connected, counts up failed reconnects. */
	int reconnect_attempts;
	int replica_reconnect_attempts;
	/* Lock on reconnect_wait time. */
	lock_basic_type wait_lock;
	lock_basic_type replica_wait_lock;
	/* reconnect wait time, wait until it has passed before reconnect. */
	struct timeval reconnect_wait;
	struct timeval replica_reconnect_wait;
	/* the redis logical database to use */
	int logical_db;
	int replica_logical_db;
	/* if the SET with EX command is supported */
	int set_with_ex_available;
};

/** The limit on the number of redis connect attempts. After failure if
 * the number is exceeded, the reconnects are throttled by the wait time. */
#define REDIS_RECONNECT_ATTEMPT_LIMIT 3

static redisReply* redis_command(struct module_env*, struct cachedb_env*,
	const char*, const uint8_t*, size_t, int);

static void
moddata_clean(struct redis_moddata** moddata) {
	if(!moddata || !*moddata)
		return;
	if((*moddata)->ctxs) {
		int i;
		for(i = 0; i < (*moddata)->numctxs; i++) {
			if((*moddata)->ctxs[i])
				redisFree((*moddata)->ctxs[i]);
		}
		free((*moddata)->ctxs);
	}
	if((*moddata)->replica_ctxs) {
		int i;
		for(i = 0; i < (*moddata)->numctxs; i++) {
			if((*moddata)->replica_ctxs[i])
				redisFree((*moddata)->replica_ctxs[i]);
		}
		free((*moddata)->replica_ctxs);
	}
	lock_basic_destroy(&(*moddata)->wait_lock);
	lock_basic_destroy(&(*moddata)->replica_wait_lock);
	free(*moddata);
	*moddata = NULL;
}

static redisContext*
redis_connect(const char* host, int port, const char* path,
	const char* password, int logical_db,
	const struct timeval connect_timeout,
	const struct timeval command_timeout,
	const struct timeval* reconnect_interval,
	int* reconnect_attempts,
	struct timeval* reconnect_wait,
	lock_basic_type* wait_lock,
	struct timeval* now_tv,
	const char* infostr)
{
	struct timeval now_val;
	redisContext* ctx;

	/* See if the redis server is down, and reconnect has to wait. */
	if(*reconnect_attempts > REDIS_RECONNECT_ATTEMPT_LIMIT) {
		/* Acquire lock to look at timeval, the integer has atomic
		 * integrity. */
		struct timeval wait_tv;
		if(now_tv) {
			now_val = *now_tv;
		} else {
			if(gettimeofday(&now_val, NULL) < 0)
				log_err("redis: gettimeofday: %s",
					strerror(errno));
		}
		lock_basic_lock(wait_lock);
		wait_tv = *reconnect_wait;
		lock_basic_unlock(wait_lock);
		if(timeval_smaller(&now_val, &wait_tv)) {
			verbose(VERB_ALGO, "redis %sdown, reconnect wait",
				infostr);
			return NULL;
		}
	}

	if(path && path[0]!=0) {
		ctx = redisConnectUnixWithTimeout(path, connect_timeout);
	} else {
		ctx = redisConnectWithTimeout(host, port, connect_timeout);
	}
	if(!ctx || ctx->err) {
		const char *errstr = "out of memory";
		if(ctx)
			errstr = ctx->errstr;
		log_err("failed to connect to redis %sserver: %s", infostr, errstr);
		goto fail;
	}
	if(redisSetTimeout(ctx, command_timeout) != REDIS_OK) {
		log_err("failed to set redis %stimeout, %s", infostr, ctx->errstr);
		goto fail;
	}
	if(password && password[0]!=0) {
		redisReply* rep;
		rep = redisCommand(ctx, "AUTH %s", password);
		if(!rep || rep->type == REDIS_REPLY_ERROR) {
			log_err("failed to authenticate %swith password", infostr);
			freeReplyObject(rep);
			goto fail;
		}
		freeReplyObject(rep);
	}
	if(logical_db > 0) {
		redisReply* rep;
		rep = redisCommand(ctx, "SELECT %d", logical_db);
		if(!rep || rep->type == REDIS_REPLY_ERROR) {
			log_err("failed %sto set logical database (%d)",
				infostr, logical_db);
			freeReplyObject(rep);
			goto fail;
		}
		freeReplyObject(rep);
	}
	*reconnect_attempts = 0;
	if(verbosity >= VERB_OPS) {
		char port_str[6+1];
		port_str[0] = ' ';
		(void)snprintf(port_str+1, sizeof(port_str)-1, "%d", port);
		verbose(VERB_OPS, "Connection to Redis %sestablished (%s%s)",
			infostr,
			path&&path[0]!=0?path:host,
			path&&path[0]!=0?"":port_str);
	}
	return ctx;

fail:
	if(ctx)
		redisFree(ctx);
	(*reconnect_attempts)++;
	if(*reconnect_attempts > REDIS_RECONNECT_ATTEMPT_LIMIT) {
		/* Wait for the reconnect interval before trying again. */
		struct timeval tv;
		if(now_tv) {
			now_val = *now_tv;
		} else {
			if(gettimeofday(&now_val, NULL) < 0)
				log_err("redis: gettimeofday: %s",
					strerror(errno));
		}
		tv = now_val;
		timeval_add(&tv, reconnect_interval);
		lock_basic_lock(wait_lock);
		*reconnect_wait = tv;
		lock_basic_unlock(wait_lock);
		verbose(VERB_ALGO, "redis %sreconnect wait until %d.%6.6d",
			infostr, (int)tv.tv_sec, (int)tv.tv_usec);
	}
	return NULL;
}

static void
set_timeout(struct timeval* timeout, int value, int explicit_value)
{
	int v = explicit_value != 0 ? explicit_value : value;
	timeout->tv_sec = v / 1000;
	timeout->tv_usec = (v % 1000) * 1000;
}

static int
redis_init(struct module_env* env, struct cachedb_env* cachedb_env)
{
	int i;
	struct redis_moddata* moddata = NULL;

	verbose(VERB_OPS, "Redis initialization");

	moddata = calloc(1, sizeof(struct redis_moddata));
	if(!moddata) {
		log_err("out of memory");
		goto fail;
	}
	lock_basic_init(&moddata->wait_lock);
	lock_protect(&moddata->wait_lock, &moddata->reconnect_wait,
		sizeof(moddata->reconnect_wait));
	lock_basic_init(&moddata->replica_wait_lock);
	lock_protect(&moddata->replica_wait_lock,
		&moddata->replica_reconnect_wait,
		sizeof(moddata->replica_reconnect_wait));
	moddata->numctxs = env->cfg->num_threads;
	/* note: server_host and similar string configuration options are
	 * shallow references to configured strings; we don't have to free them
	 * in this module. */
	moddata->server_host = env->cfg->redis_server_host;
	moddata->replica_server_host = env->cfg->redis_replica_server_host;

	moddata->server_port = env->cfg->redis_server_port;
	moddata->replica_server_port = env->cfg->redis_replica_server_port;

	moddata->server_path = env->cfg->redis_server_path;
	moddata->replica_server_path = env->cfg->redis_replica_server_path;

	moddata->server_password = env->cfg->redis_server_password;
	moddata->replica_server_password = env->cfg->redis_replica_server_password;

	set_timeout(&moddata->command_timeout,
		env->cfg->redis_timeout,
		env->cfg->redis_command_timeout);
	set_timeout(&moddata->replica_command_timeout,
		env->cfg->redis_replica_timeout,
		env->cfg->redis_replica_command_timeout);
	set_timeout(&moddata->connect_timeout,
		env->cfg->redis_timeout,
		env->cfg->redis_connect_timeout);
	set_timeout(&moddata->replica_connect_timeout,
		env->cfg->redis_replica_timeout,
		env->cfg->redis_replica_connect_timeout);
	set_timeout(&moddata->reconnect_interval, 1000, 0);
	set_timeout(&moddata->replica_reconnect_interval, 1000, 0);

	moddata->logical_db = env->cfg->redis_logical_db;
	moddata->replica_logical_db = env->cfg->redis_replica_logical_db;

	moddata->ctxs = calloc(env->cfg->num_threads, sizeof(redisContext*));
	if(!moddata->ctxs) {
		log_err("out of memory");
		goto fail;
	}
	if((moddata->replica_server_host && moddata->replica_server_host[0]!=0)
		|| (moddata->replica_server_path && moddata->replica_server_path[0]!=0)) {
		/* There is a replica configured, allocate ctxs */
		moddata->replica_ctxs = calloc(env->cfg->num_threads, sizeof(redisContext*));
		if(!moddata->replica_ctxs) {
			log_err("out of memory");
			goto fail;
		}
	}
	for(i = 0; i < moddata->numctxs; i++) {
		redisContext* ctx = redis_connect(
			moddata->server_host,
			moddata->server_port,
			moddata->server_path,
			moddata->server_password,
			moddata->logical_db,
			moddata->connect_timeout,
			moddata->command_timeout,
			&moddata->reconnect_interval,
			&moddata->reconnect_attempts,
			&moddata->reconnect_wait,
			&moddata->wait_lock,
			env->now_tv,
			"");
		if(!ctx) {
			log_err("redis_init: failed to init redis "
				"(for thread %d)", i);
			/* And continue, the context can be established
			 * later, just like after a disconnect. */
		}
		moddata->ctxs[i] = ctx;
	}
	if(moddata->replica_ctxs) {
		for(i = 0; i < moddata->numctxs; i++) {
			redisContext* ctx = redis_connect(
				moddata->replica_server_host,
				moddata->replica_server_port,
				moddata->replica_server_path,
				moddata->replica_server_password,
				moddata->replica_logical_db,
				moddata->replica_connect_timeout,
				moddata->replica_command_timeout,
				&moddata->replica_reconnect_interval,
				&moddata->replica_reconnect_attempts,
				&moddata->replica_reconnect_wait,
				&moddata->replica_wait_lock,
				env->now_tv,
				"replica ");
			if(!ctx) {
				log_err("redis_init: failed to init redis "
					"replica (for thread %d)", i);
				/* And continue, the context can be established
				* later, just like after a disconnect. */
			}
			moddata->replica_ctxs[i] = ctx;
		}
	}
	cachedb_env->backend_data = moddata;
	if(env->cfg->redis_expire_records &&
		moddata->ctxs[env->alloc->thread_num] != NULL) {
		redisReply* rep = NULL;
		int redis_reply_type = 0;
		/** check if set with ex command is supported */
		rep = redis_command(env, cachedb_env,
			"SET __UNBOUND_REDIS_CHECK__ none EX 1", NULL, 0, 1);
		if(!rep) {
			/** init failed, no response from redis server*/
			goto set_with_ex_fail;
		}
		redis_reply_type = rep->type;
		freeReplyObject(rep);
		switch(redis_reply_type) {
		case REDIS_REPLY_STATUS:
			break;
		default:
			/** init failed, set_with_ex command not supported */
			goto set_with_ex_fail;
		}
		moddata->set_with_ex_available = 1;
	}
	return 1;

set_with_ex_fail:
	log_err("redis_init: failure during redis_init, the "
		"redis-expire-records option requires the SET with EX command "
		"(redis >= 2.6.12)");
	return 1;
fail:
	moddata_clean(&moddata);
	return 0;
}

static void
redis_deinit(struct module_env* env, struct cachedb_env* cachedb_env)
{
	struct redis_moddata* moddata = (struct redis_moddata*)
		cachedb_env->backend_data;
	(void)env;

	verbose(VERB_OPS, "Redis deinitialization");
	moddata_clean(&moddata);
}

/*
 * Send a redis command and get a reply.  Unified so that it can be used for
 * both SET and GET.  If 'data' is non-NULL the command is supposed to be
 * SET and GET otherwise, but the implementation of this function is agnostic
 * about the semantics (except for logging): 'command', 'data', and 'data_len'
 * are opaquely passed to redisCommand().
 * This function first checks whether a connection with a redis server has
 * been established; if not it tries to set up a new one.
 * It returns redisReply returned from redisCommand() or NULL if some low
 * level error happens.  The caller is responsible to check the return value,
 * if it's non-NULL, it has to free it with freeReplyObject().
 */
static redisReply*
redis_command(struct module_env* env, struct cachedb_env* cachedb_env,
	const char* command, const uint8_t* data, size_t data_len, int write)
{
	redisContext* ctx, **ctx_selector;
	redisReply* rep;
	struct redis_moddata* d = (struct redis_moddata*)
		cachedb_env->backend_data;

	/* We assume env->alloc->thread_num is a unique ID for each thread
	 * in [0, num-of-threads).  We could treat it as an error condition
	 * if the assumption didn't hold, but it seems to be a fundamental
	 * assumption throughout the unbound architecture, so we simply assert
	 * it. */
	log_assert(env->alloc->thread_num < d->numctxs);

	ctx_selector = !write && d->replica_ctxs
		?d->replica_ctxs
		:d->ctxs;
	ctx = ctx_selector[env->alloc->thread_num];

	/* If we've not established a connection to the server or we've closed
	 * it on a failure, try to re-establish a new one.   Failures will be
	 * logged in redis_connect(). */
	if(!ctx) {
		if(!write && d->replica_ctxs) {
			ctx = redis_connect(
				d->replica_server_host,
				d->replica_server_port,
				d->replica_server_path,
				d->replica_server_password,
				d->replica_logical_db,
				d->replica_connect_timeout,
				d->replica_command_timeout,
				&d->replica_reconnect_interval,
				&d->replica_reconnect_attempts,
				&d->replica_reconnect_wait,
				&d->replica_wait_lock,
				env->now_tv,
				"replica ");
		} else {
			ctx = redis_connect(
				d->server_host,
				d->server_port,
				d->server_path,
				d->server_password,
				d->logical_db,
				d->connect_timeout,
				d->command_timeout,
				&d->reconnect_interval,
				&d->reconnect_attempts,
				&d->reconnect_wait,
				&d->wait_lock,
				env->now_tv,
				"");
		}
		ctx_selector[env->alloc->thread_num] = ctx;
	}
	if(!ctx) return NULL;

	/* Send the command and get a reply, synchronously. */
	rep = (redisReply*)redisCommand(ctx, command, data, data_len);
	if(!rep) {
		/* Once an error as a NULL-reply is returned the context cannot
		 * be reused and we'll need to set up a new connection. */
		log_err("redis_command: failed to receive a reply, "
			"closing connection: %s", ctx->errstr);
		redisFree(ctx);
		ctx_selector[env->alloc->thread_num] = NULL;
		return NULL;
	}

	/* Check error in reply to unify logging in that case.
	 * The caller may perform context-dependent checks and logging. */
	if(rep->type == REDIS_REPLY_ERROR)
		log_err("redis: %s resulted in an error: %s",
			data ? "set" : "get", rep->str);

	return rep;
}

static int
redis_lookup(struct module_env* env, struct cachedb_env* cachedb_env,
	char* key, struct sldns_buffer* result_buffer)
{
	redisReply* rep;
	/* Supported commands:
	 * - "GET " + key
	 */
#define REDIS_LOOKUP_MAX_BUF_LEN			\
	4				/* "GET " */	\
	+(CACHEDB_HASHSIZE/8)*2		/* key hash */	\
	+ 1				/* \0 */
	char cmdbuf[REDIS_LOOKUP_MAX_BUF_LEN];
	int n;
	int ret = 0;

	verbose(VERB_ALGO, "redis_lookup of %s", key);

	n = snprintf(cmdbuf, sizeof(cmdbuf), "GET %s", key);
	if(n < 0 || n >= (int)sizeof(cmdbuf)) {
		log_err("redis_lookup: unexpected failure to build command");
		return 0;
	}

	rep = redis_command(env, cachedb_env, cmdbuf, NULL, 0, 0);
	if(!rep)
		return 0;
	switch(rep->type) {
	case REDIS_REPLY_NIL:
		verbose(VERB_ALGO, "redis_lookup: no data cached");
		break;
	case REDIS_REPLY_STRING:
		verbose(VERB_ALGO, "redis_lookup found %d bytes",
			(int)rep->len);
		if((size_t)rep->len > sldns_buffer_capacity(result_buffer)) {
			log_err("redis_lookup: replied data too long: %lu",
				(size_t)rep->len);
			break;
		}
		sldns_buffer_clear(result_buffer);
		sldns_buffer_write(result_buffer, rep->str, rep->len);
		sldns_buffer_flip(result_buffer);
		ret = 1;
		break;
	case REDIS_REPLY_ERROR:
		break;		/* already logged */
	default:
		log_err("redis_lookup: unexpected type of reply for (%d)",
			rep->type);
		break;
	}
	freeReplyObject(rep);
	return ret;
}

static void
redis_store(struct module_env* env, struct cachedb_env* cachedb_env,
	char* key, uint8_t* data, size_t data_len, time_t ttl)
{
	redisReply* rep;
	int n;
	struct redis_moddata* moddata = (struct redis_moddata*)
		cachedb_env->backend_data;
	int set_ttl = (moddata->set_with_ex_available &&
		env->cfg->redis_expire_records &&
		(!env->cfg->serve_expired || env->cfg->serve_expired_ttl > 0));
	/* Supported commands:
	 * - "SET " + key + " %b"
	 * - "SET " + key + " %b EX " + ttl
	 *   older redis 2.0.0 was "SETEX " + key + " " + ttl + " %b"
	 * - "EXPIRE " + key + " 0"
	 */
#define REDIS_STORE_MAX_BUF_LEN				\
	7			/* "EXPIRE " */		\
	+(CACHEDB_HASHSIZE/8)*2	/* key hash */		\
	+ 7			/* " %b EX " */		\
	+ 20			/* ttl (uint64_t) */	\
	+ 1			/* \0 */
	char cmdbuf[REDIS_STORE_MAX_BUF_LEN];

	if (!set_ttl) {
		verbose(VERB_ALGO, "redis_store %s (%d bytes)", key, (int)data_len);
		/* build command to set to a binary safe string */
		n = snprintf(cmdbuf, sizeof(cmdbuf), "SET %s %%b", key);
	} else if(ttl == 0) {
		/* use the EXPIRE command, SET with EX 0 is an invalid time. */
		/* Replies with REDIS_REPLY_INTEGER of 1. */
		verbose(VERB_ALGO, "redis_store expire %s (%d bytes)",
			key, (int)data_len);
		n = snprintf(cmdbuf, sizeof(cmdbuf), "EXPIRE %s 0", key);
		data = NULL;
		data_len = 0;
	} else {
		/* add expired ttl time to redis ttl to avoid premature eviction of key */
		ttl += env->cfg->serve_expired_ttl;
		verbose(VERB_ALGO, "redis_store %s (%d bytes) with ttl %u",
			key, (int)data_len, (unsigned)(uint32_t)ttl);
		/* build command to set to a binary safe string */
		n = snprintf(cmdbuf, sizeof(cmdbuf), "SET %s %%b EX %u", key,
			(unsigned)(uint32_t)ttl);
	}


	if(n < 0 || n >= (int)sizeof(cmdbuf)) {
		log_err("redis_store: unexpected failure to build command");
		return;
	}

	rep = redis_command(env, cachedb_env, cmdbuf, data, data_len, 1);
	if(rep) {
		verbose(VERB_ALGO, "redis_store set completed");
		if(rep->type != REDIS_REPLY_STATUS &&
			rep->type != REDIS_REPLY_ERROR &&
			rep->type != REDIS_REPLY_INTEGER) {
			log_err("redis_store: unexpected type of reply (%d)",
				rep->type);
		}
		freeReplyObject(rep);
	}
}

struct cachedb_backend redis_backend = { "redis",
	redis_init, redis_deinit, redis_lookup, redis_store
};
#endif	/* USE_REDIS */
#endif /* USE_CACHEDB */
