ダイエットを始めてそろそろ 5 ヶ月。
帰りは隣の駅で降りて家まで歩くようにしてるけど、筋トレをサボっていたせいか、体重は減ったのに体脂肪が変わらないという結果に。
今年はあと少しだけど、どこまで減らせるか。
そういうわけで、全天球写真を撮れる THETA。
MobileHackerz さんのところで WiFi 越しに THETA のシャッターを押せる perl スクリプトが公開されているので、処理の流れを把握するためにざっくりと python に書き直してみた次第。
http://mobilehackerz.jp/contents/Review/RICOH_THETA/WiFi_Control
エラー処理できてないし、Mac でしか試してないけど、せっかくなので公開しておきます。
iPhone でアプリを作ろうかと思ったけど、わざわざ WiFi でつないでからアプリを立ち上げるというのが若干煩わしいので、いっそのこと Raspberry Pi や BeagleBone Black でつなぎっぱなしにして使おうかと。
いや、やっぱり iPhone アプリかなあ。
#!/usr/bin/env python
# coding: UTF-8
"""Trigger the shutter of a RICOH THETA over WiFi using PTP-IP packets.

Runs on both Python 2.7 and Python 3: all wire data is handled as bytes and
every ``print`` call takes a single, pre-formatted string.
"""

import socket
import struct


