Implemented reconnect.
This commit is contained in:
parent
b8cc1136a4
commit
c29dc8bcf4
7 changed files with 40 additions and 7 deletions
|
@ -131,7 +131,7 @@ Options::Options(int argc, char **argv) :
|
||||||
m_donateLevel(kDonateLevel),
|
m_donateLevel(kDonateLevel),
|
||||||
m_maxCpuUsage(75),
|
m_maxCpuUsage(75),
|
||||||
m_retries(5),
|
m_retries(5),
|
||||||
m_retryPause(5),
|
m_retryPause(2),
|
||||||
m_threads(0),
|
m_threads(0),
|
||||||
m_affinity(-1L),
|
m_affinity(-1L),
|
||||||
m_backupUrl(nullptr),
|
m_backupUrl(nullptr),
|
||||||
|
|
|
@ -56,6 +56,7 @@ public:
|
||||||
inline const char *user() const { return m_user; }
|
inline const char *user() const { return m_user; }
|
||||||
inline const Url *backupUrl() const { return m_backupUrl; }
|
inline const Url *backupUrl() const { return m_backupUrl; }
|
||||||
inline const Url *url() const { return m_url; }
|
inline const Url *url() const { return m_url; }
|
||||||
|
inline int retryPause() const { return m_retryPause; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Options(int argc, char **argv);
|
Options(int argc, char **argv);
|
||||||
|
|
|
@ -34,6 +34,7 @@ class IClientListener
|
||||||
public:
|
public:
|
||||||
virtual ~IClientListener() {}
|
virtual ~IClientListener() {}
|
||||||
|
|
||||||
|
virtual void onClose(Client *client, int failures);
|
||||||
virtual void onJobReceived(Client *client, const Job &job) = 0;
|
virtual void onJobReceived(Client *client, const Job &job) = 0;
|
||||||
virtual void onLoginCredentialsRequired(Client *client) = 0;
|
virtual void onLoginCredentialsRequired(Client *client) = 0;
|
||||||
virtual void onLoginSuccess(Client *client) = 0;
|
virtual void onLoginSuccess(Client *client) = 0;
|
||||||
|
|
|
@ -32,7 +32,8 @@ Client::Client(int id, IClientListener *listener) :
|
||||||
m_host(nullptr),
|
m_host(nullptr),
|
||||||
m_listener(listener),
|
m_listener(listener),
|
||||||
m_id(id),
|
m_id(id),
|
||||||
m_retries(0),
|
m_retryPause(2000),
|
||||||
|
m_failures(0),
|
||||||
m_sequence(1),
|
m_sequence(1),
|
||||||
m_recvBufPos(0),
|
m_recvBufPos(0),
|
||||||
m_state(UnconnectedState),
|
m_state(UnconnectedState),
|
||||||
|
@ -49,6 +50,9 @@ Client::Client(int id, IClientListener *listener) :
|
||||||
|
|
||||||
m_recvBuf.base = static_cast<char*>(malloc(kRecvBufSize));
|
m_recvBuf.base = static_cast<char*>(malloc(kRecvBufSize));
|
||||||
m_recvBuf.len = kRecvBufSize;
|
m_recvBuf.len = kRecvBufSize;
|
||||||
|
|
||||||
|
m_retriesTimer.data = this;
|
||||||
|
uv_timer_init(uv_default_loop(), &m_retriesTimer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -80,6 +84,8 @@ void Client::connect(const Url *url)
|
||||||
|
|
||||||
void Client::disconnect()
|
void Client::disconnect()
|
||||||
{
|
{
|
||||||
|
m_failures = -1;
|
||||||
|
|
||||||
close();
|
close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -296,11 +302,24 @@ void Client::parseResponse(int64_t id, const json_t *result, const json_t *error
|
||||||
return close();
|
return close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
m_failures = 0;
|
||||||
m_listener->onLoginSuccess(this);
|
m_listener->onLoginSuccess(this);
|
||||||
m_listener->onJobReceived(this, m_job);
|
m_listener->onJobReceived(this, m_job);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void Client::reconnect()
|
||||||
|
{
|
||||||
|
if (m_failures == -1) {
|
||||||
|
return m_listener->onClose(this, -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
m_failures++;
|
||||||
|
m_listener->onClose(this, m_failures);
|
||||||
|
|
||||||
|
uv_timer_start(&m_retriesTimer, [](uv_timer_t *handle) { getClient(handle->data)->connect(); }, m_retryPause, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -335,7 +354,7 @@ void Client::onClose(uv_handle_t *handle)
|
||||||
client->m_socket = nullptr;
|
client->m_socket = nullptr;
|
||||||
client->setState(UnconnectedState);
|
client->setState(UnconnectedState);
|
||||||
|
|
||||||
LOG_NOTICE("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
|
client->reconnect();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -405,7 +424,7 @@ void Client::onResolved(uv_getaddrinfo_t *req, int status, struct addrinfo *res)
|
||||||
auto client = getClient(req->data);
|
auto client = getClient(req->data);
|
||||||
if (status < 0) {
|
if (status < 0) {
|
||||||
LOG_ERR("[%s:%u] DNS error: \"%s\"", client->m_host, client->m_port, uv_strerror(status));
|
LOG_ERR("[%s:%u] DNS error: \"%s\"", client->m_host, client->m_port, uv_strerror(status));
|
||||||
return client->close();;
|
return client->reconnect();;
|
||||||
}
|
}
|
||||||
|
|
||||||
client->connect(res->ai_addr);
|
client->connect(res->ai_addr);
|
||||||
|
|
|
@ -59,6 +59,7 @@ public:
|
||||||
|
|
||||||
inline int id() const { return m_id; }
|
inline int id() const { return m_id; }
|
||||||
inline SocketState state() const { return m_state; }
|
inline SocketState state() const { return m_state; }
|
||||||
|
inline void setRetryPause(int ms) { m_retryPause = ms; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
constexpr static size_t kRecvBufSize = 4096;
|
constexpr static size_t kRecvBufSize = 4096;
|
||||||
|
@ -71,6 +72,7 @@ private:
|
||||||
void parse(char *line, size_t len);
|
void parse(char *line, size_t len);
|
||||||
void parseNotification(const char *method, const json_t *params);
|
void parseNotification(const char *method, const json_t *params);
|
||||||
void parseResponse(int64_t id, const json_t *result, const json_t *error);
|
void parseResponse(int64_t id, const json_t *result, const json_t *error);
|
||||||
|
void reconnect();
|
||||||
void setState(SocketState state);
|
void setState(SocketState state);
|
||||||
|
|
||||||
static void onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
|
static void onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
|
||||||
|
@ -85,7 +87,8 @@ private:
|
||||||
char m_rpcId[64];
|
char m_rpcId[64];
|
||||||
IClientListener *m_listener;
|
IClientListener *m_listener;
|
||||||
int m_id;
|
int m_id;
|
||||||
int64_t m_retries;
|
int m_retryPause;
|
||||||
|
int64_t m_failures;
|
||||||
int64_t m_sequence;
|
int64_t m_sequence;
|
||||||
Job m_job;
|
Job m_job;
|
||||||
size_t m_recvBufPos;
|
size_t m_recvBufPos;
|
||||||
|
@ -96,6 +99,7 @@ private:
|
||||||
uv_getaddrinfo_t m_resolver;
|
uv_getaddrinfo_t m_resolver;
|
||||||
uv_stream_t *m_stream;
|
uv_stream_t *m_stream;
|
||||||
uv_tcp_t *m_socket;
|
uv_tcp_t *m_socket;
|
||||||
|
uv_timer_t m_retriesTimer;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -65,6 +65,12 @@ void Network::connect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void Network::onClose(Client *client, int failures)
|
||||||
|
{
|
||||||
|
LOG_DEBUG("CLOSE %d %d", client->id(), failures);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void Network::onJobReceived(Client *client, const Job &job)
|
void Network::onJobReceived(Client *client, const Job &job)
|
||||||
{
|
{
|
||||||
|
|
||||||
|
@ -90,6 +96,7 @@ void Network::addPool(const Url *url)
|
||||||
|
|
||||||
Client *client = new Client(m_pools.size(), this);
|
Client *client = new Client(m_pools.size(), this);
|
||||||
client->setUrl(url);
|
client->setUrl(url);
|
||||||
|
client->setRetryPause(m_options->retryPause() * 1000);
|
||||||
|
|
||||||
m_pools.push_back(client);
|
m_pools.push_back(client);
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,6 +46,7 @@ public:
|
||||||
static char *userAgent();
|
static char *userAgent();
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
void onClose(Client *client, int failures) override;
|
||||||
void onJobReceived(Client *client, const Job &job) override;
|
void onJobReceived(Client *client, const Job &job) override;
|
||||||
void onLoginCredentialsRequired(Client *client) override;
|
void onLoginCredentialsRequired(Client *client) override;
|
||||||
void onLoginSuccess(Client *client) override;
|
void onLoginSuccess(Client *client) override;
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue