# Source code for vxi11.vxi11

"""

Python VXI-11 driver

Copyright (c) 2012-2014 Alex Forencich and Michael Walle

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

"""

from . import rpc
import random
import re

# VXI-11 RPC constants

# Device async: RPC program number, program version and procedure number
DEVICE_ASYNC_PROG = 0x0607B0
DEVICE_ASYNC_VERS = 1
DEVICE_ABORT = 1

# Device core: RPC program number, program version and procedure numbers
DEVICE_CORE_PROG = 0x0607AF
DEVICE_CORE_VERS = 1
CREATE_LINK = 10
DEVICE_WRITE = 11
DEVICE_READ = 12
DEVICE_READSTB = 13
DEVICE_TRIGGER = 14
DEVICE_CLEAR = 15
DEVICE_REMOTE = 16
DEVICE_LOCAL = 17
DEVICE_LOCK = 18
DEVICE_UNLOCK = 19
DEVICE_ENABLE_SRQ = 20
DEVICE_DOCMD = 22
DESTROY_LINK = 23
CREATE_INTR_CHAN = 25
DESTROY_INTR_CHAN = 26

# Device intr: RPC program number, program version and procedure number
DEVICE_INTR_PROG = 0x0607B1
DEVICE_INTR_VERS = 1
DEVICE_INTR_SRQ = 30

# Error states (the same codes are the keys of Vxi11Exception.em)
ERR_NO_ERROR = 0
ERR_SYNTAX_ERROR = 1
ERR_DEVICE_NOT_ACCESSIBLE = 3
ERR_INVALID_LINK_IDENTIFIER = 4
ERR_PARAMETER_ERROR = 5
ERR_CHANNEL_NOT_ESTABLISHED = 6
ERR_OPERATION_NOT_SUPPORTED = 8
ERR_OUT_OF_RESOURCES = 9
ERR_DEVICE_LOCKED_BY_ANOTHER_LINK = 11
ERR_NO_LOCK_HELD_BY_THIS_LINK = 12
ERR_IO_TIMEOUT = 15
ERR_IO_ERROR = 17
ERR_INVALID_ADDRESS = 21
ERR_ABORT = 23
ERR_CHANNEL_ALREADY_ESTABLISHED = 29

# Flags: bit masks for the 'flags' argument of the device calls
OP_FLAG_WAIT_BLOCK = 1 << 0
OP_FLAG_END = 1 << 3
OP_FLAG_TERMCHAR_SET = 1 << 7

# Bit masks for the 'reason' field returned by a device read
RX_REQCNT = 1 << 0
RX_CHR = 1 << 1
RX_END = 1 << 2

def parse_visa_resource_string(resource_string):
    """Split a TCPIP VISA resource string into its components.

    Valid resource strings (matching is case-insensitive):

        TCPIP::10.0.0.1::INSTR
        TCPIP0::10.0.0.1::INSTR
        TCPIP::10.0.0.1::gpib,5::INSTR
        TCPIP0::10.0.0.1::gpib,5::INSTR
        TCPIP0::10.0.0.1::usb0::INSTR
        TCPIP0::10.0.0.1::usb0[1234::5678::MYSERIAL::0]::INSTR

    Returns a dict with the keys ``type`` (upper-cased), ``prefix``,
    ``arg1``, ``arg2`` (``None`` when the optional field is absent) and
    ``suffix``.  Returns ``None`` when the string does not match.
    """
    # Raw string literals: '\d', '\s' and '\[' are regex escapes, not valid
    # Python string escapes, and trigger a warning in ordinary literals.
    m = re.match(r'^(?P<prefix>(?P<type>TCPIP)\d*)(::(?P<arg1>[^\s:]+))'
            r'(::(?P<arg2>[^\s:]+(\[.+\])?))?(::(?P<suffix>INSTR))$',
            resource_string, re.I)

    if m is None:
        return None

    return dict(
            type = m.group('type').upper(),
            prefix = m.group('prefix'),
            arg1 = m.group('arg1'),
            arg2 = m.group('arg2'),
            suffix = m.group('suffix'),
    )

