Changeset 12 in chevmsgr for trunk/msgclnt.cpp
- Timestamp:
- 08/30/15 15:08:31 (9 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/msgclnt.cpp
r10 r12 9 9 // -------------------------------------------------------------- 10 10 11 typedef struct S WorkerArg11 typedef struct SMessageQWorkerArg 12 12 { 13 13 cf::network::tcp * socket; 14 14 MessageQ * messageQ; 15 } SWorkerArg; 16 17 int worker(void * arg) 18 { 19 SWorkerArg * inst = (SWorkerArg *) arg; 20 21 Protocol::Message parser; 15 } SMessageQWorkerArg; 16 17 typedef struct SCallbackWorkerArg 18 { 19 MessageQ * messageQ; 20 SCallback callback; 21 } SCallbackWorkerArg; 22 23 static int messageQworekr(void * arg) 24 { 25 SMessageQWorkerArg * inst = (SMessageQWorkerArg *)arg; 26 27 Protocol::Message message; 22 28 cf::bin raw; 23 29 … … 27 33 { 28 34 raw = inst->socket->receive(); 29 parser.parse(raw.toString());35 message.parse(raw.toString()); 30 36 } 31 37 catch (cf::exception & e) … … 38 44 } 39 45 40 inst->messageQ->push( parser);46 inst->messageQ->push(message); 41 47 } 42 48 … … 46 52 } 47 53 54 static inline SConversation toCnversation(const Protocol::Message & message) 55 { 56 SConversation c; 57 58 c.sessid = message.get<std::string>(ProtocolType::SESSION_ID); 59 c.message = message.get<std::string>(ProtocolType::MESSAGE); 60 c.sensitive = message.get<int>(ProtocolType::SENSITIVE); 61 c.isError = message.get<bool>(ProtocolType::RESULT); 62 63 c.from = message.get<std::string>(ProtocolType::FROM); 64 65 return c; 66 } 67 68 static inline SOpenSession toOpenSession(const Protocol::Message & message) 69 { 70 SOpenSession o; 71 72 o.sessid = message.get<std::string>(ProtocolType::SESSION_ID); 73 o.idList = message.getList<std::string>(ProtocolType::ID_LIST); 74 75 return o; 76 } 77 78 static int callbackWorker(void * arg) 79 { 80 SCallbackWorkerArg * inst = (SCallbackWorkerArg *)arg; 81 82 Protocol::Message message; 83 84 while (true) 85 { 86 message = inst->messageQ->pop(ProtocolType::CHAT, false); 87 if (message.type() != ProtocolType::NONE) 88 inst->callback.onChat(toCnversation(message)); 89 90 message = inst->messageQ->pop(ProtocolType::OPEN_SESSION, false); 91 if (message.type() != ProtocolType::NONE) 92 inst->callback.onOpenSession(toOpenSession(message)); 93 } 94 95 free(inst); 96 97 return 0; 98 } 99 48 100 // -------------------------------------------------------------- 49 101 50 void MessageQ::push(const Protocol::Message & parser)102 void MessageQ::push(const Protocol::Message & message) 51 103 { 52 104 mutex.lock(); 53 messageQ.push_back( parser);105 messageQ.push_back(message); 54 106 mutex.unlock(); 55 107 } 56 108 57 Protocol::Message MessageQ::pop(const std::string & requestType )58 { 59 while ( true)109 Protocol::Message MessageQ::pop(const std::string & requestType, bool isWait) 110 { 111 while (isWait) 60 112 { 61 113 SYNCHRONIZED(mutex) 62 114 { 63 Protocol::Message parser= messageQ.front();64 65 if ( parser.type() == requestType)115 Protocol::Message message = messageQ.front(); 116 117 if (message.type() == requestType) 66 118 { 67 119 messageQ.erase(messageQ.begin()); 68 120 69 return parser;121 return message; 70 122 } 71 123 } 72 124 } 125 126 return Protocol::Message(); // return dummy NONE 73 127 } 74 128 … … 76 130 77 131 chev::chev() 78 : listener(worker) 132 : listener(messageQworekr), 133 caller(callbackWorker) 79 134 { 80 135 } … … 96 151 socket.connect(host, port); 97 152 98 S WorkerArg * arg = (SWorkerArg *)malloc(sizeof(SWorkerArg));153 SMessageQWorkerArg * arg = (SMessageQWorkerArg *)malloc(sizeof(SMessageQWorkerArg)); 99 154 if (!arg) 100 155 return false; … … 203 258 try 204 259 { 260 idList.insert(idList.begin(), request.getUserID()); 261 205 262 if (idList.size() == 2) 206 263 std::sort(idList.begin(), idList.end()); 207 264 208 std::string concat = idList[0]; 209 210 for (int iter = 1; iter < idList.size(); iter++) 211 concat += DELIMITER + idList[iter]; 265 std::string concat = joinStrings(idList); 212 266 213 267 if (sessionMap.find(concat) == sessionMap.end()) … … 251 305 } 252 306 253 SConversation chev::listen() 254 { 255 SConversation c; 256 Protocol::Message message = messageQ.pop(ProtocolType::CHAT); 257 258 c.sessid = message.get<std::string>(ProtocolType::SESSION_ID); 259 c.message = message.get<std::string>(ProtocolType::MESSAGE); 260 c.sensitive = message.get<int>(ProtocolType::SENSITIVE); 261 c.isError = message.get<bool>(ProtocolType::RESULT); 262 263 c.from = message.get<std::string>(ProtocolType::FROM); 264 265 return c; 266 } 307 bool chev::listen(const SCallback & callback) 308 { 309 SCallbackWorkerArg * arg = (SCallbackWorkerArg *)malloc(sizeof(SCallbackWorkerArg)); 310 if (!arg) 311 return false; 312 313 arg->messageQ = &messageQ; 314 arg->callback = callback; 315 316 caller.start(arg); 317 318 return true; 319 }
Note:
See TracChangeset
for help on using the changeset viewer.