1 """
2 Generic I/O utility objects.
3
4 Here is a list of the most essential classes in this module:
5
6 1. temporary file system objects: L{TempFile}, L{TempFolder}
7 2. special/decorated streams: L{MemoryStream}, L{AutoFlushStream}
8 3. reusable stream readers and writers: L{EntryReader}, L{EntryWriter}
9 4. convenient communication with the shell: L{Shell}
10
11 In addition, csb.io is also part of the CSB compatibility layer. In order to
12 ensure cross-interpreter compatibility, always use the following csb.io objects:
13
14 - L{MemoryStream} instead of (c)StringIO
15 - csb.io.Pickle instead of pickle or cPickle
16 - csb.io.urllib instead of urllib or urllib.request
17
18 See also L{csb.core} for additional notes on compatibility.
19 """
20
21 import os
22 import time
23 import errno
24 import shlex
25 import shutil
26 import tempfile
27 import subprocess
28
29 import csb.core
30
31
32 try:
33 from StringIO import StringIO
34 except ImportError:
35 from io import StringIO
36
37 try:
38 import cPickle as Pickle
39 except ImportError:
40 import pickle as Pickle
41
42 try:
43 import urllib.request as urllib
44 except ImportError:
45 import urllib2 as urllib
46
47 try:
48 from __builtin__ import unichr
49 except ImportError:
50 from builtins import chr as unichr
51
52
53 NEWLINE = "\n"
54
55
56 -class Shell(object):
57
58 POLL = 1.0
59
@staticmethod
def run(cmd, timeout=None):
    """
    Run a shell command and return the output.

    @param cmd: shell command with its arguments
    @type cmd: tuple or str
    @param timeout: maximum duration in seconds
    @type timeout: float or None

    @rtype: L{ShellInfo}
    @raise InvalidCommandError: on invalid executable
    @raise TimeoutError: when the timeout is expired
    """

    if isinstance(cmd, csb.core.string):
        cmd = shlex.split(cmd)

    try:
        cmd = tuple(cmd)
        start = time.time()
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        if timeout is not None:
            # poll() returns None while the child is still running and its
            # exit status (which may be non-zero) once it has finished, so a
            # failing command must also end the wait.
            # NOTE(review): stdout/stderr are not drained while polling; a
            # child that fills a pipe buffer blocks and is reported as a
            # timeout. communicate(timeout=...) avoids this on Python 3.3+.
            while process.poll() is None:
                if time.time() >= (start + timeout):
                    try:
                        process.kill()
                    except OSError:
                        pass    # the child exited between poll() and kill()
                    process.wait()  # reap the killed child
                    raise TimeoutError(cmd, timeout)
                time.sleep(Shell.POLL)

        stdout, stderr = process.communicate()
        code = process.returncode

    except OSError as oe:
        # ENOENT from Popen means the executable could not be found.
        if oe.errno == errno.ENOENT:
            raise InvalidCommandError(oe.strerror, cmd)
        else:
            raise

    return ShellInfo(code, stdout.decode() or '', stderr.decode() or '', cmd)
106
107 @staticmethod
109 """
110 Same as L{Shell.run()}, but raises L{ProcessError} on bad exit code.
111
112 @param cmd: shell command with its arguments
113 @param cmd: tuple or str
114 @param timeout: maximum duration in seconds
115 @type timeout: float or None
116
117 @rtype: L{ShellInfo}
118 @raise ProcessError: on bad exit code
119 @raise TimeoutError: when the timeout is expired
120 """
121 si = Shell.run(cmd, timeout=timeout)
122
123 if si.code == 0:
124 return si
125 else:
126 raise ProcessError(si)
127
129 """
130 Raised on L{Shell.run()} failures.
131 @type context: L{ShellInfo}
132 """
136
138 return 'Bad exit code: #{0.code}'.format(self.context)
139
141 """
142 Raised on L{Shell.run()} timeouts.
143 """
150
152 return 'The process "{0.context.cmd}" did not finish in {0.timeout}s'.format(self)
153
155 """
156 Raised when L{Shell.run()} encounters an OSError.
157 """
167
170
172 """
173 Shell command execution info
174 """
175
def __init__(self, code, stdout, stderr, cmd):
    """
    Record the outcome of a finished shell command.

    @param code: exit status reported for the process
    @param stdout: captured standard output; falsy values are stored as ''
    @param stderr: captured standard error; falsy values are stored as ''
    @param cmd: the command and its arguments, joined with single spaces
    @type cmd: iterable of str
    """
    self.code = code
    # Normalize missing/empty output to an empty string.
    self.stdout = stdout if stdout else ''
    self.stderr = stderr if stderr else ''
    self.cmd = ' '.join(cmd)