# Exceptions
class Vxi11Exception(Exception):
    """Exception raised by this driver.

    ``err`` is either an integer error code (looked up in ``em``) or any
    other object used directly as the message.  ``note`` is an optional
    context tag; when ``err`` is given it is appended in square brackets,
    and when ``err`` is None it becomes the whole message.

    Attributes: ``err`` and ``note`` hold the constructor arguments and
    ``msg`` holds the composed message.
    """

    # Message text for each known integer error code
    em = {0:  "No error",
          1:  "Syntax error",
          3:  "Device not accessible",
          4:  "Invalid link identifier",
          5:  "Parameter error",
          6:  "Channel not established",
          8:  "Operation not supported",
          9:  "Out of resources",
          11: "Device locked by another link",
          12: "No lock held by this link",
          15: "IO timeout",
          17: "IO error",
          21: "Invalid address",
          23: "Abort",
          29: "Channel already established"}

    def __init__(self, err = None, note = None):
        self.err = err
        self.note = note
        self.msg = ''

        if err is None:
            self.msg = note
        else:
            # isinstance also accepts int subclasses (e.g. enum members)
            if isinstance(err, int):
                self.msg = "%d: %s" % (err, self.em.get(err, "Unknown error"))
            else:
                self.msg = err
            if note is not None:
                self.msg = "%s [%s]" % (self.msg, note)

    def __str__(self):
        # __str__ must return a string: msg is None when neither err nor
        # note was given, and may be an arbitrary object when err is not
        # an int and no note was supplied.
        if self.msg is None:
            return ''
        return str(self.msg)

