Package csb :: Package io
[frames] | [no frames]

Source Code for Package csb.io

  1  """ 
  2  Generic I/O utility objects. 
  3   
  4  Here is a list of the most essential classes in this module: 
  5   
  6      1. temporary file system objects: L{TempFile}, L{TempFolder}  
  7      2. special/decorated streams: L{MemoryStream}, L{AutoFlushStream} 
  8      3. reusable stream readers and writers: L{EntryReader}, L{EntryWriter} 
  9      4. convenient communication with the shell: L{Shell} 
 10   
 11  In addition, csb.io is also part of the CSB compatibility layer. In order to 
 12  ensure cross-interpreter compatibility, always use the following csb.io objects: 
 13   
 14      - L{MemoryStream} instead of (c)StringIO 
 15      - csb.io.Pickle instead of pickle or cPickle 
 16      - csb.io.urllib instead of urllib or urllib.request 
 17       
 18  See also L{csb.core} for additional notes on compatibility.     
 19  """ 
 20   
 21  import os 
 22  import time 
 23  import errno 
 24  import shlex 
 25  import shutil 
 26  import tempfile 
 27  import subprocess 
 28   
 29  import csb.core 
 30   
 31   
 32  try: 
 33      from StringIO import StringIO 
 34  except ImportError: 
 35      from io import StringIO 
 36       
 37  try: 
 38      import cPickle as Pickle 
 39  except ImportError: 
 40      import pickle as Pickle 
 41   
 42  try: 
 43      import urllib.request as urllib 
 44  except ImportError: 
 45      import urllib2 as urllib 
 46       
 47  try: 
 48      from __builtin__ import unichr 
 49  except ImportError: 
 50      from builtins import chr as unichr 
 51           
 52   
 53  NEWLINE = "\n" 
class Shell(object):
    """
    Utility for running external commands and collecting their output.
    """

    # Polling interval in seconds, used by the timeout loop on interpreters
    # whose subprocess module has no communicate(timeout=...).
    POLL = 1.0

    @staticmethod
    def run(cmd, timeout=None):
        """
        Run a shell command and return the output. A non-zero exit code is
        not an error here: it is reported in the returned L{ShellInfo}.

        @param cmd: shell command with its arguments
        @type cmd: tuple or str
        @param timeout: maximum duration in seconds
        @type timeout: float or None

        @rtype: L{ShellInfo}
        @raise InvalidCommandError: on invalid executable
        @raise TimeoutError: when the timeout is expired
        """
        if isinstance(cmd, csb.core.string):
            cmd = shlex.split(cmd)

        try:
            cmd = tuple(cmd)
            process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE)

            if timeout is None:
                stdout, stderr = process.communicate()
            else:
                stdout, stderr = Shell._communicate(process, cmd, timeout)

            code = process.returncode

        except OSError as oe:
            if oe.errno == errno.ENOENT:
                raise InvalidCommandError(oe.strerror, cmd)
            else:
                raise

        return ShellInfo(code, stdout.decode() or '', stderr.decode() or '', cmd)

    @staticmethod
    def _communicate(process, cmd, timeout):
        """
        Wait at most C{timeout} seconds for C{process} and return its
        (stdout, stderr). Kill the process and raise L{TimeoutError} if it
        has not finished by then.
        """
        expired = getattr(subprocess, 'TimeoutExpired', None)

        if expired is not None:
            # communicate() keeps draining both pipes while waiting, so a
            # child producing more output than the pipe buffer holds cannot
            # block forever and be misreported as a timeout.
            try:
                return process.communicate(timeout=timeout)
            except expired:
                Shell._kill(process)
                raise TimeoutError(cmd, timeout)

        # Legacy interpreters: poll until the process exits. Any exit code
        # ends the wait -- a non-zero code is a failure, not a timeout.
        deadline = time.time() + timeout
        while process.poll() is None:
            if time.time() >= deadline:
                Shell._kill(process)
                raise TimeoutError(cmd, timeout)
            time.sleep(Shell.POLL)

        return process.communicate()

    @staticmethod
    def _kill(process):
        """
        Kill C{process}, then reap it and close its pipes.
        """
        try:
            process.kill()
        except OSError:
            # the process may have exited in the meantime
            pass
        process.communicate()

    @staticmethod
    def runstrict(cmd, timeout=None):
        """
        Same as L{Shell.run()}, but raises L{ProcessError} on bad exit code.

        @param cmd: shell command with its arguments
        @type cmd: tuple or str
        @param timeout: maximum duration in seconds
        @type timeout: float or None

        @rtype: L{ShellInfo}
        @raise ProcessError: on bad exit code
        @raise TimeoutError: when the timeout is expired
        """
        si = Shell.run(cmd, timeout=timeout)

        if si.code == 0:
            return si
        else:
            raise ProcessError(si)
