Skip to content

Commit

Permalink
support SharedMutex.
Browse files Browse the repository at this point in the history
  • Loading branch information
liyichao committed Dec 2, 2022
1 parent 988fcd0 commit 080a018
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 0 deletions.
71 changes: 71 additions & 0 deletions src/bthread/shared_mutex.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "shared_mutex.h"
#include "butex.h"

namespace bthread {
SharedMutex::SharedMutex(): _reader_count(0), _reader_wait(0) {
_writer_butex = butex_create_checked<uint32_t>();
*_writer_butex = 0;
_reader_butex = butex_create_checked<uint32_t>();
*_reader_butex = 0;
}

void SharedMutex::lock_shared() {
if (_reader_count.fetch_add(1) < 0) {
butex_wait(_reader_butex, 0, nullptr);
}
}

void SharedMutex::unlock_shared() {
int32_t r = _reader_count.fetch_add(-1);
if (r < 0) {
unlock_shared_slow(r);
}
}

void SharedMutex::unlock_shared_slow(int32_t r) {
if (r == 0 || r == -max_readers) {
throw std::system_error(std::error_code(static_cast<int>(std::errc::operation_not_permitted),
std::system_category()),
"unlock of unlocked SharedMutex");
}
if (_reader_wait.fetch_add(-1) == 1) {
butex_wake(_writer_butex);
}
}

void SharedMutex::lock() {
_w.lock();
int32_t r = _reader_count.fetch_add(-max_readers);
if (r != 0 && _reader_wait.fetch_add(r) + r != 0) {
butex_wait(_writer_butex, 0, nullptr);
}
}

void SharedMutex::unlock() {
int32_t r = _reader_count.fetch_add(max_readers);
for(int32_t i = 0; i < r; i++) {
butex_wake(_reader_butex);
}
_w.unlock();
}
}



47 changes: 47 additions & 0 deletions src/bthread/shared_mutex.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef BTHREAD_SHARED_MUTEX_H
#define BTHREAD_SHARED_MUTEX_H
#include "mutex.h"

namespace bthread {

// compatible with c++17 std::shared_mutex, migration from golang
// see https://github.com/golang/go/blob/master/src/sync/rwmutex.go
class SharedMutex {
public:
SharedMutex();
void lock_shared();
void unlock_shared();
void lock();
void unlock();

private:
DISALLOW_COPY_AND_ASSIGN(SharedMutex);
void unlock_shared_slow(int32_t r);

static constexpr int32_t max_readers = 1 << 30;
Mutex _w;
uint32_t* _writer_butex;
uint32_t* _reader_butex;
butil::atomic<int32_t> _reader_count;
butil::atomic<int32_t> _reader_wait;
};
}

#endif //BTHREAD_SHARED_MUTEX_H

0 comments on commit 080a018

Please sign in to comment.