package Mojo::IOLoop; use Mojo::Base 'Mojo::EventEmitter'; # "Professor: Amy, technology isn't intrinsically good or evil. It's how it's # used. Like the death ray." use Carp 'croak'; use Mojo::IOLoop::Client; use Mojo::IOLoop::Delay; use Mojo::IOLoop::Server; use Mojo::IOLoop::Stream; use Mojo::IOLoop::Subprocess; use Mojo::Reactor::Poll; use Mojo::Util qw(md5_sum steady_time); use Scalar::Util qw(blessed weaken); use constant DEBUG => $ENV{MOJO_IOLOOP_DEBUG} || 0; has max_accepts => 0; has max_connections => 1000; has reactor => sub { my $class = Mojo::Reactor::Poll->detect; warn "-- Reactor initialized ($class)\n" if DEBUG; return $class->new->catch(sub { warn "@{[blessed $_[0]]}: $_[1]" }); }; # Ignore PIPE signal $SIG{PIPE} = 'IGNORE'; # Initialize singleton reactor early __PACKAGE__->singleton->reactor; sub acceptor { my ($self, $acceptor) = (_instance(shift), @_); # Find acceptor for id return $self->{acceptors}{$acceptor} unless ref $acceptor; # Connect acceptor with reactor $self->{acceptors}{my $id = $self->_id} = $acceptor; weaken $acceptor->reactor($self->reactor)->{reactor}; # Allow new acceptor to get picked up $self->_not_accepting->_maybe_accepting; return $id; } sub client { my ($self, $cb) = (_instance(shift), pop); my $id = $self->_id; my $client = $self->{out}{$id}{client} = Mojo::IOLoop::Client->new; weaken $client->reactor($self->reactor)->{reactor}; weaken $self; $client->on( connect => sub { delete $self->{out}{$id}{client}; my $stream = Mojo::IOLoop::Stream->new(pop); $self->_stream($stream => $id); $self->$cb(undef, $stream); } ); $client->on(error => sub { $self->_remove($id); $self->$cb(pop, undef) }); $client->connect(@_); return $id; } sub delay { my $delay = Mojo::IOLoop::Delay->new; weaken $delay->ioloop(_instance(shift))->{ioloop}; return @_ ? $delay->steps(@_) : $delay; } sub is_running { _instance(shift)->reactor->is_running } sub next_tick { my ($self, $cb) = (_instance(shift), @_); weaken $self; return $self->reactor->next_tick(sub { $self->$cb }); } sub one_tick { my $self = _instance(shift); croak 'Mojo::IOLoop already running' if $self->is_running; $self->reactor->one_tick; } sub recurring { shift->_timer(recurring => @_) } sub remove { my ($self, $id) = (_instance(shift), @_); my $c = $self->{in}{$id} || $self->{out}{$id}; if ($c && (my $stream = $c->{stream})) { return $stream->close_gracefully } $self->_remove($id); } sub reset { my $self = _instance(shift); delete @$self{qw(accepting acceptors events in out stop)}; $self->reactor->reset; $self->stop; } sub server { my ($self, $cb) = (_instance(shift), pop); my $server = Mojo::IOLoop::Server->new; weaken $self; $server->on( accept => sub { my $stream = Mojo::IOLoop::Stream->new(pop); $self->$cb($stream, $self->_stream($stream, $self->_id, 1)); # Enforce connection limit (randomize to improve load balancing) if (my $max = $self->max_accepts) { $self->{accepts} //= $max - int rand $max / 2; $self->stop_gracefully if ($self->{accepts} -= 1) <= 0; } # Stop accepting if connection limit has been reached $self->_not_accepting if $self->_limit; } ); $server->listen(@_); return $self->acceptor($server); } sub singleton { state $loop = shift->SUPER::new } sub start { my $self = _instance(shift); croak 'Mojo::IOLoop already running' if $self->is_running; $self->reactor->start; } sub stop { _instance(shift)->reactor->stop } sub stop_gracefully { my $self = _instance(shift)->_not_accepting; ++$self->{stop} and !$self->emit('finish')->_in and $self->stop; } sub stream { my ($self, $stream) = (_instance(shift), @_); return $self->_stream($stream => $self->_id) if ref $stream; my $c = $self->{in}{$stream} || $self->{out}{$stream} || {}; return $c->{stream}; } sub subprocess { my $subprocess = Mojo::IOLoop::Subprocess->new; weaken $subprocess->ioloop(_instance(shift))->{ioloop}; return @_ ? $subprocess->run(@_) : $subprocess; } sub timer { shift->_timer(timer => @_) } sub _id { my $self = shift; my $id; do { $id = md5_sum 'c' . steady_time . rand } while $self->{in}{$id} || $self->{out}{$id} || $self->{acceptors}{$id}; return $id; } sub _in { scalar keys %{shift->{in} || {}} } sub _instance { ref $_[0] ? $_[0] : $_[0]->singleton } sub _limit { $_[0]{stop} ? 1 : $_[0]->_in >= $_[0]->max_connections } sub _maybe_accepting { my $self = shift; return if $self->{accepting} || $self->_limit; $_->start for values %{$self->{acceptors} || {}}; $self->{accepting} = 1; } sub _not_accepting { my $self = shift; return $self unless delete $self->{accepting}; $_->stop for values %{$self->{acceptors} || {}}; return $self; } sub _out { scalar keys %{shift->{out} || {}} } sub _remove { my ($self, $id) = @_; # Timer return unless my $reactor = $self->reactor; return if $reactor->remove($id); # Acceptor return $self->_not_accepting->_maybe_accepting if delete $self->{acceptors}{$id}; # Connection return unless delete $self->{in}{$id} || delete $self->{out}{$id}; return $self->stop if $self->{stop} && !$self->_in; $self->_maybe_accepting; warn "-- $id <<< $$ (@{[$self->_in]}:@{[$self->_out]})\n" if DEBUG; } sub _stream { my ($self, $stream, $id, $server) = @_; # Connect stream with reactor $self->{$server ? 'in' : 'out'}{$id}{stream} = $stream; warn "-- $id >>> $$ (@{[$self->_in]}:@{[$self->_out]})\n" if DEBUG; weaken $stream->reactor($self->reactor)->{reactor}; weaken $self; $stream->on(close => sub { $self && $self->_remove($id) }); $stream->start; return $id; } sub _timer { my ($self, $method, $after, $cb) = (_instance(shift), @_); weaken $self; return $self->reactor->$method($after => sub { $self->$cb }); } 1; =encoding utf8 =head1 NAME Mojo::IOLoop - Minimalistic event loop =head1 SYNOPSIS use Mojo::IOLoop; # Listen on port 3000 Mojo::IOLoop->server({port => 3000} => sub { my ($loop, $stream) = @_; $stream->on(read => sub { my ($stream, $bytes) = @_; # Process input chunk say $bytes; # Write response $stream->write('HTTP/1.1 200 OK'); }); }); # Connect to port 3000 my $id = Mojo::IOLoop->client({port => 3000} => sub { my ($loop, $err, $stream) = @_; $stream->on(read => sub { my ($stream, $bytes) = @_; # Process input say "Input: $bytes"; }); # Write request $stream->write("GET / HTTP/1.1\x0d\x0a\x0d\x0a"); }); # Add a timer Mojo::IOLoop->timer(5 => sub { my $loop = shift; $loop->remove($id); }); # Start event loop if necessary Mojo::IOLoop->start unless Mojo::IOLoop->is_running; =head1 DESCRIPTION L is a very minimalistic event loop based on L, it has been reduced to the absolute minimal feature set required to build solid and scalable non-blocking clients and servers. Depending on operating system, the default per-process and system-wide file descriptor limits are often very low and need to be tuned for better scalability. The C environment variable should also be used to select the best possible L backend, which usually defaults to the not very scalable C