generated from pykit3/tmpl
-
Notifications
You must be signed in to change notification settings - Fork 1
/
zkconf.py
232 lines (166 loc) · 6.08 KB
/
zkconf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
#!/usr/bin/env python
# coding: utf-8
from kazoo.client import KazooClient
from k3confloader import conf
import k3utfjson
from . import zkutil
class ZKConf(object):
    """
    Accessor for the zookeeper related settings used by this module.

    Every field can be passed explicitly to the constructor.  A field left as
    `None` is resolved at access time from the attribute `zk_<field>` of
    `k3confloader.conf`.

    E.g., with `conf.zk_tx_dir` being "my_dir/":

        c = ZKConf(hosts="127.0.0.1:9999")
        c.hosts()   # "127.0.0.1:9999"  # given explicitly
        c.tx_dir()  # "my_dir/"         # falls back to `conf.zk_tx_dir`

    zk dir:

        <prefix>/record/<key>
        <prefix>/lock/<key>
        <prefix>/tx/
                    alive/0000000001
                    journal/0000000001
                    state/0000000001
                    journal_id_set
                    txid_maker
        <prefix>/seq/<key>

    alive:          contains ephemeral nodes, each of which represents a
                    transaction.
    journal:        contains journal transaction modifications; each of them
                    is a complete transaction.
    journal_id_set: committed and purged journal ids.

    All path builders concatenate the directory and the key as they are; no
    separator is inserted between them.
    """

    def __init__(self,
                 hosts=None,
                 tx_dir=None,
                 seq_dir=None,
                 record_dir=None,
                 lock_dir=None,
                 node_id=None,
                 auth=None,
                 acl=None
                 ):
        # Explicitly given values.  `None` means "not given": the value is
        # then looked up on the global conf by `_get_config`.
        self.conf = dict(
            hosts=hosts,
            tx_dir=tx_dir,
            seq_dir=seq_dir,
            record_dir=record_dir,
            lock_dir=lock_dir,
            node_id=node_id,
            auth=auth,
            acl=acl,
        )

    # --- plain config fields ---

    def hosts(self):
        """Return the `hosts` setting."""
        return self._get_config('hosts')

    def record_dir(self):
        """Return the `record_dir` setting."""
        return self._get_config('record_dir')

    def lock_dir(self):
        """Return the `lock_dir` setting."""
        return self._get_config('lock_dir')

    def seq_dir(self):
        """Return the `seq_dir` setting."""
        return self._get_config('seq_dir')

    def tx_dir(self):
        """Return the `tx_dir` setting."""
        return self._get_config('tx_dir')

    def node_id(self):
        """Return the `node_id` setting."""
        return self._get_config('node_id')

    def auth(self):
        """Return the `auth` setting."""
        return self._get_config('auth')

    def acl(self):
        """Return the `acl` setting."""
        return self._get_config('acl')

    # --- paths built from the directories above ---

    def lock(self, key=''):
        """Return `lock_dir` followed by `key`; an int key is zero-padded by `_dump_txid`."""
        base = self.lock_dir()
        return ''.join((base, _dump_txid(key)))

    def record(self, key=''):
        """Return `record_dir` followed by `key`."""
        base = self.record_dir()
        return ''.join((base, key))

    def seq(self, key=''):
        """Return `seq_dir` followed by `key`."""
        base = self.seq_dir()
        return ''.join((base, key))

    def tx_alive(self, txid=''):
        """Return the path of `txid` under `<tx_dir>alive/`."""
        base = self.tx_dir()
        return ''.join((base, 'alive/', _dump_txid(txid)))

    def tx_state(self, txid=''):
        """Return the path of `txid` under `<tx_dir>state/`."""
        base = self.tx_dir()
        return ''.join((base, 'state/', _dump_txid(txid)))

    def journal(self, journal_id=''):
        """Return the path of `journal_id` under `<tx_dir>journal/`."""
        base = self.tx_dir()
        return ''.join((base, 'journal/', _dump_journal_id(journal_id)))

    def journal_id_set(self):
        """Return `<tx_dir>journal_id_set`."""
        base = self.tx_dir()
        return ''.join((base, 'journal_id_set'))

    def txid_maker(self):
        """Return `<tx_dir>txid_maker`."""
        base = self.tx_dir()
        return ''.join((base, 'txid_maker'))

    # --- values converted for kazoo ---

    def kazoo_digest_acl(self):
        """Return `acl` converted by `zkutil.make_kazoo_digest_acl`, or `None` if `acl` is `None`."""
        raw_acl = self.acl()
        if raw_acl is not None:
            return zkutil.make_kazoo_digest_acl(raw_acl)
        return None

    def kazoo_auth(self):
        """
        Return `auth` as the 2-tuple `(auth[0], auth[1] + ':' + auth[2])`,
        or `None` if `auth` is `None`.
        """
        cred = self.auth()
        if cred is None:
            return None
        return cred[0], cred[1] + ':' + cred[2]

    def _get_config(self, name):
        """Return the explicit value of `name`, else `conf.zk_<name>`."""
        explicit = self.conf[name]
        if explicit is not None:
            return explicit
        return getattr(conf, 'zk_' + name)
