"""
Collection of utility IO functions used in SASNet. Contains the read from disk
functions as well as the SQL generator.
"""
from __future__ import print_function
import ast
import itertools
import logging
import multiprocessing
import os
import re
import numpy as np
import psycopg2 as psql
from keras.utils.np_utils import to_categorical
from psycopg2 import sql
# Module-level state shared with the multiprocessing read helper (read_h);
# Pool passes a single argument, so the path and pattern travel as globals.
gpath = ""
gpattern = ""

# Python 2/3 compatibility: bind nrange to the lazy range implementation.
try:
    nrange = xrange  # Python 2
except NameError:
    nrange = range  # Python 3: xrange is gone, range is already lazy
def sql_dat_gen(dname, mname, dbname="sas_data", host="127.0.0.1",
                user="sasnets", encoder=None, classes=64):
    """
    A Pythonic generator that gets its data from a PostgreSQL database. Yields a
    (iq, diq) list and a label list.

    :param dname: The data table name to connect to.
    :param mname: The metadata table name to connect to.
    :param dbname: The database name.
    :param host: The database host.
    :param user: The username to connect as.
    :param encoder: LabelEncoder for transforming labels to categorical ints.
        Required; a ValueError is raised on first iteration if it is None.
    :param classes: Number of label classes for the categorical encoding
        (previously hard-coded to 64).
    :return: None
    """
    if encoder is None:
        raise ValueError("sql_dat_gen requires a fitted LabelEncoder.")
    # Keyword arguments avoid fragile DSN string concatenation (and quoting
    # problems if any value ever contains spaces).
    conn = psql.connect(dbname=dbname, user=user, host=host)
    try:
        with conn:
            with conn.cursor() as c:
                c.execute("CREATE EXTENSION IF NOT EXISTS tsm_system_rows")
                c.execute(
                    sql.SQL("SELECT * FROM {}").format(
                        sql.Identifier(mname)))
                while True:
                    # SYSTEM_ROWS gives a fast random sample of 5 rows.
                    c.execute(
                        sql.SQL(
                            "SELECT * FROM {} TABLESAMPLE SYSTEM_ROWS(5)"
                        ).format(sql.Identifier(dname)))
                    x = np.asarray(c.fetchall())
                    # Column layout assumed: [id, iq, diq, label] — TODO
                    # confirm against the table writer.
                    iq_list = x[:, 1]
                    diq = x[:, 2]
                    y_list = x[:, 3]
                    encoded = encoder.transform(y_list)
                    yt = np.asarray(to_categorical(encoded, classes))
                    q_list = np.asarray(
                        [np.transpose([np.log10(iq), np.log10(dq)])
                         for iq, dq in zip(iq_list, diq)])
                    yield q_list, yt
    finally:
        # The while loop never exits normally, so the original trailing
        # conn.close() was unreachable; closing in finally runs when the
        # generator is closed or garbage-collected.
        conn.close()