182
185 """
186 In-memory stream object. Can be used in a context manager.
187 """
190
192 try:
193 self.close()
194 except:
195 pass
196
198 """
199 Wrapper around a buffered stream which automatically calls flush()
200 after each write(). This is essentially a proxy/decorator.
201
202 @param stream: the stream object to wrap
203 @type stream: file
204 """
205
208
210 self._subject.write(data)
211 self._subject.flush()
212
214 """
215 Create a temporary file and take care of deleting it upon object
216 destruction. The file can be opened multiple times on any platform, unlike
217 the case with tempfile.NamedTemporaryFile (does not work on Windows).
218
219 >>> with TempFile() as tmp:
220 tmp.write(...)
221 open(tmp.name)...
222
223 @param dispose: automatically delete the file
224 @type dispose: bool
225 @param mode: file open mode (text, binary), default=t
226 @type text: str
227 """
228
def __init__(self, dispose=True, mode='t'):
    """
    Create a new temporary file and open a write handle to it.

    @param dispose: delete the file when the object is closed
    @type dispose: bool
    @param mode: open mode suffix appended to 'w' (e.g. 't' or 'b')
    @type mode: str
    """
    fd, file = tempfile.mkstemp()

    try:
        handle = open(file, 'w' + mode)
    except Exception:
        # Without this, a failing open() (e.g. an invalid mode) would leak
        # the descriptor returned by mkstemp and leave the file on disk.
        os.close(fd)
        os.remove(file)
        raise

    self.__file = file
    self.__fd = fd
    self.__fh = handle
    self.__mode = mode
    self.__dispose = bool(dispose)

    super(TempFile, self).__init__(self.__fh)
240
242
243 if self.__dispose:
244 try:
245 self.close()
246 except:
247 pass
248
251
254
256 """
257 Flush, close and delete the file.
258 """
259
260 if not self.__fh.closed:
261 self.__fh.flush()
262 self.__fh.close()
263 os.close(self.__fd)
264
265 if os.path.exists(self.__file):
266 os.remove(self.__file)
267
269 """
270 @return: the current content of the file.
271 @rtype: str or bytes
272 """
273 self.flush()
274 with open(self.name, 'r' + self.__mode) as f:
275 return f.read()
276
277 @property
279 """
280 Full path and file name
281 @rtype: str
282 """
283 return self.__file
284
286 """
287 Create a temporary directory which is automatically wiped when the object
288 is closed.
289
290 >>> with TempFolder() as tmp:
291 # put some files in tmp.name...
292
293 @param dispose: automaticlaly delete the folder and its contents
294 @type dispose: bool
295 """
296
298
299 name = tempfile.mkdtemp()
300
301 self.__name = os.path.abspath(name)
302 self.__dispose = bool(dispose)
303
305
306 if self.__dispose:
307 try:
308 self.close()
309 except:
310 pass
311
314
317
319 """
320 Delete the entire directory and its contents.
321 """
322 if os.path.exists(self.name):
323 shutil.rmtree(self.name)
324
325 @property
327 """
328 Full directory name
329 @rtype: str
330 """
331 return self.__name
332
333 -class EntryReader(object):
334 """
335 Generic flatfile reader. Provides efficient iterable interface over the entries
336 in the specified stream.
337
338 @param stream: the source data stream to read
339 @type stream: file
340 @param start_marker: a string marker which marks the beginning of a new entry
341 @type start_marker: str
342 @param end_marker: a string marker which signals the end of the file
343 @type end_marker: str, None
344 """
def __init__(self, stream, start_marker, end_marker=None):
    """
    Attach the reader to C{stream} and rewind it.

    @param stream: opened stream providing seek() and readline()
    @param start_marker: marker that begins a new entry
    @param end_marker: marker that ends reading, or None

    @raise TypeError: if C{stream} lacks seek() or readline()
    """
    required = ('seek', 'readline')
    if not all(hasattr(stream, attr) for attr in required):
        raise TypeError('The entry reader requires an opened stream.')

    # Reading always starts from the beginning of the stream.
    stream.seek(0)
    self._stream = stream

    # Create the backing fields first, then assign the public markers
    # (within the class these go through the str-coercing setters).
    self._start_marker = None
    self._end_marker = None
    self.start_marker = start_marker
    self.end_marker = end_marker
357
@property
def start_marker(self):
    """
    Marker that identifies the first line of a new entry.
    @rtype: str or None
    """
    return self._start_marker