class ProcessError(Exception):
    """
    Raised on L{Shell.run()} failures.

    @param context: execution info of the failed command
    @type context: L{ShellInfo}
    """

    def __init__(self, context, *args):
        # Extra positional arguments are accepted but ignored; the
        # exception arguments are always (context, []).
        self.context = context
        super(ProcessError, self).__init__(context, [])

    def __str__(self):
        exit_code = self.context.code
        return 'Bad exit code: #{0}'.format(exit_code)
class TimeoutError(ProcessError):
    """
    Raised on L{Shell.run()} timeouts.

    NOTE: inside this module the name shadows Python 3's built-in
    TimeoutError.

    @param cmd: the command with its arguments
    @param timeout: the time limit that expired, in seconds
    """

    def __init__(self, cmd, timeout):
        self.timeout = timeout
        # there is no exit code to report, hence code=None
        info = ShellInfo(None, '', '', cmd)
        super(TimeoutError, self).__init__(info)

    def __str__(self):
        template = 'The process "{0}" did not finish in {1}s'
        return template.format(self.context.cmd, self.timeout)
class InvalidCommandError(ValueError):
    """
    Raised when L{Shell.run()} cannot start the executable (OSError with
    errno ENOENT).

    @param message: error description
    @type message: str
    @param cmd: the command with its arguments
    """

    def __init__(self, message, cmd):
        self.program = cmd[0]
        joined = ' '.join(cmd) if csb.core.iterable(cmd) else cmd
        self.cmd = joined
        self.msg = message

        super(InvalidCommandError, self).__init__(message, joined)

    def __str__(self):
        return self.msg
class ShellInfo(object):
    """
    Shell command execution info

    @param code: exit code of the process (None when it timed out)
    @param stdout: captured standard output; a false value becomes ''
    @type stdout: str
    @param stderr: captured standard error; a false value becomes ''
    @type stderr: str
    @param cmd: the command, either as a sequence of arguments or as an
                already formatted command line
    @type cmd: tuple or str
    """

    def __init__(self, code, stdout, stderr, cmd):

        self.code = code
        self.stdout = stdout or ''
        self.stderr = stderr or ''

        if isinstance(cmd, (str, type(u''))):
            # already a command line: ' '.join() would space out its characters
            self.cmd = cmd
        else:
            self.cmd = ' '.join(cmd)
class MemoryStream(StringIO):
    """
    In-memory text stream which doubles as a context manager: leaving the
    C{with} block closes it.
    """

    def __enter__(self):
        return self

    def __exit__(self, *exc_info, **kwargs):
        # closing is best-effort; any error raised by close() is ignored
        try:
            self.close()
        except BaseException:
            pass
class AutoFlushStream(csb.core.Proxy):
    """
    Decorator for a buffered stream: every write() is immediately followed
    by a flush(). Everything except write() is left to the
    L{csb.core.Proxy} base class.

    @param stream: the stream object to wrap
    @type stream: file
    """

    def __init__(self, stream):
        super(AutoFlushStream, self).__init__(stream)

    def write(self, data):
        target = self._subject
        target.write(data)
        target.flush()
