Source code for binoculars.backends.id03

import sys
import os
import itertools
import glob
import numpy
import time

#python3 support
PY3 = sys.version_info > (3,)
if PY3:
    pass
else:
    from itertools import izip as zip

try:
    from PyMca import specfilewrapper, EdfFile, SixCircle, specfile
except ImportError:
    from PyMca5.PyMca import specfilewrapper, EdfFile, SixCircle, specfile

from .. import backend, errors, util


[docs]class pixels(backend.ProjectionBase):
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi): y, x = numpy.mgrid[slice(None, gamma.shape[0]), slice(None, delta.shape[0])] return (y, x)
[docs] def get_axis_labels(self): return 'y', 'x'
[docs]class HKLProjection(backend.ProjectionBase): # arrays: gamma, delta # scalars: theta, mu, chi, phi
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi): R = SixCircle.getHKL(wavelength, UB, gamma=gamma, delta=delta, theta=theta, mu=mu, chi=chi, phi=phi) shape = gamma.size, delta.size H = R[0, :].reshape(shape) K = R[1, :].reshape(shape) L = R[2, :].reshape(shape) return (H, K, L)
[docs] def get_axis_labels(self): return 'H', 'K', 'L'
[docs]class HKProjection(HKLProjection):
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi): H, K, L = super(HKProjection, self).project(wavelength, UB, gamma, delta, theta, mu, chi, phi) return (H, K)
[docs] def get_axis_labels(self): return 'H', 'K'
[docs]class specularangles(backend.ProjectionBase):
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi): delta, gamma = numpy.meshgrid(delta, gamma) mu *= numpy.pi/180 delta *= numpy.pi/180 gamma *= numpy.pi/180 chi *= numpy.pi/180 phi *= numpy.pi/180 theta *= numpy.pi/180 def mat(u, th): ux, uy, uz = u[0], u[1], u[2] sint = numpy.sin(th) cost = numpy.cos(th) mcost = (1 - numpy.cos(th)) return numpy.matrix([[cost + ux**2 * mcost, ux * uy * mcost - uz * sint, ux * uz * mcost + uy * sint], [uy * ux * mcost + uz * sint, cost + uy**2 * mcost, uy * uz - ux * sint], [uz * ux * mcost - uy * sint, uz * uy * mcost + ux * sint, cost + uz**2 * mcost]]) def rot(vx, vy, vz, u, th): R = mat(u, th) return R[0, 0] * vx + R[0, 1] * vy + R[0, 2] * vz, R[1, 0] * vx + R[1, 1] * vy + R[1, 2] * vz, R[2, 0] * vx + R[2, 1] * vy + R[2, 2] * vz #what are the angles of kin and kout in the sample frame? #angles in the hexapod frame koutx, kouty, koutz = numpy.sin(- numpy.pi / 2 + gamma) * numpy.cos(delta), numpy.sin(- numpy.pi / 2 + gamma) * numpy.sin(delta), numpy.cos(- numpy.pi / 2 + gamma) kinx, kiny, kinz = numpy.sin(numpy.pi / 2 - mu), 0, numpy.cos(numpy.pi / 2 - mu) #now we rotate the frame around hexapod rotation th xaxis = numpy.array(rot(1, 0, 0, numpy.array([0, 0, 1]), theta)) yaxis = numpy.array(rot(0, 1, 0, numpy.array([0, 0, 1]), theta)) #first we rotate the sample around the xaxis koutx, kouty, koutz = rot(koutx, kouty, koutz, xaxis, chi) kinx, kiny, kinz = rot(kinx, kiny, kinz, xaxis, chi) yaxis = numpy.array(rot(yaxis[0], yaxis[1], yaxis[2], xaxis, chi)) # we also have to rotate the yaxis #then we rotate the sample around the yaxis koutx, kouty, koutz = rot(koutx, kouty, koutz, yaxis, phi) kinx, kiny, kinz = rot(kinx, kiny, kinz, yaxis, phi) #to calculate the equivalent gamma, delta and mu in the sample frame we rotate the frame around the sample z which is 0,0,1 back = numpy.arctan2(kiny, kinx) koutx, kouty, koutz = rot(koutx, kouty, koutz, numpy.array([0, 0, 1]), -back) kinx, kiny, kinz = rot(kinx, kiny, kinz, numpy.array([0, 0, 1]), -back) mu = numpy.arctan2(kinz, kinx) * numpy.ones_like(delta) delta = numpy.pi - numpy.arctan2(kouty, koutx) gamma = numpy.pi - numpy.arctan2(koutz, koutx) delta[delta > numpy.pi] -= 2 * numpy.pi gamma[gamma > numpy.pi] -= 2 * numpy.pi mu *= 1 / numpy.pi * 180 delta *= 1 / numpy.pi * 180 gamma *= 1 / numpy.pi * 180 return (gamma - mu, gamma + mu, delta)
[docs] def get_axis_labels(self): return 'g-m', 'g+m', 'delta'
[docs]class ThetaLProjection(backend.ProjectionBase): # arrays: gamma, delta # scalars: theta, mu, chi, phi
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi): R = SixCircle.getHKL(wavelength, UB, gamma=gamma, delta=delta, theta=theta, mu=mu, chi=chi, phi=phi) shape = gamma.size, delta.size L = R[2, :].reshape(shape) theta_array = numpy.ones_like(L) * theta return (theta_array, L)
[docs] def get_axis_labels(self): return 'Theta', 'L'
[docs]class QProjection(backend.ProjectionBase):
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi): shape = gamma.size, delta.size sixc = SixCircle.SixCircle() sixc.setLambda(wavelength) sixc.setUB(UB) R = sixc.getQSurface(gamma=gamma, delta=delta, theta=theta, mu=mu, chi=chi, phi=phi) qz = R[0, :].reshape(shape) qy = R[1, :].reshape(shape) qx = R[2, :].reshape(shape) return (qz, qy, qx)
[docs] def get_axis_labels(self): return 'qx', 'qy', 'qz'
[docs]class SphericalQProjection(QProjection):
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi): qz, qy, qx = super(SphericalQProjection, self).project(wavelength, UB, gamma, delta, theta, mu, chi, phi) q = numpy.sqrt(qx**2 + qy**2 + qz**2) theta = numpy.arccos(qz / q) phi = numpy.arctan2(qy, qx) return (q, theta, phi)
[docs] def get_axis_labels(self): return 'Q', 'Theta', 'Phi'
[docs]class CylindricalQProjection(QProjection):
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi): qz, qy, qx = super(CylindricalQProjection, self).project(wavelength, UB, gamma, delta, theta, mu, chi, phi) qpar = numpy.sqrt(qx**2 + qy**2) phi = numpy.arctan2(qy, qx) return (qpar, qz, phi)
[docs] def get_axis_labels(self): return 'qpar', 'qz', 'Phi'
[docs]class nrQProjection(QProjection):
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi): qx, qy, qz = super(nrQProjection, self).project(wavelength, UB, gamma, delta, 0, mu, chi, phi) return (qx, qy, qz)
[docs] def get_axis_labels(self): return 'qx', 'qy', 'qz'
[docs]class TwoThetaProjection(SphericalQProjection):
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi): q, theta, phi = super(TwoThetaProjection, self).project(wavelength, UB, gamma, delta, theta, mu, chi, phi) return 2 * numpy.arcsin(q * wavelength / (4 * numpy.pi)) / numpy.pi * 180, # note: we need to return a 1-tuple?
[docs] def get_axis_labels(self): return 'TwoTheta'
[docs]class Qpp(nrQProjection):
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi): qx, qy, qz = super(Qpp, self).project(wavelength, UB, gamma, delta, theta, mu, chi, phi) qpar = numpy.sqrt(qx**2 + qy**2) qpar[numpy.sign(qx) == -1] *= -1 return (qpar, qz)
[docs] def get_axis_labels(self): return 'Qpar', 'Qz'
[docs]class GammaDeltaTheta(HKLProjection): # just passing on the coordinates, makes it easy to accurately test the theta correction
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi): delta, gamma = numpy.meshgrid(delta, gamma) theta = theta * numpy.ones_like(delta) return (gamma, delta, theta)
[docs] def get_axis_labels(self): return 'Gamma', 'Delta', 'Theta'
[docs]class GammaDelta(HKLProjection): # just passing on the coordinates, makes it easy to accurately test the theta correction
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi): delta, gamma = numpy.meshgrid(delta, gamma) return (gamma, delta)
[docs] def get_axis_labels(self): return 'Gamma', 'Delta'
[docs]class GammaDeltaMu(HKLProjection): # just passing on the coordinates, makes it easy to accurately test the theta correction
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi): delta, gamma = numpy.meshgrid(delta, gamma) mu = mu * numpy.ones_like(delta) return (gamma, delta, mu)
[docs] def get_axis_labels(self): return 'Gamma', 'Delta', 'Mu'
[docs]class QTransformation(QProjection):
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi): qx, qy, qz = super(QTransformation, self).project(wavelength, UB, gamma, delta, theta, mu, chi, phi) M = self.config.matrix q1 = qx * M[0] + qy * M[1] + qz * M[2] q2 = qx * M[3] + qy * M[4] + qz * M[5] q3 = qx * M[6] + qy * M[7] + qz * M[8] return (q1, q2, q3)
[docs] def get_axis_labels(self): return 'q1', 'q2', 'q3'
[docs] def parse_config(self, config): super(QTransformation, self).parse_config(config) self.config.matrix = util.parse_tuple(config.pop('matrix'), length=9, type=float)
[docs]class ID03Input(backend.InputBase): # OFFICIAL API dbg_scanno = None dbg_pointno = None
[docs] def generate_jobs(self, command): scans = util.parse_multi_range(','.join(command).replace(' ', ',')) if not len(scans): sys.stderr.write('error: no scans selected, nothing to do\n') for scanno in scans: util.status('processing scan {0}...'.format(scanno)) if self.config.wait_for_data: for job in self.get_delayed_jobs(scanno): yield job else: scan = self.get_scan(scanno) if self.config.pr: pointcount = self.config.pr[1] - self.config.pr[0] + 1 start = self.config.pr[0] else: start = 0 try: pointcount = scan.lines() except specfile.error: # no points continue next(self.get_images(scan, 0, pointcount-1, dry_run=True)) # dryrun if pointcount > self.config.target_weight * 1.4: for s in util.chunk_slicer(pointcount, self.config.target_weight): yield backend.Job(scan=scanno, firstpoint=start+s.start, lastpoint=start+s.stop-1, weight=s.stop-s.start) else: yield backend.Job(scan=scanno, firstpoint=start, lastpoint=start+pointcount-1, weight=pointcount)
[docs] def get_delayed_jobs(self, scanno): scan = self.get_delayed_scan(scanno) if self.config.pr: firstpoint, lastpoint = self.config.pr # firstpoint is the first index to be included, lastpoint the last index to be included. else: firstpoint, lastpoint = 0, self.target(scan) - 1 pointcount = lastpoint - firstpoint + 1 if self.is_zap(scan): # wait until the scan is finished. if not self.wait_for_points(scanno, self.target(scan), timeout=self.config.timeout): # wait for last datapoint for s in util.chunk_slicer(pointcount, self.config.target_weight): yield backend.Job(scan=scanno, firstpoint=firstpoint+s.start, lastpoint=firstpoint+s.stop-1, weight=s.stop-s.start) else: raise errors.BackendError('Image collection timed out. Zapscan was probably aborted') elif lastpoint >= 0: # scanlength is known for s in util.chunk_slicer(pointcount, self.config.target_weight): if self.wait_for_points(scanno, firstpoint + s.stop, timeout=self.config.timeout): stop = self.get_scan(scanno).lines() yield backend.Job(scan=scanno, firstpoint=firstpoint+s.start, lastpoint=stop-1, weight=s.stop-s.start) break else: yield backend.Job(scan=scanno, firstpoint=firstpoint+s.start, lastpoint=firstpoint+s.stop-1, weight=s.stop-s.start) else: # scanlength is unknown step = int(self.config.target_weight / 1.4) for start, stop in zip(itertools.count(0, step), itertools.count(step, step)): if self.wait_for_points(scanno, stop, timeout=self.config.timeout): stop = self.get_scan(scanno).lines() yield backend.Job(scan=scanno, firstpoint=start, lastpoint=stop-1, weight=stop-start) break else: yield backend.Job(scan=scanno, firstpoint=start, lastpoint=stop-1, weight=stop-start)
[docs] def process_job(self, job): super(ID03Input, self).process_job(job) scan = self.get_scan(job.scan) self.metadict = dict() try: scanparams = self.get_scan_params(scan) # wavelength, UB pointparams = self.get_point_params(scan, job.firstpoint, job.lastpoint) # 2D array of diffractometer angles + mon + transm images = self.get_images(scan, job.firstpoint, job.lastpoint) # iterator! for pp, image in zip(pointparams, images): yield self.process_image(scanparams, pp, image) util.statuseol() except Exception as exc: exc.args = errors.addmessage(exc.args, ', An error occured for scan {0} at point {1}. See above for more information'.format(self.dbg_scanno, self.dbg_pointno)) raise self.metadata.add_section('id03_backend', self.metadict)
[docs] def parse_config(self, config): super(ID03Input, self).parse_config(config) self.config.xmask = util.parse_multi_range(config.pop('xmask', None)) # Optional, select a subset of the image range in the x direction. all by default self.config.ymask = util.parse_multi_range(config.pop('ymask', None)) # Optional, select a subset of the image range in the y direction. all by default self.config.specfile = config.pop('specfile') # Location of the specfile self.config.imagefolder = config.pop('imagefolder', None) # Optional, takes specfile folder tag by default self.config.pr = config.pop('pr', None) # Optional, all range by default self.config.background = config.pop('background', None) # Optional, if supplied a space of this image is constructed self.config.th_offset = float(config.pop('th_offset', 0)) # Optional; Only used in zapscans, zero by default. self.config.wavelength = config.pop('wavelength', None) # Optional; Overrides wavelength from specfile. if self.config.wavelength is not None: self.config.wavelength = float(self.config.wavelength) if self.config.xmask is None: self.config.xmask = slice(None) if self.config.ymask is None: self.config.ymask = slice(None) self.config.maskmatrix = load_matrix(config.pop('maskmatrix', None)) # Optional, if supplied pixels where the mask is 0 will be removed if self.config.pr: self.config.pr = util.parse_tuple(self.config.pr, length=2, type=int) self.config.sdd = float(config.pop('sdd')) # sample to detector distance (mm) self.config.pixelsize = util.parse_tuple(config.pop('pixelsize'), length=2, type=float) # pixel size x/y (mm) (same dimension as sdd) self.config.wait_for_data = util.parse_bool(config.pop('wait_for_data', 'false')) # Optional, if true wait until the data appears self.config.timeout = int(config.pop('timeout', 180)) # Optional, how long the script wait until it assumes the scan is not continuing
[docs] def get_destination_options(self, command): if not command: return False command = ','.join(command).replace(' ', ',') scans = util.parse_multi_range(command) return dict(first=min(scans), last=max(scans), range=','.join(str(scan) for scan in scans))
# CONVENIENCE FUNCTIONS
[docs] def get_scan(self, scannumber): spec = specfilewrapper.Specfile(self.config.specfile) return spec.select('{0}.1'.format(scannumber))
[docs] def get_delayed_scan(self, scannumber, timeout=None): delay = util.loop_delayer(5) start = time.time() while 1: try: return self.get_scan(scannumber) # reload entire specfile except specfile.error: if timeout is not None and time.time() - start > timeout: raise errors.BackendError('Scan timed out. There is no data to process') else: util.status('waiting for scan {0}...'.format(scannumber)) next(delay)
[docs] def wait_for_points(self, scannumber, stop, timeout=None): delay = util.loop_delayer(1) start = time.time() while 1: scan = self.get_scan(scannumber) try: if scan.lines() >= stop: next(delay) # time delay between specfile and edf file return False except specfile.error: pass finally: next(delay) util.status('waiting for scan {0}, point {1}...'.format(scannumber, stop)) if (timeout is not None and time.time() - start > timeout) or self.is_aborted(scan): try: util.statusnl('scan {0} aborted at point {1}'.format(scannumber, scan.lines())) return True except specfile.error: raise errors.BackendError('Scan was aborted before images were collected. There is no data to process')
[docs] def target(self, scan): if any(tuple(scan.command().startswith(pattern) for pattern in ['hklscan', 'a2scan', 'ascan', 'ringscan'])): return int(scan.command().split()[-2]) + 1 elif scan.command().startswith('mesh'): return int(scan.command().split()[-6]) * int(scan.command().split()[-2]) + 1 elif scan.command().startswith('loopscan'): return int(scan.command().split()[-3]) elif scan.command().startswith('xascan'): params = numpy.array(scan.command().split()[-6:]).astype(float) return int(params[2] + 1 + (params[4] - 1) / params[5] * params[2]) elif self.is_zap(scan): return int(scan.command().split()[-2]) else: return -1
[docs] @staticmethod def is_zap(scan): return scan.command().startswith('zap')
[docs] @staticmethod def is_aborted(scan): for line in scan.header('C'): if 'Scan aborted' in line: return True return False
[docs] def find_edfs(self, pattern, scanno): files = glob.glob(pattern) ret = {} for file in files: try: filename = os.path.basename(file).split('.')[0] scan, point, image = filename.split('_')[-3:] scan, point, image = int(scan), int(point), int(image) if scan == scanno and point not in list(ret.keys()): ret[point] = file except ValueError: continue return ret
[docs] @staticmethod def apply_mask(data, xmask, ymask): roi = data[ymask, :] return roi[:, xmask]
[docs] def get_wavelength(self, G): for line in G: if line.startswith('#G4'): return float(line.split(' ')[4]) return None
# MAIN LOGIC
[docs] def get_scan_params(self, scan): self.dbg_scanno = scan.number() if self.is_zap(scan): # zapscans don't contain the UB matrix, this needs to be fixed at ID03 scanno = scan.number() UB = None while 1: # look back in spec file to locate a UB matrix try: ubscan = self.get_scan(scanno) except specfilewrapper.specfile.error: break try: UB = numpy.array(ubscan.header('G')[2].split(' ')[-9:], dtype=numpy.float) except: scanno -= 1 else: break if UB is None: # fall back to UB matrix from the configfile if not self.config.UB: raise errors.ConfigError('UB matrix must be specified in configuration file when processing zapscans') UB = numpy.array(self.config.UB) else: UB = numpy.array(scan.header('G')[2].split(' ')[-9:], dtype=numpy.float) if self.config.wavelength is None: wavelength = self.get_wavelength(scan.header('G')) if wavelength is None or wavelength == 0: raise errors.BackendError('No or incorrect wavelength specified in the specfile. Please add wavelength to the configfile in the input section') else: wavelength = self.config.wavelength self.metadict['UB'] = UB self.metadict['wavelength'] = wavelength return wavelength, UB
[docs] def get_images(self, scan, first, last, dry_run=False): if self.config.background: if not os.path.exists(self.config.background): raise errors.FileError('could not find background file {0}'.format(self.config.background)) if dry_run: yield else: edf = EdfFile.EdfFile(self.config.background) for i in range(first, last+1): self.dbg_pointno = i yield edf.GetData(0) else: if self.is_zap(scan): scanheaderC = scan.header('C') zapscanno = int(scanheaderC[2].split(' ')[-1]) # is different from scanno should be changed in spec! try: uccdtagline = scanheaderC[0] UCCD = os.path.split(uccdtagline.split()[-1]) except: print('warning: UCCD tag not found, use imagefolder for proper file specification') UCCD = [] pattern = self._get_pattern(UCCD) matches = self.find_edfs(pattern, zapscanno) if 0 not in matches: raise errors.FileError('could not find matching edf for zapscannumber {0} using pattern {1}'.format(zapscanno, pattern)) if dry_run: yield else: edf = EdfFile.EdfFile(matches[0]) for i in range(first, last+1): self.dbg_pointno = i yield edf.GetData(i) else: try: uccdtagline = scan.header('UCCD')[0] UCCD = os.path.split(os.path.dirname(uccdtagline.split()[-1])) except: print('warning: UCCD tag not found, use imagefolder for proper file specification') UCCD = [] pattern = self._get_pattern(UCCD) matches = self.find_edfs(pattern, scan.number()) if set(range(first, last + 1)) > set(matches.keys()): raise errors.FileError("incorrect number of matches for scan {0} using pattern {1}".format(scan.number(), pattern)) if dry_run: yield else: for i in range(first, last+1): self.dbg_pointno = i edf = EdfFile.EdfFile(matches[i]) yield edf.GetData(0)
def _get_pattern(self, UCCD): imagefolder = self.config.imagefolder if imagefolder: try: imagefolder = imagefolder.format(UCCD=UCCD, rUCCD=list(reversed(UCCD))) except Exception as e: raise errors.ConfigError("invalid 'imagefolder' specification '{0}': {1}".format(self.config.imagefolder, e)) else: if not os.path.exists(imagefolder): raise errors.ConfigError("invalid 'imagefolder' specification '{0}'. Path {1} does not exist".format(self.config.imagefolder, imagefolder)) else: imagefolder = os.path.join(*UCCD) if not os.path.exists(imagefolder): raise errors.ConfigError("invalid UCCD tag '{0}'. The UCCD tag in the specfile does not point to an existing folder. Specify the imagefolder in the configuration file.".format(imagefolder)) return os.path.join(imagefolder, '*')
[docs]class EH1(ID03Input): monitor_counter = 'mon'
[docs] def parse_config(self, config): super(EH1, self).parse_config(config) self.config.centralpixel = util.parse_tuple(config.pop('centralpixel'), length=2, type=int) # x,y self.config.hr = config.pop('hr', None) # Optional, hexapod rotations in miliradians. At the entered value the sample is assumed flat, if not entered the sample is assumed flat at the spec values. self.config.UB = config.pop('ub', None) # Optional, takes specfile matrix by default if self.config.UB: self.config.UB = util.parse_tuple(self.config.UB, length=9, type=float) if self.config.hr: self.config.hr = util.parse_tuple(self.config.hr, length=2, type=float)
[docs] def process_image(self, scanparams, pointparams, image): gamma, delta, theta, chi, phi, mu, mon, transm, hrx, hry = pointparams wavelength, UB = scanparams weights = numpy.ones_like(image) if self.config.hr: zerohrx, zerohry = self.config.hr chi = (hrx - zerohrx) / numpy.pi * 180. / 1000 phi = (hry - zerohry) / numpy.pi * 180. / 1000 if self.config.background: data = image / mon else: data = image / mon / transm if mon == 0: raise errors.BackendError('Monitor is zero, this results in empty output. Scannumber = {0}, pointnumber = {1}. Did you forget to open the shutter?'.format(self.dbg_scanno, self.dbg_pointno)) util.status('{4}| gamma: {0}, delta: {1}, theta: {2}, mu: {3}'.format(gamma, delta, theta, mu, time.ctime(time.time()))) # pixels to angles pixelsize = numpy.array(self.config.pixelsize) sdd = self.config.sdd app = numpy.arctan(pixelsize / sdd) * 180 / numpy.pi centralpixel = self.config.centralpixel # (column, row) = (delta, gamma) gamma_range = -app[1] * (numpy.arange(data.shape[1]) - centralpixel[1]) + gamma delta_range = app[0] * (numpy.arange(data.shape[0]) - centralpixel[0]) + delta # masking if self.config.maskmatrix is not None: if self.config.maskmatrix.shape != data.shape: raise errors.BackendError('The mask matrix does not have the same shape as the images') weights *= self.config.maskmatrix gamma_range = gamma_range[self.config.ymask] delta_range = delta_range[self.config.xmask] intensity = self.apply_mask(data, self.config.xmask, self.config.ymask) weights = self.apply_mask(weights, self.config.xmask, self.config.ymask) #polarisation correction delta_grid, gamma_grid = numpy.meshgrid(delta_range, gamma_range) Pver = 1 - numpy.sin(delta_grid * numpy.pi / 180.)**2 * numpy.cos(gamma_grid * numpy.pi / 180.)**2 intensity /= Pver return intensity, weights, (wavelength, UB, gamma_range, delta_range, theta, mu, chi, phi)
[docs] def get_point_params(self, scan, first, last): sl = slice(first, last+1) GAM, DEL, TH, CHI, PHI, MU, MON, TRANSM, HRX, HRY = list(range(10)) params = numpy.zeros((last - first + 1, 10)) # gamma delta theta chi phi mu mon transm params[:, CHI] = scan.motorpos('Chi') params[:, PHI] = scan.motorpos('Phi') try: params[:, HRX] = scan.motorpos('hrx') params[:, HRY] = scan.motorpos('hry') except: raise errors.BackendError('The specfile does not accept hrx and hry as a motor label. Have you selected the right hutch? Scannumber = {0}, pointnumber = {1}'.format(self.dbg_scanno, self.dbg_pointno)) if self.is_zap(scan): if 'th' in scan.alllabels(): th = scan.datacol('th')[sl] if len(th) > 1: sign = numpy.sign(th[1] - th[0]) else: sign = 1 # correction for difference between back and forth in th motor params[:, TH] = th + sign * self.config.th_offset else: params[:, TH] = scan.motorpos('Theta') params[:, GAM] = scan.motorpos('Gam') params[:, DEL] = scan.motorpos('Delta') params[:, MU] = scan.motorpos('Mu') params[:, MON] = scan.datacol('zap_mon')[sl] transm = scan.datacol('zap_transm') transm[-1] = transm[-2] # bug in specfile params[:, TRANSM] = transm[sl] else: if 'hrx' in scan.alllabels(): params[:, HRX] = scan.datacol('hrx')[sl] if 'hry' in scan.alllabels(): params[:, HRY] = scan.datacol('hry')[sl] params[:, TH] = scan.datacol('thcnt')[sl] params[:, GAM] = scan.datacol('gamcnt')[sl] params[:, DEL] = scan.datacol('delcnt')[sl] try: params[:, MON] = scan.datacol(self.monitor_counter)[sl] # differs in EH1/EH2 except: raise errors.BackendError('The specfile does not accept {2} as a monitor label. Have you selected the right hutch? Scannumber = {0}, pointnumber = {1}'.format(self.dbg_scanno, self.dbg_pointno, self.monitor_counter)) params[:, TRANSM] = scan.datacol('transm')[sl] params[:, MU] = scan.datacol('mucnt')[sl] return params
[docs]class EH2(ID03Input): monitor_counter = 'Monitor'
[docs] def parse_config(self, config): super(EH2, self).parse_config(config) self.config.centralpixel = util.parse_tuple(config.pop('centralpixel'), length=2, type=int) # x,y self.config.UB = config.pop('ub', None) # Optional, takes specfile matrix by default if self.config.UB: self.config.UB = util.parse_tuple(self.config.UB, length=9, type=float)
[docs] def process_image(self, scanparams, pointparams, image): gamma, delta, theta, chi, phi, mu, mon, transm = pointparams wavelength, UB = scanparams weights = numpy.ones_like(image) if self.config.background: data = image / mon else: data = image / mon / transm if mon == 0: raise errors.BackendError('Monitor is zero, this results in empty output. Scannumber = {0}, pointnumber = {1}. Did you forget to open the shutter?'.format(self.dbg_scanno, self.dbg_pointno)) util.status('{4}| gamma: {0}, delta: {1}, theta: {2}, mu: {3}'.format(gamma, delta, theta, mu, time.ctime(time.time()))) # area correction sdd = self.config.sdd / numpy.cos(gamma * numpy.pi / 180) data *= (self.config.sdd / sdd)**2 # pixels to angles pixelsize = numpy.array(self.config.pixelsize) app = numpy.arctan(pixelsize / sdd) * 180 / numpy.pi centralpixel = self.config.centralpixel # (row, column) = (gamma, delta) gamma_range = - 1 * app[0] * (numpy.arange(data.shape[0]) - centralpixel[0]) + gamma delta_range = app[1] * (numpy.arange(data.shape[1]) - centralpixel[1]) + delta # masking if self.config.maskmatrix is not None: if self.config.maskmatrix.shape != data.shape: raise errors.BackendError('The mask matrix does not have the same shape as the images') weights *= self.config.maskmatrix gamma_range = gamma_range[self.config.xmask] delta_range = delta_range[self.config.ymask] intensity = self.apply_mask(data, self.config.xmask, self.config.ymask) weights = self.apply_mask(weights, self.config.xmask, self.config.ymask) intensity = numpy.fliplr(intensity) intensity = numpy.rot90(intensity) weights = numpy.fliplr(weights) # TODO: should be done more efficiently. Will prob change with new HKL calculations weights = numpy.rot90(weights) #polarisation correction delta_grid, gamma_grid = numpy.meshgrid(delta_range, gamma_range) Phor = 1 - (numpy.sin(mu * numpy.pi / 180.) * numpy.sin(delta_grid * numpy.pi / 180.) * numpy.cos(gamma_grid * numpy.pi / 180.) + numpy.cos(mu * numpy.pi / 180.) * numpy.sin(gamma_grid * numpy.pi / 180.))**2 intensity /= Phor return intensity, weights, (wavelength, UB, gamma_range, delta_range, theta, mu, chi, phi)
[docs] def get_point_params(self, scan, first, last): sl = slice(first, last+1) GAM, DEL, TH, CHI, PHI, MU, MON, TRANSM = list(range(8)) params = numpy.zeros((last - first + 1, 8)) # gamma delta theta chi phi mu mon transm params[:, CHI] = scan.motorpos('Chi') params[:, PHI] = scan.motorpos('Phi') if self.is_zap(scan): if 'th' in scan.alllabels(): th = scan.datacol('th')[sl] if len(th) > 1: sign = numpy.sign(th[1] - th[0]) else: sign = 1 # correction for difference between back and forth in th motor params[:, TH] = th + sign * self.config.th_offset else: params[:, TH] = scan.motorpos('Theta') params[:, GAM] = scan.motorpos('Gamma') params[:, DEL] = scan.motorpos('Delta') params[:, MU] = scan.motorpos('Mu') params[:, MON] = scan.datacol('zap_mon')[sl] transm = scan.datacol('zap_transm') transm[-1] = transm[-2] # bug in specfile params[:, TRANSM] = transm[sl] else: params[:, TH] = scan.datacol('thcnt')[sl] params[:, GAM] = scan.datacol('gamcnt')[sl] params[:, DEL] = scan.datacol('delcnt')[sl] try: params[:, MON] = scan.datacol(self.monitor_counter)[sl] # differs in EH1/EH2 except: raise errors.BackendError('The specfile does not accept {2} as a monitor label. Have you selected the right hutch? Scannumber = {0}, pointnumber = {1}'.format(self.dbg_scanno, self.dbg_pointno, self.monitor_counter)) params[:, TRANSM] = scan.datacol('transm')[sl] params[:, MU] = scan.datacol('mucnt')[sl] return params
[docs]class GisaxsDetector(ID03Input): monitor_counter = 'mon'
[docs] def process_image(self, scanparams, pointparams, image): ccdy, ccdz, theta, chi, phi, mu, mon, transm = pointparams weights = numpy.ones_like(image) wavelength, UB = scanparams if self.config.background: data = image / mon else: data = image / mon / transm if mon == 0: raise errors.BackendError('Monitor is zero, this results in empty output. Scannumber = {0}, pointnumber = {1}. Did you forget to open the shutter?'.format(self.dbg_scanno, self.dbg_pointno)) util.status('{4}| ccdy: {0}, ccdz: {1}, theta: {2}, mu: {3}'.format(ccdy, ccdz, theta, mu, time.ctime(time.time()))) # pixels to angles pixelsize = numpy.array(self.config.pixelsize) sdd = self.config.sdd directbeam = (self.config.directbeam[0] - (ccdy - self.config.directbeam_coords[0]) * pixelsize[0], self.config.directbeam[1] - (ccdz - self.config.directbeam_coords[1]) * pixelsize[1]) gamma_distance = - pixelsize[1] * (numpy.arange(data.shape[1]) - directbeam[1]) delta_distance = - pixelsize[0] * (numpy.arange(data.shape[0]) - directbeam[0]) gamma_range = numpy.arctan2(gamma_distance, sdd) / numpy.pi * 180 - mu delta_range = numpy.arctan2(delta_distance, sdd) / numpy.pi * 180 #sample pixel distance spd = numpy.sqrt(gamma_distance**2 + delta_distance**2 + sdd**2) data *= spd**2 / sdd # masking if self.config.maskmatrix is not None: if self.config.maskmatrix.shape != data.shape: raise errors.BackendError('The mask matrix does not have the same shape as the images') weights *= self.config.maskmatrix gamma_range = gamma_range[self.config.ymask] delta_range = delta_range[self.config.xmask] intensity = self.apply_mask(data, self.config.xmask, self.config.ymask) weights = self.apply_mask(weights, self.config.xmask, self.config.ymask) return intensity, weights, (wavelength, UB, gamma_range, delta_range, theta, mu, chi, phi)
[docs] def parse_config(self, config): super(GisaxsDetector, self).parse_config(config) self.config.directbeam = util.parse_tuple(config.pop('directbeam'), length=2, type=int) self.config.directbeam_coords = util.parse_tuple(config.pop('directbeam_coords'), length=2, type=float) # Coordinates of ccdy and ccdz at the direct beam position
[docs] def get_point_params(self, scan, first, last): sl = slice(first, last+1) CCDY, CCDZ, TH, CHI, PHI, MU, MON, TRANSM = list(range(8)) params = numpy.zeros((last - first + 1, 8)) # gamma delta theta chi phi mu mon transm params[:, CHI] = scan.motorpos('Chi') params[:, PHI] = scan.motorpos('Phi') params[:, CCDY] = scan.motorpos('ccdy') params[:, CCDZ] = scan.motorpos('ccdz') params[:, TH] = scan.datacol('thcnt')[sl] try: params[:, MON] = scan.datacol(self.monitor_counter)[sl] # differs in EH1/EH2 except: raise errors.BackendError('The specfile does not accept {2} as a monitor label. Have you selected the right hutch? Scannumber = {0}, pointnumber = {1}'.format(self.dbg_scanno, self.dbg_pointno, self.monitor_counter)) params[:, TRANSM] = scan.datacol('transm')[sl] params[:, MU] = scan.datacol('mucnt')[sl] return params
[docs] def find_edfs(self, pattern, scanno): files = glob.glob(pattern) ret = {} for file in files: try: filename = os.path.basename(file).split('.')[0] scan, point = filename.split('_')[-2:] scan, point = int(scan), int(point) if scan == scanno and point not in list(ret.keys()): ret[point] = file except ValueError: continue return ret
[docs]def load_matrix(filename): if filename == None: return None if os.path.exists(filename): ext = os.path.splitext(filename)[-1] if ext == '.txt': return numpy.array(numpy.loadtxt(filename), dtype=numpy.bool) elif ext == '.npy': return numpy.array(numpy.load(filename), dtype=numpy.bool) elif ext == '.edf': return numpy.array(EdfFile.EdfFile(filename).getData(0), dtype=numpy.bool) else: raise ValueError('unknown extension {0}, unable to load matrix!\n'.format(ext)) else: raise IOError('filename: {0} does not exist. Can not load matrix'.format(filename))