You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
86 lines
2.9 KiB
86 lines
2.9 KiB
import pytest
|
|
import os
|
|
from calculate.utils.files import Process
|
|
from subprocess import run, PIPE
|
|
|
|
|
|
CHROOT_PATH = os.path.join(os.getcwd(), 'tests/utils/testfiles/')
|
|
|
|
|
|
@pytest.mark.files_utils
|
|
class TestUtils():
|
|
def test_if_single_correct_command_executed_using_Process_object__it_successfully_executes(self):
|
|
ls_path = os.path.join(CHROOT_PATH, 'etc')
|
|
try:
|
|
ls_process = Process('ls', '-a', cwd=ls_path)
|
|
ls_result = run('ls -a', shell=True, stdout=PIPE, cwd=ls_path)
|
|
except Exception as error:
|
|
print('error:', str(error))
|
|
assert False
|
|
|
|
output = ls_process.read().strip()
|
|
result = ls_result.stdout.decode().strip()
|
|
assert output == result
|
|
|
|
def test_if_pipe_is_executed_using_Process_object__it_has_successfully_executed(self):
|
|
try:
|
|
pipe = Process('grep', 'VGA',
|
|
stdin=Process('/usr/sbin/lspci')
|
|
)
|
|
pipe_result = run('/usr/sbin/lspci | grep "VGA"', shell=True,
|
|
stdout=PIPE)
|
|
except Exception as error:
|
|
print('error:', str(error))
|
|
assert False
|
|
|
|
output = pipe.read().strip()
|
|
result = pipe_result.stdout.decode().strip()
|
|
assert output == result
|
|
|
|
def test_if_a_pipe_Process_object_is_used_for_several_writes_without_readings__it_successfully_executes(self):
|
|
input_lines = ['KEY: important line.\n',
|
|
'NONE: not important line.\n',
|
|
'NONE: nothing interesting.\n',
|
|
'KEY: another line.\n']
|
|
|
|
output_text = 'KEY: important line.\nKEY: another line.\n'
|
|
|
|
try:
|
|
pipe = Process('grep', 'KEY',
|
|
stdin=Process('cat', stdin=PIPE)
|
|
)
|
|
except Exception as error:
|
|
print('error:', str(error))
|
|
assert False
|
|
|
|
for line in input_lines:
|
|
pipe.write(line)
|
|
output = pipe.read()
|
|
assert output == output_text
|
|
|
|
def test_if_a_pipe_Process_object_is_used_for_several_writes__it_successfully_executes_even_after_read(self):
|
|
input_lines = ['KEY: important line.\n',
|
|
'NONE: not important line.\n',
|
|
'NONE: nothing interesting.\n',
|
|
'KEY: another line.\n']
|
|
|
|
output_lines = ['KEY: important line.\n',
|
|
'KEY: another line.\n']
|
|
|
|
try:
|
|
pipe = Process('grep', 'KEY',
|
|
stdin=Process('cat', stdin=PIPE)
|
|
)
|
|
except Exception as error:
|
|
print('error:', str(error))
|
|
assert False
|
|
|
|
output = []
|
|
for line in input_lines:
|
|
pipe.write(line)
|
|
output_line = pipe.read()
|
|
|
|
if output_line:
|
|
output.append(output_line)
|
|
assert output == output_lines
|