공유 블로그

def is_number(n):
    if n != 'NULL':
        try:
            float(n)
        except ValueError:
            return False

    return True

def mySQLINSERTQuery(tableName:str,data:dict):


    sql= 'INSERT into `'+tableName+'` (`{}`)values({})'

    key = '`, `'.join(list(data.keys()))
    value=','.join(str(e) if is_number(str(e)) else '"'+str(e)+'"' for e in data.values())

    return sql.format(key,value)

tableName='test'
test={
    # 문자열 숫자는 숫자로 변형됨
    'test1':'132',
    # 순서 상관없음
    'test6':'t1e2s3t',
    'test2':2,
    'test5':'null',
    # 같은이름 제외됨(마지막만 등록됨)
    'test3':'test',
    # 대소문자 NULL 구분함
    'test3':'NULL',
    'test4':'null',
    'test4':'null',
    'test4':'null',
    # 특수문자 입력 가능
    'test4':'49!@23(*#테스트#*)',
}

print( mySQLINSERTQuery(tableName,test))

 

 

2023/03/12

from copy import deepcopy

import sys
import json
import pymysql



# DB관련 클래스
class SQL():
    def __init__(self, host, user, password, db, port=3306, charset='utf8'):
        self.conn = pymysql.connect(host=host, user=user, password=password, db=db,
                                    port=port, charset=charset, cursorclass=pymysql.cursors.DictCursor)  # 접속정보

        self.cur = self.conn.cursor()  # 커서생성

    # Mysql 데이터 타입 체크
    def _queryDataTypeCheck(self, data) -> str:
        # 숫자인지 체크(실수 Type으로 체크)
        def is_number(n: str):
            if n != 'NULL':
                try:
                    float(n)
                    # 문자열 첫글자가 0일경우 False
                    if n[0] == '0':
                        return False
                except ValueError:
                    return False

            return True

        value = ''

        e = deepcopy(data)
        # 리스트 타입일 경우
        if type(data) is list:
            e = deepcopy(str(data).replace(', ', ','))  # 데이터 최적화(띄어쓰기 제거)
        # 딕셔너리 타입일 경우
        elif type(data) is dict:
            e = deepcopy(
                json.dumps(
                    data,
                    ensure_ascii=False,  # 한글 인코딩
                    separators=(',', ':')  # 데이터 최적화(띄어쓰기 제거)
                )
            )

        if is_number(str(e)):
            value = e
        else:
            value = str(e)

        return value

    # 함수에 따라 리스트 생성
    def _functionTypeMakeList(self, function, data: list) -> list:
        try:
            data = list(data)
        except:
            return False

        return [function(value) for value in data]

    #### SQL(mysql & mariaDB)자동 완성 ####
    # 등록
    def mySQLINSERTQuery(self, tableName: str, data: dict, showConsole=False):
        dataLen = len(data)
        if dataLen == 0:
            return (False, 'data가 비어있습니다.')

        sqlQuery = 'INSERT into `'+tableName+'` (`{}`)values({})'
        key = '`, `'.join(list(data.keys()))

        try:
            value = self._functionTypeMakeList(
                self._queryDataTypeCheck, data.values()
            )
            if value == False:
                return (False, 'data가 dict가 아닙니다.')
        except:
            return (False, 'data가 dict가 아닙니다.')

        sql = sqlQuery.format(key,  ', '.join(['%s'] * dataLen))
        # 리턴값 제공
        returnDATA = {
            'CHECK': False
        }
        try:
            query = self.cur.mogrify(sql, value)  # 커서로sql 실행
            self.cur.execute(query)  # 커서로sql 실행
            if showConsole:
                returnDATA['TEXT_DEBUG'] = f'O (성공) : {query}'

            self.conn.commit()  # 최종 저장
            returnDATA['CHECK'] = True

        except Exception as e:
            e = str(e)
            text = 'X (실패'

            if showConsole:
                if "for key 'PRIMARY'" in e:
                    text += f' 이미존재) : {sql}'
                    returnDATA['TEXT_WARNING'] = text
                    return returnDATA

                elif "Incorrect date value" in e:
                    try:
                        errorCode = e.split('.')[-1].split(' ')[0]
                        text += f'{errorCode}값이 없습니다.) : ERROR -> {e}\n\t\t\t{query}'
                    except:
                        text += f' 값이 없습니다.) : ERROR -> {e}\n\t\t\t{query}'
                else:
                    text += f' ERROR) :  -> {e}\n\t\t\t{query}'
                returnDATA['TEXT_ERROR'] = text

        return returnDATA

    # 검색
    def mySQLSELECTQuery(self, tableName: str, findData: str = None):
        if findData:
            sqlQuery = 'SELECT * from `'+tableName + \
                '` where {}'.format(findData)
        else:
            sqlQuery = 'SELECT * FROM `'+tableName+'`'

        # print(sqlQuery)
        # cur = self.conn.cursor()

        try:
            self.cur.execute(sqlQuery)  # 커서로sql 실행
            # print('확인 : '+sqlQuery)
            self.conn.commit()  # 최종 저장

            data = self.cur.fetchall()

            # cur.close()

            return data
        except Exception as e:
            # print(e)
            # print('실패 : '+sqlQuery)

            return False

    # 수정
    def mySQLUPDATEQuery(self, tableName: str, upData: dict, findData: dict):
        sql = 'UPDATE `'+tableName+'` set {} where {}'

        def dataMakeEqual(key):
            return f'`{key}` = %s'

        changes = ''
        if type(upData) is dict:
            changes = self._functionTypeMakeList(dataMakeEqual, upData.keys())
            changesData = self._functionTypeMakeList(
                self._queryDataTypeCheck, upData.values()
            )

        where = ''
        if type(findData) is dict:
            where = self._functionTypeMakeList(dataMakeEqual, findData.keys())
            whereData = self._functionTypeMakeList(
                self._queryDataTypeCheck, findData.values()
            )

        # upData와 findData가 모두 dict일 경우
        if type(changes) is list and type(where) is list:
            sql = sql.format(', '.join(changes), ' and '.join(where))
            sql = self.cur.mogrify(sql, changesData+whereData)

        # upData만 dict일 경우
        elif type(changes) is list and type(findData) is str:
            sql = sql.format(', '.join(changes), findData)
            sql = self.cur.mogrify(sql, changesData)

        # findData만 dict일 경우
        elif type(where) is list and type(upData) is str:
            sql = sql.format(upData, ' and '.join(where))
            sql = self.cur.mogrify(sql, whereData)

        # 둘다 str일 경우
        else:
            sql = sql.format(upData, findData)

        try:
            self.cur.execute(sql)  # 커서로sql 실행
            # print('확인 : '+sql)
            self.conn.commit()  # 최종 저장

            return True
        except Exception as e:
            # print(e)
            # print('실패 : '+sql)

            return False