class Packer(rpc.Packer):
    """Packer for the VXI-11 argument and response records.

    Each ``pack_*`` method takes one value (usually a tuple) and writes its
    fields in the order listed in the method's docstring, using the
    primitive ``pack_int`` / ``pack_uint`` / ``pack_bool`` /
    ``pack_string`` / ``pack_opaque`` calls (presumably inherited from
    ``rpc.Packer``, which is not shown in this file).
    """

    def pack_device_link(self, link):
        """Pack a link identifier as one int."""
        self.pack_int(link)

    def pack_create_link_parms(self, params):
        """Pack ``(id, lock_device, lock_timeout, device)``."""
        client_id, want_lock, lock_wait, device_name = params
        self.pack_int(client_id)
        self.pack_bool(want_lock)
        self.pack_uint(lock_wait)
        self.pack_string(device_name)

    def pack_device_write_parms(self, params):
        """Pack ``(link, timeout, lock_timeout, flags, data)``."""
        link_id, io_wait, lock_wait, op_flags, payload = params
        self.pack_int(link_id)
        self.pack_uint(io_wait)
        self.pack_uint(lock_wait)
        self.pack_int(op_flags)
        self.pack_opaque(payload)

    def pack_device_read_parms(self, params):
        """Pack ``(link, request_size, timeout, lock_timeout, flags, term_char)``."""
        link_id, want_bytes, io_wait, lock_wait, op_flags, terminator = params
        self.pack_int(link_id)
        self.pack_uint(want_bytes)
        self.pack_uint(io_wait)
        self.pack_uint(lock_wait)
        self.pack_int(op_flags)
        self.pack_int(terminator)

    def pack_device_generic_parms(self, params):
        """Pack ``(link, flags, lock_timeout, timeout)``."""
        link_id, op_flags, lock_wait, io_wait = params
        self.pack_int(link_id)
        self.pack_int(op_flags)
        self.pack_uint(lock_wait)
        self.pack_uint(io_wait)

    def pack_device_remote_func_parms(self, params):
        """Pack ``(host_addr, host_port, prog_num, prog_vers, prog_family)``."""
        addr, port, program, version, family = params
        self.pack_uint(addr)
        self.pack_uint(port)
        self.pack_uint(program)
        self.pack_uint(version)
        self.pack_int(family)

    def pack_device_enable_srq_parms(self, params):
        """Pack ``(link, enable, handle)``.

        Raises Vxi11Exception when ``handle`` is longer than 40 items.
        """
        link_id, enabled, srq_handle = params
        self.pack_int(link_id)
        self.pack_bool(enabled)
        # The length check runs only after link and enable were packed
        if len(srq_handle) > 40:
            raise Vxi11Exception("array length too long")
        self.pack_opaque(srq_handle)

    def pack_device_lock_parms(self, params):
        """Pack ``(link, flags, lock_timeout)``."""
        link_id, op_flags, lock_wait = params
        self.pack_int(link_id)
        self.pack_int(op_flags)
        self.pack_uint(lock_wait)

    def pack_device_docmd_parms(self, params):
        """Pack ``(link, flags, timeout, lock_timeout, cmd, network_order, datasize, data_in)``."""
        (link_id, op_flags, io_wait, lock_wait,
         command, net_order, item_size, payload) = params
        self.pack_int(link_id)
        self.pack_int(op_flags)
        self.pack_uint(io_wait)
        self.pack_uint(lock_wait)
        self.pack_int(command)
        self.pack_bool(net_order)
        self.pack_int(item_size)
        self.pack_opaque(payload)

    def pack_device_error(self, error):
        """Pack an error code as one int."""
        self.pack_int(error)

    def pack_device_srq_parms(self, params):
        """Pack the handle (``params`` itself) as opaque data."""
        self.pack_opaque(params)

    def pack_create_link_resp(self, params):
        """Pack ``(error, link, abort_port, max_recv_size)``."""
        err_code, link_id, abort_chan_port, recv_limit = params
        self.pack_int(err_code)
        self.pack_int(link_id)
        self.pack_uint(abort_chan_port)
        self.pack_uint(recv_limit)

    def pack_device_write_resp(self, params):
        """Pack ``(error, size)``."""
        err_code, written = params
        self.pack_int(err_code)
        self.pack_uint(written)

    def pack_device_read_resp(self, params):
        """Pack ``(error, reason, data)``."""
        err_code, why, payload = params
        self.pack_int(err_code)
        self.pack_int(why)
        self.pack_opaque(payload)

    def pack_device_read_stb_resp(self, params):
        """Pack ``(error, stb)``."""
        err_code, status_byte = params
        self.pack_int(err_code)
        self.pack_uint(status_byte)

    def pack_device_docmd_resp(self, params):
        """Pack ``(error, data_out)``."""
        err_code, payload = params
        self.pack_int(err_code)
        self.pack_opaque(payload)