class TempFile(csb.core.Proxy):
    """
    Create a temporary file and take care of deleting it upon object
    destruction. The file can be opened multiple times on any platform, unlike
    the case with tempfile.NamedTemporaryFile (does not work on Windows).

    >>> with TempFile() as tmp:
            tmp.write(...)
            open(tmp.name)...

    @param dispose: automatically delete the file when the object is
                    destroyed (close() and the C{with} block always delete it)
    @type dispose: bool
    @param mode: file open mode suffix, 't' (text) or 'b' (binary); default 't'
    @type mode: str
    """

    def __init__(self, dispose=True, mode='t'):

        fd, file = tempfile.mkstemp()

        try:
            fh = open(file, 'w' + mode)
        except BaseException:
            # e.g. an invalid mode: do not leak the descriptor or the file
            os.close(fd)
            os.remove(file)
            raise

        self.__file = file
        self.__fd = fd
        self.__fh = fh
        self.__mode = mode
        self.__dispose = bool(dispose)

        super(TempFile, self).__init__(self.__fh)

    def __del__(self):
        # Best-effort cleanup: a finalizer must never raise. The try also
        # covers objects whose __init__ failed before all attributes were set.
        try:
            if self.__dispose:
                self.close()
        except Exception:
            pass

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def close(self):
        """
        Flush, close and delete the file. Safe to call more than once.
        """
        try:
            if not self.__fh.closed:
                try:
                    self.__fh.flush()
                finally:
                    # release both handles even if flushing failed
                    try:
                        self.__fh.close()
                    finally:
                        os.close(self.__fd)
        finally:
            if os.path.exists(self.__file):
                os.remove(self.__file)

    def content(self):
        """
        @return: the current content of the file.
        @rtype: str or bytes
        """
        self.flush()
        with open(self.name, 'r' + self.__mode) as f:
            return f.read()

    @property
    def name(self):
        """
        Full path and file name
        @rtype: str
        """
        return self.__file
class TempFolder(object):
    """
    Create a temporary directory which is automatically wiped when the object
    is closed.

    >>> with TempFolder() as tmp:
            # put some files in tmp.name...

    @param dispose: automatically delete the folder and its contents when
                    the object is destroyed
    @type dispose: bool
    """

    def __init__(self, dispose=True):
        self.__name = os.path.abspath(tempfile.mkdtemp())
        self.__dispose = bool(dispose)

    def __del__(self):
        if not self.__dispose:
            return
        # best-effort removal; errors are deliberately ignored
        try:
            self.close()
        except BaseException:
            pass

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def close(self):
        """
        Delete the entire directory and its contents, if it still exists.
        """
        path = self.name
        if os.path.exists(path):
            shutil.rmtree(path)

    @property
    def name(self):
        """
        Full (absolute) directory name
        @rtype: str
        """
        return self.__name
class EntryReader(object):
    """
    Generic flatfile reader. Provides an efficient iterable interface over
    the entries in the specified stream.

    An entry starts at each line beginning with C{start_marker}. It ends at
    the next such line, at the end of the stream, or at a line equal (after
    stripping surrounding whitespace) to C{end_marker}. The end marker also
    stops reading altogether. Lines before the first start marker are
    skipped.

    @param stream: the source data stream to read; it must provide seek()
                   and readline(), and it is closed when the reader is
                   destroyed
    @type stream: file
    @param start_marker: a string marker which marks the beginning of a new entry
    @type start_marker: str
    @param end_marker: a string marker which signals the end of the data
    @type end_marker: str, None
    """

    def __init__(self, stream, start_marker, end_marker=None):

        is_stream = hasattr(stream, 'seek') and hasattr(stream, 'readline')
        if not is_stream:
            raise TypeError('The entry reader requires an opened stream.')

        stream.seek(0)
        self._stream = stream
        self._start_marker = None
        self._end_marker = None

        self.start_marker = start_marker
        self.end_marker = end_marker

    @property
    def start_marker(self):
        return self._start_marker

    @start_marker.setter
    def start_marker(self, value):
        # non-None markers are coerced to str
        self._start_marker = value if value is None else str(value)

    @property
    def end_marker(self):
        return self._end_marker

    @end_marker.setter
    def end_marker(self, value):
        self._end_marker = value if value is None else str(value)

    def __del__(self):
        try:
            self._stream.close()
        except BaseException:
            pass

    def entries(self):
        """
        Return an iterator over all entries from the stream/flat file.
        Reading always restarts from the beginning of the stream.

        @return: iterable over all entries read from the stream
        @rtype: generator
        """
        self._stream.seek(0)

        current = None      # None: not inside an entry yet

        while True:
            line = self._stream.readline()

            if current is None:
                if line.startswith(self.start_marker):
                    current = line
            elif line.startswith(self.start_marker):
                yield current
                current = line
            elif not line or line.strip() == self.end_marker:
                yield current
                break
            else:
                current += line

            if not line:
                break

    def readall(self):
        """
        Return a list of all entries in the stream.

        @rtype: list
        """
        return [entry for entry in self.entries()]
