Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make P3Client thread safe for all major operations #46

Merged
merged 1 commit into from
Jan 9, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 75 additions & 55 deletions P3/P3Client.class.st
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ Class {
'settings',
'properties',
'converter',
'message'
'message',
'access'
],
#category : #'P3-Core'
}
Expand Down Expand Up @@ -114,6 +115,13 @@ P3Client >> authenticationTypes [
10 -> #SASL } asDictionary
]

{ #category : #'initialize-release' }
P3Client >> beThreadSafe [
"Configure me so that I can be safely used from multiple threads/processes during important operations"

access := Monitor new
]

{ #category : #private }
P3Client >> clearSSL [
settings removeKey: #ssl ifAbsent: [ ]
Expand Down Expand Up @@ -148,12 +156,13 @@ P3Client >> close [
P3Client >> connect [
"Connect me to a PostgreSQL database.
Run the authentication and startup protocols. Configure the session."

self
ensureOpen;
connectInternal;
clearSSL;
logConnected

self critical: [
self
ensureOpen;
connectInternal;
clearSSL;
logConnected ]
]

{ #category : #protocol }
Expand All @@ -172,14 +181,14 @@ P3Client >> connectInternal [
P3Client >> connectSSL [
"Connect me to a PostgreSQL database over an encrypted SSL connection.
Run the authentication and startup protocols. Configure the session."

self
ensureOpen;
upgradeToSSL;
connectInternal;
setSSL;
logConnected

self critical: [
self
ensureOpen;
upgradeToSSL;
connectInternal;
setSSL;
logConnected ]
]

{ #category : #accessing }
Expand Down Expand Up @@ -216,6 +225,13 @@ P3Client >> createPreparedStatementNamed: name withParameters: parameterDescript
^ preparedStatement
]

{ #category : #private }
P3Client >> critical: block [
^ access
ifNil: block
ifNotNil: [ access critical: block ]
]

{ #category : #accessing }
P3Client >> database [
"Return the database name I (want to) connect to.
Expand Down Expand Up @@ -374,6 +390,8 @@ P3Client >> initialize [

settings := IdentityDictionary new.
properties := Dictionary new.

self beThreadSafe
]

{ #category : #private }
Expand Down Expand Up @@ -646,41 +664,40 @@ P3Client >> prepare: query named: queryName [

| parameterDescriptions rowDescriptions statement |

self
ensureConnected;
writeParseMessage: query name: queryName types: #();
writeDescribeMessage: queryName type: $S;
writeSyncMessage.
self critical: [
self
ensureConnected;
writeParseMessage: query name: queryName types: #();
writeDescribeMessage: queryName type: $S;
writeSyncMessage.

self readMessage tag = $1
ifFalse: [ ^ P3Error parseCompleteExpected signal ].
self readMessage tag = $1
ifFalse: [ ^ P3Error parseCompleteExpected signal ].

self readMessage.
message tag = $t
ifTrue: [ parameterDescriptions := self processParameterDescription: message readStream ]
ifFalse: [
parameterDescriptions := #().
message tag = $n
ifFalse: [ P3Error noDataExpected signal ] ].
self readMessage tag = $t
ifTrue: [ parameterDescriptions := self processParameterDescription: message readStream ]
ifFalse: [
parameterDescriptions := #().
message tag = $n
ifFalse: [ P3Error noDataExpected signal ] ].

self readMessage.
message tag = $T
ifTrue: [ rowDescriptions := self processRowDescription: message readStream ]
ifFalse: [
rowDescriptions := #().
message tag = $n
ifFalse: [ P3Error noDataExpected signal ] ].
self readMessage tag = $T
ifTrue: [ rowDescriptions := self processRowDescription: message readStream ]
ifFalse: [
rowDescriptions := #().
message tag = $n
ifFalse: [ P3Error noDataExpected signal ] ].

self readMessage tag = $Z
ifFalse: [ ^ P3Error readyForQueryExpected signal ].
self readMessage tag = $Z
ifFalse: [ ^ P3Error readyForQueryExpected signal ].

statement := self
createPreparedStatementNamed: queryName
withParameters: parameterDescriptions
andFields: rowDescriptions.
statement := self
createPreparedStatementNamed: queryName
withParameters: parameterDescriptions
andFields: rowDescriptions.

self logPreparedStatement: statement query: query.
self logPreparedStatement: statement query: query ].

^ statement
]

Expand Down Expand Up @@ -775,11 +792,12 @@ P3Client >> query: query [
Descriptions is a collection of row field description objects, if any.
Data is a collection of rows with fully converted field values as objects, if any."

^ self withConnection: [
self
ensureConnected;
writeQueryMessage: query;
runQueryResult ]
^ self critical: [
self withConnection: [
self
ensureConnected;
writeQueryMessage: query;
runQueryResult ] ]
]

{ #category : #accessing }
Expand Down Expand Up @@ -901,13 +919,15 @@ P3Client >> runExtendedQueryResults: fieldDescriptions [

| results queryStarted |

queryStarted := Time millisecondClockValue.
self readMessage.
results := Array streamContents: [ :out |
[
properties at: #query_started ifAbsentPut: queryStarted. "each result will use the same start"
out nextPut: (self runExtendedQueryResult: fieldDescriptions).
self readMessage tag = $Z ] whileFalse ].
self critical: [
queryStarted := Time millisecondClockValue.
self readMessage.
results := Array streamContents: [ :out |
[
"each result will use the same start"
properties at: #query_started ifAbsentPut: queryStarted.
out nextPut: (self runExtendedQueryResult: fieldDescriptions).
self readMessage tag = $Z ] whileFalse ] ].

^ results
]
Expand Down