Skip to content
This repository has been archived by the owner on Aug 31, 2022. It is now read-only.

Commit

Permalink
Merge branch 'threaded'
Browse files Browse the repository at this point in the history
  • Loading branch information
rhenium committed Jan 14, 2016
2 parents 8ab57c0 + 65ba412 commit 6f2e135
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 90 deletions.
2 changes: 2 additions & 0 deletions lib/plum/rack.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@
require "plum/rack/listener"
require "plum/rack/server"
require "plum/rack/session"
require "plum/rack/legacy_session"
require "plum/rack/thread_pool"
5 changes: 5 additions & 0 deletions lib/plum/rack/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def transform_options
config[:debug] = @options[:debug] unless @options[:debug].nil?
config[:server_push] = @options[:server_push] unless @options[:server_push].nil?
config[:threaded] = @options[:threaded] unless @options[:threaded].nil?
config[:threadpool_size] = @options[:threadpool_size] unless @options[:threadpool_size].nil?

if @options[:fallback_legacy]
h, p = @options[:fallback_legacy].split(":")
Expand Down Expand Up @@ -124,6 +125,10 @@ def setup_parser
@options[:threaded] = true
end

o.on "--threadpool-size SIZE", "Set the size of thread pool" do |arg|
@options[:threadpool_size] = arg.to_i
end

o.on "--fallback-legacy HOST:PORT", "Fallbacks if the client doesn't support HTTP/2" do |arg|
@options[:fallback_legacy] = arg
end
Expand Down
3 changes: 2 additions & 1 deletion lib/plum/rack/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ class Config
debug: false,
log: nil, # $stdout
server_push: true,
threaded: false
threaded: false,
threadpool_size: 20,
}.freeze

def initialize(config = {})
Expand Down
4 changes: 4 additions & 0 deletions lib/plum/rack/dsl.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ def threaded(bool)
@config[:threaded] = !!bool
end

def threadpool_size(int)
@config[:threadpool_size] = int.to_i
end

def fallback_legacy(str)
h, p = str.split(":")
@config[:fallback_legacy_host] = h
Expand Down
36 changes: 36 additions & 0 deletions lib/plum/rack/legacy_session.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# -*- frozen-string-literal: true -*-
using Plum::BinaryString

module Plum
module Rack
class LegacySession
def initialize(svc, e, sock)
@svc = svc
@e = e
@sock = sock
@config = svc.config
end

def run
if @config[:fallback_legacy_host]
@logger.info "legacy HTTP: fallbacking to: #{@config[:fallback_legacy_host]}:#{@config[:fallback_legacy_port]}"
upstream = TCPSocket.open(@config[:fallback_legacy_host], @config[:fallback_legacy_port])
upstream.write(@e.buf) if @e.buf
loop do
ret = IO.select([@sock, upstream])
ret[0].each { |s|
a = s.readpartial(65536)
if s == upstream
@sock.write(a)
else
upstream.write(a)
end
}
end
end
ensure
upstream.close if upstream
end
end
end
end
65 changes: 57 additions & 8 deletions lib/plum/rack/listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ def to_io
raise "not implemented"
end

def accept(svc)
raise "not implemented"
end

def method_missing(name, *args)
@server.__send__(name, *args)
end
Expand All @@ -24,8 +28,24 @@ def to_io
@server.to_io
end

def plum(sock)
::Plum::HTTPServerConnection.new(sock.method(:write))
def accept(svc)
sock = @server.accept
Thread.start {
begin
plum = ::Plum::HTTPServerConnection.new(sock.method(:write))
sess = Session.new(svc, sock, plum)
sess.run
rescue Errno::ECONNRESET, EOFError # closed
rescue ::Plum::LegacyHTTPError => e
@logger.info "legacy HTTP client: #{e}"
sess = LegacySession.new(svc, e, sock)
sess.run
rescue => e
svc.log_exception(e)
ensure
sock.close
end
}
end
end

Expand Down Expand Up @@ -57,7 +77,7 @@ def initialize(lc)
}
tcp_server = ::TCPServer.new(lc[:hostname], lc[:port])
@server = OpenSSL::SSL::SSLServer.new(tcp_server, ctx)
@server.start_immediately = false
@server.start_immediately = false # call socket#accept twice: [tcp, tls]
end

def parse_chained_cert(str)
Expand All @@ -68,9 +88,26 @@ def to_io
@server.to_io
end

def plum(sock)
raise ::Plum::LegacyHTTPError.new("client didn't offer h2 with ALPN", nil) unless sock.alpn_protocol == "h2"
::Plum::ServerConnection.new(sock.method(:write))
def accept(svc)
sock = @server.accept
Thread.start {
begin
sock = sock.accept
raise ::Plum::LegacyHTTPError.new("client didn't offer h2 with ALPN", nil) unless sock.alpn_protocol == "h2"
plum = ::Plum::ServerConnection.new(sock.method(:write))
sess = Session.new(svc, sock, plum)
sess.run
rescue Errno::ECONNRESET, EOFError # closed
rescue ::Plum::LegacyHTTPError => e
@logger.info "legacy HTTP client: #{e}"
sess = LegacySession.new(svc, e, sock)
sess.run
rescue => e
svc.log_exception(e)
ensure
sock.close if sock
end
}
end

private
Expand Down Expand Up @@ -126,8 +163,20 @@ def to_io
@server.to_io
end