@start_marker.setter
def start_marker(self, value):
    # Non-None markers are always stored as strings.
    self._start_marker = None if value is None else str(value)

@property
def end_marker(self):
    """
    Stripped line content that terminates reading, or None for no marker.
    @rtype: str or None
    """
    return self._end_marker
@end_marker.setter
def end_marker(self, value):
    # Non-None markers are always stored as strings.
    self._end_marker = None if value is None else str(value)
375
377
378 try:
379 self._stream.close()
380 except:
381 pass
382
384 """
385 Return an iterator over all entries from the stream/flat file.
386
387 @return: iterable over all entries read from the stream
388 @rtype: generator
389 """
390
391 self._stream.seek(0)
392
393 entry = ''
394 in_entry = False
395
396 while True:
397 line = self._stream.readline()
398
399 if not in_entry:
400 if line.startswith(self.start_marker):
401 in_entry = True
402 entry = line
403 else:
404 if line.startswith(self.start_marker):
405 yield entry
406 entry = line
407 elif not line or line.strip() == self.end_marker:
408 yield entry
409 break
410 else:
411 entry += line
412
413 if not line:
414 break
415
417 """
418 Return a list of all entries in the stream.
419
420 @rtype: list
421 """
422 return list(self.entries())
423
424 -class EntryWriter(object):
425 """
426 A simple stream writer. The best way to use it is::
427
428 with EntryWriter(output_file, close=True) as out:
429 out.write(object)
430
431 In this way the stream is automatically closed at the end of the block.
432
433 @param destination: output file name or opened stream
434 @type destination: str or stream
435 @param newline: new line string (the default is L{csb.io.NEWLINE})
436 @type newline: str
437 @param close: if True (the default), the stream is automatically
438 closed when the object is destroyed
439 @type close: bool
440 """
441
def __init__(self, destination, close=True, newline=NEWLINE):
    """
    @param destination: output file name or an object with a write() method
    @param close: whether the stream is closed automatically
    @param newline: line terminator used by writeline()

    @raise TypeError: if C{destination} is neither a path nor writable
    """
    self._stream = None
    self._newline = NEWLINE
    self._autoclose = True

    # Assign through the properties so the values get coerced.
    self.newline = newline
    self.autoclose = close

    is_path = isinstance(destination, csb.core.string)
    if not is_path and not hasattr(destination, 'write'):
        raise TypeError(destination)

    if is_path:
        # A file opened here is always closed by this writer.
        self._stream = open(destination, 'w')
        self.autoclose = True
    else:
        self._stream = destination
460
461 - def __enter__(self):
463
def __exit__(self, exc_type, exc_value, traceback):
    """Close the destination on leaving a C{with} block when autoclose is set."""
    if not self.autoclose:
        return
    self.close()
467
469 if self.autoclose:
470 self.close()
471
472 @property
474 """
475 Destination stream
476 @rtype: stream
477 """
478 return self._stream
479
480 @property
@newline.setter
def newline(self, value):
    # The line terminator is always stored as a string.
    text = str(value)
    self._newline = text
486
@property
def autoclose(self):
    """
    Whether the destination stream is closed automatically.
    @rtype: bool
    """
    return self._autoclose
@autoclose.setter
def autoclose(self, value):
    flag = bool(value)
    self._autoclose = flag
493
495 """
496 Close the destination stream.
497 """
498 try:
499 self._stream.close()
500 except:
501 pass
502
def write(self, data):
    """
    Write a chunk of string data to the destination stream.
    """
    out = self._stream
    out.write(data)

def writeline(self, data):
    """
    Same as C{write}, but writes C{self.newline} after C{data}.
    """
    out = self._stream
    out.write(data)
    out.write(self.newline)
515
def writeall(self, entries, delimiter=NEWLINE):
    """
    Write every item of C{entries}, each followed by C{delimiter}.

    @param entries: a collection of objects to write
    @type entries: iterable
    @param delimiter: string written after each entry; when equal to the
                      module default, the writer's C{newline} is used instead
    @type delimiter: str
    """
    separator = self.newline if delimiter == NEWLINE else delimiter
    for item in entries:
        self.write(item)
        self.write(separator)
