Package csb :: Package bio :: Package io :: Module noe
[frames] | no frames]

Source Code for Module csb.bio.io.noe

  1  """ 
  2  Simple XEASY and Sparky peak list parsers. 
  3  """ 
  4   
  5  from abc import ABCMeta, abstractmethod 
  6  from csb.bio.nmr import NOESpectrum 
7 8 9 -class PeakListFormatError(ValueError):
10 pass
11
12 -class BasePeakListReader(object):
13 14 __metaclass__ = ABCMeta 15 16 @abstractmethod
17 - def read(self, table):
18 """ 19 Parse a peak list table. 20 21 @param table: input peak list table 22 @type table: str 23 @rtype: L{NOESpectrum} 24 """ 25 pass
26
27 - def read_file(self, filename):
28 """ 29 Parse a peak list file. 30 31 @param filename: input file name 32 @type filename: str 33 @rtype: L{NOESpectrum} 34 """ 35 with open(filename) as input: 36 return self.read(input.read())
37
38 - def read_all(self, filenames):
39 """ 40 Parse a list of peak list files and merge the resulting spectra. 41 All spectra must have identical dimensions. 42 43 @param filenames: input file names 44 @type filenames: iterable of str 45 46 @return: joint spectrum 47 @rtype: L{NOESpectrum} 48 """ 49 spectra = [self.read_file(f) for f in filenames] 50 return NOESpectrum.join(*spectra)
51
52 -class SparkyPeakListReader(BasePeakListReader):
53 """ 54 Sparky NOE peak list parser. 55 56 @param elements: list of element names for each dimension 57 @type elements: list of (str or L{EnumItem}) 58 @param connected: list of covalently connected dimension indices in the 59 format: [(i1,i2),...] 60 @type connected: list of (int,int) tuples 61 """ 62
63 - def __init__(self, elements, connected):
64 65 self._elements = list(elements) 66 self._connected = [(d1, d2) for d1, d2 in connected] 67 68 if len(self._elements) < 1: 69 raise ValueError("Can't parse a 0-dimensional peak list")
70
71 - def read(self, table):
72 """ 73 Parse a Sparky peak list table. 74 75 @param table: input peak list 76 @type table: str 77 @rtype: L{NOESpectrum} 78 """ 79 offset = 0 80 spectrum = NOESpectrum(self._elements) 81 82 for d1, d2 in self._connected: 83 spectrum.connect(d1, d2) 84 85 for l in table.splitlines(): 86 if not l.strip() or ('w1' in l and 'w2' in l): 87 if l.lstrip().lower().startswith('assignment'): 88 offset = 1 89 continue 90 91 line = l.split()[offset:] 92 try: 93 float(line[-1]) # last item may or may not be a comment 94 except ValueError: 95 if len(line) > 0: 96 line.pop() 97 98 items = list(map(float, line)) 99 intensity = items[-1] 100 dimensions = items[:-1] 101 102 if len(dimensions) != len(self._elements): 103 raise PeakListFormatError("Expected {0} dimensional spectrum, got {1}".format( 104 len(self._elements), len(dimensions))) 105 106 spectrum.add(intensity, dimensions) 107 108 return spectrum
109
110 -class XeasyPeakListReader(BasePeakListReader):
111 """ 112 XEASY NOE peak list parser. 113 """ 114
115 - def __init__(self):
116 pass
117
118 - def read(self, table):
119 """ 120 Parse an XEASY peak list table. 121 122 @param table: input peak list 123 @type table: str 124 @rtype: L{NOESpectrum} 125 """ 126 lines = table.splitlines() 127 spectrum = self._read_header(lines) 128 129 for l in lines: 130 if not l.strip() or l.startswith('#'): 131 continue 132 133 parts = l.split()[1:] 134 peak = parts[:spectrum.num_dimensions] 135 height = parts[spectrum.num_dimensions + 2] 136 137 intensity = float(height) 138 dimensions = map(float, peak) 139 140 spectrum.add(intensity, dimensions) 141 142 return spectrum
143 144
145 - def _read_header(self, lines):
146 147 num = 0 148 dim = {} 149 el = {} 150 el2 = {} 151 connectivity = None 152 153 for l in lines: 154 if l.startswith('#'): 155 if l[1:].lstrip().lower().startswith('number of dimensions'): 156 num = int(l.split()[-1]) 157 158 if l.startswith('#INAME'): 159 parts = l.split()[1:] 160 if len(parts) != 2: 161 raise PeakListFormatError("Invalid Xeasy header") 162 163 index = int(parts[0]) - 1 164 if index < 0: 165 raise PeakListFormatError("Invalid Xeasy header: dimension index < 1") 166 167 element = ''.join(i for i in parts[1] if i.isalpha()) 168 el[parts[1]] = index 169 el2[element] = index 170 171 dim[index] = element 172 173 if l.startswith('#CYANAFORMAT'): 174 connectivity = l.split()[1] 175 176 if len(dim) != num or num == 0: 177 raise PeakListFormatError("Invalid Xeasy header") 178 179 elements = tuple(dim[i] for i in sorted(dim)) 180 spectrum = NOESpectrum(elements) 181 182 if connectivity: 183 # cyanaformat - explicitly defines connected dimensions: 184 # upper case dimensions are connected, e.g. "#CYANAFORMAT hCH" => 2-3 185 if connectivity.upper() != ''.join(elements).upper(): 186 raise ValueError("Invalid XEASY/CYANA header") 187 for i1 in range(len(connectivity)): 188 for i2 in range(len(connectivity)): 189 e1, e2 = connectivity[i1], connectivity[i2] 190 if i1 != i2 and e1.isupper() and e2.isupper(): 191 spectrum.connect(i1, i2) 192 else: 193 # dimension labels starting with a number are connected, e.g. "1A B2 3C" => 1-3 194 if len(el) != num: 195 raise PeakListFormatError("Invalid XEASY header") 196 for e1 in el: 197 for e2 in el: 198 if e1 != e2: 199 element1 = dim[el[e1]] 200 element2 = dim[el[e2]] 201 202 num1 = e1.replace(element1, '') 203 num2 = e2.replace(element2, '') 204 205 if e1.startswith(num1) and e2.startswith(num2): 206 spectrum.connect(el[e1], el[e2]) 207 208 return spectrum
209
210 211 -class XeasyFileBuilder(object):
212 """ 213 XEASY output format builder. 214 215 @param stream: destination stream, were the output is written 216 @type stream: file 217 """ 218
219 - def __init__(self, stream):
220 self._out = stream
221
222 - def add_spectrum(self, spectrum):
223 224 self.add_header(spectrum) 225 self.add_peaks(spectrum)
226
227 - def add_header(self, spectrum):
228 """ 229 Write the XEASY header. 230 231 @param spectrum: NOE spectrum 232 @type spectrum: L{NOESpectrum} 233 """ 234 235 self._out.write( 236 '# Number of dimensions {0}\n'.format(spectrum.num_dimensions)) 237 238 conn = '' 239 240 for en, e in enumerate(spectrum.dimensions, start=1): 241 element = repr(e).upper() 242 self._out.write('#INAME {0} {1}{0}\n'.format(en, element)) 243 244 if spectrum.has_connected_dimensions(en - 1): 245 conn += element.upper() 246 else: 247 conn += element.lower() 248 249 self._out.write( 250 '#CYANAFORMAT {0}\n'.format(conn))
251
252 - def add_peaks(self, spectrum):
253 """ 254 Write all peaks from C{spectrum}. 255 256 @param spectrum: NOE spectrum 257 @type spectrum: L{NOESpectrum} 258 """ 259 260 for pn, peak in enumerate(spectrum, start=1): 261 self._out.write("{0:5} ".format(pn)) 262 263 for dim in range(spectrum.num_dimensions): 264 data = "{0:7.3f} ".format(peak.get(dim)) 265 self._out.write(data) 266 267 self._out.write("2 U ") 268 self._out.write("{0:18e} ".format(peak.intensity)) 269 self._out.write("0.00e+00 m 0 0 0 0 0\n")
270