def plum(sock)
::Plum::ServerConnection.new(sock.method(:write))
def accept(svc)
sock = @server.accept
Thread.start {
begin
plum = ::Plum::ServerConnection.new(sock.method(:write))
sess = Session.new(svc, sock, plum)
sess.run
rescue Errno::ECONNRESET, EOFError # closed
rescue => e
svc.log_exception(e)
ensure
sock.close if sock
end
}
end
end
end
Expand Down
85 changes: 18 additions & 67 deletions lib/plum/rack/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,15 @@
module Plum
module Rack
class Server
attr_reader :config
attr_reader :config, :app, :logger, :threadpool

def initialize(app, config)
@config = config
@state = :null
@app = config[:debug] ? ::Rack::CommonLogger.new(app) : app
@logger = Logger.new(config[:log] || $stdout).tap { |l|
l.level = config[:debug] ? Logger::DEBUG : Logger::INFO
}
@listeners = config[:listeners].map { |lc|
lc[:listener].new(lc)
}
@logger = Logger.new(config[:log] || $stdout).tap { |l| l.level = config[:debug] ? Logger::DEBUG : Logger::INFO }
@listeners = config[:listeners].map { |lc| lc[:listener].new(lc) }
@threadpool = ThreadPool.new(@config[:threadpool_size]) if @config[:threaded]

@logger.info("Plum #{::Plum::VERSION}")
@logger.info("Config: #{config}")
Expand All @@ -24,87 +21,41 @@ def initialize(app, config)
end

def start
#trap(:INT) { @state = :ee }
#require "lineprof"
#Lineprof.profile(//){
@state = :running
while @state == :running
break if @listeners.empty?
while @state == :running && !@listeners.empty?
begin
if ss = IO.select(@listeners, nil, nil, 2.0)
ss[0].each { |svr|
new_con(svr)
begin
svr.accept(self)
rescue Errno::ECONNRESET, Errno::ECONNABORTED # closed
rescue => e
log_exception(e)
end
}
end
rescue Errno::EBADF, Errno::ENOTSOCK, IOError => e # closed
rescue StandardError => e
rescue Errno::EBADF # closed
rescue => e
log_exception(e)
end
end
#}
end

def stop
@state = :stop
@listeners.map(&:stop)
# TODO: gracefully shutdown connections
# TODO: gracefully shutdown connections (wait threadpool?)
end

private
def new_con(svr)
sock = svr.accept
Thread.new {
begin
begin
sock = sock.accept if sock.respond_to?(:accept)
plum = svr.plum(sock)

con = Session.new(app: @app,
plum: plum,
sock: sock,
logger: @logger,
config: @config,
remote_addr: sock.peeraddr.last)
con.run
rescue ::Plum::LegacyHTTPError => e
@logger.info "legacy HTTP client: #{e}"
handle_legacy(e, sock)
end
rescue Errno::ECONNRESET, Errno::EPROTO, Errno::EINVAL, EOFError => e # closed
rescue StandardError => e
log_exception(e)
ensure
sock.close if sock
end
}
rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINVAL => e # closed
sock.close if sock
rescue StandardError => e
log_exception(e)
sock.close if sock
end

def log_exception(e)
@logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}")
end

def handle_legacy(e, sock)
if @config[:fallback_legacy_host]
@logger.info "legacy HTTP: fallbacking to: #{@config[:fallback_legacy_host]}:#{@config[:fallback_legacy_port]}"
upstream = TCPSocket.open(@config[:fallback_legacy_host], @config[:fallback_legacy_port])
upstream.write(e.buf) if e.buf
loop do
ret = IO.select([sock, upstream])
ret[0].each { |s|
a = s.readpartial(65536)
if s == upstream
sock.write(a)
else
upstream.write(a)
end
}
end
end
ensure
upstream.close if upstream
end

def drop_privileges
begin
user = @config[:user]
Expand Down
39 changes: 25 additions & 14 deletions lib/plum/rack/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ module Rack
class Session
attr_reader :app, :plum

def initialize(app:, plum:, sock:, logger:, config:, remote_addr: "127.0.0.1")
@app = app
@plum = plum
def initialize(svc, sock, plum)
@svc = svc
@app = svc.app
@sock = sock
@logger = logger
@config = config
@remote_addr = remote_addr
@request_thread = {} # used if threaded
@plum = plum
@logger = svc.logger
@config = svc.config
@remote_addr = sock.peeraddr.last
@threadpool = svc.threadpool

setup_plum
end
Expand All @@ -24,12 +25,15 @@ def stop
@plum.close
end

def to_io
@sock.to_io
end

def run
while !@sock.closed? && !@sock.eof?
@plum << @sock.readpartial(1024)
end
ensure
@request_thread.each { |stream, thread| thread.kill }
stop
end

Expand Down Expand Up @@ -57,12 +61,21 @@ def setup_plum
}

@plum.on(:end_stream) { |stream|
if @config[:threaded]
@request_thread[stream] = Thread.new {
handle_request(stream, reqs[stream][:headers], reqs[stream][:data])
req = reqs.delete(stream)
err = proc { |err|
stream.send_headers({ ":status" => 500 }, end_stream: true)
@logger.error(err)
}
if @threadpool
@threadpool.acquire(err) {
handle_request(stream, req[:headers], req[:data])
}
else
handle_request(stream, reqs[stream][:headers], reqs[stream][:data])
begin
handle_request(stream, req[:headers], req[:data])
rescue
err.call($!)
end
end
}
end
Expand Down Expand Up @@ -138,8 +151,6 @@ def handle_request(stream, headers, data)
send_body(st, p_body) unless pno_body
}
end

@request_thread.delete(stream)
end

def new_env(h, data)
Expand Down
Loading

0 comments on commit 6f2e135

Please sign in to comment.