# noinspection PyUnusedLocal
def read_parallel_1d(path, pattern='_eval_'):
    """
    Reads all files in the folder path. Opens the files whose names match the
    regex pattern. Returns lists of Q, I(Q), and ID. Path can be a
    relative or absolute path. Uses Pool and map to speed up IO. WIP. Uses an
    excessive amount of memory currently. It is recommended to use sequential on
    systems with less than 16 GiB of memory.

    Calling parallel on 69 150k line files, a gc, and parallel on 69 5k line
    files takes around 70 seconds. Running sequential on both sets without a gc
    takes around 562 seconds. Parallel peaks at 15 + GB of memory used with two
    file reading threads. Sequential peaks at around 7 to 10 GB. Use at your own
    risk. Be prepared to kill the threads and/or press the reset button.

    Assumes files contain 1D data.

    :param path: Path to the directory of files to read from.
    :param pattern: A regex. Only files matching this regex are opened.
    :return: q_list, iq_list, y_list, nlines. NOTE(review): nlines is never
        updated here and is always 0; kept for interface compatibility.
    """
    global gpath
    global gpattern
    # read_h reads these globals because Pool.map passes a single argument.
    gpattern = pattern
    gpath = path
    nlines = 0
    fn = os.listdir(path)
    # One filename per task, as a one-element list (read_h expects a list).
    chunked = [fn[i: i + 1] for i in nrange(0, len(fn), 1)]
    # cpu_count() / 2 was float division on Python 3, which Pool rejects with
    # a TypeError; use integer division and keep at least one worker so a
    # single-core machine does not get Pool(0).
    pool = multiprocessing.Pool(max(1, multiprocessing.cpu_count() // 2),
                                maxtasksperchild=2)
    result = np.asarray(
        pool.map(read_h, chunked, chunksize=1))
    pool.close()
    pool.join()
    logging.info("IO Done")
    # Each read_h result is a (q, iq, y) triple; flatten, then de-interleave.
    result = list(itertools.chain.from_iterable(result))
    q_list = result[0::3]
    iq_list = result[1::3]
    y_list = result[2::3]
    return q_list, iq_list, y_list, nlines
def read_h(l):
    """
    Read helper for parallel read.

    Reads aggregate files whose first line is a (model_name, point_count)
    tuple, second line is the shared Q values, and third line is the I(Q)
    data. Files that fail to parse are logged and skipped.

    :param l: A list of filenames to read from.
    :raise ValueError: If l is None.
    :return: Three lists, Q, IQ, and Y, corresponding to Q data, I(Q) data,
        and model labels respectively.
    """
    logging.info(os.getpid())
    if l is None:
        # ValueError instead of bare Exception; existing callers catching
        # Exception still work.
        raise ValueError("Empty args")
    global gpath  # Abuse globals because pool only passes one argument
    global gpattern
    q_list, iq_list, y_list = [], [], []
    p = re.compile(gpattern)
    for fn in l:
        if p.search(fn):
            try:
                # os.path.join instead of string concatenation so gpath works
                # with or without a trailing separator.
                with open(os.path.join(gpath, fn), 'r') as fd:
                    logging.info("Reading " + fn)
                    templ = ast.literal_eval(fd.readline().strip())
                    # templ = (model_name, point_count)
                    y_list.extend([templ[0] for i in nrange(templ[1])])
                    t2 = ast.literal_eval(fd.readline().strip())
                    q_list.extend([t2 for i in nrange(templ[1])])
                    iq_list.extend(ast.literal_eval(fd.readline().strip()))
            except Exception as e:
                # Missing space after "skipped" made the log unreadable and
                # was inconsistent with read_seq_1d.
                logging.warning("skipped " + fn + ", " + str(e))
    return q_list, iq_list, y_list
# noinspection PyCompatibility,PyUnusedLocal
def read_seq_1d(path, pattern='_eval_', typef='aggr', verbosity=False):
    """
    Reads all files in the folder path. Opens the files whose names match the
    regex pattern. Returns lists of Q, I(Q), and ID. Path can be a
    relative or absolute path. Uses a single thread only. It is recommended to
    use :meth:`read_parallel_1d`, except in hyperopt, where map() is broken.

    typef is one of 'json' or 'aggr'. JSON mode reads in all and only json files
    in the folder specified by path. aggr mode reads in aggregated data files.
    See sasmodels/generate_sets.py for more about these formats.

    Assumes files contain 1D data.

    :param path: Path to the directory of files to read from.
    :param pattern: A regex. Only files matching this regex are opened.
    :param typef: Type of file to read (aggregate data or json data).
    :param verbosity: Controls the verbosity of output.
    :return: q_list, iq_list, y_list, nlines (nlines is None in json mode).
    """
    q_list, iq_list, y_list = [], [], []
    pattern = re.compile(pattern)
    n = 0
    nlines = None
    if typef == 'json':
        try:
            from ruamel.yaml import safe_load
            # ruamel has better json input processing.
        except ImportError:
            # Must be json.load, not loads: safe_load is called below with an
            # open file object, and loads would raise TypeError on it.
            from json import load as safe_load
        for fn in os.listdir(path):
            if pattern.search(fn):  # Only open JSON files
                with open(os.path.join(path, fn), 'r') as fd:
                    n += 1
                    data_d = safe_load(fd)
                    q_list.append(data_d['data']['Q'])
                    iq_list.append(data_d["data"]["I(Q)"])
                    y_list.append(data_d["model"])
                    if (n % 100 == 0) and verbosity:
                        print("Read " + str(n) + " files.")
    elif typef == 'aggr':
        # elif, not a second if: previously the trailing else attached to
        # this test, so a valid typef='json' also printed the error message.
        nlines = 0
        for fn in sorted(os.listdir(path)):
            if pattern.search(fn):
                try:
                    with open(os.path.join(path, fn), 'r') as fd:
                        print("Reading " + fn)
                        templ = ast.literal_eval(fd.readline().strip())
                        # templ = (model_name, point_count); use the recorded
                        # count rather than a hard-coded 10000, matching
                        # read_h's handling of the same format.
                        count = templ[1]
                        y_list.extend([templ[0]] * count)
                        t2 = ast.literal_eval(fd.readline().strip())
                        q_list.extend([t2] * count)
                        iq_list.extend(
                            ast.literal_eval(fd.readline().strip()))
                        nlines += count
                        n += 1
                        # n was never incremented before, so this gate fired
                        # on every file; now it reports every 1000 files.
                        if (n % 1000 == 0) and verbosity:
                            print("Read " + str(nlines) + " points.")
                except Exception as e:
                    logging.warning("skipped " + fn + ", " + str(e))
    else:
        print(
            "Error: the type " + typef + " was not recognised. Valid types "
            "are 'aggr' and 'json'.")
    return q_list, iq_list, y_list, nlines
if __name__ == '__main__':
    # Library module: refuse direct execution.
    raise NotImplementedError("Cannot run sas_io as main. Import a specific "
                              "function instead.")