You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
220 lines
7.5 KiB
220 lines
7.5 KiB
#!/usr/bin/env python3
|
|
# -*- coding: UTF-8 -*-
|
|
import json
|
|
# from BaseHTTPServer import BaseHTTPRequestHandler,HTTPServer
|
|
from http.server import BaseHTTPRequestHandler
|
|
import logging
|
|
import os
|
|
from utils.utils import get_list_overlays, load_config, write_config, sort_install_pkg
|
|
from utils.package import search
|
|
from utils.findfsdb import on_find
|
|
from core.route import Router
|
|
from genaratorSecretsToken import jwt
|
|
import logging
|
|
from io import BytesIO
|
|
#repl = '<?xml version=\'1.0\' encoding=\'utf-8\'?>\n<!DOCTYPE repositories SYSTEM "http://www.gentoo.org/dtd/repositories.dtd">'
|
|
|
|
def login(Auth={user='demo', password='demo'}):
|
|
if auth():
|
|
return(generatoSecretsToken())
|
|
else:
|
|
return ("403")
|
|
|
|
|
|
|
|
def main():
|
|
try:
|
|
with open('./pkgs.json', 'tr') as fn:
|
|
data = fn.read()
|
|
pkg_list = json.loads(data)
|
|
#print(pkg_list)
|
|
return json.dumps(pkg_list)
|
|
except Exception(e):
|
|
print(e)
|
|
return "404"
|
|
|
|
def set_settings_app(params):
|
|
config = load_config()
|
|
# param = self.path.replace("/?st_app=", "")
|
|
list_param = param.split(',')
|
|
print(list_param)
|
|
for i in list_param:
|
|
if i.startswith('port'):
|
|
port = int(i.split('=')[1])
|
|
elif i.startswith('Lang'):
|
|
Lang = i.split('=')[1]
|
|
|
|
write_config(port, Lang)
|
|
print(config)
|
|
print(param)
|
|
|
|
def search(pkg):
|
|
# param = self.path.replace("/find?pkg=", "") #request['params']['name']
|
|
Pk_list = []
|
|
Search_result = {}
|
|
if len(param.split('/')) == 2:
|
|
param = param.split('/')[1]
|
|
P_list = on_find(param)
|
|
print(p_list)
|
|
if len(P_list) == 0:
|
|
print("Never Found")
|
|
self.r_t = str(json.dumps({"Package_result": p_list}))
|
|
else:
|
|
for p in p_list:
|
|
print(p)
|
|
if len(param.split("/")) == 2:
|
|
Pk_list.append(search(param.split("/")[1]))
|
|
else:
|
|
Pk_list.append(search(param))
|
|
# print(pk)
|
|
Search_result = {"Package_result": Pk_list}
|
|
# self.r_t = str(json.dumps(search_result))
|
|
return str(json.dumps(Search_result))
|
|
# self.r_t = json.dumps(pkg_list)
|
|
|
|
def get_settings_app():
|
|
return str(json.dumps(load_config()))
|
|
|
|
class Handler(BaseHTTPRequestHandler):
|
|
def _set_response(self):
|
|
self.send_response(200)
|
|
self.send_header['Access-Control-Allow-Origin', '*']
|
|
#self.send_header['Access-Control-Allow-Methods', 'GET']
|
|
self.send_header['Access-Control-Allow-Headers', 'X-Requested-With,content-type']
|
|
# self.send_header['Access-Control-Allow-Credentials', true]
|
|
self.end_headers()
|
|
def response(self):
|
|
# return self.r_t to HTTP Rsponse
|
|
if self.r_t is not "":
|
|
# self._set_response()
|
|
# HTTP 2.0 response <<Bynary>> HTTP 1.* response <<Text>>
|
|
try:
|
|
return self.wfile.write(BytesIO(self.r_t).getvalue())
|
|
except TypeError:
|
|
#print("TypeError")
|
|
return self.wfile.write(bytes(self.r_t, 'UTF-8'))
|
|
else:
|
|
self.send_response(404)
|
|
|
|
def get_data(self):
|
|
length = int(self.headers['content-length'])
|
|
data = self.rfile.read(length)
|
|
|
|
def do_POST(self):
|
|
self.get_data =()
|
|
post_data = self.rfile.read(content_length) # <--- Gets the data itself
|
|
logging.info("POST request,\nPath: %s\nHeaders:\n%s\n\nBody:\n%s\n",
|
|
str(self.path), str(self.headers), post_data.decode('utf-8')
|
|
print("do_POST:\n")
|
|
print(str(self.path), str(self.headers))
|
|
return(post_data)
|
|
|
|
def do_HEAD(self):
|
|
print("do_HEAD:\n")
|
|
print(str(self.path), str(self.headers))
|
|
#elf.send_header(['Access-Control-Allow-Origin', '*'],
|
|
#self.send_header(['Access-Control-Allow-Methods', 'GET'],
|
|
#self.send_header(['Access-Control-Allow-Headers', 'X-Requested-With,content-type'])
|
|
#self.send_header(['Access-Control-Allow-Credentials', true])
|
|
self.end_headers()
|
|
|
|
def do_REQUEST(self):
|
|
print("do_REQUEST:\n")
|
|
print(str(self.path), str(self.headers))
|
|
self.r_t = str(self.path), str(self.headers)
|
|
# return self.response()
|
|
|
|
def do_GET(self):
|
|
print("do_GET:\n")
|
|
print(str(self.path), str(self.headers))
|
|
|
|
# print(Router.parse_url(self))
|
|
# print("data:\t" + self.rfile.read())
|
|
# request = Router.parse_url(self)
|
|
#self.send_response(200)
|
|
self.p_list = []
|
|
self.r_t = ""
|
|
# length = int(self.headers['content-length'])
|
|
# self.send_header(['Access-Control-Allow-Origin', '*'],
|
|
# self.send_header(['Access-Control-Allow-Methods', 'GET'],
|
|
# self.send_header(['Access-Control-Allow-Headers', 'X-Requested-With,content-type'])
|
|
# self.send_header(['Access-Control-Allow-Credentials', true])
|
|
# self.end_headers()
|
|
|
|
if self.path == "/":
|
|
# path root -> Return all DATA - (All Tree Portage) config.json as Json
|
|
# print("data:\t" + self.get_data())
|
|
self.r_t = main()
|
|
self.response()
|
|
|
|
elif self.path == '/ovelays':
|
|
# Return List repositories
|
|
overlays = get_list_overlays()
|
|
#print(ovls)
|
|
if overlays: # == "":
|
|
overlays = "Error"
|
|
|
|
self.r_t = json.dumps({"repositories": overlays})
|
|
|
|
elif self.path.startswith("/?st_app="):
|
|
#Parse Settings App -> Write paramets to config.json
|
|
config = load_config()
|
|
param = self.path.replace("/?st_app=", "")
|
|
list_param = param.split(',')
|
|
print(list_param)
|
|
for i in list_param:
|
|
if i.startswith('port'):
|
|
port = int(i.split('=')[1])
|
|
elif i.startswith('Lang'):
|
|
Lang = i.split('=')[1]
|
|
|
|
write_config(port, Lang)
|
|
print(config)
|
|
print(param)
|
|
|
|
elif self.path.startswith("/find?pkg="):
|
|
# search list packages to Teamplates -> return Result
|
|
param = self.path.replace("/find?pkg=", "") #request['params']['name']
|
|
pk_list = []
|
|
search_result = {}
|
|
if len(param.split('/')) == 2:
|
|
param = param.split('/')[1]
|
|
p_list = on_find(param)
|
|
print(p_list)
|
|
if len(p_list) == 0:
|
|
print("Never Found")
|
|
self.r_t = str(json.dumps({"Package_result": p_list}))
|
|
else:
|
|
for p in p_list:
|
|
print(p)
|
|
if len(param.split("/")) == 2:
|
|
pk_list.append(search(param.split("/")[1]))
|
|
else:
|
|
pk_list.append(search(param))
|
|
#print(pk)
|
|
search_result = {"Package_result": pk_list}
|
|
self.r_t = str(json.dumps(search_result))
|
|
|
|
elif self.path == "/get_settings_app":
|
|
#Return Settings App
|
|
|
|
self.r_t = str(json.dumps(load_config()))
|
|
|
|
elif self.path == '/get_portage':
|
|
# Return Tree portage <<Json>>
|
|
|
|
#self.r_t = str(sort_inatll_pkg())
|
|
self.r_t = str(json.dumps(scan_config_portage()))
|
|
|
|
else:
|
|
#Return Error 404
|
|
self.send_response(404)
|
|
|
|
self.end_headers()
|
|
print(str(self.client_address[0]) +"\t" + str(404))
|
|
|
|
# Send the html message
|
|
# self.wfile.write(bytes(self.r_t, "utf-8"))
|
|
|
|
|