1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 '''
31 Library routines / utilities (some unused).
32 '''
33
34 from logging import getLogger
35
36
37 try:
38 chr = unichr
39 str = unicode
40 basestring = basestring
41 file = file
42 from StringIO import StringIO
43 reduce = reduce
44 except NameError:
45 from io import IOBase, StringIO
46 chr = chr
47 str = str
48 basestring = str
49 file = IOBase
50 from functools import reduce
51
52
54 '''
55 If the value is not of the given type, raise a syntax error.
56 '''
57 if none_ok and value is None:
58 return
59 if isinstance(value, type_):
60 return
61 raise TypeError(fmt('{0} (value {1}) must be of type {2}.',
62 name, repr(value), type_.__name__))
63
64
66 '''
67 A FIFO queue with a fixed maximum size that silently discards data on
68 overflow. It supports iteration for reading current contents and so
69 can be used for a "latest window".
70
71 Might be able to use deque instead? This may be more efficient
72 if the entire contents are read often (as is the case when depth gets
73 deeper)?
74 '''
75
77 '''
78 Stores up to size entries. Once full, appending a further value
79 will discard (and return) the oldest still present.
80 '''
81 self.__size = 0
82 self.__next = 0
83 self.__buffer = [None] * size
84
86 '''
87 This returns a value on overflow, otherwise None.
88 '''
89 capacity = len(self.__buffer)
90 if self.__size == capacity:
91 dropped = self.__buffer[self.__next]
92 else:
93 dropped = None
94 self.__size += 1
95 self.__buffer[self.__next] = value
96 self.__next = (self.__next + 1) % capacity
97 return dropped
98
def pop(self, index=0):
    '''
    Take the oldest stored entry off the queue and hand it back.

    Only the head can be removed: any `index` other than 0 raises
    IndexError, and so does popping while nothing is stored.  The slot in
    the underlying buffer is left as it is; only the count of stored
    entries is reduced.
    '''
    if index != 0:
        raise IndexError('FIFO is only a FIFO')
    stored = self.__size
    if stored < 1:
        raise IndexError('FIFO empty')
    # The oldest entry sits `stored` slots behind the next write position,
    # wrapping round the end of the buffer.
    oldest = (self.__next - stored) % len(self.__buffer)
    head = self.__buffer[oldest]
    self.__size = stored - 1
    return head
110
112 return len(self.__buffer)
113
115 capacity = len(self.__buffer)
116 index = (self.__next - self.__size) % capacity
117 for _ in range(self.__size):
118 yield self.__buffer[index]
119 index = (index + 1) % capacity
120
122 '''
123 Clear the data (we just set the size to zero - this doesn't release
124 any references).
125 '''
126 self.__size = 0
127
128
130 '''
131 In Python 2.6 open [] appears to use maxint or similar, which is not
132 available in Python 3. This uses a minimum value for maxint I found
133 somewhere; hopefully no-one ever wants finite repeats larger than this.
134 '''
135 return spec.stop == None or spec.stop > 2147483647
136
137
def lmap(function, values):
    '''
    Apply `function` to each item of `values` and return the results as a
    list, instead of the iterator that `map` gives on Python 3.
    '''
    mapped = map(function, values)
    return list(mapped)
144
145
147 '''
148 Functional composition (assumes fun_a takes a single argument).
149 '''
150 def fun(*args, **kargs):
151 '''
152 This assumes fun_a takes a single argument.
153 '''
154 return fun_a(fun_b(*args, **kargs))
155 return fun
156
157
159 '''
160 Functional composition (assumes fun_b returns a sequence which is supplied
161 to fun_a via *args).
162 '''
163 def fun(*args, **kargs):
164 '''
165 Supply result from fun_b as *arg.
166 '''
167
168 return fun_a(*fun_b(*args, **kargs))
169 return fun
170
171
173 '''
174 An empty generator.
175 '''
176 if False:
177 yield None
178
179
def count(value=0, step=1):
    '''
    Endless generator: yields `value`, then `value + step`, and so on.

    A stand-in for itertools.count on Pythons (2.6) where that function
    does not take a step.
    '''
    current = value
    while True:
        yield current
        current += step
