# Copyright 2019 The Australian National University
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from rpython.rtyper.lltypesystem import rffi, lltype
from rpython.rlib.rmu import zebu as rmu
from rpython.rlib.rrtmu import rtzebu as rrtmu
from rpython.translator.platform import platform
from rpython.tool.ansi_print import AnsiLogger
from util import fncptr_from_rpy_func, fncptr_from_py_script, may_spawn_proc, bin_dir, executable_from_rpy_func
import ctypes, py, stat, os
import pytest
import time

logger = AnsiLogger('rt_rpython_tests')

# --------------------------
# tests
#
# Everything below, up to test_collision_detection, is disabled (commented
# out) test code that is kept for reference only.

# @may_spawn_proc
# def test_newregion():
#     from rpython.dev.dev_new_region import tester
#
#     fn, _ = fncptr_from_rpy_func(tester, [], rffi.VOIDPP)
#
#     assert fn() != lltype.nullptr(rffi.VOIDPP.TO)
#
#
#
#
# @may_spawn_proc
# def test_ralloc():
#     from rpython.dev.dev_ralloc import test_ralloc
#     fn, _ = fncptr_from_rpy_func(test_ralloc, [rffi.SIGNED, rffi.SIGNED, rffi.SIGNED], rffi.SIGNED)
#
#     assert fn(-2, 7, 0) == 5
#
#
#
#
# @may_spawn_proc
# def test_ealloc():
#     from rpython.dev.dev_ealloc import test_ealloc
#     fn, _ = fncptr_from_rpy_func(test_ealloc, [rffi.SIGNED, rffi.SIGNED, rffi.SIGNED], rffi.SIGNED)
#
#     assert fn(111, 1335, 17) == 1463
#
#
#
#
# @may_spawn_proc
# def test_mixed_memory_areas():
#     from rpython.dev.dev_memory_areas import test_memory_areas
#     fn, _ = fncptr_from_rpy_func(test_memory_areas, [rffi.SIGNED, rffi.SIGNED, rffi.SIGNED], rffi.SIGNED)
#
#     assert fn(2, -11, 14) == 15
#
#
# @may_spawn_proc
# def test_edelete():
#     from rpython.dev.dev_edelete import test_edelete
#     fn, _ = fncptr_from_rpy_func(test_edelete, [rffi.SIGNED, rffi.SIGNED, rffi.SIGNED], rffi.SIGNED)
#
#     assert fn(3, 1, 7) == 11

# @may_spawn_proc
# def test_thread_simple():
#     from rpython.dev.dev_new_thread import test_simple_threads
#     logger.info('going to build the test function')
#     exec_path = executable_from_rpy_func(test_simple_threads, [], rffi.SIGNED)
#
#     logger.info('going to run the test function')
#
#     # import subprocess
#     # subprocess.call([fn()])
#     # subprocess.check_output([fn()])
#
#     import os
#
#     start = time.time()
#     os.system('sudo LD_LIBRARY_PATH=$PWD/emit %s' % exec_path)
#     end = time.time()
#
#     print("Test took: ")
#     print(end-start)
#
#     logger.info('second call')
#
#     assert True

# @may_spawn_proc
# def test_default_thread_attr():
#     from rpython.dev.dev_thread_attr import test_default_attr
#     logger.info('going to build the test function')
#     exec_path = executable_from_rpy_func(test_default_attr, [], rffi.SIGNED)
#
#     logger.info('going to run the test function')
#
#     # import subprocess
#     # subprocess.call([fn()])
#     # subprocess.check_output([fn()])
#
#     import os
#
#     start = time.time()
#     os.system('sudo LD_LIBRARY_PATH=$PWD/emit %s' % exec_path)
#     end = time.time()
#
#     print("Test took: ")
#     print(end-start)
#
#     logger.info('second call')
#
#     assert True

# @may_spawn_proc
# def test_set_attr_priority():
#     from rpython.dev.dev_thread_attr import test_set_priority
#     logger.info('going to build the test function')
#     exec_path = executable_from_rpy_func(test_set_priority, [], rffi.SIGNED)
#
#     logger.info('going to run the test function')
#
#     # import subprocess
#     # subprocess.call([fn()])
#     # subprocess.check_output([fn()])
#
#     import os
#
#     start = time.time()
#     os.system('sudo LD_LIBRARY_PATH=$PWD/emit %s' % exec_path)
#     end = time.time()
#
#     print("Test took: ")
#     print(end-start)
#
#     logger.info('second call')
#
#     assert True

# @may_spawn_proc
# def test_thread_global_args():
#     from rpython.dev.dev_thread_global_args import test_simple_threads
#     logger.info('going to build the test function')
#     exec_path = executable_from_rpy_func(test_simple_threads, [], rffi.SIGNED)
#
#     logger.info('going to run the test function')
#
#     # import subprocess
#     # subprocess.call([fn()])
#     # subprocess.check_output([fn()])
#
#     import os
#
#     start = time.time()
#     os.system('sudo LD_LIBRARY_PATH=$PWD/emit %s' % exec_path)
#     end = time.time()
#
#     print("Test took: ")
#     print(end-start)
#
#     logger.info('second call')
#
#     assert True

