주식 자동 거래 시스템 구축 (1) get_code_list

공대생 디자이너

·

2021. 1. 21. 13:05

 

지금까지 공부하고 작성하였던 코드를 테스트 케이스의 순서와 함께 복습하며 이해하는 시간을 가지도록하겠습니다. 생각보다 복잡하진 않았지만 기억을 되짚어보고 정보를 찾아가며 공부해보았습니다.

class TestEBest(unittest.TestCase):
	def setUp(self):
		self.ebest = EBest("DEMO")
		self.ebest.login()

	def test_get_code_list(self):

		print("테스트케이스 1. ", inspect.stack()[0][3])
		# 코스피 리스트
		all_result = self.ebest.get_code_list("KOSPI")
		assert all_result is not None, '정보를 가져오지 못했습니다.'	# 가정설명문
		print("KOSPI 종목 개수 : ", len(all_result))
		# 코스닥 리스트
		all_result = self.ebest.get_code_list("KOSDAQ")
		assert all_result is not None, '정보를 가져오지 못했습니다.'
		print("KOSDAQ 종목 개수 : ", len(all_result))

	def test_get_stock_price_list_by_code(self):
		print("테스트 케이스 2. ", inspect.stack()[0][3])
		result = self.ebest.get_stock_price_by_code("005930", "2")
		assert result is not None, '정보를 가져오지 못했습니다.'
		print(result)


	def tearDown(self):
		self.ebest.logout()

 

테스트 클래스는 다음과 같습니다.
setUp > test_get_code_list > tearDown > setUp > test_get_stock_price_list_by_code > tearDown 순으로 진행됩니다. 본문에서는 1 cycle만 설명합니다.

 

setUp Function

setUp 함수는 테스트에 필요한 로그인을 구현하고 있습니다.
모의투자서버를 사용하기 때문에 "DEMO" 인자를 전달합니다.

def __init__(self, mode=None):
		# config.ini 파일을 로드하여 사용자, 서버 정보를 저장
		# query_cnt는 10분당 200개의 Transaction수행을 관리하기 위한 리스트
		# xa_session_client는 XASession 객체
		# :param mode:str - 모의서버는 DEMO 실서버는 PROD로 구분

		if mode not in ["PROD", "DEMO"]:
			raise Exception("Need to run_mode(PROD or DEMO)")

		run_mode = "EBEST_" + mode
        # 모드결정
		config = configparser.ConfigParser()
        # 파서 불러오기
		config.read('T:\DEV\Stock_Lab\conf\config.ini')
        # 해당 파일의 내용을 읽어옴
		self.user = config[run_mode]['user']
        # run_mode의 내용을 중심으로 user의 정보를 불러옴
		self.passwd = config[run_mode]['password']
		self.cert_passwd = config[run_mode]['cert_passwd']
		self.host = config[run_mode]['host']
		self.port = config[run_mode]['port']
		self.account = config[run_mode]['account']

		self.xa_session_client = win32com.client.DispatchWithEvents
        						("XA_Session.XASession",XASession)
		self.query_cnt = []

 

def login(self):
		self.xa_session_client.ConnectServer(self.host, self.port) # 서버연결 시도
		self.xa_session_client.Login(self.user, self.passwd, self.cert_passwd, 0, 0) # 인증
		while XASession.login_state == 0:
			pythoncom.PumpWaitingMessages()	# 오류메시지를 기다림

Ebest 클래스의 선언부는 mode를 전달받아 실투서버와 모의서버를 구별합니다. 모의서버만을 사용하면 굳이 필요하지 않지만 직접 투자를 돌릴 수 있게되었을 때 사용하기 위해 추가해놓았습니다.

run_mode가 결정되면 config.ini에서 해당 정보를 불러와서 win32com 모듈을 사용하여 xa_session_client 객체를 만듭니다.  여기서 win32com 모듈은 Active X 함수를 사용할 수 있게 도와주는 역할을 하며, 다른 언어로 작성된 COM(Componet Object Model) 객체를 생성하기 위해 Dispatch를 사용합니다.

 

login Function

선언부에 사용되었던 객체를 사용하여 서버에 연결을 시도하고 응답을 통해 계정에 엑세스합니다.

 

test_get_code_list Function

이 부분부터는 TR(Transaction)의 핵심 부분이기 때문에 설명이 다소 깁니다.

테스트 케이스의 부분을 확인하기위해 inspect.stack함수를 사용하였습니다. pythondoc에 따르면,

inspect.stack(context=1)
Return a list of frame records for the caller’s stack. The first entry in the returned
list represents the caller; the last entry represents the outermost call on the stack.

Changed in version 3.5: A list of named tuples
FrameInfo(frame, filename, lineno, function, code_context, index) is returned.

다음과 같이 정의되어있고 [0][3]을 통해 현재 위치하고 있는 함수의 이름을 불러옵니다.

