Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • flow/emper
  • aj46ezos/emper
  • i4/manycore/emper
3 results
Show changes
Commits on Source (804)
Showing with 2128 additions and 408 deletions
......@@ -6,12 +6,18 @@ Checks: >
performance-*,
portability-*,
readability-*,
-bugprone-assignment-in-if-condition,
-bugprone-easily-swappable-parameters,
-cert-err58-cpp,
-clang-diagnostic-empty-translation-unit,
-performance-avoid-endl,
-readability-braces-around-statements,
-readability-function-cognitive-complexity,
-readability-identifier-length,
-readability-implicit-bool-conversion,
-readability-isolate-declaration,
-readability-magic-numbers,
-readability-simplify-boolean-expr,
WarningsAsErrors: >
bugprone-*,
......@@ -20,4 +26,4 @@ WarningsAsErrors: >
readability-*,
performance-*,
HeaderFilterRegex: .*
HeaderFilterRegex: '(?!subprojects).*'
......@@ -10,3 +10,11 @@ insert_final_newline = true
[*.{c,h,cpp,hpp}]
indent_style = tab
indent_size = 2
[*.yml]
indent_style = space
indent_size = 2
[meson_options.txt]
indent_style = space
indent_size = 2
......@@ -4,6 +4,12 @@
/build-*/
/.cache/
/.clangd/
/clang-tidy-report
tools/gdb/__pycache__/
subprojects/packagecache/
subprojects/googletest*
subprojects/liburing*
*
!.gitignore
image: "flowdalic/debian-testing-dev:1.4"
image: "flowdalic/gentoo-dev:20241216"
before_script:
- ulimit -a
- nproc
- |
readarray TOOLS <<EOF
c++
......@@ -17,46 +19,100 @@ before_script:
valgrind
EOF
for tool in ${TOOLS[@]}; do
if ! command -v $tool; then
echo "No $tool binary found"
continue
fi
echo -n "$tool version: "
$tool --version
done
cache:
paths:
- subprojects/packagecache
variables:
BUILDDTYPE: debugoptimized
CC: gcc
CXX: g++
GCC: gcc
GXX: g++
CC: $GCC
CXX: $GXX
EXTRA_NINJA_ARGS: -v
EMPER_IO: "false"
stages:
- smoke-test
- test
.meson-test:
artifacts:
paths:
- build-*/meson-logs
when: on_failure
reports:
junit: build-*/meson-logs/testlog.junit.xml
smoke-test-suite:
extends:
- .meson-test
stage: smoke-test
script: make smoke-test-suite
static-analysis-with-emper-io:
stage: smoke-test
script: make static-analysis
variables:
EMPER_IO: "true"
fast-static-analysis:
stage: smoke-test
script: make fast-static-analysis
variables:
EMPER_IO: "true"
.fast-variant-check:
stage: test
script: make fast-static-analysis smoke-test-suite
variables:
EMPER_IO: "true"
iwyu:
script: IWYU_TOOL="${CI_PROJECT_DIR}/tools/iwyu_tool.py" make iwyu
variables:
EMPER_IO: "true"
clang-tidy:
script: make tidy
variables:
EMPER_IO: "true"
.build:
stage: test
script:
- make
.test:
extends:
- .meson-test
stage: test
script:
- make test
.gcc:
variables:
CC: gcc
CXX: g++
CC: $GCC
CXX: $GXX
.clang:
variables:
CC: clang
CXX: clang++
.libc++:
extends:
- .clang
variables:
EMPER_USE_BUNDLED_DEPS: "always"
EMPER_CPP_ARGS: "-stdlib=libc++"
EMPER_CPP_LINK_ARGS: "-stdlib=libc++"
.lto:
variables:
EMPER_B_LTO: "true"
.emper-ws-scheduling:
variables:
EMPER_DEFAULT_SCHEDULING_STRATEGY: "work_stealing"
......@@ -69,14 +125,58 @@ static-analysis-with-emper-io:
variables:
EMPER_WORKER_SLEEP: 'false'
.emper-worker-stats:
.emper-stats:
variables:
EMPER_WORKER_STATS: 'true'
EMPER_STATS_ALL: 'true'
.emper-userspace-rcu:
variables:
EMPER_USERSPACE_RCU: 'true'
.emper-no-io:
variables:
EMPER_IO: 'false'
.emper-pipe-sleep-strategy:
variables:
EMPER_WORKER_SLEEP_STRATEGY: 'pipe'
.emper-no-completer:
variables:
EMPER_IO_COMPLETER_BEHAVIOR: 'none'
.emper-single-poller:
variables:
EMPER_IO_URING_SQ_POLLER: 'one'
.emper-numa-poller:
variables:
EMPER_IO_URING_SQ_POLLER: 'numa'
.emper-each-poller:
variables:
EMPER_IO_URING_SQ_POLLER: 'each'
.emper-single-uring:
variables:
EMPER_IO_SINGLE_URING: 'true'
.emper-synchronous-io:
variables:
EMPER_IO_SYNCHRONOUS: 'true'
.emper-io-stealing:
variables:
EMPER_IO_STEALING: 'true'
.emper-lockless-cq:
variables:
EMPER_IO_LOCKLESS_CQ: 'true'
.emper-io-waitfree-stealing:
variables:
EMPER_IO_WAITFREE_STEALING: 'true'
.default-library-static:
variables:
EMPER_DEFAULT_LIBRARY: 'static'
......@@ -123,22 +223,50 @@ static-analysis-with-emper-io:
variables:
EMPER_WORKER_WAKEUP_STRATEGY: "all"
.emper-worker-wakeup-strategy-throttle:
variables:
EMPER_WORKER_WAKEUP_STRATEGY: "throttle"
.do-not-log-timestamp:
variables:
EMPER_LOG_TIMESTAMP: "false"
EMPER_LOG_TIMESTAMP: "none"
.ws-queue-scheduler-locked:
variables:
EMPER_WS_QUEUE_SCHEDULER: "locked"
.ws-queue-scheduler-cl2:
variables:
EMPER_WS_QUEUE_SCHEDULER: "cl2"
.ws-queue-scheduler-cl3:
variables:
EMPER_WS_QUEUE_SCHEDULER: "cl3"
.ws-queue-scheduler-cl4:
variables:
EMPER_WS_QUEUE_SCHEDULER: "cl4"
.locked-ws-queues:
.waitfree-ws:
variables:
EMPER_LOCKED_WS_QUEUE: "true"
EMPER_WAITFREE_WORK_STEALING: "true"
.futex-wakeup-semaphore:
variables:
EMPER_WAKEUP_SEMAPHORE_IMPLEMENTATION: "futex"
.futex2-wakeup-semaphore:
variables:
EMPER_WAKEUP_SEMAPHORE_IMPLEMENTATION: "futex2"
.locked-wakeup-semaphore:
variables:
EMPER_WAKEUP_SEMAPHORE_IMPLEMENTATION: "locked"
.set-affinity-on-block:
variables:
EMPER_SET_AFFINITY_ON_BLOCK: 'true'
test-gcc:
extends:
- .test
......@@ -169,6 +297,22 @@ test-clang-debug:
- test-clang
- .debug-build
smoke-test-libc++:
stage: smoke-test
extends:
- .fast-variant-check
- .libc++
test-lto:
extends:
- .test
- .lto
test-libc++:
extends:
- .test
- .libc++
test-worker-no-sleep:
extends:
- .test
......@@ -177,7 +321,7 @@ test-worker-no-sleep:
test-with-stats:
extends:
- .test
- .emper-worker-stats
- .emper-stats
test-with-userspace-rcu:
extends:
......@@ -215,22 +359,194 @@ test-worker-wakeup-strategy-all:
- .test
- .worker-wakeup-strategy-all
# Disable throttle test till throttle works with notifySpecific
#test-worker-wakeup-strategy-throttle:
# extends:
# - .test
# - .emper-worker-wakeup-strategy-throttle
test-do-not-log-timestamp:
extends:
- .test
- .do-not-log-timestamp
test-locked-ws-queues:
test-ws-queue-scheduler-locked:
extends:
- .test
- .ws-queue-scheduler-locked
test-ws-queue-scheduler-cl2:
extends:
- .test
- .ws-queue-scheduler-cl2
test-ws-queue-scheduler-cl3:
extends:
- .test
- .ws-queue-scheduler-cl3
test-ws-queue-scheduler-cl3:
extends:
- .test
- .locked-ws-queues
- .ws-queue-scheduler-cl3
test-waitfree-ws:
extends:
- .test
- .waitfree-ws
test-futex-wakeup-semaphore:
extends:
- .test
- .futex-wakeup-semaphore
# TODO: enable this if the CI has linux >= 5.16
build-futex-wakeup-semaphore:
extends:
- .build
- .futex2-wakeup-semaphore
test-locked-wakeup-semaphore:
extends:
- .test
- .locked-wakeup-semaphore
test-set-affinity-on-block:
extends:
- .test
- .set-affinity-on-block
test-mmapped-log:
extends:
- .meson-test
script: make && EMPER_LOG_FILE=emper.log make test
test-no-io:
extends:
- .test
- .emper-no-io
test-single-uring:
extends:
- .test
- .emper-single-uring
test-synchronous-io:
extends:
- .test
- .emper-synchronous-io
test-pipe-sleep-strategy:
extends:
- .test
- .emper-pipe-sleep-strategy
test-pipe-sleep-strategy-no-completer:
extends:
- .test
- .emper-pipe-sleep-strategy
- .emper-no-completer
test-lockless-cq:
extends:
- .test
- .emper-lockless-cq
test-io-stealing:
extends:
- .test
- .emper-io-stealing
test-lockless-io-stealing:
extends:
- .test
- .emper-io-stealing
- .emper-lockless-cq
test-waitfree-io-stealing:
extends:
- .test
- .emper-io-stealing
- .emper-lockless-cq
- .emper-io-waitfree-stealing
test-io-stealing-pipe-no-completer:
extends:
- .test
- .emper-pipe-sleep-strategy
- .emper-no-completer
- .emper-io-stealing
test-io-stealing-pipe-no-completer-lockless:
extends:
- .test
- .emper-pipe-sleep-strategy
- .emper-no-completer
- .emper-io-stealing
- .emper-lockless-cq
smoke-test-locked-queue-rwlock:
extends:
- .fast-variant-check
variables:
EMPER_LOCKED_UNBOUNDED_QUEUE_IMPLEMENTATION: "rwlock"
smoke-test-locked-queue-shared-mutex:
extends:
- .fast-variant-check
variables:
EMPER_LOCKED_UNBOUNDED_QUEUE_IMPLEMENTATION: "shared_mutex"
smoke-test-locked-queue-boost-shared-mutex:
extends:
- .fast-variant-check
variables:
EMPER_LOCKED_UNBOUNDED_QUEUE_IMPLEMENTATION: "boost_shared_mutex"
smoke-test-locked-queue-boost-userspace-rcu:
extends:
- .fast-variant-check
variables:
EMPER_USERSPACE_RCU: "true"
# Only build the poller variants because sqpoll needs linux >= 5.15
# TODO: also test those variants if the CI uses linux >= 5.15
build-single-poller:
extends:
- .build
- .emper-single-poller
build-numa-poller:
extends:
- .build
- .emper-single-poller
build-each-poller:
extends:
- .build
- .emper-each-poller
continuation-stealing-locked:
extends:
- .test
- .ws-queue-scheduler-locked
variables:
EMPER_CONTINUATION_STEALING_MODE: 'locked'
continuation-stealing-madv-free:
extends:
- .test
variables:
EMPER_CONTINUATION_STEALING_MADVISE_STACK: 'free'
stack-guard-page:
extends:
- .test
variables:
EMPER_STACK_GUARD_PAGE: 'true'
build-only-emper-dep:
extends:
- .build
variables:
EMPER_BUILD_ONLY_EMPER_DEP: 'true'
GNU LESSER GENERAL PUBLIC LICENSE
Version 3, 29 June 2007
Copyright (C) 2007 Free Software Foundation, Inc. <https://fsf.org/>
Everyone is permitted to copy and distribute verbatim copies
of this license document, but changing it is not allowed.
This version of the GNU Lesser General Public License incorporates
the terms and conditions of version 3 of the GNU General Public
License, supplemented by the additional permissions listed below.
0. Additional Definitions.
As used herein, "this License" refers to version 3 of the GNU Lesser
General Public License, and the "GNU GPL" refers to version 3 of the GNU
General Public License.
"The Library" refers to a covered work governed by this License,
other than an Application or a Combined Work as defined below.
An "Application" is any work that makes use of an interface provided
by the Library, but which is not otherwise based on the Library.
Defining a subclass of a class defined by the Library is deemed a mode
of using an interface provided by the Library.
A "Combined Work" is a work produced by combining or linking an
Application with the Library. The particular version of the Library
with which the Combined Work was made is also called the "Linked
Version".
The "Minimal Corresponding Source" for a Combined Work means the
Corresponding Source for the Combined Work, excluding any source code
for portions of the Combined Work that, considered in isolation, are
based on the Application, and not on the Linked Version.
The "Corresponding Application Code" for a Combined Work means the
object code and/or source code for the Application, including any data
and utility programs needed for reproducing the Combined Work from the
Application, but excluding the System Libraries of the Combined Work.
1. Exception to Section 3 of the GNU GPL.
You may convey a covered work under sections 3 and 4 of this License
without being bound by section 3 of the GNU GPL.
2. Conveying Modified Versions.
If you modify a copy of the Library, and, in your modifications, a
facility refers to a function or data to be supplied by an Application
that uses the facility (other than as an argument passed when the
facility is invoked), then you may convey a copy of the modified
version:
a) under this License, provided that you make a good faith effort to
ensure that, in the event an Application does not supply the
function or data, the facility still operates, and performs
whatever part of its purpose remains meaningful, or
b) under the GNU GPL, with none of the additional permissions of
this License applicable to that copy.
3. Object Code Incorporating Material from Library Header Files.
The object code form of an Application may incorporate material from
a header file that is part of the Library. You may convey such object
code under terms of your choice, provided that, if the incorporated
material is not limited to numerical parameters, data structure
layouts and accessors, or small macros, inline functions and templates
(ten or fewer lines in length), you do both of the following:
a) Give prominent notice with each copy of the object code that the
Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the object code with a copy of the GNU GPL and this license
document.
4. Combined Works.
You may convey a Combined Work under terms of your choice that,
taken together, effectively do not restrict modification of the
portions of the Library contained in the Combined Work and reverse
engineering for debugging such modifications, if you also do each of
the following:
a) Give prominent notice with each copy of the Combined Work that
the Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the Combined Work with a copy of the GNU GPL and this license
document.
c) For a Combined Work that displays copyright notices during
execution, include the copyright notice for the Library among
these notices, as well as a reference directing the user to the
copies of the GNU GPL and this license document.
d) Do one of the following:
0) Convey the Minimal Corresponding Source under the terms of this
License, and the Corresponding Application Code in a form
suitable for, and under terms that permit, the user to
recombine or relink the Application with a modified version of
the Linked Version to produce a modified Combined Work, in the
manner specified by section 6 of the GNU GPL for conveying
Corresponding Source.
1) Use a suitable shared library mechanism for linking with the
Library. A suitable mechanism is one that (a) uses at run time
a copy of the Library already present on the user's computer
system, and (b) will operate properly with a modified version
of the Library that is interface-compatible with the Linked
Version.
e) Provide Installation Information, but only if you would otherwise
be required to provide such information under section 6 of the
GNU GPL, and only to the extent that such information is
necessary to install and execute a modified version of the
Combined Work produced by recombining or relinking the
Application with a modified version of the Linked Version. (If
you use option 4d0, the Installation Information must accompany
the Minimal Corresponding Source and Corresponding Application
Code. If you use option 4d1, you must provide the Installation
Information in the manner specified by section 6 of the GNU GPL
for conveying Corresponding Source.)
5. Combined Libraries.
You may place library facilities that are a work based on the
Library side by side in a single library together with other library
facilities that are not Applications and are not covered by this
License, and convey such a combined library under terms of your
choice, if you do both of the following:
a) Accompany the combined library with a copy of the same work based
on the Library, uncombined with any other library facilities,
conveyed under the terms of this License.
b) Give prominent notice with the combined library that part of it
is a work based on the Library, and explaining where to find the
accompanying uncombined form of the same work.
6. Revised Versions of the GNU Lesser General Public License.
The Free Software Foundation may publish revised and/or new versions
of the GNU Lesser General Public License from time to time. Such new
versions will be similar in spirit to the present version, but may
differ in detail to address new problems or concerns.
Each version is given a distinguishing version number. If the
Library as you received it specifies that a certain numbered version
of the GNU Lesser General Public License "or any later version"
applies to it, you have the option of following the terms and
conditions either of that published version or of any later version
published by the Free Software Foundation. If the Library as you
received it does not specify a version number of the GNU Lesser
General Public License, you may choose any version of the GNU Lesser
General Public License ever published by the Free Software Foundation.
If the Library as you received it specifies that a proxy can decide
whether future versions of the GNU Lesser General Public License shall
apply, that proxy's public statement of acceptance of any version is
permanent authorization for you to choose that version for the
Library.
GNU Lesser General Public License, version 3 or any later version.
See LGPL-3 for the full text of this license.
GNU LESSER GENERAL PUBLIC LICENSE
Version 3, 29 June 2007
Copyright (C) 2007 Free Software Foundation, Inc. <https://fsf.org/>
Everyone is permitted to copy and distribute verbatim copies
of this license document, but changing it is not allowed.
This version of the GNU Lesser General Public License incorporates
the terms and conditions of version 3 of the GNU General Public
License, supplemented by the additional permissions listed below.
0. Additional Definitions.
As used herein, "this License" refers to version 3 of the GNU Lesser
General Public License, and the "GNU GPL" refers to version 3 of the GNU
General Public License.
"The Library" refers to a covered work governed by this License,
other than an Application or a Combined Work as defined below.
An "Application" is any work that makes use of an interface provided
by the Library, but which is not otherwise based on the Library.
Defining a subclass of a class defined by the Library is deemed a mode
of using an interface provided by the Library.
A "Combined Work" is a work produced by combining or linking an
Application with the Library. The particular version of the Library
with which the Combined Work was made is also called the "Linked
Version".
The "Minimal Corresponding Source" for a Combined Work means the
Corresponding Source for the Combined Work, excluding any source code
for portions of the Combined Work that, considered in isolation, are
based on the Application, and not on the Linked Version.
The "Corresponding Application Code" for a Combined Work means the
object code and/or source code for the Application, including any data
and utility programs needed for reproducing the Combined Work from the
Application, but excluding the System Libraries of the Combined Work.
1. Exception to Section 3 of the GNU GPL.
You may convey a covered work under sections 3 and 4 of this License
without being bound by section 3 of the GNU GPL.
2. Conveying Modified Versions.
If you modify a copy of the Library, and, in your modifications, a
facility refers to a function or data to be supplied by an Application
that uses the facility (other than as an argument passed when the
facility is invoked), then you may convey a copy of the modified
version:
a) under this License, provided that you make a good faith effort to
ensure that, in the event an Application does not supply the
function or data, the facility still operates, and performs
whatever part of its purpose remains meaningful, or
b) under the GNU GPL, with none of the additional permissions of
this License applicable to that copy.
3. Object Code Incorporating Material from Library Header Files.
The object code form of an Application may incorporate material from
a header file that is part of the Library. You may convey such object
code under terms of your choice, provided that, if the incorporated
material is not limited to numerical parameters, data structure
layouts and accessors, or small macros, inline functions and templates
(ten or fewer lines in length), you do both of the following:
a) Give prominent notice with each copy of the object code that the
Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the object code with a copy of the GNU GPL and this license
document.
4. Combined Works.
You may convey a Combined Work under terms of your choice that,
taken together, effectively do not restrict modification of the
portions of the Library contained in the Combined Work and reverse
engineering for debugging such modifications, if you also do each of
the following:
a) Give prominent notice with each copy of the Combined Work that
the Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the Combined Work with a copy of the GNU GPL and this license
document.
c) For a Combined Work that displays copyright notices during
execution, include the copyright notice for the Library among
these notices, as well as a reference directing the user to the
copies of the GNU GPL and this license document.
d) Do one of the following:
0) Convey the Minimal Corresponding Source under the terms of this
License, and the Corresponding Application Code in a form
suitable for, and under terms that permit, the user to
recombine or relink the Application with a modified version of
the Linked Version to produce a modified Combined Work, in the
manner specified by section 6 of the GNU GPL for conveying
Corresponding Source.
1) Use a suitable shared library mechanism for linking with the
Library. A suitable mechanism is one that (a) uses at run time
a copy of the Library already present on the user's computer
system, and (b) will operate properly with a modified version
of the Library that is interface-compatible with the Linked
Version.
e) Provide Installation Information, but only if you would otherwise
be required to provide such information under section 6 of the
GNU GPL, and only to the extent that such information is
necessary to install and execute a modified version of the
Combined Work produced by recombining or relinking the
Application with a modified version of the Linked Version. (If
you use option 4d0, the Installation Information must accompany
the Minimal Corresponding Source and Corresponding Application
Code. If you use option 4d1, you must provide the Installation
Information in the manner specified by section 6 of the GNU GPL
for conveying Corresponding Source.)
5. Combined Libraries.
You may place library facilities that are a work based on the
Library side by side in a single library together with other library
facilities that are not Applications and are not covered by this
License, and convey such a combined library under terms of your
choice, if you do both of the following:
a) Accompany the combined library with a copy of the same work based
on the Library, uncombined with any other library facilities,
conveyed under the terms of this License.
b) Give prominent notice with the combined library that part of it
is a work based on the Library, and explaining where to find the
accompanying uncombined form of the same work.
6. Revised Versions of the GNU Lesser General Public License.
The Free Software Foundation may publish revised and/or new versions
of the GNU Lesser General Public License from time to time. Such new
versions will be similar in spirit to the present version, but may
differ in detail to address new problems or concerns.
Each version is given a distinguishing version number. If the
Library as you received it specifies that a certain numbered version
of the GNU Lesser General Public License "or any later version"
applies to it, you have the option of following the terms and
conditions either of that published version or of any later version
published by the Free Software Foundation. If the Library as you
received it does not specify a version number of the GNU Lesser
General Public License, you may choose any version of the GNU Lesser
General Public License ever published by the Free Software Foundation.
If the Library as you received it specifies that a proxy can decide
whether future versions of the GNU Lesser General Public License shall
apply, that proxy's public statement of acceptance of any version is
permanent authorization for you to choose that version for the
Library.
......@@ -15,8 +15,12 @@ all: build
export BUILDTYPE ?= debugoptimized
export BUILDDIR = build-$(BUILDTYPE)
NPROC := $(shell nproc)
JOBS := $(shell echo $$(( $(NPROC) + 6)))
LOAD := $(shell echo $$(( $(NPROC) * 2)))
NINJA_BIN ?= ninja
NINJA := $(NINJA_BIN) $(EXTRA_NINJA_ARGS)
NINJA := $(NINJA_BIN) -j $(JOBS) -l $(LOAD) $(EXTRA_NINJA_ARGS)
build:
[[ -L build ]] || ./tools/prepare-build-dir
......@@ -30,20 +34,103 @@ debug:
rm -f build
$(MAKE) build BUILDTYPE=$@
STATIC_ANALYSIS_NINJA_TARGETS += iwyu
libc++:
rm -f build
$(MAKE) build \
CC=clang CXX=clang++ \
EMPER_CPP_ARGS="-stdlib=libc++" \
EMPER_CPP_LINK_ARGS="-stdlib=libc++" \
EMPER_USE_BUNDLED_DEPS="always" \
BUILDDIR="build-libc++"
.PHONY: clang
clang:
rm -f build
$(MAKE) build \
CC=clang CXX=clang++ \
BUILDDIR="build-$@"
.PHONY: clang-release
clang-release:
rm -f build
$(MAKE) build \
CC=clang CXX=clang++ \
BUILDTYPE=release \
BUILDDIR="build-$@"
.PHONY: gcc-%
gcc-%: GCC_VER=$(subst gcc-,,$@)
gcc-%:
rm -f build
$(MAKE) build \
CC=gcc-$(GCC_VER) CXX=g++-$(GCC_VER) \
BUILDDIR="build-$@"
.PHONY: test-%
test-%: TEST_BUILD=$(subst test-,,$@)
test-%:
$(MAKE) $(TEST_BUILD)
meson test -C build-$(TEST_BUILD)
.PHONY: fibril-locked
fibril-locked:
rm -f build
$(MAKE) build \
EMPER_CONTINUATION_STEALING_MODE=locked \
EMPER_WS_QUEUE_SCHEDULER=locked \
EMPER_IO=false \
BUILDDIR="build-$@"
.PHONY: fibril-unmap
fibril-unmap:
rm -f build
$(MAKE) build \
EMPER_CONTINUATION_STEALING_MADVISE_STACK=free \
EMPER_IO=false \
BUILDDIR="build-$@"
.PHONY: stats
stats:
rm -f build
$(MAKE) build \
EMPER_LOG_LEVEL="Info" \
EMPER_STATS=true \
BUILDDIR="build-$@"
# Meson >= 0.52 will automatically generate a clang-tidy target if a
# .clang-tidy file is found.
# Source version check: https://stackoverflow.com/a/3732456/194894
ifeq ($(shell [ $(MESON_MINOR_VERSION) -ge 52 ] && echo true), true)
STATIC_ANALYSIS_NINJA_TARGETS += clang-tidy
else
$(warning old mesion version $(MESON_VERSION) detected, meson >= 0.52 required for clang-tidy)
endif
.PHONY: lto
lto:
rm -f build
$(MAKE) build \
BUILDTYPE=release \
EMPER_B_LTO=true \
BUILDDIR="build-$@"
.PHONY: asan
asan:
rm -f build
$(MAKE) build \
EMPER_LOG_LEVEL="Info" \
EMPER_B_SANITIZE=address \
BUILDDIR="build-$@"
.PHONY: asan-test
asan-test: asan
meson test -C build-asan
.PHONY: waitfd
waitfd:
rm -f build
$(MAKE) build \
BUILDTYPE=release \
BUILDDIR="build-$@" \
EMPER_WORKER_SLEEP_STRATEGY=waitfd
.PHONY: fast-static-analysis
fast-static-analysis: all check-format check-license doc
.PHONY: static-analysis
static-analysis: all check-format check-license doc
$(NINJA) -C build $(STATIC_ANALYSIS_NINJA_TARGETS)
static-analysis: fast-static-analysis iwyu tidy
.PHONY: smoke-test-suite
smoke-test-suite: all
......@@ -80,10 +167,19 @@ check-license:
format: all
$(NINJA) -C build clang-format
.PHONY: tidy
tidy: compile_commands_wo_subprojects/compile_commands.json
./tools/run-clang-tidy
PHONY: iwyu
iwyu: all
iwyu: compile_commands_wo_subprojects/compile_commands.json
$(NINJA) -C build $@
build/compile_commands.json: all
compile_commands_wo_subprojects/compile_commands.json: all build/compile_commands.json
./tools/gen-compile-commands-wo-subprojects
PHONY: fix-includes
fix-includes: all
./tools/fix-includes
......@@ -95,4 +191,11 @@ stresstest: test
# e.g. test-gcc.
.PHONY: gitlab-runner
gitlab-runner:
gitlab-runner exec docker smoke-test
gitlab-ci-local smoke-test-suite
.PHONY: subprojects
subprojects:
meson subprojects update --reset
.PHONY: external-deps
external-deps: subprojects
# EMPER
The Efficient Massive-Parallelism Execution Realm (EMPER) is a concurrency platform to develop and execute parallel applications.
EMPER's primary objective is to support the research of different aspects of parallel execution of a potentially massive amount of concurrent strands of execution on current and future many-core systems.
Those aspects include, for example, scheduling strategies, synchronization primitives, and ultimately the overall coordination of concurrent strands of execution concerning optimal resource usage, including locality and energy efficiency.
We believe that a concurrency platform can only achieve outstanding results if the runtime system and operating system cooperate closely.
Hence another research focus of EMPER is the interface between the user-space runtime system and the operating system.
For example, EMPER uses modern asynchronous system request techniques and a novel approach called "I/O-stealing" to dispatch the operating system's responses back into the user-space runtime system.
The research with EMPER follows a focused approach to demonstrate and prove the viability of researched concepts in a concrete implementation.
Furthermore, EMPER strives for correctness so that the results of evaluations involving EMPER are reliable.
While developed as part of academic activity, we develop EMPER to produce a usable end-product in mind, wherever possible and sensible.
To enable every efficient fork/join parallelism, the runtime system of EMPER implements the *wait-free* "Nowa" continuation-stealing approach as described in the IPDPS 2021 paper by Schmaus et al.
This implementation allows for very efficient fork/join parallelism for two reasons. First, continuation-stealing enables dynamic task parallelism: The concurrency expressed at the programming-language layer is only lifted into parallelism by the runtime system if there are available workers.
Secondly, the wait-free Nowa approach allows for scalability on systems with many cores.
```c++
emper_fibril auto fib(int n) -> int {
if (n < 2) return n;
Fibril fibril;
int a, b;
fibril.spawn(&a, fib, n - 1);
b = fib(n - 2);
fibril.sync();
return a + b;
}
```
## Scheduling
EMPER provides a modular architecture allowing for different scheduling strategies. Currently two scheduling strategies are implemented:
- Work stealing scheduling (WS)
- Locality aware and work stealing scheduling (LAWS)
# License
EMPER is licensed under the GNU Lesser General Public License, version 3 (or any later version). See the file 'LGPL-3' for the full text of this license.
# Acknowledgements
Nicolas Pfeiffer wrote the first prototypical implementation of continuation-stealing for EMPER.
Much of his code was re-used for the current implementation.
Florian Fischer wrote the EMPER interface for "pseudo-blocking system calls" based on Linux's io_uring.
We would also like to thank Jens Axboe and Pavel Begunkov for creating and constantly improving io_uring.
This work was partially funded by the Deutsche Forschungsgemeinschaft (DFG, German Research Foundation) – project number 146371743 – TRR 89 "Invasive Computing".
# Publications
[schmaus2021modern]
Schmaus, Florian, Florian Fischer, Timo Hönig, and Wolfgang Schröder-Preikschat.
Modern Concurrency Platforms Require Modern System-Call Techniques.
Tech. rep. CS-2021-02. Friedrich-Alexander Universität Erlangen-Nürnberg,
Technische Fakultät, Nov. 23, 2021. doi: 10.25593/issn.2191-5008/CS-2021-02.
url: https://www4.cs.fau.de/~flow/papers/schmaus2021modern.pdf
[schmaus2021nowa]
Schmaus, Florian, Nicolas Pfeiffer, Timo Hönig, Jörg Nolte, and Wolfgang Schröder-
Preikschat. “Nowa: A Wait-Free Continuation-Stealing Concurrency Platform”.
In: 2021 IEEE International Parallel and Distributed Processing Symposium (IPDPS).
May 2021, pp. 360–371. doi: 10.1109/IPDPS49936.2021.00044.
url: https://www4.cs.fau.de/~flow/papers/schmaus2021nowa.pdf
[pfeiffer2020cactus]
Pfeiffer, Nicolas. A Wait-Free Cactus Stack Implementation for a Microparalelism
Runtime. Master's thesis. MA-I4-2020-02. Mar. 2, 2020.
url: https://www4.cs.fau.de/~flow/papers/pfeiffer2020cactus.pdf
# Literature
> The dwarf sees farther than the giant, when he has the giant's shoulder to mount on.
- Samuel Taylor Coleridge, The Friend (1828)
EMPER uses concepts, ideas and algorithms from the following
publications. You will find the key of a publication, e.g.,
[chase2005dynamic] sometimes mentioned in EMPER's source code.
[chase2005dynamic]
Chase, David and Yossi Lev. “Dynamic Circular Work-Stealing Deque”. In: Pro-
ceedings of the Seventeenth Annual ACM Symposium on Parallelism in Algorithms
and Architectures. SPAA ’05. Las Vegas, Nevada, USA: Association
for Computing Machinery, 2005, pp. 21–28. isbn: 1581139861. doi: [10.1145/1073970.1073974](https://doi.org/10.1145/1073970.1073974).
[le2013correct]
Lê, Nhat Minh, Antoniu Pop, Albert Cohen, and Francesco Zappa Nardelli.
“Correct and Efficient Work-Stealing for Weak Memory Models”. In: Proceedings
of the 18th ACM SIGPLAN Symposium on Principles and Practice of
Parallel Programming. PPoPP ’13. Shenzhen, China: Association for Computing
Machinery, 2013, pp. 69–80. isbn: 9781450319225. doi: 10.1145/2442516.2442524.
url: https://hal.inria.fr/hal-00802885/document
[norris2013cdschecker]
Norris, Brian and Brian Demsky. “CDSchecker: Checking Concurrent Data Structures
Written with C/C++ Atomics”. In: Proceedings of the 2013 ACM SIGPLAN
International Conference on Object Oriented Programming Systems
Languages Applications. OOPSLA ’13. Indianapolis, Indiana, USA: Association
for Computing Machinery, 2013, pp. 131–150. isbn: 9781450323741.
doi: 10.1145/2509136.2509514.
url: http://plrg.eecs.uci.edu/publications/c11modelcheck.pdf
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#include <sys/types.h>
#include <array>
#include <atomic>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <mutex>
#include <string>
#include <vector>
#include "Common.hpp"
#include "Runtime.hpp"
#include "io.hpp"
#include "io/Future.hpp"
#include "io/IoContext.hpp"
std::string HOST = "::";
std::string PORT = "12346";
std::atomic<unsigned> ready;
std::mutex lock;
std::vector<int> conns;
void notify() {
std::vector<emper::io::SendFuture*> futures;
std::string msg = "Go";
for (int conn : conns) {
auto* sf = new emper::io::SendFuture(conn, msg.c_str(), msg.size(), 0);
futures.push_back(sf);
}
IoContext::getIo()->submit(futures.begin(), futures.end());
for (auto* f : futures) {
int res = f->wait();
if (res < 0) DIE_MSG_ERRNO("send failed");
delete f;
}
for (int conn : conns) emper::io::closeAndForget(conn);
}
// NOLINTNEXTLINE(bugprone-exception-escape)
auto main(int argc, char* argv[]) -> int {
if (argc != 2) {
std::cerr << "Usage: " << argv[0] << " <count>" << std::endl;
exit(EXIT_FAILURE);
}
const int scount = std::stoi(argv[1]);
if (scount < 0) DIE_MSG("count must be positiv");
const auto count = static_cast<unsigned>(scount);
std::cout << "Network barrier listening on " << HOST << ":" << PORT << " for " << count
<< " connections" << std::endl;
Runtime runtime;
auto coordinator_func = [&](int socket) {
{
std::lock_guard<std::mutex> l(lock);
conns.push_back(socket);
}
std::array<char, 16> buf;
ssize_t recv = emper::io::recvAndWait(socket, buf.data(), buf.size(), 0);
if (recv < 0) DIE_MSG_ERRNO("recv failed");
unsigned r = ready.fetch_add(1) + 1;
if (r == count) {
notify();
runtime.initiateTermination();
}
};
auto* listener = emper::io::tcp_listener(HOST, PORT, coordinator_func, scount,
{emper::io::SockOpt::ReusePort});
if (!listener) {
exit(EXIT_FAILURE);
}
runtime.scheduleFromAnywhere(*listener);
runtime.waitUntilFinished();
return EXIT_SUCCESS;
}
This diff is collapsed.
......@@ -3,26 +3,67 @@
#include <sys/socket.h>
#include <sys/types.h>
#include <atomic>
#include <cerrno>
#include <chrono>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <iostream>
#include <random>
#include <string>
#include <vector>
#include "Common.hpp"
#include "Debug.hpp"
#include "Runtime.hpp"
#include "RuntimeBuilder.hpp"
#include "emper-common.h"
#include "emper-config.h"
#include "io.hpp"
const std::string HOST = "::";
const std::string PORT = "12345";
#ifdef EMPER_HAS_COMPARE_H
#include <compare>
#endif
static const std::string HOST = "::";
static const std::string PORT = "12345";
static const int BACKLOG = 1024;
static const size_t BUF_SIZE = 1024;
static unsigned int computations_us = 0;
static unsigned int max_computations_us = 0;
static float max_computations_probability = -1;
static std::atomic<bool> quit = false;
// NOLINTNEXTLINE(cert-msc32-c,cert-msc51-cpp)
static thread_local std::mt19937 randGenerator;
static auto getComputation() -> unsigned {
// fixed computation is computations_us
if (!max_computations_us) return computations_us;
// computation is in range [computations_us, max_computations_us]
if (max_computations_probability == -1) {
std::uniform_int_distribution<unsigned int> distribution(computations_us, max_computations_us);
return computations_us += distribution(randGenerator);
}
// computation is either computations_us or max_computations_us with probability
// max_computations_probability
std::uniform_real_distribution<float> distribution(0, 1);
float p = distribution(randGenerator);
return p >= max_computations_probability ? max_computations_us : computations_us;
}
auto main(int argc, char* argv[]) -> int {
std::string host = HOST;
std::string port = PORT;
if (argc > 2) {
std::cerr << "Usage: " << argv[0] << " [port]" << std::endl;
if (argc > 5) {
std::cerr << "Usage: " << argv[0] << " [port] [computations_us]"
<< " [max_computations_us] [max_computations_probability]" << std::endl;
exit(EXIT_FAILURE);
}
......@@ -30,36 +71,79 @@ auto main(int argc, char* argv[]) -> int {
port = std::string(argv[1]);
}
std::cout << "Echoserver listening on " << host << ":" << port << std::endl;
if (argc > 2) {
computations_us = std::stoi(argv[2]);
}
if (argc > 3) {
max_computations_us = std::stoi(argv[3]);
if (max_computations_us < computations_us)
DIE_MSG("max_computations_us must be bigger than computations_us");
}
Runtime runtime;
auto* listener = emper::io::tcp_listener(host, port, [](int socket) {
if (argc > 4) {
max_computations_probability = std::stof(argv[4]);
if (max_computations_probability < 0 || max_computations_probability > 1)
DIE_MSG("max_computations_probability must be in [0,1]");
}
std::cout << "Echoserver listening on " << host << ":" << port;
if (computations_us) {
std::cout << " with " << computations_us;
if (max_computations_us) std::cout << " - " << max_computations_us;
std::cout << " us computations";
}
std::cout << std::endl;
RuntimeBuilder runtimeBuilder;
if (max_computations_us) {
runtimeBuilder.newWorkerHook([](workerid_t id) { randGenerator.seed(id); });
}
auto runtime = runtimeBuilder.build();
auto serverFunc = [](int socket) {
// NOLINTNEXTLINE(modernize-avoid-c-arrays)
char buf[1024];
for (;;) {
char buf[BUF_SIZE];
while (!quit.load(std::memory_order_consume)) {
ssize_t bytes_recv = emper::io::recvAndWait(socket, buf, sizeof(buf), 0);
if (unlikely(bytes_recv <= 0)) {
// socket was shutdown
if (bytes_recv < 0) {
LOGE("server read failed:" << strerror(errno));
}
finish:
emper::io::closeAndForget(socket);
return;
break;
}
if (unlikely(bytes_recv == 5 && strncmp("quit\n", buf, bytes_recv) == 0)) {
exit(EXIT_SUCCESS);
quit = true;
Runtime::getRuntime()->initiateTermination();
break;
}
if (computations_us) {
unsigned int computation = getComputation();
const auto start = std::chrono::steady_clock::now();
const auto deadline = start + std::chrono::microseconds(computation);
// TODO: The suppressed linter error below may be a false positive
// reported by clang-tidy.
// NOLINTNEXTLINE(modernize-use-nullptr)
while (std::chrono::steady_clock::now() < deadline) {
}
}
ssize_t bytes_send = emper::io::sendAndWait(socket, buf, bytes_recv, MSG_NOSIGNAL, true);
if (unlikely(bytes_recv != bytes_send)) {
LOGE("server send failed: " << strerror(errno));
goto finish;
break;
}
}
});
emper::io::closeAndForget(socket);
};
auto* listener =
emper::io::tcp_listener(host, port, serverFunc, BACKLOG, {emper::io::SockOpt::ReusePort});
if (!listener) {
exit(EXIT_FAILURE);
......@@ -69,5 +153,5 @@ auto main(int argc, char* argv[]) -> int {
runtime.waitUntilFinished();
return EXIT_FAILURE;
return EXIT_SUCCESS;
}
......@@ -3,11 +3,14 @@
#include <sys/socket.h>
#include <array>
#include <atomic>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <iostream>
#include <string>
#include <vector>
#include "Common.hpp"
#include "Debug.hpp"
......@@ -20,67 +23,70 @@ using RecvFuture = emper::io::RecvFuture;
const std::string HOST = "::";
const std::string PORT = "12345";
static const int BACKLOG = 1024;
const size_t BUF_SIZE = 1024;
static std::atomic<bool> quit = false;
class Client {
public:
int sockfd;
size_t bytes_recv;
size_t bytes_send;
std::array<char, BUF_SIZE> buf;
Client(int socket) : sockfd(socket), bytes_recv(0), bytes_send(0) {}
void terminate() {
emper::io::closeAndForget(sockfd);
delete this;
}
void submitSend() {
SendFuture sf(sockfd, &buf[bytes_send], bytes_recv - bytes_send, MSG_NOSIGNAL);
sf.setCallback([this](int32_t bytes_send) { this->onSend(bytes_send); });
void submitSend(int32_t bytes_recv) {
SendFuture sf(sockfd, buf.data(), bytes_recv, MSG_NOSIGNAL);
sf.setCallback(
[this, bytes_recv](int32_t bytes_send) { this->onSend(bytes_send, bytes_recv); });
sf.submit();
}
void onSend(int32_t res) {
if (unlikely(res < 0)) {
LOGE("server send failed: " << strerror(-res));
emper::io::closeAndForget(sockfd);
void onSend(int32_t bytes_send, int32_t bytes_recv) {
if (unlikely(bytes_send != bytes_recv)) {
LOGE("server send failed: " << strerror(-bytes_send));
this->terminate();
return;
}
bytes_send += res;
// Send again
if (bytes_send < bytes_recv) {
submitSend();
return;
if (!quit.load(std::memory_order_consume)) {
submitRecv();
} else {
this->terminate();
}
submitRecv();
}
void submitRecv() {
bytes_send = 0;
RecvFuture rf(sockfd, buf.data(), BUF_SIZE, 0);
rf.setCallback([this](int32_t bytes_recv) { this->onRecv(bytes_recv); });
rf.submit();
}
void onRecv(int32_t res) {
if (unlikely(res <= 0)) {
void onRecv(int32_t bytes_recv) {
if (unlikely(bytes_recv <= 0)) {
// socket was shutdown
if (res < 0) {
LOGE("server read failed:" << strerror(-res));
if (bytes_recv < 0) {
LOGE("server read failed:" << strerror(-bytes_recv));
}
emper::io::closeAndForget(sockfd);
this->terminate();
return;
}
bytes_recv = res;
if (unlikely(bytes_recv == 5 && strncmp("quit\n", buf.data(), bytes_recv) == 0)) {
exit(EXIT_SUCCESS);
quit = true;
Runtime::getRuntime()->initiateTermination();
this->terminate();
return;
}
submitSend();
submitSend(bytes_recv);
}
public:
Client(int socket) : sockfd(socket) {}
void submitRecv() {
RecvFuture rf(sockfd, buf.data(), BUF_SIZE, 0);
rf.setCallback([this](int32_t bytes_recv) { this->onRecv(bytes_recv); });
rf.submit();
}
};
......@@ -100,10 +106,12 @@ auto main(int argc, char* argv[]) -> int {
std::cout << "Echoserver listening on " << host << ":" << port << std::endl;
Runtime runtime;
auto* listener = emper::io::tcp_listener(host, port, [](int socket) {
auto* client = new Client(socket);
client->submitRecv();
});
auto* listener = emper::io::tcp_listener(host, port,
[](int socket) {
auto* client = new Client(socket);
client->submitRecv();
},
BACKLOG, {emper::io::SockOpt::ReusePort});
if (!listener) {
exit(EXIT_FAILURE);
......
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2022 Florian Schmaus
#include <cstdlib>
#include "Emper.hpp"
#include "emper-common.h"
auto main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) -> int {
emper::printInfo();
return EXIT_SUCCESS;
}
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020-2022 Florian Schmaus
#include <boost/program_options.hpp>
#include <cstdint>
#include <iostream> // for basic_ostream::operator<<
#include <string>
#include <thread>
#include "BinaryPrivateSemaphore.hpp" // for BPS
#include "CountingPrivateSemaphore.hpp" // for CPS
#include "Debug.hpp" // for DBG
#include "Fiber.hpp" // for Fiber
#include "PrivateSemaphore.hpp" // for PS
#include "Runtime.hpp" // for Runtime
#include "lib/sync/Semaphore.hpp"
namespace po = boost::program_options;
using fibParams = struct {
uint64_t n;
uint64_t* result;
PS* sem;
};
static void fib(void* voidParams) {
auto* params = static_cast<fibParams*>(voidParams);
uint64_t n = params->n;
auto* result = params->result;
PS* sem = params->sem;
if (n < 2) {
*result = n;
} else {
CPS newSem(2);
uint64_t a, b;
fibParams newParams1;
newParams1.n = n - 1;
newParams1.result = &a;
newParams1.sem = &newSem;
// Note that this is the inefficient spawn/sync variant, we
// usually would compute one previous fib number without spawning.
fibParams newParams2;
newParams2.n = n - 2;
newParams2.result = &b;
newParams2.sem = &newSem;
Fiber* f1 = Fiber::from(&fib, &newParams1);
Fiber* f2 = Fiber::from(&fib, &newParams2);
Runtime* runtime = Runtime::getRuntime();
runtime->schedule(*f1);
runtime->schedule(*f2);
DBG("fib: Calling wait for n=" << n);
newSem.wait();
*result = a + b;
}
DBG("fib: Calling signalAndExit for n=" << n);
sem->signalAndExit();
}
// NOLINTNEXTLINE(bugprone-exception-escape)
auto main(int argc, char* argv[]) -> int {
uint64_t fibNum = 12;
po::options_description desc("Allowed options");
// clang-format off
desc.add_options()
("help", "Show help")
("nthreads", po::value<unsigned int>()->default_value(std::thread::hardware_concurrency()), "Number of worker threads used by EMPER's runtime system")
("fibnum", po::value<uint64_t>(&fibNum)->default_value(fibNum), "The Fibonacci number to compute")
;
// clang-format on
// Make 'fibnum' a positional option.
po::positional_options_description pos_desc;
pos_desc.add("fibnum", -1);
// clang-format off
auto parse_result = po::command_line_parser(argc, argv)
.options(desc)
.positional(pos_desc)
.run()
;
// clang-format on
po::variables_map vm;
po::store(parse_result, vm);
po::notify(vm);
const unsigned nthreads = vm["nthreads"].as<unsigned int>();
std::cout << "Number of threads: " << nthreads << std::endl;
Runtime runtime(nthreads);
emper::lib::sync::Semaphore semaphore;
Fiber* fibFiber = Fiber::from([&] {
uint64_t result;
BPS sem;
fibParams params = {fibNum, &result, &sem};
fib(&params);
sem.wait();
std::cout << "fib(" << fibNum << ") = " << result << std::endl;
semaphore.notify();
});
runtime.scheduleFromAnywhere(*fibFiber);
semaphore.wait();
return 0;
}
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2024 Florian Schmaus
#include <array>
#include <boost/numeric/ublas/io.hpp>
#include <boost/numeric/ublas/matrix.hpp>
#include <boost/program_options.hpp>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <exception>
#include <iostream>
#include <random>
#include <string>
#include <thread>
#include <type_traits>
#include <vector>
#include "Common.hpp"
#include "CountingPrivateSemaphore.hpp"
#include "Fiber.hpp"
#include "Runtime.hpp"
#include "Semaphore.hpp"
#include "emper.hpp"
namespace po = boost::program_options;
template <class T>
using matrix = boost::numeric::ublas::matrix<T>;
static auto sequentialMatMul(matrix<double> &a, matrix<double> &b) -> matrix<double> {
const auto m = a.size1(); // number of rows of a
const auto n = a.size2(); // number of columns of a
if (n != b.size1()) DIE_MSG("Invalid argument");
const auto l = b.size2(); // number of rows of b
matrix<double> res(m, l);
for (unsigned int i = 0; i < m; ++i) {
for (unsigned int j = 0; j < l; ++j) {
res(i, j) = 0;
for (unsigned int k = 0; k < n; ++k) {
res(i, j) = res(i, j) + a(i, k) * b(k, j);
}
}
}
return res;
}
static auto naiveParallelMatMul(matrix<double> &a, matrix<double> &b) -> matrix<double> {
const auto m = a.size1(); // number of rows of a
const auto n = a.size2(); // number of columns of a
if (n != b.size1()) DIE_MSG("Invalid argument");
const auto l = b.size2(); // number of rows of b
matrix<double> res(m, l);
CPS cps;
for (unsigned int i = 0; i < m; ++i) {
for (unsigned int j = 0; j < l; ++j) {
spawn(
[i, j, n, &a, &b, &res] {
res(i, j) = 0;
for (unsigned int k = 0; k < n; ++k) {
res(i, j) = res(i, j) + a(i, k) * b(k, j);
}
},
cps);
}
}
cps.wait();
return res;
}
static auto stupidParallelMatMul(matrix<double> &a, matrix<double> &b) -> matrix<double> {
const auto m = a.size1(); // number of rows of a
const auto n = a.size2(); // number of columns of a
if (n != b.size1()) DIE_MSG("Invalid argument");
const auto l = b.size2(); // number of rows of b
matrix<double> res(m, l);
CPS outerCps;
for (unsigned int i = 0; i < m; ++i) {
for (unsigned int j = 0; j < l; ++j) {
outerCps.incrementCounterByOne();
auto *innerCps = new CPS();
auto *sem = new emper::Semaphore(1);
auto *scalprod = new double;
for (unsigned int k = 0; k < n; ++k) {
spawn(
[i, j, k, sem, scalprod, &a, &b] {
double product = a(i, k) * b(k, j);
sem->acquire();
*scalprod += product;
sem->release();
},
*innerCps);
}
async([i, j, innerCps, sem, scalprod, &res, &outerCps] {
innerCps->wait();
res(i, j) = *scalprod;
outerCps.signal();
delete innerCps;
delete sem;
delete scalprod;
});
}
}
outerCps.wait();
return res;
}
template <class G>
static auto createRandomMatrix(unsigned int m_dim, unsigned int n_dim, float zero_probability, G &g)
-> matrix<double> {
double min = 0;
double max = 1'000'000'000'000'000;
std::uniform_real_distribution<> value_distribution(min, max);
std::bernoulli_distribution zero_distribution(zero_probability);
matrix<double> res(m_dim, n_dim);
for (unsigned int m = 0; m < m_dim; ++m) {
for (unsigned int n = 0; n < n_dim; ++n) {
double value;
if (zero_distribution(g))
value = 0;
else
value = value_distribution(g);
res(m, n) = value;
}
}
return res;
}
using ParMatMul = struct {
std::string name;
matrix<double> (*mul)(matrix<double> &a, matrix<double> &b);
};
namespace chrn = std::chrono;
using Clock = std::chrono::high_resolution_clock;
static auto matmul(const po::variables_map &vm) -> int {
const auto nthreads = vm["nthreads"].as<unsigned int>();
const auto seed = vm["seed"].as<std::uint_fast64_t>();
const auto m = vm["m"].as<unsigned int>();
const auto n = vm["n"].as<unsigned int>();
const auto l = vm["l"].as<unsigned int>();
const auto par_mat_muls = vm["parMatMuls"].as<unsigned int>();
const auto iters = vm["iters"].as<unsigned int>();
const auto zero_probability = vm["zeroProb"].as<float>();
const auto verify = vm["verify"].as<bool>();
Runtime runtime(nthreads);
std::mt19937_64 rng(seed);
std::vector<matrix<double>> a_mtxs, b_mtxs, seq_results;
for (unsigned int i = 0; i < par_mat_muls; ++i) {
auto a = createRandomMatrix(m, n, zero_probability, rng);
a_mtxs.push_back(a);
auto b = createRandomMatrix(n, l, zero_probability, rng);
b_mtxs.push_back(b);
auto seq_res = sequentialMatMul(a_mtxs[i], b_mtxs[i]);
seq_results.push_back(seq_res);
}
auto par_mat_mul_impls = std::to_array<ParMatMul>({
{"naive", naiveParallelMatMul},
{"stupid", stupidParallelMatMul},
});
int exitStatus = EXIT_SUCCESS;
Fiber *alphaFiber = Fiber::from([&] {
for (auto impl : par_mat_mul_impls) {
std::vector<chrn::milliseconds> durations(iters);
std::cout << "Start " << impl.name << " with " << iters << " iterations: " << std::flush;
for (unsigned int iter = 0; iter < iters; ++iter) {
std::vector<matrix<double>> results(par_mat_muls);
CPS cps(par_mat_muls);
chrn::time_point<Clock> start = Clock::now();
for (unsigned int i = 0; i < par_mat_muls; ++i) {
async([i, &cps, &impl, &results, &a_mtxs, &b_mtxs] {
auto a = a_mtxs[i];
auto b = b_mtxs[i];
auto res = impl.mul(a, b);
results[i] = res;
cps.signal();
});
}
cps.wait();
chrn::time_point<Clock> end = Clock::now();
std::cout << "." << std::flush;
auto duration = chrn::duration_cast<chrn::milliseconds>(end - start);
durations[iter] = duration;
if (!verify) continue;
for (unsigned int i = 0; i < par_mat_muls; ++i) {
if (boost::numeric::ublas::detail::equals(seq_results[i], results[i], 1.e-6, 0.))
continue;
// clang-format off
std::cerr
<< std::endl
<< "ERROR: Matrix " << i << " produced by " << impl.name << " incorrect" << std::endl
<< "Expected matrix: " << std::endl << seq_results[i] << std::endl
<< "Provided matrix: " << std::endl << results[i] << std::endl
;
// clang-format on
exitStatus = EXIT_FAILURE;
runtime.initiateTermination();
return;
}
}
std::cout << std::endl << "Implementation '" << impl.name << "' took ";
for (unsigned int i = 0; i < iters; ++i) {
std::cout << durations[i];
if (i + 1 != iters) std::cout << ", ";
}
std::cout << ";" << std::endl;
}
runtime.initiateTermination();
});
runtime.scheduleFromAnywhere(*alphaFiber);
runtime.waitUntilFinished();
return exitStatus;
}
auto main(int argc, char *argv[]) -> int {
po::options_description desc("Allowed options");
// clang-format off
desc.add_options()
("help", "Show help")
("nthreads", po::value<unsigned int>()->default_value(std::thread::hardware_concurrency()), "Number of worker threads used by EMPER's runtime system")
("seed", po::value<std::uint_fast64_t>()->default_value(20180922), "Initial seed of the random number generator")
("m", po::value<unsigned int>()->default_value(10), "Number of rows of the first matrix and of the result matrix")
("n", po::value<unsigned int>()->default_value(10), "Number of columns of the first matrix and of rows of the second matrix")
("l", po::value<unsigned int>()->default_value(10), "Number of columns of the second matrix and of the result matrix")
("parMatMuls", po::value<unsigned int>()->default_value(5), "Number of matrix multiplications to perform in parallel")
("iters", po::value<unsigned int>()->default_value(3), "Number of benchmark iterations")
("zeroProb", po::value<float>()->default_value(0.01), "Probaility of generating a zero when initializing the matrices")
("verify", po::value<bool>()->default_value(true), "Verify the result of the matrix multiplication")
;
// clang-format on
auto parse_result = po::command_line_parser(argc, argv).options(desc).run();
po::variables_map vm;
po::store(parse_result, vm);
po::notify(vm);
if (vm.count("help")) {
std::cout << desc << std::endl;
return EXIT_SUCCESS;
}
try {
return matmul(vm);
} catch (std::exception &e) {
std::cerr << e.what() << std::endl;
return EXIT_FAILURE;
}
}
......@@ -3,7 +3,6 @@
#include <chrono> // for milliseconds, operator+, hig...
#include <cstdlib> // for exit, EXIT_SUCCESS
#include <iostream> // for operator<<, basic_ostream, endl
#include <ratio> // for ratio
#include <string>
#include "CountingPrivateSemaphore.hpp" // for CPS
......@@ -33,8 +32,7 @@ static void letsGetBusy(std::chrono::duration<Rep, Period> duration) {
// TODO: The suppressed linter error below may be a false positive
// reported by clang-tidy.
// NOLINTNEXTLINE(modernize-use-nullptr)
while (std::chrono::high_resolution_clock::now() < deadline)
;
while (std::chrono::high_resolution_clock::now() < deadline);
}
static void alphaFiber() {
......
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
// Copyright © 2021-2022 Florian Fischer, Florian Schmaus
#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>
#include <array>
#include <cstdio>
#include <boost/program_options.hpp>
#include <cstdlib>
#include <cstring>
#include <exception>
#include <filesystem>
#include <functional>
#include <iostream>
#include <sstream>
#include <string>
#include "Common.hpp"
#include "CountingPrivateSemaphore.hpp"
#include "Fiber.hpp"
#include "Runtime.hpp"
#include "emper.hpp"
#include "Semaphore.hpp"
#include "StealingMode.hpp"
#include "io.hpp"
#include "lib/ShardedFileBuffer.hpp"
namespace fs = std::filesystem;
namespace po = boost::program_options;
#define EMPER_RIPGREP_BUFSIZE 4096
static constexpr size_t EMPER_RIPGREP_BUFSIZE = 4096;
const char* needle;
size_t needle_len;
static const char* needle;
static size_t needle_len;
static emper::Semaphore* concurrent_open;
static emper::Semaphore* max_fibers;
static emper::Semaphore* max_searching;
static emper::Semaphore* max_walking;
static emper::lib::ShardedFileBuffer* outBuf;
static enum emper::StealingMode stealingMode;
void search(const std::string& path) {
if (concurrent_open) concurrent_open->acquire();
int fd = emper::io::openAndWait(path.c_str(), O_RDONLY);
if (fd < 0) {
DIE_MSG_ERRNO("open failed");
if (unlikely(fd < 0)) {
DIE_MSG_ERRNO("open of " << path << " failed");
}
if (concurrent_open) concurrent_open->release();
std::array<char, EMPER_RIPGREP_BUFSIZE> buf;
size_t bytes_searched = 0;
ssize_t bytes_read = emper::io::readFileAndWait(fd, buf.data(), buf.size(), bytes_searched);
ssize_t bytes_read = emper::io::readFileAndWait(fd, buf.data(), buf.size(), 0);
while (bytes_read > 0) {
if (memmem(&buf[0], bytes_read, needle, needle_len)) {
printf("%s\n", path.c_str());
return;
if (memmem(buf.data(), bytes_read, needle, needle_len)) {
outBuf->getStream() << path << std::endl;
goto out;
}
bytes_searched += static_cast<size_t>(bytes_read);
bytes_read = emper::io::readFileAndWait(fd, buf.data(), buf.size(), -1);
}
if (bytes_read < 0) {
DIE_MSG_ERRNO("read failed");
if (unlikely(bytes_read < 0)) {
DIE_MSG_ERRNO("read of " << path << " failed");
}
out:
emper::io::closeAndForget(fd);
}
void walk_dir() {
CPS cps;
for (const auto& p : fs::recursive_directory_iterator(".")) {
if (p.is_regular_file()) {
spawn([=] { search(p.path()); }, cps);
}
outBuf = new emper::lib::ShardedFileBuffer(*Runtime::getRuntime(), STDOUT_FILENO);
// Only search in regular files
auto filter = [](const fs::directory_entry& dirent) { return dirent.is_regular_file(); };
// Search the file
auto fn = [](const fs::directory_entry& dirent) { search(dirent.path()); };
switch (stealingMode) {
case emper::StealingMode::child:
emper::io::recursive_directory_walk(".", filter, fn, max_searching, max_walking);
break;
case emper::StealingMode::continuation:
emper::io::fibril_recursive_directory_walk(".", filter, fn);
break;
}
cps.wait();
exit(EXIT_SUCCESS);
delete outBuf;
Runtime::getRuntime()->initiateTermination();
}
auto main(int argc, char* argv[]) -> int {
if (argc < 2) {
std::cerr << "Usage: " << argv[0] << " <needle>" << std::endl;
static auto fssearch(const po::variables_map& vm) -> int {
if (!vm.count("needle")) {
std::cerr << "No 'needle' specified" << std::endl;
return EXIT_FAILURE;
}
needle = argv[1];
needle_len = strlen(needle);
auto needleStr = vm["needle"].as<std::string>();
needle = needleStr.c_str();
needle_len = needleStr.length();
if (vm.count("concurrent-open")) {
unsigned int concurrentOpenCount = vm["concurrent-open"].as<unsigned int>();
concurrent_open = new emper::Semaphore(concurrentOpenCount);
}
if (vm.count("max-fibers")) {
unsigned int maxFibersCount = vm["max-fibers"].as<unsigned int>();
max_fibers = max_searching = max_walking = new emper::Semaphore(maxFibersCount);
} else {
if (vm.count("max-searchers")) {
unsigned int maxSearchersCount = vm["max-searchers"].as<unsigned int>();
max_searching = new emper::Semaphore(maxSearchersCount);
}
if (vm.count("max-walkers")) {
unsigned int maxWalkersCount = vm["max-walkers"].as<unsigned int>();
max_walking = new emper::Semaphore(maxWalkersCount);
}
}
Runtime runtime;
std::cerr << "Starting fsearch with stealingMode=" << stealingMode;
if (max_fibers)
std::cerr << " and " << max_fibers->getCount() << " fibers";
else {
if (max_searching) std::cerr << ", " << max_searching->getCount() << " searching fibers";
if (max_walking) std::cerr << " and " << max_walking->getCount() << " walking fibers";
}
std::cerr << std::endl;
auto* dirWalker = Fiber::from(walk_dir);
runtime.scheduleFromAnywhere(*dirWalker);
runtime.waitUntilFinished();
delete concurrent_open;
if (max_searching == max_walking)
delete max_fibers;
else {
delete max_searching;
delete max_walking;
}
return EXIT_SUCCESS;
}
auto main(int argc, char* argv[]) -> int {
po::options_description desc("Allowed options");
// clang-format off
desc.add_options()
("help", "Show help")
("needle", po::value<std::string>(), "The String to search for")
("max-fibers", po::value<unsigned int>(), "Maximum number of fibers")
("max-searchers", po::value<unsigned int>(), "Maximum number of file searching fibers")
("max-walkers", po::value<unsigned int>(), "Maximum number of directory walking fibers")
("concurrent-open", po::value<unsigned int>(), "Maximum number of fibers calling open")
("stealing-mode", po::value<enum emper::StealingMode>(&stealingMode)->default_value(emper::StealingMode::child), "Stealing mode to use, either 'child' or 'continuation'")
;
// clang-format on
// Make 'needle' a positional option.
po::positional_options_description pos_desc;
pos_desc.add("needle", -1);
// clang-format off
auto parse_result = po::command_line_parser(argc, argv)
.options(desc)
.positional(pos_desc)
.run()
;
// clang-format on
po::variables_map vm;
po::store(parse_result, vm);
po::notify(vm);
if (vm.count("help")) {
std::cout << desc << "\n";
return EXIT_SUCCESS;
}
try {
return fssearch(vm);
} catch (const std::exception& e) {
std::cerr << e.what();
return 1;
}
}
......@@ -76,7 +76,7 @@ class FileSearcher {
DIE_MSG_ERRNO("read failed");
}
if (memmem(&buf[0], bytes_read, needle, needle_len)) {
if (memmem(buf.data(), bytes_read, needle, needle_len)) {
printf("%s\n", path.c_str());
delete this;
return;
......
walk_dir_code = '''#include <filesystem>
int main() { auto it = std::filesystem::recursive_directory_iterator("."); }
'''
does_link = cpp_compiler.links(walk_dir_code, args: '--std=c++2a', name : 'walk_dir')
if does_link
if cpp_has_fs_recursive_directory_iterator and cpp_can_link_with_boost_program_options
fsearch_exe = executable(
'fsearch',
'fsearch.cpp',
dependencies: emper_dep,
dependencies: [emper_dep, boost_program_options_dep],
)
fsearch_callback_exe = executable(
......