programing

판다 수술 진행 상황 표시기

sourcetip 2022. 12. 26. 22:34
반응형

판다 수술 진행 상황 표시기

1500만 행이 넘는 데이터 프레임에 대해 정기적으로 팬더 작업을 수행하고 있으며, 특정 작업에 대한 진행률 표시기에 액세스하고 싶습니다.

팬더 분할 적용 결합 작업에 대한 텍스트 기반 진행률 표시기가 존재합니까?

예를 들어 다음과 같습니다.

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

서 ''는feature_rollup는 다수의 DF 컬럼을 사용하여 다양한 방법으로 새로운 사용자 컬럼을 작성하는 다소 복잡한 기능입니다.큰 데이터 프레임의 경우, 이 조작은 시간이 걸릴 수 있기 때문에, 진척 상황을 갱신하는 텍스트 베이스의 출력을 iPython 노트북에 넣을 수 있는지 알고 싶습니다.

지금까지 Python의 표준 루프 진행률 지표를 시험해 보았지만 판다와 의미 있는 상호작용은 하지 않았습니다.

스플릿 애플리케이션 콤바인의 진행 상황을 알 수 있는 팬더 라이브러리/문서에서 제가 간과한 것이 있기를 바랍니다.할 수 , 「」는 「」를 참조해 주세요.apply함수는 동작하고 있으며, 이러한 서브셋의 완료된 부분으로 진행 상황을 보고합니다.

이것은 아마도 도서관에 추가해야 하는 것인가요?

일반 수요로 인해, 저는 다음과 같이 덧붙였습니다.pandastqdm )pip install "tqdm>=4.9.0"다른 대답과는 달리, 이것은 판다의 속도를 눈에 띄게 늦추지 않을 이다.-- 여기 그 예가 있다.DataFrameGroupBy.progress_apply:

import pandas as pd
import numpy as np
from tqdm import tqdm
# from tqdm.auto import tqdm  # for notebooks

# Create new `pandas` methods which use `tqdm` progress
# (can use tqdm_gui, optional kwargs, etc.)
tqdm.pandas()

df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
# Now you can use `progress_apply` instead of `apply`
df.groupby(0).progress_apply(lambda x: x**2)

이것이 어떻게 동작하는지(및 자신의 콜백을 위해 그것을 수정하는 방법에 관심이 있는 경우), PyPI에 대한 전체 설명서GitHub의 예를 참조하거나 모듈을 Import하여 실행하세요.help(tqdm)기타 지원되는 기능은 다음과 같습니다.map,applymap,aggregate , , , , 입니다.transform.

편집


원래의 질문에 직접 답하려면 , 다음의 순서를 바꿉니다.

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

포함:

from tqdm import tqdm
tqdm.pandas()
df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup)

참고: tqdm <= v4.8: tqdm 버전이 4.8 미만인 경우tqdm.pandas()하다

from tqdm import tqdm, tqdm_pandas
tqdm_pandas(tqdm())

저처럼 Jupyter/ipython 노트북에서 이를 사용하는 방법에 대한 지원이 필요한 경우 관련 기사에 대한 유용한 가이드와 출처를 알려드리겠습니다.

from tqdm._tqdm_notebook import tqdm_notebook
import pandas as pd
tqdm_notebook.pandas()
df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
df.groupby(0).progress_apply(lambda x: x**2)

「 Import 」의 ._tqdm_notebook참조 기사에서 언급한 바와 같이 개발은 후기 베타 단계에 있습니다.

2021년 11월 12일 갱신

현재 사용하고 있습니다.pandas==1.3.4 ★★★★★★★★★★★★★★★★★」tqdm==4.62.3어떤 버전의 tqdm 작성자가 이 변경을 구현했는지 알 수 없지만 위의 Import 문장은 권장되지 않습니다.대신 다음을 사용합니다.

 from tqdm.notebook import tqdm_notebook

2022년 02월 01일 현재 업데이트 .py와 .ipynb 파일의 Import 문을 단순화할 수 있게 되었습니다.

from tqdm.auto import tqdm
tqdm.pandas()

이는 두 가지 개발 환경 모두에서 예상대로 작동하며 팬더 데이터 프레임이나 기타 tqdm 가치가 있는 반복 가능한 데이터에서도 작동해야 합니다.

