ARB
arb_cs.cxx
Go to the documentation of this file.
1 // ============================================================= //
2 // //
3 // File : arb_cs.cxx //
4 // Purpose : Basics for client/server communication //
5 // //
6 // Coded by Ralf Westram (coder@reallysoft.de) in March 2011 //
7 // Institute of Microbiology (Technical University Munich) //
8 // http://www.arb-home.de/ //
9 // //
10 // ============================================================= //
11 
12 #include "arb_cs.h"
13 #include "arb_msg.h"
14 #include "arb_pattern.h"
15 #include "arb_string.h"
16 
17 #include <smartptr.h>
18 
19 #include <unistd.h>
20 #include <netdb.h>
21 #include <sys/types.h>
22 #include <sys/socket.h>
23 #include <sys/un.h>
24 #include <sys/stat.h>
25 #include <netinet/tcp.h>
26 
27 // We need one of the below to prevent SIGPIPE on writes to
28 // closed socket. For systems that have neither (Solaris),
29 // we'd need to implement ignoring the signal in the write
30 // loop (not done).
31 #ifndef SO_NOSIGPIPE
32 #ifndef MSG_NOSIGNAL
33 #error Neither SO_NOSIGPIPE nor MSG_NOSIGNAL available!
34 #endif
35 #endif
36 
37 static void arb_gethostbyname(const char *name, struct hostent *& he, GB_ERROR& err) {
38  he = gethostbyname(name);
39  // Note: gethostbyname is marked obsolete.
40  // replacement getnameinfo seems less portable atm.
41 
42  if (he) {
43  err = NULp;
44  }
45  else {
46  err = GBS_global_string("Cannot resolve hostname: '%s' (h_errno=%i='%s')",
47  name, h_errno, hstrerror(h_errno));
48  }
49 }
50 
51 const char *arb_gethostname() {
52  static SmartCharPtr hostname;
53  if (hostname.isNull()) {
54  char buffer[4096];
55  gethostname(buffer, 4095);
56  hostname = ARB_strdup(buffer);
57  }
58  return &*hostname;
59 }
60 
61 size_t arb_socket_read(int socket, char* ptr, size_t size) {
62  size_t to_read = size;
63  while(to_read) {
64  ssize_t read_len = read(socket, ptr, to_read);
65  if (read_len <= 0) { // read failed
66  // FIXME: GB_export_error!
67  return 0;
68  }
69  ptr += read_len;
70  to_read -= read_len;
71  }
72  return size;
73 }
74 
75 ssize_t arb_socket_write(int socket, const char* ptr, size_t size) {
76  size_t to_write = size;
77 
78  while (to_write) {
79 #ifdef MSG_NOSIGNAL
80  // Linux has MSG_NOSIGNAL, but not SO_NOSIGPIPE
81  // prevent SIGPIPE here
82  ssize_t write_len = send(socket, ptr, to_write, MSG_NOSIGNAL);
83  // Note: if valgrind warns about uninitialized bytes sent,
84  // one common reason are parameters passed as int (instead of long).
85  // Affected functions are aisc_put, aisc_nput and aisc_create.
86 #else
87  ssize_t write_len = write(socket, ptr, to_write);
88 #endif
89  if (write_len <= 0) { // write failed
90  if (errno == EPIPE) {
91  fputs("pipe broken\n", stderr);
92  }
93 
94  // FIXME: GB_export_error!
95  return -1;
96  }
97  ptr += write_len;
98  to_write -= write_len;
99  }
100  return 0;
101 }
102 
103 static GB_ERROR arb_open_unix_socket(char* name, bool do_connect, int *fd);
104 static GB_ERROR arb_open_tcp_socket(char* name, bool do_connect, int *fd);
105 
124 GB_ERROR arb_open_socket(const char* name, bool do_connect, int *fd, char** filename_out) {
125  GB_ERROR error = NULp;
126  *fd = 0;
127 
128  if (!name || strlen(name) == 0) {
129  error = "Error opening socket: empty name";
130  }
131  else if (name[0] == ':') {
132  // expand variables in path
133  char *filename = arb_shell_expand(name+1);
134  error = GB_incur_error();
135  if (!error) error = arb_open_unix_socket(filename, do_connect, fd);
136 
137  if (error) {
138  free(filename);
139  }
140  else {
141  reassign(*filename_out, filename);
142  }
143  }
144  else {
145  char *socket_name = ARB_strdup(name);
146  error = arb_open_tcp_socket(socket_name, do_connect, fd);
147  free(socket_name);
148  freenull(*filename_out);
149  }
150 
151  if (error) {
152  *fd = 0;
153  }
154  else {
155  arb_assert(*fd>0);
156  }
157 
158  return error;
159 }
160 
161 static GB_ERROR arb_open_unix_socket(char* filename, bool do_connect, int *fd) {
162  GB_ERROR error = NULp;
163 
164  // create structure for connect/bind
165  sockaddr_un unix_socket;
166  unix_socket.sun_family = AF_UNIX;
167  if (strlen(filename)+1 > sizeof(unix_socket.sun_path)) {
168  error = GBS_global_string("Failed to create unix socket: "
169  "\"%s\" is longer than the allowed %zu characters",
170  filename, sizeof(unix_socket.sun_path));
171  }
172  else {
173  strncpy(unix_socket.sun_path, filename, sizeof(unix_socket.sun_path));
174 
175  // create socket
176  *fd = socket(PF_UNIX, SOCK_STREAM, 0);
177  if (*fd < 0) {
178  error = GBS_global_string("Failed to create unix socket: %s", strerror(errno));
179  }
180  else {
181  // connect or bind socket
182  if (do_connect) {
183  if (connect(*fd, (sockaddr*)&unix_socket, sizeof(sockaddr_un))) {
184  if (errno == ECONNREFUSED || errno == ENOENT) {
185  error = "";
186  }
187  else {
188  error = GBS_global_string("Failed to connect unix socket \"%s\": %s (%i)",
189  filename, strerror(errno), errno);
190  }
191  }
192  }
193  else {
194  struct stat stt;
195  if (!stat(filename, &stt)) {
196  if (!S_ISSOCK(stt.st_mode)) {
197  error = GBS_global_string("Failed to create unix socket at \"%s\": file exists"
198  " and is not a socket", filename);
199  }
200  else if (unlink(filename)) {
201  error = GBS_global_string("Failed to create unix socket at \"%s\": cannot remove"
202  " existing socket", filename);
203  }
204  }
205  if (!error && bind(*fd, (sockaddr*)&unix_socket, sizeof(sockaddr_un))) {
206  error = GBS_global_string("Failed to bind unix socket \"%s\": %s",
207  filename, strerror(errno));
208  }
209  }
210 
211 #ifdef SO_NOSIGPIPE
212  if (!error) {
213  // OSX has SO_NOSIGPIPE but not MSG_NOSIGNAL
214  // prevent SIGPIPE here:
215  int one = 1;
216  if (setsockopt(*fd, SOL_SOCKET, SO_NOSIGPIPE, (const char *)&one, sizeof(one))){
217  fprintf(stderr, "Warning: setsockopt(...NOSIGPIPE...) failed: %s", strerror(errno));
218  }
219  }
220 #endif
221 
222  if (error) {
223  close(*fd);
224  *fd = -1;
225  }
226  }
227  }
228 
229  return error;
230 }
231 
232 static GB_ERROR arb_open_tcp_socket(char* name, bool do_connect, int *fd) {
233  GB_ERROR error = NULp;
234 
235  // create socket
236  *fd = socket(PF_INET, SOCK_STREAM, 0);
237  if (*fd < 0) {
238  error = GBS_global_string("Failed to create tcp socket: %s", strerror(errno));
239  }
240  else {
241  // create sockaddr struct
242  sockaddr_in tcp_socket;
243  tcp_socket.sin_family = AF_INET;
244 
245  struct hostent *he;
246  // get port and host
247  char *p = strchr(name, ':');
248  if (!p) { // <port>
249  tcp_socket.sin_port = htons(atoi(name));
250  arb_gethostbyname(arb_gethostname(), he, error);
251  }
252  else { // <host>:<port>
253  tcp_socket.sin_port = htons(atoi(p+1));
254  p[0]='\0';
255  arb_gethostbyname(name, he, error);
256  p[0]=':';
257  }
258  if (tcp_socket.sin_port == 0) {
259  error = "Cannot open tcp socket on port 0. Is the port name malformed?";
260  }
261  if (!error) {
262  memcpy(&tcp_socket.sin_addr, he->h_addr_list[0], he->h_length);
263 
264  int one = 1;
265  if (do_connect) {
266  if (connect(*fd, (sockaddr*)&tcp_socket, sizeof(tcp_socket))) {
267  if (errno == ECONNREFUSED) {
268  error = "";
269  } else {
270  error = GBS_global_string("Failed to connect TCP socket \"%s\": %s",
271  name, strerror(errno));
272  }
273  }
274  }
275  else { // no connect (bind)
276  if (setsockopt(*fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
277  fprintf(stderr, "Warning: setsockopt(...REUSEADDR...) failed: %s", strerror(errno));
278  }
279  if (bind(*fd, (sockaddr*)&tcp_socket, sizeof(tcp_socket))) {
280  error = GBS_global_string("Failed to bind TCP socket \"%s\": %s",
281  name, strerror(errno));
282  }
283  }
284 
285  if (setsockopt(*fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))) {
286  fprintf(stderr, "Warning: setsockopt(...TCP_NODELAY...) failed: %s", strerror(errno));
287  }
288  }
289 
290  if (error) {
291  close(*fd);
292  *fd = -1;
293  }
294  }
295  return error;
296 }
297 
299 
300 #ifdef UNIT_TESTS
301 
302 #ifndef TEST_UNIT_H
303 # include <test_unit.h>
304 #endif
305 #include <sys/wait.h>
306 #include <arb_sleep.h>
307 
308 static int echo_server(const char* portname) {
309  int mypid = fork();
310  if (mypid) return mypid;
311 
312  int fd;
313  char *filename = NULp;
314  GB_ERROR error = arb_open_socket(portname, false, &fd, &filename);
315  if (error) {
316  exit(1);
317  }
318 
319  if (listen(fd, 1)) {
320  exit(2);
321  }
322 
323  {
324  int cli_fd = accept(fd, NULp, NULp);
325  if (cli_fd < 0) {
326  exit(3);
327  }
328 
329  char buf[500];
330  ssize_t n;
331  while(1) {
332  n = sizeof(buf);
333  n = arb_socket_read(cli_fd, buf, n);
334  if (n == 0) break;
335  n = arb_socket_write(cli_fd, buf, n);
336  if (n == -1) break;;
337  if (strcmp(buf, "exit") == 0) break;
338  }
339 
340  close(cli_fd);
341  }
342 
343  close(fd);
344  if (filename) {
345  unlink(filename);
346  free(filename);
347  }
348 
349  exit(0);
350 }
351 
352 #if !defined(DARWIN)
353 // TEST_DISABLED_OSX: this test may fail randomly (always timeouts under OSX)
354 void TEST_open_socket() {
355  int fd;
356  char *filename = NULp;
357  int server_pid, server_status;
358 
359  const int XTRABUF = 20; // silences buffer overflow warnings
360 
361  // set up port names
362  char *unix_socket = arb_shell_expand(":$ARBHOME/UNIT_TESTER/sok/test.socket");
363  char tcp_socket[sizeof("65536")+XTRABUF], tcp_socket2[sizeof("localhost:65536")+XTRABUF];
364  {
365  // select port to use for tcp_sockets randomly to reduce probability
366  // of conflicts between parallel builds on same host
367  srand(time(NULp)+getpid());
368 
369  const int RANGE = 100;
370  const int PORT_MIN = 32039;
371  const int PORT_MAX = PORT_MIN+RANGE-1;
372 
373  int order[RANGE]; // create random order of numbers in [0..RANGE-1]
374  {
375  int pos[RANGE];
376  for (int p = 0; p<RANGE; ++p) pos[p] = p;
377 
378  for (int o = 0; o<RANGE; ++o) {
379  int limit = RANGE-o;
380  int take = (rand()*double(limit))/RAND_MAX;
381  TEST_EXPECT(take>=0 && take<limit);
382  order[o] = pos[take];
383  for (int t = take+1; t<limit; ++t) {
384  pos[t-1] = pos[t];
385  }
386  }
387  }
388 
389  int port = -1;
390  for (int o = 0; o<RANGE; o++) {
391  port = PORT_MIN+order[o];
392  TEST_EXPECT(port>=PORT_MIN && port<=PORT_MAX);
393 
394  snprintf(tcp_socket, sizeof(tcp_socket), "%i", port);
395  const char *err = arb_open_socket(tcp_socket, true, &fd, &filename);
396  if (!err) { // connected
397  TEST_EXPECT_EQUAL(close(fd), 0);
398  }
399  else if (err[0] == '\0') { // could not connect
400  // found a free socket
401  break;
402  }
403  else { // other error
404  TEST_EXPECT_NULL(err);
405  }
406  }
407  TEST_REJECT(port == -1);
408  snprintf(tcp_socket2, sizeof(tcp_socket2), "localhost:%i", port);
409  }
410 
411  // Test opening server sockets
412  TEST_EXPECT_NULL(arb_open_socket(tcp_socket, false, &fd, &filename));
413  TEST_EXPECT(fd>0);
414  TEST_EXPECT_NULL(filename);
415  TEST_EXPECT_EQUAL(close(fd), 0);
416 
417  TEST_EXPECT_NULL(arb_open_socket(tcp_socket2, false, &fd, &filename));
418  TEST_EXPECT(fd>0);
419  TEST_EXPECT_NULL(filename);
420  TEST_EXPECT_EQUAL(close(fd), 0);
421 
422  TEST_EXPECT_NULL(arb_open_socket(unix_socket, false, &fd, &filename));
423  TEST_EXPECT(fd>0);
424  TEST_REJECT_NULL(filename);
425  TEST_EXPECT_EQUAL(close(fd), 0);
426  TEST_EXPECT_EQUAL(unlink(filename), 0);
427  freenull(filename);
428 
429  // Test connecting to existing tcp socket
430  server_pid = echo_server(tcp_socket);
431  TEST_REJECT_ZERO(server_pid);
432 
433  {
434  ARB_timeout maxconnect(10, SEC);
435  GB_ERROR res = "";
436 
437  while (!maxconnect.passed()) {
438  res = arb_open_socket(tcp_socket, true, &fd, &filename);
439  if (!res || res[0]) break; // accept 'could not connect'
440  ARB_sleep(30, MS);
441  }
442  TEST_EXPECT_NULL(res); // randomly failed (in older revisions)
443  }
444  TEST_EXPECT(fd>0);
445  TEST_EXPECT_NULL(filename);
446  TEST_EXPECT_EQUAL(close(fd), 0);
447  TEST_EXPECT_EQUAL(server_pid, waitpid(server_pid, &server_status, 0));
448 
449  // Test connecting to closed socket
450  TEST_EXPECT_EQUAL("", arb_open_socket(tcp_socket, true, &fd, &filename));
451  TEST_EXPECT_EQUAL("", arb_open_socket(unix_socket, true, &fd, &filename));
452 
453  // Test connecting to existing unix socket
454  server_pid = echo_server(unix_socket);
455  TEST_REJECT_ZERO(server_pid);
456  {
457  ARB_timeout maxconnect(10, SEC);
458  GB_ERROR res = "";
459 
460  while (!maxconnect.passed()) {
461  res = arb_open_socket(unix_socket, true, &fd, &filename);
462  if (!res || res[0]) break; // accept 'could not connect'
463  ARB_sleep(30, MS);
464  }
465  TEST_EXPECT_NULL(res); // randomly failed (in older revisions)
466  }
467  TEST_EXPECT(fd>0);
468 
469  // Test read/write
470  char send_buf[500], recv_buf[500];
471  for (unsigned int i=0; i < sizeof(send_buf); i++) {
472  send_buf[i]=i % 64 + '0';
473  }
474  send_buf[sizeof(send_buf)-1]='\0';
475 
476  TEST_EXPECT_NULL(arb_socket_write(fd, send_buf, sizeof(send_buf)));
477  TEST_EXPECT_EQUAL(sizeof(recv_buf), arb_socket_read(fd, recv_buf, sizeof(recv_buf)));
478  TEST_EXPECT_EQUAL(send_buf, recv_buf);
479  TEST_EXPECT_NULL(arb_socket_write(fd, send_buf, sizeof(send_buf)));
480  TEST_EXPECT_EQUAL(sizeof(recv_buf), arb_socket_read(fd, recv_buf, sizeof(recv_buf)));
481  TEST_EXPECT_EQUAL(send_buf, recv_buf);
482 
483  // Test sigpipe (writing to closed socket)
484  // tell server to die:
485  strcpy(send_buf, "exit");
486  TEST_EXPECT_NULL(arb_socket_write(fd, send_buf, sizeof(send_buf)));
487  TEST_EXPECT_EQUAL(sizeof(recv_buf), arb_socket_read(fd, recv_buf, sizeof(recv_buf)));
488  // wait for server to die
489  TEST_EXPECT_EQUAL(server_pid, waitpid(server_pid, &server_status, 0));
490  // try writing to closed pipe
491  TEST_EXPECT_EQUAL(-1, arb_socket_write(fd, send_buf, sizeof(send_buf)));
492 
493  TEST_EXPECT_EQUAL(close(fd), 0);
494  freenull(filename);
495 
496  free(unix_socket);
497 }
498 TEST_PUBLISH(TEST_open_socket);
499 
500 #endif
501 
502 #endif // UNIT_TESTS
503 
504 
#define arb_assert(cond)
Definition: arb_assert.h:245
const char * GB_ERROR
Definition: arb_core.h:25
GB_ERROR GB_incur_error()
Definition: arb_msg.h:49
static GB_ERROR arb_open_tcp_socket(char *name, bool do_connect, int *fd)
Definition: arb_cs.cxx:232
#define TEST_REJECT_ZERO(cond)
Definition: test_unit.h:1087
static GB_ERROR arb_open_unix_socket(char *name, bool do_connect, int *fd)
Definition: arb_cs.cxx:161
const char * arb_gethostname()
Definition: arb_cs.cxx:51
char * ARB_strdup(const char *str)
Definition: arb_string.h:27
GB_ERROR arb_open_socket(const char *name, bool do_connect, int *fd, char **filename_out)
Definition: arb_cs.cxx:124
const char * GBS_global_string(const char *templat,...)
Definition: arb_msg.cxx:203
int gethostname(char *name, int namelen)
char buffer[MESSAGE_BUFFERSIZE]
Definition: seq_search.cxx:34
#define TEST_PUBLISH(testfunction)
Definition: test_unit.h:1517
#define TEST_EXPECT(cond)
Definition: test_unit.h:1328
char * arb_shell_expand(const char *str)
Definition: arb_pattern.cxx:22
#define TEST_REJECT(cond)
Definition: test_unit.h:1330
#define TEST_REJECT_NULL(n)
Definition: test_unit.h:1325
static void error(const char *msg)
Definition: mkptypes.cxx:96
void ARB_sleep(int amount, TimeUnit tu)
Definition: arb_sleep.h:32
fputs(TRACE_PREFIX, stderr)
size_t arb_socket_read(int socket, char *ptr, size_t size)
Definition: arb_cs.cxx:61
#define TEST_EXPECT_NULL(n)
Definition: test_unit.h:1322
Definition: arb_sleep.h:30
#define NULp
Definition: cxxforward.h:116
ssize_t arb_socket_write(int socket, const char *ptr, size_t size)
Definition: arb_cs.cxx:75
Definition: arb_sleep.h:30
#define TEST_EXPECT_EQUAL(expr, want)
Definition: test_unit.h:1294
static void arb_gethostbyname(const char *name, struct hostent *&he, GB_ERROR &err)
Definition: arb_cs.cxx:37