import {CommandResponseType, ExecuteResult, GenericCommandMapping, GenericCommands, GenericNotify} from "./Definitions"; import { MessageContext, MessageHandler, NotifyHandler, WorkerMessage, WorkerMessageCommand, WorkerMessageCommandResponseError, WorkerMessageCommandResponseSuccess, WorkerMessageNotify } from "./Protocol"; type PendingCommand = { timeout?: any, timestampSend: number, callbackResolve: (_: ExecuteResult) => void; } export interface WorkerEvents { notify_worker_died: {} } export abstract class WorkerCommunication< CommandsSend extends GenericCommands, CommandsReceive extends GenericCommands, CommandMapping extends GenericCommandMapping, NotifySend extends GenericNotify, NotifyReceive extends GenericNotify > { private tokenIndex = 0; protected pendingCommands: {[key: string]: PendingCommand} = {}; protected messageHandlers: {[key: string]: MessageHandler} = {}; protected notifyHandlers: {[key: string]: NotifyHandler} = {}; protected constructor() { } registerMessageHandler(command: Command, handler: MessageHandler>) { this.messageHandlers[command as any] = handler; } registerNotifyHandler(notify: Notify, handler: NotifyHandler) { this.notifyHandlers[notify as any] = handler; } execute( command: T, data: CommandsSend[T], timeout?: number, transfer?: Transferable[] ) : Promise>> { return new Promise(resolve => { const token = this.tokenIndex++ + "_token"; this.pendingCommands[token] = { timeout: typeof timeout === "number" ? setTimeout(() => { this.pendingCommands[token]?.callbackResolve({ success: false, error: "command timed out", timings: { upstream: 0, handle: 0, downstream: 0 } }); }, timeout) : undefined, callbackResolve: result => { clearTimeout(this.pendingCommands[token]?.timeout); delete this.pendingCommands[token]; resolve(result); }, timestampSend: Date.now() }; try { this.postMessage({ command: command, type: "command", payload: data, token: token } as WorkerMessageCommand, transfer); } catch (error) { let message; if(typeof error === "string") { message = error; } else if(error instanceof Error) { message = error.message; } else { console.error("Failed to post a message: %o", error); message = "lookup the console"; } this.pendingCommands[token].callbackResolve({ success: false, error: message, timings: { downstream: 0, handle: 0, upstream: 0 } }); } }); } async executeThrow( command: T, data: CommandsSend[T], timeout?: number, transfer?: Transferable[] ) : Promise> { const response = await this.execute(command, data, timeout, transfer); if(response.success === false) { throw response.error; } return response.result; } notify(notify: T, payload: NotifySend[T], transfer?: Transferable[]) { this.postMessage({ type: "notify", notify: notify, payload: payload } as WorkerMessageNotify, transfer); } protected handleMessage(message: WorkerMessage) { const timestampReceived = Date.now(); if(message.type === "notify") { const notifyHandler = this.notifyHandlers[message.notify]; if(typeof notifyHandler !== "function") { console.warn("Received unknown notify (%s)", message.notify); return; } notifyHandler(message.payload); return; } else if(message.type === "response") { const request = this.pendingCommands[message.token]; if(typeof request !== "object") { console.warn("Received execute result for unknown token (%s)", message.token); return; } delete this.pendingCommands[message.token]; clearTimeout(request.timeout); if(message.status === "success") { request.callbackResolve({ timings: { downstream: message.timestampReceived - request.timestampSend, handle: message.timestampSend - message.timestampReceived, upstream: Date.now() - message.timestampSend }, success: true, result: message.result }); } else { request.callbackResolve({ timings: { downstream: message.timestampReceived - request.timestampSend, handle: message.timestampSend - message.timestampReceived, upstream: Date.now() - message.timestampSend }, success: false, error: message.error }); } } else if(message.type === "command") { const command = message as WorkerMessageCommand; const sendExecuteError = error => { let errorMessage; if(typeof error === "string") { errorMessage = error; } else if(error instanceof Error) { console.error("Message handle error: %o", error); errorMessage = error.message; } else { console.error("Message handle error: %o", error); errorMessage = "lookup the console"; } postMessage({ type: "response", status: "error", error: errorMessage, timestampReceived: timestampReceived, timestampSend: Date.now(), token: command.token } as WorkerMessageCommandResponseError, undefined); }; const sendExecuteResult = (result, transfer) => { postMessage({ type: "response", status: "success", result: result, timestampReceived: timestampReceived, timestampSend: Date.now(), token: command.token } as WorkerMessageCommandResponseSuccess, undefined, transfer); }; const handler = this.messageHandlers[message.command as any]; if(!handler) { sendExecuteError("unknown command"); return; } let context = { transferObjects: [] } as MessageContext; let response; try { response = handler(command.payload, context); } catch(error) { response = Promise.reject(error); } (response instanceof Promise ? response : Promise.resolve(response)).then(result => { sendExecuteResult(result, context.transferObjects); }).catch(error => sendExecuteError(error)); return; } else { console.warn("Received unknown message of type %s. This should never happen!", (message as any).type); return; } } protected abstract postMessage(message: WorkerMessage, transfer?: Transferable[]); }