class Unpacker(rpc.Unpacker):
    """Unpacker for the VXI-11 argument and response records.

    Each ``unpack_*`` method reads the fields of one record, in the same
    order the matching ``Packer.pack_*`` method writes them, and returns
    them as a tuple (or as a single value for one-field records).
    """

    def unpack_device_link(self):
        "Unpack a link identifier"
        return self.unpack_int()

    def unpack_create_link_parms(self):
        "Unpack (id, lock_device, lock_timeout, device)"
        id = self.unpack_int()
        lock_device = self.unpack_bool()
        lock_timeout = self.unpack_uint()
        device = self.unpack_string()
        return id, lock_device, lock_timeout, device

    def unpack_device_write_parms(self):
        "Unpack (link, timeout, lock_timeout, flags, data)"
        link = self.unpack_int()
        timeout = self.unpack_uint()
        lock_timeout = self.unpack_uint()
        flags = self.unpack_int()
        data = self.unpack_opaque()
        return link, timeout, lock_timeout, flags, data

    def unpack_device_read_parms(self):
        "Unpack (link, request_size, timeout, lock_timeout, flags, term_char)"
        link = self.unpack_int()
        request_size = self.unpack_uint()
        timeout = self.unpack_uint()
        lock_timeout = self.unpack_uint()
        flags = self.unpack_int()
        term_char = self.unpack_int()
        return link, request_size, timeout, lock_timeout, flags, term_char

    def unpack_device_generic_parms(self):
        "Unpack (link, flags, lock_timeout, timeout)"
        link = self.unpack_int()
        flags = self.unpack_int()
        lock_timeout = self.unpack_uint()
        timeout = self.unpack_uint()
        return link, flags, lock_timeout, timeout

    def unpack_device_remote_func_parms(self):
        "Unpack (host_addr, host_port, prog_num, prog_vers, prog_family)"
        host_addr = self.unpack_uint()
        host_port = self.unpack_uint()
        prog_num = self.unpack_uint()
        prog_vers = self.unpack_uint()
        prog_family = self.unpack_int()
        return host_addr, host_port, prog_num, prog_vers, prog_family

    def unpack_device_enable_srq_parms(self):
        "Unpack (link, enable, handle)"
        link = self.unpack_int()
        enable = self.unpack_bool()
        handle = self.unpack_opaque()
        return link, enable, handle

    def unpack_device_lock_parms(self):
        "Unpack (link, flags, lock_timeout)"
        link = self.unpack_int()
        flags = self.unpack_int()
        lock_timeout = self.unpack_uint()
        return link, flags, lock_timeout

    def unpack_device_docmd_parms(self):
        "Unpack (link, flags, timeout, lock_timeout, cmd, network_order, datasize, data_in)"
        link = self.unpack_int()
        flags = self.unpack_int()
        timeout = self.unpack_uint()
        lock_timeout = self.unpack_uint()
        cmd = self.unpack_int()
        network_order = self.unpack_bool()
        datasize = self.unpack_int()
        data_in = self.unpack_opaque()
        return link, flags, timeout, lock_timeout, cmd, network_order, datasize, data_in

    # This method was originally (mis)named pack_device_docmd_parms even
    # though it unpacks; the old name is kept so existing callers still work.
    pack_device_docmd_parms = unpack_device_docmd_parms

    def unpack_device_error(self):
        "Unpack an error code"
        return self.unpack_int()

    def unpack_device_srq_parms(self):
        "Unpack the opaque handle"
        handle = self.unpack_opaque()
        return handle

    # Original spelling ('params'), inconsistent with the '_parms' suffix
    # used by every other method; kept as an alias for compatibility.
    unpack_device_srq_params = unpack_device_srq_parms

    def unpack_create_link_resp(self):
        "Unpack (error, link, abort_port, max_recv_size)"
        error = self.unpack_int()
        link = self.unpack_int()
        abort_port = self.unpack_uint()
        max_recv_size = self.unpack_uint()
        return error, link, abort_port, max_recv_size

    def unpack_device_write_resp(self):
        "Unpack (error, size)"
        error = self.unpack_int()
        size = self.unpack_uint()
        return error, size

    def unpack_device_read_resp(self):
        "Unpack (error, reason, data)"
        error = self.unpack_int()
        reason = self.unpack_int()
        data = self.unpack_opaque()
        return error, reason, data

    def unpack_device_read_stb_resp(self):
        "Unpack (error, stb)"
        error = self.unpack_int()
        stb = self.unpack_uint()
        return error, stb

    def unpack_device_docmd_resp(self):
        "Unpack (error, data_out)"
        error = self.unpack_int()
        data_out = self.unpack_opaque()
        return error, data_out

    def done(self):
        # ignore any trailing bytes
        pass