532
def _acquire_lockdir(lockdir, timeout, poll=0.05):
    """
    Acquire an exclusive lock by atomically creating the directory C{lockdir}.

    @param lockdir: path of the lock directory
    @param timeout: seconds to wait while the lock is held elsewhere; None
                    waits forever, a value <= 0 fails at the first conflict
    @param poll: seconds to sleep between attempts

    @raise IOError: if the lock is not acquired in time, or mkdir fails for
                    a reason other than the directory already existing
    """
    end_time = None
    if timeout is not None:
        end_time = time.time() + max(timeout, 0)

    while True:
        try:
            os.mkdir(lockdir)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise IOError("Failed to acquire Lock")
            if end_time is not None and time.time() >= end_time:
                raise IOError("Failed to acquire Lock")
            # Sleep instead of busy-spinning while someone else holds it.
            time.sleep(poll)
        else:
            return

def _release_lockdir(lockdir):
    """
    Remove the lock directory created by L{_acquire_lockdir}.

    @raise IOError: if the directory cannot be removed
    """
    try:
        os.rmdir(lockdir)
    except OSError:
        raise IOError('missing lockfile {0}'.format(lockdir))

def _import_numeric():
    """
    Return the legacy Numeric module, or None if it is not installed.
    """
    try:
        import Numeric
    except ImportError:
        return None
    return Numeric

def dump(this, filename, gzip=False, lock=None, timeout=None):
    """
    Writes a python object to a file, using pickle (protocol 2).
    Supports also '~' or '~user' in C{filename}.

    @param this: The object, which will be written to disk
    @type this: Any picklable python object
    @param filename: Filename of the new file
    @type filename: str
    @param gzip: Use gzip to compress the file
    @type gzip: bool
    @param lock: if not None, hold a C{filename + '.lock'} directory lock
                 while writing
    @param timeout: seconds to wait for the lock (None: wait forever)
    @type timeout: float or None

    @raise IOError: if the lock cannot be acquired or released
    """
    filename = os.path.expanduser(filename)
    lockdir = filename + '.lock' if lock is not None else None

    if lockdir is not None:
        _acquire_lockdir(lockdir, timeout)

    try:
        if gzip:
            import gzip as gzip_module
            stream = gzip_module.GzipFile(filename, 'wb')
        else:
            stream = open(filename, 'wb')

        try:
            # Objects whose type is named 'array' were historically written
            # with the Numeric pickler; fall back to plain pickle when Numeric
            # is unavailable (e.g. for the stdlib array.array).
            numeric = None
            if type(this).__name__ == 'array':
                numeric = _import_numeric()

            if numeric is not None:
                numeric.Pickler(stream).dump(this)
            else:
                Pickle.dump(this, stream, 2)
        finally:
            stream.close()
    finally:
        # Always release the lock, so a failed write leaves no stale lock.
        if lockdir is not None:
            _release_lockdir(lockdir)

def load(filename, gzip=False, lock=None, timeout=None):
    """
    Unpickle an object from filename.
    Supports also '~' or '~user' in C{filename}.

    @param filename: Filename of the pickled object
    @type filename: str
    @param gzip: Use gzip to uncompress the file
    @type gzip: bool
    @param lock: if not None, hold a C{filename + '.lock'} directory lock
                 while reading
    @param timeout: seconds to wait for the lock (None: wait forever)
    @type timeout: float or None

    @return: Python object unpickled from file
    @raise IOError: if the lock cannot be acquired or released
    """
    filename = os.path.expanduser(filename)
    lockdir = filename + '.lock' if lock is not None else None

    if lockdir is not None:
        _acquire_lockdir(lockdir, timeout)

    try:
        return _load_file(filename, gzip)
    finally:
        # Always release the lock, so a failed read leaves no stale lock.
        if lockdir is not None:
            _release_lockdir(lockdir)

def _load_file(filename, use_gzip):
    """
    Unpickle the object stored in C{filename}; helper for L{load}.
    """
    gzip_module = None

    if use_gzip:
        import gzip as gzip_module
        stream = gzip_module.GzipFile(filename)
        try:
            # Fail early (and close the file) if it is not valid gzip data.
            stream.readline()
            stream.seek(0)
        except Exception:
            stream.close()
            raise
    else:
        stream = open(filename, 'rb')

    pickle_error = None
    try:
        # NOTE(review): unpickling can execute arbitrary code; only load
        # files from trusted sources.
        return Pickle.load(stream)
    except Exception as error:
        pickle_error = error
    finally:
        stream.close()

    # Legacy fallback: the file may have been written by the Numeric pickler.
    numeric = _import_numeric()
    if numeric is None:
        # Report the real unpickling error rather than a missing Numeric.
        raise pickle_error

    if gzip_module is not None:
        stream = gzip_module.GzipFile(filename)
    else:
        stream = open(filename, 'rb')

    try:
        return numeric.Unpickler(stream).load()
    finally:
        stream.close()
666