本文主要介绍使用pandas.read_sql_query()一些示例demo代码。

1、参数查询示例demo代码

def get_classified_songs(self, telegram_id):
    conn = sqlite3.connect(self._DATABASE)
    sql = """
          SELECT
            danceability, energy, loudness, speechiness, acousticness,
            instrumentalness, liveness, valence, tempo, activity
          FROM songs s, users u, song_user su
          WHERE
            activity IS NOT NULL AND
            s.id = su.song_id AND
            su.user_id = u.id AND
            u.telegram_user_id = {}
    """.format(telegram_id)
    resp = pd.read_sql_query(sql, conn)
    conn.close()
    return resp 

源代码:https://github.com/mongonauta/heydjbot/tree/master/demo3/server/database.py

2、创建Dataframe示例demo代码

def build_df(table: str = 'articles',
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None) -> pd.DataFrame:
"""Build dataframe with derived fields."""
with closing(sqlite3.connect(DB_FILE_NAME)) as conn:
articles = pd.read_sql_query(f'select * from {table}', conn)
articles['date'] = pd.to_datetime(articles['publish_date'])
if start_date:
articles = articles.loc[articles['date'] >= start_date]
if end_date:
articles = articles.loc[articles['date'] <= end_date]
articles = articles.replace([None], [''], regex=True)
articles['base_url'] = articles.apply(get_url_base, axis=1)
articles['word_count'] = articles.apply(count_words, axis=1)
return articles

源代码:https://github.com/bmassman/fake_news/blob/master/fake_news/pipeline/build_df.py

3、表连接查询示例demo代码

def SensitivityQuery(self, table, data_set):
	#Returns the number of times an analyte is found at each concentration and the
	#number of repetitions in a particular data set.
	sql_statement = "SELECT COUNT(%s.id) AS Count, %s.Concentration_pg AS Conc_pg, \
						DataSetConcentrations.Repetitions AS Repetitions \
					FROM \
						Sample \
					INNER JOIN %s ON \
						%s.id = Sample.%s_foreignkey \
					INNER JOIN DataSetConcentrations ON \
						DataSetConcentrations.id = Sample.DataSetConcentrations_foreignkey \
					WHERE \
						Sample.DataSetName = '%s' \
					GROUP BY \
						Conc_pg \
					ORDER BY \
						Conc_pg;" % (table, table, table, table, table, data_set)
	return pd.read_sql_query(sql_statement, self.conn) 

4、分组查询示例demo代码

def GetRepsAtEachConcentration(self, analyte_table_lst, data_set):
	df = pd.DataFrame()
	for table in analyte_table_lst:
		sql_statement = "SELECT \
							%s.Concentration_pg AS Conc, COUNT(%s.Concentration_pg) AS %s \
						FROM \
							Sample \
						Inner Join %s ON \
							%s.id = Sample.%s_foreignkey \
						WHERE \
							DataSetName = '%s' \
						GROUP BY 1 \
						ORDER BY 1 ASC;" % (table, table, table, table, table, table, data_set)
		df1 = pd.read_sql_query(sql_statement, self.conn)
		df1.set_index('Conc', inplace=True)
		df = pd.concat([df, df1], axis=1)
	return df

5、导出数据示例demo代码

def import_data_from_psql(user_id):
    """Import data from psql; clean & merge dataframes."""
    library = pd.read_sql_table(
        'library',
        con='postgres:///nextbook',
        columns=['book_id', 'title', 'author', 'pub_year', 'original_pub_year', 'pages'])
    book_subjects = pd.read_sql_table(
        'book_subjects',
        con='postgres:///nextbook')
    subjects = pd.read_sql_table(
        'subjects', con='postgres:///nextbook',
        columns=['subject_id', 'subject'])
    user_ratings = pd.read_sql_query(
        sql=('SELECT book_id, user_id, status, rating FROM user_books WHERE user_id=%s' % user_id),
        con='postgres:///nextbook')
    library = library.merge(user_ratings, how='left', on='book_id')
    library['pages'].fillna(0, inplace=True)
    #merge subject names into book_subjects; drop uninteresting subjects from book_subjects table
    book_subjects = book_subjects.merge(subjects, how='left', on='subject_id')
    delete_values = ["protected daisy", "accessible book", "in library", "overdrive", "large type books", 'ficci\xc3\xb3n juvenil', 'ficci\xc3\xb3n', 'lending library']
    book_subjects = book_subjects[~book_subjects['subject'].isin(delete_values)]
    return [library, book_subjects, subjects] 

源代码:https://github.com/EmmaOnThursday/next-book/blob/master/app/recommendation_creation.py

6、数据库连接示例demo代码

def test_sql_open_close(self):
        #Test if the IO in the database still work if the connection closed
        #between the writing and reading (as in many real situations).
        with tm.ensure_clean() as name:
            conn = self.connect(name)
            sql.to_sql(self.test_frame3, "test_frame3_legacy", conn,
                       flavor="sqlite", index=False)
            conn.close()
            conn = self.connect(name)
            result = sql.read_sql_query("SELECT * FROM test_frame3_legacy;",
                                        conn)
            conn.close()
        tm.assert_frame_equal(self.test_frame3, result) 

源代码:https://github.com/SignalMedia/PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda/blob/master/pandas/io/tests/test_sql.py

7、有关时间类型的示例demo代码

def test_datetime(self):
    df = DataFrame({'A': date_range('2013-01-01 09:00:00', periods=3),
                    'B': np.arange(3.0)})
    df.to_sql('test_datetime', self.conn)
    # with read_table -> type information from schema used
    result = sql.read_sql_table('test_datetime', self.conn)
    result = result.drop('index', axis=1)
    tm.assert_frame_equal(result, df)
    # with read_sql -> no type information -> sqlite has no native
    result = sql.read_sql_query('SELECT * FROM test_datetime', self.conn)
    result = result.drop('index', axis=1)
    if self.flavor == 'sqlite':
        self.assertTrue(isinstance(result.loc[0, 'A'], string_types))
        result['A'] = to_datetime(result['A'])
        tm.assert_frame_equal(result, df)
    else:
        tm.assert_frame_equal(result, df)

源代码:https://github.com/SignalMedia/PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda/blob/master/pandas/io/tests/test_sql.py

8、时间类型处理的示例demo代码

def test_datetime_NaT(self):
    df = DataFrame({'A': date_range('2013-01-01 09:00:00', periods=3),
                    'B': np.arange(3.0)})
    df.loc[1, 'A'] = np.nan
    df.to_sql('test_datetime', self.conn, index=False)
    # with read_table -> type information from schema used
    result = sql.read_sql_table('test_datetime', self.conn)
    tm.assert_frame_equal(result, df)
    # with read_sql -> no type information -> sqlite has no native
    result = sql.read_sql_query('SELECT * FROM test_datetime', self.conn)
    if self.flavor == 'sqlite':
        self.assertTrue(isinstance(result.loc[0, 'A'], string_types))
        result['A'] = to_datetime(result['A'], errors='coerce')
        tm.assert_frame_equal(result, df)
    else:
        tm.assert_frame_equal(result, df) 

源代码:https://github.com/SignalMedia/PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda/blob/master/pandas/io/tests/test_sql.py

9、相关文档:Python Pandas pandas.read_sql_query函数方法的使用