def get_code_list(self, market=None):

		"""
		TR : t8436 코스피, 코스닥의 종목 리스트 가져오기
		:param market:str 전체(0) 코스피(1) 코스닥(2)	
		:return result:list 시장별 종목리스트
		"""

		if market != "ALL" and market != "KOSPI" and market != "KOSDAQ":
			raise Exception("Need to market param(ALL, KOSPI, KOSDAQ)")

		market_code = {"ALL":"0", "KOSPI":"1", "KOSDAQ":"2"}
		print(market_code[market])
		in_params = {"gubun":market_code[market]}
		out_params = ['hname', 'shcode', 'excode', 'etfgubun', 'memedan', 'gubun', 'spac_gubun']
		result = self._excute_query("t8436", "t8436InBlock", "t8436OutBlock", *out_params, **in_params)
		return result

종목 리스트의 숫자를 확인하기 위한 함수입니다. TR번호는 t8436이며 이를 기준으로 API를 찾아서 사용합니다.

 

TR에서 요구하는 arg는 다음과 같습니다. InBlock으로 정보를 선택하고 OutBlock으로 정보를 전달하게됩니다.
작성한 코드를 보면 투자서버를 선택하기 위했던 Mode처럼 in_params에 list arg를 넣고 out_params에 정보를 받기위한 dict key를 추가하여 _excute_query함수를 호출하여 result로 반환합니다.

 

Transaction Part

(주석으로 간단한 설명을 적어놓았으므로 참고하시면됩니다).

 

def _excute_query(self, res, in_block_name, out_block_name, *out_fields, **set_fields):
		"""
		Transaction 코드를 실행하기위한 메서드
		:param res:str 리소스이름(TR)
		:param in_block_name:str 인블럭 이름
		:param out_block_name:str 아웃블럭 이름
		:param out_params:list 출력 필드 리스트
		:param in_params:dict 인블럭에 설정할 필드 딕셔너리
		:return result:list 결과를 list에 담아 리턴
		"""
		time.sleep(1)
		print("Current Query Count : ", len(self.query_cnt))
		print(res, in_block_name, out_block_name)
		while len(self.query_cnt) >= EBest.QUERY_LIMIT_10MIN:	# 전송횟수 초과시 1초 대기
			time.sleep(1)
			print("Wating for excute query...\nCurrent query count : ", len(self.query_cnt_))
			self.query_cnt = list(filter(lambda x: (datetime.today() - x).total_seconds() < EBest.LIMIT_SECONS, self.query_cnt))
		
		xa_query = win32com.client.DispatchWithEvents("XA_DataSet.XAQuery", XAQuery)
		print("테스트 : ", XAQuery.RES_PATH + res + ".res")
		xa_query.LoadFromResFile(XAQuery.RES_PATH + res + ".res")

		# in_block_name Setting
		for key, value in set_fields.items():
			xa_query.SetFieldData(in_block_name, key, 0, value)
		errorCode = xa_query.Request(0)

		# 요청 후 대기
		waiting_cnt = 0
		while xa_query.tr_run_state == 0:
			waiting_cnt +=1
			if waiting_cnt % 100000 == 0:
				print("Waiting...", self.xa_session_client.GetLastError())
			pythoncom.PumpWaitingMessages()

		# 결과 블럭
		result = []
		count = xa_query.GetBlockCount(out_block_name)
		for i in range(count):
			item = {}
			for field in out_fields:
				value = xa_query.GetFieldData(out_block_name, field, i)
				item[field] = value
			result.append(item)

		# 제약시간 체크
		XAQuery.tr_run_state = 0
		self.query_cnt.append(datetime.today())

		# 영문 필드명을 한글 필드명으로 보기 쉽게 변환
		for item in result:
			for field in list(item.keys()):
				if getattr(field, res, None):
					res_field = getattr(field, res, None)
					if out_block_name in res_field:
						field_hname = res_field[out_block_name]
						if field in field_hname:
							item[field_hname[field]] = item[field]
							item.pop(field)

		return result
QUERY_LIMIT_10MIN = 200	# 10분당 200개의 트랜젝션 제한
LIMIT_SECONDS = 600		# 10분

API를 사용하기 위한 규칙으로는 10분당 200개의 TR요청이 있습니다. 서버로부터 차단규칙이 정해져있으므로 조심하도록 해야합니다.  query_cnt의 개수를 파악하여 전송횟수를 판단합니다. query_cnt에는 TR을 요청할 때의 시간의 값이 들어가있습니다.

class XAQuery:
	RES_PATH = "C:\\eBEST\\xingAPI\\Res\\"
	tr_run_state = 0

	def OnReceiveData(self, code):
		print("OnReceiveData", code)
		XAQuery.tr_run_state = 1

	def OnReceiveMessage(self, error, code, message):
		print("OnReceiveMessage", error, code, message)

