The ROS bag file format is popular in robotics, where it is used to store ROS message data. Several tools for working with ROS bag files exist, such as rosbag (a tool for recording, playing back, and filtering data), rqt_bag (a tool for visualising rosbag data), rostopic (a tool for listing and echoing the content of ROS topics recorded in the bag), and webviz (a browser based tool for playing back and visualising ROS bag files locally or from a cloud service).
ROS bags are designed to work natively within the ROS ecosystem. When developing algorithms external to ROS, which can later be integrated as a ROS node, it can be useful to load ROS bag files directly into Python. The simplest way to achieve this is by converting ROS bag files into PNG format (for images) and CSV format (for all other data).
bag2csv
bag2csv is a command line utility for converting ROS bag files into CSV files. A slightly modified version of Nick Speal’s bag2csv file is shown below. This was tested and worked using a Python 3 with ROS 1 setup.
''' This script saves each topic in a bagfile as a csv. Accepts a filename as an optional argument. Operates on all bagfiles in current directory if no argument provided Written by Nick Speal in May 2013 at McGill University's Aerospace Mechatronics Laboratory www.speal.ca Supervised by Professor Inna Sharf, Professor Meyer Nahon ''' import rosbag, sys, csv import time import string import os #for file management make directory import shutil #for file management, copy file #verify correct input arguments: 1 or 2 if (len(sys.argv) > 2): print("invalid number of arguments: " + str(len(sys.argv))) print("should be 2: 'bag2csv.py' and 'bagName'") print("or just 1 : 'bag2csv.py'") sys.exit(1) elif (len(sys.argv) == 2): listOfBagFiles = [sys.argv[1]] numberOfFiles = "1" print("reading only 1 bagfile: " + str(listOfBagFiles[0])) elif (len(sys.argv) == 1): listOfBagFiles = [f for f in os.listdir(".") if f[-4:] == ".bag"] #get list of only bag files in current dir. numberOfFiles = str(len(listOfBagFiles)) print("reading all " + numberOfFiles + " bagfiles in current directory: \n") for f in listOfBagFiles: print(f) print("\n press ctrl+c in the next 10 seconds to cancel \n") time.sleep(10) else: print("bad argument(s): " + str(sys.argv)) #shouldnt really come up sys.exit(1) count = 0 for bagFile in listOfBagFiles: count += 1 print("reading file " + str(count) + " of " + numberOfFiles + ": " + bagFile) #access bag bag = rosbag.Bag(bagFile) bagContents = bag.read_messages() bagName = bag.filename #create a new directory folder = bagName.rstrip('.bag') try: #else already exists os.makedirs(folder) except: pass shutil.copyfile(bagName, folder + '/' + bagName) #get list of topics from the bag listOfTopics = [] for topic, msg, t in bagContents: if topic not in listOfTopics: listOfTopics.append(topic) for topicName in listOfTopics: #Create a new CSV file for each topic filename = folder + '/' + topicName.replace('/', '_slash_') + '.csv' with open(filename, 'w+') as csvfile: filewriter = csv.writer(csvfile, delimiter = ',') firstIteration = True #allows header row for subtopic, msg, t in bag.read_messages(topicName): # for each instant in time that has data for topicName #parse data from this instant, which is of the form of multiple lines of "Name: value\n" # - put it in the form of a list of 2-element lists msgString = str(msg) msgList = msgString.split('\n') instantaneousListOfData = [] for nameValuePair in msgList: splitPair = nameValuePair.split(':') for i in range(len(splitPair)): #should be 0 to 1 splitPair[i] = splitPair[i].strip() instantaneousListOfData.append(splitPair) #write the first row from the first element of each pair if firstIteration: # header headers = ["rosbagTimestamp"] #first column header for pair in instantaneousListOfData: headers.append(pair[0]) filewriter.writerow(headers) firstIteration = False # write the value from each pair to the file values = [str(t)] #first column will have rosbag timestamp for pair in instantaneousListOfData: if len(pair) > 1: values.append(pair[1]) filewriter.writerow(values) bag.close() print("Done reading all " + numberOfFiles + " bag files.")
To run this script, place it inside the same folder containing the ROS bag files you want to convert. If the folder contains more than one bag file, run python bag2csv.py otherwise run python bag2csv.py bagfilename.bag to process a single bag file.
Reading the resulting CSV files into Python is straightforward using pandas. The code snippet below shows how to read data from the lidar CSV file.
import pandas as pd import numpy as np ROOT_DIR = '/home/dataset/csv/' CSV_FILE_SCAN = ROOT_DIR + '_slash_scan.csv' if __name__ == '__main__': scan_frame = pd.read_csv(CSV_FILE_SCAN) lidar = np.ones((len(scan_frame), 513), dtype=np.float32) lidar_timestamps = [] for i in range(len(scan_frame)): scanstring = scan_frame.iloc[i, 14] scan = np.fromstring(scanstring[1:-1], dtype=np.float32, sep=',') timestampstring = scan_frame.iloc[i, 0] lidar_timestamps.append(timestampstring) lidar[i, :] = scan
The headings of the CSV file in this case are [rosbagTimestamp, header, seq, stamp, secs, nsecs, frame_id, angle_min, angle_max, angle_increment, time_increment, scan_time, range_min, range_max, ranges, intensities] thus the first column contains the ROS timestamp, and the fifteenth column contains the lidar range measurements.
bag2png
The image data contained in a ROS bag file can be extracted to Python using OpenCV. The code below extracts RGB and depth image data (saved in a ROS bag from running a RealSense camera) and saves the resulting PNG image files to two separate directories.
import subprocess import yaml import rosbag import cv2 from cv_bridge import CvBridge import numpy as np FILENAME = 'Indoor' ROOT_DIR = '/home/Dataset' BAGFILE = ROOT_DIR + '/' + FILENAME + '.bag' if __name__ == '__main__': bag = rosbag.Bag(BAGFILE) for i in range(2): if (i == 0): TOPIC = '/camera/depth/image_rect_raw' DESCRIPTION = 'depth_' else: TOPIC = '/camera/color/image_raw' DESCRIPTION = 'color_' image_topic = bag.read_messages(TOPIC) for k, b in enumerate(image_topic): bridge = CvBridge() cv_image = bridge.imgmsg_to_cv2(b.message, b.message.encoding) cv_image.astype(np.uint8) if (DESCRIPTION == 'depth_'): depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(cv_image, alpha=0.03), cv2.COLORMAP_JET) cv2.imwrite(ROOT_DIR + '/depth/' + DESCRIPTION + str(b.timestamp) + '.png', cv_image) else: cv2.imwrite(ROOT_DIR + '/color/' + DESCRIPTION + str(b.timestamp) + '.png', cv_image) print('saved: ' + DESCRIPTION + str(b.timestamp) + '.png') bag.close() print('PROCESS COMPLETE')
Reading an image file into Python as a PIL image is straightforward as shown in the code snippet below. This reads an image, in the given root directory, named ‘color_002482024802.png’
from skimage import io import os import pandas as pd import numpy as np from PIL import Image root_dir = '/home/Dataset/color' colorimstring = 'color_002482024802.png' colorimage = io.imread(os.path.join(root_dir, colorimstring)) colorimage_rgb = colorimage[:, :, [2, 1, 0]] colorim = Image.fromarray(colorimage_rgb) colorim.show()
Once the image data is in Python it can be processed using any of the popular computer vision libraries such as OpenCV or PIL, or used to fine-tune a pre-trained PyTorch Convolutional Neural Network.