Package csb :: Package bio :: Package io :: Module fasta
[frames] | no frames]

Source Code for Module csb.bio.io.fasta

  1  """ 
  2  FASTA format sequence I/O. 
  3   
  4  This module provides parsers and writers for sequences and alignments in 
  5  FASTA format. The most basic usage is: 
  6   
  7      >>> parser = SequenceParser() 
  8      >>> parser.parse_file('sequences.fa') 
  9      <SequenceCollection>   # collection of L{AbstractSequence}s 
 10   
 11  This will load all sequences in memory. If you are parsing a huge file, 
 12  then you could efficiently read the file sequence by sequence: 
 13   
 14      >>> for seq in parser.read('sequences.fa'): 
 15              ...            # seq is an L{AbstractSequence} 
 16               
 17  L{BaseSequenceParser} is the central class in this module, which defines a 
 18  common infrastructure for all sequence readers. L{SequenceParser} is a standard 
 19  implementation, and L{PDBSequenceParser} is specialized to read FASTA sequences 
 20  with PDB headers. 
 21   
 22  For parsing alignments, have a look at L{SequenceAlignmentReader} and 
 23  L{StructureAlignmentFactory}. 
 24   
 25  Finally, this module provides a number of L{OutputBuilder}s, which know how to 
 26  write L{AbstractSequence} and L{AbstractAlignment} objects to FASTA files: 
 27   
 28      >>> with open('file.fa', 'w') as out: 
 29              builder = OutputBuilder.create(AlignmentFormats.FASTA, out) 
 30              builder.add_alignment(alignment) 
 31              builder.add_sequence(sequence) 
 32              ... 
 33   
 34  or you could instantiate any of the L{OutputBuilder}s directly. 
 35  """ 
 36   
 37  import csb.io 
 38  import csb.core 
 39   
 40  from abc import ABCMeta, abstractmethod 
 41   
 42  from csb.bio.sequence import SequenceTypes, SequenceAlphabets, AlignmentFormats, SequenceError 
 43  from csb.bio.sequence import SequenceAlignment, StructureAlignment, A3MAlignment 
 44  from csb.bio.sequence import SequenceCollection, AbstractSequence, Sequence, RichSequence, ChainSequence 