xa_query는 XAQuery클래스의 win32com 객체로 선언합니다. API를 사용하기 위한 res파일의 PATH를 선언하고 1 by 1 통신을 하기 위해 tr_run_state를 구성합니다.

 

# in_block_name Setting

res파일에 맞는 필드를 맞추기 위해 키워드 인자의 값을 토대로 xa_query의 필드데이터값을 지정합니다.

그 다음 Request 메서드를 이용하여 TR을 요청합니다.

 

# 요청 후 대기

요청 후 대기를 위해 waiting_cnt 변수를 사용하여 대기 중이라는 결과값을 받기 전까지 루프하게됩니다.

tr_run_state를 통해 응답을 받았다면 루프를 escape합니다.

 

# 결과 블럭

응답 메시지는 리스트 형식의 result로 반환하기위해 미리 선언해줍니다.

응답 메시지의 개수를 확인하기위해 GetBlockCount 메서드를 사용하고 count만큼 반복합니다.

_excute_query에서  out_field에 정의한 필드의 값만 GetFieldData 메서드를 이용하여 value에 가져오고 item dict에 추가한 후 result 변수에 담습니다.

 

# 제약시간 체크

TR요청과 응답을 모두 처리한 상태가 되면 TR프로세스상태를 나타내는 tr_run_state변수를 0으로 바꾸고 query_cnt에 현재시각을 추가합니다.

 

# 영문 필드명을 한글 필드명으로 보기 쉽게 변환

해당 부분은 연습과정에서 추가했으나 필요한 TR과 필드만 한글로 변환시켜주기위해서 구현하지 않았으므로 생략하겠습니다.

 

마무리

TR을 처리하는 _excute_query부분이 종료되었으므로 result값을 반환합니다. 이렇게되면 get_code_list에는 반환된 result가 들어가게되고 test_get_code_list로 다시 반환합니다.

def test_get_code_list(self):

		print("테스트케이스 1. ", inspect.stack()[0][3])
		# 코스피 리스트
		all_result = self.ebest.get_code_list("KOSPI")
		assert all_result is not None, '정보를 가져오지 못했습니다.'	# 가정설명문
		print("KOSPI 종목 개수 : ", len(all_result))
		# 코스닥 리스트
		all_result = self.ebest.get_code_list("KOSDAQ")
		assert all_result is not None, '정보를 가져오지 못했습니다.'
		print("KOSDAQ 종목 개수 : ", len(all_result))

다시한번 test_get_code_list를 보겠습니다. 반환받은 값은 all_result 변수에 담겨집니다.
assert문을 통해서 결과값을 받았는지 확인하게되는데 assert문을 처음봐서 찾아봤습니다.

어떤 함수는 성능을 높이기 위해 반드시 정수만을 입력받아 처리하도록 만들 수 있다.
이런 함수를 만들기 위해서는 반드시 함수에 정수만 들어오는지 확인할 필요가 있다.
이를 위해 if문을 사용할 수도 있고 '예외 처리'를 사용할 수도 있지만
'가정 설정문'을 사용하는 방법도 있다.

assert는 가정 설정문으로 함수의 에러를 찾는것에서 그치지않고 보증하기 위해서 사용한다고 합니다. 방어적인 프로그래밍에 해당하며 저처럼 처음 공부하는 사람들에게는 적절한 코드라고 생각됩니다.

 

test_get_code_list의 최종 목적인 print문을 통해 종목의 개수를 len(all_result)로 변환하여 출력하면 다음 그림과 같이 테스트가 완료됩니다.

 

핵심 TR부분을 구현하여 이해에 성공하였습니다...!

 

TR을 처리하는 _excute_query 함수를 만들게되면서 알고리즘을 짜고 구현하는 법에 대해서 다시 생각하게되고,오류를 찾게되는 과정에서 이해가 되는 것 같습니다. 주식 종목의 개수를 구하는 함수와 주식 종목 조회 함수를 구현하는게 짧은 시간이 걸리지는 않았습니다. 여러 개의 오류와 몸으로 직접 부딪혀가고 구글링을 통해 알아가게되면서 점차 알아게되는게 재밌는 것 같습니다. 이 글을 작성하면서도 한번 더 공부하게 되서 기억에 남을 것 같습니다. 내가 원하는 기능을 만들어서 직접 해보기까지 열심히 진행해보겠습니다.

 

 


작성한 내용은 개인적인 경험에 의해 작성되었습니다.

포스팅 내용의 잘못된 부분이나 질문은 댓글이메일로 언제든 남겨주세요!

포스팅 내용이 도움이 되고 공감이 되었다면 공감버튼 꾹 눌러주시면 감사하겠습니다!

 

 

 

'major > python' 카테고리의 다른 글

FastAPI 찍어먹기  (0) 2021.02.02
주식 자동 거래 시스템 구축을 시작하며  (0) 2021.01.07