Client refactoring.

This commit is contained in:
XMRig 2019-03-15 01:06:10 +07:00
parent d57b41c673
commit be5d609856
6 changed files with 72 additions and 47 deletions

View file

@ -37,6 +37,7 @@
#endif
#include "base/tools/Chrono.h"
#include "common/interfaces/IClientListener.h"
#include "common/log/Log.h"
#include "common/net/Client.h"
@ -72,12 +73,11 @@ static const char *states[] = {
xmrig::Client::Client(int id, const char *agent, IClientListener *listener) :
m_enabled(true),
m_ipv6(false),
m_nicehash(false),
m_quiet(false),
m_agent(agent),
m_listener(listener),
m_extensions(0),
m_id(id),
m_retries(5),
m_retryPause(5000),
@ -252,7 +252,7 @@ int64_t xmrig::Client::submit(const JobResult &result)
params.AddMember("nonce", StringRef(nonce), allocator);
params.AddMember("result", StringRef(data), allocator);
if ((m_extensions & AlgoExt) && result.algorithm.isValid()) {
if (has<EXT_ALGO>() && result.algorithm.isValid()) {
params.AddMember("algo", StringRef(result.algorithm.shortName()), allocator);
}
@ -327,7 +327,7 @@ bool xmrig::Client::parseJob(const rapidjson::Value &params, int *code)
return false;
}
Job job(m_id, m_nicehash, m_pool.algorithm(), m_rpcId);
Job job(m_id, has<EXT_NICEHASH>(), m_pool.algorithm(), m_rpcId);
if (!job.setId(params["job_id"].GetString())) {
*code = 3;
@ -402,11 +402,7 @@ bool xmrig::Client::parseLogin(const rapidjson::Value &result, int *code)
return false;
}
m_nicehash = m_pool.isNicehash();
if (result.HasMember("extensions")) {
parseExtensions(result["extensions"]);
}
parseExtensions(result);
const bool rc = parseJob(result["job"], code);
m_jobs = 0;
@ -547,7 +543,7 @@ int64_t xmrig::Client::send(size_t size)
}
}
m_expire = uv_now(uv_default_loop()) + kResponseTimeout;
m_expire = Chrono::steadyMSecs() + kResponseTimeout;
return m_sequence++;
}
@ -597,7 +593,7 @@ void xmrig::Client::handshake()
{
# ifndef XMRIG_NO_TLS
if (isTLS()) {
m_expire = uv_now(uv_default_loop()) + kResponseTimeout;
m_expire = Chrono::steadyMSecs() + kResponseTimeout;
m_tls->handshake();
}
@ -709,28 +705,37 @@ void xmrig::Client::parse(char *line, size_t len)
}
void xmrig::Client::parseExtensions(const rapidjson::Value &value)
void xmrig::Client::parseExtensions(const rapidjson::Value &result)
{
m_extensions = 0;
m_extensions.reset();
if (!value.IsArray()) {
if (!result.HasMember("extensions")) {
return;
}
for (const rapidjson::Value &ext : value.GetArray()) {
const rapidjson::Value &extensions = result["extensions"];
if (!extensions.IsArray()) {
return;
}
for (const rapidjson::Value &ext : extensions.GetArray()) {
if (!ext.IsString()) {
continue;
}
if (strcmp(ext.GetString(), "algo") == 0) {
m_extensions |= AlgoExt;
continue;
}
const char *name = ext.GetString();
if (strcmp(ext.GetString(), "nicehash") == 0) {
m_extensions |= NicehashExt;
m_nicehash = true;
continue;
if (strcmp(name, "algo") == 0) {
setExtension(EXT_ALGO, true);
}
else if (strcmp(name, "nicehash") == 0) {
setExtension(EXT_NICEHASH, true);
}
else if (strcmp(name, "connect") == 0) {
setExtension(EXT_CONNECT, true);
}
else if (strcmp(name, "keepalive") == 0) {
setExtension(EXT_KEEPALIVE, true);
}
}
}
@ -868,7 +873,7 @@ void xmrig::Client::reconnect()
m_failures++;
m_listener->onClose(this, (int) m_failures);
m_expire = uv_now(uv_default_loop()) + m_retryPause;
m_expire = Chrono::steadyMSecs() + m_retryPause;
}
@ -888,8 +893,10 @@ void xmrig::Client::startTimeout()
{
m_expire = 0;
if (m_pool.keepAlive()) {
m_keepAlive = uv_now(uv_default_loop()) + (m_pool.keepAlive() * 1000);
if (has<EXT_KEEPALIVE>()) {
const uint64_t ms = static_cast<uint64_t>(m_pool.keepAlive() > 0 ? m_pool.keepAlive() : Pool::kKeepAliveTimeout) * 1000;
m_keepAlive = Chrono::steadyMSecs() + ms;
}
}

View file

@ -26,6 +26,7 @@
#define XMRIG_CLIENT_H
#include <bitset>
#include <map>
#include <uv.h>
#include <vector>
@ -61,6 +62,14 @@ public:
ClosingState
};
enum Extension {
EXT_ALGO,
EXT_NICEHASH,
EXT_CONNECT,
EXT_KEEPALIVE,
EXT_MAX
};
constexpr static int kResponseTimeout = 20 * 1000;
# ifndef XMRIG_NO_TLS
@ -82,6 +91,7 @@ public:
void setPool(const Pool &pool);
void tick(uint64_t now);
inline bool isEnabled() const { return m_enabled; }
inline bool isReady() const { return m_state == ConnectedState && m_failures == 0; }
inline const char *host() const { return m_pool.host(); }
inline const char *ip() const { return m_ip; }
@ -90,19 +100,16 @@ public:
inline SocketState state() const { return m_state; }
inline uint16_t port() const { return m_pool.port(); }
inline void setAlgo(const Algorithm &algo) { m_pool.setAlgo(algo); }
inline void setEnabled(bool enabled) { m_enabled = enabled; }
inline void setQuiet(bool quiet) { m_quiet = quiet; }
inline void setRetries(int retries) { m_retries = retries; }
inline void setRetryPause(int ms) { m_retryPause = ms; }
template<Extension ext> inline bool has() const noexcept { return m_extensions.test(ext); }
private:
class Tls;
enum Extensions {
NicehashExt = 1,
AlgoExt = 2
};
bool close();
bool isCriticalError(const char *message);
bool isTLS() const;
@ -119,7 +126,7 @@ private:
void login();
void onClose();
void parse(char *line, size_t len);
void parseExtensions(const rapidjson::Value &value);
void parseExtensions(const rapidjson::Value &result);
void parseNotification(const char *method, const rapidjson::Value &params, const rapidjson::Value &error);
void parseResponse(int64_t id, const rapidjson::Value &result, const rapidjson::Value &error);
void ping();
@ -128,7 +135,8 @@ private:
void setState(SocketState state);
void startTimeout();
inline bool isQuiet() const { return m_quiet || m_failures >= m_retries; }
inline bool isQuiet() const { return m_quiet || m_failures >= m_retries; }
inline void setExtension(Extension ext, bool enable) noexcept { m_extensions.set(ext, enable); }
static void onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
static void onClose(uv_handle_t *handle);
@ -139,15 +147,15 @@ private:
static inline Client *getClient(void *data) { return m_storage.get(data); }
addrinfo m_hints;
bool m_enabled;
bool m_ipv6;
bool m_nicehash;
bool m_quiet;
char m_buf[kInputBufferSize];
char m_ip[46];
char m_sendBuf[2048];
const char *m_agent;
IClientListener *m_listener;
int m_extensions;
Id m_rpcId;
int m_id;
int m_retries;
int m_retryPause;
@ -156,6 +164,7 @@ private:
Pool m_pool;
size_t m_recvBufPos;
SocketState m_state;
std::bitset<EXT_MAX> m_extensions;
std::map<int64_t, SubmitResult> m_results;
Tls *m_tls;
uint64_t m_expire;
@ -166,13 +175,16 @@ private:
uv_getaddrinfo_t m_resolver;
uv_stream_t *m_stream;
uv_tcp_t *m_socket;
Id m_rpcId;
static int64_t m_sequence;
static Storage<Client> m_storage;
};
template<> inline bool Client::has<Client::EXT_NICEHASH>() const noexcept { return m_extensions.test(EXT_NICEHASH) || m_pool.isNicehash(); }
template<> inline bool Client::has<Client::EXT_KEEPALIVE>() const noexcept { return m_extensions.test(EXT_KEEPALIVE) || m_pool.keepAlive() > 0; }
} /* namespace xmrig */