class KazooClientExt(KazooClient):
    """
    Wrap an existing `KazooClient`, optionally (de)serializing node values
    as JSON with `k3utfjson`.

    `KazooClient.__init__` is not called by this class.  Attributes that are
    found neither on the instance nor on the class are looked up on the
    wrapped client by `__getattr__`.

    :param zkclient: a `KazooClient` to wrap, or another `KazooClientExt`
        whose underlying client is shared and whose `ZKConf`, if any, is
        copied.
    :param json: if true, `get` decodes values with `k3utfjson.load` and
        `set` / `create` encode values with `k3utfjson.dump`.
    :raise TypeError: if `zkclient` is neither of the two accepted types.
    """

    def __init__(self, zkclient, json=True):

        # KazooClientExt must be tested first: it is a KazooClient subclass.
        if isinstance(zkclient, KazooClientExt):
            self._zk = zkclient._zk

            # A wrapper built directly from a KazooClient has no conf yet;
            # copying it must not fail on `None.conf`.
            src_conf = zkclient._zkconf
            if src_conf is None:
                self._zkconf = None
            else:
                self._zkconf = ZKConf(**src_conf.conf)

        elif isinstance(zkclient, KazooClient):
            self._zk = zkclient
            self._zkconf = None

        else:
            raise TypeError('invalid zkclient type: expect KazooClient or KazooClientExt')

        self._json = json

    def __getattr__(self, n):
        """Delegate every attribute not found on this object to the wrapped client."""
        if n == '_zk':
            # `_zk` itself is not set (e.g. __init__ did not complete).
            # Evaluating `self._zk` below would re-enter __getattr__ and
            # recurse until RecursionError.
            raise AttributeError(n)

        return getattr(self._zk, n)

    def _jl(self, v):
        """Decode `v` with `k3utfjson.load` when json mode is on, else return it as is."""
        if self._json:
            return k3utfjson.load(v)
        else:
            return v

    def _jd(self, v):
        """Encode `v` with `k3utfjson.dump` when json mode is on, else return it as is."""
        if self._json:
            return k3utfjson.dump(v)
        else:
            return v

    def _encode(self, v):
        """Return `v` utf-8 encoded if it is a `str`; any other value is returned unchanged."""
        if isinstance(v, str):
            return v.encode("utf-8")
        return v

    def get(self, path, watch=None):
        """
        Read a node from the wrapped client.

        :return: `(value, zstat)`, with `value` decoded by `_jl`.
        """
        val, zstat = self._zk.get(path, watch=watch)
        return self._jl(val), zstat

    def set(self, path, value, version=-1):
        """Write `value`, after `_jd` and `_encode`, to `path` on the wrapped client."""
        value = self._jd(value)
        value = self._encode(value)
        return self._zk.set(path, value, version=version)

    def create(self, path, value=b"", acl=None, ephemeral=False,
               sequence=False, makepath=False):
        """
        Create a node on the wrapped client.

        `value` goes through `_jd` and `_encode`, the same as in `set`; the
        other arguments are forwarded unchanged.
        """
        value = self._jd(value)
        # Same as `set`: kazoo only accepts byte strings as node value, so a
        # `str` payload has to be encoded before it is passed on.
        value = self._encode(value)
        return self._zk.create(path, value=value,
                               acl=acl,
                               ephemeral=ephemeral,
                               sequence=sequence,
                               makepath=makepath)
def _dump_txid(txid):
if isinstance(txid, str):
return txid
elif isinstance(txid, int):
return '%010d' % txid
else:
raise TypeError('invalid type txid: ' + repr(txid))
def _dump_journal_id(journal_id):
if isinstance(journal_id, int):
return 'journal_id%010d' % journal_id
elif isinstance(journal_id, str):
return journal_id
else:
raise TypeError('invalid type journal id: ' + repr(journal_id))
def kazoo_client_ext(zk, json=True):
    """
    Build a `KazooClientExt` from a connection spec or from an existing
    client.

    :param zk: one of:
        - `str`: used as the `hosts` of a new `ZKConf`;
        - `dict`: passed as keyword arguments to `ZKConf`;
        - `ZKConf`: used as is;
        - anything else is handed unchanged to `KazooClientExt`, which
          accepts a `KazooClient` or a `KazooClientExt` and raises
          `TypeError` for other types.
    :param json: forwarded to `KazooClientExt`.
    :return: `(zkclient, owning)`.  `owning` is `True` when a new
        `KazooClient` was created here from a `str` / `dict` / `ZKConf`,
        and `False` when the given `zk` was wrapped.
    """
    # Stays None unless `zk` describes a connection instead of being a client.
    zkconf = None
    if isinstance(zk, str):
        zkconf = ZKConf(hosts=zk)
    if isinstance(zk, dict):
        zkconf = ZKConf(**zk)
    if isinstance(zk, ZKConf):
        zkconf = zk
    if zkconf is None:
        # `zk` is taken to be a client: use a conf with no explicit field.
        zkconf = ZKConf()
        owning = False
    else:
        zk = KazooClient(zkconf.hosts())
        owning = True
    zkclient = KazooClientExt(zk, json=json)
    # Keep the conf a wrapped KazooClientExt brought along, if there is one.
    if zkclient._zkconf is None:
        zkclient._zkconf = zkconf
    # NOTE(review): indentation was lost in the copy of the file this was
    # reviewed from.  The statements below are placed at function level, the
    # shallowest nesting that is consistent with the code; confirm against the
    # repository that start() / add_auth() are meant to run for every client
    # and not only inside the `if` above.
    zkclient.start()
    # Credentials come from the local `zkconf`, not from `zkclient._zkconf`.
    auth = zkconf.kazoo_auth()
    if auth is not None:
        zkclient.add_auth(*auth)
    return zkclient, owning