########################


# DB설정
def DBConnect():
    try:
        returnSQL = SQL(
            host="MySQL IP주소",
            user="사용자",
            password="암호",
            database="데이터베이스"
            port=3306
        )
        print('인터넷 연결중...')

        # 접속정보
        return returnSQL
    except Exception as e:
        # print(e)
        print('인터넷 연결이 끊겼습니다.')
        sys.exit(0)


# 사용법
if __name__ == '__main__':
    DB = DBConnect()

    computer = {
        'CPU': 'AMD Ryzen 9 5950X 16-Core Processor',
        'MainBoard': {'제조사': 'ASUSTeK COMPUTER INC.', '모델': 'ROG STRIX B550-XE GAMING WIFI'},
        'RAM': {'용량': [32.0, 32.0, 32.0, 32.0], '총용량': 128.0},
        'GPU': ['NVIDIA GeForce RTX 3080', 'NVIDIA GeForce RTX 2080 Ti']
    }

    for key, value in DB.mySQLINSERTQuery('렌탈PC', computer, True).items():
        print(key, ":", value)

 

2024/01/15


# DB관련 클래스
class SQL():
    def __init__(self, host, user, password, db, port=3306, charset='utf8'):
        self.conn = pymysql.connect(host=host, user=user, password=password, db=db,
                                    port=port, charset=charset, cursorclass=pymysql.cursors.DictCursor)  # 접속정보

        self.cur = self.conn.cursor()  # 커서생성

    # Mysql 데이터 타입 체크
    def _queryDataTypeCheck(self, data) -> str:
        value = ''

        e = deepcopy(data)
        # 리스트 타입일 경우
        if type(data) is list:
            e = deepcopy(str(data).replace(', ', ','))  # 데이터 최적화(띄어쓰기 제거)
        # 딕셔너리 타입일 경우
        elif type(data) is dict:
            e = deepcopy(
                json.dumps(
                    data,
                    ensure_ascii=False,  # 한글 인코딩
                    separators=(',', ':')  # 데이터 최적화(띄어쓰기 제거)
                )
            )

        if IF.is_number(str(e)):
            value = e
        else:
            value = str(e)

        return value

    # 함수에 따라 리스트 생성
    def _functionTypeMakeList(self, function, data: list) -> list:
        try:
            data = list(data)
        except:
            return False

        return [function(value) for value in data]

    #### SQL(mysql & mariaDB)자동 완성 ####
    # SQL(mysql & mariaDB) Insert 쿼리 자동 완성
    def mySQLINSERTQuery(self, tableName: str, data: dict, showConsole=False):
        dataLen = len(data)
        if dataLen == 0:
            return (False, 'data가 비어있습니다.')

        # 기본 쿼리
        sqlQuery = 'INSERT into `'+tableName+'` (`{}`)values({})'

        # 컬럼명 가져오기
        key = '`, `'.join(list(data.keys()))
        # 값 가져오기
        # value = ','.join(str(e) if self._is_number(str(e)) else '"' + str(e)+'"' for e in data.values())
        try:
            value = self._functionTypeMakeList(
                self._queryDataTypeCheck, data.values()
            )
            if value == False:
                return (False, 'data가 dict가 아닙니다.')
        except:
            return (False, 'data가 dict가 아닙니다.')

        # 쿼리 완성
        sql = sqlQuery.format(key,  ', '.join(['%s'] * dataLen))

        # 해당 쿼리 시도
        returnDATA = self.returnDATA(sql, value, showConsole)

        # DB연결 끊킬시
        if not returnDATA['CHECK'] and "2006" in returnDATA['CODE']:
            self.mySQLINSERTQuery(tableName, data, showConsole)

        # 성공시 최종 반환
        return returnDATA

    # SQL(mysql & mariaDB) Select 쿼리 자동 완성
    def mySQLSELECTQuery(self, tableName: str, findData: str = None, showConsole=False):
        # 기본 쿼리
        if findData:
            sqlQuery = 'SELECT * from `'+tableName + \
                '` where {}'.format(findData)
        else:
            sqlQuery = 'SELECT * FROM `'+tableName+'`'

        # 해당 쿼리 시도
        returnDATA = self.returnDATA(sqlQuery, showConsole=showConsole)

        # 쿼리 성공시
        if returnDATA['CHECK']:
            returnDATA['DATA'] = self.cur.fetchall()

        # DB연결 끊킬시
        elif not returnDATA['CHECK'] and "2006" in returnDATA['CODE']:
            self.mySQLSELECTQuery(tableName, findData, showConsole)

        # 성공시 최종 반환
        return returnDATA

    # SQL(mysql & mariaDB) Update 쿼리 자동 완성
    def mySQLUPDATEQuery(self, tableName: str, upData: dict, findData: dict, showConsole=False):
        sql = 'UPDATE `'+tableName+'` set {} where {}'

        def dataMakeEqual(key):
            return f'`{key}` = %s'

        changes = ''
        if isinstance(upData, dict):
            changes = self._functionTypeMakeList(dataMakeEqual, upData.keys())
            changesData = self._functionTypeMakeList(
                self._queryDataTypeCheck, upData.values()
            )

        where = ''
        if isinstance(findData, dict):
            where = self._functionTypeMakeList(dataMakeEqual, findData.keys())
            whereData = self._functionTypeMakeList(
                self._queryDataTypeCheck, findData.values()
            )

        # upData와 findData가 모두 dict일 경우
        if isinstance(changes, list) and isinstance(where, list):
            sql = sql.format(', '.join(changes), ' and '.join(where))
            # 해당 쿼리 시도
            returnDATA = self.returnDATA(sql, changesData+whereData, showConsole)

        # upData만 dict일 경우
        elif isinstance(changes, list) and isinstance(findData, str):
            sql = sql.format(', '.join(changes), findData)
            sql = self.cur.mogrify(sql, changesData)
            # 해당 쿼리 시도
            returnDATA = self.returnDATA(sql, changesData, showConsole)

        # findData만 dict일 경우
        elif isinstance(where, list) and isinstance(upData, str):
            sql = sql.format(upData, ' and '.join(where))
            # 해당 쿼리 시도
            returnDATA = self.returnDATA(sql, whereData, showConsole)

        # 둘다 str일 경우
        else:
            sql = sql.format(upData, findData)
            # 해당 쿼리 시도
            returnDATA = self.returnDATA(sql, showConsole=showConsole)

        # DB연결 끊킬시
        if not returnDATA['CHECK'] and "2006" in returnDATA['CODE']:
            self.mySQLUPDATEQuery(tableName, upData, findData, showConsole)

        # 성공시 최종 반환
        return returnDATA
    ######################################

    # SQL쿼리 시도후 결과값 반환
    def returnDATA(self, sqlQuery, value=None, showConsole=False):
        returnDATA = {
            'CHECK': False,
            'CODE': '',
        }

        try:
            if value:
                sqlQuery = self.cur.mogrify(sqlQuery, value)  # sql query 생성
            self.cur.execute(sqlQuery)  # 커서로sql 실행
            if showConsole:
                returnDATA['TEXT_DEBUG'] = f'O (성공) : {sqlQuery}'

            self.conn.commit()  # 최종 저장
            returnDATA['CHECK'] = True

        except Exception as e:
            e = str(e)
            text = 'X (실패'

            if showConsole:
                if "for key 'PRIMARY'" in e:
                    text += f' 이미존재) : {sqlQuery}'
                    returnDATA['TEXT_WARNING'] = text
                    return returnDATA

                elif "Incorrect date value" in e:
                    try:
                        errorCode = e.split('.')[-1].split(' ')[0]
                        text += f'{errorCode}값이 없습니다.) : ERROR -> {e}\n\t\t\t{sqlQuery}'
                    except:
                        text += f' 값이 없습니다.) : ERROR -> {e}\n\t\t\t{sqlQuery}'
                else:
                    text += f' ERROR) :  -> {e}\n\t\t\t{sqlQuery}'
                returnDATA['TEXT_ERROR'] = text

            if "2006" in e:
                self.conn.ping(reconnect=True)  # try reconnect
                returnDATA['CODE'] = f"2006 : 재열결 시도함... -> {e}"

        return returnDATA

 

 

확인

이젠 위코드를 응용해서 select, delete, update도 한번 만들어 보시길 바랍니다^^

 

2023/03/12

1. 'json', 'list' type도 가능하게 코드 수정함

[참고 : list type으로 할시 Mysql 타입을 varchar로 할것(아래 이미지 참조)]

Mysql 타입 예)
확인

 

버그또는 수정 사항 있으면 댓글로 부탁드립니다~

공유하기

facebook twitter kakaoTalk naver band kakaostory Copy URL