2022년 5월 27일 현재 업데이트 SageMaker에서 주피터 노트북을 사용하는 경우 이 조합이 작동합니다.

from tqdm import tqdm
from tqdm.gui import tqdm as tqdm_gui
tqdm.pandas(ncols=50)

Jeff의 답변을 수정하는 것(그리고 이것을 재사용 가능한 기능으로 사용).

def logged_apply(g, func, *args, **kwargs):
    step_percentage = 100. / len(g)
    import sys
    sys.stdout.write('apply progress:   0%')
    sys.stdout.flush()

    def logging_decorator(func):
        def wrapper(*args, **kwargs):
            progress = wrapper.count * step_percentage
            sys.stdout.write('\033[D \033[D' * 4 + format(progress, '3.0f') + '%')
            sys.stdout.flush()
            wrapper.count += 1
            return func(*args, **kwargs)
        wrapper.count = 0
        return wrapper

    logged_func = logging_decorator(func)
    res = g.apply(logged_func, *args, **kwargs)
    sys.stdout.write('\033[D \033[D' * 4 + format(100., '3.0f') + '%' + '\n')
    sys.stdout.flush()
    return res

주의: 진행률 업데이트를 인라인으로 적용합니다.당신의 기능이 오래되면, 이것은 작동하지 않습니다.

In [11]: g = df_users.groupby(['userID', 'requestDate'])

In [12]: f = feature_rollup

In [13]: logged_apply(g, f)
apply progress: 100%
Out[13]: 
...

통상대로 이것을 메서드로서 그룹별 오브젝트에 추가할 수 있습니다.

from pandas.core.groupby import DataFrameGroupBy
DataFrameGroupBy.logged_apply = logged_apply

In [21]: g.logged_apply(f)
apply progress: 100%
Out[21]: 
...

댓글에서 언급했듯이, 이것은 핵심 판다들이 구현에 관심을 가질만한 기능이 아니다.하지만 비단뱀을 사용하면 많은 판다 개체/방법에 사용할 수 있습니다(그렇게 하는 것은 꽤 많은 작업이 필요할 것입니다).단, 이 접근방식을 일반화할 수 있을 것입니다.

커스텀 병행 팬더 적용 코드에 tqdm을 적용하고자 하는 분들을 위해.

(몇 년 동안 병렬화를 위해 일부 라이브러리를 시도했지만, 주로 적용 기능을 위한 100% 병렬화 솔루션을 찾지 못했고, 항상 "수동" 코드를 위해 돌아와야 했습니다.)

df_multi_core: 이 명령어를 호출합니다.수용 가능한 것은 다음과 같습니다.

  1. df 객체
  2. 호출할 함수 이름
  3. 기능을 수행할 수 있는 열의 하위 집합(시간/메모리를 줄이는 데 도움이 됨)
  4. 병렬로 실행할 작업 수(-1 또는 모든 코어에 대해 생략)
  5. df의 함수가 받아들이는 다른 모든 kwargs("축")

_df_split - 이는 실행 중인 모듈에 대해 글로벌하게 배치해야 하는 내부 도우미 함수입니다(Pool.map은 "배치에 의존합니다"). 그렇지 않으면 내부에서 찾을 수 있습니다.

다음은 제 요지의 코드입니다(팬더 기능 테스트를 추가하겠습니다).

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    try:
        splits = np.array_split(df[subset], njobs)
    except ValueError:
        splits = np.array_split(df, njobs)

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x:x[0])
    results = pd.concat([split[1] for split in results])
    return results

Bellow는 tqdm "progress_apply"를 사용한 병렬 적용 테스트 코드입니다.

from time import time
from tqdm import tqdm
tqdm.pandas()

if __name__ == '__main__': 
    sep = '-' * 50

    # tqdm progress_apply test      
    def apply_f(row):
        return row['c1'] + 0.1
    N = 1000000
    np.random.seed(0)
    df = pd.DataFrame({'c1': np.arange(N), 'c2': np.arange(N)})

    print('testing pandas apply on {}\n{}'.format(df.shape, sep))
    t1 = time()
    res = df.progress_apply(apply_f, axis=1)
    t2 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    # res = df_multi_core(df=df, df_f_name='apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    res = df_multi_core(df=df, df_f_name='progress_apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    t4 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))

