# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
The CDS Sesame service interface. This service provides basic information
about sources--most importantly, its position in the sky--given any of
their official names. One can resolve names into J2000 positions via the
functions object2pos() (returning R.A.-Dec. decimal tuples) and
object2sexapos() (returning positions formatted into sexagesimal strings).
More metadata about the source is available via the resolve() function.
Full access to the Sesame service capabilities (documented at
http://cdsweb.u-strasbg.fr/doc/sesame.htx) is available via the SesameQuery
class. Sesame can consult three object databases: Simbad, NED, and Vizier;
Simbad is consulted by default.
The Sesame service is mirrored at multiple locations; the service
endpoints are listed in this module's ``endpoints`` dictionary where
the keys are short labels indicating the location. The default one
that will be used is given by the symbol ``default_endpoint``. The
function ``set_default_endpoint()`` will set the default endpoint given
its name.
"""
# Public names exported by a star-import of this module.
__all__ = [ "resolve", "object2pos", "object2sexapos", "set_default_endpoint",
"SesameQuery", "ObjectData" ]
import re
from urllib.request import urlopen
from urllib.parse import quote_plus
import xml.etree.ElementTree as ET
from xml.parsers.expat import ExpatError
from ..dal.query import DALQueryError, DALFormatError, DALServiceError
# Known Sesame service mirrors, keyed by a short location label.
endpoints = {
    "cds": "http://cdsweb.u-strasbg.fr/cgi-bin/nph-sesame",
    "cfa": "http://vizier.cfa.harvard.edu/viz-bin/nph-sesame",
}

# Endpoint used when a caller does not pick a mirror explicitly.
default_endpoint = endpoints["cds"]
def set_default_endpoint(name):
    """
    Select the Sesame mirror that is used when no mirror is requested.

    Parameters
    ----------
    name : str
       short label of the mirror; it must be a key of the module-level
       ``endpoints`` dictionary (e.g. "cds" or "cfa").

    Raises
    ------
    LookupError
       if ``name`` is not a key of ``endpoints``
    """
    global default_endpoint
    try:
        chosen = endpoints[name]
    except KeyError:
        raise LookupError("unrecognized sesame endpoint label: " + name)
    default_endpoint = chosen
def resolve(names, db="Simbad", include="", mirror=None):
    """
    Resolve one or more object names, each to an :class:`.ObjectData`
    instance holding the metadata returned for that object.

    Parameters
    ----------
    names : str or list of str
       a single object name, or a list of object names.
    db : str
       the object database to consult, given as a case-insensitive,
       minimum match to one of ["Simbad", "NED", "Vizier"].
    include : str or list of str
       extra data to request (if available).  A string is split on
       whitespace into words; each word must be a case-insensitive,
       minimum match to "aliases" (additional identifiers the object
       is known by) or "fluxes" (flux magnitudes).
    mirror : str
       label of the service mirror to use, a key of ``endpoints``
       ("cds" or "cfa").  When not given, the module attribute
       ``default_endpoint`` is used (see set_default_endpoint()).

    Returns
    -------
    ObjectData
       if a single name was provided, or a
    list of ObjectData
       if a list of names was given.

    Raises
    ------
    LookupError
       if ``mirror`` is not a known label
    ValueError
       if an ``include`` word matches none (or more than one) of the
       supported options
    """
    if mirror:
        try:
            service_url = endpoints[mirror]
        except KeyError:
            raise LookupError("unrecognized sesame mirror: " + mirror)
    else:
        service_url = default_endpoint

    query = SesameQuery(service_url)
    query.useDatabases(db)

    extras = include if isinstance(include, list) else include.strip().split()
    supported = "fluxes aliases".split()
    for word in extras:
        prefix = word.lower()
        matches = [option for option in supported if option.startswith(prefix)]
        if len(matches) > 1:
            raise ValueError("Ambiguous include parameter value: " + word)
        if len(matches) == 0:
            raise ValueError("Unrecognized include parameter value: " + word)
        # the matched option name is also the name of the query property
        setattr(query, matches[0], True)

    many = isinstance(names, list)
    query.names = names if many else [names]

    # keep only the first database response of every resolved target
    found = [target.responses[0] for target in query.execute()]
    return found if many else found[0]
def object2pos(names, db="Simbad", mirror=None):
    """
    Resolve one or more object names, each to a decimal position.

    Parameters
    ----------
    names : str or list of str
       a single object name, or a list of object names.
    db : str
       the object database to consult, given as a case-insensitive,
       minimum match to one of ["Simbad", "NED", "Vizier"].
    mirror : str
       label of the service mirror to use ("cds" or "cfa").  When not
       given, the module attribute ``default_endpoint`` is used
       (see set_default_endpoint()).

    Returns
    -------
    tuple
       the ``pos`` attribute of the resolved object if a single name
       was provided
    list of tuples
       one ``pos`` value per name if a list of names was given
    """
    resolved = resolve(names, db, mirror=mirror)
    if not isinstance(resolved, list):
        return resolved.pos
    return [entry.pos for entry in resolved]
def object2sexapos(names, db="Simbad", mirror=None):
    """
    Resolve one or more object names, each to a sexagesimal-formatted
    position.

    Parameters
    ----------
    names : str or list of str
       a single object name, or a list of object names.
    db : str
       the object database to consult, given as a case-insensitive,
       minimum match to one of ["Simbad", "NED", "Vizier"].
    mirror : str
       label of the service mirror to use ("cds" or "cfa").  When not
       given, the module attribute ``default_endpoint`` is used
       (see set_default_endpoint()).

    Returns
    -------
    object
       the ``sexapos`` attribute of the resolved object if a single
       name was provided
    list
       one ``sexapos`` value per name if a list of names was given
    """
    resolved = resolve(names, db, mirror=mirror)
    if not isinstance(resolved, list):
        return resolved.sexapos
    return [entry.sexapos for entry in resolved]
class SesameQuery(object):
    """
    a class for preparing a query to a sesame service.  Query constraints
    are added via properties.  The execute() function will submit the query
    and return the results.

    The base URL for the query can be changed via the baseurl property.
    """

    # maps the lower-case database name to its one-letter sesame code
    database_codes = {"simbad": "S", "vizier": "V", "ned": "N", "all": "A"}

    def __init__(self, baseurl=None):
        """
        initialize the query object with a baseurl

        Parameters
        ----------
        baseurl : str
           the service endpoint.  If None (or otherwise false), the value
           of the module attribute, default_endpoint, will be used
           (see also set_default_endpoint()).
        """
        if not baseurl:
            baseurl = default_endpoint
        self._baseurl = baseurl
        self._dbs = ""
        self._opts = ""
        self._names = []

    @staticmethod
    def _as_flag(label, tf):
        """
        coerce a bool or int to a bool; raise TypeError (mentioning the
        property called ``label``) for any other type.
        """
        # bool is a subclass of int, so this single check covers both
        if isinstance(tf, int):
            return bool(tf)
        raise TypeError("{0} requires bool or int, got: {1}".format(
            label, type(tf).__name__))

    def _set_opt(self, code, label, tf):
        """
        add or remove the one-letter option ``code`` from the option
        string, always keeping the option string a str.
        """
        flag = self._as_flag(label, tf)
        present = code in self._opts
        if present and not flag:
            self._opts = self._opts.replace(code, "")
        elif flag and not present:
            self._opts += code

    @property
    def baseurl(self):
        """
        the base URL that this query will be sent to when one of the
        execute functions is called.
        """
        return self._baseurl

    @baseurl.setter
    def baseurl(self, val):
        self._baseurl = val

    @property
    def dbs(self):
        """
        the database selection argument.  This is a sequence of any of the
        following characters, indicating which databases to query:

        =  ================
        S  Simbad
        V  Vizier
        N  NED
        A  All of the above
        =  ================

        Without ``A`` included, only the result from the database returning
        a matched result will be returned.  A value preceded by a '~'
        requests that the result cache be ignored.

        No syntax checking is done on this value upon setting (though it is
        done via getqueryurl when lax=False); consider using useDatabases().
        """
        return self._dbs

    @dbs.setter
    def dbs(self, val):
        if not isinstance(val, str):
            raise TypeError("dbs must be of type str; given " +
                            type(val).__name__)
        self._dbs = val

    @dbs.deleter
    def dbs(self):
        self._dbs = ""

    @property
    def opts(self):
        """
        the options that control the content and format of the output.
        A leading "-o" is dropped when the value is set.
        """
        return self._opts

    @opts.setter
    def opts(self, val):
        if val.startswith("-o"):
            val = val[2:]
        self._opts = val

    @opts.deleter
    def opts(self):
        self._opts = ""

    @property
    def ignorecache(self):
        """
        boolean indicating whether the database caches will be ignored when
        retrieving results.  If true, the databases will be queried
        directly; otherwise, the cache will be consulted first.
        """
        return '~' in self._dbs

    @ignorecache.setter
    def ignorecache(self, tf):
        flag = self._as_flag("ignorecache", tf)
        present = '~' in self._dbs
        if present and not flag:
            self._dbs = self._dbs.replace('~', '')
        elif flag and not present:
            self._dbs = '~' + self._dbs

    @property
    def aliases(self):
        """
        a boolean indicating whether to return all known identifiers for
        the resolved source.  If false, only the main designation will be
        returned.
        """
        return 'I' in self._opts

    @aliases.setter
    def aliases(self, tf):
        self._set_opt('I', "aliases", tf)

    @property
    def fluxes(self):
        """
        a boolean indicating whether the flux option ('F') is part of
        the output options.
        """
        return 'F' in self._opts

    @fluxes.setter
    def fluxes(self, tf):
        self._set_opt('F', "fluxes", tf)

    def useDatabases(self, *args):
        """
        use the given databases to resolve the names.  The arguments are
        database names that are case-insensitive, minimum matches to any of
        ``["Simbad", "NED", "Vizier", "all"]``.  The order indicates the
        order that the databases will be checked.  Unless "all" is included,
        only the result from the first database returning a positive result
        will be returned by the query.

        Raises
        ------
        ValueError
           if any argument matches none, or more than one, of the
           database names
        """
        bad = []
        use = []
        for arg in args:
            abr = arg.lower()
            db = [d for d in self.database_codes if d.startswith(abr)]
            if len(db) != 1:
                bad.append(arg)
            elif db[0] not in use:
                use.append(db[0])

        if bad:
            raise ValueError("Unrecognized or ambiguous database name(s): " +
                             str(bad))
        self._dbs = "".join([self.database_codes[d] for d in use])

    def useDefaultDatabase(self):
        """
        clear any previously set database selection so as to use the
        default database (Simbad) to resolve the targets.
        """
        self._dbs = ""

    @property
    def names(self):
        """
        the list of the object names to resolve
        """
        return self._names

    @names.setter
    def names(self, names):
        if not isinstance(names, list):
            names = [names]
        # store a copy so later changes to the caller's list have no effect
        self._names = names[:]

    def getqueryurl(self, lax=False, format=None, astext=False):
        """
        return the GET URL that encodes the current query.  This is the
        URL that the execute functions will use if called next.

        Parameters
        ----------
        lax : bool
           if False (default), a DALQueryError exception will be
           raised if the database selection, the options, the format or
           the (empty) list of names cannot form a legal query.  If
           True, these checks are skipped.
        format : str
           a format code for the return results, overriding the
           default XML format.  The value should be one of "x",
           "x2", "x4", "pc".  The first three are different
           versions of the XML formats, and "pc" is the default
           percent-code format.
        astext : bool
           request results be returned with a MIME-type of
           "text/plain", regardless of the format.

        Raises
        ------
        DALQueryError
           when lax=False, for errors in the input query syntax
        """
        if not lax:
            known = set(self.database_codes.values())
            bad = [c for c in set(self._dbs) if c != '~' and c not in known]
            if bad:
                raise DALQueryError(("database selection, {0}, includes " +
                                     "unrecognized databases: {1}.").format(
                                         self._dbs, str(tuple(bad))))

            bad = [c for c in set(self._opts) if c not in set("IF")]
            if bad:
                raise DALQueryError(("options, {0}, includes " +
                                     "unrecognized items: {1}.").format(
                                         self._opts, str(tuple(bad))))

            if format and format not in "x x2 x4 pc".split():
                raise DALQueryError("unrecognized format: " + format)

            if not self._names:
                raise DALQueryError("No source names provided")

        out = self._baseurl

        # build the "/-o..." path segment carrying format and options
        opts = "/-o"
        if not format:
            opts += "x"
        elif format != "pc":
            opts += format
        if astext:
            opts += 'p'
        if self._opts:
            opts += self._opts
        # a bare "/-o" carries no information, so it is left out
        if len(opts) > 3:
            out += opts

        if self._dbs:
            out += '/' + self._dbs

        out += "?" + "&".join([quote_plus(n) for n in self._names])
        return out

    def execute_stream(self, format=None, astext=False, lax=False):
        """
        submit the query and return the raw file stream

        Parameters
        ----------
        format : str
           a format code for the return results, overriding the
           default XML format.  The value should be one of "x",
           "x2", "x4", "pc".  The first three are different
           versions of the XML formats, and "pc" is the default
           percent-code format.
        astext : bool
           request results be returned with a MIME-type of
           "text/plain", regardless of the format.
        lax : bool
           if False (default), a DALQueryError exception will be
           raised if the current set of parameters cannot be
           used to form a legal query.

        Raises
        ------
        DALServiceError
           for errors connecting to or communicating with the service
        DALQueryError
           for errors in the input query syntax
        """
        # built outside of the try block so that the URL is always
        # available to the error handler below
        url = self.getqueryurl(lax, format, astext)
        try:
            return urlopen(url)
        except IOError as ex:
            raise DALServiceError.from_except(ex, url)

    def execute(self):
        """
        execute the query and return a list of Target instances, one for
        each requested target.

        Raises
        ------
        DALServiceError
           for errors connecting to or communicating with the service,
           when the response is not a Sesame document, or when it
           contains no targets
        DALQueryError
           for errors in the input query syntax
        DALFormatError
           if the XML response is corrupted
        """
        resp = self.execute_stream(lax=False)
        try:
            # ElementTree reports malformed XML as ET.ParseError;
            # ExpatError is kept for parsers that raise it directly
            root = ET.parse(resp).getroot()
        except (ET.ParseError, ExpatError) as e:
            raise DALFormatError(e)
        finally:
            # the document is fully parsed (or unusable): release the
            # connection
            resp.close()

        if root.tag != "Sesame":
            raise DALServiceError("Unexpected output: " +
                                  ET.tostring(root, encoding="unicode"))

        out = [Target(tel) for tel in root.findall('Target')]
        if not out:
            raise DALServiceError("No targets resolved")
        return out
class Target(object):
    """
    a result from the name resolver: the responses of the consulted
    object databases for one requested target name
    """

    # matches resolver labels of the form "<code>=<database name>..."
    _res_name_pat = re.compile(r'^(\w+)=(\w+)')

    def __init__(self, etreeEl):
        """
        Wrap an XML Target element

        Parameters
        ----------
        etreeEl : xml.etree.ElementTree.Element
           the Target element; each of its Resolver children is wrapped
           as an ObjectData instance.
        """
        self._data = etreeEl
        self._lookup = {}
        responses = []
        for resolver_el in self._data.findall("Resolver"):
            data = ObjectData(resolver_el)
            responses.append(data)
            dbname = self._parse_resolver_name(data.resolver_name)[1]
            # a resolver without a usable name still counts as a
            # response, but cannot be looked up by database name
            if dbname:
                self._lookup[dbname.lower()] = data
        self._responses = tuple(responses)

    def _parse_resolver_name(self, label):
        """
        split a resolver label into a (code, database name) pair.  A
        label not of the form "code=name" yields its first character
        and the whole label; a missing or empty label yields ("", "").
        """
        if not label:
            return ("", "")
        m = self._res_name_pat.match(label)
        if m:
            return (m.group(1), m.group(2))
        return (label[0], label)

    @property
    def dbcodes(self):
        """
        the value of the target element's "option" attribute (None if
        the attribute is absent)
        """
        return self._data.attrib.get("option")

    @property
    def name(self):
        """
        the text of the target's "name" child element, or None if there
        is no such element
        """
        out = self._data.find("name")
        if out is not None:
            out = out.text
        return out

    @property
    def responses(self):
        """
        the tuple of responses from each of the object databases.  Unless
        multiple databases were requested (e.g. via
        SesameQuery.useDatabases()) this tuple will have only one element.
        """
        return self._responses

    @property
    def resolved(self):
        """
        a boolean indicating whether the source name was successfully
        resolved.  That is, this will be True if at least one database
        returned a successful response
        """
        return any(r.success for r in self._responses)

    def according_to(self, dbname):
        """
        return the object data from a particular resolver given by the
        case-insensitive, minimum match to one of the database names that
        responded (e.g. "Simbad", "NED", "Vizier").  None is returned if
        no response from the database is available.

        Raises
        ------
        LookupError
           if ``dbname`` matches more than one responding database
        """
        dbn = dbname.lower()
        db = [d for d in self._lookup if d.lower().startswith(dbn)]
        if not db:
            return None
        if len(db) > 1:
            raise LookupError("Ambiguous database name: " + dbname)
        return self._lookup[db[0]]
class ObjectData(object):
    """
    a container for the target metadata returned from a resolver.  The
    success attribute will be true if the resolver successfully matched the
    target name and returned metadata for it.

    The metadata that gets returned will depend on the resolver, the type of
    object (and what is known about it), and the input options given in
    the sesame query.  The full set of possible metadata is given by the
    class attribute "metadata", a dictionary where the keys are the metadata
    names and each value is a short definition of the corresponding metadatum.

    An ObjectData instance offers dictionary-like access--i.e. metadata
    can be accessed via the bracket operator ([]) or the get() function;
    both return None for a name that is not present.  The keys() function
    returns the metadata names that are present.  For most of the
    metadata, the value will be either a string or a list of strings if
    more than one value is available (e.g. the alias metadatum).  Exceptions
    are "Vel", "z", "mag", and "plx", which will be of a DocQuantity type,
    and "pm", which will be of a ProperMotion type.

    Some important metadata are made available as attributes.  This includes
    "pos", the decimal J2000 position converted to a 2-element tuple of
    floats.  It also includes "sexapos", the sexagesimal-formatted J2000
    position (as a single string), and "oname", the primary name for the
    target.  If aliases were requested, the "aliases" attribute will contain
    the list of names the object is also known as.
    """

    metadata = {"INFO": "status message from resolver",
                "ERROR": "error message",
                "oid": "database-internal object identifier",
                "otype": "object type code",
                "jpos": "sexagesimal-formatted J2000 position",
                "jradeg": "J2000 decimal right ascension",
                "jdedeg": "J2000 decimal declination",
                "refPos": "bibcode of reference defining the position",
                "errRAmas": "milliarcsecond positional error in right ascension",
                "errDEmas": "milliarcsecond positional error in declination",
                "pm": "proper motion",
                "MType": "galaxy classification code",
                "spType": "",
                "spNum": "",
                "Vel": "recessional velocity",
                "z": "redshift",
                "mag": "",
                "plx": "paralax",
                "oname": "primary name within the resolver database",
                "alias": "secondary name",
                "nrefs": "number of literature references consulted for target"
                }

    # metadata wrapped as DocQuantity / ProperMotion rather than as strings
    _qtype_md = "Vel z mag plx".split()
    _pmtype_md = "pm".split()

    def __init__(self, etreeEl):
        """
        Wrap an XML Resolver element
        """
        self._data = etreeEl

    @property
    def resolver_name(self):
        """
        the name of the resolver that produced this information (the
        element's "name" attribute, or None if it is absent)
        """
        return self._data.attrib.get("name")

    def _loadINFOinfo(self):
        """
        scan the INFO children once and cache what they say about the
        use of the cache and about the success of the lookup
        """
        fromcache = False
        success = True
        for info in self._data.findall('INFO'):
            # an empty <INFO/> element has no text at all
            text = info.text or ""
            if text == 'from cache':
                fromcache = True
            elif text == 'Zero (0) answers' or 'Nothing found' in text:
                success = False
        self._fromcache = fromcache
        self._success = success

    @property
    def fromcache(self):
        """
        a boolean indicating as to whether this represents cached information
        """
        if not hasattr(self, '_fromcache'):
            self._loadINFOinfo()
        return self._fromcache

    @property
    def success(self):
        """
        a boolean whether the name was successfully matched (i.e. resolved).
        False indicates that the source name is not found in the resolver's
        database or that the resolver reported an error.
        """
        if not hasattr(self, '_success'):
            self._loadINFOinfo()
        # an ERROR element marks a failure even without a telling INFO
        if self._success and self.get("ERROR") is not None:
            self._success = False
        return self._success

    def get(self, name, defval=None):
        """
        return the target metadata with the given name.  The result will
        either be a single value or a list of values, depending on whether
        multiple values were returned with that name.  If the name is
        "alias", the response will always be a list.

        Parameters
        ----------
        name : str
           the metadatum name, i.e. the tag of the child elements to
           read.  The names that can be returned are the keys of the
           class attribute ``metadata``.
        defval
           the value to return when no child element has that name

        Returns
        -------
        str, DocQuantity, ProperMotion, or a list of these
           strings are stripped of surrounding whitespace; an element
           without text yields an empty string.
        """
        elements = self._data.findall(name)
        if name in self._qtype_md:
            out = [DocQuantity(el) for el in elements]
        elif name in self._pmtype_md:
            out = [ProperMotion(el) for el in elements]
        else:
            out = [(el.text or "").strip() for el in elements]

        if not out:
            return defval
        if len(out) == 1 and name != 'alias':
            return out[0]
        return out

    def keys(self):
        """
        return the names of the target metadata that are available from
        this resolver (each name once, in no particular order)
        """
        # iterate the element directly: Element.getchildren() is gone
        # from current Python versions
        return list({child.tag for child in self._data})

    def __getitem__(self, name):
        return self.get(name)

    @property
    def oname(self):
        """
        the "oname" metadatum, or None if it was not returned
        """
        return self.get("oname")

    def getpos(self):
        """
        return the decimal J2000 position as a 2-element tuple giving
        right ascension and declination.  If None, a position
        was not returned.

        Raises
        ------
        DALFormatError
           if the position data is incomplete or otherwise contains a
           formatting error
        """
        ra = self.get("jradeg")
        dec = self.get("jdedeg")
        if ra is None and dec is None:
            return None
        if ra is None:
            raise DALFormatError("Missing RA value (jradeg)")
        if dec is None:
            raise DALFormatError("Missing Dec value (jdedeg)")

        try:
            return (float(ra), float(dec))
        except (TypeError, ValueError):
            # TypeError: a coordinate was given more than once, so get()
            # returned a list rather than a string
            raise DALFormatError("Non-float given in ({0}, {1})".format(ra, dec))

    @property
    def pos(self):
        """
        the decimal J2000 position as a 2-element tuple giving
        right ascension and declination.  If None, a valid position
        was not returned from the resolver.  This differs from getpos()
        in that accessing will not raise an exception.
        """
        try:
            return self.getpos()
        except Exception:
            # deliberately best-effort: any failure means "no position"
            return None

    @property
    def sexapos(self):
        """
        the sexagesimal formatted position returned by the resolver
        (the "jpos" metadatum)
        """
        return self.get("jpos")

    @property
    def aliases(self):
        """
        the list of other names the object is known as.  This will be an
        empty list if none were returned
        """
        return self.get("alias", [])
class DocQuantity(object):
    """
    a documented quantity made up of a value and unit, as well as optionally
    an error, quality flag, and bibcode reference.  If the optional values are
    not available, the attribute value will be None.

    Attributes
    ----------
    val : float
       the decimal value in the units given by unit
    unit : str
       the string unit, looked up from the tag of the wrapped element
       (None for a tag without a known unit)
    error : float
       the decimal error in the units given by unit
    qual : str
       a quality code
    ref : str
       a bibcode indicating the literature reference documenting this quantity
    """

    _unit_by_name = {"pm": "mas/yr", "Vel": "km/s", "z": "", "mag": "",
                     "plx": "mas"}

    def __init__(self, etreeEl):
        """
        Wrap an XML quantity element whose children "v", "e", "q" and "r"
        hold the value, error, quality code and reference.

        Raises
        ------
        DALFormatError
           if the value is missing, or if the value or the error is not
           a decimal number
        """
        d = []
        for tag in "veqr":
            item = etreeEl.find(tag)
            if item is not None:
                # an element without text (e.g. <e/>) counts as missing
                item = (item.text or "").strip() or None
            d.append(item)

        if d[0] is None:
            raise DALFormatError("{0}: Missing quantity value".format(
                etreeEl.tag))
        try:
            self.val = float(d[0])
        except ValueError:
            raise DALFormatError("{0}: non-decimal value: {1}".format(
                etreeEl.tag, d[0]))

        self.error = d[1]
        if self.error is not None:
            try:
                self.error = float(d[1])
            except ValueError:
                raise DALFormatError("{0}: non-decimal error: {1}".format(
                    etreeEl.tag, d[1]))

        self.unit = self._unit_by_name.get(etreeEl.tag)
        self.qual = d[2]
        self.ref = d[3]

    def __str__(self):
        return self.to_string(True)

    def to_string(self, showerr=False):
        """
        convert the quantity to a string, showing the value, unit, and
        optionally the error

        Parameters
        ----------
        showerr : bool
           if True, the error value will be included when it is set and
           non-zero; the form will be "val +/- error unit".  If False,
           the error will be excluded, as in "val unit".
        """
        out = "{0}".format(self.val)
        if showerr and self.error:
            out += " +/- {0}".format(self.error)
        if self.unit:
            out += " {0}".format(self.unit)
        return out

    def __repr__(self):
        return "quant({0}, {1}, {2}, {3}, {4})".format(
            self.val, self.unit, self.error, self.qual, self.ref)
class ProperMotion(DocQuantity):
    """
    a documented proper motion quantity made up of a vector magnitude, unit,
    vector position angle, a vector component along right ascension, and a
    vector component along declination.  It can also optionally include
    an error in the magnitude, errors along right ascension and declination,
    a quality flag, and a bibcode reference.  If the optional values are
    not available, the attribute value will be None.

    Attributes
    ----------
    val : float
       the decimal vector magnitude in the units given by unit
    unit : str
       the string unit
    error : float
       the decimal error in the magnitude in the units given by unit
    qual : str
       a quality code
    ref : str
       a bibcode indicating the literature reference documenting this quantity
    pa : float
       the vector position angle (from the "pa" child element)
    val_ra : float
       the vector component along right ascension (from "pmRA")
    val_dec : float
       the vector component along declination (from "pmDE")
    error_ra : float
       the error in the component along right ascension (from "epmRA")
    error_dec : float
       the error in the component along declination (from "epmDE")
    """

    # child elements read by this class, and the subset that must exist
    _component_tags = "pa pmRA pmDE epmRA epmDE".split()
    _required_tags = ("pmRA", "pmDE")

    def __init__(self, etreeEl):
        """
        Wrap an XML proper motion element.

        Raises
        ------
        DALFormatError
           if the magnitude or a required component is missing, or if
           any given component is not a decimal number
        """
        super(ProperMotion, self).__init__(etreeEl)

        d = []
        for tag in self._component_tags:
            item = etreeEl.find(tag)
            # an element without text counts as missing
            text = None
            if item is not None:
                text = (item.text or "").strip() or None

            if text is None:
                if tag in self._required_tags:
                    raise DALFormatError("{0}: Missing {1}".format(
                        etreeEl.tag, tag))
                d.append(None)
                continue

            try:
                d.append(float(text))
            except ValueError:
                raise DALFormatError("{0}: non-decimal {1}: {2}".format(
                    etreeEl.tag, tag, text))

        self.pa = d[0]
        self.val_ra = d[1]
        self.val_dec = d[2]
        self.error_ra = d[3]
        self.error_dec = d[4]

    def __repr__(self):
        return "pm({0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9})".format(
            self.val, self.unit, self.error, self.qual, self.ref,
            self.pa, self.val_ra, self.val_dec, self.error_ra, self.error_dec)