class EntryWriter(object):
    """
    A simple stream writer. The best way to use it is::

        with EntryWriter(output_file, close=True) as out:
            out.write(object)

    In this way the stream is automatically closed at the end of the block.

    @param destination: output file name or opened stream; a file name is
                        opened in 'w' mode, and such a file is always closed
                        automatically, whatever C{close} says
    @type destination: str or stream
    @param close: if True (the default), the stream is automatically
                  closed at the end of a C{with} block and when the object
                  is destroyed
    @type close: bool
    @param newline: new line string (the default is L{csb.io.NEWLINE})
    @type newline: str
    @raise TypeError: if C{destination} is neither a string nor an object
                      with a write() method
    """

    def __init__(self, destination, close=True, newline=NEWLINE):

        # defaults first, so that close()/__del__ stay harmless even if the
        # destination turns out to be invalid below
        self._stream = None
        self._newline = NEWLINE
        self._autoclose = True

        self.newline = newline
        self.autoclose = close

        is_path = isinstance(destination, csb.core.string)
        if not is_path and not hasattr(destination, 'write'):
            raise TypeError(destination)

        if is_path:
            # the writer owns a file it opened itself
            self._stream = open(destination, 'w')
            self.autoclose = True
        else:
            self._stream = destination

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if self.autoclose:
            self.close()

    def __del__(self):
        if self.autoclose:
            self.close()

    @property
    def stream(self):
        """
        Destination stream
        @rtype: stream
        """
        return self._stream

    @property
    def newline(self):
        """
        Line terminator appended by writeline(); stored as str
        @rtype: str
        """
        return self._newline

    @newline.setter
    def newline(self, value):
        self._newline = str(value)

    @property
    def autoclose(self):
        """
        Whether the stream is closed on C{with}-block exit and on destruction
        @rtype: bool
        """
        return self._autoclose

    @autoclose.setter
    def autoclose(self, value):
        self._autoclose = bool(value)

    def close(self):
        """
        Close the destination stream, ignoring any error.
        """
        try:
            self._stream.close()
        except BaseException:
            pass

    def write(self, data):
        """
        Write a chunk of string data to the destination stream.
        """
        self._stream.write(data)

    def writeline(self, data):
        """
        Same as C{write}, but appends the C{newline} string at the end.
        """
        out = self._stream
        out.write(data)
        out.write(self.newline)

    def writeall(self, entries, delimiter=NEWLINE):
        """
        Write all C{entries} to the destination stream, separating them with
        C{delimiter}

        @param entries: a collection of objects
        @type entries: iterable
        @param delimiter: append this string after each entry; the default
                          (or any value equal to L{csb.io.NEWLINE}) means
                          C{self.newline}
        @type delimiter: str
        """
        separator = self.newline if delimiter == NEWLINE else delimiter
        for entry in entries:
            self.write(entry)
            self.write(separator)
def _import_numeric():
    """
    Return the legacy Numeric module, or None if it is not installed.
    """
    try:
        import Numeric  # @UnresolvedImport
    except ImportError:
        return None
    return Numeric