45 46 47 -class SequenceFormatError(ValueError):
48 pass
49
50 51 -class BaseSequenceParser(object):
52 """ 53 FASTA parser template. Subclasses must implement the way FASTA strings are 54 handled by overriding C{BaseSequenceParser.read_sequence}. 55 56 @param product: sequence product factory (an L{AbstractSequence} subclass) 57 @type product: type 58 @param product_type: default L{SequenceTypes} member for the products 59 @type product_type: L{EnumItem} 60 """ 61 __metaclass__ = ABCMeta 62
63 - def __init__(self, product=Sequence, product_type=SequenceTypes.Protein):
64 65 self._product = None 66 67 if not issubclass(product, AbstractSequence): 68 raise TypeError(product) 69 70 if not product_type.enum is SequenceTypes: 71 raise TypeError(product_type) 72 73 self._product = product 74 self._type = product_type
75 76 @property
77 - def product_factory(self):
78 """ 79 Factory used to build sequence products 80 @rtype: class 81 """ 82 return self._product
83 84 @property
85 - def product_type(self):
86 """ 87 Default sequence type of the products - a member of L{SequenceTypes} 88 @rtype: enum item 89 """ 90 return self._type
91 92 @abstractmethod
93 - def read_sequence(self, string):
94 """ 95 Parse a single FASTA string 96 97 @return: a new sequence, created with L{BaseSequenceParser.product_factory} 98 @rtype: L{AbstractSequence} 99 100 @raise SequenceFormatError: on parse error 101 """ 102 pass
103
104 - def parse_string(self, fasta_string):
105 """ 106 Read FASTA sequences from an (m)FASTA-formatted string 107 108 @param fasta_string: FASTA string to parse 109 @type fasta_string: str 110 111 @return: a list of L{Sequence}s 112 @rtype: L{SequenceCollection} 113 114 @raise SequenceFormatError: on parse error 115 """ 116 117 stream = csb.io.MemoryStream() 118 stream.write(fasta_string) 119 120 return self.parse_file(stream)
121
122 - def parse_file(self, fasta_file):
123 """ 124 Read FASTA sequences from a (m)FASTA file 125 126 @param fasta_file: input FASTA file name or opened stream 127 @type fasta_file: str, file 128 129 @return: a list of L{Sequence}s 130 @rtype: L{SequenceCollection} 131 132 @raise SequenceFormatError: on parse error 133 """ 134 if isinstance(fasta_file, csb.core.string): 135 stream = open(fasta_file) 136 else: 137 stream = fasta_file 138 139 seqs = [] 140 reader = csb.io.EntryReader(stream, AbstractSequence.DELIMITER, None) 141 142 for entry in reader.entries(): 143 seqs.append(self.read_sequence(entry)) 144 145 return SequenceCollection(seqs)
146
147 - def read(self, fasta_file):
148 """ 149 Read FASTA sequences from an (m)FASTA file. 150 151 @param fasta_file: input FASTA file name or opened stream 152 @type fasta_file: str, file 153 154 @return: efficient cursor over all L{Sequence}s (parse on demand) 155 @rtype: iterator 156 157 @raise SequenceFormatError: on parse error 158 """ 159 if isinstance(fasta_file, csb.core.string): 160 stream = open(fasta_file) 161 else: 162 stream = fasta_file 163 164 reader = csb.io.EntryReader(stream, AbstractSequence.DELIMITER, None) 165 166 for entry in reader.entries(): 167 yield self.read_sequence(entry)
168
169 -class SequenceParser(BaseSequenceParser):
170 """ 171 Standard FASTA parser. See L{BaseSequenceParser} for details. 172 """ 173
174 - def read_sequence(self, string):
175 176 lines = string.strip().splitlines() 177 178 if not lines[0].startswith(AbstractSequence.DELIMITER): 179 lines = [''] + lines 180 if len(lines) < 2: 181 raise SequenceFormatError('Empty FASTA entry') 182 183 header = lines[0] 184 id = header[1:].split()[0] 185 sequence = ''.join(lines[1:]) 186 187 return self.product_factory(id, header, sequence, self.product_type)
188
189 -class PDBSequenceParser(SequenceParser):
190 """ 191 PDB FASTA parser. Reads the PDB ID and sequence type from the header. 192 See L{BaseSequenceParser} for more details. 193 """ 194
195 - def read_sequence(self, string):
196 197 seq = super(PDBSequenceParser, self).read_sequence(string) 198 199 if not (seq.header and seq.id) or not (len(seq.id) in(5, 6) and seq.header.find('mol:') != -1): 200 raise SequenceFormatError('Does not look like a PDB header: {0}'.format(seq.header)) 201 202 seq.id = seq.id.replace('_', '') 203 stype = seq.header.partition('mol:')[2].partition(' ')[0] 204 try: 205 seq.type = csb.core.Enum.parse(SequenceTypes, stype) 206 except csb.core.EnumValueError: 207 seq.type = SequenceTypes.Unknown 208 209 return seq
210
211 212 -class A3MSequenceIterator(object):
213
214 - def __init__(self, sequences, insertion):
215 216 self._temp = [] 217 self._insertion = insertion 218 219 for sequence in sequences: 220 221 row = list(sequence.sequence) 222 row.reverse() 223 self._temp.append(row)
224
225 - def next(self):
226 227 sequences = self._temp 228 229 column = [ ] 230 has_insertion = False 231 232 for sequence in sequences: 233 if len(sequence) > 0 and sequence[-1].islower(): 234 has_insertion = True 235 236 for sequence in sequences: 237 try: 238 if has_insertion and not sequence[-1].islower(): 239 column.append(self._insertion) 240 else: 241 column.append(sequence.pop()) 242 except IndexError: 243 column.append('') 244 245 if not any(column): 246 raise StopIteration() 247 248 return column
249
250 - def __iter__(self):
251 return self
252
253 - def __next__(self):
254 return self.next()
255
256 -class SequenceAlignmentReader(object):
257 """ 258 Sequence alignment parser. 259 260 @param product_type: default L{SequenceTypes} member for the sequence products 261 @type product_type: L{EnumItem} 262 @param strict: if True, raise exception on duplicate sequence identifiers. 263 See L{csb.bio.sequence.AbstractAlignment} for details 264 @type strict: bool 265 """ 266
267 - def __init__(self, product_type=SequenceTypes.Protein, strict=True):
268 269 if not product_type.enum is SequenceTypes: 270 raise TypeError(product_type) 271 272 self._type = product_type 273 self._strict = bool(strict)
274 275 @property
276 - def product_type(self):
277 """ 278 Default sequence type of the alignment entries - a member of L{SequenceTypes} 279 @rtype: enum item 280 """ 281 return self._type
282 283 @property
284 - def strict(self):
285 """ 286 True if strict mode is enabled 287 @rtype: bool 288 """ 289 return self._strict
290
291 - def read_fasta(self, string):
292 """ 293 Parse an alignment in multi-FASTA format. 294 295 @param string: alignment string 296 @type string: str 297 298 @rtype: L{SequenceAlignment} 299 """ 300 301 parser = SequenceParser(RichSequence, self.product_type) 302 sequences = parser.parse_string(string) 303 304 return SequenceAlignment(sequences, strict=self.strict)
305
306 - def read_a3m(self, string):
307 """ 308 Parse an alignment in A3M format. 309 310 @param string: alignment string 311 @type string: str 312 313 @rtype: L{A3MAlignment} 314 """ 315 alphabet = SequenceAlphabets.get(self.product_type) 316 317 # parse all "mis-aligned" sequences as case-sensitive strings 318 parser = SequenceParser(Sequence, self.product_type) 319 sequences = parser.parse_string(string) 320 321 # storage for expanded sequences 322 s = [] 323 324 for dummy in sequences: 325 s.append([]) 326 327 # expand all sequences with insertion characters and make them equal length 328 for column in A3MSequenceIterator(sequences, str(alphabet.INSERTION)): 329 for sn, char in enumerate(column): 330 s[sn].append(char) 331 332 # build normal sequence objects from the equalized sequence strings 333 aligned_seqs = [] 334 335 for sn, seq in enumerate(sequences): 336 337 sequence = RichSequence(seq.id, seq.header, s[sn], self.product_type) 338 aligned_seqs.append(sequence) 339 340 return A3MAlignment(aligned_seqs, strict=self.strict)
341
342 -class StructureAlignmentFactory(object):
343 """ 344 Protein structure alignment parser. 345 346 In order to construct the structural alignment, this factory needs a PDB 347 structure provider: an object, whose C{provider.get} method returns a 348 L{csb.bio.structute.Structure} for a given sequence identifier. Sequence 349 identifiers on the other hand need to be split into 'accession number' 350 and 'chain ID'. By default this is done using a standard PDB Entry ID 351 factory, but clients are free to provide custom factories. An C{id_factory} 352 must be a callable, which accepts a single string identifier and returns 353 an EntryID object. 354 355 @param provider: data source for all structures found in the alignment 356 @type provider: L{csb.bio.io.wwpdb.StructureProvider} 357 @param id_factory: callable factory, which transforms a sequence ID into 358 a L{csb.bio.io.wwpdb.EntryID} object. By default 359 this is L{csb.bio.io.wwpdb.EntryID.create}. 360 @type id_factory: callable 361 @param strict: if True, raise exception on duplicate sequence identifiers. 362 See L{csb.bio.sequence.AbstractAlignment} for details 363 @type strict: bool 364 """ 365
366 - def __init__(self, provider, id_factory=None, strict=True):
367 368 from csb.bio.io.wwpdb import EntryID 369 370 if id_factory is None: 371 id_factory = EntryID.create 372 if not hasattr(id_factory, '__call__'): 373 raise TypeError(id_factory) 374 375 if not hasattr(provider, 'get'): 376 raise TypeError(provider) 377 378 self._type = SequenceTypes.Protein 379 self._strict = bool(strict) 380 self._provider = provider 381 self._id_factory = id_factory
382 383 @property
384 - def product_type(self):
385 """ 386 Default sequence type of the alignment rows - a member of L{SequenceTypes} 387 @rtype: enum item 388 """ 389 return self._type
390 391 @property
392 - def provider(self):
393 """ 394 Current L{csb.bio.io.wwpdb.StructureProvider} instance in use 395 @rtype: L{StructureProvider} 396 """ 397 return self._provider
398 399 @property
400 - def id_factory(self):
401 """ 402 Current L{csb.bio.io.wwpdb.EntryID} factory instance in use 403 @rtype: L{EntryID} 404 """ 405 return self._id_factory
406 407 @property
408 - def strict(self):
409 """ 410 True if strict mode is enabled 411 @rtype: bool 412 """ 413 return self._strict
414
415 - def make_alignment(self, string):
416 """ 417 Build a protein structure alignment given a multi-FASTA string 418 and the current structure C{provider}. 419 420 @param string: alignment string 421 @type string: str 422 423 @rtype: L{SequenceAlignment} 424 425 @raise SequenceError: when an aligned sequence is not a proper 426 subsequence of its respective source PDB chain 427 @raise StructureNotFoundError: if C{provider} can't provide a structure 428 for a given sequence ID 429 @raise InvalidEntryIDError: if a given sequence ID cannot be parsed 430 """ 431 432 entries = [] 433 parser = SequenceParser(Sequence, self.product_type) 434 sequences = parser.parse_string(string) 435 436 for row in sequences: 437 id = self.id_factory(row.id) 438 chain = self.provider.get(id.accession).chains[id.chain] 439 440 entry = self.make_entry(row, chain) 441 entries.append(entry) 442 443 return StructureAlignment(entries, strict=self.strict)
444
445 - def make_entry(self, row, chain):
446 """ 447 Build a protein structure alignment entry, given a sequence alignment 448 entry and its corresponding source PDB chain. 449 450 @param row: sequence alignment entry (sequence with gaps) 451 @type row: L{AbstractSequence}, L{SequenceAdapter} 452 @param chain: source PDB chain 453 @type chain: L{csb.bio.structure.Chain} 454 455 @return: gapped chain sequence, containing cloned residues from the 456 source chain (except for the gaps) 457 @rtype: L{ChainSequence} 458 @raise SequenceError: when C{row} is not a proper subsequence of C{chain} 459 """ 460 offset = 1 461 residues = [] 462 sequence = row.strip().sequence.upper() 463 464 try: 465 start = chain.sequence.index(sequence) + 1 466 except ValueError: 467 raise SequenceError('{0}: not a subsequence of {1}'.format(row.id, chain.entry_id)) 468 469 for rinfo in row.residues: 470 471 if rinfo.type == row.alphabet.GAP: 472 residues.append(rinfo) 473 continue 474 else: 475 rank = start + offset - 1 476 assert chain.residues[rank].type == rinfo.type 477 residues.append(chain.residues[rank].clone()) 478 offset += 1 479 continue 480 481 return ChainSequence(row.id, row.header, residues, chain.type)
482
483 484 -class OutputBuilder(object):
485 """ 486 Base sequence/alignment string format builder. 487 488 @param output: destination stream, where the product is written. 489 @type output: file 490 @param headers: if False, omit headers 491 @type headers: bool 492 493 @note: File builders are not guaranteed to check the correctness of the 494 product. It is assumed that the client of the builder knows what 495 it is doing. 496 """ 497 __metaclass__ = ABCMeta 498 _registry = {} 499
500 - def __init__(self, output, headers=True):
501 502 if not hasattr(output, 'write'): 503 raise TypeError(output) 504 505 self._out = output 506 self._headers = bool(headers)
507 508 @staticmethod
509 - def create(format, *a, **k):
510 """ 511 Create an output builder, which knows how to handle the specified 512 sequence/alignment C{format}. Additional arguments are passed to the 513 builder's constructor. 514 515 @param format: L{AlignmentFormats} member 516 @type format: L{EnumItem} 517 @rtype: L{OutputBuilder} 518 """ 519 if format not in OutputBuilder._registry: 520 raise ValueError('Unhandled format: {0}'.format(format)) 521 522 klass = OutputBuilder._registry[format] 523 return klass(*a, **k)
524 525 @staticmethod
526 - def register(format, klass):
527 """ 528 Register a new output builder. 529 530 @param format: L{AlignmentFormats} member 531 @type format: L{EnumItem} 532 @param klass: builder class (L{OutputBuilder} sub-class) 533 @type klass: type 534 """ 535 assert format not in OutputBuilder._registry 536 assert issubclass(klass, OutputBuilder) 537 538 OutputBuilder._registry[format] = klass
539 540 @property
541 - def output(self):
542 """ 543 Destination stream 544 @rtype: stream 545 """ 546 return self._out
547 548 @property
549 - def headers(self):
550 """ 551 True if sequence headers will be written to the destination 552 @rtype: bool 553 """ 554 return self._headers
555
556 - def write(self, text):
557 """ 558 Write a chunk of C{text} to the output stream. 559 """ 560 self._out.write(text)
561
562 - def writeline(self, text):
563 """ 564 Write a chunk of C{text}, followed by a newline terminator. 565 """ 566 self._out.write(text) 567 self._out.write('\n')
568 569 @abstractmethod
570 - def add_sequence(self, sequence):
571 """ 572 Format and append a new sequence to the product. 573 @type sequence: L{AbstractSequence} 574 """ 575 pass
576
577 - def add_many(self, sequences):
578 """ 579 Format and append a collection of L{AbstractSequence}s to the product. 580 @type sequences: iterable of L{AbstractSequence}s 581 """ 582 for s in sequences: 583 self.add_sequence(s)
584 585 @abstractmethod
586 - def add_alignment(self, alignment):
587 """ 588 Format and append an alignment to the product. 589 @type alignment: L{AbstractAlignment} 590 """ 591 pass
592
593 - def add_separator(self, separator=''):
594 """ 595 Append a sequence separator to the product. 596 """ 597 self.writeline(separator)
598
599 - def add_comment(self, text, comment='#', length=120):
600 """ 601 Append a comment line to the product. 602 603 @param text: comment text 604 @type text: str 605 @param comment: comment prefix 606 @type comment: str 607 @param length: maximal comment length 608 @type length: int 609 """ 610 for i in range(0, len(text), length): 611 self.write(comment) 612 self.write(' ') 613 self.write(text[i : i + length]) 614 self.write('\n')
615
616 -class FASTAOutputBuilder(OutputBuilder):
617 """ 618 Formats sequences as standard FASTA strings. See L{OutputBuilder}. 619 """ 620 FORMAT = AlignmentFormats.FASTA 621
622 - def add_sequence(self, sequence):
623 624 if self.headers: 625 self.write(AbstractSequence.DELIMITER) 626 self.writeline(sequence.header) 627 628 insertion = str(sequence.alphabet.INSERTION) 629 gap = str(sequence.alphabet.GAP) 630 631 self.writeline(sequence.sequence.replace(insertion, gap))
632
633 - def add_alignment(self, alignment):
634 635 self.add_many(alignment.rows)
636
637 -class A3MOutputBuilder(OutputBuilder):
638 """ 639 Formats sequences as A3M strings. When appending an alignment, this builder 640 will write all insertion-containing columns in lower case. Also, gap symbols 641 are omitted if the respective columns contain insertions. 642 643 See L{OutputBuilder}. 644 """ 645 FORMAT = AlignmentFormats.A3M 646
647 - def add_sequence(self, sequence):
648 649 if self.headers: 650 self.write(AbstractSequence.DELIMITER) 651 self.writeline(sequence.header) 652 653 self.writeline(sequence.sequence)
654
655 - def add_alignment(self, alignment):
656 657 if isinstance(alignment, A3MAlignment): 658 self._add_a3m(alignment) 659 else: 660 self._add_proper(alignment)
661
662 - def _add_a3m(self, alignment):
663 664 for s in alignment.rows: 665 666 if self.headers: 667 self.write(AbstractSequence.DELIMITER) 668 self.writeline(s.header) 669 670 sequence = [] 671 672 for ci in s.columns: 673 674 if ci.residue.type != s.alphabet.INSERTION: 675 char = str(ci.residue.type) 676 677 if alignment.insertion_at(ci.column): 678 sequence.append(char.lower()) 679 else: 680 sequence.append(char) 681 else: 682 continue 683 684 self.writeline(''.join(sequence))
685
686 - def _add_proper(self, alignment):
687 688 for s in alignment.rows: 689 690 if self.headers: 691 self.write(AbstractSequence.DELIMITER) 692 self.writeline(s.header) 693 694 master = alignment.rows[1] 695 sequence = [] 696 697 for ci in s.columns: 698 char = str(ci.residue.type) 699 700 if master.columns[ci.column].residue.type == master.alphabet.GAP: 701 if ci.residue.type == s.alphabet.GAP: 702 continue 703 else: 704 sequence.append(char.lower()) 705 else: 706 sequence.append(char) 707 708 self.writeline(''.join(sequence))
709
710 -class PIROutputBuilder(OutputBuilder):
711 """ 712 Formats sequences as PIR FASTA strings, recognized by Modeller. 713 See L{OutputBuilder} for general alignment documentation. 714 """ 715 FORMAT = AlignmentFormats.PIR 716
717 - def add_sequence(self, sequence):
718 self._add(sequence, template=True)
719
720 - def add_alignment(self, alignment):
721 722 for n, sequence in enumerate(alignment.rows, start=1): 723 if n == 1: 724 self.add_target(sequence) 725 else: 726 self.add_template(sequence)
727 728
729 - def add_target(self, sequence):
730 self._add(sequence, template=False)
731
732 - def add_template(self, sequence):
733 self._add(sequence, template=True)
734
735 - def _add(self, sequence, template=True):
736 737 if self.headers: 738 739 if template: 740 type = 'structure' 741 else: 742 type = 'sequence' 743 744 id = sequence.id 745 start = end = '.' 746 747 if hasattr(sequence, 'start') and hasattr(sequence, 'end'): 748 start = sequence.start 749 end = sequence.end 750 751 header = 'P1;{0}\n{2}:{0}:{3}:{1}:{4}:{1}::::'.format(id[:-1], id[-1], type, start, end) 752 753 self.write(AbstractSequence.DELIMITER) 754 self.writeline(header) 755 756 insertion = str(sequence.alphabet.INSERTION) 757 gap = str(sequence.alphabet.GAP) 758 759 self.write(sequence.sequence.replace(insertion, gap)) 760 self.writeline('*')
761 762 763 # register builders 764 for klass in OutputBuilder.__subclasses__(): 765 OutputBuilder.register(klass.FORMAT, klass) 766