6 #include <zypp-core/zyppng/base/EventDispatcher> 10 #include <sys/socket.h> 18 : _writeBuffer(
std::move(writeBuffer) )
74 const auto oldState =
state();
76 if ( oldState == newState )
121 auto wbOld = std::move( std::get<ConnectedState>(
_state)._writeBuffer );
134 z_func()->IODevice::close();
147 auto doDelayedConnect = [
this, &
state ](){
148 if ( !
state._connectNotifier ) {
153 if ( !
state._connectTimeout ) {
157 state._connectNotifier.reset();
158 state._connectTimeout.reset();
161 state._connectTimeout->setSingleShot(
true );
162 state._connectTimeout->start( 30000 );
170 if (
_targetAddr->nativeSockAddr()->sa_family == AF_UNIX ) {
172 return doDelayedConnect();
181 return doDelayedConnect();
212 if ( bytesToRead == 0 ) {
218 char *buf = readBuf.reserve( bytesToRead );
219 const auto bytesRead = z_func()->readData( 0, buf, bytesToRead );
221 if ( bytesRead <= 0 ) {
222 readBuf.chop( bytesToRead );
224 switch ( bytesRead ) {
243 if ( bytesToRead > bytesRead )
244 readBuf.chop( bytesToRead-bytesRead );
253 return std::visit( [
this](
auto &s ){
254 using T = std::decay_t<decltype (s)>;
255 if constexpr ( std::is_same_v<T, ConnectedState> || std::is_same_v<T, ClosingState> ) {
256 const auto nwrite = s._writeBuffer.frontSize();
263 const auto nBuf = s._writeBuffer.front();
265 if ( written == -1 ) {
271 #if EAGAIN != EWOULDBLOCK 285 s._writeBuffer.discard( written );
287 if ( s._writeBuffer.size() == 0 )
343 std::visit( [
this, &ev ] (
const auto &currState ) {
344 using T = std::decay_t<decltype(currState)>;
345 if constexpr ( std::is_same<ConnectingState, T>() ) {
347 if ( this->
_targetAddr->nativeSockAddr()->sa_family == AF_UNIX ) {
355 socklen_t errSize =
sizeof ( err );
356 ::getsockopt(
_socket, SOL_SOCKET, SO_ERROR, &err, &errSize );
358 if ( err == 0 || err == EISCONN ) {
361 if ( err == EINPROGRESS || err == EAGAIN || err == EALREADY )
369 }
else if constexpr ( std::is_same<ConnectedState, T>() ) {
386 }
else if constexpr ( std::is_same<ClosingState, T>() ) {
395 if ( currState._writeBuffer.size() == 0 ) {
400 }
else if constexpr ( std::is_same<ListeningState, T>() ) {
406 DBG <<
"Unexpected state on socket activation" << std::endl;
413 return std::visit([](
const auto &s ) constexpr {
return s.type(); },
_state );
421 sptr->d_func()->_socket = fd;
424 if ( !sptr->setBlocking(
false ) ) {
425 DBG <<
"Failed to unblock socket." << std::endl;
429 if( sptr->d_func()->transition(
state ) )
443 if ( channel != 0 ) {
444 constexpr std::string_view msg(
"Socket does not support multiple read channels");
445 ERR << msg << std::endl;
446 throw std::logic_error( msg.data() );
448 return d_func()->rawBytesAvailable();
458 return Ptr(
new Socket( domain, type, protocol ) );
464 if ( !addr || !d->initSocket() )
467 int res = ::bind( d->_socket, addr->nativeSockAddr(), addr->size() );
468 if ( res >= 0)
return true;
510 if ( !d->initSocket() )
513 int res = ::listen( d->_socket, backlog );
539 if ( d->_socket == -1 )
543 const auto res =
eintrSafeCall( ::accept4, d->_socket, (
struct sockaddr*)
nullptr, (socklen_t *)
nullptr, SOCK_CLOEXEC );
546 #if EAGAIN != EWOULDBLOCK 566 socklen_t optlen =
sizeof(domain);
567 int res = getsockopt( fd, SOL_SOCKET, SO_DOMAIN, &domain, &optlen );
569 DBG <<
"Error querying socket domain: " <<
strerr_cxx() << std::endl;
575 optlen =
sizeof(protocol);
576 res = getsockopt( fd, SOL_SOCKET, SO_PROTOCOL, &protocol, &optlen );
578 DBG <<
"Error querying socket protocol: " <<
strerr_cxx() << std::endl;
584 optlen =
sizeof(type);
585 res = getsockopt( fd, SOL_SOCKET, SO_TYPE, &type, &optlen );
587 DBG <<
"Error querying socket type: " <<
strerr_cxx() << std::endl;
599 if ( !d->initSocket() )
602 const int oldFlags = fcntl( d->_socket, F_GETFL, 0 );
603 if (oldFlags == -1)
return false;
605 const int flags =
set ? ( oldFlags & ~(O_NONBLOCK) ) : ( oldFlags | O_NONBLOCK );
608 if ( flags == oldFlags )
611 if ( fcntl( d->_socket, F_SETFL, flags ) != 0) {
624 if ( !d->initSocket() )
627 d->_targetAddr = std::move(addr);
645 auto sock = d->_socket;
660 d->transition( ClosedState );
671 std::visit([&d](
const auto &s ){
672 using Type = std::decay_t<decltype (s)>;
673 if constexpr ( std::is_same_v<Type, SocketPrivate::ConnectedState > ) {
675 if ( s._writeBuffer.size() ) {
688 if ( d->state() != SocketState::ConnectedState )
691 auto &s = std::get<SocketPrivate::ConnectedState>( d->_state );
694 if ( s._writeBuffer.size() ) {
695 s._writeBuffer.append( data, count );
697 d->writePendingData();
701 auto written =
eintrSafeCall( ::send, d->_socket, data, count, MSG_NOSIGNAL );
702 if ( written == -1 ) {
704 #if EAGAIN != EWOULDBLOCK 723 if ( written >= 0 ) {
724 if ( written < count ) {
726 s._writeBuffer.append( data + written, count - written );
730 d->_sigBytesWritten.emit( written );
733 if ( s._writeBuffer.size() == 0 )
734 d->_sigAllBytesWritten.emit();
748 d->onSocketActivated( rEvents );
761 bool canContinue =
true;
762 bool bufferEmpty =
false;
764 while ( canContinue && !bufferEmpty ) {
769 std::visit([&](
const auto &s ){
770 using T = std::decay_t<decltype (s)>;
771 if constexpr ( std::is_same_v<T, SocketPrivate::ConnectedState> || std::is_same_v<T, SocketPrivate::ClosingState> ) {
772 if ( s._writeBuffer.size() > 0 ) {
777 d->onSocketActivated( rEvents );
780 if ( s._writeBuffer.size() == 0 ){
800 d->onSocketActivated( rEvents );
806 return bytesAvailable() > 0;
811 if ( channel != 0 ) {
812 constexpr std::string_view msg(
"Socket does not support multiple read channels");
813 ERR << msg << std::endl;
814 throw std::logic_error( msg.data() );
818 if ( d->state() != SocketState::ConnectedState )
825 d->setError( ConnectionClosedByRemote,
"The remote host closed the connection",
false );
827 }
else if (
read < 0 ) {
829 #if EAGAIN != EWOULDBLOCK 836 d->setError( UnknownSocketError,
strerr_cxx( errno ),
false );
846 if ( channel != 0 ) {
847 constexpr std::string_view msg(
"Changing the readChannel on a Socket is not supported");
848 ERR << msg << std::endl;
849 throw std::logic_error( msg.data() );
856 return std::visit([&](
const auto &s ) -> int64_t {
857 using T = std::decay_t<decltype (s)>;
858 if constexpr ( std::is_same_v<T, SocketPrivate::ConnectedState> || std::is_same_v<T, SocketPrivate::ClosingState> ) {
859 return s._writeBuffer.size();
867 return d_func()->state();
872 return d_func()->_incomingConnection;
877 return d_func()->_connected;
882 return d_func()->_disconnected;
887 return d_func()->_sigError;
std::shared_ptr< SockAddr > _targetAddr
static Ptr create(int domain, int type, int protocol)
bool listen(int backlog=50)
Signal< void(Socket::SocketError)> _sigError
bool setBlocking(const bool set=true)
Socket::SocketState state() const
bool bind(const std::shared_ptr< SockAddr > &addr)
SignalProxy< void()> sigIncomingConnection()
SocketNotifier::Ptr _socketNotifier
static Ptr create(int socket, int evTypes, bool enable=true)
SocketNotifier::Ptr _socketNotifier
Signal< void()> _sigAllBytesWritten
SignalProxy< void(Timer &t)> sigExpired()
This signal is always emitted when the timer expires.
std::shared_ptr< Socket > Ptr
static bool waitForFdEvent(const int fd, int events, int &revents, int &timeout)
bool waitForAllBytesWritten(int timeout=-1)
void onSocketActivated(int ev)
bool readRawBytesToBuffer()
Signal< void()> _connected
SignalProxy< void()> sigConnected()
SocketNotifier::Ptr _socketNotifier
ClosingState(IOBuffer &&writeBuffer)
Signal< void()> _readyRead
int64_t rawBytesAvailable() const
int64_t readData(uint channel, char *buffer, int64_t bufsize) override
int64_t bytesPending() const
void readChannelChanged(uint channel) override
static std::shared_ptr< Timer > create()
Creates a new Timer object; the timer is not started at this point.
std::variant< InitialState, ConnectingState, ConnectedState, ListeningState, ClosingState, ClosedState > _state
static Socket::Ptr wrapSocket(int fd, int domain, int type, int protocol, Socket::SocketState state)
SocketState state() const
std::string strerr_cxx(const int err=-1)
static Ptr fromSocket(int fd, SocketState state)
int64_t writeData(const char *data, int64_t count) override
void setError(Socket::SocketError error, std::string &&err, bool emit=true)
auto eintrSafeCall(Fun &&function, Args &&... args)
bool connect(std::shared_ptr< SockAddr > addr)
ZYPP_IMPL_PRIVATE(UnixSignalSource)
std::map< std::string, std::string > read(const Pathname &_path)
Read sysconfig file path_r and return (key,value) pairs.
SignalProxy< void(Socket::SocketError)> sigError()
bool waitForConnected(int timeout=-1)
Socket::SocketError _error
SignalProxy< void(const SocketNotifier &sock, int evTypes)> sigActivated()
Signal< void()> _incomingConnection
bool transition(Socket::SocketState newState)
int64_t rawBytesAvailable(uint channel=0) const override
Signal< void()> _disconnected
SocketError lastError() const
Signal< void(uint)> _channelReadyRead
std::vector< IOBuffer > _readChannels
Signal< void(int64_t)> _sigBytesWritten
bool handleConnectError(int error)
int64_t bytesAvailableOnFD(int fd)
bool waitForReadyRead(uint channel, int timeout=-1) override
SignalProxy< void()> sigDisconnected()
std::shared_ptr< Base > Ptr
void onSocketActivatedSlot(const SocketNotifier &, int ev)