187
188
# NOTE(review): the class and __init__ header lines are reconstructed from
# the super() call below; the `object` base is assumed - confirm against the
# original file.
class LogMixin(object):
    '''
    Add standard Python logging to a class.

    After construction the instance has a logger in `_log`, named
    "<module>.<class name>" of the concrete class, and the shortcuts
    `_debug`, `_info`, `_warn` and `_error` bound to that logger's methods.
    '''

    def __init__(self, *args, **kargs):
        '''
        Pass all arguments on to the next class in the MRO, then attach the
        logger and its shortcut methods.
        '''
        super(LogMixin, self).__init__(*args, **kargs)
        self._log = getLogger(self.__module__ + '.' + self.__class__.__name__)
        self._debug = self._log.debug
        self._info = self._log.info
        # Logger.warn is a deprecated alias; bind the supported method while
        # keeping the `_warn` attribute name that users of the mixin rely on.
        self._warn = self._log.warning
        self._error = self._log.error
201
202
def safe_in(value, container, default=False):
    '''
    Membership test that tolerates items the container cannot check.

    Returns `value in container`.  If that test raises TypeError (for
    example an unhashable value against a set) the failure is logged at
    debug level and `default` is returned instead.
    '''
    try:
        found = value in container
    except TypeError:
        logger = getLogger('lepl.support.safe_in')
        logger.debug(fmt('Cannot test for {0!r} in collection', value))
        found = default
    return found
213
214
216 '''
217 Add items to a container, if they are hashable.
218 '''
219 try:
220 container.add(value)
221 except TypeError:
222 log = getLogger('lepl.support.safe_add')
223 log.warn(fmt('Cannot add {0!r} to collection', value))
224
225
def fallback_add(container, value):
    '''
    Add `value` to `container` and return the collection to use from then
    on.  Start with a set and always keep the returned collection: if the
    value cannot be added to it (it is unhashable) the contents are copied
    into a list, which is returned in its place.

    A container without an `add` method has the value appended instead.  A
    TypeError raised for a container that is already a list is re-raised.
    '''
    try:
        container.add(value)
    except AttributeError:
        container.append(value)
    except TypeError:
        if isinstance(container, list):
            raise
        # Switch to a list, which accepts unhashable values.
        container = list(container)
        container.append(value)
    return container
244
245
def fold(fun, start, sequence):
    '''
    Left fold: combine `start` with each item of `sequence` in turn using
    `fun(accumulated, item)` and return the final accumulated value
    (`start` itself when the sequence is empty).
    '''
    return reduce(fun, sequence, start)
253
254
def sample(prefix, rest, size=40):
    '''
    Provide a small sample of a string.

    Returns `prefix + rest` unchanged when that is at most `size`
    characters long.  Otherwise `rest` is cut short and '...' is appended
    so that the result is `size` characters long.  `prefix` is always kept
    whole, so when it leaves no room for any of `rest` the result is
    `prefix + '...'`, which may be longer than `size`.
    '''
    text = prefix + rest
    if len(text) > size:
        # Clamp at zero: a negative slice end would cut `rest` relative to
        # its tail and could make the "sample" longer than the full text.
        keep = max(0, size - len(prefix) - 3)
        text = prefix + rest[:keep] + '...'
    return text
263
264
265 __SINGLETONS = {}
266 '''
267 Map from factory (constructor/class) to singleton.
268 '''
269
279
280
def fmt(template, *args, **kargs):
    '''
    Format `template` with the given positional and keyword arguments,
    first converting it with this module's `str` so the template is always
    unicode (mixing unicode values into an ascii template can fail).
    '''
    text = str(template)
    return text.format(*args, **kargs)
287
288
291
292
294 '''
295 Copy function name and docs.
296 '''
297 if text:
298 destn.__name__ = text
299 else:
300 destn.__name__ = source.__name__
301 destn.__doc__ = source.__doc__
302
303 destn.__module__ = source.__module__
304 return destn
305
306
308 '''
309 Add defaults to original dict if not already present.
310 '''
311 for (name, value) in defaults.items():
312 if prefix + name not in original:
313 original[prefix + name] = value
314 return original
315