diff options
Diffstat (limited to 'source3/lib/ctdbd_conn.c')
-rw-r--r-- | source3/lib/ctdbd_conn.c | 665 |
1 files changed, 523 insertions, 142 deletions
diff --git a/source3/lib/ctdbd_conn.c b/source3/lib/ctdbd_conn.c index 4e8f090097..84f26e00d3 100644 --- a/source3/lib/ctdbd_conn.c +++ b/source3/lib/ctdbd_conn.c @@ -20,32 +20,61 @@ #include "includes.h" #include "util_tdb.h" +#include "serverid.h" +#include "ctdbd_conn.h" #ifdef CLUSTER_SUPPORT -#include "ctdbd_conn.h" -#include "packet.h" +#include "ctdb_packet.h" #include "messages.h" +/* + * It is not possible to include ctdb.h and tdb_compat.h (included via + * some other include above) without warnings. This fixes those + * warnings. + */ + +#ifdef typesafe_cb +#undef typesafe_cb +#endif + +#ifdef typesafe_cb_preargs +#undef typesafe_cb_preargs +#endif + +#ifdef typesafe_cb_postargs +#undef typesafe_cb_postargs +#endif + /* paths to these include files come from --with-ctdb= in configure */ + #include "ctdb.h" #include "ctdb_private.h" struct ctdbd_connection { struct messaging_context *msg_ctx; - uint32 reqid; - uint32 our_vnn; - uint64 rand_srvid; - struct packet_context *pkt; + uint32_t reqid; + uint32_t our_vnn; + uint64_t rand_srvid; + struct ctdb_packet_context *pkt; struct fd_event *fde; void (*release_ip_handler)(const char *ip_addr, void *private_data); void *release_ip_priv; }; +static uint32_t ctdbd_next_reqid(struct ctdbd_connection *conn) +{ + conn->reqid += 1; + if (conn->reqid == 0) { + conn->reqid += 1; + } + return conn->reqid; +} + static NTSTATUS ctdbd_control(struct ctdbd_connection *conn, - uint32_t vnn, uint32 opcode, - uint64_t srvid, uint32_t flags, TDB_DATA data, + uint32_t vnn, uint32_t opcode, + uint64_t srvid, uint32_t flags, TDB_DATA data, TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int *cstatus); @@ -67,10 +96,10 @@ static void cluster_fatal(const char *why) */ static void ctdb_packet_dump(struct ctdb_req_header *hdr) { - if (DEBUGLEVEL < 10) { + if (DEBUGLEVEL < 11) { return; } - DEBUGADD(10, ("len=%d, magic=%x, vers=%d, gen=%d, op=%d, reqid=%d\n", + DEBUGADD(11, ("len=%d, magic=%x, vers=%d, gen=%d, op=%d, reqid=%d\n", (int)hdr->length, (int)hdr->ctdb_magic, (int)hdr->ctdb_version, (int)hdr->generation, (int)hdr->operation, (int)hdr->reqid)); @@ -79,8 +108,7 @@ static void ctdb_packet_dump(struct ctdb_req_header *hdr) /* * Register a srvid with ctdbd */ -static NTSTATUS register_with_ctdbd(struct ctdbd_connection *conn, - uint64_t srvid) +NTSTATUS register_with_ctdbd(struct ctdbd_connection *conn, uint64_t srvid) { int cstatus; @@ -92,7 +120,7 @@ static NTSTATUS register_with_ctdbd(struct ctdbd_connection *conn, /* * get our vnn from the cluster */ -static NTSTATUS get_cluster_vnn(struct ctdbd_connection *conn, uint32 *vnn) +static NTSTATUS get_cluster_vnn(struct ctdbd_connection *conn, uint32_t *vnn) { int32_t cstatus=-1; NTSTATUS status; @@ -159,7 +187,7 @@ fail: return ret; } -uint32 ctdbd_vnn(const struct ctdbd_connection *conn) +uint32_t ctdbd_vnn(const struct ctdbd_connection *conn) { return conn->our_vnn; } @@ -169,16 +197,13 @@ uint32 ctdbd_vnn(const struct ctdbd_connection *conn) */ static NTSTATUS ctdbd_connect(TALLOC_CTX *mem_ctx, - struct packet_context **presult) + struct ctdb_packet_context **presult) { - struct packet_context *result; + struct ctdb_packet_context *result; const char *sockname = lp_ctdbd_socket(); struct sockaddr_un addr; int fd; - - if (!sockname || !*sockname) { - sockname = CTDB_PATH; - } + socklen_t salen; fd = socket(AF_UNIX, SOCK_STREAM, 0); if (fd == -1) { @@ -190,14 +215,15 @@ static NTSTATUS ctdbd_connect(TALLOC_CTX *mem_ctx, addr.sun_family = AF_UNIX; strncpy(addr.sun_path, sockname, sizeof(addr.sun_path)); - if (sys_connect(fd, (struct sockaddr *)(void *)&addr) == -1) { + salen = sizeof(struct sockaddr_un); + if (connect(fd, (struct sockaddr *)(void *)&addr, salen) == -1) { DEBUG(1, ("connect(%s) failed: %s\n", sockname, strerror(errno))); close(fd); return map_nt_error_from_unix(errno); } - if (!(result = packet_init(mem_ctx, fd))) { + if (!(result = ctdb_packet_init(mem_ctx, fd))) { close(fd); return NT_STATUS_NO_MEMORY; } @@ -214,15 +240,15 @@ static bool ctdb_req_complete(const uint8_t *buf, size_t available, size_t *length, void *private_data) { - uint32 msglen; + uint32_t msglen; if (available < sizeof(msglen)) { return False; } - msglen = *((uint32 *)buf); + msglen = *((const uint32_t *)buf); - DEBUG(10, ("msglen = %d\n", msglen)); + DEBUG(11, ("msglen = %d\n", msglen)); if (msglen < sizeof(struct ctdb_req_header)) { DEBUG(0, ("Got invalid msglen: %d, expected at least %d for " @@ -272,7 +298,7 @@ struct req_pull_state { }; /* - * Pull a ctdb request out of the incoming packet queue + * Pull a ctdb request out of the incoming ctdb_packet queue */ static NTSTATUS ctdb_req_pull(uint8_t *buf, size_t length, @@ -304,7 +330,7 @@ static struct messaging_rec *ctdb_pull_messaging_rec(TALLOC_CTX *mem_ctx, cluster_fatal("got invalid msg length"); } - if (!(result = TALLOC_P(mem_ctx, struct messaging_rec))) { + if (!(result = talloc(mem_ctx, struct messaging_rec))) { DEBUG(0, ("talloc failed\n")); return NULL; } @@ -322,22 +348,22 @@ static struct messaging_rec *ctdb_pull_messaging_rec(TALLOC_CTX *mem_ctx, return NULL; } - if (DEBUGLEVEL >= 10) { - DEBUG(10, ("ctdb_pull_messaging_rec:\n")); + if (DEBUGLEVEL >= 11) { + DEBUG(11, ("ctdb_pull_messaging_rec:\n")); NDR_PRINT_DEBUG(messaging_rec, result); } return result; } -static NTSTATUS ctdb_packet_fd_read_sync(struct packet_context *ctx) +static NTSTATUS ctdb_packet_fd_read_sync(struct ctdb_packet_context *ctx) { int timeout = lp_ctdb_timeout(); if (timeout == 0) { timeout = -1; } - return packet_fd_read_sync(ctx, timeout); + return ctdb_packet_fd_read_sync_timeout(ctx, timeout); } /* @@ -345,52 +371,46 @@ static NTSTATUS ctdb_packet_fd_read_sync(struct packet_context *ctx) * messages that might come in between. */ -static NTSTATUS ctdb_read_req(struct ctdbd_connection *conn, uint32 reqid, +static NTSTATUS ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid, TALLOC_CTX *mem_ctx, void *result) { struct ctdb_req_header *hdr; struct req_pull_state state; NTSTATUS status; - again: - - status = ctdb_packet_fd_read_sync(conn->pkt); - - if (NT_STATUS_EQUAL(status, NT_STATUS_NETWORK_BUSY)) { - /* EAGAIN */ - goto again; - } else if (NT_STATUS_EQUAL(status, NT_STATUS_RETRY)) { - /* EAGAIN */ - goto again; - } - - if (!NT_STATUS_IS_OK(status)) { - DEBUG(0, ("packet_fd_read failed: %s\n", nt_errstr(status))); - cluster_fatal("ctdbd died\n"); - } - next_pkt: - ZERO_STRUCT(state); state.mem_ctx = mem_ctx; - if (!packet_handler(conn->pkt, ctdb_req_complete, ctdb_req_pull, - &state, &status)) { + while (!ctdb_packet_handler(conn->pkt, ctdb_req_complete, + ctdb_req_pull, &state, &status)) { /* * Not enough data */ - DEBUG(10, ("not enough data from ctdb socket, retrying\n")); - goto again; + status = ctdb_packet_fd_read_sync(conn->pkt); + + if (NT_STATUS_EQUAL(status, NT_STATUS_NETWORK_BUSY)) { + /* EAGAIN */ + continue; + } else if (NT_STATUS_EQUAL(status, NT_STATUS_RETRY)) { + /* EAGAIN */ + continue; + } + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(0, ("packet_fd_read failed: %s\n", nt_errstr(status))); + cluster_fatal("ctdbd died\n"); + } } if (!NT_STATUS_IS_OK(status)) { - DEBUG(0, ("Could not read packet: %s\n", nt_errstr(status))); + DEBUG(0, ("Could not read ctdb_packet: %s\n", nt_errstr(status))); cluster_fatal("ctdbd died\n"); } hdr = (struct ctdb_req_header *)state.req.data; - DEBUG(10, ("Received ctdb packet\n")); + DEBUG(11, ("Received ctdb packet\n")); ctdb_packet_dump(hdr); if (hdr->operation == CTDB_REQ_MESSAGE) { @@ -433,7 +453,7 @@ static NTSTATUS ctdb_read_req(struct ctdbd_connection *conn, uint32 reqid, goto next_pkt; } - msg_state = TALLOC_P(NULL, struct deferred_msg_state); + msg_state = talloc(NULL, struct deferred_msg_state); if (msg_state == NULL) { DEBUG(0, ("talloc failed\n")); TALLOC_FREE(hdr); @@ -471,12 +491,12 @@ static NTSTATUS ctdb_read_req(struct ctdbd_connection *conn, uint32 reqid, goto next_pkt; } - if (hdr->reqid != reqid) { + if ((reqid != 0) && (hdr->reqid != reqid)) { /* we got the wrong reply */ DEBUG(0,("Discarding mismatched ctdb reqid %u should have " "been %u\n", hdr->reqid, reqid)); TALLOC_FREE(hdr); - goto again; + goto next_pkt; } *((void **)result) = talloc_move(mem_ctx, &hdr); @@ -494,7 +514,7 @@ static NTSTATUS ctdbd_init_connection(TALLOC_CTX *mem_ctx, struct ctdbd_connection *conn; NTSTATUS status; - if (!(conn = TALLOC_ZERO_P(mem_ctx, struct ctdbd_connection))) { + if (!(conn = talloc_zero(mem_ctx, struct ctdbd_connection))) { DEBUG(0, ("talloc failed\n")); return NT_STATUS_NO_MEMORY; } @@ -554,7 +574,7 @@ NTSTATUS ctdbd_messaging_connection(TALLOC_CTX *mem_ctx, return status; } - status = register_with_ctdbd(conn, (uint64_t)sys_getpid()); + status = register_with_ctdbd(conn, (uint64_t)getpid()); if (!NT_STATUS_IS_OK(status)) { goto fail; } @@ -584,7 +604,7 @@ struct messaging_context *ctdb_conn_msg_ctx(struct ctdbd_connection *conn) int ctdbd_conn_get_fd(struct ctdbd_connection *conn) { - return packet_get_fd(conn->pkt); + return ctdb_packet_get_fd(conn->pkt); } /* @@ -641,7 +661,7 @@ static NTSTATUS ctdb_handle_message(uint8_t *buf, size_t length, } /* only messages to our pid or the broadcast are valid here */ - if (msg->srvid != sys_getpid() && msg->srvid != MSG_SRVID_SAMBA) { + if (msg->srvid != getpid() && msg->srvid != MSG_SRVID_SAMBA) { DEBUG(0,("Got unexpected message with srvid=%llu\n", (unsigned long long)msg->srvid)); TALLOC_FREE(buf); @@ -675,14 +695,14 @@ static void ctdbd_socket_handler(struct event_context *event_ctx, NTSTATUS status; - status = packet_fd_read(conn->pkt); + status = ctdb_packet_fd_read(conn->pkt); if (!NT_STATUS_IS_OK(status)) { DEBUG(0, ("packet_fd_read failed: %s\n", nt_errstr(status))); cluster_fatal("ctdbd died\n"); } - while (packet_handler(conn->pkt, ctdb_req_complete, + while (ctdb_packet_handler(conn->pkt, ctdb_req_complete, ctdb_handle_message, conn, &status)) { if (!NT_STATUS_IS_OK(status)) { DEBUG(10, ("could not handle incoming message: %s\n", @@ -702,7 +722,7 @@ NTSTATUS ctdbd_register_msg_ctx(struct ctdbd_connection *conn, SMB_ASSERT(conn->fde == NULL); if (!(conn->fde = event_add_fd(msg_ctx->event_ctx, conn, - packet_get_fd(conn->pkt), + ctdb_packet_get_fd(conn->pkt), EVENT_FD_READ, ctdbd_socket_handler, conn))) { @@ -720,32 +740,37 @@ NTSTATUS ctdbd_register_msg_ctx(struct ctdbd_connection *conn, */ NTSTATUS ctdbd_messaging_send(struct ctdbd_connection *conn, - uint32 dst_vnn, uint64 dst_srvid, + uint32_t dst_vnn, uint64_t dst_srvid, struct messaging_rec *msg) { - struct ctdb_req_message r; - TALLOC_CTX *mem_ctx; DATA_BLOB blob; NTSTATUS status; enum ndr_err_code ndr_err; - if (!(mem_ctx = talloc_init("ctdbd_messaging_send"))) { - DEBUG(0, ("talloc failed\n")); - return NT_STATUS_NO_MEMORY; - } - ndr_err = ndr_push_struct_blob( - &blob, mem_ctx, msg, + &blob, talloc_tos(), msg, (ndr_push_flags_fn_t)ndr_push_messaging_rec); if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { DEBUG(0, ("ndr_push_struct_blob failed: %s\n", ndr_errstr(ndr_err))); - status = ndr_map_error2ntstatus(ndr_err); - goto fail; + return ndr_map_error2ntstatus(ndr_err); } - r.hdr.length = offsetof(struct ctdb_req_message, data) + blob.length; + status = ctdbd_messaging_send_blob(conn, dst_vnn, dst_srvid, + blob.data, blob.length); + TALLOC_FREE(blob.data); + return status; +} + +NTSTATUS ctdbd_messaging_send_blob(struct ctdbd_connection *conn, + uint32_t dst_vnn, uint64_t dst_srvid, + const uint8_t *buf, size_t buflen) +{ + struct ctdb_req_message r; + NTSTATUS status; + + r.hdr.length = offsetof(struct ctdb_req_message, data) + buflen; r.hdr.ctdb_magic = CTDB_MAGIC; r.hdr.ctdb_version = CTDB_VERSION; r.hdr.generation = 1; @@ -754,41 +779,36 @@ NTSTATUS ctdbd_messaging_send(struct ctdbd_connection *conn, r.hdr.srcnode = conn->our_vnn; r.hdr.reqid = 0; r.srvid = dst_srvid; - r.datalen = blob.length; + r.datalen = buflen; DEBUG(10, ("ctdbd_messaging_send: Sending ctdb packet\n")); ctdb_packet_dump(&r.hdr); - status = packet_send( + status = ctdb_packet_send( conn->pkt, 2, data_blob_const(&r, offsetof(struct ctdb_req_message, data)), - blob); + data_blob_const(buf, buflen)); if (!NT_STATUS_IS_OK(status)) { - DEBUG(0, ("packet_send failed: %s\n", nt_errstr(status))); - goto fail; + DEBUG(0, ("ctdb_packet_send failed: %s\n", nt_errstr(status))); + return status; } - status = packet_flush(conn->pkt); - + status = ctdb_packet_flush(conn->pkt); if (!NT_STATUS_IS_OK(status)) { DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status))); cluster_fatal("cluster dispatch daemon msg write error\n"); } - - status = NT_STATUS_OK; - fail: - TALLOC_FREE(mem_ctx); - return status; + return NT_STATUS_OK; } /* * send/recv a generic ctdb control message */ static NTSTATUS ctdbd_control(struct ctdbd_connection *conn, - uint32_t vnn, uint32 opcode, - uint64_t srvid, uint32_t flags, - TDB_DATA data, + uint32_t vnn, uint32_t opcode, + uint64_t srvid, uint32_t flags, + TDB_DATA data, TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int *cstatus) { @@ -797,9 +817,6 @@ static NTSTATUS ctdbd_control(struct ctdbd_connection *conn, struct ctdbd_connection *new_conn = NULL; NTSTATUS status; - /* the samba3 ctdb code can't handle NOREPLY yet */ - flags &= ~CTDB_CTRL_FLAG_NOREPLY; - if (conn == NULL) { status = ctdbd_init_connection(NULL, &new_conn); @@ -817,7 +834,7 @@ static NTSTATUS ctdbd_control(struct ctdbd_connection *conn, req.hdr.ctdb_magic = CTDB_MAGIC; req.hdr.ctdb_version = CTDB_VERSION; req.hdr.operation = CTDB_REQ_CONTROL; - req.hdr.reqid = ++conn->reqid; + req.hdr.reqid = ctdbd_next_reqid(conn); req.hdr.destnode = vnn; req.opcode = opcode; req.srvid = srvid; @@ -827,17 +844,17 @@ static NTSTATUS ctdbd_control(struct ctdbd_connection *conn, DEBUG(10, ("ctdbd_control: Sending ctdb packet\n")); ctdb_packet_dump(&req.hdr); - status = packet_send( + status = ctdb_packet_send( conn->pkt, 2, data_blob_const(&req, offsetof(struct ctdb_req_control, data)), data_blob_const(data.dptr, data.dsize)); if (!NT_STATUS_IS_OK(status)) { - DEBUG(3, ("packet_send failed: %s\n", nt_errstr(status))); + DEBUG(3, ("ctdb_packet_send failed: %s\n", nt_errstr(status))); goto fail; } - status = packet_flush(conn->pkt); + status = ctdb_packet_flush(conn->pkt); if (!NT_STATUS_IS_OK(status)) { DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status))); @@ -887,26 +904,377 @@ static NTSTATUS ctdbd_control(struct ctdbd_connection *conn, /* * see if a remote process exists */ -bool ctdbd_process_exists(struct ctdbd_connection *conn, uint32 vnn, pid_t pid) +bool ctdbd_process_exists(struct ctdbd_connection *conn, uint32_t vnn, pid_t pid) { + struct server_id id; + bool result; + + id.pid = pid; + id.vnn = vnn; + + if (!ctdb_processes_exist(conn, &id, 1, &result)) { + DEBUG(10, ("ctdb_processes_exist failed\n")); + return false; + } + return result; +} + +bool ctdb_processes_exist(struct ctdbd_connection *conn, + const struct server_id *pids, int num_pids, + bool *results) +{ + TALLOC_CTX *frame = talloc_stackframe(); + int i, num_received; NTSTATUS status; - TDB_DATA data; - int32_t cstatus; + uint32_t *reqids; + bool result = false; - data.dptr = (uint8_t*)&pid; - data.dsize = sizeof(pid); + reqids = talloc_array(talloc_tos(), uint32_t, num_pids); + if (reqids == NULL) { + goto fail; + } - status = ctdbd_control(conn, vnn, CTDB_CONTROL_PROCESS_EXISTS, 0, 0, - data, NULL, NULL, &cstatus); + for (i=0; i<num_pids; i++) { + struct ctdb_req_control req; + pid_t pid; + + results[i] = false; + reqids[i] = ctdbd_next_reqid(conn); + + ZERO_STRUCT(req); + + /* + * pids[i].pid is uint64_t, scale down to pid_t which + * is the wire protocol towards ctdb. + */ + pid = pids[i].pid; + + DEBUG(10, ("Requesting PID %d/%d, reqid=%d\n", + (int)pids[i].vnn, (int)pid, + (int)reqids[i])); + + req.hdr.length = offsetof(struct ctdb_req_control, data); + req.hdr.length += sizeof(pid); + req.hdr.ctdb_magic = CTDB_MAGIC; + req.hdr.ctdb_version = CTDB_VERSION; + req.hdr.operation = CTDB_REQ_CONTROL; + req.hdr.reqid = reqids[i]; + req.hdr.destnode = pids[i].vnn; + req.opcode = CTDB_CONTROL_PROCESS_EXISTS; + req.srvid = 0; + req.datalen = sizeof(pid); + req.flags = 0; + + DEBUG(10, ("ctdbd_control: Sending ctdb packet\n")); + ctdb_packet_dump(&req.hdr); + + status = ctdb_packet_send( + conn->pkt, 2, + data_blob_const( + &req, offsetof(struct ctdb_req_control, data)), + data_blob_const(&pid, sizeof(pid))); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(10, ("ctdb_packet_send failed: %s\n", + nt_errstr(status))); + goto fail; + } + } + + status = ctdb_packet_flush(conn->pkt); if (!NT_STATUS_IS_OK(status)) { - DEBUG(0, (__location__ " ctdb_control for process_exists " - "failed\n")); - return False; + DEBUG(10, ("ctdb_packet_flush failed: %s\n", + nt_errstr(status))); + goto fail; + } + + num_received = 0; + + while (num_received < num_pids) { + struct ctdb_reply_control *reply = NULL; + uint32_t reqid; + + status = ctdb_read_req(conn, 0, talloc_tos(), (void *)&reply); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(10, ("ctdb_read_req failed: %s\n", + nt_errstr(status))); + goto fail; + } + + if (reply->hdr.operation != CTDB_REPLY_CONTROL) { + DEBUG(10, ("Received invalid reply\n")); + goto fail; + } + + reqid = reply->hdr.reqid; + + DEBUG(10, ("Received reqid %d\n", (int)reqid)); + + for (i=0; i<num_pids; i++) { + if (reqid == reqids[i]) { + break; + } + } + if (i == num_pids) { + DEBUG(10, ("Received unknown record number %u\n", + (unsigned)reqid)); + goto fail; + } + results[i] = ((reply->status) == 0); + TALLOC_FREE(reply); + num_received += 1; } - return cstatus == 0; + result = true; +fail: + TALLOC_FREE(frame); + return result; +} + +struct ctdb_vnn_list { + uint32_t vnn; + uint32_t reqid; + unsigned num_srvids; + unsigned num_filled; + uint64_t *srvids; + unsigned *pid_indexes; +}; + +/* + * Get a list of all vnns mentioned in a list of + * server_ids. vnn_indexes tells where in the vnns array we have to + * place the pids. + */ +static bool ctdb_collect_vnns(TALLOC_CTX *mem_ctx, + const struct server_id *pids, unsigned num_pids, + struct ctdb_vnn_list **pvnns, + unsigned *pnum_vnns) +{ + struct ctdb_vnn_list *vnns = NULL; + unsigned *vnn_indexes = NULL; + unsigned i, num_vnns = 0; + + vnn_indexes = talloc_array(mem_ctx, unsigned, num_pids); + if (vnn_indexes == NULL) { + DEBUG(1, ("talloc_array failed\n")); + goto fail; + } + + for (i=0; i<num_pids; i++) { + unsigned j; + uint32_t vnn = pids[i].vnn; + + for (j=0; j<num_vnns; j++) { + if (vnn == vnns[j].vnn) { + break; + } + } + vnn_indexes[i] = j; + + if (j < num_vnns) { + /* + * Already in the array + */ + vnns[j].num_srvids += 1; + continue; + } + vnns = talloc_realloc(mem_ctx, vnns, struct ctdb_vnn_list, + num_vnns+1); + if (vnns == NULL) { + DEBUG(1, ("talloc_realloc failed\n")); + goto fail; + } + vnns[num_vnns].vnn = vnn; + vnns[num_vnns].num_srvids = 1; + vnns[num_vnns].num_filled = 0; + num_vnns += 1; + } + for (i=0; i<num_vnns; i++) { + struct ctdb_vnn_list *vnn = &vnns[i]; + + vnn->srvids = talloc_array(vnns, uint64_t, vnn->num_srvids); + if (vnn->srvids == NULL) { + DEBUG(1, ("talloc_array failed\n")); + goto fail; + } + vnn->pid_indexes = talloc_array(vnns, unsigned, + vnn->num_srvids); + if (vnn->pid_indexes == NULL) { + DEBUG(1, ("talloc_array failed\n")); + goto fail; + } + } + for (i=0; i<num_pids; i++) { + struct ctdb_vnn_list *vnn = &vnns[vnn_indexes[i]]; + vnn->srvids[vnn->num_filled] = pids[i].unique_id; + vnn->pid_indexes[vnn->num_filled] = i; + vnn->num_filled += 1; + } + + TALLOC_FREE(vnn_indexes); + *pvnns = vnns; + *pnum_vnns = num_vnns; + return true; +fail: + TALLOC_FREE(vnns); + TALLOC_FREE(vnn_indexes); + return false; +} + +#ifdef HAVE_CTDB_CONTROL_CHECK_SRVIDS_DECL + +bool ctdb_serverids_exist(struct ctdbd_connection *conn, + const struct server_id *pids, unsigned num_pids, + bool *results) +{ + unsigned i, num_received; + NTSTATUS status; + struct ctdb_vnn_list *vnns = NULL; + unsigned num_vnns; + bool result = false; + + if (!ctdb_collect_vnns(talloc_tos(), pids, num_pids, + &vnns, &num_vnns)) { + DEBUG(1, ("ctdb_collect_vnns failed\n")); + goto fail; + } + + for (i=0; i<num_vnns; i++) { + struct ctdb_vnn_list *vnn = &vnns[i]; + struct ctdb_req_control req; + + vnn->reqid = ctdbd_next_reqid(conn); + + ZERO_STRUCT(req); + + DEBUG(10, ("Requesting VNN %d, reqid=%d, num_srvids=%u\n", + (int)vnn->vnn, (int)vnn->reqid, vnn->num_srvids)); + + req.hdr.length = offsetof(struct ctdb_req_control, data); + req.hdr.ctdb_magic = CTDB_MAGIC; + req.hdr.ctdb_version = CTDB_VERSION; + req.hdr.operation = CTDB_REQ_CONTROL; + req.hdr.reqid = vnn->reqid; + req.hdr.destnode = vnn->vnn; + req.opcode = CTDB_CONTROL_CHECK_SRVIDS; + req.srvid = 0; + req.datalen = sizeof(uint64_t) * vnn->num_srvids; + req.hdr.length += req.datalen; + req.flags = 0; + + DEBUG(10, ("ctdbd_control: Sending ctdb packet\n")); + ctdb_packet_dump(&req.hdr); + + status = ctdb_packet_send( + conn->pkt, 2, + data_blob_const( + &req, offsetof(struct ctdb_req_control, + data)), + data_blob_const(vnn->srvids, req.datalen)); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(1, ("ctdb_packet_send failed: %s\n", + nt_errstr(status))); + goto fail; + } + } + + status = ctdb_packet_flush(conn->pkt); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(1, ("ctdb_packet_flush failed: %s\n", + nt_errstr(status))); + goto fail; + } + + num_received = 0; + + while (num_received < num_vnns) { + struct ctdb_reply_control *reply = NULL; + struct ctdb_vnn_list *vnn; + uint32_t reqid; + uint8_t *reply_data; + + status = ctdb_read_req(conn, 0, talloc_tos(), (void *)&reply); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(1, ("ctdb_read_req failed: %s\n", + nt_errstr(status))); + goto fail; + } + + if (reply->hdr.operation != CTDB_REPLY_CONTROL) { + DEBUG(1, ("Received invalid reply %u\n", + (unsigned)reply->hdr.operation)); + goto fail; + } + + reqid = reply->hdr.reqid; + + DEBUG(10, ("Received reqid %d\n", (int)reqid)); + + for (i=0; i<num_vnns; i++) { + if (reqid == vnns[i].reqid) { + break; + } + } + if (i == num_vnns) { + DEBUG(1, ("Received unknown reqid number %u\n", + (unsigned)reqid)); + goto fail; + } + + DEBUG(10, ("Found index %u\n", i)); + + vnn = &vnns[i]; + + DEBUG(10, ("Received vnn %u, vnn->num_srvids %u, datalen %u\n", + (unsigned)vnn->vnn, vnn->num_srvids, + (unsigned)reply->datalen)); + + if (reply->datalen >= ((vnn->num_srvids+7)/8)) { + /* + * Got a real reply + */ + reply_data = reply->data; + } else { + /* + * Got an error reply + */ + DEBUG(5, ("Received short reply len %d, status %u, " + "errorlen %u\n", + (unsigned)reply->datalen, + (unsigned)reply->status, + (unsigned)reply->errorlen)); + dump_data(5, reply->data, reply->errorlen); + + /* + * This will trigger everything set to false + */ + reply_data = NULL; + } + + for (i=0; i<vnn->num_srvids; i++) { + int idx = vnn->pid_indexes[i]; + + if (pids[i].unique_id == + SERVERID_UNIQUE_ID_NOT_TO_VERIFY) { + results[idx] = true; + continue; + } + results[idx] = + (reply_data != NULL) && + ((reply_data[i/8] & (1<<(i%8))) != 0); + } + + TALLOC_FREE(reply); + num_received += 1; + } + + result = true; +fail: + TALLOC_FREE(vnns); + return result; } +#endif /* HAVE_CTDB_CONTROL_CHECK_SRVIDS_DECL */ + /* * Get a db path */ @@ -942,8 +1310,7 @@ NTSTATUS ctdbd_db_attach(struct ctdbd_connection *conn, int32_t cstatus; bool persistent = (tdb_flags & TDB_CLEAR_IF_FIRST) == 0; - data.dptr = (uint8_t*)name; - data.dsize = strlen(name)+1; + data = string_term_tdb_data(name); status = ctdbd_control(conn, CTDB_CURRENT_NODE, persistent @@ -987,7 +1354,7 @@ NTSTATUS ctdbd_db_attach(struct ctdbd_connection *conn, /* * force the migration of a record to this node */ -NTSTATUS ctdbd_migrate(struct ctdbd_connection *conn, uint32 db_id, +NTSTATUS ctdbd_migrate(struct ctdbd_connection *conn, uint32_t db_id, TDB_DATA key) { struct ctdb_req_call req; @@ -1000,7 +1367,7 @@ NTSTATUS ctdbd_migrate(struct ctdbd_connection *conn, uint32 db_id, req.hdr.ctdb_magic = CTDB_MAGIC; req.hdr.ctdb_version = CTDB_VERSION; req.hdr.operation = CTDB_REQ_CALL; - req.hdr.reqid = ++conn->reqid; + req.hdr.reqid = ctdbd_next_reqid(conn); req.flags = CTDB_IMMEDIATE_MIGRATION; req.callid = CTDB_NULL_FUNC; req.db_id = db_id; @@ -1009,17 +1376,17 @@ NTSTATUS ctdbd_migrate(struct ctdbd_connection *conn, uint32 db_id, DEBUG(10, ("ctdbd_migrate: Sending ctdb packet\n")); ctdb_packet_dump(&req.hdr); - status = packet_send( + status = ctdb_packet_send( conn->pkt, 2, data_blob_const(&req, offsetof(struct ctdb_req_call, data)), data_blob_const(key.dptr, key.dsize)); if (!NT_STATUS_IS_OK(status)) { - DEBUG(3, ("packet_send failed: %s\n", nt_errstr(status))); + DEBUG(3, ("ctdb_packet_send failed: %s\n", nt_errstr(status))); return status; } - status = packet_flush(conn->pkt); + status = ctdb_packet_flush(conn->pkt); if (!NT_STATUS_IS_OK(status)) { DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status))); @@ -1047,14 +1414,22 @@ NTSTATUS ctdbd_migrate(struct ctdbd_connection *conn, uint32 db_id, } /* - * remotely fetch a record without locking it or forcing a migration + * remotely fetch a record (read-only) */ -NTSTATUS ctdbd_fetch(struct ctdbd_connection *conn, uint32 db_id, - TDB_DATA key, TALLOC_CTX *mem_ctx, TDB_DATA *data) +NTSTATUS ctdbd_fetch(struct ctdbd_connection *conn, uint32_t db_id, + TDB_DATA key, TALLOC_CTX *mem_ctx, TDB_DATA *data, + bool local_copy) { struct ctdb_req_call req; struct ctdb_reply_call *reply; NTSTATUS status; + uint32_t flags; + +#ifdef HAVE_CTDB_WANT_READONLY_DECL + flags = local_copy ? CTDB_WANT_READONLY : 0; +#else + flags = 0; +#endif ZERO_STRUCT(req); @@ -1062,23 +1437,23 @@ NTSTATUS ctdbd_fetch(struct ctdbd_connection *conn, uint32 db_id, req.hdr.ctdb_magic = CTDB_MAGIC; req.hdr.ctdb_version = CTDB_VERSION; req.hdr.operation = CTDB_REQ_CALL; - req.hdr.reqid = ++conn->reqid; - req.flags = 0; + req.hdr.reqid = ctdbd_next_reqid(conn); + req.flags = flags; req.callid = CTDB_FETCH_FUNC; req.db_id = db_id; req.keylen = key.dsize; - status = packet_send( + status = ctdb_packet_send( conn->pkt, 2, data_blob_const(&req, offsetof(struct ctdb_req_call, data)), data_blob_const(key.dptr, key.dsize)); if (!NT_STATUS_IS_OK(status)) { - DEBUG(3, ("packet_send failed: %s\n", nt_errstr(status))); + DEBUG(3, ("ctdb_packet_send failed: %s\n", nt_errstr(status))); return status; } - status = packet_flush(conn->pkt); + status = ctdb_packet_flush(conn->pkt); if (!NT_STATUS_IS_OK(status)) { DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status))); @@ -1187,7 +1562,7 @@ static NTSTATUS ctdb_traverse_handler(uint8_t *buf, size_t length, everything in-line. */ -NTSTATUS ctdbd_traverse(uint32 db_id, +NTSTATUS ctdbd_traverse(uint32_t db_id, void (*fn)(TDB_DATA key, TDB_DATA data, void *private_data), void *private_data) @@ -1211,7 +1586,7 @@ NTSTATUS ctdbd_traverse(uint32 db_id, t.db_id = db_id; t.srvid = conn->rand_srvid; - t.reqid = ++conn->reqid; + t.reqid = ctdbd_next_reqid(conn); data.dptr = (uint8_t *)&t; data.dsize = sizeof(t); @@ -1241,7 +1616,7 @@ NTSTATUS ctdbd_traverse(uint32 db_id, status = NT_STATUS_OK; - if (packet_handler(conn->pkt, ctdb_req_complete, + if (ctdb_packet_handler(conn->pkt, ctdb_req_complete, ctdb_traverse_handler, &state, &status)) { if (NT_STATUS_EQUAL(status, NT_STATUS_END_OF_FILE)) { @@ -1274,7 +1649,7 @@ NTSTATUS ctdbd_traverse(uint32 db_id, } if (!NT_STATUS_IS_OK(status)) { - DEBUG(0, ("packet_fd_read_sync failed: %s\n", nt_errstr(status))); + DEBUG(0, ("ctdb_packet_fd_read_sync failed: %s\n", nt_errstr(status))); cluster_fatal("ctdbd died\n"); } } @@ -1295,7 +1670,8 @@ static void smbd_ctdb_canonicalize_ip(const struct sockaddr_storage *in, #ifdef HAVE_IPV6 if (in->ss_family == AF_INET6) { const char prefix[12] = { 0,0,0,0,0,0,0,0,0,0,0xff,0xff }; - const struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)in; + const struct sockaddr_in6 *in6 = + (const struct sockaddr_in6 *)in; struct sockaddr_in *out4 = (struct sockaddr_in *)out; if (memcmp(&in6->sin6_addr, prefix, 12) == 0) { memset(out, 0, sizeof(*out)); @@ -1304,7 +1680,7 @@ static void smbd_ctdb_canonicalize_ip(const struct sockaddr_storage *in, #endif out4->sin_family = AF_INET; out4->sin_port = in6->sin6_port; - memcpy(&out4->sin_addr, &in6->sin6_addr.s6_addr32[3], 4); + memcpy(&out4->sin_addr, &in6->sin6_addr.s6_addr[12], 4); } } #endif @@ -1345,15 +1721,15 @@ NTSTATUS ctdbd_register_ips(struct ctdbd_connection *conn, switch (client.ss_family) { case AF_INET: - p4.dest = *(struct sockaddr_in *)(void *)&server; - p4.src = *(struct sockaddr_in *)(void *)&client; + memcpy(&p4.dest, &server, sizeof(p4.dest)); + memcpy(&p4.src, &client, sizeof(p4.src)); data.dptr = (uint8_t *)&p4; data.dsize = sizeof(p4); break; #ifdef HAVE_STRUCT_CTDB_CONTROL_TCP_ADDR case AF_INET6: - p.dest.ip6 = *(struct sockaddr_in6 *)(void *)&server; - p.src.ip6 = *(struct sockaddr_in6 *)(void *)&client; + memcpy(&p.dest.ip6, &server, sizeof(p.dest.ip6)); + memcpy(&p.src.ip6, &client, sizeof(p.src.ip6)); data.dptr = (uint8_t *)&p; data.dsize = sizeof(p); break; @@ -1363,10 +1739,6 @@ NTSTATUS ctdbd_register_ips(struct ctdbd_connection *conn, } conn->release_ip_handler = release_ip_handler; - /* - * store the IP address of the server socket for later - * comparison in release_ip() - */ conn->release_ip_priv = private_data; /* @@ -1399,8 +1771,8 @@ NTSTATUS ctdbd_register_reconfigure(struct ctdbd_connection *conn) /* call a control on the local node */ -NTSTATUS ctdbd_control_local(struct ctdbd_connection *conn, uint32 opcode, - uint64_t srvid, uint32_t flags, TDB_DATA data, +NTSTATUS ctdbd_control_local(struct ctdbd_connection *conn, uint32_t opcode, + uint64_t srvid, uint32_t flags, TDB_DATA data, TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int *cstatus) { @@ -1451,4 +1823,13 @@ NTSTATUS ctdb_unwatch(struct ctdbd_connection *conn) return status; } +#else + +NTSTATUS ctdbd_messaging_send_blob(struct ctdbd_connection *conn, + uint32_t dst_vnn, uint64_t dst_srvid, + const uint8_t *buf, size_t buflen) +{ + return NT_STATUS_NOT_IMPLEMENTED; +} + #endif |