class CoreClient(rpc.TCPClient):
    """RPC client for the VXI-11 device core program.

    Every method builds the argument tuple for one procedure and hands it
    to ``make_call`` (provided by the base class, not shown in this file)
    together with the matching Packer / Unpacker methods.  The return
    value is whatever ``make_call`` returns; callers in this file treat it
    as the tuple (or single error code) produced by the named unpacker.
    """

    def __init__(self, host, port=0):
        # packer/unpacker are set before the base constructor runs
        self.packer = Packer()
        self.unpacker = Unpacker('')
        rpc.TCPClient.__init__(self, host, DEVICE_CORE_PROG, DEVICE_CORE_VERS, port)

    def create_link(self, id, lock_device, lock_timeout, name):
        "Call CREATE_LINK; result unpacked by unpack_create_link_resp"
        params = (id, lock_device, lock_timeout, name)
        return self.make_call(CREATE_LINK, params,
                self.packer.pack_create_link_parms,
                self.unpacker.unpack_create_link_resp)

    def device_write(self, link, timeout, lock_timeout, flags, data):
        "Call DEVICE_WRITE; result unpacked by unpack_device_write_resp"
        params = (link, timeout, lock_timeout, flags, data)
        return self.make_call(DEVICE_WRITE, params,
                self.packer.pack_device_write_parms,
                self.unpacker.unpack_device_write_resp)

    def device_read(self, link, request_size, timeout, lock_timeout, flags, term_char):
        "Call DEVICE_READ; result unpacked by unpack_device_read_resp"
        params = (link, request_size, timeout, lock_timeout, flags, term_char)
        return self.make_call(DEVICE_READ, params,
                self.packer.pack_device_read_parms,
                self.unpacker.unpack_device_read_resp)

    def device_read_stb(self, link, flags, lock_timeout, timeout):
        "Call DEVICE_READSTB; result unpacked by unpack_device_read_stb_resp"
        params = (link, flags, lock_timeout, timeout)
        return self.make_call(DEVICE_READSTB, params,
                self.packer.pack_device_generic_parms,
                self.unpacker.unpack_device_read_stb_resp)

    def device_trigger(self, link, flags, lock_timeout, timeout):
        "Call DEVICE_TRIGGER; result unpacked by unpack_device_error"
        params = (link, flags, lock_timeout, timeout)
        return self.make_call(DEVICE_TRIGGER, params,
                self.packer.pack_device_generic_parms,
                self.unpacker.unpack_device_error)

    def device_clear(self, link, flags, lock_timeout, timeout):
        "Call DEVICE_CLEAR; result unpacked by unpack_device_error"
        params = (link, flags, lock_timeout, timeout)
        return self.make_call(DEVICE_CLEAR, params,
                self.packer.pack_device_generic_parms,
                self.unpacker.unpack_device_error)

    def device_remote(self, link, flags, lock_timeout, timeout):
        "Call DEVICE_REMOTE; result unpacked by unpack_device_error"
        params = (link, flags, lock_timeout, timeout)
        return self.make_call(DEVICE_REMOTE, params,
                self.packer.pack_device_generic_parms,
                self.unpacker.unpack_device_error)

    def device_local(self, link, flags, lock_timeout, timeout):
        "Call DEVICE_LOCAL; result unpacked by unpack_device_error"
        params = (link, flags, lock_timeout, timeout)
        return self.make_call(DEVICE_LOCAL, params,
                self.packer.pack_device_generic_parms,
                self.unpacker.unpack_device_error)

    def device_lock(self, link, flags, lock_timeout):
        "Call DEVICE_LOCK; result unpacked by unpack_device_error"
        params = (link, flags, lock_timeout)
        return self.make_call(DEVICE_LOCK, params,
                self.packer.pack_device_lock_parms,
                self.unpacker.unpack_device_error)

    def device_unlock(self, link):
        "Call DEVICE_UNLOCK; result unpacked by unpack_device_error"
        return self.make_call(DEVICE_UNLOCK, link,
                self.packer.pack_device_link,
                self.unpacker.unpack_device_error)

    def device_enable_srq(self, link, enable, handle):
        "Call DEVICE_ENABLE_SRQ; result unpacked by unpack_device_error"
        params = (link, enable, handle)
        return self.make_call(DEVICE_ENABLE_SRQ, params,
                self.packer.pack_device_enable_srq_parms,
                self.unpacker.unpack_device_error)

    def device_docmd(self, link, flags, timeout, lock_timeout, cmd, network_order, datasize, data_in):
        "Call DEVICE_DOCMD; result unpacked by unpack_device_docmd_resp"
        params = (link, flags, timeout, lock_timeout, cmd, network_order, datasize, data_in)
        return self.make_call(DEVICE_DOCMD, params,
                self.packer.pack_device_docmd_parms,
                self.unpacker.unpack_device_docmd_resp)

    def destroy_link(self, link):
        "Call DESTROY_LINK; result unpacked by unpack_device_error"
        return self.make_call(DESTROY_LINK, link,
                self.packer.pack_device_link,
                self.unpacker.unpack_device_error)

    def create_intr_chan(self, host_addr, host_port, prog_num, prog_vers, prog_family):
        "Call CREATE_INTR_CHAN; result unpacked by unpack_device_error"
        params = (host_addr, host_port, prog_num, prog_vers, prog_family)
        # The 5-field tuple matches pack_device_remote_func_parms; the
        # docmd packer used here previously expects 8 fields and would
        # fail while unpacking the tuple.
        return self.make_call(CREATE_INTR_CHAN, params,
                self.packer.pack_device_remote_func_parms,
                self.unpacker.unpack_device_error)

    def destroy_intr_chan(self):
        "Call DESTROY_INTR_CHAN (no arguments); result unpacked by unpack_device_error"
        return self.make_call(DESTROY_INTR_CHAN, None,
                None,
                self.unpacker.unpack_device_error)


