Fix connection issues and add enhancement to the Dashboard (#130)

* Fix connection errors when doing DNS lookup
* Fix connection handling when an error occurs
* Add remote logging feature
* Add algo variant to dashboard
This commit is contained in:
Ben Gräf 2018-06-12 09:15:04 +02:00 committed by GitHub
parent 872fce72b5
commit b379f21cb3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 377 additions and 62 deletions

View file

@ -29,9 +29,9 @@ class BoostConnection : public Connection, public std::enable_shared_from_this<B
public:
BoostConnection(const ConnectionListener::Ptr& listener)
: Connection(listener)
, socket_(ioService_)
, m_resolver(m_ioService)
, m_socket(m_ioService)
{
}
~BoostConnection()
@ -43,17 +43,28 @@ public:
{
LOG_DEBUG("[%s:%d] Connecting", server.c_str(), port);
boost::asio::ip::tcp::resolver resolver(ioService_);
boost::asio::ip::tcp::resolver::query query(server, std::to_string(port));
boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve(query);
socket_.connect(iterator,
boost::bind(&BoostConnection::handleConnect, this->shared_from_this(),
boost::asio::placeholders::error));
m_resolver.async_resolve(query,
boost::bind(&BoostConnection::handleResolve, this->shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::iterator));
std::thread([this]() { m_ioService.run(); }).detach();
}
void handleResolve(const boost::system::error_code& error,
boost::asio::ip::tcp::resolver::iterator endpointIterator)
{
if (!error) {
boost::asio::ip::tcp::endpoint endpoint = *endpointIterator;
std::thread([this]() { ioService_.run(); }).detach();
LOG_DEBUG("[%s:%d] DNS resolved ", endpoint.address().to_string().c_str(), endpoint.port());
m_socket.connect(endpointIterator, boost::bind(&BoostConnection::handleConnect, this->shared_from_this(),
boost::asio::placeholders::error));
} else {
notifyError(std::string("[DNS resolve] ") + error.message());
}
}
void handleConnect(const boost::system::error_code& error)
@ -63,7 +74,7 @@ public:
LOG_DEBUG("[%s:%d] Connected", getConnectedIp().c_str(), getConnectedPort());
notifyConnected();
} else {
notifyError(error.message());
notifyError(std::string("[Connect] ") + error.message());
}
}
@ -73,36 +84,36 @@ public:
LOG_DEBUG("[%s:%d] Disconnecting", getConnectedIp().c_str(), getConnectedPort());
boost::system::error_code ec;
socket_.get().lowest_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
socket_.get().lowest_layer().close();
m_socket.get().lowest_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
m_socket.get().lowest_layer().close();
}
ioService_.stop();
ioService_.reset();
m_ioService.stop();
m_ioService.reset();
}
bool isConnected() const override
{
boost::system::error_code ec;
socket_.get().lowest_layer().remote_endpoint(ec);
return !ec && socket_.get().lowest_layer().is_open();
m_socket.get().lowest_layer().remote_endpoint(ec);
return !ec && m_socket.get().lowest_layer().is_open();
}
std::string getConnectedIp() const override
{
return isConnected() ? socket_.get().lowest_layer().remote_endpoint().address().to_string() : "";
return isConnected() ? m_socket.get().lowest_layer().remote_endpoint().address().to_string() : "";
}
uint16_t getConnectedPort() const override
{
return isConnected() ? socket_.get().lowest_layer().remote_endpoint().port() : 0;
return isConnected() ? m_socket.get().lowest_layer().remote_endpoint().port() : 0;
}
void send(const char* data, std::size_t size) override
{
LOG_DEBUG("[%s:%d] Sending: %.*s", getConnectedIp().c_str(), getConnectedPort(), size, data);
boost::asio::async_write(socket_.get(),
boost::asio::async_write(m_socket.get(),
boost::asio::buffer(data, size),
boost::bind(&BoostConnection::handleWrite, this->shared_from_this(),
boost::asio::placeholders::error,
@ -114,13 +125,13 @@ public:
{
if (error) {
LOG_DEBUG_ERR("[%s:%d] Sending failed: %s", getConnectedIp().c_str(), getConnectedPort(), error.message().c_str());
notifyError(error.message());
notifyError(std::string("[Send] ") + error.message());
}
}
void startReading()
{
boost::asio::async_read(socket_.get(),
boost::asio::async_read(m_socket.get(),
boost::asio::buffer(receiveBuffer_, sizeof(receiveBuffer_)),
boost::asio::transfer_at_least(1),
boost::bind(&BoostConnection::handleRead, this->shared_from_this(),
@ -137,13 +148,14 @@ public:
startReading();
} else {
LOG_DEBUG_ERR("[%s:%d] Read failed: %s", getConnectedIp().c_str(), getConnectedPort(), error.message().c_str());
notifyError(error.message());
notifyError(std::string("[Read] ") + error.message());
}
}
private:
boost::asio::io_service ioService_;
SOCKET socket_;
boost::asio::io_service m_ioService;
boost::asio::ip::tcp::resolver m_resolver;
SOCKET m_socket;
char receiveBuffer_[2048];
};

View file

@ -81,11 +81,11 @@ Connection::Ptr establishConnection(const ConnectionListener::Ptr& listener,
connection->connect(host, port);
}
catch (...) {
LOG_ERR("[%s:%d] Failed to establish connection: %s", host.c_str(), port, boost::current_exception_diagnostic_information().c_str());
if (connection) {
connection->disconnect();
}
connection->notifyError(std::string("[EstablishConnection] ") + boost::current_exception_diagnostic_information());
}

View file

@ -49,9 +49,6 @@ public:
protected:
Connection(const ConnectionListener::Ptr& listener);
virtual ~Connection() {};
void notifyConnected();
void notifyRead(char* data, size_t size);
void notifyError(const std::string& error);
public:
virtual void connect(const std::string& server, uint16_t port) = 0;
@ -61,6 +58,10 @@ public:
virtual uint16_t getConnectedPort() const = 0;
virtual void send(const char* data, std::size_t size) = 0;
void notifyConnected();
void notifyRead(char* data, size_t size);
void notifyError(const std::string& error);
private:
ConnectionListener::WeakPtr listener_;
};

View file

@ -177,6 +177,7 @@ void Network::setJob(Client *client, const Job &job)
LOG_INFO("new job from %s:%d with diff %d and PoW %s", client->host(), client->port(), job.diff(), getPowVariantName(job.powVariant()).c_str());
}
m_state.powVariant = job.powVariant();
m_state.diff = job.diff();
Workers::setJob(job);
}