# @may_spawn_proc
# def test_futex_global():
#     from rpython.dev.dev_futex import test_simple_threads
#     logger.info('going to build the test function')
#     exec_path = executable_from_rpy_func(test_simple_threads, [], rffi.SIGNED)
#
#     logger.info('going to run the test function')
#
#     # import subprocess
#     # subprocess.call([fn()])
#     # subprocess.check_output([fn()])
#
#     import os
#
#     start = time.time()
#     os.system('sudo LD_LIBRARY_PATH=$PWD/emit %s' % exec_path)
#     end = time.time()
#
#     print("Test took: ")
#     print(end-start)
#
#     logger.info('second call')
#
#     assert True

# @may_spawn_proc
# def test_thread_global_args():
#     from rpython.dev.dev_thread_local_args import test_simple_threads
#     logger.info('going to build the test function')
#     exec_path = executable_from_rpy_func(test_simple_threads, [], rffi.SIGNED)
#
#     logger.info('going to run the test function')
#
#     # import subprocess
#     # subprocess.call([fn()])
#     # subprocess.check_output([fn()])
#
#     import os
#
#     start = time.time()
#     os.system('sudo LD_LIBRARY_PATH=$PWD/emit %s' % exec_path)
#     end = time.time()
#
#     print("Test took: ")
#     print(end-start)
#
#     logger.info('second call')
#
#     assert True

# @may_spawn_proc
# def test_gettime_ns():
#     from rpython.dev.dev_get_time import test_gettime
#     fn, _ = fncptr_from_rpy_func(test_gettime, [rffi.SIGNED, rffi.SIGNED], rffi.SIGNED)
#
#     res = fn(10000, 10000)
#
#     print("result:")
#     print(res)
#
#     assert res > 0

# @may_spawn_proc
# def test_get_time():
#     from rpython.dev.dev_get_time import test_gettime
#     logger.info('going to build the test function')
#     # exec_path = executable_from_rpy_func(test_gettime, [rffi.SIGNED, rffi.SIGNED], rffi.SIGNED)
#     exec_path = executable_from_rpy_func(test_gettime, [], rffi.SIGNED)
#
#     logger.info('going to run the test function')
#
#     # import subprocess
#     # subprocess.call([fn()])
#     # subprocess.check_output([fn()])
#
#     import subprocess
#
#     start = time.time()
#     res = subprocess.call('sudo LD_LIBRARY_PATH=$PWD/emit %s' % exec_path, shell=True)
#     end = time.time()
#
#     logger.info("Test took: ")
#     logger.info(end-start)
#
#     logger.info('returned result: %s' % res)
#
#     assert False

# @may_spawn_proc
# def test_simple_timer():
#     from rpython.dev.dev_timer import test_simple_timer
#     logger.info('going to build the test function')
#     exec_path = executable_from_rpy_func(test_simple_timer, [], rffi.SIGNED)
#
#     logger.info('going to run the test function')
#
#     # import subprocess
#     # subprocess.call([fn()])
#     # subprocess.check_output([fn()])
#
#     import subprocess
#     import os
#
#     start = time.time()
#
#     res = subprocess.call(
#         'sudo LD_LIBRARY_PATH=$PWD/emit:$LD_LIBRARY_PATH MU_LOG_LEVEL=debug %s' % exec_path, shell=True)
#
#     end = time.time()
#
#     logger.info("Test took: ")
#     logger.info(end-start)
#
#     logger.info('returned res = %d' % res)
#
#     # assert False
#     assert 0 <= res <= 50


@may_spawn_proc
def test_collision_detection():
    """Build ``dev_cd.main_region`` into an executable, run it and log the outcome.

    The executable path comes from ``executable_from_rpy_func``.  The program
    is launched through the shell under ``sudo`` with ``LD_LIBRARY_PATH``
    extended by ``$PWD/emit`` and ``MU_LOG_LEVEL=trace``.  The wall-clock
    duration and the value returned by ``subprocess.call`` (the exit status
    of the shell command) are written to the module logger.

    NOTE(review): the test currently ends in an unconditional ``assert False``
    and therefore always fails; see the comment at the end of the body.
    """
    from rpython.dev.dev_CD import dev_cd
    import subprocess

    logger.info('going to build the test function')
    exec_path = executable_from_rpy_func(dev_cd.main_region, [], rffi.SIGNED)

    logger.info('going to run the test function')

    # shell=True is required here: the command line relies on the shell to
    # expand $PWD / $LD_LIBRARY_PATH and to apply the VAR=value prefixes.
    start = time.time()

    res = subprocess.call(
        'sudo LD_LIBRARY_PATH=$PWD/emit:$LD_LIBRARY_PATH MU_LOG_LEVEL=trace %s' % exec_path,
        shell=True)

    end = time.time()

    logger.info("Test took: ")
    logger.info(end-start)

    logger.info('returned res = %d' % res)

    # NOTE(review): unconditional failure, presumably left in so that pytest
    # prints the captured output of the run -- confirm with the author.  The
    # range check below is disabled in the original; the expected range of
    # exit codes for dev_cd.main_region cannot be confirmed from this file.
    assert False
    # assert 0 <= res <= 50