Reimplement whole stratum network communication with boost::asio (#90)

Logger is now thread safe
This commit is contained in:
Ben Gräf 2018-04-14 19:55:13 +02:00 committed by GitHub
parent 15d752d9e0
commit df084acff6
40 changed files with 915 additions and 1942 deletions

View file

@ -5,6 +5,7 @@
* Copyright 2014-2016 Wolf9466 <https://github.com/OhGodAPet>
* Copyright 2016 Jay D Dee <jayddee246@gmail.com>
* Copyright 2016-2017 XMRig <support@xmrig.com>
* Copyright 2018 Sebastian Stolzenberg <https://github.com/sebastianstolzenberg>
* Copyright 2018- BenDr0id <ben@graef.in>
*
*
@ -32,19 +33,12 @@
#include "interfaces/IClientListener.h"
#include "log/Log.h"
#include "net/Client.h"
#include "net/JobResult.h"
#include "rapidjson/document.h"
#include "rapidjson/error/en.h"
#include "rapidjson/stringbuffer.h"
#include "rapidjson/writer.h"
#ifdef XMRIG_PROXY_PROJECT
# include "proxy/JobResult.h"
#else
# include "net/JobResult.h"
#endif
#ifdef _MSC_VER
# define strncasecmp(x,y,z) _strnicmp(x,y,z)
#endif
@ -54,54 +48,65 @@ int64_t Client::m_sequence = 1;
Client::Client(int id, const char *agent, IClientListener *listener) :
m_quiet(false),
m_nicehash(false),
m_agent(agent),
m_listener(listener),
m_id(id),
m_retryPause(5000),
m_failures(0),
m_jobs(0),
m_recvBufPos(0),
m_expire(0)
m_quiet(false),
m_nicehash(false),
m_agent(agent),
m_listener(listener),
m_id(id),
m_retryPause(5000),
m_failures(0),
m_jobs(0),
m_recvBufPos(0),
m_expire(0)
{
m_recvBuf.base = m_buf;
m_recvBuf.len = sizeof(m_buf);
# ifndef XMRIG_PROXY_PROJECT
m_keepAliveTimer.data = this;
uv_timer_init(uv_default_loop(), &m_keepAliveTimer);
# endif
uv_mutex_init(&m_mutex);
uv_async_init(uv_default_loop(), &onConnectedAsync, Client::onConnected);
uv_async_init(uv_default_loop(), &onReceivedAsync, Client::onReceived);
uv_async_init(uv_default_loop(), &onErrorAsync, Client::onError);
}
Client::~Client()
{
if (m_net) {
net_free(m_net);
}
uv_close((uv_handle_t*) &onConnectedAsync, NULL);
uv_close((uv_handle_t*) &onReceivedAsync, NULL);
uv_close((uv_handle_t*) &onErrorAsync, NULL);
uv_mutex_destroy(&m_mutex);
}
/**
* @brief Connect to server.
*
* @param url
*/
void Client::connect(const Url *url)
{
LOG_DEBUG("connect %s", url);
setUrl(url);
connect();
}
void Client::connect()
{
    LOG_DEBUG("connect");

    // Choose the transport matching the configured URL scheme, then open
    // the stratum connection for this client.
    const auto connType = m_url.useTls() ? CONNECTION_TYPE_TLS : CONNECTION_TYPE_TCP;

    m_connection = establishConnection(shared_from_this(), connType,
                                       m_url.host(), m_url.port());
}
void Client::disconnect()
{
LOG_DEBUG("Client::disconnect");
LOG_DEBUG("disconnect");
# ifndef XMRIG_PROXY_PROJECT
uv_timer_stop(&m_keepAliveTimer);
# endif
m_expire = 0;
m_failures = -1;
@ -112,6 +117,8 @@ void Client::disconnect()
void Client::setUrl(const Url *url)
{
LOG_DEBUG("setUrl");
if (!url || !url->isValid()) {
return;
}
@ -126,12 +133,15 @@ void Client::tick(uint64_t now)
return;
}
if (m_net) {
LOG_DEBUG("tick expired");
if (m_connection) {
LOG_WARN("[%s:%u] timeout", m_url.host(), m_url.port());
close();
LOG_DEBUG("tick -> reconnect");
reconnect();
}
else {
LOG_DEBUG("Client::tick -> connect");
LOG_DEBUG("tick -> connect");
connect();
}
}
@ -139,10 +149,6 @@ void Client::tick(uint64_t now)
int64_t Client::submit(const JobResult &result)
{
# ifdef XMRIG_PROXY_PROJECT
const char *nonce = result.nonce;
const char *data = result.result;
# else
char nonce[9];
char data[65];
@ -151,13 +157,24 @@ int64_t Client::submit(const JobResult &result)
Job::toHex(result.result, 32, data);
data[64] = '\0';
# endif
const size_t size = snprintf(m_sendBuf, sizeof(m_sendBuf), "{\"id\":%" PRIu64 ",\"jsonrpc\":\"2.0\",\"method\":\"submit\",\"params\":{\"id\":\"%s\",\"job_id\":\"%s\",\"nonce\":\"%s\",\"result\":\"%s\"}}\n",
m_sequence, m_rpcId, result.jobId.data(), nonce, data);
const int size = snprintf(m_sendBuf, sizeof(m_sendBuf),
"{"
"\"id\":%" PRIu64 ","
"\"jsonrpc\":\"2.0\","
"\"method\":\"submit\","
"\"params\":"
"{"
"\"id\":\"%s\","
"\"job_id\":\"%s\","
"\"nonce\":\"%s\","
"\"result\":\"%s\""
"}"
"}\n",
m_sequence, m_rpcId, result.jobId.data(), nonce, data);
m_results[m_sequence] = SubmitResult(m_sequence, result.diff, result.actualDiff());
return send(size);
return send(m_sendBuf, size);
}
@ -191,6 +208,7 @@ bool Client::parseJob(const rapidjson::Value &params, int *code)
}
Job job(m_id, m_nicehash);
if (!job.setId(params["job_id"].GetString())) {
*code = 3;
return false;
@ -211,17 +229,19 @@ bool Client::parseJob(const rapidjson::Value &params, int *code)
switch (variantFromProxy) {
case -1:
Options::i()->setForcePowVersion(Options::POW_AUTODETECT);
job.setPowVersion(Options::POW_AUTODETECT);
break;
case 0:
Options::i()->setForcePowVersion(Options::POW_V1);
job.setPowVersion(Options::POW_V1);
break;
case 1:
Options::i()->setForcePowVersion(Options::POW_V2);
job.setPowVersion(Options::POW_V2);
break;
default:
break;
}
} else {
job.setPowVersion(Options::i()->forcePowVersion());
}
if (m_job != job) {
@ -238,11 +258,29 @@ bool Client::parseJob(const rapidjson::Value &params, int *code)
LOG_WARN("[%s:%u] duplicate job received, reconnect", m_url.host(), m_url.port());
}
close();
reconnect();
return false;
}
void Client::parseExtensions(const rapidjson::Value &value)
{
    // The pool advertises optional protocol extensions as an array of
    // strings; anything that is not an array is silently ignored.
    if (!value.IsArray()) {
        return;
    }

    for (const rapidjson::Value &entry : value.GetArray()) {
        // Skip malformed (non-string) entries; the only extension we
        // understand is "nicehash".
        if (entry.IsString() && strcmp(entry.GetString(), "nicehash") == 0) {
            m_nicehash = true;
        }
    }
}
bool Client::parseLogin(const rapidjson::Value &result, int *code)
{
const char *id = result["id"].GetString();
@ -251,9 +289,7 @@ bool Client::parseLogin(const rapidjson::Value &result, int *code)
return false;
}
# ifndef XMRIG_PROXY_PROJECT
m_nicehash = m_url.isNicehash();
# endif
if (result.HasMember("extensions")) {
parseExtensions(result["extensions"]);
@ -268,139 +304,31 @@ bool Client::parseLogin(const rapidjson::Value &result, int *code)
return rc;
}
int64_t Client::send(size_t size)
int64_t Client::send(char* buf, size_t size)
{
LOG_DEBUG("Client::send");
LOG_DEBUG("[%s:%u] send (%d bytes): \"%s\"", m_url.host(), m_url.port(), size, m_sendBuf);
if (!m_net) {
LOG_DEBUG_ERR("[%s:%u] send failed", m_url.host(), m_url.port());
return -1;
if (m_connection)
{
m_connection->send(buf, size);
m_expire = uv_now(uv_default_loop()) + kResponseTimeout;
m_sequence++;
}
if (net_write2(m_net, m_sendBuf, static_cast<unsigned int>(size)) < 0) {
close();
return -1;
}
m_expire = uv_now(uv_default_loop()) + kResponseTimeout;
return m_sequence++;
return m_sequence;
}
void Client::close()
{
LOG_DEBUG("Client::close");
LOG_DEBUG("close");
if (m_net) {
net_close(m_net, nullptr);
} else {
reconnect();
}
m_connection.reset();
}
void Client::connect()
{
LOG_DEBUG("Client::connect");
m_net = net_new(const_cast<char *>(m_url.host()), m_url.port());
m_net->data = this;
m_net->conn_cb = Client::onConnect;
m_net->read_cb = Client::onRead;
m_net->close_cb = Client::onClose;
m_net->error_cb = Client::onError;
#ifndef XMRIG_NO_TLS
if (m_url.useTls()) {
tls_ctx* tls_ctx = tls_ctx_new();
net_set_tls(m_net, tls_ctx);
}
#endif
net_connect(m_net);
}
void Client::onRead(net_t *net, size_t size, char *buf)
{
LOG_DEBUG("Client::onRead");
auto client = getClient(net->data);
if (size == 0) {
if (size != UV_EOF && !client->m_quiet) {
LOG_ERR("[%s:%u] read error: \"%s\"", client->m_url.host(), client->m_url.port(), uv_strerror((int) size));
}
return client->close();
}
client->m_recvBufPos += size;
char* end;
char* start = buf;
size_t remaining = client->m_recvBufPos;
while ((end = static_cast<char*>(memchr(start, '\n', remaining))) != nullptr) {
end++;
size_t len = end - start;
client->parse(start, len);
remaining -= len;
start = end;
}
if (remaining == 0) {
client->m_recvBufPos = 0;
return;
}
if (start == buf) {
return;
}
memcpy(buf, start, remaining);
client->m_recvBufPos = remaining;
}
void Client::onConnect(net_t *net) {
LOG_DEBUG("Client::onConnect");
auto client = getClient(net->data);
client->login();
}
void Client::onError(net_t *net, int err, char *errStr)
{
LOG_DEBUG("Client::onError");
if (net) {
auto client = getClient(net->data);
if (!client->m_quiet) {
LOG_ERR("[%s:%u] error: \"%s\"", client->m_url.host(), client->m_url.port(), errStr);
}
client->close();
}
}
void Client::onClose(net_t *net)
{
LOG_DEBUG("Client::onClose");
if (net) {
auto client = getClient(net->data);
net_free(net);
client->m_net = nullptr;
client->reconnect();
}
}
void Client::login()
{
LOG_DEBUG("Client::login");
LOG_DEBUG("login");
m_results.clear();
@ -433,20 +361,54 @@ void Client::login()
m_sendBuf[size] = '\n';
m_sendBuf[size + 1] = '\0';
send(size + 1);
send(m_sendBuf, size + 1);
}
// Splits the buffered byte stream into newline-terminated messages and
// feeds each complete line to parse(); a trailing partial line is kept
// for the next call.
// NOTE(review): the arithmetic below (remaining = m_recvBufPos scanned
// from `data`) assumes `data` is the start of the receive buffer with
// `size` new bytes appended at the previous m_recvBufPos — confirm with
// the caller.
void Client::processReceivedData(char* data, size_t size)
{
    LOG_DEBUG("processReceivedData");

    // Would overflow the receive buffer (8 bytes kept as headroom):
    // drop the stream and resynchronize via reconnect.
    if ((size_t) size > (sizeof(m_buf) - 8 - m_recvBufPos)) {
        reconnect();
        return;
    }

    m_recvBufPos += size;

    char* end;
    char* start = data;
    size_t remaining = m_recvBufPos;

    // Consume every complete '\n'-terminated line.
    while ((end = static_cast<char*>(memchr(start, '\n', remaining))) != nullptr) {
        end++;
        size_t len = end - start;
        parse(start, len);

        remaining -= len;
        start = end;
    }

    // Everything was consumed: reset the fill position.
    if (remaining == 0) {
        m_recvBufPos = 0;
        return;
    }

    // Partial line already sits at the front of the buffer: nothing to move.
    if (start == data) {
        return;
    }

    // Compact: move the partial tail to the front so the next read appends
    // directly after it.
    memcpy(data, start, remaining);
    m_recvBufPos = remaining;
}
void Client::parse(char *line, size_t len)
{
LOG_DEBUG("Client::parse");
LOG_DEBUG("parse");
startTimeout();
line[len - 1] = '\0';
LOG_DEBUG("[%s:%u] received (%d bytes): \"%s\"", m_url.host(), m_url.port(), len, line);
rapidjson::Document doc;
if (doc.ParseInsitu(line).HasParseError()) {
if (!m_quiet) {
@ -470,28 +432,11 @@ void Client::parse(char *line, size_t len)
}
void Client::parseExtensions(const rapidjson::Value &value)
{
if (!value.IsArray()) {
return;
}
for (const rapidjson::Value &ext : value.GetArray()) {
if (!ext.IsString()) {
continue;
}
if (strcmp(ext.GetString(), "nicehash") == 0) {
m_nicehash = true;
}
}
}
void Client::parseNotification(const char *method, const rapidjson::Value &params, const rapidjson::Value &error)
{
if (error.IsObject()) {
if (!m_quiet) {
LOG_ERR("[%s:%u] error: \"%s\", code: %d", m_url.host(), m_url.port(), error["message"].GetString(), error["code"].GetInt());
LOG_ERR("[%s:%u] Parse notification failed: \"%s\", code: %d", m_url.host(), m_url.port(), error["message"].GetString(), error["code"].GetInt());
}
return;
}
@ -509,7 +454,7 @@ void Client::parseNotification(const char *method, const rapidjson::Value &param
return;
}
LOG_WARN("[%s:%u] unsupported method: \"%s\"", m_url.host(), m_url.port(), method);
LOG_WARN("[%s:%u] Unsupported method: \"%s\"", m_url.host(), m_url.port(), method);
}
@ -525,11 +470,11 @@ void Client::parseResponse(int64_t id, const rapidjson::Value &result, const rap
m_results.erase(it);
}
else if (!m_quiet) {
LOG_ERR("[%s:%u] error: \"%s\", code: %d", m_url.host(), m_url.port(), message, error["code"].GetInt());
LOG_ERR("[%s:%u] Parse response failed: \"%s\", code: %d", m_url.host(), m_url.port(), message, error["code"].GetInt());
}
if (id == 1 || isCriticalError(message)) {
close();
reconnect();
}
return;
@ -543,10 +488,10 @@ void Client::parseResponse(int64_t id, const rapidjson::Value &result, const rap
int code = -1;
if (!parseLogin(result, &code)) {
if (!m_quiet) {
LOG_ERR("[%s:%u] login error code: %d", m_url.host(), m_url.port(), code);
LOG_ERR("[%s:%u] Login error code: %d", m_url.host(), m_url.port(), code);
}
return close();
return reconnect();
}
m_failures = 0;
@ -566,26 +511,35 @@ void Client::parseResponse(int64_t id, const rapidjson::Value &result, const rap
void Client::ping()
{
LOG_DEBUG("Client::ping");
send(snprintf(m_sendBuf, sizeof(m_sendBuf), "{\"id\":%" PRId64 ",\"jsonrpc\":\"2.0\",\"method\":\"keepalived\",\"params\":{\"id\":\"%s\"}}\n", m_sequence, m_rpcId));
LOG_DEBUG("ping");
const int size = snprintf(m_sendBuf, sizeof(m_sendBuf),
"{"
"\"id\":%" PRId64 ","
"\"jsonrpc\":\"2.0\","
"\"method\":\"keepalived\","
"\"params\":"
"{"
"\"id\":\"%s\""
"}"
"}\n",
m_sequence, m_rpcId);
send(m_sendBuf, size);
}
void Client::reconnect() {
void Client::reconnect()
{
LOG_DEBUG("reconnect");
LOG_DEBUG("Client::reconnect");
close();
# ifndef XMRIG_PROXY_PROJECT
if (m_url.isKeepAlive()) {
uv_timer_stop(&m_keepAliveTimer);
}
# endif
if (m_failures == -1) {
LOG_DEBUG("Client::reconnect -> m_failures == -1");
m_failures = 0;
m_expire = 0;
LOG_DEBUG("reconnect -> m_failures == -1");
return m_listener->onClose(this, -1);
}
@ -595,17 +549,86 @@ void Client::reconnect() {
m_expire = uv_now(uv_default_loop()) + m_retryPause;
}
void Client::startTimeout()
{
LOG_DEBUG("Client::startTimeout");
LOG_DEBUG("startTimeout");
m_expire = 0;
# ifndef XMRIG_PROXY_PROJECT
if (!m_url.isKeepAlive()) {
return;
}
uv_timer_start(&m_keepAliveTimer, [](uv_timer_t *handle) { getClient(handle->data)->ping(); }, kKeepAliveTimeout, 0);
# endif
}
void Client::onConnected(uv_async_t *handle)
{
    LOG_DEBUG("onConnected");

    // Runs on the libuv loop after the network thread established the
    // connection; start the stratum login handshake.
    auto client = getClient(handle->data);
    if (client == nullptr) {
        return;
    }

    client->login();
}
// Called from the network thread: hop onto the libuv loop, where
// onConnected() will run with this client as context.
void Client::scheduleOnConnected()
{
    LOG_DEBUG("scheduleOnConnected");

    // Must set the context before waking the loop.
    onConnectedAsync.data = this;
    uv_async_send(&onConnectedAsync);
}
// Runs on the libuv loop: drains every data chunk the network thread
// queued via scheduleOnReceived() and feeds it to the stream parser.
// NOTE(review): the mutex stays held while parse()/listener callbacks
// run, exactly as before — verify no callback re-enters this lock.
void Client::onReceived(uv_async_t *handle)
{
    LOG_DEBUG("onReceived");

    auto client = getClient(handle->data);
    if (client) {
        uv_mutex_lock(&client->m_mutex);

        while (!client->m_readQueue.empty()) {
            // Fix: process the queued element in place instead of copying
            // it, and take writable access via &data[0] — the original
            // wrote through const_cast<char*>(c_str()), which is undefined
            // behavior since processReceivedData() memcpy()s into the
            // buffer when compacting a partial line.
            std::string &data = client->m_readQueue.front();
            client->processReceivedData(&data[0], data.size());
            client->m_readQueue.pop_front();
        }

        uv_mutex_unlock(&client->m_mutex);
    }
}
// Called from the network thread with a freshly received chunk: copy it
// into the queue under the lock, then wake the libuv loop so
// onReceived() consumes it there.
void Client::scheduleOnReceived(char* data, std::size_t size)
{
    LOG_DEBUG("scheduleOnReceived");

    // emplace_back(data, size) copies the bytes into an owned std::string,
    // so the caller's buffer may be reused immediately.
    uv_mutex_lock(&m_mutex);
    m_readQueue.emplace_back(data, size);
    uv_mutex_unlock(&m_mutex);

    onReceivedAsync.data = this;
    uv_async_send(&onReceivedAsync);
}
void Client::onError(uv_async_t *handle)
{
    LOG_DEBUG("onError");

    // Runs on the libuv loop after scheduleOnError(): tear the connection
    // down and schedule a retry.
    auto client = getClient(handle->data);
    if (client == nullptr) {
        return;
    }

    client->reconnect();
}
// Called from the network thread on a connection error: log right away
// (unless quiet mode is on), then defer the reconnect to the libuv loop
// via onError().
void Client::scheduleOnError(const std::string &error)
{
    LOG_DEBUG("scheduleOnError");

    if (!m_quiet) {
        LOG_ERR("[%s:%u] Error: \"%s\"", m_url.host(), m_url.port(), error.c_str());
    }

    // Context must be set before waking the loop.
    onErrorAsync.data = this;
    uv_async_send(&onErrorAsync);
}