Commit dc78c696 authored by Flavio Paiva Junqueira's avatar Flavio Paiva Junqueira
Browse files

ZOOKEEPER-827. enable r/o mode in C client library (rgs via fpj)



git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1608620 13f79535-47bb-0310-9956-ffa450edef68
parent 66bd0584
......@@ -32,6 +32,8 @@ NEW FEATURES:
ZOOKEEPER-1928. add configurable throttling to the number of snapshots
concurrently sent by a leader (Edward Carter via fpj)
ZOOKEEPER-827. enable r/o mode in C client library (rgs via fpj)
BUGFIXES:
ZOOKEEPER-1900. NullPointerException in truncate (Camille Fournier)
......
......@@ -93,7 +93,9 @@ TEST_SOURCES = \
tests/TestWatchers.cc \
tests/TestClient.cc \
tests/ZooKeeperQuorumServer.cc \
tests/ZooKeeperQuorumServer.h
tests/ZooKeeperQuorumServer.h \
tests/TestReadOnlyClient.cc \
$(NULL)
SYMBOL_WRAPPERS=$(shell cat ${srcdir}/tests/wrappers.opt)
......
......@@ -85,6 +85,10 @@ against zookeeper_st library):
$ cli_mt zookeeper_host:9876
To start a client with read-only mode enabled, use the -r flag:
$ cli_mt -r zookeeper_host:9876
This is a client application that gives you a shell for executing
simple zookeeper commands. Once successfully started and connected to
the server it displays a shell prompt.
......
......@@ -121,8 +121,10 @@ enum ZOO_ERRORS {
ZCLOSING = -116, /*!< ZooKeeper is closing */
ZNOTHING = -117, /*!< (not error) no server responses to process */
ZSESSIONMOVED = -118, /*!<session moved to another server, so operation is ignored */
ZNOTREADONLY = -119, /*!< state-changing request is passed to read-only server */
ZEPHEMERALONLOCALSESSION = -120, /*!< Attempt to create ephemeral node on a local session */
ZNOWATCHER = -121 /*!< The watcher couldn't be found */
ZNOWATCHER = -121, /*!< The watcher couldn't be found */
ZRWSERVERFOUND = -122 /*!< r/w server found while in r/o mode */
};
#ifdef __cplusplus
......@@ -147,6 +149,9 @@ extern ZOOAPI const int ZOO_PERM_ALL;
#define ZOO_CONFIG_NODE "/zookeeper/config"
/* flags for zookeeper_init{,2} */
#define ZOO_READONLY 1
/** This Id represents anyone. */
extern ZOOAPI struct Id ZOO_ANYONE_ID_UNSAFE;
/** This Id is only usable to set ACLs. It will get substituted with the
......@@ -196,6 +201,7 @@ extern ZOOAPI const int ZOO_AUTH_FAILED_STATE;
extern ZOOAPI const int ZOO_CONNECTING_STATE;
extern ZOOAPI const int ZOO_ASSOCIATING_STATE;
extern ZOOAPI const int ZOO_CONNECTED_STATE;
extern ZOOAPI const int ZOO_READONLY_STATE;
extern ZOOAPI const int ZOO_NOTCONNECTED_STATE;
// @}
......
......@@ -194,21 +194,45 @@ int addrvec_atend(const addrvec_t *avec)
void addrvec_next(addrvec_t *avec, struct sockaddr_storage *next)
{
int index;
// If we're at the end of the list, then reset index to start
if (addrvec_atend(avec))
{
if (addrvec_atend(avec)) {
avec->next = 0;
}
if (!addrvec_hasnext(avec))
{
if (!addrvec_hasnext(avec)) {
if (next) {
memset(next, 0, sizeof(*next));
}
return;
}
index = avec->next++;
if (next) {
*next = avec->data[index];
}
}
void addrvec_peek(addrvec_t *avec, struct sockaddr_storage *next)
{
int index = avec->next;
if (avec->count == 0) {
memset(next, 0, sizeof(*next));
return;
}
*next = avec->data[avec->next++];
if (addrvec_atend(avec)) {
index = 0;
}
*next = avec->data[index];
}
int addrvec_eq(const addrvec_t *a1, const addrvec_t *a2)
{
uint32_t i = 0;
......
......@@ -111,13 +111,20 @@ int addrvec_hasnext(const addrvec_t *avec);
int addrvec_atend(const addrvec_t *avec);
/**
* Get the next entry from the addrvec and update the associated index.
* Get the next entry from the addrvec and update the associated index.
*
* If next is NULL, the index will still be updated.
*
* If the current index points at (or after) the last element in the vector then
* it will loop back around and start at the beginning of the list.
*/
void addrvec_next(addrvec_t *avec, struct sockaddr_storage *next);
/**
* Retrieves the next entry from the addrvec but doesn't update the index.
*/
void addrvec_peek(addrvec_t *avec, struct sockaddr_storage *next);
/**
* Compare two addrvecs for equality.
*
......
......@@ -76,6 +76,8 @@ static const char* state2String(int state){
return "ASSOCIATING_STATE";
if (state == ZOO_CONNECTED_STATE)
return "CONNECTED_STATE";
if (state == ZOO_READONLY_STATE)
return "READONLY_STATE";
if (state == ZOO_EXPIRED_SESSION_STATE)
return "EXPIRED_SESSION_STATE";
if (state == ZOO_AUTH_FAILED_STATE)
......@@ -661,6 +663,7 @@ int main(int argc, char **argv) {
char appId[64];
#endif
int bufoff = 0;
int flags, i;
FILE *fh;
if (argc < 2) {
......@@ -690,6 +693,15 @@ int main(int argc, char **argv) {
}
}
}
flags = 0;
for (i = 1; i < argc; ++i) {
if (strcmp("-r", argv[i]) == 0) {
flags = ZOO_READONLY;
break;
}
}
#ifdef YCA
strcpy(appId,"yahoo.example.yca_test");
cert = yca_get_cert_once(appId);
......@@ -708,7 +720,7 @@ int main(int argc, char **argv) {
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
zoo_deterministic_conn_order(1); // enable deterministic order
hostPort = argv[1];
zh = zookeeper_init(hostPort, watcher, 30000, &myid, 0, 0);
zh = zookeeper_init(hostPort, watcher, 30000, &myid, NULL, flags);
if (!zh) {
return errno;
}
......
......@@ -75,8 +75,8 @@ void waitCounter(){
}
void listener(zhandle_t *zzh, int type, int state, const char *path,void* ctx) {
if(type == ZOO_SESSION_EVENT){
if(state == ZOO_CONNECTED_STATE){
if (type == ZOO_SESSION_EVENT) {
if (state == ZOO_CONNECTED_STATE || state == ZOO_READONLY_STATE) {
pthread_mutex_lock(&lock);
pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&lock);
......
......@@ -42,6 +42,7 @@
#define CONNECTING_STATE_DEF 1
#define ASSOCIATING_STATE_DEF 2
#define CONNECTED_STATE_DEF 3
#define READONLY_STATE_DEF 5
#define NOTCONNECTED_STATE_DEF 999
/* zookeeper event type constants */
......@@ -131,7 +132,7 @@ typedef struct _buffer_list {
} buffer_list_t;
/* the size of connect request */
#define HANDSHAKE_REQ_SIZE 44
#define HANDSHAKE_REQ_SIZE 45
/* connect request */
struct connect_req {
int32_t protocolVersion;
......@@ -140,6 +141,7 @@ struct connect_req {
int64_t sessionId;
int32_t passwd_len;
char passwd[16];
char readOnly;
};
/* the connect response */
......@@ -150,6 +152,7 @@ struct prime_struct {
int64_t sessionId;
int32_t passwd_len;
char passwd[16];
char readOnly;
};
#ifdef THREADED
......@@ -217,6 +220,10 @@ struct _zhandle {
completion_head_t completions_to_process; // completions that are ready to run
int outstanding_sync; // number of outstanding synchronous requests
/* read-only mode specific fields */
struct timeval last_ping_rw; /* The last time we checked server for being r/w */
int ping_rw_timeout; /* The time that can go by before checking next server */
// State info
volatile int state; // Current zookeeper state
void *context; // client-side provided context
......@@ -228,7 +235,7 @@ struct _zhandle {
// Primer storage
struct _buffer_list primer_buffer; // The buffer used for the handshake at the start of a connection
struct prime_struct primer_storage; // the connect response
char primer_storage_buffer[40]; // the true size of primer_storage
char primer_storage_buffer[41]; // the true size of primer_storage
/* zookeeper_close is not reentrant because it de-allocates the zhandler.
* This guard variable is used to defer the destruction of zhandle till
......@@ -249,6 +256,11 @@ struct _zhandle {
/** used for chroot path at the client side **/
char *chroot;
/** Indicates if this client is allowed to go to r/o mode */
char allow_read_only;
/** Indicates if we connected to a majority server before */
char seen_rw_server_before;
};
......
This diff is collapsed.
......@@ -22,140 +22,14 @@
#include <signal.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/select.h>
#include "CollectionUtil.h"
#include "ThreadingUtil.h"
using namespace Util;
#include "Vector.h"
using namespace std;
#include <cstring>
#include <list>
#include <zookeeper.h>
#include "Util.h"
#ifdef THREADED
static void yield(zhandle_t *zh, int i)
{
sleep(i);
}
#else
static void yield(zhandle_t *zh, int seconds)
{
int fd;
int interest;
int events;
struct timeval tv;
int rc;
time_t expires = time(0) + seconds;
time_t timeLeft = seconds;
fd_set rfds, wfds, efds;
FD_ZERO(&rfds);
FD_ZERO(&wfds);
FD_ZERO(&efds);
while(timeLeft >= 0) {
zookeeper_interest(zh, &fd, &interest, &tv);
if (fd != -1) {
if (interest&ZOOKEEPER_READ) {
FD_SET(fd, &rfds);
} else {
FD_CLR(fd, &rfds);
}
if (interest&ZOOKEEPER_WRITE) {
FD_SET(fd, &wfds);
} else {
FD_CLR(fd, &wfds);
}
} else {
fd = 0;
}
FD_SET(0, &rfds);
if (tv.tv_sec > timeLeft) {
tv.tv_sec = timeLeft;
}
rc = select(fd+1, &rfds, &wfds, &efds, &tv);
timeLeft = expires - time(0);
events = 0;
if (FD_ISSET(fd, &rfds)) {
events |= ZOOKEEPER_READ;
}
if (FD_ISSET(fd, &wfds)) {
events |= ZOOKEEPER_WRITE;
}
zookeeper_process(zh, events);
}
}
#endif
typedef struct evt {
string path;
int type;
} evt_t;
typedef struct watchCtx {
private:
list<evt_t> events;
public:
bool connected;
zhandle_t *zh;
Mutex mutex;
watchCtx() {
connected = false;
zh = 0;
}
~watchCtx() {
if (zh) {
zookeeper_close(zh);
zh = 0;
}
}
evt_t getEvent() {
evt_t evt;
mutex.acquire();
CPPUNIT_ASSERT( events.size() > 0);
evt = events.front();
events.pop_front();
mutex.release();
return evt;
}
int countEvents() {
int count;
mutex.acquire();
count = events.size();
mutex.release();
return count;
}
void putEvent(evt_t evt) {
mutex.acquire();
events.push_back(evt);
mutex.release();
}
bool waitForConnected(zhandle_t *zh) {
time_t expires = time(0) + 10;
while(!connected && time(0) < expires) {
yield(zh, 1);
}
return connected;
}
bool waitForDisconnected(zhandle_t *zh) {
time_t expires = time(0) + 15;
while(connected && time(0) < expires) {
yield(zh, 1);
}
return !connected;
}
} watchctx_t;
#include "WatchUtil.h"
class Zookeeper_clientretry : public CPPUNIT_NS::TestFixture
{
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <cppunit/extensions/HelperMacros.h>
#include "CppAssertHelper.h"
#include <sys/socket.h>
#include <unistd.h>
#include <zookeeper.h>
#include "Util.h"
#include "WatchUtil.h"
class Zookeeper_readOnly : public CPPUNIT_NS::TestFixture {
CPPUNIT_TEST_SUITE(Zookeeper_readOnly);
#ifdef THREADED
CPPUNIT_TEST(testReadOnly);
#endif
CPPUNIT_TEST_SUITE_END();
static void watcher(zhandle_t* zh, int type, int state,
const char* path, void* v) {
watchctx_t *ctx = (watchctx_t*)v;
if (state==ZOO_CONNECTED_STATE || state==ZOO_READONLY_STATE) {
ctx->connected = true;
} else {
ctx->connected = false;
}
if (type != ZOO_SESSION_EVENT) {
evt_t evt;
evt.path = path;
evt.type = type;
ctx->putEvent(evt);
}
}
FILE *logfile;
public:
Zookeeper_readOnly() {
logfile = openlogfile("Zookeeper_readOnly");
}
~Zookeeper_readOnly() {
if (logfile) {
fflush(logfile);
fclose(logfile);
logfile = 0;
}
}
void setUp() {
zoo_set_log_stream(logfile);
}
void startReadOnly() {
char cmd[1024];
sprintf(cmd, "%s startReadOnly", ZKSERVER_CMD);
CPPUNIT_ASSERT(system(cmd) == 0);
}
void stopPeer() {
char cmd[1024];
sprintf(cmd, "%s stop", ZKSERVER_CMD);
CPPUNIT_ASSERT(system(cmd) == 0);
}
void testReadOnly() {
startReadOnly();
watchctx_t watch;
zhandle_t* zh = zookeeper_init("localhost:22181",
watcher,
10000,
NULL,
&watch,
ZOO_READONLY);
watch.zh = zh;
CPPUNIT_ASSERT(zh != 0);
sleep(1);
int len = 1024;
char buf[len];
int res = zoo_get(zh, "/", 0, buf, &len, 0);
CPPUNIT_ASSERT_EQUAL((int)ZOK, res);
char path[1024];
res = zoo_create(zh, "/test", buf, 10, &ZOO_OPEN_ACL_UNSAFE, 0, path,
512);
CPPUNIT_ASSERT_EQUAL((int)ZNOTREADONLY, res);
stopPeer();
}
};
CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_readOnly);
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef WATCH_UTIL_H_
#define WATCH_UTIL_H_
#include <sys/select.h>
#include <cstring>
#include <list>
using namespace std;
#include "CollectionUtil.h"
#include "ThreadingUtil.h"
using namespace Util;
#ifdef THREADED
static void yield(zhandle_t *zh, int i)
{
sleep(i);
}
#else
static void yield(zhandle_t *zh, int seconds)
{
int fd;
int interest;
int events;
struct timeval tv;
int rc;
time_t expires = time(0) + seconds;
time_t timeLeft = seconds;
fd_set rfds, wfds, efds;
FD_ZERO(&rfds);
FD_ZERO(&wfds);
FD_ZERO(&efds);
while(timeLeft >= 0) {
zookeeper_interest(zh, &fd, &interest, &tv);
if (fd != -1) {
if (interest&ZOOKEEPER_READ) {
FD_SET(fd, &rfds);
} else {
FD_CLR(fd, &rfds);
}
if (interest&ZOOKEEPER_WRITE) {
FD_SET(fd, &wfds);
} else {
FD_CLR(fd, &wfds);
}
} else {
fd = 0;
}
FD_SET(0, &rfds);
if (tv.tv_sec > timeLeft) {
tv.tv_sec = timeLeft;
}
rc = select(fd+1, &rfds, &wfds, &efds, &tv);
timeLeft = expires - time(0);
events = 0;
if (FD_ISSET(fd, &rfds)) {
events |= ZOOKEEPER_READ;
}
if (FD_ISSET(fd, &wfds)) {
events |= ZOOKEEPER_WRITE;
}
zookeeper_process(zh, events);
}
}
#endif
typedef struct evt {
string path;
int type;
} evt_t;
typedef struct watchCtx {
private:
list<evt_t> events;
watchCtx(const watchCtx&);
watchCtx& operator=(const watchCtx&);
public:
bool connected;
zhandle_t *zh;
Mutex mutex;
watchCtx() {
connected = false;
zh = 0;
}
~watchCtx() {
if (zh) {
zookeeper_close(zh);
zh = 0;
}
}
evt_t getEvent() {
evt_t evt;
mutex.acquire();