/*
* The PyHST program is Copyright (C) 2002-2011 of the
* European Synchrotron Radiation Facility (ESRF) and
* Karlsruhe Institute of Technology (KIT).
*
* PyHST is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* hst is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see .
*/
#define _GNU_SOURCE
#include
#include
#include
#include "hw_config.h"
#ifdef HW_HAVE_SCHED_HEADERS
# include
# include
# include
#endif /* HW_HAVE_SCHED_HEADERS */
#include "hw_sched.h"
#ifdef HW_USE_THREADS
# define MUTEX_INIT(ctx, name) \
if (!err) { \
ctx->name##_mutex = g_mutex_new(); \
if (!ctx->name##_mutex) err = 1; \
}
# define MUTEX_FREE(ctx, name) \
if (ctx->name##_mutex) g_mutex_free(ctx->name##_mutex);
# define COND_INIT(ctx, name) \
MUTEX_INIT(ctx, name##_cond) \
if (!err) { \
ctx->name##_cond = g_cond_new(); \
if (!ctx->name##_cond) { \
err = 1; \
MUTEX_FREE(ctx, name##_cond) \
} \
}
# define COND_FREE(ctx, name) \
if (ctx->name##_cond) g_cond_free(ctx->name##_cond); \
MUTEX_FREE(ctx, name##_cond)
#else /* HW_USE_THREADS */
# define MUTEX_INIT(ctx, name)
# define MUTEX_FREE(ctx, name)
# define COND_INIT(ctx, name)
# define COND_FREE(ctx, name)
#endif /* HW_USE_THREADS */
HWRunFunction ppu_run[] = {
(HWRunFunction)NULL
};
static int hw_sched_initialized = 0;
int hw_sched_init(void) {
if (!hw_sched_initialized) {
#ifdef HW_USE_THREADS
g_thread_init(NULL);
#endif /* HW_USE_THREADS */
hw_sched_initialized = 1;
}
return 0;
}
int hw_sched_get_cpu_count(void) {
#ifdef HW_HAVE_SCHED_HEADERS
int err;
int cpu_count;
cpu_set_t mask;
err = sched_getaffinity(getpid(), sizeof(mask), &mask);
if (err) return 1;
# ifdef CPU_COUNT
cpu_count = CPU_COUNT(&mask);
# else
for (cpu_count = 0; cpu_count < CPU_SETSIZE; cpu_count++) {
if (!CPU_ISSET(cpu_count, &mask)) break;
}
# endif
if (!cpu_count) cpu_count = 1;
return cpu_count;
#else /* HW_HAVE_SCHED_HEADERS */
return 1;
#endif /* HW_HAVE_SCHED_HEADERS */
}
HWSched hw_sched_create(int cpu_count) {
int i;
int err = 0;
HWSched ctx;
//hw_sched_init();
ctx = (HWSched)malloc(sizeof(HWSchedS));
if (!ctx) return NULL;
memset(ctx, 0, sizeof(HWSchedS));
ctx->status = 1;
MUTEX_INIT(ctx, sync);
MUTEX_INIT(ctx, data);
COND_INIT(ctx, compl);
COND_INIT(ctx, job);
if (err) {
hw_sched_destroy(ctx);
return NULL;
}
if (!cpu_count) cpu_count = hw_sched_get_cpu_count();
if (cpu_count > HW_MAX_THREADS) cpu_count = HW_MAX_THREADS;
ctx->n_threads = 0;
for (i = 0; i < cpu_count; i++) {
ctx->thread[ctx->n_threads] = hw_thread_create(ctx, ctx->n_threads, NULL, ppu_run, NULL);
if (ctx->thread[ctx->n_threads]) {
#ifndef HW_USE_THREADS
ctx->thread[ctx->n_threads]->status = HW_THREAD_STATUS_STARTING;
#endif /* HW_USE_THREADS */
++ctx->n_threads;
}
}
if (!ctx->n_threads) {
hw_sched_destroy(ctx);
return NULL;
}
return ctx;
}
static int hw_sched_wait_threads(HWSched ctx) {
#ifdef HW_USE_THREADS
int i = 0;
hw_sched_lock(ctx, compl_cond);
while (i < ctx->n_threads) {
for (; i < ctx->n_threads; i++) {
if (ctx->thread[i]->status == HW_THREAD_STATUS_INIT) {
hw_sched_wait(ctx, compl);
break;
}
}
}
hw_sched_unlock(ctx, compl_cond);
#endif /* HW_USE_THREADS */
ctx->started = 1;
return 0;
}
void hw_sched_destroy(HWSched ctx) {
int i;
if (ctx->n_threads > 0) {
if (!ctx->started) {
hw_sched_wait_threads(ctx);
}
ctx->status = 0;
hw_sched_lock(ctx, job_cond);
hw_sched_broadcast(ctx, job);
hw_sched_unlock(ctx, job_cond);
for (i = 0; i < ctx->n_threads; i++) {
hw_thread_destroy(ctx->thread[i]);
}
}
COND_FREE(ctx, job);
COND_FREE(ctx, compl);
MUTEX_FREE(ctx, data);
MUTEX_FREE(ctx, sync);
free(ctx);
}
int hw_sched_set_sequential_mode(HWSched ctx, int *n_blocks, int *cur_block, HWSchedFlags flags) {
ctx->mode = HW_SCHED_MODE_SEQUENTIAL;
ctx->n_blocks = n_blocks;
ctx->cur_block = cur_block;
ctx->flags = flags;
return 0;
}
int hw_sched_get_chunk(HWSched ctx, int thread_id) {
int block;
switch (ctx->mode) {
case HW_SCHED_MODE_PREALLOCATED:
if (ctx->thread[thread_id]->status == HW_THREAD_STATUS_STARTING) {
#ifndef HW_USE_THREADS
ctx->thread[thread_id]->status = HW_THREAD_STATUS_DONE;
#endif /* HW_USE_THREADS */
return thread_id;
} else {
return HW_SCHED_CHUNK_INVALID;
}
case HW_SCHED_MODE_SEQUENTIAL:
if ((ctx->flags&HW_SCHED_FLAG_INIT_CALL)&&(ctx->thread[thread_id]->status == HW_THREAD_STATUS_STARTING)) {
return HW_SCHED_CHUNK_INIT;
}
hw_sched_lock(ctx, data);
block = *ctx->cur_block;
if (block < *ctx->n_blocks) {
*ctx->cur_block = *ctx->cur_block + 1;
} else {
block = HW_SCHED_CHUNK_INVALID;
}
hw_sched_unlock(ctx, data);
if (block == HW_SCHED_CHUNK_INVALID) {
if (((ctx->flags&HW_SCHED_FLAG_FREE_CALL)&&(ctx->thread[thread_id]->status == HW_THREAD_STATUS_RUNNING))) {
ctx->thread[thread_id]->status = HW_THREAD_STATUS_FINISHING;
return HW_SCHED_CHUNK_FREE;
}
if ((ctx->flags&HW_SCHED_FLAG_TERMINATOR_CALL)&&((ctx->thread[thread_id]->status == HW_THREAD_STATUS_RUNNING)||(ctx->thread[thread_id]->status == HW_THREAD_STATUS_FINISHING))) {
int i;
hw_sched_lock(ctx, data);
for (i = 0; i < ctx->n_threads; i++) {
if (thread_id == i) continue;
if ((ctx->thread[i]->status != HW_THREAD_STATUS_DONE)&&(ctx->thread[i]->status != HW_THREAD_STATUS_FINISHING2)&&(ctx->thread[i]->status != HW_THREAD_STATUS_IDLE)) {
break;
}
}
ctx->thread[thread_id]->status = HW_THREAD_STATUS_FINISHING2;
hw_sched_unlock(ctx, data);
if (i == ctx->n_threads) {
return HW_SCHED_CHUNK_TERMINATOR;
}
}
}
return block;
default:
return HW_SCHED_CHUNK_INVALID;
}
return -1;
}
int hw_sched_schedule_task(HWSched ctx, void *appctx, HWEntry entry) {
#ifdef HW_USE_THREADS
if (!ctx->started) {
hw_sched_wait_threads(ctx);
}
#else /* HW_USE_THREADS */
int err;
int i, chunk_id, n_threads;
HWRunFunction run;
HWThread thrctx;
#endif /* HW_USE_THREADS */
ctx->ctx = appctx;
ctx->entry = entry;
switch (ctx->mode) {
case HW_SCHED_MODE_SEQUENTIAL:
*ctx->cur_block = 0;
break;
default:
;
}
#ifdef HW_USE_THREADS
hw_sched_lock(ctx, compl_cond);
hw_sched_lock(ctx, job_cond);
hw_sched_broadcast(ctx, job);
hw_sched_unlock(ctx, job_cond);
#else /* HW_USE_THREADS */
n_threads = ctx->n_threads;
for (i = 0; i < n_threads; i++) {
thrctx = ctx->thread[i];
thrctx->err = 0;
}
i = 0;
thrctx = ctx->thread[i];
chunk_id = hw_sched_get_chunk(ctx, thrctx->thread_id);
while (chunk_id >= 0) {
run = hw_run_entry(thrctx->runs, entry);
err = run(thrctx, thrctx->hwctx, chunk_id, appctx);
if (err) {
thrctx->err = err;
break;
}
if ((++i) == n_threads) i = 0;
thrctx = ctx->thread[i];
chunk_id = hw_sched_get_chunk(ctx, thrctx->thread_id);
}
#endif /* HW_USE_THREADS */
return 0;
}
int hw_sched_wait_task(HWSched ctx) {
int err = 0;
int i = 0, n_threads = ctx->n_threads;
#ifdef HW_USE_THREADS
while (i < ctx->n_threads) {
for (; i < ctx->n_threads; i++) {
if (ctx->thread[i]->status == HW_THREAD_STATUS_DONE) {
ctx->thread[i]->status = HW_THREAD_STATUS_IDLE;
} else {
hw_sched_wait(ctx, compl);
break;
}
}
}
hw_sched_unlock(ctx, compl_cond);
#endif /* HW_USE_THREADS */
for (i = 0; i < n_threads; i++) {
HWThread thrctx = ctx->thread[i];
if (thrctx->err) return err = thrctx->err;
#ifndef HW_USE_THREADS
thrctx->status = HW_THREAD_STATUS_IDLE;
#endif /* HW_USE_THREADS */
}
return err;
}
int hw_sched_execute_task(HWSched ctx, void *appctx, HWEntry entry) {
int err;
err = hw_sched_schedule_task(ctx, appctx, entry);
if (err) return err;
return hw_sched_wait_task(ctx);
}
int hw_sched_schedule_thread_task(HWSched ctx, void *appctx, HWEntry entry) {
int err;
ctx->saved_mode = ctx->mode;
ctx->mode = HW_SCHED_MODE_PREALLOCATED;
err = hw_sched_schedule_task(ctx, appctx, entry);
return err;
}
int hw_sched_wait_thread_task(HWSched ctx) {
int err;
err = hw_sched_wait_task(ctx);
ctx->mode = ctx->saved_mode;
return err;
}
int hw_sched_execute_thread_task(HWSched ctx, void *appctx, HWEntry entry) {
int err;
int saved_mode = ctx->mode;
ctx->mode = HW_SCHED_MODE_PREALLOCATED;
err = hw_sched_execute_task(ctx, appctx, entry);
ctx->mode = saved_mode;
return err;
}