Home | Projects | Notes > Multi-Threading (POSIX Threads) > Listener Threads
It is a common scenario that an application needs to constantly listen to external events. Those external events can arrive at any time, and the application needs to be able to process them upon their arrival.
Applications can use threads to delegate its responsibilities to listen to external events. This way the process can continue carrying out the important task while the listener threads are waiting for the external events in the blocking state.
Threads themselves can take care of the external events or they can pass those events to the process so the process can handle them. It depends on the requirements of the application.

In this notes, we will implement an application that deploys listener threads to handle multiple network UDP sockets, and user inputs only. Once you learn more about the kernel programming and interprocess communication, you can apply the same concept to implement the threads handling kernel events or events from other processes.
network_utils.h
xxxxxxxxxx411/*2 * File Name : network_utils.h3 * Description : Interface for Network Utils4 * Author : Modified by Kyungjae Lee 5 * (Original: Abhishek Sagar - Juniper Networks)6 * Date Created : 01/07/20237 */8
91011
1213141516171819202122232425
2627
28typedef void (*recv_fn_cb)(char *, /* msg recvd */29 uint32_t, /* recvd msg size */30 char *, /* Sender's IP address */31 uint32_t); /* Sender's Port number */32
33pthread_t* udp_server_create_and_start(char *ip_addr, uint32_t udp_port_no, recv_fn_cb recv_fn);34int send_udp_msg(char *dest_ip_addr, uint32_t udp_port_no, char *msg, uint32_t msg_size);35
36/* General Nw utilities */37
38char* network_covert_ip_n_to_p(uint32_t ip_addr, char *output_buffer);39uint32_t network_covert_ip_p_to_n(char *ip_addr);40
41/* __NETWORK_UTILS__ */network_utils.c
xxxxxxxxxx1561/*2 * File Name : network_utils.c3 * Description : Implementation of Network Utils 4 * (This file contains routines to work with network sockets program.)5 * Author : Modified by Kyungjae Lee 6 * (Original: Abhishek Sagar - Juniper Networks)7 * Date Created : 01/07/20238 */9
1011
12/* 13 * definition of 'thread argument package' data structure14 * - this data structure contains all the information that needs to be passed to the thread function 15 */16typedef struct thread_arg_pkg_17{18 char ip_addr[16];19 uint32_t port_no;20 recv_fn_cb recv_fn;21} thread_arg_pkg_t;22
23/* 24 * UDP Server code25 */26
27static void* _udp_server_create_and_start(void *arg)28{29 thread_arg_pkg_t *thread_arg_pkg = (thread_arg_pkg_t *)arg; /* typecast back to its original type */30
31 char ip_addr[16];32 strncpy(ip_addr, thread_arg_pkg->ip_addr, 16);33 uint32_t port_no = thread_arg_pkg->port_no;34 recv_fn_cb recv_fn = thread_arg_pkg->recv_fn;35 36 free(thread_arg_pkg);37 thread_arg_pkg = NULL;38 39 /* create a UDP socket */40 int udp_sock_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP );41
42 if (udp_sock_fd == -1)43 {44 printf("Socket Creation Failed\n");45 return 0;46 }47
48 struct sockaddr_in server_addr;49 server_addr.sin_family = AF_INET;50 server_addr.sin_port = port_no;51 server_addr.sin_addr.s_addr = INADDR_ANY;52
53 if (bind(udp_sock_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)) == -1) 54 {55 printf("Error : UDP socket bind failed\n");56 return 0;57 }58
59 /* memory space to store the packet received */60 char *recv_buffer = calloc(1, MAX_PACKET_BUFFER_SIZE);61
62 struct sockaddr_in client_addr;63 int bytes_recvd = 0,64 addr_len = sizeof(client_addr);65
66 while(1)67 {68 memset(recv_buffer, 0, MAX_PACKET_BUFFER_SIZE);69
70 /* block here to recv data, recvfrom is blocking system call by default */71 bytes_recvd = recvfrom(udp_sock_fd, recv_buffer, MAX_PACKET_BUFFER_SIZE, 0, 72 (struct sockaddr *)&client_addr, &addr_len);73
74 /* recv_fn will be executed only when the packet has been received (recv_fn is an application75 specific function) */76 recv_fn(recv_buffer, bytes_recvd, 77 network_covert_ip_n_to_p((uint32_t)htonl(client_addr.sin_addr.s_addr), 0),78 client_addr.sin_port); 79 }80 81 return 0;82}83
84/* 85 * API that creates a listening thread86 * - When the listener thread receives a packet from the network, this API will invoke the function87 * passed as a function pointer (the last argument) to pass that packet to the application. The88 * application will process the packets on its end.89 */90pthread_t* udp_server_create_and_start(char *ip_addr, uint32_t udp_port_no, recv_fn_cb recv_fn)91{92 pthread_attr_t attr;93 pthread_t *recv_pkt_thread;94 thread_arg_pkg_t *thread_arg_pkg;95
96 pthread_attr_init(&attr);97 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); /* detached thread */98
99 /* 'thread argument package' data structure */100 thread_arg_pkg = calloc(1, sizeof(thread_arg_pkg_t));101
102 /* pack all data in a single object which needs to be passed as argument to thread fn */103 strncpy(thread_arg_pkg->ip_addr, ip_addr, 16);104 thread_arg_pkg->port_no = udp_port_no;105 thread_arg_pkg->recv_fn = recv_fn;106
107 recv_pkt_thread = calloc(1, sizeof(pthread_t));108
109 pthread_create(recv_pkt_thread, &attr, _udp_server_create_and_start, (void *)thread_arg_pkg);110
111 return recv_pkt_thread; /* ptr to the thread; i.e., thread handle */112}113
114int send_udp_msg(char *dest_ip_addr, uint32_t dest_port_no, char *msg, uint32_t msg_size) 115{ 116 struct sockaddr_in dest;117
118 dest.sin_family = AF_INET;119 dest.sin_port = dest_port_no;120 struct hostent *host = (struct hostent *)gethostbyname(dest_ip_addr);121 dest.sin_addr = *((struct in_addr *)host->h_addr);122 int addr_len = sizeof(struct sockaddr);123 int sock_fd;124
125 sock_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);126
127 if (sock_fd < 0)128 {129 printf("socket creation failed, errno = %d\n", errno);130 return -1;131 }132
133 int rc = sendto(sock_fd, msg, msg_size, 0, (struct sockaddr *)&dest, sizeof(struct sockaddr));134 close(sock_fd);135 return rc;136}137
138char* network_covert_ip_n_to_p(uint32_t ip_addr, char *output_buffer)139{140 char *out = NULL;141 static char str_ip[16];142 out = !output_buffer ? str_ip : output_buffer;143 memset(out, 0, 16);144 ip_addr = htonl(ip_addr);145 inet_ntop(AF_INET, &ip_addr, out, 16);146 out[15] = '\0';147 return out;148}149
150uint32_t network_covert_ip_p_to_n(char *ip_addr)151{152 uint32_t binary_prefix = 0;153 inet_pton(AF_INET, ip_addr, &binary_prefix);154 binary_prefix = htonl(binary_prefix);155 return binary_prefix;156}UDP server structure overview:
Create a thread using
pthread_create()API.Create a socket using
socket()system call.Bind the socket using
bind()system call.Create a buffer to store the received packet.
Invoke
recvfrom()system call to block for the packet reception.Once the packet arrives, notify the application using the application specific callback function
recv_fn().
listenter_threads_main.c
xxxxxxxxxx371/*2 * File Name : listenter_threads_main.c3 * Description : 4 * Author : Modified by Kyungjae Lee 5 * (Original: Abhishek Sagar - Juniper Networks)6 * Date Created : 01/07/20237 */8
9101112
13/* packet processing function simplified for demonstration purpose */14void pkt_recv_fn(char *pkt, uint32_t pkt_size, char *sender_ip, uint32_t port_no)15{16 printf("%s() : pkt recvd = %s, pkt size = %u\n", __FUNCTION__, pkt, pkt_size);17}18
19/* pointers to listener threads */20pthread_t *listener1 = NULL;21pthread_t *listener2 = NULL;22
23int main(int argc, char **argv)24{25 /* create the 1st listener thread */26 printf("Listening on UDP port no 3000\n");27 listener1 = udp_server_create_and_start("127.0.0.1", 3000, pkt_recv_fn);28 29 /* create the 2nd listener thread */30 printf("Listening on UDP port no 3001\n");31 listener2 = udp_server_create_and_start("127.0.0.1", 3001, pkt_recv_fn);32
33 pthread_exit(0); /* this API allows other threads to continue their execution even though the main34 thread terminates */35 36 return 0;37}udp_sender.c
xxxxxxxxxx171/*2 * File Name : udp_sender.c3 * Description : 4 * Author : Modified by Kyungjae Lee 5 * (Original: Abhishek Sagar - Juniper Networks)6 * Date Created : 01/07/20237 */8
910
11int main(int argc, char **argv) {12
13 printf("Dest = [%s,%d] \n", argv[1], atoi(argv[2]));14 send_udp_msg(argv[1], atoi(argv[2]), argv[3], strlen( argv[3] ));15 16 return 0;17}Shell script to compile this project:
xxxxxxxxxx91# compile.sh2
3rm *.o 4rm *exe5gcc -g -c network_utils.c -o network_utils.o6gcc -g -c listener_threads_main.c -o listener_threads_main.o7gcc -g -c udp_sender.c -o udp_sender.o8gcc -g listener_threads_main.o network_utils.o -o listener_threads_main.exe -lpthread9gcc -g udp_sender.o network_utils.o -o udp_sender.exe -lpthreadTerminating .exe does not necessarily mean that the file is Windows executable. .exe was put on purpose to make it easier to delete the existing executables when re-running this shell script.
Execution result:
Shell 1
xxxxxxxxxx51$ ./listener_threads_main.exe 2Listening on UDP port no 30003Listening on UDP port no 30014pkt_recv_fn() : pkt recvd = hello_msg, pkt size = 95pkt_recv_fn() : pkt recvd = bye_msg, pkt size = 7Shell 2
xxxxxxxxxx41$ ./udp_sender.exe 127.0.0.1 3000 hello_msg2Dest = [127.0.0.1,3000] 3$ ./udp_sender.exe 127.0.0.1 3000 bye_msg4Dest = [127.0.0.1,3000]
Notice that the thread in the program above is not designed for cancellation. Use the thread cancellation technique you learned in the previous section to make those threads in the program cancellable.
xxxxxxxxxx11/* write your code here */
Sagar, A. (2022). Part A - Multithreading & Thread Synchronization - Pthreads [Video file]. Retrieved from https://www.udemy.com/course/multithreading_parta/