-
S-PTAM 리뷰 - 코드(2) : Visual Odometry 2SLAM/S-PTAM 이론 설명 및 코드 리뷰 2021. 7. 5. 17:26
지난 글에서는 S-PTAM의 visual odometry에서 initialization 부분에 대해 알아보았습니다. 요약하자면 최초의 stereo frame으로부터 최초의 MapPoint와 Measurement들이 생성되고, 이 frame은 최초의 keyframe으로 선정되었습니다. 이제 두번째 프레임부터는 가지고 있는 MapPoint들로부터 3D-2D point correspondency를 찾아 6D pose 계산에 활용하고, 특정 조건(이 글 참조)을 만족하면 keyframe으로 지정하는 단계를 반복합니다.
Initialization단계는 sptam.py의 SPTAM 클래스의 initialize 함수를 통해 수행되었습니다. sptam.py의 main 함수에서 여기까지의 코드는 다음과 같습니다.
if __name__ == '__main__': import cv2 import g2o import argparse from threading import Thread from components import Camera from components import StereoFrame from feature import ImageFeature from params import ParamsKITTI, ParamsEuroc from dataset import KITTIOdometry, EuRoCDataset parser = argparse.ArgumentParser() parser.add_argument('--no-viz', action='store_true', help='do not visualize') parser.add_argument('--dataset', type=str, help='dataset (KITTI/EuRoC)', default='KITTI') parser.add_argument('--path', type=str, help='dataset path', default='path/to/your/KITTI_odometry/sequences/00') args = parser.parse_args() if args.dataset.lower() == 'kitti': params = ParamsKITTI() dataset = KITTIOdometry(args.path) elif args.dataset.lower() == 'euroc': params = ParamsEuroc() dataset = EuRoCDataset(args.path) sptam = SPTAM(params) visualize = not args.no_viz if visualize: from viewer import MapViewer viewer = MapViewer(sptam, params) cam = Camera( dataset.cam.fx, dataset.cam.fy, dataset.cam.cx, dataset.cam.cy, dataset.cam.width, dataset.cam.height, params.frustum_near, params.frustum_far, dataset.cam.baseline) durations = [] for i in range(len(dataset)): featurel = ImageFeature(dataset.left[i], params) featurer = ImageFeature(dataset.right[i], params) timestamp = dataset.timestamps[i] time_start = time.time() t = Thread(target=featurer.extract_feature_points) t.start() featurel.extract_feature_points() t.join() frame = StereoFrame(i, g2o.Isometry3d(), featurel, featurer, cam, timestamp=timestamp) if not sptam.is_initialized(): sptam.initialize(frame)
맨 밑의 sptam.initialize(frame)이 initialization과정입니다. 이 과정에서 self.status 딕셔너리의 initialized 키의 값이 True로 지정되므로, 다음 프레임부터는 sptam.is_initialized가 True가 되어 if를 뛰어넘고 else로 이동하게 됩니다.
else: sptam.track(frame)
SPTAM 클래스의 track 함수로 넘어가 보겠습니다. 입력되는 frame은 두 번째 stereo frame 이겠죠?
# 첫번째 이후의 모든 frame에서 수행되는 slam 구동 과정 # motion estimation, bundle adjustment, loop closure def track(self, frame): # loop closure가 수행되는 동안은 tracking을 잠깐 멈춤 while self.is_paused(): time.sleep(1e-4) # tracking이 시작됐음을 표시해둠 self.set_tracking(True) self.current = frame print('Tracking:', frame.idx, ' <- ', self.reference.id, self.reference.idx) # motion model을 사용해서 대강의 pose를 예측하고 predicted_pose, _ = self.motion_model.predict_pose(frame.timestamp) # frame에 pose 정보를 넣어놓음 frame.update_pose(predicted_pose)
self.is_paused는 visual odometry 과정의 일시정지를 제어하는 함수로, loop closure가 계산 중일 때는 True가 반환됩니다. 즉 loop closure가 끝날 때까지 대기하게 됩니다.
Loop closure 수행 중이 아니거나 끝났다면 self.set_tracking 함수로 tracking이 수행 중임을 표시해 둡니다.
그 다음에는 self.current 변수에 현재 frame을 저장해 두고, motion model을 사용해 현재 pose를 예측합니다(비선형 최적화를 위한 initial estimate). Pose 예측의 필요성과 방법은 이 글에 설명했었습니다. 이 예측된 pose를 frame.update_pose 함수를 이용해서 현재 frame에 넣어 둡니다.
# loop closing if self.loop_closing is not None: # loop closure 수행을 위한 보정 데이터(R, T)가 존재한다면(loop closure가 감지되어 계산되었다면) if self.loop_correction is not None: estimated_pose = g2o.Isometry3d( frame.orientation, frame.position) # 보정 데이터를 이용해 pose update estimated_pose = estimated_pose * self.loop_correction # update된 pose를 frame에 삽입 frame.update_pose(estimated_pose) # motion model도 update self.motion_model.apply_correction(self.loop_correction) # 보정 데이터용 변수 초기화 self.loop_correction = None
이 부분은 loop closing이 발생했을 때만 실행되는 부분으로, loop closure 알고리즘을 통해 계산된 오차 보정분을 받아 새로운 정확한 pose인 estimated_pose를 생성합니다. 그리고 이 정확한 pose를 현재 frame과 motion model에 넣어 줍니다.
# 현재 frame 주변의 mappoint의 list를 반환 local_mappoints = self.get_local_mappoints(frame)
여기부터 매우 복잡해지기 시작합니다. 일단 local_mappoints를 받아 오는데, local mappoints란 현재 위치 주변에 있는 MapPoint들의 모음을 말합니다. 즉, 3D-2D point correspondency를 계산하기 위한 3D point들입니다. get_local_mappoints 함수가 이것을 수행합니다. 이 함수를 살펴보겠습니다.
# local_mappoints 생성 def get_local_mappoints(self, frame): # 일단 현재의 graph를 참조해서 현재 frame 주변의 최대한 많은mappoint들을 가져옴 # self.preceding은 마지막으로 등록된 keyframe # self.reference는 현재 graph의 get_reference_frame에서 온 것으로, 입력된 tracked_map 안의 MapPoint들이 가장 많이 관측되는 키프레임이라고 할 수 있음 # get_local_map은 현재 frame 근처의 map(=keypoint들과 keyframe)을 갖고옴 # [0]은 키프레임 제외하고 mappoint들만 갖고오겠다는 것임 local_mappoints = self.graph.get_local_map( [self.preceding, self.reference])[0]
이 안에 또 get_local_map이라는 함수가 있습니다. 이 함수는 covisibility.py의 CovisibilityGraph 클래스 내부에 있습니다. 매우 복잡한 함수로, line-by-line으로 설명하기에는 무리가 있습니다. 이 함수의 역할을 최대한 간단하게 요약하자면, 입력받은 keyframe들(self.preceding, self.reference)과 그 covisible keyframe(이 글의 add_mappoints_and_handle_covisibility 함수 설명 참조)들을 모두 긁어모은 후 이 중에서 가장 covisible MapPoint들을 많이 가지고 있는 keyframe들을 골라서 이 keyframe들의 MapPoint들을 반환하는 것입니다. 더 축약하자면, preceding keyframe과 reference keyframe 주변의 MapPoint들을 최대한 많이 가져오는 역할입니다. 이 MapPoint들을 사용해 3D-2D point correspondency를 찾는 것입니다. 이제 get_local_mappoints 함수의 다음 line으로 넘어가 보겠습니다.
# can_view 함수를 이용 # local map에서 현재 frame에서 물리적으로 관측가능한 mappoint는 true, 아닌것들은 false인 리스트를 반환 # ex) [True, True, False, True, False...] can_view = frame.can_view(local_mappoints) print('filter points:', len(local_mappoints), can_view.sum(), len(self.preceding.mappoints()), len(self.reference.mappoints())) checked = set() filtered = [] # np.where(can_view)[0]: # can_view 에서 true인 원소의 index를 list로 반환 for i in np.where(can_view)[0]: pt = local_mappoints[i] ''' def is_bad(self): with self._lock: status = ( self.count['meas'] == 0 or (self.count['outlier'] > 20 and self.count['outlier'] > self.count['inlier']) or (self.count['proj'] > 20 and self.count['proj'] > self.count['meas'] * 10)) return status ''' if pt.is_bad(): continue pt.increase_projection_count() filtered.append(pt) checked.add(pt)
can_view 함수는 frustum culling을 수행하는 함수인데, 간단히 설명하자면 현재 카메라의 위치에서 물리적으로 관측 가능하지 않은 점들을 걸러내는 역할을 합니다. 이로 인해 혹시 모를 outlier들이 한 차례 더 걸러집니다. 그 뒤부터는 filtered 리스트와 checked 리스트에 can_view test를 통과한 MapPoint들을 담아 두는 부분입니다. 이 과정에서 is_bad() 함수를 사용해 outlier를 한 차례 더 걸러냅니다(components.py의 MapPoint 클래스에 구현되어 있는 함수로, 어떤 기준으로 outlier를 걸러내는지는 components.py에서 확인해 보시기 바랍니다). 그 후 increase_projection_count() 함수로 이 MapPoint가 관측된 횟수를 1만큼 증가시킵니다.
# 이 for 문이 돌면 아무래도 위의 can_view test가 의미없게되는듯?? # can_view가 아닌 point들까지 filtered에 들어가기때문 for reference in set([self.preceding, self.reference]): for pt in reference.mappoints(): # neglect can_view test if pt in checked or pt.is_bad(): continue pt.increase_projection_count() filtered.append(pt) # 최종적으로 motion estimation에 사용할 만한 mappoint들을 반환 return filtered
그 밑의 부분은 추가적인 MapPoint들을 filtered 리스트에 추가하는 과정인데, 잘 보시면 이 과정에서 checked 리스트에 들어 있는 점들, 즉 can_view test를 통과한 MapPoint들 이외에도 filtered 리스트에 추가를 하고 있습니다. 원 저자가 can_view test의 효용성이 높지 않다고 판단한 것 같기도 합니다. 이 부분은 제가 추후 수정을 가할 생각입니다. 이제 다시 tracking 함수의 다음 라인으로 넘어가 보겠습니다.
measurements = frame.create_measurements_from_mappoints( local_mappoints, Measurement.Source.TRACKING)
이 부분은 S-PTAM 전체 코드에서 가장 복잡하다고 해도 과언이 아닌 부분으로, 저 create_measurements_from_mappoints 함수가 정말 수많은 작업들을 수행합니다. 이 함수가 다른 함수들 몇 개를 실행하고, 그 각각의 함수들은 또 다른 함수를 실행하는 등 line-by-line으로 설명하기는 불가능에 가깝습니다(저는 해독할 때는 실제로 그렇게 했지만...). 이 함수의 역할을 최대한 간단하게 요약하자면, 3D-2D point correspondency를 계산 및 반환하는 역할이라고 할 수 있습니다. 즉 local_mappoints에 들어 있는 3D point들과 현재 frame 객체에 들어 있는 2D feature point들을 매칭시켜 Measurement 객체의 리스트(measurements)로 반환합니다. 정확한 절차가 궁금하시다면 디버거를 사용하여 line-by-line으로 추적해 보실 수도 있겠지만, 쉬운 일은 아닐 것 같습니다. 다시 다음 라인으로 가 보겠습니다.
# measurements는 local_mappoints를 2d에 project시켜 matching한것이므로 더 적을수밖에 없음 print('measurements:', len(measurements), ' ', len(local_mappoints)) tracked_map = set() # measurements에서 mappoint들만 빼와서 set에 집어넣음 # m에는 현재 프레임에서 키프레임의 키포인트들과 매칭된 포인트들의 정보가 들어있음 (Measurement 객체) for m in measurements: # m.mappoint는 현재 프레임의 키포인트와 매칭된 맵포인트(= 키프레임의 (3d)키포인트) mappoint = m.mappoint # 그 포인트의 descriptor를 현재 프레임의 descriptor로 업데이트(다음 프레임에 더 매칭 잘되게 하기 위함인듯) mappoint.update_descriptor(m.get_descriptor()) # 관측된 횟수를 기록 mappoint.increase_measurement_count() # 다음 단계에서 쓰일 tracked_map에 mappoint들만 추가 tracked_map.add(mappoint) # tracked_map의 생성 목적은 reference frame을 찾기 위함임
tracked_map을 생성하는 부분입니다. tracked_map은 reference frame을 찾기 위해 생성되는 객체로, 조금 뒤 자세히 설명하겠습니다. 그 과정에서 for문 안을 보시면 update_descriptor라는 함수가 있는데, 이 함수는 이전의 MapPoint의 descriptor를 현재 frame에서 계산된 descriptor로 update하는 부분입니다. 즉 하나의 MapPoint의 descriptor를 최신으로 유지하는 작업으로, 다음 프레임에서 좀 더 많은 매칭 성공이 이루어지도록 합니다. tracked_map에는 3D-2D 매칭에 성공한 MapPoint들이 추가됩니다.
# pose estimation try: # ***** tracked_map 안의 mappoint들이 가장 많이 등장한 keyframe 오브젝트를 반환 ***** # 최초에는 위의 initialize에서 self.mappping.add_mappoints_and_handle_covisibility 함수에서 mappoint에 keyframe 정보가 들어가있음 self.reference = self.graph.get_reference_frame(tracked_map) # 새로운 pose # optimizer를 통해 정확한 현재 pose 계산 pose = self.tracker.calculate_new_pose(frame.pose, frame.cam, measurements) # frame과 motion model update frame.update_pose(pose) self.motion_model.update_pose( frame.timestamp, pose.position(), pose.orientation()) tracking_is_ok = True except: tracking_is_ok = False print('tracking failed!!!')
get_reference_frame 함수로 reference frame을 선정합니다. 이 글에서 간단히 설명했듯 reference frame(reference keyframe) 이란 현재 3D-2D 매칭에 사용되고 있는 MapPoint들(여기서는 tracked_map)을 가장 많이 생성/관측한 keyframe으로, 개념적으로 설명하자면 현재 visual odometry의 기준이 되는 keyframe이라고 할 수 있겠습니다.
그 다음 calculate_new_pose 함수로 현재 frame의 pose를 계산합니다. 저 함수는 현재 pose(=initial estimate)와 measurements(=3D-2D point correspondency)를 사용하여 정확한 pose를 계산하는 과정입니다(이 글들을 참조하면 되시겠습니다). 정확한 pose가 계산된 후에는 frame의 pose와 motion model이 사용하는 pose를 업데이트합니다.
이 과정이 실패하면 tracking 실패를 선언합니다.
# 키프레임이 될 만하다면 새로운 키프레임으로 등록하고 reference를 update if tracking_is_ok and self.should_be_keyframe(frame, measurements): print('new keyframe', frame.idx) keyframe = frame.to_keyframe() keyframe.update_reference(self.reference) keyframe.update_preceding(self.preceding) # 1. mapping.py의 MappingThread에 있는 add_new_keyframe에 keyframe과 그 measurements 넣어줌 (여기서 keyframe은 지금 새로 만들어진 것) # 2. 그 안에서 첫번째로는 graph에 keyframe이 추가됨 # 3. 그 다음으로 mapping.py의 create_and_add_mappoints_and_handle_covisibility에 keyframe이 들어감 # 4. 여기서 triangulate 돼서 mappoints와 measurements가 만들어짐 # 5. 이게 또 add_mappoints_and_handle_covisibility 함수로 들어감 # 6. mappoint들은 graph내부에 추가되고 measurement들은 keyframe, mappoint와 함께 graph.add_measurement_and_do_covisibility_tasks 함수에 입력됨 # 7. 이 함수는 covisibility 관련 작업 수행 # covisibility.py의 add_measurement_and_do_covisibility_tasks 참조 self.mapping.add_new_keyframe(keyframe, measurements) # (1) mapping.add_new_keyframe -> graph.add_new_keyframe # (2) mapping.add_new_keyframe -> mapping.create_and_add_mappoints_and_handle_covisibility -> mapping.add_mappoints_and_handle_covisibility -> graph.add_mappoint, graph.add_measurement_and_do_covisibility_tasks # (3) mapping.add_new_keyframe -> for m in measurements, graph.add_measurement_and_do_covisibility_tasks # 참고로 initialization에서는 (1), (2)만 수행됨 # loop closure detector에도 추가 if self.loop_closing is not None: self.loop_closing.add_keyframe(keyframe) # preceding keyframe을 현재 keyframe으로 설정 self.preceding = keyframe
이제 tracking 함수의 마지막 부분으로, keyframe 지정 조건을 만족하면 현재 stereo frame을 keyframe으로 지정한 후 mapping thread와 loop closing thread에 이 keyframe을 저장합니다(두 thread는 각각 bundle adjustment와 loop closure를 담당하는 thread로, 추후 설명 예정입니다). 그리고 새 keyframe이 선정되었으므로 preceding keyframe을 업데이트합니다.
# 한 timestep의 tracking 종료 선언 self.set_tracking(False)
이제 한 frame에 대한 tracking 절차(=visual odometry)가 마무리되었습니다. 이 전체 과정을 새로 들어오는 모든 frame에 대해 반복하는 것이 visual odometry입니다.
Line-by-line으로 파고들어 설명하지 않은 부분이 많이 있고, 디테일한 설명을 곁들이지 않은 부분도 많습니다. 어차피 제 설명을 듣는다고 visual odometry pipeline이 한 번에 이해될 리는 없기 때문에, 대략 어떤 작업이 어떤 순서대로 진행되는지에 대한 흐름을 파악할 수 있는 guideline으로 보아 주시면 좋을 것 같습니다.
'SLAM > S-PTAM 이론 설명 및 코드 리뷰' 카테고리의 다른 글
S-PTAM 리뷰 - 이론(5) : Local Bundle Adjustment 2 (0) 2021.07.07 S-PTAM 리뷰 - 이론(4) : Local Bundle Adjustment 1 (0) 2021.07.06 S-PTAM 설치 및 구동 가이드 (0) 2021.07.02 S-PTAM 리뷰 - 코드(1) : Visual Odometry 1 (0) 2021.07.02 S-PTAM 리뷰 - 코드(0) : 주요 파일, 클래스 및 함수 (0) 2021.07.02