root/Cheetah/Utils/memcache.py

Revision f17b49bd2a9cb5c693518283252cdbca4d04136b, 18.4 kB (checked in by Jason Michalski <armooo@armooo.net>, 2 years ago)

Lets try the import again

  • Property mode set to 100644
Line 
1 #!/usr/bin/env python
2
3 """
4 client module for memcached (memory cache daemon)
5
6 Overview
7 ========
8
9 See U{the MemCached homepage<http://www.danga.com/memcached>} for more about memcached.
10
11 Usage summary
12 =============
13
14 This should give you a feel for how this module operates::
15
16     import memcache
17     mc = memcache.Client(['127.0.0.1:11211'], debug=0)
18
19     mc.set("some_key", "Some value")
20     value = mc.get("some_key")
21
22     mc.set("another_key", 3)
23     mc.delete("another_key")
24     
25     mc.set("key", "1")   # note that the key used for incr/decr must be a string.
26     mc.incr("key")
27     mc.decr("key")
28
29 The standard way to use memcache with a database is like this::
30
31     key = derive_key(obj)
32     obj = mc.get(key)
33     if not obj:
34         obj = backend_api.get(...)
35         mc.set(key, obj)
36
37     # we now have obj, and future passes through this code
38     # will use the object from the cache.
39
40 Detailed Documentation
41 ======================
42
43 More detailed documentation is available in the L{Client} class.
44 """
45
46 import sys
47 import socket
48 import time
49 import types
50 try:
51     import cPickle as pickle
52 except ImportError:
53     import pickle
54
55 __author__    = "Evan Martin <martine@danga.com>"
56 __version__   = "1.2_tummy5"
57 __copyright__ = "Copyright (C) 2003 Danga Interactive"
58 __license__   = "Python"
59
60 class _Error(Exception):
61     pass
62
63 class Client:
64     """
65     Object representing a pool of memcache servers.
66     
67     See L{memcache} for an overview.
68
69     In all cases where a key is used, the key can be either:
70         1. A simple hashable type (string, integer, etc.).
71         2. A tuple of C{(hashvalue, key)}.  This is useful if you want to avoid
72         making this module calculate a hash value.  You may prefer, for
73         example, to keep all of a given user's objects on the same memcache
74         server, so you could use the user's unique id as the hash value.
75
76     @group Setup: __init__, set_servers, forget_dead_hosts, disconnect_all, debuglog
77     @group Insertion: set, add, replace
78     @group Retrieval: get, get_multi
79     @group Integers: incr, decr
80     @group Removal: delete
81     @sort: __init__, set_servers, forget_dead_hosts, disconnect_all, debuglog,\
82            set, add, replace, get, get_multi, incr, decr, delete
83     """
84
85     _usePickle = False
86     _FLAG_PICKLE  = 1<<0
87     _FLAG_INTEGER = 1<<1
88     _FLAG_LONG    = 1<<2
89
90     _SERVER_RETRIES = 10  # how many times to try finding a free server.
91
92     def __init__(self, servers, debug=0):
93         """
94         Create a new Client object with the given list of servers.
95
96         @param servers: C{servers} is passed to L{set_servers}.
97         @param debug: whether to display error messages when a server can't be
98         contacted.
99         """
100         self.set_servers(servers)
101         self.debug = debug
102         self.stats = {}
103    
104     def set_servers(self, servers):
105         """
106         Set the pool of servers used by this client.
107
108         @param servers: an array of servers.
109         Servers can be passed in two forms:
110             1. Strings of the form C{"host:port"}, which implies a default weight of 1.
111             2. Tuples of the form C{("host:port", weight)}, where C{weight} is
112             an integer weight value.
113         """
114         self.servers = [_Host(s, self.debuglog) for s in servers]
115         self._init_buckets()
116
117     def get_stats(self):
118         '''Get statistics from each of the servers. 
119
120         @return: A list of tuples ( server_identifier, stats_dictionary ).
121             The dictionary contains a number of name/value pairs specifying
122             the name of the status field and the string value associated with
123             it.  The values are not converted from strings.
124         '''
125         data = []
126         for s in self.servers:
127             if not s.connect(): continue
128             name = '%s:%s (%s)' % ( s.ip, s.port, s.weight )
129             s.send_cmd('stats')
130             serverData = {}
131             data.append(( name, serverData ))
132             readline = s.readline
133             while 1:
134                 line = readline()
135                 if not line or line.strip() == 'END': break
136                 stats = line.split(' ', 2)
137                 serverData[stats[1]] = stats[2]
138
139         return(data)
140
141     def flush_all(self):
142         'Expire all data currently in the memcache servers.'
143         for s in self.servers:
144             if not s.connect(): continue
145             s.send_cmd('flush_all')
146             s.expect("OK")
147
148     def debuglog(self, str):
149         if self.debug:
150             sys.stderr.write("MemCached: %s\n" % str)
151
152     def _statlog(self, func):
153         if not self.stats.has_key(func):
154             self.stats[func] = 1
155         else:
156             self.stats[func] += 1
157
158     def forget_dead_hosts(self):
159         """
160         Reset every host in the pool to an "alive" state.
161         """
162         for s in self.servers:
163             s.dead_until = 0
164
165     def _init_buckets(self):
166         self.buckets = []
167         for server in self.servers:
168             for i in range(server.weight):
169                 self.buckets.append(server)
170
171     def _get_server(self, key):
172         if type(key) == types.TupleType:
173             serverhash = key[0]
174             key = key[1]
175         else:
176             serverhash = hash(key)
177
178         for i in range(Client._SERVER_RETRIES):
179             server = self.buckets[serverhash % len(self.buckets)]
180             if server.connect():
181                 #print "(using server %s)" % server,
182                 return server, key
183             serverhash = hash(str(serverhash) + str(i))
184         return None, None
185
186     def disconnect_all(self):
187         for s in self.servers:
188             s.close_socket()
189    
190     def delete(self, key, time=0):
191         '''Deletes a key from the memcache.
192         
193         @return: Nonzero on success.
194         @rtype: int
195         '''
196         server, key = self._get_server(key)
197         if not server:
198             return 0
199         self._statlog('delete')
200         if time != None:
201             cmd = "delete %s %d" % (key, time)
202         else:
203             cmd = "delete %s" % key
204
205         try:
206             server.send_cmd(cmd)
207             server.expect("DELETED")
208         except socket.error, msg:
209             server.mark_dead(msg[1])
210             return 0
211         return 1
212
213     def incr(self, key, delta=1):
214         """
215         Sends a command to the server to atomically increment the value for C{key} by
216         C{delta}, or by 1 if C{delta} is unspecified.  Returns None if C{key} doesn't
217         exist on server, otherwise it returns the new value after incrementing.
218
219         Note that the value for C{key} must already exist in the memcache, and it
220         must be the string representation of an integer.
221
222         >>> mc.set("counter", "20")  # returns 1, indicating success
223         1
224         >>> mc.incr("counter")
225         21
226         >>> mc.incr("counter")
227         22
228
229         Overflow on server is not checked.  Be aware of values approaching
230         2**32.  See L{decr}.
231
232         @param delta: Integer amount to increment by (should be zero or greater).
233         @return: New value after incrementing.
234         @rtype: int
235         """
236         return self._incrdecr("incr", key, delta)
237
238     def decr(self, key, delta=1):
239         """
240         Like L{incr}, but decrements.  Unlike L{incr}, underflow is checked and
241         new values are capped at 0.  If server value is 1, a decrement of 2
242         returns 0, not -1.
243
244         @param delta: Integer amount to decrement by (should be zero or greater).
245         @return: New value after decrementing.
246         @rtype: int
247         """
248         return self._incrdecr("decr", key, delta)
249
250     def _incrdecr(self, cmd, key, delta):
251         server, key = self._get_server(key)
252         if not server:
253             return 0
254         self._statlog(cmd)
255         cmd = "%s %s %d" % (cmd, key, delta)
256         try:
257             server.send_cmd(cmd)
258             line = server.readline()
259             return int(line)
260         except socket.error, msg:
261             server.mark_dead(msg[1])
262             return None
263
264     def add(self, key, val, time=0):
265         '''
266         Add new key with value.
267         
268         Like L{set}, but only stores in memcache if the key doesn\'t already exist.
269
270         @return: Nonzero on success.
271         @rtype: int
272         '''
273         return self._set("add", key, val, time)
274     def replace(self, key, val, time=0):
275         '''Replace existing key with value.
276         
277         Like L{set}, but only stores in memcache if the key already exists. 
278         The opposite of L{add}.
279
280         @return: Nonzero on success.
281         @rtype: int
282         '''
283         return self._set("replace", key, val, time)
284     def set(self, key, val, time=0):
285         '''Unconditionally sets a key to a given value in the memcache.
286
287         The C{key} can optionally be an tuple, with the first element being the
288         hash value, if you want to avoid making this module calculate a hash value.
289         You may prefer, for example, to keep all of a given user's objects on the
290         same memcache server, so you could use the user's unique id as the hash
291         value.
292
293         @return: Nonzero on success.
294         @rtype: int
295         '''
296         return self._set("set", key, val, time)
297    
298     def _set(self, cmd, key, val, time):
299         server, key = self._get_server(key)
300         if not server:
301             return 0
302
303         self._statlog(cmd)
304
305         flags = 0
306         if isinstance(val, types.StringTypes):
307             pass
308         elif isinstance(val, int):
309             flags |= Client._FLAG_INTEGER
310             val = "%d" % val
311         elif isinstance(val, long):
312             flags |= Client._FLAG_LONG
313             val = "%d" % val
314         elif self._usePickle:
315             flags |= Client._FLAG_PICKLE
316             val = pickle.dumps(val, 2)
317         else:
318             pass
319        
320         fullcmd = "%s %s %d %d %d\r\n%s" % (cmd, key, flags, time, len(val), val)
321         try:
322             server.send_cmd(fullcmd)
323             server.expect("STORED")
324         except socket.error, msg:
325             server.mark_dead(msg[1])
326             return 0
327         return 1
328
329     def get(self, key):
330         '''Retrieves a key from the memcache.
331         
332         @return: The value or None.
333         '''
334         server, key = self._get_server(key)
335         if not server:
336             return None
337
338         self._statlog('get')
339
340         try:
341             server.send_cmd("get %s" % key)
342             rkey, flags, rlen, = self._expectvalue(server)
343             if not rkey:
344                 return None
345             value = self._recv_value(server, flags, rlen)
346             server.expect("END")
347         except (_Error, socket.error), msg:
348             if type(msg) is types.TupleType:
349                 msg = msg[1]
350             server.mark_dead(msg)
351             return None
352         return value
353
354     def get_multi(self, keys):
355         '''
356         Retrieves multiple keys from the memcache doing just one query.
357         
358         >>> success = mc.set("foo", "bar")
359         >>> success = mc.set("baz", 42)
360         >>> mc.get_multi(["foo", "baz", "foobar"]) == {"foo": "bar", "baz": 42}
361         1
362
363         This method is recommended over regular L{get} as it lowers the number of
364         total packets flying around your network, reducing total latency, since
365         your app doesn\'t have to wait for each round-trip of L{get} before sending
366         the next one.
367
368         @param keys: An array of keys.
369         @return:  A dictionary of key/value pairs that were available.
370
371         '''
372
373         self._statlog('get_multi')
374
375         server_keys = {}
376
377         # build up a list for each server of all the keys we want.
378         for key in keys:
379             server, key = self._get_server(key)
380             if not server:
381                 continue
382             if not server_keys.has_key(server):
383                 server_keys[server] = []
384             server_keys[server].append(key)
385
386         # send out all requests on each server before reading anything
387         dead_servers = []
388         for server in server_keys.keys():
389             try:
390                 server.send_cmd("get %s" % " ".join(server_keys[server]))
391             except socket.error, msg:
392                 server.mark_dead(msg[1])
393                 dead_servers.append(server)
394
395         # if any servers died on the way, don't expect them to respond.
396         for server in dead_servers:
397             del server_keys[server]
398
399         retvals = {}
400         for server in server_keys.keys():
401             try:
402                 line = server.readline()
403                 while line and line != 'END':
404                     rkey, flags, rlen = self._expectvalue(server, line)
405                     #  Bo Yang reports that this can sometimes be None
406                     if rkey is not None:
407                         val = self._recv_value(server, flags, rlen)
408                         retvals[rkey] = val
409                     line = server.readline()
410             except (_Error, socket.error), msg:
411                 server.mark_dead(msg)
412         return retvals
413
414     def _expectvalue(self, server, line=None):
415         if not line:
416             line = server.readline()
417
418         if line[:5] == 'VALUE':
419             resp, rkey, flags, len = line.split()
420             flags = int(flags)
421             rlen = int(len)
422             return (rkey, flags, rlen)
423         else:
424             return (None, None, None)
425
426     def _recv_value(self, server, flags, rlen):
427         rlen += 2 # include \r\n
428         buf = server.recv(rlen)
429         if len(buf) != rlen:
430             raise _Error("received %d bytes when expecting %d" % (len(buf), rlen))
431
432         if len(buf) == rlen:
433             buf = buf[:-2]  # strip \r\n
434
435         if flags == 0:
436             val = buf
437         elif flags & Client._FLAG_INTEGER:
438             val = int(buf)
439         elif flags & Client._FLAG_LONG:
440             val = long(buf)
441         elif self._usePickle and flags & Client._FLAG_PICKLE:
442             try:
443                 val = pickle.loads(buf)
444             except:
445                 self.debuglog('Pickle error...\n')
446                 val = None
447         else:
448             self.debuglog("unknown flags on get: %x\n" % flags)
449
450         return val
451
452 class _Host:
453     _DEAD_RETRY = 30  # number of seconds before retrying a dead server.
454
455     def __init__(self, host, debugfunc=None):
456         if isinstance(host, types.TupleType):
457             host = host[0]
458             self.weight = host[1]
459         else:
460             self.weight = 1
461
462         if host.find(":") > 0:
463             self.ip, self.port = host.split(":")
464             self.port = int(self.port)
465         else:
466             self.ip, self.port = host, 11211
467
468         if not debugfunc:
469             debugfunc = lambda x: x
470         self.debuglog = debugfunc
471
472         self.deaduntil = 0
473         self.socket = None
474    
475     def _check_dead(self):
476         if self.deaduntil and self.deaduntil > time.time():
477             return 1
478         self.deaduntil = 0
479         return 0
480
481     def connect(self):
482         if self._get_socket():
483             return 1
484         return 0
485
486     def mark_dead(self, reason):
487         self.debuglog("MemCache: %s: %s.  Marking dead." % (self, reason))
488         self.deaduntil = time.time() + _Host._DEAD_RETRY
489         self.close_socket()
490        
491     def _get_socket(self):
492         if self._check_dead():
493             return None
494         if self.socket:
495             return self.socket
496         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
497         # Python 2.3-ism:  s.settimeout(1)
498         try:
499             s.connect((self.ip, self.port))
500         except socket.error, msg:
501             self.mark_dead("connect: %s" % msg[1])
502             return None
503         self.socket = s
504         return s
505    
506     def close_socket(self):
507         if self.socket:
508             self.socket.close()
509             self.socket = None
510
511     def send_cmd(self, cmd):
512         if len(cmd) > 100:
513             self.socket.sendall(cmd)
514             self.socket.sendall('\r\n')
515         else:
516             self.socket.sendall(cmd + '\r\n')
517
518     def readline(self):
519         buffers = ''
520         recv = self.socket.recv
521         while 1:
522             data = recv(1)
523             if not data:
524                 self.mark_dead('Connection closed while reading from %s'
525                         % repr(self))
526                 break
527             if data == '\n' and buffers and buffers[-1] == '\r':
528                 return(buffers[:-1])
529             buffers = buffers + data
530         return(buffers)
531
532     def expect(self, text):
533         line = self.readline()
534         if line != text:
535             self.debuglog("while expecting '%s', got unexpected response '%s'" % (text, line))
536         return line
537    
538     def recv(self, rlen):
539         buf = ''
540         recv = self.socket.recv
541         while len(buf) < rlen:
542             buf = buf + recv(rlen - len(buf))
543         return buf
544
545     def __str__(self):
546         d = ''
547         if self.deaduntil:
548             d = " (dead until %d)" % self.deaduntil
549         return "%s:%d%s" % (self.ip, self.port, d)
550
551 def _doctest():
552     import doctest, memcache
553     servers = ["127.0.0.1:11211"]
554     mc = Client(servers, debug=1)
555     globs = {"mc": mc}
556     return doctest.testmod(memcache, globs=globs)
557
558 if __name__ == "__main__":
559     print "Testing docstrings..."
560     _doctest()
561     print "Running tests:"
562     print
563     #servers = ["127.0.0.1:11211", "127.0.0.1:11212"]
564     servers = ["127.0.0.1:11211"]
565     mc = Client(servers, debug=1)
566
567     def to_s(val):
568         if not isinstance(val, types.StringTypes):
569             return "%s (%s)" % (val, type(val))
570         return "%s" % val
571     def test_setget(key, val):
572         print "Testing set/get {'%s': %s} ..." % (to_s(key), to_s(val)),
573         mc.set(key, val)
574         newval = mc.get(key)
575         if newval == val:
576             print "OK"
577             return 1
578         else:
579             print "FAIL"
580             return 0
581
582     class FooStruct:
583         def __init__(self):
584             self.bar = "baz"
585         def __str__(self):
586             return "A FooStruct"
587         def __eq__(self, other):
588             if isinstance(other, FooStruct):
589                 return self.bar == other.bar
590             return 0
591        
592     test_setget("a_string", "some random string")
593     test_setget("an_integer", 42)
594     if test_setget("long", long(1<<30)):
595         print "Testing delete ...",
596         if mc.delete("long"):
597             print "OK"
598         else:
599             print "FAIL"
600     print "Testing get_multi ...",
601     print mc.get_multi(["a_string", "an_integer"])
602
603     print "Testing get(unknown value) ...",
604     print to_s(mc.get("unknown_value"))
605
606     f = FooStruct()
607     test_setget("foostruct", f)
608
609     print "Testing incr ...",
610     x = mc.incr("an_integer", 1)
611     if x == 43:
612         print "OK"
613     else:
614         print "FAIL"
615
616     print "Testing decr ...",
617     x = mc.decr("an_integer", 1)
618     if x == 42:
619         print "OK"
620     else:
621         print "FAIL"
622
623
624
625 # vim: ts=4 sw=4 et :
626
Note: See TracBrowser for help on using the browser.