-
Notifications
You must be signed in to change notification settings - Fork 0
/
udp_reciver.cpp
123 lines (111 loc) · 4.56 KB
/
udp_reciver.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
//
// Created by eliwas on 14.10.2019.
//
#include "udp_reciver.h"
void udp_reciver::run() {
ds.bind(sa);
logger.information("started UDP thread");
init_postpacket();
while (!kill){
int size = ds.available();
if( size >= sizeof(PacketHeader)){
std::unique_ptr<unsigned char[]> buffer = make_unique<unsigned char[]>(size+1);
ds.receiveBytes(buffer.get(), size);
auto header = (PacketHeader*)buffer.get();
if(header->m_packetFormat == 2018){
telem->Header_mut.lock();
memcpy(&telem->Header, buffer.get(), sizeof(PacketHeader));
telem->Header_mut.unlock();
if(header->m_packetId == PacketType::CAR_SETUPS_PACKET){
telem->CarSetups_mut.lock();
memcpy(&telem->CarSetupData, buffer.get(), sizeof(PacketCarSetupData));
telem->CarSetups_mut.unlock();
}
else if(header->m_packetId == PacketType::CAR_STATUS){
telem->CarStatus_mut.lock();
memcpy(&telem->CarStatusData, buffer.get(), sizeof(PacketCarStatusData));
telem->CarStatus_mut.unlock();
}
else if(header->m_packetId == PacketType::CAR_TELEMETRY_PACKET){
telem->CarTelemetry_mut.lock();
memcpy(&telem->CarTelemetryData, buffer.get(), sizeof(PacketCarTelemetryData));
telem->CarTelemetry_mut.unlock();
}
else if(header->m_packetId == PacketType::EVENT_PACKET){
telem->Event_mut.lock();
memcpy(&telem->EventData, buffer.get(), sizeof(PacketEventData));
telem->Event_mut.unlock();
}
else if(header->m_packetId == PacketType::LAP_DATA_PACKET){
telem->LapData_mut.lock();
memcpy(&telem->LapData, buffer.get(), sizeof(PacketLapData));
telem->LapData_mut.unlock();
}
else if(header->m_packetId == PacketType::MOTION_PACKET){
telem->Motion_mut.lock();
memcpy(&telem->MotionData, buffer.get(), sizeof(PacketMotionData));
telem->Motion_mut.unlock();
}
else if(header->m_packetId == PacketType::PARTICIPANTS_PACKET){
telem->Participants_mut.lock();
memcpy(&telem->ParticipantsData, buffer.get(), sizeof(PacketParticipantsData));
telem->Participants_mut.unlock();
}
else if(header->m_packetId == PacketType::SESSION_PACKET){
telem->Session_mut.lock();
memcpy(&telem->SessionData, buffer.get(), sizeof(PacketSessionData));
telem->Session_mut.unlock();
}
this->on_new_packet();
} else{
logger.warning("invalid packet format: " + to_string(header->m_packetFormat));
}
} else if(size != 0){
logger.warning("got invalid packet of size " + to_string(size));
char a;
ds.receiveBytes(&a, 1);
}
}
tp.joinAll();
}
inline void udp_reciver::init_postpacket() {
post_packet[PacketType::LAP_DATA_PACKET].emplace_back(make_unique<timing_handler>(telem));
int max_post_packet_threads = 16;
try {
max_post_packet_threads = conf.getInt("UDP.max_post_packet_threads");
}
catch(Poco::NotFoundException& e){}
tp.addCapacity(max_post_packet_threads-tp.capacity());
}
void udp_reciver::on_new_packet() {
telem->Header_mut.lock();
auto packid = telem->Header.m_packetId;
telem->Header_mut.unlock();
telem->Session_mut.lock();
auto session_dat = telem->SessionData;
telem->Session_mut.unlock();
if(current_session_UID != session_dat.m_header.m_sessionUID){
tp.joinAll();
for(auto &i: post_packet){
for(auto &a: i){
a->on_new_session(session_dat);
}
}
current_session_UID = session_dat.m_header.m_sessionUID;
}
for(unsigned long a = 0; a<post_packet[packid].size(); a++){
const auto &i = post_packet[packid][a];
try{
if(!i->running){
tp.start(*i);
}
}
catch(const Poco::NoThreadAvailableException& e){
while(tp.available()<=0){}
a--;
}
catch(const Poco::Exception& e){
logger.fatal(e.what());
}
}
}