Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions code/natneg/server/NNDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ namespace NN {
break;
case NN_ADDRESS_REPLY:
case NN_NATIFY_REQUEST:
case NN_ADDRESS_CHECK:
case NN_ERTTEST:
case NN_INIT:
case NN_INITACK:
Expand Down
143 changes: 143 additions & 0 deletions code/qr/server/ClientMessageResender.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
#include "ClientMessageResender.h"


#include <tasks/tasks.h>
#include "QRDriver.h"


constexpr auto RESEND_INTERVAL_MS = 750;
constexpr auto RESEND_COUNT = 8;


namespace QR {

ClientMessageResender::Task::~Task() {
uv_close((uv_handle_t *)timer_handle, +[](uv_handle_t *handle){
delete (uv_timer_t *)handle;
});
}

ClientMessageResender::ClientMessageResender() : current_message_key{0} {
uv_mutex_init(&m_queue_mutex);
m_async_send_handle = new uv_async_t;
uv_async_init(uv_default_loop(), m_async_send_handle, +[](uv_async_t *handle){
auto& thiz = *(ClientMessageResender *)uv_handle_get_data((uv_handle_t *)handle);
thiz.async_send_callback();
});
uv_handle_set_data((uv_handle_t *)m_async_send_handle, this);
}

ClientMessageResender::~ClientMessageResender(){
uv_close((uv_handle_t*)m_async_send_handle, +[](uv_handle_t *handle){
delete (uv_async_t *)handle;
});
uv_mutex_destroy(&m_queue_mutex);
}


void ClientMessageResender::put_message(
uint32_t instance_key,
OS::Buffer& message_buffer,
const OS::Address& to_address,
QR::Driver *driver
){
std::shared_ptr<Task> task = std::make_shared<Task>(
true,
to_address,
instance_key,
message_buffer,
driver,
this,
RESEND_COUNT
);

uv_mutex_lock(&m_queue_mutex);
m_task_queue.push(task);
uv_mutex_unlock(&m_queue_mutex);
uv_async_send(m_async_send_handle);
}

void ClientMessageResender::remove_message(uint32_t message_key, const OS::Address *from_address){
std::shared_ptr<TaskBasicData> task = std::make_shared<TaskBasicData>(
false,
message_key,
*from_address
);

uv_mutex_lock(&m_queue_mutex);
m_task_queue.push(task);
uv_mutex_unlock(&m_queue_mutex);
uv_async_send(m_async_send_handle);
}


// process queued add/remove tasks
void ClientMessageResender::async_send_callback(){
uv_mutex_lock(&m_queue_mutex);
while(!m_task_queue.empty()){
std::shared_ptr<TaskBasicData> task = m_task_queue.front();
m_task_queue.pop();
uv_mutex_unlock(&m_queue_mutex);

// cancel and remove existing task
auto it = m_active_tasks.find(task);
if(it != m_active_tasks.end()){
m_active_tasks.erase(it);
}

// add new task
if(task->add_task){
task->message_key = current_message_key++;

const std::shared_ptr<TaskBasicData>& task2 = *m_active_tasks.emplace(task).first;
Task *task3 = (Task *)task2.get();
task3->timer_handle = new uv_timer_t;
uv_timer_init(uv_default_loop(), task3->timer_handle);
uv_handle_set_data((uv_handle_t *)task3->timer_handle, (void *)&task2);
uv_timer_start(
task3->timer_handle,
+[](uv_timer_t *handle) {
auto& task = *(std::shared_ptr<TaskBasicData> *)uv_handle_get_data((uv_handle_t *)handle);
Task *task2 = (Task *)task.get();
task2->resender->timer_handler(&task);
},
0,
RESEND_INTERVAL_MS
);
}
uv_mutex_lock(&m_queue_mutex);
}
uv_mutex_unlock(&m_queue_mutex);
}

void ClientMessageResender::timer_handler(std::shared_ptr<TaskBasicData> *task){
WorkQueueData *work_data = new WorkQueueData{*task};
work_data->uv_req.data = work_data;

Task *task2 = (Task *)task->get();

uv_queue_work(
uv_default_loop(),
&work_data->uv_req,
+[](uv_work_t *req) {
WorkQueueData *work_data = (WorkQueueData *)req->data;
Task *task = (Task *)work_data->task.get();
task->driver->send_client_message(2, task->address, task->instance_key, task->message_key, task->message_buffer);
},
+[](uv_work_t *req, int status) {
WorkQueueData *work_data = (WorkQueueData *)req->data;
delete work_data;
}
);


if(--task2->retries_left == 0){
auto it = m_active_tasks.find(*task);
if(it != m_active_tasks.end()) {
m_active_tasks.erase(it);
}
}
}


}
109 changes: 109 additions & 0 deletions code/qr/server/ClientMessageResender.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#ifndef _CLIENTMESSAGERESENDER_H
#define _CLIENTMESSAGERESENDER_H


#include <core/OS/OpenSpy.h>
#include <core/OS/Buffer.h>
#include <unordered_set>
#include <cstdint>
#include <memory>
#include <queue>


namespace QR {
class Driver;

class ClientMessageResender {
public:
ClientMessageResender();
~ClientMessageResender();
void put_message(
uint32_t instance_key,
OS::Buffer& message_buffer,
const OS::Address& to_address,
QR::Driver *driver
);
void remove_message(uint32_t message_key, const OS::Address *from_address);
private:

struct TaskBasicData {
inline TaskBasicData(
bool add_task,
uint32_t message_key,
const OS::Address& address
) : add_task{add_task},
message_key{message_key},
address{address} {}
inline TaskBasicData(
bool add_task,
const OS::Address& address
) : add_task{add_task},
address{address} {}
virtual ~TaskBasicData() = default;

bool add_task;
uint32_t message_key;
OS::Address address;
};

struct Task : TaskBasicData {
inline Task(
bool add_task,
const OS::Address& address,
uint32_t instance_key,
OS::Buffer& message_buffer,
QR::Driver *driver,
ClientMessageResender *resender,
uint32_t retries_left
) : TaskBasicData{add_task, address},
instance_key{instance_key},
message_buffer{message_buffer},
driver{driver},
resender{resender},
retries_left{retries_left} {}
~Task() override;

uint32_t instance_key;
OS::Buffer message_buffer;
QR::Driver *driver;
ClientMessageResender *resender;
uint32_t retries_left;
uv_timer_t *timer_handle;
};

struct WorkQueueData {
std::shared_ptr<TaskBasicData> task;
uv_work_t uv_req;
};

struct TaskSharedPtrHash {
inline size_t operator()(const std::shared_ptr<TaskBasicData>& t) const noexcept {
auto ip_hash = std::hash<uint32_t>()(t->address.ip);
auto port_hash = std::hash<uint16_t>()(t->address.port);
auto message_key_hash = std::hash<uint32_t>()(t->message_key);
auto final_hash = ip_hash ^ (port_hash << 1);
final_hash = final_hash ^ (message_key_hash << 1);
return final_hash;
}
};

struct TaskSharedPtrEqual {
bool operator()(const std::shared_ptr<TaskBasicData>& lhs, const std::shared_ptr<TaskBasicData>& rhs) const noexcept {
return lhs->address == rhs->address && lhs->message_key == rhs->message_key;
}
};

void async_send_callback();
uv_async_t *m_async_send_handle;
uv_mutex_t m_queue_mutex;
std::queue<std::shared_ptr<TaskBasicData>> m_task_queue;

// should be std::shared_ptr<Task>, but in C++11 there is no support
// for heterogenous .find() in std::unordered_set
std::unordered_set<std::shared_ptr<TaskBasicData>, TaskSharedPtrHash, TaskSharedPtrEqual> m_active_tasks;
uint32_t current_message_key;
void timer_handler(std::shared_ptr<TaskBasicData> *task);
};
}

#endif //_CLIENTMESSAGERESENDER_H
4 changes: 3 additions & 1 deletion code/qr/server/QRServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#define _QRSERVER_H
#include <stdint.h>
#include <OS/Net/NetServer.h>

#include "ClientMessageResender.h"
#include <tasks/tasks.h>
#define MASTER_PORT 27900
namespace MM {
Expand All @@ -17,6 +17,8 @@ namespace QR {
~Server();
void tick();
Driver *findDriverByAddress(OS::Address address);

ClientMessageResender client_message_resender;
private:
};
}
Expand Down
5 changes: 4 additions & 1 deletion code/qr/tasks/ClientMessageAck.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include "../server/v2.h"
#include <server/QRDriver.h>
#include <OS/gamespy/gsmsalg.h>

#include <server/QRServer.h>
#include <jansson.h>

namespace MM {
Expand Down Expand Up @@ -38,6 +38,9 @@ namespace MM {
if(request.callback)
request.callback(response);

QR::Server *server = (QR::Server *)uv_loop_get_data(uv_default_loop());
server->client_message_resender.remove_message(request.server.id, &request.from_address);

return true;
}

Expand Down
8 changes: 1 addition & 7 deletions code/qr/tasks/Handle_QRMessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,6 @@ namespace MM {
instance_key = json_integer_value(instance_key_json);
}

int message_key = 0;
json_t *message_key_json = json_object_get(root, "identifier");
if(message_key_json && json_is_integer(message_key_json)) {
message_key = json_integer_value(message_key_json);
}

OS::Buffer message_buffer;
uint8_t *data_out;
size_t data_len;
Expand Down Expand Up @@ -72,7 +66,7 @@ namespace MM {
const char *type_str = json_string_value(type);
if(stricmp(type_str, "client_message") == 0) {
if(version == 2) {
driver->send_client_message(version, to_address, instance_key, message_key, message_buffer);
server->client_message_resender.put_message(instance_key, message_buffer, to_address, driver);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion code/qr/tasks/tasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
namespace MM {
const char *mm_channel_exchange = "openspy.master";

const char *mm_client_message_routingkey = "qr.message";
const char *mm_client_message_routingkey = "client.message";
const char *mm_server_event_routingkey = "server.event";
const char *mm_server_client_acks_routingkey = "client-messages.acks";

Expand Down
19 changes: 18 additions & 1 deletion code/serverbrowsing/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,29 @@ int main() {
uv_timer_init(uv_default_loop(), &tick_timer);

OS::Init("serverbrowsing");
g_gameserver = new SBServer();

char address_buff[256];
char port_buff[16];
size_t temp_env_sz = sizeof(address_buff);

OS::Address qr_address;
if(uv_os_getenv("OPENSPY_QR_BIND_ADDR", (char *)&address_buff, &temp_env_sz) != UV_ENOENT) {
temp_env_sz = sizeof(port_buff);

uint16_t port = 27900;
if(uv_os_getenv("OPENSPY_QR_BIND_PORT", (char *)&port_buff, &temp_env_sz) != UV_ENOENT) {
port = atoi(port_buff);
}
std::string qr_addr = address_buff;
qr_address = OS::Address{qr_addr};
qr_address.port = htons(port);

} else {
OS::LogText(OS::ELogLevel_Warning, "Missing QR bind address environment variable");
}

g_gameserver = new SBServer(qr_address);

if(uv_os_getenv("OPENSPY_SBV1_BIND_ADDR", (char *)&address_buff, &temp_env_sz) != UV_ENOENT) {
temp_env_sz = sizeof(port_buff);

Expand Down
2 changes: 1 addition & 1 deletion code/serverbrowsing/server/SBServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#include "SBServer.h"
#include "SBDriver.h"
#include <OS/OpenSpy.h>
SBServer::SBServer() : INetServer() {
SBServer::SBServer(const OS::Address& qr_address) : INetServer(), qr_address{qr_address} {
uv_loop_set_data(uv_default_loop(), this);
}
SBServer::~SBServer() {
Expand Down
4 changes: 3 additions & 1 deletion code/serverbrowsing/server/SBServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@

class SBServer : public INetServer {
public:
SBServer();
SBServer(const OS::Address& qr_address);
~SBServer();
void tick();

void OnNewServer(MM::Server server);
void OnUpdateServer(MM::Server server);
void OnDeleteServer(MM::Server server);

const OS::Address qr_address;
private:
};
#endif //_SBSERVER_H
Loading