/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef ZK_ADAPTOR_H_
#define ZK_ADAPTOR_H_
#include <zookeeper.jute.h>
#ifdef THREADED
#include <pthread.h>
#endif
#include "zookeeper.h"
26
#include "zk_hashtable.h"
27

28
29
30
31
/*
 * Predefined xid values recognized as special by the server.
 *
 * The negative values are parenthesized so that each macro expands to a
 * single primary expression regardless of the surrounding operators.
 */
#define WATCHER_EVENT_XID (-1)
#define PING_XID (-2)
#define AUTH_XID (-4)
#define SET_WATCHES_XID (-8)
33

34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/*
 * zookeeper state constants.
 *
 * Negative values are parenthesized so that each macro expands to a single
 * primary expression regardless of the surrounding operators.
 */
#define EXPIRED_SESSION_STATE_DEF (-112)
#define AUTH_FAILED_STATE_DEF (-113)
#define CONNECTING_STATE_DEF 1
#define ASSOCIATING_STATE_DEF 2
#define CONNECTED_STATE_DEF 3

/* zookeeper event type constants */
#define CREATED_EVENT_DEF 1
#define DELETED_EVENT_DEF 2
#define CHANGED_EVENT_DEF 3
#define CHILD_EVENT_DEF 4
#define SESSION_EVENT_DEF (-1)
#define NOTWATCHING_EVENT_DEF (-2)

49
50
51
52
#ifdef __cplusplus
extern "C" {
#endif

mahadevkonar's avatar
mahadevkonar committed
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
struct _buffer_list;
struct _completion_list;

/**
 * Head of a list of buffer_list_t packets.
 *
 * Keeps a pointer to both ends of the list; the elements themselves are
 * chained through buffer_list_t::next. struct _zhandle uses this type for
 * its `to_process` and `to_send` members.
 */
typedef struct _buffer_head {
    /* first element of the list */
    struct _buffer_list *head;
    /* last element of the list */
    struct _buffer_list *last;
#ifdef THREADED
    /* Only present in THREADED builds.
     * NOTE(review): presumably the mutex taken by lock_buffer_list() /
     * unlock_buffer_list() -- confirm in the adaptor sources. */
    pthread_mutex_t lock;
#endif
} buffer_head_t;

/**
 * Head of a list of struct _completion_list entries.
 *
 * struct _completion_list is only forward-declared in this header, so its
 * layout is not visible here. struct _zhandle uses this type for its
 * `sent_requests` and `completions_to_process` members.
 */
typedef struct _completion_head {
    /* first element of the list */
    struct _completion_list *head;
    /* last element of the list */
    struct _completion_list *last;
#ifdef THREADED
    /* Both members below exist only in THREADED builds.
     * NOTE(review): presumably `cond` is signalled when entries are added
     * and `lock` is the mutex taken by lock_completion_list() /
     * unlock_completion_list() -- confirm in the adaptor sources. */
    pthread_cond_t cond;
    pthread_mutex_t lock;
#endif
} completion_head_t;

/*
 * Lock/unlock helpers for the two list-head types. They are only declared
 * in this header; the definitions live elsewhere.
 * NOTE(review): presumably they acquire/release the `lock` member of the
 * given head in THREADED builds -- confirm in the adaptor sources, including
 * what the non-THREADED implementation does.
 */
void lock_buffer_list(buffer_head_t *l);
void unlock_buffer_list(buffer_head_t *l);
void lock_completion_list(completion_head_t *l);
void unlock_completion_list(completion_head_t *l);

/**
 * Result holder for a synchronous request.
 *
 * Instances are handled through alloc_sync_completion(),
 * wait_sync_completion(), notify_sync_completion() and
 * free_sync_completion(), which are declared later in this header.
 */
struct sync_completion {
    /* result code of the request */
    int rc;
    /* Result payload; the members overlap, so only one is meaningful for a
     * given request. NOTE(review): which request type fills which member is
     * not visible in this header -- confirm against the callers. */
    union {
        /* a string result and its length field */
        struct {
            char *str;
            int str_len;
        } str;
        /* a bare Stat result */
        struct Stat stat;
        /* a data buffer, its length field, and the accompanying Stat */
        struct {
            char *buffer;
            int buff_len;
            struct Stat stat;
        } data;
        /* an ACL vector and the accompanying Stat */
        struct {
            struct ACL_vector acl;
            struct Stat stat;
        } acl;
        /* a vector of strings */
        struct String_vector strs;
    } u;
    /* NOTE(review): presumably set non-zero once the result is available --
     * confirm in notify_sync_completion()/wait_sync_completion(). */
    int complete;
#ifdef THREADED
    /* Only present in THREADED builds.
     * NOTE(review): presumably used to block the waiting thread until
     * `complete` is set -- confirm in the adaptor sources. */
    pthread_cond_t cond;
    pthread_mutex_t lock;
#endif
};

/**
 * Authentication data kept per handle; embedded in struct _zhandle as the
 * `auth` member.
 */
typedef struct _auth_info {
    int state; /* 0=>inactive, >0 => active */
    /* name of the authentication scheme */
    char* scheme;
    /* the authentication payload */
    struct buffer auth;
    /* NOTE(review): presumably the callback invoked when the auth request
     * finishes, with `data` as its opaque argument -- confirm against the
     * code that issues the auth request. */
    void_completion_t completion;
    const char* data;
} auth_info;

/**
 * This structure represents a packet being read or written.
 *
 * Packets are chained into a singly linked list through `next`;
 * buffer_head_t holds the two ends of such a list.
 */
typedef struct _buffer_list {
    /* the packet bytes */
    char *buffer;
    int len; /* This represents the length of sizeof(header) + length of buffer */
    int curr_offset; /* This is the offset into the header followed by offset into the buffer */
    /* next packet in the list */
    struct _buffer_list *next;
} buffer_list_t;

122
123
124
125
126
127
128
129
130
131
132
/* the size of connect request.
 * 44 is the sum of the field sizes of struct connect_req
 * (4 + 8 + 4 + 8 + 4 + 16). It is spelled out as a constant because
 * sizeof(struct connect_req) may be larger once the compiler inserts
 * alignment padding around the 64-bit members. */
#define HANDSHAKE_REQ_SIZE 44
/* connect request */
struct connect_req {
    int32_t protocolVersion;
    int64_t lastZxidSeen;
    int32_t timeOut;
    int64_t sessionId;
    int32_t passwd_len;
    /* fixed 16-byte password field; passwd_len is its accompanying length */
    char passwd[16];
};
mahadevkonar's avatar
mahadevkonar committed
133

134
135
136
137
138
139
140
141
142
/* the connect response.
 * The field sizes add up to 40 bytes (4 + 4 + 4 + 8 + 4 + 16), which matches
 * the 40-byte `primer_storage_buffer` in struct _zhandle;
 * sizeof(struct prime_struct) may be larger because of alignment padding. */
struct prime_struct {
    /* NOTE(review): presumably the length prefix of the response packet --
     * confirm against the handshake code. */
    int32_t len;
    int32_t protocolVersion;
    int32_t timeOut;
    int64_t sessionId;
    int32_t passwd_len;
    /* fixed 16-byte password field; passwd_len is its accompanying length */
    char passwd[16];
}; 
mahadevkonar's avatar
mahadevkonar committed
143

144
145
146
147
148
149
150
151
152
153
154
155
156
#ifdef THREADED
/* this is used by mt_adaptor internally for thread management.
 * The whole structure exists only in THREADED builds. */
struct adaptor_threads {
     /* thread handles. NOTE(review): presumably the IO thread and the
      * completion-processing thread -- confirm in mt_adaptor. */
     pthread_t io;
     pthread_t completion;
     int threadsToWait;         // barrier
     pthread_cond_t cond;       // barrier's conditional
     pthread_mutex_t lock;      // ... and a lock
     pthread_mutex_t zh_lock;   // critical section lock
     /* pair of descriptors. NOTE(review): presumably a pipe used to wake up
      * the IO thread -- confirm in mt_adaptor. */
     int self_pipe[2];
};
#endif
 
mahadevkonar's avatar
mahadevkonar committed
157
158
159
160
161
162
163
164
165
166
/**
 * This structure represents the connection to zookeeper.
 */

struct _zhandle {
    int fd; /* the descriptor used to talk to zookeeper */
    char *hostname; /* the hostname of zookeeper */
    struct sockaddr *addrs; /* the addresses that correspond to the hostname */
    int addrs_count; /* The number of addresses in the addrs array */
    watcher_fn watcher; /* the registered watcher */
167
168
169
    struct timeval last_recv; /* The time that the last message was received */
    struct timeval last_send; /* The time that the last message was sent */
    struct timeval next_deadline; /* The time of the next deadline */
mahadevkonar's avatar
mahadevkonar committed
170
171
172
173
174
175
176
177
178
179
180
181
182
183
    int recv_timeout; /* The maximum amount of time that can go by without 
     receiving anything from the zookeeper server */
    buffer_list_t *input_buffer; /* the current buffer being read in */
    buffer_head_t to_process; /* The buffers that have been read and are ready to be processed. */
    buffer_head_t to_send; /* The packets queued to send */
    completion_head_t sent_requests; /* The outstanding requests */
    completion_head_t completions_to_process; /* completions that are ready to run */
    int connect_index; /* The index of the address to connect to */
    clientid_t client_id;
    long long last_zxid;
    int outstanding_sync; /* Number of outstanding synchronous requests */
    struct _buffer_list primer_buffer; /* The buffer used for the handshake at the start of a connection */
    struct prime_struct primer_storage; /* the connect response */
    char primer_storage_buffer[40]; /* the true size of primer_storage */
184
    volatile int state;
mahadevkonar's avatar
mahadevkonar committed
185
186
187
188
189
    void *context;
    struct _auth_info auth; /* authentication data */
    /* zookeeper_close is not reentrant because it de-allocates the zhandler. 
     * This guard variable is used to defer the destruction of zhandle till 
     * right before top-level API call returns to the caller */
190
191
    int32_t ref_counter;
    volatile int close_requested;
mahadevkonar's avatar
mahadevkonar committed
192
    void *adaptor_priv;
193
    /* Used for debugging only: non-zero value indicates the time when the zookeeper_process
194
195
     * call returned while there was at least one unprocessed server response 
     * available in the socket recv buffer */
196
    struct timeval socket_readable;
197
    
198
199
    zk_hashtable* active_node_watchers;   
    zk_hashtable* active_exist_watchers;
200
    zk_hashtable* active_child_watchers;
mahadevkonar's avatar
mahadevkonar committed
201
202
203
204
};

/*
 * Adaptor set-up and shut-down entry points; only declared in this header.
 * NOTE(review): presumably adaptor_init() prepares the threading-model
 * specific state for `zh` and returns an int status, and adaptor_finish()
 * stops it -- confirm the return convention in the adaptor sources.
 */
int adaptor_init(zhandle_t *zh);
void adaptor_finish(zhandle_t *zh);
205
/* Only declared in this header.
 * NOTE(review): presumably releases the adaptor resources attached to `zh`
 * -- confirm in the adaptor sources. */
void adaptor_destroy(zhandle_t *zh);
mahadevkonar's avatar
mahadevkonar committed
206
207
208
209
210
211
212
/*
 * Helpers around struct sync_completion and the request queues. All of them
 * are only declared in this header.
 * NOTE(review): the behavior notes below are inferred from the names and
 * signatures -- confirm against the adaptor sources.
 */
/* returns a pointer to a sync_completion; presumably heap-allocated and to
 * be released with free_sync_completion() */
struct sync_completion *alloc_sync_completion(void);
/* presumably blocks until `sc` is completed; returns an int status */
int wait_sync_completion(struct sync_completion *sc);
void free_sync_completion(struct sync_completion *sc);
/* presumably marks `sc` complete and wakes the waiter */
void notify_sync_completion(struct sync_completion *sc);
/* `timeout` units are not visible here */
int adaptor_send_queue(zhandle_t *zh, int timeout);
/* takes a count of outstanding synchronous requests (compare
 * _zhandle::outstanding_sync) and returns an int */
int process_async(int outstanding_sync);
void process_completions(zhandle_t *zh);
Andrew Kornev's avatar
Andrew Kornev committed
213
214
/* Only declared in this header.
 * NOTE(review): presumably writes out the packets queued in zh->to_send and
 * returns an int status; the units of `timeout` are not visible here --
 * confirm in the implementation. */
int flush_send_queue(zhandle_t*zh, int timeout);

215
216
217
218
// critical section guards
// NOTE(review): presumably backed by adaptor_threads::zh_lock ("critical
// section lock") in THREADED builds -- confirm in the adaptor sources.
void enter_critical(zhandle_t* zh);
void leave_critical(zhandle_t* zh);
// zhandle object reference counting
219
220
// Bracket a top-level API call on `zh`; see the ref_counter comment in
// struct _zhandle about deferring destruction until the call returns.
// NOTE(review): presumably api_prolog() takes a reference and api_epilog()
// drops it and returns `rc` -- confirm in the implementation.
void api_prolog(zhandle_t* zh);
int api_epilog(zhandle_t *zh, int rc);
221
222
223
224
/* Returns an xid as an int32_t.
 * NOTE(review): presumably the next request id to use -- confirm in the
 * implementation.
 * Declared with (void) so that this is a real prototype in C; an empty
 * parameter list would leave the arguments unchecked. */
int32_t get_xid(void);
// Adjusts the reference counter of `zh` (see _zhandle::ref_counter) by `i`.
// returns the new value of the ref counter
int32_t inc_ref_counter(zhandle_t* zh,int i);

mahadevkonar's avatar
mahadevkonar committed
225
#ifdef THREADED
// atomic post-increment
// NOTE(review): "post-increment" suggests the value before the addition is
// returned -- confirm in the implementation.
int32_t fetch_and_add(volatile int32_t* operand, int incr);
#endif

/*
 * Report a session state change for handle `zh`.
 *
 * The multi-threaded and the single-threaded build both expand this to
 * queue_session_event(zh, newstate), so the event is queued rather than
 * delivered inline in either mode. The macro is therefore defined once,
 * outside the THREADED conditional.
 */
#define PROCESS_SESSION_EVENT(zh,newstate) queue_session_event(zh,newstate)
mahadevkonar's avatar
mahadevkonar committed
235

236
237
#ifdef __cplusplus
}
mahadevkonar's avatar
mahadevkonar committed
238
#endif
239

mahadevkonar's avatar
mahadevkonar committed
240
241
#endif /*ZK_ADAPTOR_H_*/

242