feature: async binhost

master 3.7.2.36
root 1 year ago
parent fa41495e6a
commit 9422fec539

@ -13,7 +13,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import aiohttp
import sys
import urllib.request as urllib2
import re
@ -40,6 +41,27 @@ MINUTES = 60
HOURS = 60 * MINUTES
DAYS = 24 * HOURS
async def async_open(fn, encoding=None, timeout=900):
t = time.time()
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session:
try:
async with session.get(fn) as resp:
if resp.status in range(200, 300):
res = await resp.text(encoding=encoding)
return res
raise ValueError
except Exception as e:
return None
async def get_async_timestamp(timestamp):
start_time = time.time()
ts_res = await async_open(timestamp, timeout=15) or 0
duration = (time.time() - start_time) * 1000
return int(ts_res), duration
@contextmanager
def _urlopen(fn, timeout=None):
"""
@ -100,6 +122,17 @@ class BinhostsBase(Cachable):
def check_package_timestamp(self, fn, timestamp):
return timestamp.encode("UTF-8") == self.fetch_package_timestamp(fn)
@Cachable.methodcached()
async def async_fetch_envdata(self, binhost):
revision_files = (path.join(binhost, x) for x in self.revision_path)
for fn in revision_files:
try:
data = await async_open(fn, encoding="UTF-8")
return fn, data
except BaseException as e:
return None, ""
return None, ""
@Cachable.methodcached()
def fetch_envdata(self, binhost):
revision_files = (path.join(binhost, x)
@ -150,6 +183,8 @@ class BinhostsBase(Cachable):
re_revison = re.compile("\w+=(\w+)")
def _get_timestamp(self, timestamp_file):
"""
Получить timestamp с сервера обновлений
@ -262,6 +297,118 @@ class BinhostsBase(Cachable):
else:
return other.valid
@total_ordering
class AsyncBinhost():
def __init__(self, parent, host):
self.host = host
self.parent = parent
self.timestamp_file = path.join(self.host, self.parent.ts_path)
# try:
# self.timestamp_file = path.join(self.host, self.parent.ts_path)
# #self.timestamp = self.parent._get_timestamp(timestamp_file)
# #self.duration = int((time.time() - start_ts) * 1000)
# self.timestamp = 0
# self.duration = 10000000
# # self.outdated = int(start_ts) - self.timestamp > parent.actual_period
# # self.downgraded = self.timestamp < parent.last_ts
#
# fn, data = self.parent.fetch_envdata(self.host)
# if fn:
# cp = ConfigParserCaseSens()
# try:
# cp.read_string(data)
# if "update" in cp.sections():
# self._level = int(cp["update"]["level"])
# else:
# self._level = None
# except (CPError, KeyError) as e:
# self._level = None
# else:
# self._level = None
# except BaseException as e:
# if isinstance(e, KeyboardInterrupt):
# raise
# self.timestamp = 0
# self.duration = 0
# self.outdated = True
# self.downgraded = True
# self._level = 0
async def init(self):
try:
start_ts = time.time()
self.timestamp, self.duration = await get_async_timestamp(self.timestamp_file)
if int(self.timestamp) == 0:
raise ValueError
self.outdated = int(start_ts) - int(self.timestamp) > self.parent.actual_period
self.downgraded = self.timestamp < self.parent.last_ts
fn, data = await self.parent.async_fetch_envdata(self.host)
if fn:
cp = ConfigParserCaseSens()
try:
cp.read_string(data)
if "update" in cp.sections():
self._level = int(cp["update"]["level"])
else:
self._level = None
except (CPError, KeyError) as e:
self._level = None
else:
self._level = None
return self
except BaseException as e:
if isinstance(e, KeyboardInterrupt):
raise
self.timestamp = 0
self.duration = 0
self.outdated = True
self.downgraded = True
self._level = 0
return self
@property
def status(self):
return self.parent.binhost_status(self.host)
@property
def data(self):
return self.parent.fetch_envdata(self.host)[1]
@property
def valid(self):
return self.timestamp != 0
@property
def bad_sign(self):
return not self.parent.binhost_check_sign(self.host)
@property
def level(self):
return self._level
def __eq__(self, other):
if not self.valid and self.valid == other.valid:
return True
if self.valid != other.valid:
return False
return (self.outdated == other.outdated and self.duration == other.duration
and self.timestamp == self.timestamp)
def __lt__(self, other):
if self.valid:
if not other.valid:
return False
if self.outdated == other.outdated:
if self.outdated:
return (self.timestamp, -self.duration) < (other.timestamp, -other.duration)
else:
return (-self.duration, self.timestamp) < (-other.duration, other.timestamp)
return other.outdated
else:
return other.valid
class BaseBinhost(Binhost):
"""
@ -297,6 +444,13 @@ class BinhostsBase(Cachable):
def get_binhosts(self):
return [self.get_binhost(x) for x in self.binhost_list if x]
@Cachable.methodcached()
async def get_asyncbinhosts(self):
async_lst = await asyncio.gather(*[self.AsyncBinhost(self, x).init() for x in self.binhost_list
if x and not x.startswith("ftp://")])
ftp_lst = [self.get_binhost(x) for x in self.binhost_list if x and x.startswith("ftp://")]
return async_lst + ftp_lst
def is_cache(self):
return False
raise NotImplementedError("Need to revision")

@ -41,4 +41,4 @@ class VariableClVer(ReadonlyVariable):
"""
Package version
"""
value = "3.6.7"
value = "3.7.3.0"

Loading…
Cancel
Save