class THETA360(object):
    """Small PTP-IP client holding one command and one event TCP connection.

    Every packet on the wire is ``<int32 total length><int32 packet type>``
    followed by a type-specific payload, all little-endian.
    """

    # Packet types exchanged by this client.
    PKT_INIT_COMMAND_REQUEST = 1
    PKT_INIT_COMMAND_ACK = 2
    PKT_INIT_EVENT_REQUEST = 3
    PKT_INIT_EVENT_ACK = 4
    PKT_CMD_REQUEST = 6
    PKT_CMD_RESPONSE = 7
    PKT_EVENT = 8
    PKT_START_DATA = 9
    PKT_DATA = 10
    PKT_END_DATA = 12

    # PTP operation codes.
    OP_OPEN_SESSION = 0x1002
    OP_INITIATE_CAPTURE = 0x100E
    OP_SET_DEVICE_PROP_VALUE = 0x1016

    # PTP event codes.
    EV_OBJECT_ADDED = 0x4002
    EV_CAPTURE_COMPLETE = 0x400D

    # Packets announcing a total length above this are rejected.
    MAX_PACKET_LEN = 1024
    # Outgoing data-phase payloads are split into chunks of this many bytes.
    DATA_CHUNK_LEN = 200
    # Seconds to wait for one event packet, and how many waits per capture.
    EVENT_TIMEOUT = 0.5
    EVENT_POLL_COUNT = 20

    def __init__(self):
        """Store the camera address and client identity; performs no I/O."""
        self.host = '192.168.1.1'
        self.port = 15740
        self.name = 'THETA'
        self.GUID = '8a7ab04f-ebda-4f33-8649-8bf8c1cdc838'
        self.command_sock = None
        self.event_sock = None
        # Defined up front so the PTP_* methods never hit an AttributeError
        # when OpenConnection() failed or was not called.
        self.session_id = 0
        self.transaction_id = 0

    # ------------------------------------------------------------------
    # Connection management
    # ------------------------------------------------------------------
    def OpenConnection(self):
        """Open the command and event connections and run both handshakes.

        Returns:
            True when both handshakes succeeded, False otherwise.  On
            failure any socket opened so far is closed again.
        """
        # Init_Command
        self.command_sock = self._connect()
        if self.command_sock is None:
            return False
        self.Send_InitCommandRequest(self.command_sock)
        result, self.session_id = self.Wait_InitCommandAck(self.command_sock)
        if result == 0:
            print('InitCommandRequest failed')
            self.CloseConnection()
            return False
        print('(session_id = %d)' % self.session_id)

        # Init_Event
        self.event_sock = self._connect()
        if self.event_sock is None:
            self.CloseConnection()
            return False
        self.Send_InitEventRequest(self.event_sock, self.session_id)
        if self.Wait_InitEventAck(self.event_sock) == 0:
            print('InitEventRequest failed')
            self.CloseConnection()
            return False

        self.transaction_id = 0
        return True

    def _connect(self):
        """Return a TCP socket connected to the camera, or None on failure."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self.host, self.port))
        except socket.error:
            print('Connection Failed')
            sock.close()
            return None
        return sock

    def CloseConnection(self):
        """Close whichever sockets are open; safe to call repeatedly."""
        if self.command_sock is not None:
            self.command_sock.close()
            self.command_sock = None
        if self.event_sock is not None:
            self.event_sock.close()
            self.event_sock = None

    # ------------------------------------------------------------------
    # PTP operations
    # ------------------------------------------------------------------
    def PTP_OpenSession(self):
        """Send OpenSession with the session id; return 1 on success, else 0."""
        print('PTP_OC_OpenSession')
        self.Send_PTPCommandRequest(self.command_sock, self.transaction_id,
                                    b'', self.OP_OPEN_SESSION,
                                    self.session_id)
        self.transaction_id += 1
        result, _args, _payload = self.Wait_PTPCommandResponse(
            self.command_sock)
        if result == 0:
            print('Failed')
            return 0
        return 1

    def PTP_SetDevicePropValue(self, prop_id, val):
        """Send SetDevicePropValue for *prop_id*.

        Args:
            prop_id: device property code, sent as the operation parameter.
            val: already-packed property value (bytes), sent as the data
                phase, e.g. ``struct.pack('<h', 1000)``.

        Returns:
            1 when a command response was received, 0 otherwise.
        """
        print('PTP_OC_SetDevicePropValue')
        self.Send_PTPCommandRequest(self.command_sock, self.transaction_id,
                                    val, self.OP_SET_DEVICE_PROP_VALUE,
                                    prop_id)
        self.transaction_id += 1
        result, _args, _payload = self.Wait_PTPCommandResponse(
            self.command_sock)
        if result == 0:
            print('Failed')
            return 0
        return 1

    def PTP_InitiateCapture(self):
        """Send InitiateCapture and wait for the capture-complete event.

        Polls the event connection up to EVENT_POLL_COUNT times.

        Returns:
            The first parameter of the last 0x4002 event seen before the
            0x400D event (or before polling ran out), as an int; 0 when no
            such event arrived or the command itself failed.
        """
        print('Send PTP_OC_InitiateCapture')
        self.Send_PTPCommandRequest(self.command_sock, self.transaction_id,
                                    b'', self.OP_INITIATE_CAPTURE, 0, 0)
        self.transaction_id += 1
        result, _args, _payload = self.Wait_PTPCommandResponse(
            self.command_sock)
        if result == 0:
            print('Failed')
            return 0

        print('Wait PTP_EC_CaptureComplete')
        handle = 0
        for _ in range(self.EVENT_POLL_COUNT):
            ptp_event, args = self.Wait_PTPEvent(self.event_sock)
            if ptp_event == self.EV_CAPTURE_COMPLETE:
                break
            # Guard against an event that carries no parameters.
            if ptp_event == self.EV_OBJECT_ADDED and args:
                handle = args[0]
        return handle

    # ------------------------------------------------------------------
    # Handshake packets
    # ------------------------------------------------------------------
    def Send_InitCommandRequest(self, sock):
        """Send packet type 1: GUID, NUL-terminated name and an int32 of 1."""
        print('Send InitCommandRequest')
        payload = b''.join([
            self.packGUID(),
            self.packString(self.name),
            # NOTE(review): presumably a protocol version field - confirm.
            struct.pack('<i', 1),
        ])
        self.sendCommand(sock, self.PKT_INIT_COMMAND_REQUEST, payload)

    def Wait_InitCommandAck(self, sock):
        """Read packet type 2 and print the peer's GUID and name.

        Returns:
            ``(1, session_id)`` on success, ``(0, 0)`` when the packet is
            missing, of another type, or too short to hold id and GUID.
        """
        print('Wait InitCommandAck')
        cmd_id, payload = self.recvResponse(sock)
        if cmd_id != self.PKT_INIT_COMMAND_ACK or len(payload) < 20:
            print('failed')
            return 0, 0

        session_id = struct.unpack_from('<i', payload, 0)[0]
        target_GUID = self.unpackGUID(payload[4:20])
        # The last 4 bytes of the payload are not part of the name.
        target_name = self.unpackString(payload[20:-4])
        print('Target GUID : %s' % target_GUID)
        print('Target Name : %s' % target_name)
        return 1, session_id

    def Send_InitEventRequest(self, sock, session_id):
        """Send packet type 3 carrying *session_id* as an int32."""
        print('Send InitEventRequest')
        self.sendCommand(sock, self.PKT_INIT_EVENT_REQUEST,
                         struct.pack('<i', session_id))

    def Wait_InitEventAck(self, sock):
        """Read one packet; return 1 if it is packet type 4, else 0."""
        print('Wait InitEventAck')
        cmd_id, _payload = self.recvResponse(sock)
        if cmd_id != self.PKT_INIT_EVENT_ACK:
            print('failed')
            return 0
        return 1

    # ------------------------------------------------------------------
    # Command / response / event packets
    # ------------------------------------------------------------------
    def Send_PTPCommandRequest(self, sock, transaction_id, ptp_payload,
                               ptp_cmd, *args, **kwargs):
        """Send a Cmd_Request and, when given, its data phase.

        Args:
            sock: connected command socket.
            transaction_id: int32 transaction id.
            ptp_payload: bytes for the data phase; empty means no data phase.
            ptp_cmd: 16-bit PTP operation code.
            *args: int32 operation parameters.
            **kwargs: accepted for call compatibility and ignored.
        """
        # Cmd_Request
        # NOTE(review): the leading int32 is always 1, even when a data
        # phase follows - confirm against the protocol specification.
        request = [struct.pack('<iHi', 1, ptp_cmd, transaction_id)]
        request.extend(struct.pack('<i', arg) for arg in args)
        self.sendCommand(sock, self.PKT_CMD_REQUEST, b''.join(request))

        # Truthiness covers both '' and b''.
        if not ptp_payload:
            return

        # Start_Data_Packet: transaction id plus the total length, the
        # latter written as two int32 words (low word, then 0).
        total = len(ptp_payload)
        self.sendCommand(sock, self.PKT_START_DATA,
                         struct.pack('<iii', transaction_id, total, 0))

        # Every chunk but the last goes out as Data_Packet; the last one
        # goes out as End_Data_Packet.
        prefix = struct.pack('<i', transaction_id)
        for start in range(0, total, self.DATA_CHUNK_LEN):
            end = start + self.DATA_CHUNK_LEN
            packet_type = self.PKT_DATA if end < total else self.PKT_END_DATA
            self.sendCommand(sock, packet_type,
                             prefix + ptp_payload[start:end])

    def Wait_PTPCommandResponse(self, sock):
        """Read an optional data phase followed by a Cmd_Response.

        Returns:
            ``(1, params, data)`` where *params* is a list of ints and
            *data* the collected data-phase bytes (empty if there was none),
            or ``(0, None, None)`` on any failure.  The response code inside
            the Cmd_Response is not inspected.
        """
        cmd_id, payload = self.recvResponse(sock)
        ptp_payload = b''

        if cmd_id == self.PKT_START_DATA:
            if len(payload) < 8:
                return 0, None, None
            transaction_id, expected_len = struct.unpack_from(
                '<ii', payload, 0)

            chunks = []
            received = 0
            while True:
                # Data_Packet or End_Data_Packet
                cmd_id, payload = self.recvResponse(sock)
                if cmd_id not in (self.PKT_DATA, self.PKT_END_DATA):
                    return 0, None, None
                if len(payload) < 4:
                    return 0, None, None
                if struct.unpack_from('<i', payload, 0)[0] != transaction_id:
                    return 0, None, None
                chunks.append(payload[4:])
                received += len(payload) - 4
                if received >= expected_len or cmd_id == self.PKT_END_DATA:
                    break
            ptp_payload = b''.join(chunks)

            # Cmd_Response follows the data phase.
            cmd_id, payload = self.recvResponse(sock)

        if cmd_id != self.PKT_CMD_RESPONSE:
            return 0, None, None
        return 1, self._unpack_args(payload), ptp_payload

    def Wait_PTPEvent(self, sock):
        """Wait up to EVENT_TIMEOUT seconds for one Event packet.

        Note that the timeout is left set on *sock* afterwards.

        Returns:
            ``(event_code, params)`` with *params* a list of ints, or
            ``(0, None)`` when nothing usable arrived.
        """
        sock.settimeout(self.EVENT_TIMEOUT)
        cmd_id, payload = self.recvResponse(sock)
        if cmd_id != self.PKT_EVENT or len(payload) < 2:
            return 0, None
        ptp_event = struct.unpack_from('<H', payload, 0)[0]
        return ptp_event, self._unpack_args(payload)

    @staticmethod
    def _unpack_args(payload, offset=6):
        """Return every complete int32 found in *payload* from *offset* on.

        The default offset skips the 2-byte code and the 4-byte transaction
        id; trailing bytes that do not fill an int32 are ignored.
        """
        return [struct.unpack_from('<i', payload, idx)[0]
                for idx in range(offset, len(payload) - 3, 4)]

    # ------------------------------------------------------------------
    # Raw packet I/O
    # ------------------------------------------------------------------
    def sendCommand(self, sock, cmd_id, payload):
        """Frame *payload* with length and packet type and send all of it."""
        header = struct.pack('<ii', len(payload) + 8, cmd_id)
        # sendall() keeps writing until the whole packet is out; send() may
        # stop after a partial write.
        sock.sendall(header + payload)

    def recvResponse(self, sock):
        """Read one framed packet from *sock*.

        Returns:
            ``(packet_type, payload_bytes)``, or ``(0, None)`` on timeout,
            socket error, closed connection, or a length outside
            ``8..MAX_PACKET_LEN``.  After a failure in the middle of a
            packet the stream position is undefined.
        """
        # packet length
        raw_len = self._recv_exact(sock, 4)
        if raw_len is None:
            return 0, None
        packet_len = struct.unpack('<i', raw_len)[0]
        if packet_len < 8 or packet_len > self.MAX_PACKET_LEN:
            return 0, None

        # command
        raw_cmd = self._recv_exact(sock, 4)
        if raw_cmd is None:
            return 0, None
        cmd_id = struct.unpack('<i', raw_cmd)[0]

        # payload
        payload = self._recv_exact(sock, packet_len - 8)
        if payload is None:
            return 0, None
        return cmd_id, payload

    @staticmethod
    def _recv_exact(sock, size):
        """Read exactly *size* bytes; None on error, timeout or EOF.

        A single recv() may return fewer bytes than requested, so keep
        reading until the full amount has arrived.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            try:
                chunk = sock.recv(remaining)
            except socket.error:
                # socket.timeout is a subclass of socket.error.
                return None
            if not chunk:
                # Peer closed the connection.
                return None
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def printPacket(self, packet):
        """Print *packet* as space-separated upper-case hex bytes (debug)."""
        print(' '.join('%02X' % byte for byte in bytearray(packet)))

    # ------------------------------------------------------------------
    # Field encoding helpers
    # ------------------------------------------------------------------
    def packGUID(self):
        """Return self.GUID as raw bytes, two hex digits per byte, in the
        order written, with the dashes dropped."""
        values = []
        for part in self.GUID.split('-'):
            for idx in range(0, len(part), 2):
                values.append(int(part[idx:idx + 2], 16))
        return bytes(bytearray(values))

    def unpackGUID(self, packet):
        """Format raw GUID bytes as lower-case hex with dashes inserted
        before bytes 4, 6, 8 and 10."""
        pieces = []
        for idx, byte in enumerate(bytearray(packet)):
            if idx in (4, 6, 8, 10):
                pieces.append('-')
            pieces.append('%02x' % byte)
        return ''.join(pieces)

    def packString(self, str):
        """Encode text as UTF-16LE followed by a two-byte NUL terminator.

        The parameter keeps its original name for call compatibility even
        though it shadows the builtin.
        """
        return str.encode('utf-16-le') + b'\x00\x00'

    def unpackString(self, packet):
        """Decode UTF-16LE bytes up to, but not including, the first NUL.

        A trailing odd byte is ignored and undecodable sequences are
        replaced rather than raising.
        """
        usable = len(packet) - (len(packet) % 2)
        text = packet[:usable].decode('utf-16-le', 'replace')
        return text.split(u'\x00', 1)[0]


if __name__ == '__main__':
    theta = THETA360()
    if theta.OpenConnection():
        try:
            theta.PTP_OpenSession()

            # Optional EV shift before the capture.  Values listed by the
            # original author: 2000, 1700, 1300, 1000, 700, 300, 0, -300,
            # -700, -1000, -1300, -1700, -2000.  Example:
            #   theta.PTP_SetDevicePropValue(0x5010, struct.pack('<h', 1000))

            theta.PTP_InitiateCapture()

            # To restore the default EV shift afterwards:
            #   theta.PTP_SetDevicePropValue(0x5010, struct.pack('<h', 0))
        finally:
            theta.CloseConnection()