This repository has been archived on 2023-11-05. You can view files and clone it, but cannot push or open issues or pull requests.
wasm-micro-runtime/test-tools/IoT-APP-Store-Demo/wasm_django/server/wasm_server.py
Josh Triplett b0b0789dca Relicense to Apache-2.0 with the LLVM-exception (#137)
With agreement from contributors.
2019-11-12 07:45:21 +08:00

605 lines
21 KiB
Python
Executable File

'''
/* Copyright (C) 2019 Intel Corporation. All rights reserved.
* SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
*/
'''
import select
import socket
import queue
from time import sleep
import struct
import threading
import time
from ctypes import *
import json
import logging
import os
attr_type_list = [
"ATTR_NONE",
"ATTR_TYPE_SHORT",
"ATTR_TYPE_INT",
"ATTR_TYPE_INT64",
"ATTR_TYPE_BYTE",
"ATTR_TYPE_UINT16",
"ATTR_TYPE_FLOAT",
"ATTR_TYPE_DOUBLE",
"ATTR_TYPE_BOOLEAN",
"ATTR_TYPE_STRING",
"ATTR_TYPE_BYTEARRAY"
]
Phase_Non_Start = 0
Phase_Leading = 1
Phase_Type = 2
Phase_Size = 3
Phase_Payload = 4
class imrt_link_message(object):
def __init__(self):
self.leading = bytes([0x12, 0x34])
self.phase = Phase_Non_Start
self.size_in_phase = 0
self.message_type = bytes()
self.message_size = bytes()
self.payload = bytes()
self.msg = bytes()
def set_recv_phase(self, phase):
self.phase = phase
def on_imrt_link_byte_arrive(self, ch):
self.msg += ch
if self.phase == Phase_Non_Start:
if ch == b'\x12':
self.set_recv_phase(Phase_Leading)
else:
return -1
elif self.phase == Phase_Leading:
if ch == b'\x34':
self.set_recv_phase(Phase_Type)
else:
self.set_recv_phase(Phase_Non_Start)
return -1
elif self.phase == Phase_Type:
self.message_type += ch
self.size_in_phase += 1
if self.size_in_phase == 2:
(self.message_type, ) = struct.unpack('!H', self.message_type)
self.size_in_phase = 0
self.set_recv_phase(Phase_Size)
elif self.phase == Phase_Size:
self.message_size += ch
self.size_in_phase += 1
if self.size_in_phase == 4:
(self.message_size, ) = struct.unpack('!I', self.message_size)
self.size_in_phase = 0
self.set_recv_phase(Phase_Payload)
if self.message_size == b'\x00':
self.set_recv_phase(Phase_Non_Start)
return 0
self.set_recv_phase(Phase_Payload)
elif self.phase == Phase_Payload:
self.payload += ch
self.size_in_phase += 1
if self.size_in_phase == self.message_size:
self.set_recv_phase(Phase_Non_Start)
return 0
return 2
return 1
def read_file_to_buffer(file_name):
file_object = open(file_name, 'rb')
buffer = None
if not os.path.exists(file_name):
logging.error("file {} not found.".format(file_name))
return "file not found"
try:
buffer = file_object.read()
finally:
file_object.close()
return buffer
def decode_attr_container(msg):
attr_dict = {}
buf = msg[26 : ]
(total_len, tag_len) = struct.unpack('@IH', buf[0 : 6])
tag_name = buf[6 : 6 + tag_len].decode()
buf = buf[6 + tag_len : ]
(attr_num, ) = struct.unpack('@H', buf[0 : 2])
buf = buf[2 : ]
logging.info("parsed attr:")
logging.info("total_len:{}, tag_len:{}, tag_name:{}, attr_num:{}"
.format(str(total_len), str(tag_len), str(tag_name), str(attr_num)))
for i in range(attr_num):
(key_len, ) = struct.unpack('@H', buf[0 : 2])
key_name = buf[2 : 2 + key_len - 1].decode()
buf = buf[2 + key_len : ]
(type_index, ) = struct.unpack('@c', buf[0 : 1])
attr_type = attr_type_list[int(type_index[0])]
buf = buf[1 : ]
if attr_type == "ATTR_TYPE_SHORT":
(attr_value, ) = struct.unpack('@h', buf[0 : 2])
buf = buf[2 : ]
# continue
elif attr_type == "ATTR_TYPE_INT":
(attr_value, ) = struct.unpack('@I', buf[0 : 4])
buf = buf[4 : ]
# continue
elif attr_type == "ATTR_TYPE_INT64":
(attr_value, ) = struct.unpack('@q', buf[0 : 8])
buf = buf[8 : ]
# continue
elif attr_type == "ATTR_TYPE_BYTE":
(attr_value, ) = struct.unpack('@c', buf[0 : 1])
buf = buf[1 : ]
# continue
elif attr_type == "ATTR_TYPE_UINT16":
(attr_value, ) = struct.unpack('@H', buf[0 : 2])
buf = buf[2 : ]
# continue
elif attr_type == "ATTR_TYPE_FLOAT":
(attr_value, ) = struct.unpack('@f', buf[0 : 4])
buf = buf[4 : ]
# continue
elif attr_type == "ATTR_TYPE_DOUBLE":
(attr_value, ) = struct.unpack('@d', buf[0 : 8])
buf = buf[8 : ]
# continue
elif attr_type == "ATTR_TYPE_BOOLEAN":
(attr_value, ) = struct.unpack('@?', buf[0 : 1])
buf = buf[1 : ]
# continue
elif attr_type == "ATTR_TYPE_STRING":
(str_len, ) = struct.unpack('@H', buf[0 : 2])
attr_value = buf[2 : 2 + str_len - 1].decode()
buf = buf[2 + str_len : ]
# continue
elif attr_type == "ATTR_TYPE_BYTEARRAY":
(byte_len, ) = struct.unpack('@I', buf[0 : 4])
attr_value = buf[4 : 4 + byte_len]
buf = buf[4 + byte_len : ]
# continue
attr_dict[key_name] = attr_value
logging.info(str(attr_dict))
return attr_dict
class Request():
mid = 0
url = ""
action = 0
fmt = 0
payload = ""
payload_len = 0
sender = 0
def __init__(self, url, action, fmt, payload, payload_len):
self.url = url
self.action = action
self.fmt = fmt
# if type(payload) == bytes:
# self.payload = bytes(payload, encoding = "utf8")
# else:
self.payload_len = payload_len
if self.payload_len > 0:
self.payload = payload
def pack_request(self):
url_len = len(self.url) + 1
buffer_len = url_len + self.payload_len
req_buffer = struct.pack('!2BH2IHI',1, self.action, self.fmt, self.mid, self.sender, url_len, self.payload_len)
for i in range(url_len - 1):
req_buffer += struct.pack('!c', bytes(self.url[i], encoding = "utf8"))
req_buffer += bytes([0])
for i in range(self.payload_len):
req_buffer += struct.pack('!B', self.payload[i])
return req_buffer, len(req_buffer)
def send(self, conn, is_install):
leading = struct.pack('!2B', 0x12, 0x34)
if not is_install:
msg_type = struct.pack('!H', 0x0002)
else:
msg_type = struct.pack('!H', 0x0004)
buff, buff_len = self.pack_request()
lenth = struct.pack('!I', buff_len)
try:
conn.send(leading)
conn.send(msg_type)
conn.send(lenth)
conn.send(buff)
except socket.error as e:
logging.error("device closed")
for dev in tcpserver.devices:
if dev.conn == conn:
tcpserver.devices.remove(dev)
return -1
def query(conn):
req = Request("/applet", 1, 0, "", 0)
if req.send(conn, False) == -1:
return "fail"
time.sleep(0.05)
try:
receive_context = imrt_link_message()
start = time.time()
while True:
if receive_context.on_imrt_link_byte_arrive(conn.recv(1)) == 0:
break
elif time.time() - start >= 5.0:
return "fail"
query_resp = receive_context.msg
print(query_resp)
except OSError as e:
logging.error("OSError exception occur")
return "fail"
res = decode_attr_container(query_resp)
logging.info('Query device infomation success')
return res
def install(conn, app_name, wasm_file):
wasm = read_file_to_buffer(wasm_file)
if wasm == "file not found":
return "failed to install: file not found"
print("wasm file len:")
print(len(wasm))
req = Request("/applet?name=" + app_name, 3, 98, wasm, len(wasm))
if req.send(conn, True) == -1:
return "fail"
time.sleep(0.05)
try:
receive_context = imrt_link_message()
start = time.time()
while True:
if receive_context.on_imrt_link_byte_arrive(conn.recv(1)) == 0:
break
elif time.time() - start >= 5.0:
return "fail"
msg = receive_context.msg
except OSError as e:
logging.error("OSError exception occur")
# TODO: check return message
if len(msg) == 24 and msg[8 + 1] == 65:
logging.info('Install application success')
return "success"
else:
res = decode_attr_container(msg)
logging.warning('Install application failed: %s' % (str(res)))
print(str(res))
return str(res)
def uninstall(conn, app_name):
req = Request("/applet?name=" + app_name, 4, 99, "", 0)
if req.send(conn, False) == -1:
return "fail"
time.sleep(0.05)
try:
receive_context = imrt_link_message()
start = time.time()
while True:
if receive_context.on_imrt_link_byte_arrive(conn.recv(1)) == 0:
break
elif time.time() - start >= 5.0:
return "fail"
msg = receive_context.msg
except OSError as e:
logging.error("OSError exception occur")
# TODO: check return message
if len(msg) == 24 and msg[8 + 1] == 66:
logging.info('Uninstall application success')
return "success"
else:
res = decode_attr_container(msg)
logging.warning('Uninstall application failed: %s' % (str(res)))
print(str(res))
return str(res)
class Device:
def __init__(self, conn, addr, port):
self.conn = conn
self.addr = addr
self.port = port
self.app_num = 0
self.apps = []
cmd = []
class TCPServer:
def __init__(self, server, server_address, inputs, outputs, message_queues):
# Create a TCP/IP
self.server = server
self.server.setblocking(False)
# Bind the socket to the port
self.server_address = server_address
print('starting up on %s port %s' % self.server_address)
self.server.bind(self.server_address)
# Listen for incoming connections
self.server.listen(10)
self.cmd_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.cmd_sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
self.cmd_sock.bind(('127.0.0.1', 8889))
self.cmd_sock.listen(5)
# Sockets from which we expect to read
self.inputs = inputs
self.inputs.append(self.cmd_sock)
# Sockets to which we expect to write
# 处理要发送的消息
self.outputs = outputs
# Outgoing message queues (socket: Queue)
self.message_queues = message_queues
self.devices = []
self.conn_dict = {}
def handler_recever(self, readable):
# Handle inputs
for s in readable:
if s is self.server:
# A "readable" socket is ready to accept a connection
connection, client_address = s.accept()
self.client_address = client_address
print('connection from', client_address)
# this is connection not server
# connection.setblocking(0)
self.inputs.append(connection)
# Give the connection a queue for data we want to send
# self.message_queues[connection] = queue.Queue()
res = query(connection)
if res != "fail":
dev = Device(connection, client_address[0], client_address[1])
self.devices.append(dev)
self.conn_dict[client_address] = connection
dev_info = {}
dev_info['addr'] = dev.addr
dev_info['port'] = dev.port
dev_info['apps'] = 0
logging.info('A new client connected from ("%s":"%s")' % (dev.conn, dev.port))
elif s is self.cmd_sock:
connection, client_address = s.accept()
print("web server socket connected")
logging.info("Django server connected")
self.inputs.append(connection)
self.message_queues[connection] = queue.Queue()
else:
data = s.recv(1024)
if data != b'':
# A readable client socket has data
logging.info('received "%s" from %s' % (data, s.getpeername()))
# self.message_queues[s].put(data)
# # Add output channel for response
# if s not in self.outputs:
# self.outputs.append(s)
if(data.decode().split(':')[0] == "query"):
if data.decode().split(':')[1] == "all":
resp = []
print('start query all devices')
for dev in self.devices:
dev_info = query(dev.conn)
if dev_info == "fail":
continue
dev_info["addr"] = dev.addr
dev_info["port"] = dev.port
resp.append(str(dev_info))
print(resp)
if self.message_queues[s] is not None:
# '*' is used in web server to sperate the string
self.message_queues[s].put(bytes("*".join(resp), encoding = 'utf8'))
if s not in self.outputs:
self.outputs.append(s)
else:
client_addr = (data.decode().split(':')[1],int(data.decode().split(':')[2]))
if client_addr in self.conn_dict.keys():
print('start query device from (%s:%s)' % (client_addr[0], client_addr[1]))
resp = query(self.conn_dict[client_addr])
print(resp)
if self.message_queues[s] is not None:
self.message_queues[s].put(bytes(str(resp), encoding = 'utf8'))
if s not in self.outputs:
self.outputs.append(s)
else: # no connection
if self.message_queues[s] is not None:
self.message_queues[s].put(bytes(str("fail"), encoding = 'utf8'))
if s not in self.outputs:
self.outputs.append(s)
elif(data.decode().split(':')[0] == "install"):
client_addr = (data.decode().split(':')[1],int(data.decode().split(':')[2]))
app_name = data.decode().split(':')[3]
app_file = data.decode().split(':')[4]
if client_addr in self.conn_dict.keys():
print('start install application %s to ("%s":"%s")' % (app_name, client_addr[0], client_addr[1]))
res = install(self.conn_dict[client_addr], app_name, app_file)
if self.message_queues[s] is not None:
logging.info("response {} to cmd server".format(res))
self.message_queues[s].put(bytes(res, encoding = 'utf8'))
if s not in self.outputs:
self.outputs.append(s)
elif(data.decode().split(':')[0] == "uninstall"):
client_addr = (data.decode().split(':')[1],int(data.decode().split(':')[2]))
app_name = data.decode().split(':')[3]
if client_addr in self.conn_dict.keys():
print("start uninstall")
res = uninstall(self.conn_dict[client_addr], app_name)
if self.message_queues[s] is not None:
logging.info("response {} to cmd server".format(res))
self.message_queues[s].put(bytes(res, encoding = 'utf8'))
if s not in self.outputs:
self.outputs.append(s)
# if self.message_queues[s] is not None:
# self.message_queues[s].put(data)
# if s not in self.outputs:
# self.outputs.append(s)
else:
logging.warning(data)
# Interpret empty result as closed connection
try:
for dev in self.devices:
if s == dev.conn:
self.devices.remove(dev)
# Stop listening for input on the connection
if s in self.outputs:
self.outputs.remove(s)
self.inputs.remove(s)
# Remove message queue
if s in self.message_queues.keys():
del self.message_queues[s]
s.close()
except OSError as e:
logging.error("OSError raised, unknown connection")
return "got it"
def handler_send(self, writable):
# Handle outputs
for s in writable:
try:
message_queue = self.message_queues.get(s)
send_data = ''
if message_queue is not None:
send_data = message_queue.get_nowait()
except queue.Empty:
self.outputs.remove(s)
else:
# print "sending %s to %s " % (send_data, s.getpeername)
# print "send something"
if message_queue is not None:
s.send(send_data)
else:
print("client has closed")
# del message_queues[s]
# writable.remove(s)
# print "Client %s disconnected" % (client_address)
return "got it"
def handler_exception(self, exceptional):
# # Handle "exceptional conditions"
for s in exceptional:
print('exception condition on', s.getpeername())
# Stop listening for input on the connection
self.inputs.remove(s)
if s in self.outputs:
self.outputs.remove(s)
s.close()
# Remove message queue
del self.message_queues[s]
return "got it"
def event_loop(tcpserver, inputs, outputs):
while inputs:
# Wait for at least one of the sockets to be ready for processing
print('waiting for the next event')
readable, writable, exceptional = select.select(inputs, outputs, inputs)
if readable is not None:
tcp_recever = tcpserver.handler_recever(readable)
if tcp_recever == 'got it':
print("server have received")
if writable is not None:
tcp_send = tcpserver.handler_send(writable)
if tcp_send == 'got it':
print("server have send")
if exceptional is not None:
tcp_exception = tcpserver.handler_exception(exceptional)
if tcp_exception == 'got it':
print("server have exception")
sleep(0.1)
def run_wasm_server():
server_address = ('localhost', 8888)
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
inputs = [server]
outputs = []
message_queues = {}
tcpserver = TCPServer(server, server_address, inputs, outputs, message_queues)
task = threading.Thread(target=event_loop,args=(tcpserver,inputs,outputs))
task.start()
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG,
filename='wasm_server.log',
filemode='a',
format=
'%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'
)
server_address = ('0.0.0.0', 8888)
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
inputs = [server]
outputs = []
message_queues = {}
tcpserver = TCPServer(server, server_address, inputs, outputs, message_queues)
logging.info("TCP Server start at {}:{}".format(server_address[0], "8888"))
task = threading.Thread(target=event_loop,args=(tcpserver,inputs,outputs))
task.start()
# event_loop(tcpserver, inputs, outputs)