Good evening

There are many articles online on how to create a simple TCP or HTTP server in Golang. After all, backend is one of its specialties. But I’ve had a hard time finding any recipes for processing binary protocols. Specifically, protocols where every message has a header of a known fixed size and a data body of arbitrary size, with the body length recorded in the header. Today, we’re going to make a simple server program exactly for that.

Let’s invent the test protocol

It can be anything, but for this example it will be the following. The header is always 7 bytes long. The first byte, which we’ll call the prefix, is always the same: say, &. The next two bytes denote the length of the data payload (without the header!) as a little-endian integer. The final four bytes are the Unix time when the packet was created, again as a little-endian integer. And then comes the data. If a packet does not start with &, the server should close the connection with the client.
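
To make this layout concrete, here is a minimal sketch of a function that assembles such a packet. encodePacket is just an illustrative helper I made up for this post; it is not part of the server below.

package main

import (
	"encoding/binary"
	"fmt"
	"time"
)

// encodePacket builds one packet of our test protocol:
// a '&' prefix, two bytes of little-endian payload length,
// four bytes of little-endian Unix time, then the payload itself.
func encodePacket(payload []byte) []byte {
	packet := make([]byte, 7+len(payload))
	packet[0] = '&'
	binary.LittleEndian.PutUint16(packet[1:3], uint16(len(payload)))
	binary.LittleEndian.PutUint32(packet[3:7], uint32(time.Now().Unix()))
	copy(packet[7:], payload)
	return packet
}

func main() {
	fmt.Printf("% x\n", encodePacket([]byte("hello")))
}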

TCP sockets guarantee data integrity and preservation of order. Yet there is no guarantee that the block of data our server reads from the socket (in one conn.Read() call) will be a full packet right away; nor can we be sure that only one packet comes at a time, as several packets may arrive “glued” together, so the server has to cut them apart and process them separately. That’s what creates problems for the conventional approaches shown in most tutorials. We have to implement some kind of buffer and an algorithm that waits for more data if what we have is too short, and makes several processing runs if the data received so far accommodates several packets.

The program

First things first, let’s write the main.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"net"
	"sync/atomic"

	"github.com/pkg/profile"
)

//'&' prefix,
//two bytes for little-endian length of body (excluding header),
//four bytes for time of packet generation (as little-endian Unix timestamp)
const headerSize = 7
const packetPrefix byte = '&'

//simply for debug purposes
var counter uint64

//dummy function simulating packet processing
func processPacket(packet []byte) {
	//optional sleeping to simulate processing delay
	//time.Sleep(time.Millisecond * 3)

	//just a debug counter to track the total number of processed packets
	count := atomic.AddUint64(&counter, 1)
	if count%1000000 == 0 {
		fmt.Println(count)
	}
}

func main() {
	//A simple way of profiling memory allocated during runtime by various functions
	defer profile.Start(profile.MemProfile).Stop()

	listener, err := net.ListenTCP("tcp", &net.TCPAddr{Port: 9999})
	if err != nil {
		fmt.Println("Could not listen on port 9999:", err.Error())
		return
	}

	//Main connection-managing loop
	for {
		conn, err := listener.AcceptTCP()

		if err != nil {
			fmt.Println("Could not connect", err.Error())
			//not calling `conn.Close()` because `conn` is `nil` in case of error.
			//See `AcceptTCP` source.
			continue
		}

		go processorRoutine(conn)
	}
}

The TCP server will listen on port 9999. I have added a dummy routine that will receive and “process” a full packet (one with a complete header and a data body of the length written in the header). I also added memory profiling, which I will cover in a later section.

So, this server creates a separate processorRoutine for every new client connected to it. What does it look like?

The routine for a client

This routine will manage the data structures that store the incoming bytes. At first I thought about using the Reader from the bufio package, but I had to create way too much boilerplate to make it work properly, and I’m aiming for simplicity. So let’s use simpler data structures.

const deviceReadBufferSize = 1024

func processorRoutine(conn *net.TCPConn) {
	defer conn.Close()

	// Convenient storage
	localBuffer := new(bytes.Buffer)

	// Temporary storage for reading from socket.
	readBuf := make([]byte, deviceReadBufferSize)

	for {
		// Get the data. There can be more than one packet glued together.
		dataLen, err := conn.Read(readBuf)

		if err != nil {
			if err == io.EOF {
				fmt.Println("Connection closed by client!")
			} else {
				fmt.Println("Read error:", err.Error())
			}
			break
		}

		//dumping to buffer for further processing
		localBuffer.Write(readBuf[:dataLen])

		terminateConnection := processExistingData(localBuffer)
		if terminateConnection {
			break
		}
	}
}

So I use a byte slice to read from the socket, and then I dump its contents into a Buffer from the bytes package, which has some useful methods that simplify the packet integrity analysis.

As a side note, if the amount of data that came into the socket is larger than readBuf can store, more of it will simply be read on the next iteration.

Finally, I pass the buffer (by reference) to the processing function we are yet to define. If it returns true, we should close the connection because a packet is invalid.

Ensuring packet integrity

Finally, the function that checks the length and passes the packet on to the aforementioned (dummy) processing function.

func processExistingData(data *bytes.Buffer) (terminateConnection bool) {
	terminateConnection = false

	for {
		// nothing in the buffer yet - wait for more data
		if data.Len() == 0 {
			return
		}

		// if the prefix is wrong - disconnect.
		if data.Bytes()[0] != packetPrefix {
			fmt.Println("Wrong packet prefix! Disconnecting!")
			terminateConnection = true
			return
		}

		// if the header is fully loaded, get the payload length (without the header)
		var payloadSize int
		if data.Len() >= headerSize {
			payloadSize = int(binary.LittleEndian.Uint16(data.Bytes()[1:3]))
		} else {
			// incomplete header
			return
		}

		packetSize := headerSize + payloadSize

		if data.Len() < packetSize {
			//incomplete packet
			return
		}

		packet := data.Next(packetSize)
		processPacket(packet)

		if data.Len() == 0 {
			return
		}

	}
}

The only scenario in which we disconnect from the server side is when a packet does not start with &. If the data is too short, we return and give control back to processorRoutine so it can get more data. If there are several packets in the buffer, we keep iterating within processExistingData to process them all. The Next() method conveniently returns the next packetSize bytes and advances the buffer, so we can process the rest of the data without having to shorten the slices ourselves or anything like that.
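
As an aside, here is a tiny standalone demonstration of how Next behaves; this is a toy example of my own, not part of the server. Note that the slice Next returns shares the buffer’s memory and is only valid until the buffer is next modified, which is fine here because processPacket runs synchronously before any new Write.

package main

import (
	"bytes"
	"fmt"
)

func main() {
	buf := bytes.NewBufferString("&ab&cd")

	first := buf.Next(3) // consumes "&ab" and advances the buffer
	fmt.Printf("%s, %d left\n", first, buf.Len()) // &ab, 3 left

	second := buf.Next(3) // consumes "&cd"
	fmt.Printf("%s, %d left\n", second, buf.Len()) // &cd, 0 left
}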

During my first experiments, I had a suspicion that the buffer grows indefinitely even though Next advances it; it might have something to do with slices and the lack of reallocation of underlying arrays, I thought. I had data.Reset() in that if data.Len() == 0 block; at some point I even recreated the buffer entirely. I’m not sure whether this was the case in older versions of Go, but as of v1.12 it is not: this code works fine without eating up all the RAM.

Running

Before running, I had to increase the limit of file descriptors the server can open by issuing ulimit -Sn 1000000. By default the limit is 1024, and if I run too many clients, I get a Could not connect accept tcp [::]:9999: accept4: too many open files error.

The client

I wrote the client app in Python. It’s just a quick-and-dirty prototype, and I didn’t feel like doing it in Go: just threads that send the same packet over and over.

from threading import Thread
import socket
from time import sleep, time

HOST = '127.0.0.1'
PORT = 9999

N_CLIENTS = 10000

def client():
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((HOST, PORT))
        
        while True:
            payload = b'12345'*5
            payloadLen = len(payload)
            
            s.sendall(b'&'+payloadLen.to_bytes(2,'little')+int(time()).to_bytes(4,'little')+payload)
            # sleep(1)

threads = []
for i in range(N_CLIENTS):
    t = Thread(target=client)
    t.start()
    threads.append(t)
    
for t in threads:
    t.join()
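
If you’d rather stay in one language, here is a rough Go equivalent of this client: a sketch of my own, not tested at the scale of the Python script, with only minimal error handling.

package main

import (
	"bytes"
	"encoding/binary"
	"log"
	"net"
	"sync"
	"time"
)

const headerSize = 7
const nClients = 100 // the Python script uses 10000; start smaller

func client() {
	conn, err := net.Dial("tcp", "127.0.0.1:9999")
	if err != nil {
		log.Println("dial failed:", err)
		return
	}
	defer conn.Close()

	// the same 25-byte payload the Python client sends
	payload := bytes.Repeat([]byte("12345"), 5)
	packet := make([]byte, headerSize+len(payload))
	packet[0] = '&'
	binary.LittleEndian.PutUint16(packet[1:3], uint16(len(payload)))
	copy(packet[headerSize:], payload)

	for {
		// refresh the timestamp on every send
		binary.LittleEndian.PutUint32(packet[3:7], uint32(time.Now().Unix()))
		if _, err := conn.Write(packet); err != nil {
			log.Println("write failed:", err)
			return
		}
	}
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < nClients; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			client()
		}()
	}
	wg.Wait()
}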

Profiling memory consumption

There is a tool called pprof that makes visualization of RAM consumption profiles easy. If you want to know more about the library I use here, check out this article.

When I start the program, I see something like this in the console:

2019/05/29 19:22:52 profile: memory profiling enabled (rate 4096), /tmp/profile059091682/mem.pprof

Then I run go tool pprof --pdf buffer2 $PROFFILE > /tmp/memprofile.pdf from the folder containing my server executable (buffer2) to create a PDF file with the memory profile. $PROFFILE is the file path at the end of the output above.

I won’t include the whole picture I see in that PDF, only the relevant part.

(Figure: the relevant part of the memory profile chart, showing the most RAM-consuming structures and the function calls that spawn them.)

We can see that a lot of RAM is allocated during the Write() call in processorRoutine. Essentially, when the buffer runs out of storage, it tries to grow gracefully by increasing the size of the underlying slice. If it fails to do so, it calls makeSlice, which recreates the slice from scratch; whatever is left in the old slice is copied over, and the old slice eventually gets garbage-collected.
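
If those allocations bother you, one option (an idea of mine that I haven’t benchmarked for this article) is to pre-allocate the buffer’s capacity for the largest possible packet, so that typical traffic never triggers a grow. In processorRoutine, that would mean replacing the localBuffer line:

// A packet can never exceed the header plus the largest uint16 length.
const maxPacketSize = headerSize + 65535

localBuffer := bytes.NewBuffer(make([]byte, 0, maxPacketSize))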

In conclusion

So here’s your recipe for a binary protocol server. It’s a very simple implementation, it does not communicate back to clients, it does not check for lingering connections, it’s here to simply get you started.

Thanks for tuning in!