Source code for fastscore.suite.engine

## -- Engine class -- ##
import json
import time
import fastscore.errors as errors
from tabulate import tabulate

from binascii import b2a_hex
from os import urandom

from .instance import InstanceBase
from ..constants import MODEL_CONTENT_TYPES, ATTACHMENT_CONTENT_TYPES, \
                        SCHEMA_CONTENT_TYPE, POLICY_CONTENT_TYPES

from fastscore.v1 import EngineApi
from fastscore.v2 import EngineApi as EngineApi2
from fastscore.v1.rest import ApiException
from fastscore.v2.rest import ApiException as ApiException2
from fastscore import FastScoreError

import six

from fastscore.v2.models import ActiveModelInfo
## patch ActiveModelInfo generated by Swagger
def _unload(self):
    self._eng.unload_model()
ActiveModelInfo.unload = _unload

from fastscore.v2.models import ActiveStreamInfo
## patch ActiveStreamInfo generated by Swagger
def _detach(self):
    self._eng.detach_stream(self.slot)
ActiveStreamInfo.detach = _detach

# patch engine.output
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

[docs]class Engine(InstanceBase): """ An Engine instance. """ # Maximum size for an inline attachment. MAX_INLINE_ATTACHMENT = 1024*1024 def __init__(self, name): """ Constructor for the Engine class. Generally, this is not intended to be constructed 'by hand'. Instead, Engine instances should be retrieved from Connect. :param name: A name for this instance. """ super(Engine, self).__init__(name, 'engine', EngineApi(), EngineApi2()) self._active_model = None self._active_streams = None self._policy = Engine.PolicyProxy(self) @property def state(self): """ The current state of the engine. :returns: 'INIT', 'RUNNING', 'PAUSED', 'FINISHING', or 'FINISHED'. """ try: return self.swg2.engine_state_get(self.name).state.upper() except Exception as e: raise FastScoreError("Unable to determine engine state", caused_by=e) @property def active_model(self): """ The currently loaded model information. >>> mm = connect.lookup('model-manage') >>> engine = connect.lookup('engine') >>> print stream.active_model.name >>> print stream.active_model.jets :returns: An ActiveModelInfo object. """ if self._active_model == None: try: self._active_model = self.swg2.active_model_get(self.name) self._active_model._eng = self except Exception as e: if isinstance(e, ApiException2) and e.status == 404: return None else: raise FastScoreError("Unable to retrieve active model", caused_by=e) return self._active_model @property def active_streams(self): """ A collection of active streams indexed by a slot. >>> mm = connect.lookup('model-manage') >>> engine = connect.lookup('engine') >>> print stream.active_streams[1] """ if self._active_streams == None: try: d = {} for x in self.swg2.active_stream_list(self.name): x._eng = self # needed for detach() d[x.slot] = x self._active_streams = d except Exception as e: raise FastScoreError("Unable to retrieve active streams", caused_by=e) return self._active_streams def clear(self): self._active_model = None self._active_streams = None
[docs] def pause(self): """ Pauses the engine. The result depends on the current state of the engine. A running engine changes its state to PAUSED. An initializing engine will pause upon startup. In all other states the operation is ignored. """ try: self.swg2.engine_pause(self.name) except Exception as e: raise FastScoreError("Unable to pause the engine", caused_by=e)
[docs] def unpause(self): """ Unpauses the engine. """ try: self.swg2.engine_unpause(self.name) except Exception as e: raise FastScoreError("Unable to unpause the engine", caused_by=e)
[docs] def reset(self): """ Resets the engine. A loaded model is unloaded. All open streams are closed. The engine changes its state to INIT. """ try: self.swg2.engine_reset(self.name) except Exception as e: raise FastScoreError("Unable to reset the engine", caused_by=e)
[docs] def input(self, data, slot): """ Write data to a REST stream attached to the slot. :param data: The data to write to the stream. :param slot: The stream slot. """ try: self.swg.job_io_input(self.name, data, slot) except Exception as e: raise FastScoreError("Stream write error", caused_by=e)
[docs] def output(self, slot): """ Reads data from the REST stream attached to the slot. :param slot: The stream slot. """ # workaround for Swagger encoding bug # Should be removed if swagger-codegen is fixed. params = { 'host': self.swg.api_client.host, 'instance': self.name, 'slot' : slot } path = "{host}/{instance}/1/job/output/{slot}".format(**params) r = requests.get(path, verify=False) data, status = r.content, r.status_code # swagger-codegen way # try: # (data,status,_) = self.swg.job_io_output_with_http_info(self.name, slot) # except Exception as e: # raise FastScoreError("Stream read error", caused_by=e) if status == 202: return None elif status == 204 or data == '': raise EOFError return data
class PolicyProxy(object): def __init__(self, eng): self._eng = eng def set(self, title, mtype, text, preinstall=False): if title != 'import': raise FastScoreError("Only 'import' policy is currently supported") if not mtype in POLICY_CONTENT_TYPES: raise FastScoreError("Model type '%s' not recognized" % mtype) try: ct = POLICY_CONTENT_TYPES[mtype] self._eng.swg.policy_put(self._eng.name, text, preinstall=preinstall, content_type=ct) except Exception as e: raise FastScoreError("Unable to upload policy", caused_by=e) def get(self, title, mtype): if title != 'import': raise FastScoreError("Only 'import' policy is currently supported") if not mtype in POLICY_CONTENT_TYPES: raise FastScoreError("Model type '%s' not recognized" % mtype) try: ct = POLICY_CONTENT_TYPES[mtype] return self._eng.swg.policy_get(self._eng.name, accept=ct) except Exception as e: raise FastScoreError("Unable to retrieve policy", caused_by=e) @property def policy(self): """ Set/get the import policy. >>> engine = connect.lookup('engine-1') >>> engine.policy.set('import', 'python', text) >>> print engine.policy.get('import', 'python') """ return self._policy
[docs] def load_model(self, model, force_inline=False, embedded_schemas={}, dry_run=False): """ Load a model into this engine. :param model: A Model object. :param force_inline: If True, force all attachments to load inline. If False, attachments may be loaded by reference. :param embedded_schemas: A dict of schemas to send with the request to stop the Engine from contacting Model Manage when resolving schemas. :param dry_run: If True, do not actually load the model, check for errors only. """ self._active_model = None # force reload self._active_streams = None def maybe_externalize(att): ctype = ATTACHMENT_CONTENT_TYPES[att.atype] if six.PY3 or (att.datasize > Engine.MAX_INLINE_ATTACHMENT and not force_inline): ## See https://opendatagoup.atlassian.net/wiki/display/FAS/Working+with+large+attachments ## ## An example of an externalized attachment: ## ## Content-Type: message/external-body; access-type=x-model-manage; name="att1.zip" ## Content-Disposition: attachment; filename="att1.zip" ## ## Content-Type: application/zip ## Content-Length: 1234 ## ext_type = 'message/external-body; ' + \ 'access-type=x-model-manage; ' + \ 'ref="urn:fastscore:attachment:%s:%s"' % (model.name,att.name) body = 'Content-Type: %s\r\n' % ctype + \ 'Content-Length: %d\r\n' % att.datasize + \ '\r\n' return (att.name,body,ext_type) else: ## data retrieved when you touch .datafile property with open(att.datafile) as f: body = f.read() return (att.name,body,ctype) def quirk(name): return 'filename' if name == 'attachment' else 'name' def multipart_body(parts, boundary): noodle = \ [ '\r\n--' + boundary + '\r\n' + \ 'Content-Disposition: %s; %s="%s"\r\n' % (tag,quirk(tag),name) + \ 'Content-Type: %s\r\n' % ctype + \ '\r\n' + \ body for tag,(name,body,ctype) in parts ] noodle.append('\r\n--' + boundary + '--\r\n') return ''.join(noodle) try: ct = MODEL_CONTENT_TYPES[model.mtype] attachments = list(model.attachments) if len(attachments) == 0 and len(embedded_schemas) == 0: data = model.source cd = 'x-model; name="%s"' % model.name return self.swg.model_load(self.name, data, dry_run=dry_run, content_type=ct, content_disposition=cd) else: ## Swagger 2.0 does allow complex multipart requests - craft it manually. parts = [ ('attachment',maybe_externalize(x)) for x in attachments ] + \ [ ('x-schema',(name,schema,SCHEMA_CONTENT_TYPE)) for name,schema in embedded_schemas.items() ] parts.append( ('x-model',(model.name,model.source,ct)) ) boundary = b2a_hex(urandom(12)) if six.PY3: boundary = boundary.decode('utf-8') data = multipart_body(parts, boundary) return self.swg.model_load(self.name, data, dry_run=dry_run, content_type='multipart/mixed; boundary=' + boundary) except Exception as e: raise FastScoreError("Unable to load model '%s'" % model.name, caused_by=e)
[docs] def scale(self, factor): """ Changes the number of running model instances. """ try: self.swg2.active_model_scale(self.name, factor) except Exception as e: raise FastScoreError("Unable to scale model", caused_by=e)
def unload_model(self): try: self.swg2.active_model_delete(self.name) except Exception as e: if isinstance(e, ApiException2) and e.status == 404: raise FastScoreError("Model not loaded") else: raise FastScoreError("Unable to unload model", caused_by=e) def attach_stream(self, stream, slot, dry_run=False): try: cd = 'x-stream; name="%s"' % stream.name return self.swg2.active_stream_attach(self.name, stream.desc, slot, dry_run=dry_run, content_disposition=cd) except Exception as e: raise FastScoreError("Unable to attach stream", caused_by=e) def detach_stream(self, slot): try: self.swg2.active_stream_detach(self.name, slot) except Exception as e: raise FastScoreError("Unable to detach stream", caused_by=e) def sample_stream(self, stream, n): try: if n: return self.swg.stream_sample(self.name, stream.desc, n=n) else: return self.swg.stream_sample(self.name, stream.desc) except Exception as e: raise FastScoreError("Unable to sample stream", caused_by=e) def verify_schema(self, schema): try: reply = self.swg2.active_schema_verify(self.name, schema.source) return reply.id except ApiException2 as e: raise FastScoreError(e.body)
[docs] def verify_data(self, sid, rec): """ Verify schema against a prepared schema. :param int sid: The schema id. :param str rec: The data record to verify. """ try: self.swg2.active_schema_verify_data(self.name, sid, {'data': rec}) except ApiException2 as e: raise FastScoreError(e.body)
# What a clumsy name def unverify_schema(self, sid): try: self.swg2.active_schema_unverify(self.name, sid) except Exception as e: raise FastScoreError("Unable to unload a verification schema", caused_by=e) def restore_state(self, blobfile): try: self.swg.job_state_restore(self.name, blobfile) except Exception as e: raise FastScoreError("Unable to restore model state", caused_by=e)