Home | Projects | Notes > Multi-Threading (POSIX Threads) > Listener Threads
It is a common scenario that an application needs to constantly listen to external events. Those external events can arrive at any time, and the application needs to be able to process them upon their arrival.
Applications can use threads to delegate its responsibilities to listen to external events. This way the process can continue carrying out the important task while the listener threads are waiting for the external events in the blocking state.
Threads themselves can take care of the external events or they can pass those events to the process so the process can handle them. It depends on the requirements of the application.
In this notes, we will implement an application that deploys listener threads to handle multiple network UDP sockets, and user inputs only. Once you learn more about the kernel programming and interprocess communication, you can apply the same concept to implement the threads handling kernel events or events from other processes.
network_utils.h
xxxxxxxxxx
411/*
2 * File Name : network_utils.h
3 * Description : Interface for Network Utils
4 * Author : Modified by Kyungjae Lee
5 * (Original: Abhishek Sagar - Juniper Networks)
6 * Date Created : 01/07/2023
7 */
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28typedef void (*recv_fn_cb)(char *, /* msg recvd */
29 uint32_t, /* recvd msg size */
30 char *, /* Sender's IP address */
31 uint32_t); /* Sender's Port number */
32
33pthread_t* udp_server_create_and_start(char *ip_addr, uint32_t udp_port_no, recv_fn_cb recv_fn);
34int send_udp_msg(char *dest_ip_addr, uint32_t udp_port_no, char *msg, uint32_t msg_size);
35
36/* General Nw utilities */
37
38char* network_covert_ip_n_to_p(uint32_t ip_addr, char *output_buffer);
39uint32_t network_covert_ip_p_to_n(char *ip_addr);
40
41/* __NETWORK_UTILS__ */
network_utils.c
xxxxxxxxxx
1561/*
2 * File Name : network_utils.c
3 * Description : Implementation of Network Utils
4 * (This file contains routines to work with network sockets program.)
5 * Author : Modified by Kyungjae Lee
6 * (Original: Abhishek Sagar - Juniper Networks)
7 * Date Created : 01/07/2023
8 */
9
10
11
12/*
13 * definition of 'thread argument package' data structure
14 * - this data structure contains all the information that needs to be passed to the thread function
15 */
16typedef struct thread_arg_pkg_
17{
18 char ip_addr[16];
19 uint32_t port_no;
20 recv_fn_cb recv_fn;
21} thread_arg_pkg_t;
22
23/*
24 * UDP Server code
25 */
26
27static void* _udp_server_create_and_start(void *arg)
28{
29 thread_arg_pkg_t *thread_arg_pkg = (thread_arg_pkg_t *)arg; /* typecast back to its original type */
30
31 char ip_addr[16];
32 strncpy(ip_addr, thread_arg_pkg->ip_addr, 16);
33 uint32_t port_no = thread_arg_pkg->port_no;
34 recv_fn_cb recv_fn = thread_arg_pkg->recv_fn;
35
36 free(thread_arg_pkg);
37 thread_arg_pkg = NULL;
38
39 /* create a UDP socket */
40 int udp_sock_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP );
41
42 if (udp_sock_fd == -1)
43 {
44 printf("Socket Creation Failed\n");
45 return 0;
46 }
47
48 struct sockaddr_in server_addr;
49 server_addr.sin_family = AF_INET;
50 server_addr.sin_port = port_no;
51 server_addr.sin_addr.s_addr = INADDR_ANY;
52
53 if (bind(udp_sock_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)) == -1)
54 {
55 printf("Error : UDP socket bind failed\n");
56 return 0;
57 }
58
59 /* memory space to store the packet received */
60 char *recv_buffer = calloc(1, MAX_PACKET_BUFFER_SIZE);
61
62 struct sockaddr_in client_addr;
63 int bytes_recvd = 0,
64 addr_len = sizeof(client_addr);
65
66 while(1)
67 {
68 memset(recv_buffer, 0, MAX_PACKET_BUFFER_SIZE);
69
70 /* block here to recv data, recvfrom is blocking system call by default */
71 bytes_recvd = recvfrom(udp_sock_fd, recv_buffer, MAX_PACKET_BUFFER_SIZE, 0,
72 (struct sockaddr *)&client_addr, &addr_len);
73
74 /* recv_fn will be executed only when the packet has been received (recv_fn is an application
75 specific function) */
76 recv_fn(recv_buffer, bytes_recvd,
77 network_covert_ip_n_to_p((uint32_t)htonl(client_addr.sin_addr.s_addr), 0),
78 client_addr.sin_port);
79 }
80
81 return 0;
82}
83
84/*
85 * API that creates a listening thread
86 * - When the listener thread receives a packet from the network, this API will invoke the function
87 * passed as a function pointer (the last argument) to pass that packet to the application. The
88 * application will process the packets on its end.
89 */
90pthread_t* udp_server_create_and_start(char *ip_addr, uint32_t udp_port_no, recv_fn_cb recv_fn)
91{
92 pthread_attr_t attr;
93 pthread_t *recv_pkt_thread;
94 thread_arg_pkg_t *thread_arg_pkg;
95
96 pthread_attr_init(&attr);
97 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); /* detached thread */
98
99 /* 'thread argument package' data structure */
100 thread_arg_pkg = calloc(1, sizeof(thread_arg_pkg_t));
101
102 /* pack all data in a single object which needs to be passed as argument to thread fn */
103 strncpy(thread_arg_pkg->ip_addr, ip_addr, 16);
104 thread_arg_pkg->port_no = udp_port_no;
105 thread_arg_pkg->recv_fn = recv_fn;
106
107 recv_pkt_thread = calloc(1, sizeof(pthread_t));
108
109 pthread_create(recv_pkt_thread, &attr, _udp_server_create_and_start, (void *)thread_arg_pkg);
110
111 return recv_pkt_thread; /* ptr to the thread; i.e., thread handle */
112}
113
114int send_udp_msg(char *dest_ip_addr, uint32_t dest_port_no, char *msg, uint32_t msg_size)
115{
116 struct sockaddr_in dest;
117
118 dest.sin_family = AF_INET;
119 dest.sin_port = dest_port_no;
120 struct hostent *host = (struct hostent *)gethostbyname(dest_ip_addr);
121 dest.sin_addr = *((struct in_addr *)host->h_addr);
122 int addr_len = sizeof(struct sockaddr);
123 int sock_fd;
124
125 sock_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
126
127 if (sock_fd < 0)
128 {
129 printf("socket creation failed, errno = %d\n", errno);
130 return -1;
131 }
132
133 int rc = sendto(sock_fd, msg, msg_size, 0, (struct sockaddr *)&dest, sizeof(struct sockaddr));
134 close(sock_fd);
135 return rc;
136}
137
138char* network_covert_ip_n_to_p(uint32_t ip_addr, char *output_buffer)
139{
140 char *out = NULL;
141 static char str_ip[16];
142 out = !output_buffer ? str_ip : output_buffer;
143 memset(out, 0, 16);
144 ip_addr = htonl(ip_addr);
145 inet_ntop(AF_INET, &ip_addr, out, 16);
146 out[15] = '\0';
147 return out;
148}
149
150uint32_t network_covert_ip_p_to_n(char *ip_addr)
151{
152 uint32_t binary_prefix = 0;
153 inet_pton(AF_INET, ip_addr, &binary_prefix);
154 binary_prefix = htonl(binary_prefix);
155 return binary_prefix;
156}
UDP server structure overview:
Create a thread using
pthread_create()
API.Create a socket using
socket()
system call.Bind the socket using
bind()
system call.Create a buffer to store the received packet.
Invoke
recvfrom()
system call to block for the packet reception.Once the packet arrives, notify the application using the application specific callback function
recv_fn()
.
listenter_threads_main.c
xxxxxxxxxx
371/*
2 * File Name : listenter_threads_main.c
3 * Description :
4 * Author : Modified by Kyungjae Lee
5 * (Original: Abhishek Sagar - Juniper Networks)
6 * Date Created : 01/07/2023
7 */
8
9
10
11
12
13/* packet processing function simplified for demonstration purpose */
14void pkt_recv_fn(char *pkt, uint32_t pkt_size, char *sender_ip, uint32_t port_no)
15{
16 printf("%s() : pkt recvd = %s, pkt size = %u\n", __FUNCTION__, pkt, pkt_size);
17}
18
19/* pointers to listener threads */
20pthread_t *listener1 = NULL;
21pthread_t *listener2 = NULL;
22
23int main(int argc, char **argv)
24{
25 /* create the 1st listener thread */
26 printf("Listening on UDP port no 3000\n");
27 listener1 = udp_server_create_and_start("127.0.0.1", 3000, pkt_recv_fn);
28
29 /* create the 2nd listener thread */
30 printf("Listening on UDP port no 3001\n");
31 listener2 = udp_server_create_and_start("127.0.0.1", 3001, pkt_recv_fn);
32
33 pthread_exit(0); /* this API allows other threads to continue their execution even though the main
34 thread terminates */
35
36 return 0;
37}
udp_sender.c
xxxxxxxxxx
171/*
2 * File Name : udp_sender.c
3 * Description :
4 * Author : Modified by Kyungjae Lee
5 * (Original: Abhishek Sagar - Juniper Networks)
6 * Date Created : 01/07/2023
7 */
8
9
10
11int main(int argc, char **argv) {
12
13 printf("Dest = [%s,%d] \n", argv[1], atoi(argv[2]));
14 send_udp_msg(argv[1], atoi(argv[2]), argv[3], strlen( argv[3] ));
15
16 return 0;
17}
Shell script to compile this project:
xxxxxxxxxx
91# compile.sh
2
3rm *.o
4rm *exe
5gcc -g -c network_utils.c -o network_utils.o
6gcc -g -c listener_threads_main.c -o listener_threads_main.o
7gcc -g -c udp_sender.c -o udp_sender.o
8gcc -g listener_threads_main.o network_utils.o -o listener_threads_main.exe -lpthread
9gcc -g udp_sender.o network_utils.o -o udp_sender.exe -lpthread
Terminating .exe
does not necessarily mean that the file is Windows executable. .exe
was put on purpose to make it easier to delete the existing executables when re-running this shell script.
Execution result:
Shell 1
xxxxxxxxxx
51$ ./listener_threads_main.exe
2Listening on UDP port no 3000
3Listening on UDP port no 3001
4pkt_recv_fn() : pkt recvd = hello_msg, pkt size = 9
5pkt_recv_fn() : pkt recvd = bye_msg, pkt size = 7
Shell 2
xxxxxxxxxx
41$ ./udp_sender.exe 127.0.0.1 3000 hello_msg
2Dest = [127.0.0.1,3000]
3$ ./udp_sender.exe 127.0.0.1 3000 bye_msg
4Dest = [127.0.0.1,3000]
Notice that the thread in the program above is not designed for cancellation. Use the thread cancellation technique you learned in the previous section to make those threads in the program cancellable.
xxxxxxxxxx
11/* write your code here */
Sagar, A. (2022). Part A - Multithreading & Thread Synchronization - Pthreads [Video file]. Retrieved from https://www.udemy.com/course/multithreading_parta/