## -- Engine class -- ##
import json
import time
import fastscore.errors as errors
from tabulate import tabulate
from binascii import b2a_hex
from os import urandom
from .instance import InstanceBase
from ..constants import MODEL_CONTENT_TYPES, ATTACHMENT_CONTENT_TYPES, \
SCHEMA_CONTENT_TYPE, POLICY_CONTENT_TYPES
from fastscore.v1 import EngineApi
from fastscore.v2 import EngineApi as EngineApi2
from fastscore.v1.rest import ApiException
from fastscore.v2.rest import ApiException as ApiException2
from fastscore import FastScoreError
import six
from fastscore.v2.models import ActiveModelInfo
## patch ActiveModelInfo generated by Swagger
def _unload(self):
self._eng.unload_model()
ActiveModelInfo.unload = _unload
from fastscore.v2.models import ActiveStreamInfo
## patch ActiveStreamInfo generated by Swagger
def _detach(self):
self._eng.detach_stream(self.slot)
ActiveStreamInfo.detach = _detach
# patch engine.output
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
[docs]class Engine(InstanceBase):
"""
An Engine instance.
"""
# Maximum size for an inline attachment.
MAX_INLINE_ATTACHMENT = 1024*1024
def __init__(self, name):
"""
Constructor for the Engine class.
Generally, this is not intended to be constructed 'by hand'. Instead,
Engine instances should be retrieved from Connect.
:param name: A name for this instance.
"""
super(Engine, self).__init__(name, 'engine', EngineApi(), EngineApi2())
self._active_model = None
self._active_streams = None
self._policy = Engine.PolicyProxy(self)
@property
def state(self):
"""
The current state of the engine.
:returns: 'INIT', 'RUNNING', 'PAUSED', 'FINISHING', or 'FINISHED'.
"""
try:
return self.swg2.engine_state_get(self.name).state.upper()
except Exception as e:
raise FastScoreError("Unable to determine engine state", caused_by=e)
@property
def active_model(self):
"""
The currently loaded model information.
>>> mm = connect.lookup('model-manage')
>>> engine = connect.lookup('engine')
>>> print stream.active_model.name
>>> print stream.active_model.jets
:returns: An ActiveModelInfo object.
"""
if self._active_model == None:
try:
self._active_model = self.swg2.active_model_get(self.name)
self._active_model._eng = self
except Exception as e:
if isinstance(e, ApiException2) and e.status == 404:
return None
else:
raise FastScoreError("Unable to retrieve active model", caused_by=e)
return self._active_model
@property
def active_streams(self):
"""
A collection of active streams indexed by a slot.
>>> mm = connect.lookup('model-manage')
>>> engine = connect.lookup('engine')
>>> print stream.active_streams[1]
"""
if self._active_streams == None:
try:
d = {}
for x in self.swg2.active_stream_list(self.name):
x._eng = self # needed for detach()
d[x.slot] = x
self._active_streams = d
except Exception as e:
raise FastScoreError("Unable to retrieve active streams", caused_by=e)
return self._active_streams
def clear(self):
self._active_model = None
self._active_streams = None
[docs] def pause(self):
"""
Pauses the engine. The result depends on the current state of the
engine. A running engine changes its state to PAUSED. An initializing
engine will pause upon startup. In all other states the operation is
ignored.
"""
try:
self.swg2.engine_pause(self.name)
except Exception as e:
raise FastScoreError("Unable to pause the engine", caused_by=e)
[docs] def unpause(self):
"""
Unpauses the engine.
"""
try:
self.swg2.engine_unpause(self.name)
except Exception as e:
raise FastScoreError("Unable to unpause the engine", caused_by=e)
[docs] def reset(self):
"""
Resets the engine. A loaded model is unloaded. All open streams are
closed. The engine changes its state to INIT.
"""
try:
self.swg2.engine_reset(self.name)
except Exception as e:
raise FastScoreError("Unable to reset the engine", caused_by=e)
[docs] def output(self, slot):
"""
Reads data from the REST stream attached to the slot.
:param slot: The stream slot.
"""
# workaround for Swagger encoding bug
# Should be removed if swagger-codegen is fixed.
params = {
'host': self.swg.api_client.host,
'instance': self.name,
'slot' : slot
}
path = "{host}/{instance}/1/job/output/{slot}".format(**params)
r = requests.get(path, verify=False)
data, status = r.content, r.status_code
# swagger-codegen way
# try:
# (data,status,_) = self.swg.job_io_output_with_http_info(self.name, slot)
# except Exception as e:
# raise FastScoreError("Stream read error", caused_by=e)
if status == 202:
return None
elif status == 204 or data == '':
raise EOFError
return data
class PolicyProxy(object):
def __init__(self, eng):
self._eng = eng
def set(self, title, mtype, text, preinstall=False):
if title != 'import':
raise FastScoreError("Only 'import' policy is currently supported")
if not mtype in POLICY_CONTENT_TYPES:
raise FastScoreError("Model type '%s' not recognized" % mtype)
try:
ct = POLICY_CONTENT_TYPES[mtype]
self._eng.swg.policy_put(self._eng.name, text,
preinstall=preinstall, content_type=ct)
except Exception as e:
raise FastScoreError("Unable to upload policy", caused_by=e)
def get(self, title, mtype):
if title != 'import':
raise FastScoreError("Only 'import' policy is currently supported")
if not mtype in POLICY_CONTENT_TYPES:
raise FastScoreError("Model type '%s' not recognized" % mtype)
try:
ct = POLICY_CONTENT_TYPES[mtype]
return self._eng.swg.policy_get(self._eng.name, accept=ct)
except Exception as e:
raise FastScoreError("Unable to retrieve policy", caused_by=e)
@property
def policy(self):
"""
Set/get the import policy.
>>> engine = connect.lookup('engine-1')
>>> engine.policy.set('import', 'python', text)
>>> print engine.policy.get('import', 'python')
"""
return self._policy
[docs] def load_model(self, model, force_inline=False, embedded_schemas={}, dry_run=False):
"""
Load a model into this engine.
:param model: A Model object.
:param force_inline: If True, force all attachments to load inline. If False,
attachments may be loaded by reference.
:param embedded_schemas: A dict of schemas to send with the request to stop
the Engine from contacting Model Manage when resolving schemas.
:param dry_run: If True, do not actually load the model, check for
errors only.
"""
self._active_model = None # force reload
self._active_streams = None
def maybe_externalize(att):
ctype = ATTACHMENT_CONTENT_TYPES[att.atype]
if six.PY3 or (att.datasize > Engine.MAX_INLINE_ATTACHMENT and not force_inline):
## See https://opendatagoup.atlassian.net/wiki/display/FAS/Working+with+large+attachments
##
## An example of an externalized attachment:
##
## Content-Type: message/external-body; access-type=x-model-manage; name="att1.zip"
## Content-Disposition: attachment; filename="att1.zip"
##
## Content-Type: application/zip
## Content-Length: 1234
##
ext_type = 'message/external-body; ' + \
'access-type=x-model-manage; ' + \
'ref="urn:fastscore:attachment:%s:%s"' % (model.name,att.name)
body = 'Content-Type: %s\r\n' % ctype + \
'Content-Length: %d\r\n' % att.datasize + \
'\r\n'
return (att.name,body,ext_type)
else:
## data retrieved when you touch .datafile property
with open(att.datafile) as f:
body = f.read()
return (att.name,body,ctype)
def quirk(name):
return 'filename' if name == 'attachment' else 'name'
def multipart_body(parts, boundary):
noodle = \
[ '\r\n--' + boundary + '\r\n' + \
'Content-Disposition: %s; %s="%s"\r\n' % (tag,quirk(tag),name) + \
'Content-Type: %s\r\n' % ctype + \
'\r\n' + \
body
for tag,(name,body,ctype) in parts ]
noodle.append('\r\n--' + boundary + '--\r\n')
return ''.join(noodle)
try:
ct = MODEL_CONTENT_TYPES[model.mtype]
attachments = list(model.attachments)
if len(attachments) == 0 and len(embedded_schemas) == 0:
data = model.source
cd = 'x-model; name="%s"' % model.name
return self.swg.model_load(self.name,
data,
dry_run=dry_run,
content_type=ct,
content_disposition=cd)
else:
## Swagger 2.0 does allow complex multipart requests - craft it manually.
parts = [ ('attachment',maybe_externalize(x))
for x in attachments ] + \
[ ('x-schema',(name,schema,SCHEMA_CONTENT_TYPE))
for name,schema in embedded_schemas.items() ]
parts.append( ('x-model',(model.name,model.source,ct)) )
boundary = b2a_hex(urandom(12))
if six.PY3:
boundary = boundary.decode('utf-8')
data = multipart_body(parts, boundary)
return self.swg.model_load(self.name,
data,
dry_run=dry_run,
content_type='multipart/mixed; boundary=' + boundary)
except Exception as e:
raise FastScoreError("Unable to load model '%s'" % model.name, caused_by=e)
[docs] def scale(self, factor):
"""
Changes the number of running model instances.
"""
try:
self.swg2.active_model_scale(self.name, factor)
except Exception as e:
raise FastScoreError("Unable to scale model", caused_by=e)
def unload_model(self):
try:
self.swg2.active_model_delete(self.name)
except Exception as e:
if isinstance(e, ApiException2) and e.status == 404:
raise FastScoreError("Model not loaded")
else:
raise FastScoreError("Unable to unload model", caused_by=e)
def attach_stream(self, stream, slot, dry_run=False):
try:
cd = 'x-stream; name="%s"' % stream.name
return self.swg2.active_stream_attach(self.name,
stream.desc, slot,
dry_run=dry_run,
content_disposition=cd)
except Exception as e:
raise FastScoreError("Unable to attach stream", caused_by=e)
def detach_stream(self, slot):
try:
self.swg2.active_stream_detach(self.name, slot)
except Exception as e:
raise FastScoreError("Unable to detach stream", caused_by=e)
def sample_stream(self, stream, n):
try:
if n:
return self.swg.stream_sample(self.name, stream.desc, n=n)
else:
return self.swg.stream_sample(self.name, stream.desc)
except Exception as e:
raise FastScoreError("Unable to sample stream", caused_by=e)
def verify_schema(self, schema):
try:
reply = self.swg2.active_schema_verify(self.name, schema.source)
return reply.id
except ApiException2 as e:
raise FastScoreError(e.body)
[docs] def verify_data(self, sid, rec):
"""
Verify schema against a prepared schema.
:param int sid: The schema id.
:param str rec: The data record to verify.
"""
try:
self.swg2.active_schema_verify_data(self.name, sid, {'data': rec})
except ApiException2 as e:
raise FastScoreError(e.body)
# What a clumsy name
def unverify_schema(self, sid):
try:
self.swg2.active_schema_unverify(self.name, sid)
except Exception as e:
raise FastScoreError("Unable to unload a verification schema", caused_by=e)
def restore_state(self, blobfile):
try:
self.swg.job_state_restore(self.name, blobfile)
except Exception as e:
raise FastScoreError("Unable to restore model state", caused_by=e)