출력에는 병렬화 없이 실행할 경우 1개의 진행률 바가 표시되고 병렬화로 실행할 경우 코어별 진행률 바가 표시됩니다.약간의 히크업이 있어 나머지 코어가 한 번에 표시되는 경우도 있습니다만, 그 경우에도 코어 단위의 진척 통계(예: 1초당 총 레코드 수)를 얻을 수 있기 때문에 도움이 된다고 생각합니다.

여기에 이미지 설명 입력

@abcdaa님 감사합니다.

데코레이터를 사용하면 쉽게 할 수 있습니다.

from functools import wraps 

def logging_decorator(func):

    @wraps
    def wrapper(*args, **kwargs):
        wrapper.count += 1
        print "The function I modify has been called {0} times(s).".format(
              wrapper.count)
        func(*args, **kwargs)
    wrapper.count = 0
    return wrapper

modified_function = logging_decorator(feature_rollup)

modified_function(인쇄할 때 변경)을 사용합니다.

여기에 사용된 모든 답변pandas.DataFrame.groupby진행 표시줄을 켜려면pandas.Series.apply그룹비 없이 주피터 안에서 할 수 있는 방법은 다음과 같습니다.

from tqdm.notebook import tqdm
tqdm.pandas()


df['<applied-col-name>'] = df['<col-name>'].progress_apply(<your-manipulation-function>)

Jeff의 답변은 총계를 포함하여 변경되어 진행상황과 변수를 추적할 수 있습니다('print_at'이 상당히 높은 경우 실제로 성능이 크게 향상됩니다).

def count_wrapper(func,total, print_at):

    def wrapper(*args):
        wrapper.count += 1
        if wrapper.count % wrapper.print_at == 0:
            clear_output()
            sys.stdout.write( "%d / %d"%(calc_time.count,calc_time.total) )
            sys.stdout.flush()
        return func(*args)
    wrapper.count = 0
    wrapper.total = total
    wrapper.print_at = print_at

    return wrapper

clear_output() 함수는 다음과 같습니다.

from IPython.core.display import clear_output

만약 IPython Andy Hayden의 답변이 없다면 그것 없이 그렇게 할 수 있습니다.

다음과 같은 조작의 경우merge,concat,join진행 표시줄은 Dask를 사용하여 표시할 수 있습니다.

Panda DataFrames를 Dask DataFrames로 변환할 수 있습니다.그런 다음 Dask 진행 표시줄을 표시할 수 있습니다.

다음 코드는 간단한 예를 보여줍니다.

Panda 데이터 프레임 생성 및 변환

import pandas as pd
import numpy as np
from tqdm import tqdm
import dask.dataframe as dd

n = 450000
maxa = 700

df1 = pd.DataFrame({'lkey': np.random.randint(0, maxa, n),'lvalue': np.random.randint(0,int(1e8),n)})
df2 = pd.DataFrame({'rkey': np.random.randint(0, maxa, n),'rvalue': np.random.randint(0, int(1e8),n)})

sd1 = dd.from_pandas(df1, npartitions=3)
sd2 = dd.from_pandas(df2, npartitions=3)

진행 표시줄과 병합

from tqdm.dask import TqdmCallback
from dask.diagnostics import ProgressBar
ProgressBar().register()

with TqdmCallback(desc="compute"):
    sd1.merge(sd2, left_on='lkey', right_on='rkey').compute()

Dask는 같은 작업에 Panda보다 더 빠르고 적은 자원을 필요로 합니다.

  • 판다류74.7 ms
  • 다스크20.2 ms

상세한 것에 대하여는, 다음을 참조해 주세요.

주 1: 이 솔루션은 https://stackoverflow.com/a/56257514/3921758에서 테스트하고 있습니다만, 이 솔루션에서는 동작하지 않습니다.병합 작업을 측정하지 않습니다.

주의 2: "오픈 요청"을 확인했습니다.tqdm팬더에게는 다음과 같습니다.

콘캣 작업의 경우:

df = pd.concat(
    [
        get_data(f)
        for f in tqdm(files, total=len(files))
    ]
)

tqdm은 반복 가능한 값을 반환합니다.

언급URL : https://stackoverflow.com/questions/18603270/progress-indicator-during-pandas-operations

반응형