def _acquire_lock(lockdir, timeout=None, poll=0.05):
    """
    Take a lock by creating the directory C{lockdir}; an existing directory
    (EEXIST) means someone else holds the lock.

    @param timeout: give up after this many seconds; None waits forever,
                    zero or a negative value fails at once if the lock is held
    @param poll: pause between attempts, in seconds
    @raise IOError: if the lock cannot be taken in time, or mkdir fails for
                    any other reason
    """
    deadline = None if timeout is None else time.time() + timeout

    while True:
        try:
            os.mkdir(lockdir)
            return
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise IOError("Failed to acquire Lock")
            if deadline is not None and time.time() >= deadline:
                raise IOError("Failed to acquire Lock")
        # sleep instead of busy-spinning while the lock is held elsewhere
        time.sleep(poll)


def _release_lock(lockdir):
    """
    Release a lock taken by L{_acquire_lock}.

    @raise IOError: if the lock directory cannot be removed
    """
    try:
        os.rmdir(lockdir)
    except OSError:
        raise IOError('missing lockfile {0}'.format(lockdir))


def _open_for_reading(filename, gzip):
    """
    Open C{filename} for binary reading, through gzip if requested.
    """
    if not gzip:
        return open(filename, 'rb')

    import gzip as gzip_module
    stream = gzip_module.GzipFile(filename)
    try:
        # read ahead and rewind -- presumably so that a corrupt/non-gzip file
        # fails here, while the stream can still be closed cleanly
        stream.readline()
        stream.seek(0)
    except BaseException:
        stream.close()
        raise
    return stream


def _unpickle(filename, gzip):
    """
    Unpickle the object stored in C{filename}, falling back to the legacy
    Numeric unpickler when plain unpickling fails and Numeric is available.
    """
    # NOTE: unpickling can execute arbitrary code; only load trusted files.
    stream = _open_for_reading(filename, gzip)
    try:
        return Pickle.load(stream)
    except Exception:
        numeric = _import_numeric()
        if numeric is None:
            # no legacy fallback: report the real unpickling error
            raise
    finally:
        stream.close()

    stream = _open_for_reading(filename, gzip)
    try:
        return numeric.Unpickler(stream).load()
    finally:
        stream.close()


def dump(this, filename, gzip=False, lock=None, timeout=None):
    """
    Writes a python object to a file, using python cPickle
    Supports also '~' or '~user'.

    @param this: The object, which will be written to disk
    @type this: Any python object
    @param filename: Filename of the new file
    @type filename: String
    @param gzip: Use gzip to compress the file
    @type gzip: Boolean
    @param lock: Use a lockfile to restrict access (any value other than
                 None enables locking)
    @param timeout: seconds to wait for the lock; None waits forever
    @type timeout: float or None

    @raise IOError: if the lock cannot be acquired or released
    """
    filename = os.path.expanduser(filename)
    lockdir = filename + '.lock'

    if lock is not None:
        _acquire_lock(lockdir, timeout)
    try:
        if gzip:
            import gzip as gzip_module
            stream = gzip_module.GzipFile(filename, 'wb')
        else:
            stream = open(filename, 'wb')

        try:
            numeric = None
            if type(this).__name__ == 'array':
                # Legacy Numeric arrays. array.array has the same type name,
                # so plain pickling is used whenever Numeric is unavailable.
                numeric = _import_numeric()
            if numeric is not None:
                numeric.Pickler(stream).dump(this)
            else:
                Pickle.dump(this, stream, 2)
        finally:
            stream.close()
    finally:
        # release the lock even if writing failed
        if lock is not None:
            _release_lock(lockdir)


def load(filename, gzip=False, lock=None, timeout=None):
    """
    Unpickle an object from filename. Supports also '~' or '~user'.

    @param filename: Filename pickled object
    @param gzip: Use gzip to uncompress the file
    @type gzip: Boolean
    @param lock: Use a lockfile to restrict access (any value other than
                 None enables locking)
    @param timeout: seconds to wait for the lock; None waits forever
    @type timeout: float or None

    @return: Python object unpickled from file
    @raise IOError: if the lock cannot be acquired or released
    """
    filename = os.path.expanduser(filename)
    lockdir = filename + '.lock'

    if lock is not None:
        _acquire_lock(lockdir, timeout)
    try:
        return _unpickle(filename, gzip)
    finally:
        # release the lock even if reading failed
        if lock is not None:
            _release_lock(lockdir)