Added SSL/TLS support to stratum communication

This commit is contained in:
BenDroid 2018-02-25 22:06:40 +01:00
parent d6f27624b7
commit b5cba7bf57
8 changed files with 139 additions and 270 deletions

View file

@ -5,6 +5,7 @@
* Copyright 2014-2016 Wolf9466 <https://github.com/OhGodAPet>
* Copyright 2016 Jay D Dee <jayddee246@gmail.com>
* Copyright 2016-2017 XMRig <support@xmrig.com>
* Copyright 2018- BenDr0id <ben@graef.in>
*
*
* This program is free software: you can redistribute it and/or modify
@ -23,15 +24,13 @@
#include <inttypes.h>
#include <iterator>
#include <stdio.h>
#include <string.h>
#include <utility>
#include <uv.h>
#include "interfaces/IClientListener.h"
#include "log/Log.h"
#include "net/Client.h"
#include "net/Url.h"
#include "rapidjson/document.h"
#include "rapidjson/error/en.h"
#include "rapidjson/stringbuffer.h"
@ -61,20 +60,8 @@ Client::Client(int id, const char *agent, IClientListener *listener) :
m_retryPause(5000),
m_failures(0),
m_recvBufPos(0),
m_state(UnconnectedState),
m_expire(0),
m_stream(nullptr),
m_socket(nullptr)
m_expire(0)
{
memset(m_ip, 0, sizeof(m_ip));
memset(&m_hints, 0, sizeof(m_hints));
m_resolver.data = this;
m_hints.ai_family = PF_INET;
m_hints.ai_socktype = SOCK_STREAM;
m_hints.ai_protocol = IPPROTO_TCP;
m_recvBuf.base = m_buf;
m_recvBuf.len = sizeof(m_buf);
@ -87,13 +74,9 @@ Client::Client(int id, const char *agent, IClientListener *listener) :
Client::~Client()
{
delete m_socket;
}
void Client::connect()
{
resolve(m_url.host());
if (m_net) {
net_free(m_net);
}
}
@ -105,7 +88,7 @@ void Client::connect()
void Client::connect(const Url *url)
{
setUrl(url);
resolve(m_url.host());
connect();
}
@ -138,13 +121,11 @@ void Client::tick(uint64_t now)
return;
}
if (m_state == ConnectedState) {
if (m_net) {
LOG_DEBUG_ERR("[%s:%u] timeout", m_url.host(), m_url.port());
close();
reconnect();
}
if (m_state == ConnectingState) {
else {
connect();
}
}
@ -247,41 +228,15 @@ bool Client::parseLogin(const rapidjson::Value &result, int *code)
return parseJob(result["job"], code);
}
int Client::resolve(const char *host)
{
setState(HostLookupState);
m_expire = 0;
m_recvBufPos = 0;
if (m_failures == -1) {
m_failures = 0;
}
const int r = uv_getaddrinfo(uv_default_loop(), &m_resolver, Client::onResolved, host, NULL, &m_hints);
if (r) {
if (!m_quiet) {
LOG_ERR("[%s:%u] getaddrinfo error: \"%s\"", host, m_url.port(), uv_strerror(r));
}
return 1;
}
return 0;
}
int64_t Client::send(size_t size)
{
LOG_DEBUG("[%s:%u] send (%d bytes): \"%s\"", m_url.host(), m_url.port(), size, m_sendBuf);
if (state() != ConnectedState || !uv_is_writable(m_stream)) {
LOG_DEBUG_ERR("[%s:%u] send failed, invalid state: %d", m_url.host(), m_url.port(), m_state);
LOG_DEBUG("[%s:%u] send (%d bytes): \"%s\"", m_url.host(), m_url.port(), size, buf);
if (!m_net) {
LOG_DEBUG_ERR("[%s:%u] send failed", m_url.host(), m_url.port());
return -1;
}
uv_buf_t buf = uv_buf_init(m_sendBuf, (unsigned int) size);
if (uv_try_write(m_stream, &buf, 1) < 0) {
if (net_write2(m_net, m_sendBuf, static_cast<unsigned int>(size)) < 0) {
close();
return -1;
}
@ -293,42 +248,93 @@ int64_t Client::send(size_t size)
void Client::close()
{
if (m_state == UnconnectedState || m_state == ClosingState || !m_socket) {
if (m_net) {
auto client = getClient(m_net->data);
net_free(m_net);
m_net = nullptr;
client->reconnect();
}
}
void Client::connect()
{
m_net = net_new(const_cast<char *>(m_url.host()), m_url.port());
m_net->data = this;
m_net->conn_cb = Client::onConnect;
m_net->read_cb = Client::onRead;
m_net->error_cb = Client::onError;
#ifndef XMRIG_NO_SSL_TLS
if (m_url.isTls()) {
tls_ctx* tls_ctx = tls_ctx_new();
net_set_tls(m_net, tls_ctx);
}
#endif
net_connect(m_net);
}
void Client::onRead(net_t *net, size_t size, char *buf)
{
auto client = getClient(net->data);
if (size < 0) {
if (size != UV_EOF && !client->m_quiet) {
LOG_ERR("[%s:%u] read error: \"%s\"", client->m_url.host(), client->m_url.port(), uv_strerror((int) size));
}
return client->close();
}
client->m_recvBufPos += size;
char* end;
char* start = buf;
size_t remaining = client->m_recvBufPos;
while ((end = static_cast<char*>(memchr(start, '\n', remaining))) != nullptr) {
end++;
size_t len = end - start;
client->parse(start, len);
remaining -= len;
start = end;
}
if (remaining == 0) {
client->m_recvBufPos = 0;
return;
}
setState(ClosingState);
if (start == buf) {
return;
}
if (uv_is_closing(reinterpret_cast<uv_handle_t*>(m_socket)) == 0) {
uv_close(reinterpret_cast<uv_handle_t*>(m_socket), Client::onClose);
memcpy(buf, start, remaining);
client->m_recvBufPos = remaining;
}
void Client::onConnect(net_t *net) {
auto client = getClient(net->data);
client->login();
}
void Client::onError(net_t *net, int err, char *errStr)
{
if (net) {
auto client = getClient(net->data);
if (!client->m_quiet) {
LOG_ERR("[%s:%u] error: \"%s\"", client->m_url.host(), client->m_url.port(), errStr);
}
client->close();
}
}
void Client::connect(struct sockaddr *addr)
{
setState(ConnectingState);
reinterpret_cast<struct sockaddr_in*>(addr)->sin_port = htons(m_url.port());
delete m_socket;
uv_connect_t *req = new uv_connect_t;
req->data = this;
m_socket = new uv_tcp_t;
m_socket->data = this;
uv_tcp_init(uv_default_loop(), m_socket);
uv_tcp_nodelay(m_socket, 1);
# ifndef WIN32
uv_tcp_keepalive(m_socket, 1, 60);
# endif
uv_tcp_connect(req, m_socket, reinterpret_cast<const sockaddr*>(addr), Client::onConnect);
}
void Client::login()
{
m_results.clear();
@ -480,9 +486,7 @@ void Client::ping()
}
void Client::reconnect()
{
setState(ConnectingState);
void Client::reconnect() {
# ifndef XMRIG_PROXY_PROJECT
if (m_url.isKeepAlive()) {
@ -500,19 +504,6 @@ void Client::reconnect()
m_expire = uv_now(uv_default_loop()) + m_retryPause;
}
void Client::setState(SocketState state)
{
LOG_DEBUG("[%s:%u] state: %d", m_url.host(), m_url.port(), state);
if (m_state == state) {
return;
}
m_state = state;
}
void Client::startTimeout()
{
m_expire = 0;
@ -525,127 +516,3 @@ void Client::startTimeout()
uv_timer_start(&m_keepAliveTimer, [](uv_timer_t *handle) { getClient(handle->data)->ping(); }, kKeepAliveTimeout, 0);
# endif
}
void Client::onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
{
auto client = getClient(handle->data);
buf->base = &client->m_recvBuf.base[client->m_recvBufPos];
buf->len = client->m_recvBuf.len - client->m_recvBufPos;
}
void Client::onClose(uv_handle_t *handle)
{
auto client = getClient(handle->data);
delete client->m_socket;
client->m_stream = nullptr;
client->m_socket = nullptr;
client->setState(UnconnectedState);
client->reconnect();
}
void Client::onConnect(uv_connect_t *req, int status)
{
auto client = getClient(req->data);
if (status < 0) {
if (!client->m_quiet) {
LOG_ERR("[%s:%u] connect error: \"%s\"", client->m_url.host(), client->m_url.port(), uv_strerror(status));
}
delete req;
client->close();
return;
}
client->m_stream = static_cast<uv_stream_t*>(req->handle);
client->m_stream->data = req->data;
client->setState(ConnectedState);
uv_read_start(client->m_stream, Client::onAllocBuffer, Client::onRead);
delete req;
client->login();
}
void Client::onRead(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
{
auto client = getClient(stream->data);
if (nread < 0) {
if (nread != UV_EOF && !client->m_quiet) {
LOG_ERR("[%s:%u] read error: \"%s\"", client->m_url.host(), client->m_url.port(), uv_strerror((int) nread));
}
return client->close();
}
if ((size_t) nread > (sizeof(m_buf) - 8 - client->m_recvBufPos)) {
return client->close();
}
client->m_recvBufPos += nread;
char* end;
char* start = buf->base;
size_t remaining = client->m_recvBufPos;
while ((end = static_cast<char*>(memchr(start, '\n', remaining))) != nullptr) {
end++;
size_t len = end - start;
client->parse(start, len);
remaining -= len;
start = end;
}
if (remaining == 0) {
client->m_recvBufPos = 0;
return;
}
if (start == buf->base) {
return;
}
memcpy(buf->base, start, remaining);
client->m_recvBufPos = remaining;
}
void Client::onResolved(uv_getaddrinfo_t *req, int status, struct addrinfo *res)
{
auto client = getClient(req->data);
if (status < 0) {
LOG_ERR("[%s:%u] DNS error: \"%s\"", client->m_url.host(), client->m_url.port(), uv_strerror(status));
return client->reconnect();
}
addrinfo *ptr = res;
std::vector<addrinfo*> ipv4;
while (ptr != nullptr) {
if (ptr->ai_family == AF_INET) {
ipv4.push_back(ptr);
}
ptr = ptr->ai_next;
}
if (ipv4.empty()) {
LOG_ERR("[%s:%u] DNS error: \"No IPv4 records found\"", client->m_url.host(), client->m_url.port());
return client->reconnect();
}
ptr = ipv4[rand() % ipv4.size()];
uv_ip4_name(reinterpret_cast<sockaddr_in*>(ptr->ai_addr), client->m_ip, 16);
client->connect(ptr->ai_addr);
uv_freeaddrinfo(res);
}