class AbortClient(rpc.TCPClient):
    """RPC client for the VXI-11 device async program (abort only)."""

    def __init__(self, host, port=0):
        # Same setup order as the core client: codec objects first, then
        # the base-class constructor with the async program and version.
        self.packer = Packer()
        self.unpacker = Unpacker('')
        rpc.TCPClient.__init__(
            self, host, DEVICE_ASYNC_PROG, DEVICE_ASYNC_VERS, port)

    def device_abort(self, link):
        """Call DEVICE_ABORT for ``link``; result unpacked by unpack_device_error."""
        pack_args = self.packer.pack_device_link
        unpack_reply = self.unpacker.unpack_device_error
        return self.make_call(DEVICE_ABORT, link, pack_args, unpack_reply)


class Instrument(object):
    """VXI-11 instrument interface client.

    Wraps a CoreClient (and, once opened, an AbortClient) and exposes
    read/write/ask plus the status, trigger, clear, remote/local and
    lock/unlock calls.  Most methods open the link on first use.

    ``timeout`` and ``lock_timeout`` are given in seconds; they are sent
    to the device converted to integer milliseconds.
    """

    def __init__(self, host, name = None, client_id = None, term_char = None):
        """Create new VXI-11 instrument object.

        ``host`` is either a host name/address or a ``TCPIP...::`` VISA
        resource string; in the latter case the host and device name are
        taken from the resource string (overriding ``name``).  ``name``
        defaults to "inst0" and ``client_id`` to a random 31-bit number.

        Raises Vxi11Exception if a resource string cannot be parsed.
        """
        if host.upper().startswith('TCPIP') and '::' in host:
            res = parse_visa_resource_string(host)

            if res is None:
                raise Vxi11Exception('Invalid resource string', 'init')

            host = res['arg1']
            name = res['arg2']

        # client and abort_client must exist before the timeout property
        # is assigned below, because its setter inspects both
        self.client = CoreClient(host)
        self.abort_client = None

        self.host = host
        self.name = name
        self.client_id = client_id
        self.term_char = term_char
        self.lock_timeout = 10
        self.timeout = 10
        self.abort_port = 0
        self.link = None
        self.max_recv_size = 0

        if self.name is None:
            self.name = "inst0"

        if self.client_id is None:
            self.client_id = random.getrandbits(31)

    @property
    def timeout(self):
        "I/O timeout in seconds"
        return self._timeout

    @timeout.setter
    def timeout(self, val):
        self._timeout = val
        self._timeout_ms = int(val * 1000)
        # Socket timeouts are set one second longer than the I/O timeout
        if self.client is not None:
            self.client.sock.settimeout(self.timeout+1)
        if self.abort_client is not None:
            self.abort_client.sock.settimeout(self.timeout+1)

    @property
    def lock_timeout(self):
        "Lock timeout in seconds"
        return self._lock_timeout

    @lock_timeout.setter
    def lock_timeout(self, val):
        self._lock_timeout = val
        self._lock_timeout_ms = int(val * 1000)

    def open(self):
        """Open connection to VXI-11 instrument.

        Does nothing if a link is already open.  Raises Vxi11Exception if
        the device reports an error for the create_link call.
        """
        if self.link is not None:
            # Already open: creating another link would orphan this one
            return

        if self.client is None:
            self.client = CoreClient(self.host)

        self.client.sock.settimeout(self.timeout+1)
        error, link, abort_port, max_recv_size = self.client.create_link(
                self.client_id, 0, self._lock_timeout_ms,
                self.name.encode("utf-8"))

        if error:
            raise Vxi11Exception(error, 'open')

        self.abort_port = abort_port

        self.abort_client = AbortClient(self.host, abort_port)
        self.abort_client.sock.settimeout(self.timeout)

        self.link = link
        # Cap the block size reported by the device at 2**30 bytes
        self.max_recv_size = min(max_recv_size, 1073741824)

    def close(self):
        """Close connection.

        Destroys the link (if any) and closes both RPC clients.  Safe to
        call when the instrument is already closed or was never opened.
        """
        try:
            if self.client is not None and self.link is not None:
                self.client.destroy_link(self.link)
        finally:
            # Release the sockets and reset state even if destroy_link failed
            if self.client is not None:
                self.client.close()
            if self.abort_client is not None:
                self.abort_client.close()
            self.link = None
            self.client = None
            self.abort_client = None

    def abort(self):
        """Asynchronous abort.

        Raises Vxi11Exception if the device reports an error.
        """
        if self.link is None:
            self.open()

        self.abort_client.sock.settimeout(1.0)
        error = self.abort_client.device_abort(self.link)

        if error:
            raise Vxi11Exception(error, 'abort')

    def write_raw(self, data):
        """Write binary data to instrument.

        If ``term_char`` is set, its first encoded byte is appended to the
        data.  The data is sent in blocks of at most ``max_recv_size``
        bytes, with OP_FLAG_END set on the final block.

        Raises Vxi11Exception on a device error or a short write.
        """
        if self.link is None:
            self.open()

        if self.term_char is not None:
            # Slice rather than index so that one byte of type bytes is
            # appended (indexing bytes gives an int on Python 3), and build
            # a new object so a caller's bytearray is not modified.
            data = data + str(self.term_char).encode('utf-8')[0:1]

        # The original assigned OP_FLAG_TERMCHAR_SET and then immediately
        # reset flags to 0, so writes always started with flags == 0;
        # that effective behaviour is kept.
        flags = 0

        num = len(data)
        offset = 0

        while num > 0:
            if num <= self.max_recv_size:
                flags |= OP_FLAG_END

            block = data[offset:offset+self.max_recv_size]

            error, size = self.client.device_write(self.link, self._timeout_ms, self._lock_timeout_ms, flags, block)

            if error:
                raise Vxi11Exception(error, 'write')
            elif size < len(block):
                raise Vxi11Exception("did not write complete block", 'write')

            offset += size
            num -= size

    def read_raw(self, num=-1):
        """Read binary data from instrument.

        Reads until the device reports RX_END or RX_CHR in the reason
        field, or, when ``num`` is positive, until ``num`` bytes have been
        received.  Returns the data as bytes.

        Raises Vxi11Exception if the device reports an error.
        """
        if self.link is None:
            self.open()

        read_len = self.max_recv_size
        if num > 0 and num < self.max_recv_size:
            read_len = num

        flags = 0
        reason = 0
        term_char = 0

        if self.term_char is not None:
            flags = OP_FLAG_TERMCHAR_SET
            # Indexing a bytearray yields an int whether indexing bytes
            # gives an int (Python 3) or a 1-char str (Python 2)
            term_char = bytearray(str(self.term_char).encode('utf-8'))[0]

        read_data = bytearray()

        while reason & (RX_END | RX_CHR) == 0:
            error, reason, data = self.client.device_read(self.link, read_len, self._timeout_ms, self._lock_timeout_ms, flags, term_char)

            if error:
                raise Vxi11Exception(error, 'read')

            read_data.extend(data)

            if num > 0:
                num = num - len(data)
                if num <= 0:
                    break
                if num < read_len:
                    read_len = num

        return bytes(read_data)

    def ask_raw(self, data, num=-1):
        "Write then read binary data"
        self.write_raw(data)
        return self.read_raw(num)

    def write(self, message, encoding = 'utf-8'):
        """Write string to instrument.

        A tuple or list is written element by element.
        """
        if isinstance(message, (tuple, list)):
            # recursive call for a list of commands
            for message_i in message:
                self.write(message_i, encoding)
            return

        self.write_raw(str(message).encode(encoding))

    def read(self, num=-1, encoding = 'utf-8'):
        "Read string from instrument, with trailing CR/LF characters stripped"
        return self.read_raw(num).decode(encoding).rstrip('\r\n')

    def ask(self, message, num=-1, encoding = 'utf-8'):
        """Write then read string.

        For a tuple or list, each element is asked in turn and the list
        of replies is returned.
        """
        if isinstance(message, (tuple, list)):
            # recursive call for a list of commands
            return [self.ask(message_i, num, encoding) for message_i in message]

        self.write(message, encoding)
        return self.read(num, encoding)

    def read_stb(self):
        """Read status byte.

        Raises Vxi11Exception if the device reports an error.
        """
        if self.link is None:
            self.open()

        flags = 0

        error, stb = self.client.device_read_stb(self.link, flags, self._lock_timeout_ms, self._timeout_ms)

        if error:
            raise Vxi11Exception(error, 'read_stb')

        return stb

    def trigger(self):
        """Send trigger command.

        Raises Vxi11Exception if the device reports an error.
        """
        if self.link is None:
            self.open()

        flags = 0

        error = self.client.device_trigger(self.link, flags, self._lock_timeout_ms, self._timeout_ms)

        if error:
            raise Vxi11Exception(error, 'trigger')

    def clear(self):
        """Send clear command.

        Raises Vxi11Exception if the device reports an error.
        """
        if self.link is None:
            self.open()

        flags = 0

        error = self.client.device_clear(self.link, flags, self._lock_timeout_ms, self._timeout_ms)

        if error:
            raise Vxi11Exception(error, 'clear')

    def remote(self):
        """Send remote command.

        Raises Vxi11Exception if the device reports an error.
        """
        if self.link is None:
            self.open()

        flags = 0

        error = self.client.device_remote(self.link, flags, self._lock_timeout_ms, self._timeout_ms)

        if error:
            raise Vxi11Exception(error, 'remote')

    def local(self):
        """Send local command.

        Raises Vxi11Exception if the device reports an error.
        """
        if self.link is None:
            self.open()

        flags = 0

        error = self.client.device_local(self.link, flags, self._lock_timeout_ms, self._timeout_ms)

        if error:
            raise Vxi11Exception(error, 'local')

    def lock(self):
        """Send lock command.

        Raises Vxi11Exception if the device reports an error.
        """
        if self.link is None:
            self.open()

        flags = 0

        error = self.client.device_lock(self.link, flags, self._lock_timeout_ms)

        if error:
            raise Vxi11Exception(error, 'lock')

    def unlock(self):
        """Send unlock command.

        Raises Vxi11Exception if the device reports an error.
        """
        if self.link is None:
            self.open()

        error = self.client.device_unlock(self.link)

        if error:
            raise Vxi11Exception(error, 'unlock')