Skip to content

Commit

Permalink
WIP map every goroutine to a new OS thread
Browse files Browse the repository at this point in the history
  • Loading branch information
aykevl committed Oct 28, 2024
1 parent 75f8cf0 commit 5555c98
Show file tree
Hide file tree
Showing 11 changed files with 889 additions and 2 deletions.
2 changes: 1 addition & 1 deletion compileopts/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
var (
validBuildModeOptions = []string{"default", "c-shared"}
validGCOptions = []string{"none", "leaking", "conservative", "custom", "precise"}
validSchedulerOptions = []string{"none", "tasks", "asyncify"}
validSchedulerOptions = []string{"none", "tasks", "asyncify", "threads"}
validSerialOptions = []string{"none", "uart", "usb", "rtt"}
validPrintSizeOptions = []string{"none", "short", "full"}
validPanicStrategyOptions = []string{"print", "trap"}
Expand Down
1 change: 1 addition & 0 deletions compileopts/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ func defaultTarget(options *Options) (*TargetSpec, error) {
spec.CFlags = append(spec.CFlags, "-mno-outline-atomics")
}
spec.ExtraFiles = append(spec.ExtraFiles,
"src/internal/task/task_threads.c",
"src/runtime/runtime_unix.c",
"src/runtime/signal.c")
case "windows":
Expand Down
32 changes: 32 additions & 0 deletions src/internal/llsync/semaphore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package llsync

// Barebones semaphore implementation.
// The main limitation is that if there are multiple waiters, a single Post()
// call won't do anything. Only when Post() has been called to awaken all
// waiters will the waiters proceed.
// This limitation is not a problem when there will only be a single waiter.
type Semaphore struct {
futex Futex
}

// Post (unlock) the semaphore, incrementing the value in the semaphore.
func (s *Semaphore) Post() {
newValue := s.futex.Add(1)
if newValue == 0 {
s.futex.WakeAll()
}
}

// Wait (lock) the semaphore, decrementing the value in the semaphore.
func (s *Semaphore) Wait() {
delta := int32(-1)
value := s.futex.Add(uint32(delta))
for {
if int32(value) >= 0 {
// Semaphore unlocked!
return
}
s.futex.Wait(value)
value = s.futex.Load()
}
}
20 changes: 20 additions & 0 deletions src/internal/task/linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
//go:build linux && !baremetal

package task

import "unsafe"

type pthread_mutex struct {
// 40 bytes on a 64-bit system, 24 bytes on a 32-bit system
state1 uint64
state2 [4]uintptr
}

// pthread_mutex_t and pthread_cond_t are both initialized to zero in
// PTHREAD_*_INITIALIZER.

type sem struct {
// 64 bytes on 64-bit systems, 32 bytes on 32-bit systems:
// volatile int __val[4*sizeof(long)/sizeof(int)];
state [4 * unsafe.Sizeof(uintptr(0)) / 4]int32
}
72 changes: 72 additions & 0 deletions src/internal/task/task_threads.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
//go:build none

#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <semaphore.h>

// Pointer to the current task.Task structure.
// Ideally the entire task.Task structure would be a thread-local variable but
// this also works.
static __thread void *current_task;

struct state_pass {
void *(*start)(void*);
void *args;
void *task;
sem_t startlock;
};

// Helper to start a goroutine while also storing the 'task' structure.
static void* start_wrapper(void *arg) {
struct state_pass *state = arg;
void *(*start)(void*) = state->start;
void *args = state->args;
current_task = state->task;
sem_post(&state->startlock);
start(args);
return NULL;
};

// Start a new goroutine in an OS thread.
int tinygo_task_start(uintptr_t fn, void *args, void *task, uint64_t id, void *context) {
struct state_pass state = {
.start = (void*)fn,
.args = args,
.task = task,
};
sem_init(&state.startlock, 0, 0);
pthread_t thread;
int result = pthread_create(&thread, NULL, &start_wrapper, &state);

// Wait until the thread has been crated and read all state_pass variables.
sem_wait(&state.startlock);

return result;
}

// Return the current task (for task.Current()).
void* tinygo_task_current(void) {
return current_task;
}

// Set the current task at startup.
void tinygo_task_set_current(void *task, void *context) {
current_task = task;
}

uintptr_t tinygo_mutex_size(void) {
return sizeof(pthread_mutex_t);
}

uintptr_t tinygo_mutex_align(void) {
return _Alignof(pthread_mutex_t);
}

uintptr_t tinygo_sem_size(void) {
return sizeof(sem_t);
}

uintptr_t tinygo_sem_align(void) {
return _Alignof(sem_t);
}
182 changes: 182 additions & 0 deletions src/internal/task/task_threads.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
//go:build scheduler.threads

package task

import (
"sync/atomic"
"unsafe"
)

// If true, print verbose debug logs.
const verbose = false

// Scheduler-specific state.
type state struct {
// Goroutine ID. The number here is not really significant and after a while
// it could wrap around. But it is useful for debugging.
id uint64

// Semaphore to pause/resume the thread atomically.
sem sem
}

// Goroutine counter, starting at 0 for the main goroutine.
var goroutineID uint64

var mainTask Task

func OnSystemStack() bool {
runtimePanic("todo: task.OnSystemStack")
return false
}

// Initialize the main goroutine state. Must be called by the runtime on
// startup, before starting any other goroutines.
func Init() {
// Sanity check. With ThinLTO, this should be getting optimized away.
if unsafe.Sizeof(pthread_mutex{}) != tinygo_mutex_size() {
panic("internal/task: unexpected sizeof(pthread_mutex_t)")
}
if unsafe.Alignof(pthread_mutex{}) != tinygo_mutex_align() {
panic("internal/task: unexpected _Alignof(pthread_mutex_t)")
}
if unsafe.Sizeof(sem{}) != tinygo_sem_size() {
panic("semaphore is an unexpected size!")
}
if unsafe.Alignof(sem{}) != tinygo_sem_align() {
panic("semaphore is an unexpected alignment!")
}

mainTask.init()
tinygo_task_set_current(&mainTask)
}

func (t *Task) init() {
sem_init(&t.state.sem, 0, 0)
}

// Return the task struct for the current thread.
func Current() *Task {
t := (*Task)(tinygo_task_current())
if t == nil {
runtimePanic("unknown current task")
}
return t
}

// Pause pauses the current task, until it is resumed by another task.
// It is possible that another task has called Resume() on the task before it
// hits Pause(), in which case the task won't be paused but continues
// immediately.
func Pause() {
// Wait until resumed
t := Current()
if verbose {
println("*** pause: ", t.state.id)
}
if sem_wait(&t.state.sem) != 0 {
runtimePanic("sem_wait error!")
}
}

// Resume the given task.
// It is legal to resume a task before it gets paused, it means that the next
// call to Pause() won't pause but will continue immediately. This happens in
// practice sometimes in channel operations, where the Resume() might get called
// between the channel unlock and the call to Pause().
func (t *Task) Resume() {
if verbose {
println("*** resume: ", t.state.id)
}
// Increment the semaphore counter.
// If the task is currently paused in sem_wait, it will resume.
// If the task is not yet paused, the next call to sem_wait will continue
// immediately.
if sem_post(&t.state.sem) != 0 {
runtimePanic("sem_post: error!")
}
}

// Start a new OS thread.
func start(fn uintptr, args unsafe.Pointer, stackSize uintptr) {
t := &Task{}
t.state.id = atomic.AddUint64(&goroutineID, 1)
if verbose {
println("*** start: ", t.state.id, "from", Current().state.id)
}
t.init()
errCode := tinygo_task_start(fn, args, t, t.state.id)
if errCode != 0 {
runtimePanic("could not start thread")
}
}

type AsyncLock struct {
// TODO: lock on macOS needs to be initialized with a magic value
pthread_mutex
}

func (l *pthread_mutex) Lock() {
errCode := pthread_mutex_lock(l)
if errCode != 0 {
runtimePanic("mutex Lock has error code")
}
}

func (l *pthread_mutex) TryLock() bool {
return pthread_mutex_trylock(l) == 0
}

func (l *pthread_mutex) Unlock() {
errCode := pthread_mutex_unlock(l)
if errCode != 0 {
runtimePanic("mutex Unlock has error code")
}
}

//go:linkname runtimePanic runtime.runtimePanic
func runtimePanic(msg string)

// Using //go:linkname instead of //export so that we don't tell the compiler
// that the 't' parameter won't escape (because it will).
//
//go:linkname tinygo_task_set_current tinygo_task_set_current
func tinygo_task_set_current(t *Task)

// Here same as for tinygo_task_set_current.
//
//go:linkname tinygo_task_start tinygo_task_start
func tinygo_task_start(fn uintptr, args unsafe.Pointer, t *Task, id uint64) int32

//export tinygo_task_current
func tinygo_task_current() unsafe.Pointer

//export tinygo_mutex_size
func tinygo_mutex_size() uintptr

//export tinygo_mutex_align
func tinygo_mutex_align() uintptr

//export pthread_mutex_lock
func pthread_mutex_lock(*pthread_mutex) int32

//export pthread_mutex_trylock
func pthread_mutex_trylock(*pthread_mutex) int32

//export pthread_mutex_unlock
func pthread_mutex_unlock(*pthread_mutex) int32

//export sem_init
func sem_init(s *sem, pshared int32, value uint32) int32

//export sem_wait
func sem_wait(*sem) int32

//export sem_post
func sem_post(*sem) int32

//export tinygo_sem_size
func tinygo_sem_size() uintptr

//export tinygo_sem_align
func tinygo_sem_align() uintptr
2 changes: 2 additions & 0 deletions src/runtime/chan.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:build !scheduler.threads

package runtime

// This file implements the 'chan' type and send/receive/select operations.
Expand Down
Loading

0 comments on commit 5555c98

Please sign in to comment.